Skip to content

Statistics (tit.stats)

Statistical analysis utilities for TI-Toolbox.

This package includes permutation testing, atlas-based posthoc analyses, and reporting/visualization helpers.

atlas_overlap_analysis

atlas_overlap_analysis(sig_mask, atlas_files, data_dir, reference_img=None, verbose=True)

Analyze overlap between significant voxels and atlas regions

Parameters:

- sig_mask : ndarray (x, y, z) — binary mask of significant voxels
- atlas_files : list of str — list of atlas file names
- data_dir : str — directory containing the atlas files
- reference_img : nibabel image, optional — reference image for resampling
- verbose : bool — print progress information

Returns:

results : dict — dictionary mapping each atlas file name to a list of per-region overlap statistics (one dict per overlapping region), sorted by overlap count in descending order

Source code in tit/stats/atlas_utils.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def atlas_overlap_analysis(sig_mask, atlas_files, data_dir, reference_img=None, verbose=True):
    """
    Analyze overlap between significant voxels and atlas regions

    Parameters:
    -----------
    sig_mask : ndarray (x, y, z)
        Binary mask of significant voxels
    atlas_files : list of str
        List of atlas file names
    data_dir : str
        Directory containing atlas files
    reference_img : nibabel image, optional
        Reference image for resampling
    verbose : bool
        Print progress information

    Returns:
    --------
    results : dict
        Dictionary mapping atlas file names to lists of per-region overlap
        statistics. Each entry is a dict with keys 'region_id',
        'overlap_voxels', and 'region_size', sorted by overlap count in
        descending order. Atlases whose file is missing are skipped.
    """
    if verbose:
        print("\n" + "="*60)
        print("ATLAS OVERLAP ANALYSIS")
        print("="*60)

    # Cast once so the elementwise '&' below is well-defined even if the
    # caller passes an integer (0/1) mask instead of a boolean one.
    sig_bool = np.asarray(sig_mask).astype(bool)

    results = {}

    for atlas_file in atlas_files:
        atlas_path = os.path.join(data_dir, atlas_file)
        if not os.path.exists(atlas_path):
            if verbose:
                print(f"Warning: Atlas file not found - {atlas_file}")
            continue

        if verbose:
            print(f"\n--- {atlas_file} ---")
        atlas_img = nib.load(atlas_path)

        # Check dimensions and resample if needed
        if reference_img is not None:
            atlas_data = check_and_resample_atlas(atlas_img, reference_img, atlas_file, verbose)
        else:
            atlas_data = atlas_img.get_fdata().astype(int)

        # Get unique regions (excluding 0 = background)
        regions = np.unique(atlas_data[atlas_data > 0])

        region_counts = []
        for region_id in regions:
            region_mask = (atlas_data == region_id)
            overlap = np.sum(sig_bool & region_mask)

            if overlap > 0:
                region_counts.append({
                    'region_id': int(region_id),
                    'overlap_voxels': int(overlap),
                    'region_size': int(np.sum(region_mask))
                })

        # Sort by overlap count
        region_counts = sorted(region_counts, key=lambda x: x['overlap_voxels'], reverse=True)

        if verbose:
            print(f"\nTop regions by significant voxel count:")
            for i, r in enumerate(region_counts[:15], 1):
                # region_size >= overlap_voxels > 0 here, so division is safe
                pct = 100 * r['overlap_voxels'] / r['region_size']
                print(f"{i:2d}. Region {r['region_id']:3d}: {r['overlap_voxels']:4d} sig. voxels "
                      f"({pct:.1f}% of region)")

        results[atlas_file] = region_counts

    return results

cluster_analysis

cluster_analysis(sig_mask, affine, verbose=True)

Perform cluster analysis on significant voxels

Parameters:

- sig_mask : ndarray (x, y, z) — binary mask of significant voxels
- affine : ndarray — affine transformation matrix (voxel to world/MNI coordinates)
- verbose : bool — print progress information

Returns:

clusters : list of dict Cluster information including size, center of mass in voxel and MNI coordinates

Source code in tit/stats/stats_utils.py
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
def cluster_analysis(sig_mask, affine, verbose=True):
    """
    Perform cluster analysis on significant voxels

    Parameters:
    -----------
    sig_mask : ndarray (x, y, z)
        Binary mask of significant voxels
    affine : ndarray
        4x4 affine transformation matrix (voxel -> MNI/world coordinates)
    verbose : bool
        Print progress information

    Returns:
    --------
    clusters : list of dict
        Cluster information including size, center of mass in voxel and MNI
        coordinates, sorted by size in descending order. Empty list if no
        clusters are found.
    """
    if verbose:
        print("\nPerforming cluster analysis...")

    # Find connected clusters (face connectivity, scipy.ndimage default)
    labeled_array, num_clusters = label(sig_mask)

    if num_clusters == 0:
        if verbose:
            print("No clusters found")
        return []

    # Split the affine once; applying it as R @ v + t is equivalent to
    # nibabel's apply_affine for a single point, and avoids importing
    # nibabel inside the function just for this one call.
    affine = np.asarray(affine)
    rotation = affine[:3, :3]
    translation = affine[:3, 3]

    clusters = []
    for cluster_id in range(1, num_clusters + 1):
        cluster_mask = (labeled_array == cluster_id)
        cluster_size = np.sum(cluster_mask)

        # Get center of mass in voxel coordinates
        coords = np.argwhere(cluster_mask)
        com_voxel = np.mean(coords, axis=0)

        # Convert to MNI coordinates
        com_mni = rotation @ com_voxel + translation

        clusters.append({
            'cluster_id': cluster_id,
            'size': cluster_size,
            'center_voxel': com_voxel,
            'center_mni': com_mni
        })

    # Sort by size
    clusters = sorted(clusters, key=lambda x: x['size'], reverse=True)

    if verbose:
        print(f"Found {num_clusters} clusters")
        for c in clusters[:10]:  # Show top 10
            print(f"  Cluster {c['cluster_id']}: {c['size']} voxels, "
                  f"MNI center: ({c['center_mni'][0]:.1f}, {c['center_mni'][1]:.1f}, {c['center_mni'][2]:.1f})")

    return clusters

cluster_based_correction

cluster_based_correction(responders, non_responders, p_values, valid_mask, cluster_threshold=0.01, n_permutations=500, alpha=0.05, test_type='unpaired', alternative='two-sided', cluster_stat='size', t_statistics=None, n_jobs=-1, verbose=True, logger=None, save_permutation_log=False, permutation_log_file=None, subject_ids_resp=None, subject_ids_non_resp=None)

Apply cluster-based permutation correction for multiple comparisons

This implements the cluster-based approach commonly used in neuroimaging. Tests all valid voxels in permutations and uses parallel processing for speed.

Parameters:

responders : ndarray (x, y, z, n_subjects) Responder data non_responders : ndarray (x, y, z, n_subjects) Non-responder data p_values : ndarray (x, y, z) Uncorrected p-values from initial test valid_mask : ndarray (x, y, z) Boolean mask of valid voxels cluster_threshold : float Initial p-value threshold for cluster formation (uncorrected) n_permutations : int Number of permutations for null distribution (500-1000 recommended) alpha : float Significance level for cluster-level correction test_type : str Either 'paired' or 'unpaired' t-test for permutations alternative : {'two-sided', 'greater', 'less'}, optional Alternative hypothesis (default: 'two-sided') cluster_stat : {'size', 'mass'}, optional Cluster statistic to use (default: 'size'): * 'size': count of contiguous significant voxels * 'mass': sum of t-statistics in contiguous significant voxels t_statistics : ndarray (x, y, z), optional T-statistics from initial test (required if cluster_stat='mass') n_jobs : int Number of parallel jobs. -1 uses all available CPU cores. 1 disables parallelization. verbose : bool Print progress information logger : logging.Logger, optional Logger instance for output (default: None) save_permutation_log : bool, optional If True, save detailed permutation information to file (default: False) permutation_log_file : str, optional Path to save permutation log. If None and save_permutation_log=True, will use default name subject_ids_resp : list, optional List of responder subject IDs (for logging) subject_ids_non_resp : list, optional List of non-responder subject IDs (for logging)

Returns:

sig_mask : ndarray (x, y, z) Binary mask of significant voxels cluster_stat_threshold : float Cluster statistic threshold from permutation distribution sig_clusters : list of dict Information about significant clusters null_max_cluster_stats : ndarray Maximum cluster statistics from permutation null distribution cluster_stats : list of dict All clusters from observed data (for plotting) correlation_data : dict Dictionary with 'sizes' and 'masses' arrays for correlation analysis

Source code in tit/stats/stats_utils.py
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
def cluster_based_correction(responders, non_responders, p_values, valid_mask,
                            cluster_threshold=0.01, n_permutations=500, alpha=0.05,
                            test_type='unpaired', alternative='two-sided', cluster_stat='size',
                            t_statistics=None, n_jobs=-1, verbose=True,
                            logger=None, save_permutation_log=False, permutation_log_file=None,
                            subject_ids_resp=None, subject_ids_non_resp=None):
    """
    Apply cluster-based permutation correction for multiple comparisons

    This implements the cluster-based approach commonly used in neuroimaging.
    Tests all valid voxels in permutations and uses parallel processing for speed.

    Parameters:
    -----------
    responders : ndarray (x, y, z, n_subjects)
        Responder data
    non_responders : ndarray (x, y, z, n_subjects)
        Non-responder data
    p_values : ndarray (x, y, z)
        Uncorrected p-values from initial test
    valid_mask : ndarray (x, y, z)
        Boolean mask of valid voxels
    cluster_threshold : float
        Initial p-value threshold for cluster formation (uncorrected)
    n_permutations : int
        Number of permutations for null distribution (500-1000 recommended)
    alpha : float
        Significance level for cluster-level correction
    test_type : str
        Either 'paired' or 'unpaired' t-test for permutations
    alternative : {'two-sided', 'greater', 'less'}, optional
        Alternative hypothesis (default: 'two-sided')
    cluster_stat : {'size', 'mass'}, optional
        Cluster statistic to use (default: 'size'):
        * 'size': count of contiguous significant voxels
        * 'mass': sum of t-statistics in contiguous significant voxels
    t_statistics : ndarray (x, y, z), optional
        T-statistics from initial test (required if cluster_stat='mass')
    n_jobs : int
        Number of parallel jobs. -1 uses all available CPU cores. 1 disables parallelization.
    verbose : bool
        Print progress information
    logger : logging.Logger, optional
        Logger instance for output (default: None)
    save_permutation_log : bool, optional
        If True, save detailed permutation information to file (default: False)
    permutation_log_file : str, optional
        Path to save permutation log. If None and save_permutation_log=True,
        will use default name
    subject_ids_resp : list, optional
        List of responder subject IDs (for logging)
    subject_ids_non_resp : list, optional
        List of non-responder subject IDs (for logging)

    Returns:
    --------
    sig_mask : ndarray (x, y, z)
        Binary mask of significant voxels
    cluster_stat_threshold : float
        Cluster statistic threshold from permutation distribution
    sig_clusters : list of dict
        Information about significant clusters
    null_max_cluster_stats : ndarray
        Maximum cluster statistics from permutation null distribution
    cluster_stats : list of dict
        All clusters from observed data (for plotting)
    correlation_data : dict
        Dictionary with 'sizes' and 'masses' arrays for correlation analysis
    """
    try:
        from .io_utils import save_permutation_details
    except ImportError:
        from io_utils import save_permutation_details

    # Imported once here; gc.collect() is called at several points below to
    # keep peak memory down after freeing large arrays.
    import gc

    # Validate cluster_stat parameter
    if cluster_stat not in ['size', 'mass']:
        raise ValueError(f"cluster_stat must be 'size' or 'mass', got '{cluster_stat}'")

    # Check that t_statistics is provided if using cluster mass
    if cluster_stat == 'mass' and t_statistics is None:
        raise ValueError("t_statistics must be provided when cluster_stat='mass'")

    if verbose:
        header = f"\n{'='*70}\nCLUSTER-BASED PERMUTATION CORRECTION\n{'='*70}"
        if logger:
            logger.info(header)
        else:
            print(header)
        cluster_stat_name = "Cluster Size" if cluster_stat == 'size' else "Cluster Mass"
        config_info = f"Cluster statistic: {cluster_stat_name}\nCluster-forming threshold: p < {cluster_threshold}\nNumber of permutations: {n_permutations}\nCluster-level alpha: {alpha}"
        if logger:
            for line in config_info.split('\n'):
                logger.info(line)
        else:
            print(config_info)

    # Step 1: Form clusters based on initial threshold
    initial_mask = (p_values < cluster_threshold) & valid_mask
    labeled_array, n_clusters = label(initial_mask)

    if verbose:
        msg = f"\nFound {n_clusters} clusters at p < {cluster_threshold} (uncorrected)"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    if n_clusters == 0:
        if verbose:
            msg = "No clusters found. Try increasing cluster_threshold (e.g., 0.05)"
            if logger:
                logger.warning(msg)
            else:
                print(msg)
        # Return all 6 expected values with empty/default data
        empty_correlation_data = {
            'sizes': np.array([]),
            'masses': np.array([])
        }
        return np.zeros_like(p_values, dtype=int), cluster_threshold, [], np.array([]), [], empty_correlation_data

    # Calculate cluster statistics efficiently without storing all clusters
    if n_clusters > 1000 and verbose:
        warning_msg = f"\nWARNING: Found {n_clusters} clusters! This is unusually high.\nConsider:\n  1. Using a stricter cluster_threshold (e.g., 0.01 instead of 0.05)\n  2. Checking if your data is properly masked\n  3. Verifying your p-values are computed correctly"
        if logger:
            logger.warning(warning_msg)
        else:
            print(warning_msg)

    # Find top clusters for display without storing all in memory
    if verbose:
        msg = f"\nFinding largest clusters for summary..."
        if logger:
            logger.info(msg)
        else:
            print(msg)
        top_clusters = []

        # Find top 10 clusters by their statistic
        for cluster_id in range(1, n_clusters + 1):
            cluster_mask = (labeled_array == cluster_id)
            size = np.sum(cluster_mask)

            if size > 1:  # Only multi-voxel clusters
                if cluster_stat == 'size':
                    stat_value = float(size)
                elif cluster_stat == 'mass':
                    stat_value = float(np.sum(t_statistics[cluster_mask]))

                # Keep only top 10
                if len(top_clusters) < 10:
                    top_clusters.append((cluster_id, size, stat_value))
                    top_clusters.sort(key=lambda x: x[2], reverse=True)
                elif stat_value > top_clusters[-1][2]:
                    top_clusters[-1] = (cluster_id, size, stat_value)
                    top_clusters.sort(key=lambda x: x[2], reverse=True)

        if top_clusters:
            # Show summary of top clusters
            if cluster_stat == 'size':
                msg = f"\nTop {len(top_clusters)} clusters (largest: {top_clusters[0][2]:.0f} voxels)"
            else:
                msg = f"\nTop {len(top_clusters)} clusters (largest mass: {top_clusters[0][2]:.2f})"

            if logger:
                logger.info(msg)
            else:
                print(msg)

            for i, (cid, size, stat_value) in enumerate(top_clusters[:5], 1):
                if cluster_stat == 'size':
                    msg = f"  {i}. Cluster {cid}: {size} voxels"
                else:
                    msg = f"  {i}. Cluster {cid}: {size} voxels, mass = {stat_value:.2f}"
                if logger:
                    logger.info(msg)
                else:
                    print(msg)
        else:
            msg = f"\nNo multi-voxel clusters found"
            if logger:
                logger.info(msg)
            else:
                print(msg)

    # Step 2: Test all valid voxels in permutations
    test_mask = valid_mask
    test_coords = np.argwhere(test_mask)
    n_test_voxels = len(test_coords)

    if verbose:
        msg = f"\nTesting all {n_test_voxels} valid voxels in permutations"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Step 3: Permutation testing
    if verbose:
        msg = f"\nRunning {n_permutations} permutations..."
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Combine all subjects
    all_data = np.concatenate([responders, non_responders], axis=-1)
    n_resp = responders.shape[-1]
    n_non_resp = non_responders.shape[-1]
    n_total = n_resp + n_non_resp

    # Pre-extract data for test voxels
    if verbose:
        msg = f"Pre-extracting voxel data for faster permutations...\n  Test voxels: {n_test_voxels:,}\n  Total subjects: {n_total}"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Use float32 to save memory
    test_data = np.zeros((n_test_voxels, n_total), dtype=np.float32)
    for idx, coord in enumerate(test_coords):
        i, j, k = coord[0], coord[1], coord[2]
        test_data[idx, :] = all_data[i, j, k, :].astype(np.float32)

    # Free the original combined data array
    del all_data
    gc.collect()

    # Report memory usage
    if verbose:
        test_data_mb = test_data.nbytes / (1024**2)
        msg = f"  Test data size: {test_data_mb:.1f} MB"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Determine number of jobs
    if n_jobs == -1:
        n_jobs = multiprocessing.cpu_count()
        if verbose:
            print(f"Auto-detected {n_jobs} CPU cores")

    if verbose:
        if n_jobs == 1:
            print("Running permutations sequentially (1 core)...")
        else:
            print(f"Running permutations in parallel using {n_jobs} cores...")

    # Run permutations in parallel
    # Use seeds for reproducibility
    seeds = np.random.randint(0, 2**31, size=n_permutations)

    # Determine if we need to track permutation indices
    track_indices = save_permutation_log and subject_ids_resp is not None and subject_ids_non_resp is not None

    if n_jobs == 1:
        # Sequential execution with progress bar
        null_max_cluster_stats = []
        null_max_cluster_sizes = []
        null_max_cluster_masses = []
        permutation_indices = [] if track_indices else None
        iterator = tqdm(range(n_permutations), desc="Permutations") if verbose else range(n_permutations)
        for perm in iterator:
            result = _run_single_permutation(
                test_data, test_coords, n_resp, n_total,
                cluster_threshold, valid_mask, p_values.shape,
                test_type=test_type,
                alternative=alternative,
                cluster_stat=cluster_stat,
                seed=seeds[perm],
                return_indices=track_indices
            )
            if track_indices:
                max_stat, perm_idx, max_size, max_mass = result
                null_max_cluster_stats.append(max_stat)
                null_max_cluster_sizes.append(max_size)
                null_max_cluster_masses.append(max_mass)
                permutation_indices.append(perm_idx)
            else:
                max_stat, max_size, max_mass = result
                null_max_cluster_stats.append(max_stat)
                null_max_cluster_sizes.append(max_size)
                null_max_cluster_masses.append(max_mass)
    else:
        # Parallel execution using joblib - same as local implementation
        if verbose:
            print(f"Using joblib with {n_jobs} processes...")

        # Run permutations with joblib
        results = Parallel(n_jobs=n_jobs, verbose=10 if verbose else 0)(
            delayed(_run_single_permutation)(
                test_data, test_coords, n_resp, n_total,
                cluster_threshold, valid_mask, p_values.shape,
                test_type=test_type,
                alternative=alternative,
                cluster_stat=cluster_stat,
                seed=seeds[perm],
                return_indices=track_indices
            ) for perm in range(n_permutations)
        )

        # Clean up parallel processing resources
        gc.collect()

        if verbose:
            print(f"All {n_permutations} permutations completed")

        if track_indices:
            null_max_cluster_stats = [r[0] for r in results]
            permutation_indices = [r[1] for r in results]
            null_max_cluster_sizes = [r[2] for r in results]
            null_max_cluster_masses = [r[3] for r in results]
        else:
            null_max_cluster_stats = [r[0] for r in results]
            null_max_cluster_sizes = [r[1] for r in results]
            null_max_cluster_masses = [r[2] for r in results]
            permutation_indices = None

    # Explicitly free memory from test_data and other large arrays
    del test_data
    del test_coords
    gc.collect()

    # Step 4: Determine cluster statistic threshold using discrete approach
    null_max_cluster_stats = np.array(null_max_cluster_stats)

    # For p < alpha, we need (count >= observed) < alpha * n_permutations
    # The threshold is the value where exactly alpha * n_permutations values exceed it
    sorted_null = np.sort(null_max_cluster_stats)[::-1]  # Sort descending
    threshold_index = int(alpha * n_permutations)
    if threshold_index == 0:
        threshold_index = 1  # Need at least 1 for edge case
    if threshold_index > len(sorted_null):
        threshold_index = len(sorted_null)

    # Discrete threshold: the value where exactly alpha proportion exceeds it
    cluster_stat_threshold = sorted_null[threshold_index - 1]  # -1 for 0-indexing

    if verbose:
        stat_unit = "voxels" if cluster_stat == 'size' else "mass units"
        msg = f"\nDiscrete threshold for significance (p < {alpha}): {cluster_stat_threshold:.2f} {stat_unit}\n  (This is the {threshold_index}th largest value out of {n_permutations} permutations)\nNull distribution stats: min={np.min(null_max_cluster_stats):.2f}, mean={np.mean(null_max_cluster_stats):.2f}, max={np.max(null_max_cluster_stats):.2f}"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Save permutation details if requested
    if save_permutation_log and permutation_indices is not None:
        if permutation_log_file is None:
            permutation_log_file = "permutation_details.txt"

        # Prepare permutation info
        permutation_info = []
        for perm_num in range(n_permutations):
            permutation_info.append({
                'perm_num': perm_num,
                'perm_idx': permutation_indices[perm_num],
                'max_cluster_size': null_max_cluster_stats[perm_num]
            })

        save_permutation_details(permutation_info, permutation_log_file, 
                                subject_ids_resp, subject_ids_non_resp)

        if verbose:
            print(f"Saved permutation details to: {permutation_log_file}")

    # Step 5: Identify significant clusters using per-cluster p-values (MNE-style)
    # Include observed max in null distribution for proper p-value computation
    # Reference: Maris & Oostenveld (2007), MNE-Python implementation
    sig_mask = np.zeros_like(p_values, dtype=int)
    sig_clusters = []
    all_cluster_stats = []  # Store all cluster statistics for p-value computation
    # BUGFIX: initialize here so the final return statement is well-defined
    # even when no multi-voxel cluster survives the threshold (previously a
    # NameError because it was only assigned inside the branch below).
    all_observed_clusters = []

    # First pass: collect all cluster statistics
    max_clusters_to_check = min(n_clusters, 10000)  # Safety limit

    if n_clusters > max_clusters_to_check and verbose:
        print(f"\nNote: Checking first {max_clusters_to_check} clusters for significance (out of {n_clusters} total)")

    cluster_info_list = []
    for cluster_id in range(1, max_clusters_to_check + 1):
        # Get cluster coordinates without creating full boolean mask
        cluster_coords = np.where(labeled_array == cluster_id)
        size = len(cluster_coords[0])

        if size > 1:  # Only multi-voxel clusters
            if cluster_stat == 'size':
                stat_value = float(size)
            elif cluster_stat == 'mass':
                stat_value = float(np.sum(t_statistics[cluster_coords]))

            cluster_info_list.append({
                'id': cluster_id,
                'size': size,
                'stat_value': stat_value,
                'coords': cluster_coords
            })
            all_cluster_stats.append(stat_value)

    # Compute per-cluster p-values using MNE-style approach
    # Add observed max to null distribution for proper family-wise error control
    if all_cluster_stats:
        all_cluster_stats = np.array(all_cluster_stats)

        # Determine tail based on alternative hypothesis
        if alternative == 'greater':
            tail = 1
        elif alternative == 'less':
            tail = -1
        else:
            tail = 0

        # Compute p-values for each cluster
        cluster_pvalues = pval_from_histogram(all_cluster_stats, null_max_cluster_stats, tail=tail)

        # Log top 10 clusters with their p-values for transparency
        if verbose and len(cluster_info_list) > 0:
            msg = f"\nTop {min(10, len(cluster_info_list))} observed clusters with p-values:"
            if logger:
                logger.info(msg)
            else:
                print(msg)

            for i in range(min(10, len(cluster_info_list))):
                info = cluster_info_list[i]
                cluster_pv = cluster_pvalues[i]
                status = "SIGNIFICANT" if cluster_pv < alpha else "not significant"
                if cluster_stat == 'size':
                    msg = (f"  Cluster {info['id']}: {info['size']} voxels, "
                           f"p={cluster_pv:.4f} ({status})")
                else:
                    msg = (f"  Cluster {info['id']}: mass={info['stat_value']:.2f}, "
                           f"size={info['size']}, p={cluster_pv:.4f} ({status})")
                if logger:
                    logger.info(msg)
                else:
                    print(msg)

        # Collect top observed clusters for visualization (top 10 by cluster statistic)
        for i, info in enumerate(cluster_info_list[:min(10, len(cluster_info_list))]):
            all_observed_clusters.append({
                'id': info['id'],
                'size': info['size'],
                'stat_value': info['stat_value'],
                'p_value': cluster_pvalues[i]
            })

        # Second pass: identify significant clusters
        for i, info in enumerate(cluster_info_list):
            cluster_pv = cluster_pvalues[i]

            # Check if significant using per-cluster p-value
            if cluster_pv < alpha:
                sig_mask[info['coords']] = 1
                sig_clusters.append({
                    'id': info['id'],
                    'size': info['size'],
                    'stat_value': info['stat_value'],
                    'p_value': cluster_pv
                })

    if verbose:
        msg = f"\nSignificant clusters (p < {alpha}): {len(sig_clusters)}\nTotal significant voxels: {np.sum(sig_mask)}"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Prepare correlation data
    correlation_data = {
        'sizes': np.array(null_max_cluster_sizes),
        'masses': np.array(null_max_cluster_masses)
    }

    return sig_mask, cluster_stat_threshold, sig_clusters, null_max_cluster_stats, all_observed_clusters, correlation_data

correlation_cluster_correction

correlation_cluster_correction(subject_data, effect_sizes, r_values, t_statistics, p_values, valid_mask, weights=None, correlation_type='pearson', cluster_threshold=0.05, n_permutations=1000, alpha=0.05, cluster_stat='mass', alternative='two-sided', n_jobs=-1, verbose=True, logger=None, save_permutation_log=False, permutation_log_file=None, subject_ids=None)

Apply cluster-based permutation correction for correlation analysis.

This implements the ACES approach where effect sizes are shuffled to break the association between E-field magnitude and outcome.

Parameters:

subject_data : ndarray (x, y, z, n_subjects) Electric field magnitude data effect_sizes : ndarray (n_subjects,) Continuous outcome measure r_values : ndarray (x, y, z) Correlation coefficients from initial test t_statistics : ndarray (x, y, z) T-statistics from initial test p_values : ndarray (x, y, z) P-values from initial test valid_mask : ndarray (x, y, z) Boolean mask of valid voxels weights : ndarray (n_subjects,), optional Subject weights for weighted correlation correlation_type : str 'pearson' or 'spearman' cluster_threshold : float P-value threshold for cluster formation n_permutations : int Number of permutations alpha : float Significance level for cluster-level correction cluster_stat : str 'size' or 'mass' (sum of t-values in cluster) alternative : {'two-sided', 'greater', 'less'}, optional Defines the alternative hypothesis (default: 'two-sided'): * 'two-sided': test both positive and negative correlations * 'greater': test positive correlations only (one-tailed, uses full alpha) * 'less': test negative correlations only (one-tailed, uses full alpha) n_jobs : int Number of parallel jobs (-1 = all cores) verbose : bool Print progress information logger : logging.Logger, optional Logger instance save_permutation_log : bool Save permutation details permutation_log_file : str, optional Path for permutation log subject_ids : list, optional Subject IDs for logging

Returns:

sig_mask : ndarray (x, y, z) Binary mask of significant voxels cluster_stat_threshold : float Cluster statistic threshold from null distribution sig_clusters : list of dict Information about significant clusters null_distribution : ndarray Maximum cluster statistics from permutations all_clusters : list All observed clusters correlation_data : dict Cluster size and mass data for analysis

Source code in tit/stats/stats_utils.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
def correlation_cluster_correction(subject_data, effect_sizes, r_values, t_statistics,
                                   p_values, valid_mask, weights=None,
                                   correlation_type='pearson', cluster_threshold=0.05,
                                   n_permutations=1000, alpha=0.05, cluster_stat='mass',
                                   alternative='two-sided', n_jobs=-1, verbose=True, logger=None,
                                   save_permutation_log=False, permutation_log_file=None,
                                   subject_ids=None):
    """
    Apply cluster-based permutation correction for correlation analysis.

    This implements the ACES approach where effect sizes are shuffled to 
    break the association between E-field magnitude and outcome.

    Parameters:
    -----------
    subject_data : ndarray (x, y, z, n_subjects)
        Electric field magnitude data
    effect_sizes : ndarray (n_subjects,)
        Continuous outcome measure
    r_values : ndarray (x, y, z)
        Correlation coefficients from initial test
    t_statistics : ndarray (x, y, z)
        T-statistics from initial test
    p_values : ndarray (x, y, z)
        P-values from initial test
    valid_mask : ndarray (x, y, z)
        Boolean mask of valid voxels
    weights : ndarray (n_subjects,), optional
        Subject weights for weighted correlation
    correlation_type : str
        'pearson' or 'spearman'
    cluster_threshold : float
        P-value threshold for cluster formation
    n_permutations : int
        Number of permutations
    alpha : float
        Significance level for cluster-level correction
    cluster_stat : str
        'size' or 'mass' (sum of t-values in cluster)
    alternative : {'two-sided', 'greater', 'less'}, optional
        Defines the alternative hypothesis (default: 'two-sided'):
        * 'two-sided': test both positive and negative correlations
        * 'greater': test positive correlations only (one-tailed, uses full alpha)
        * 'less': test negative correlations only (one-tailed, uses full alpha)
    n_jobs : int
        Number of parallel jobs (-1 = all cores)
    verbose : bool
        Print progress information
    logger : logging.Logger, optional
        Logger instance
    save_permutation_log : bool
        Save permutation details
    permutation_log_file : str, optional
        Path for permutation log
    subject_ids : list, optional
        Subject IDs for logging

    Returns:
    --------
    sig_mask : ndarray (x, y, z)
        Binary mask of significant voxels
    cluster_stat_threshold : float
        Cluster statistic threshold from null distribution
    sig_clusters : list of dict
        Information about significant clusters
    null_distribution : ndarray
        Maximum cluster statistics from permutations
    all_clusters : list
        All observed clusters
    correlation_data : dict
        Cluster size and mass data for analysis
    """
    # Import gc here (not in the parallel branch) so gc.collect() during
    # cleanup works for both sequential (n_jobs == 1) and parallel execution.
    import gc

    try:
        from .io_utils import save_permutation_details
    except ImportError:
        from io_utils import save_permutation_details

    effect_sizes = np.asarray(effect_sizes, dtype=np.float64)
    n_subjects = len(effect_sizes)

    if verbose:
        header = f"\n{'='*70}\nCORRELATION CLUSTER-BASED PERMUTATION CORRECTION\n{'='*70}"
        if logger:
            logger.info(header)
        else:
            print(header)

        stat_name = "Cluster Size" if cluster_stat == 'size' else "Cluster Mass"
        weight_str = " (weighted)" if weights is not None else ""
        alt_text = {
            'two-sided': 'two-sided (tests both positive and negative correlations)',
            'greater': 'one-sided (tests positive correlations only, full alpha)',
            'less': 'one-sided (tests negative correlations only, full alpha)'
        }
        config_info = (f"Correlation type: {correlation_type.capitalize()}{weight_str}\n"
                      f"Alternative hypothesis: {alt_text.get(alternative, alternative)}\n"
                      f"Cluster statistic: {stat_name}\n"
                      f"Cluster-forming threshold: p < {cluster_threshold}\n"
                      f"Number of permutations: {n_permutations}\n"
                      f"Cluster-level alpha: {alpha}")
        if logger:
            for line in config_info.split('\n'):
                logger.info(line)
        else:
            print(config_info)

    # Form clusters based on initial threshold and alternative hypothesis
    if alternative == 'greater':
        # Test positive correlations only
        initial_mask = (p_values < cluster_threshold) & valid_mask & (t_statistics > 0)
        corr_type_str = "positive correlations"
    elif alternative == 'less':
        # Test negative correlations only
        initial_mask = (p_values < cluster_threshold) & valid_mask & (t_statistics < 0)
        corr_type_str = "negative correlations"
    else:
        # Two-sided: test all significant correlations
        initial_mask = (p_values < cluster_threshold) & valid_mask
        corr_type_str = "correlations (both positive and negative)"

    labeled_array, n_clusters = label(initial_mask)

    if verbose:
        msg = f"\nFound {n_clusters} clusters at p < {cluster_threshold} ({corr_type_str})"
        if logger:
            logger.info(msg)
        else:
            print(msg)

    if n_clusters == 0:
        if verbose:
            msg = "No clusters found. Try increasing cluster_threshold."
            if logger:
                logger.warning(msg)
            else:
                print(msg)
        empty_data = {'sizes': np.array([]), 'masses': np.array([])}
        return np.zeros_like(p_values, dtype=int), 0, [], np.array([]), [], empty_data

    # Pre-extract voxel data so each permutation works on a compact
    # (n_valid, n_subjects) matrix instead of indexing the 4D volume.
    valid_coords = np.argwhere(valid_mask)
    n_valid = len(valid_coords)

    voxel_data = np.zeros((n_valid, n_subjects), dtype=np.float64)
    for idx, coord in enumerate(valid_coords):
        i, j, k = coord
        voxel_data[idx, :] = subject_data[i, j, k, :]

    # OPTIMIZATION: For Spearman, pre-rank voxel data ONCE before permutations
    # This avoids re-ranking the same data in every permutation (huge speedup!)
    voxel_data_preranked = False
    if correlation_type == 'spearman':
        if verbose:
            print(f"Pre-ranking voxel data for Spearman correlation (one-time operation)...")
        voxel_data = np.apply_along_axis(rankdata, 1, voxel_data)
        voxel_data_preranked = True
        if verbose:
            print(f"  ✓ Voxel data pre-ranked ({n_valid} voxels)")

    if verbose:
        print(f"\nRunning {n_permutations} permutations...")

    # Determine number of jobs
    if n_jobs == -1:
        n_jobs = multiprocessing.cpu_count()

    # Generate seeds for reproducibility
    seeds = np.random.randint(0, 2**31, size=n_permutations)

    # Only track per-permutation shuffles when both logging is requested and
    # subject IDs are available to make the log meaningful.
    track_indices = save_permutation_log and subject_ids is not None

    if n_jobs == 1:
        # Sequential execution
        null_max_cluster_stats = []
        null_max_cluster_sizes = []
        null_max_cluster_masses = []
        permutation_indices = [] if track_indices else None

        iterator = tqdm(range(n_permutations), desc="Permutations") if verbose else range(n_permutations)
        for perm in iterator:
            result = _run_single_correlation_permutation(
                voxel_data, effect_sizes, valid_coords,
                cluster_threshold, valid_mask, p_values.shape,
                correlation_type=correlation_type, weights=weights,
                cluster_stat=cluster_stat, alternative=alternative, seed=seeds[perm],
                return_indices=track_indices,
                voxel_data_preranked=voxel_data_preranked
            )
            if track_indices:
                max_stat, perm_idx, max_size, max_mass = result
                null_max_cluster_stats.append(max_stat)
                null_max_cluster_sizes.append(max_size)
                null_max_cluster_masses.append(max_mass)
                permutation_indices.append(perm_idx)
            else:
                max_stat, max_size, max_mass = result
                null_max_cluster_stats.append(max_stat)
                null_max_cluster_sizes.append(max_size)
                null_max_cluster_masses.append(max_mass)
    else:
        # Parallel execution
        if verbose:
            print(f"Using {n_jobs} parallel processes...")

        results = Parallel(n_jobs=n_jobs, verbose=10 if verbose else 0)(
            delayed(_run_single_correlation_permutation)(
                voxel_data, effect_sizes, valid_coords,
                cluster_threshold, valid_mask, p_values.shape,
                correlation_type=correlation_type, weights=weights,
                cluster_stat=cluster_stat, alternative=alternative, seed=seeds[perm],
                return_indices=track_indices,
                voxel_data_preranked=voxel_data_preranked
            ) for perm in range(n_permutations)
        )

        # Clean up parallel processing resources
        gc.collect()

        if track_indices:
            null_max_cluster_stats = [r[0] for r in results]
            permutation_indices = [r[1] for r in results]
            null_max_cluster_sizes = [r[2] for r in results]
            null_max_cluster_masses = [r[3] for r in results]
        else:
            null_max_cluster_stats = [r[0] for r in results]
            null_max_cluster_sizes = [r[1] for r in results]
            null_max_cluster_masses = [r[2] for r in results]
            permutation_indices = None

    # Clean up the large permutation working matrix
    del voxel_data
    gc.collect()

    # Determine threshold using discrete approach (consistent with exact p-values)
    null_max_cluster_stats = np.array(null_max_cluster_stats)

    # For p < alpha, we need (count >= observed) < alpha * n_permutations
    # The threshold is the value where exactly alpha * n_permutations values exceed it
    sorted_null = np.sort(null_max_cluster_stats)[::-1]  # Sort descending
    threshold_index = int(alpha * n_permutations)
    if threshold_index == 0:
        threshold_index = 1  # Need at least 1 for edge case
    if threshold_index > len(sorted_null):
        threshold_index = len(sorted_null)

    # Discrete threshold: the value where exactly alpha proportion exceeds it
    cluster_stat_threshold = sorted_null[threshold_index - 1]  # -1 for 0-indexing

    if verbose:
        stat_unit = "voxels" if cluster_stat == 'size' else "mass units"
        msg = (f"\nDiscrete threshold for significance (p < {alpha}): "
               f"{cluster_stat_threshold:.2f} {stat_unit}\n"
               f"  (This is the {threshold_index}th largest value out of {n_permutations} permutations)\n"
               f"Null distribution: min={np.min(null_max_cluster_stats):.2f}, "
               f"mean={np.mean(null_max_cluster_stats):.2f}, "
               f"max={np.max(null_max_cluster_stats):.2f}")
        if logger:
            logger.info(msg)
        else:
            print(msg)

    # Identify significant clusters using per-cluster p-values (MNE-style)
    sig_mask = np.zeros_like(p_values, dtype=int)
    sig_clusters = []

    # Safety limit to prevent memory issues with very large number of clusters
    max_clusters_to_check = min(n_clusters, 10000)

    if n_clusters > max_clusters_to_check and verbose:
        print(f"\nNote: Checking first {max_clusters_to_check} clusters for significance (out of {n_clusters} total)")

    # First pass: collect cluster statistics WITHOUT storing full masks (memory efficient)
    # We only store the cluster_id and compute masks later for significant clusters only
    cluster_info_list = []
    all_cluster_stats = []

    # Use scipy.ndimage for efficient cluster statistics
    from scipy.ndimage import sum as ndimage_sum

    # Get cluster sizes efficiently using bincount
    cluster_labels_flat = labeled_array.ravel()
    cluster_sizes_all = np.bincount(cluster_labels_flat, minlength=n_clusters + 1)

    # Get cluster masses using ndimage.sum (vectorized) - only if needed
    cluster_ids_all = np.arange(1, n_clusters + 1)
    if cluster_stat == 'mass':
        cluster_masses_all = ndimage_sum(t_statistics, labeled_array, index=cluster_ids_all)

    for cluster_id in range(1, max_clusters_to_check + 1):
        size = cluster_sizes_all[cluster_id]

        # Single-voxel clusters are skipped; they carry no spatial evidence.
        if size > 1:
            if cluster_stat == 'size':
                stat_value = float(size)
            else:
                stat_value = float(cluster_masses_all[cluster_id - 1])

            # Don't store masks here - only store ID and stats
            cluster_info_list.append({
                'id': cluster_id,
                'size': int(size),
                'stat_value': stat_value
            })
            all_cluster_stats.append(stat_value)

    # BUGFIX: initialize here so the return below never hits a NameError when
    # no cluster survives the size > 1 filter (all_cluster_stats empty).
    all_observed_clusters = []

    # Compute per-cluster p-values using MNE-style approach
    if all_cluster_stats:
        all_cluster_stats = np.array(all_cluster_stats)

        # Determine tail based on alternative hypothesis
        if alternative == 'greater':
            tail = 1  # Test positive correlations (upper tail)
        elif alternative == 'less':
            tail = -1  # Test negative correlations (lower tail)
        else:
            tail = 0  # Two-sided test

        cluster_pvalues = pval_from_histogram(all_cluster_stats, null_max_cluster_stats, tail=tail)

        # Log top 10 clusters with their p-values for transparency
        if verbose and len(cluster_info_list) > 0:
            msg = f"\nTop {min(10, len(cluster_info_list))} observed clusters with p-values:"
            if logger:
                logger.info(msg)
            else:
                print(msg)

            for i in range(min(10, len(cluster_info_list))):
                info = cluster_info_list[i]
                cluster_pv = cluster_pvalues[i]
                status = "SIGNIFICANT" if cluster_pv < alpha else "not significant"
                msg = (f"  Cluster {info['id']}: mass={info['stat_value']:.2f}, "
                       f"size={info['size']}, p={cluster_pv:.4f} ({status})")
                if logger:
                    logger.info(msg)
                else:
                    print(msg)

        # Collect top observed clusters for visualization (top 10 by cluster mass)
        for i, info in enumerate(cluster_info_list[:min(10, len(cluster_info_list))]):
            all_observed_clusters.append({
                'id': info['id'],
                'size': info['size'],
                'stat_value': info['stat_value'],
                'p_value': cluster_pvalues[i]
            })

        # Second pass: identify significant clusters and compute masks ONLY for those
        for i, info in enumerate(cluster_info_list):
            cluster_pv = cluster_pvalues[i]

            if cluster_pv < alpha:
                # Get cluster coordinates and set significant mask (memory efficient)
                cluster_coords = np.where(labeled_array == info['id'])
                sig_mask[cluster_coords] = 1

                # Compute additional stats only for significant clusters
                cluster_r_values = r_values[cluster_coords]
                peak_r = np.max(cluster_r_values)
                mean_r = np.mean(cluster_r_values)

                sig_clusters.append({
                    'id': info['id'],
                    'size': info['size'],
                    'stat_value': info['stat_value'],
                    'peak_r': peak_r,
                    'mean_r': mean_r,
                    'p_value': cluster_pv
                })

    if verbose:
        msg = (f"\nSignificant clusters (p < {alpha}): {len(sig_clusters)}\n"
               f"Total significant voxels: {np.sum(sig_mask)}")
        if logger:
            logger.info(msg)
        else:
            print(msg)

    correlation_data = {
        'sizes': np.array(null_max_cluster_sizes),
        'masses': np.array(null_max_cluster_masses)
    }

    return sig_mask, cluster_stat_threshold, sig_clusters, null_max_cluster_stats, all_observed_clusters, correlation_data

correlation_voxelwise

correlation_voxelwise(subject_data, effect_sizes, weights=None, correlation_type='pearson', verbose=True)

Perform vectorized correlation at each voxel between electric field magnitude and continuous outcome measure.

This implements the ACES (Automated Correlation of Electric field strength and Stimulation effect) approach for continuous outcomes.

Parameters:

subject_data : ndarray (x, y, z, n_subjects) Electric field magnitude data for all subjects effect_sizes : ndarray (n_subjects,) Continuous outcome measure for each subject (e.g., effect size, % improvement, behavioral score) weights : ndarray (n_subjects,), optional Weights for each subject (e.g., sample size for meta-analysis) correlation_type : {'pearson', 'spearman'}, optional Type of correlation (default: 'pearson') verbose : bool Print progress information

Returns:

r_values : ndarray (x, y, z) Correlation coefficient at each voxel t_statistics : ndarray (x, y, z) Studentized correlation (t-statistic) at each voxel p_values : ndarray (x, y, z) P-value at each voxel valid_mask : ndarray (x, y, z) Boolean mask of valid voxels

Source code in tit/stats/stats_utils.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def correlation_voxelwise(subject_data, effect_sizes, weights=None, 
                          correlation_type='pearson', verbose=True):
    """
    Perform vectorized correlation at each voxel between electric field
    magnitude and continuous outcome measure.

    This implements the ACES (Automated Correlation of Electric field 
    strength and Stimulation effect) approach for continuous outcomes.

    Parameters:
    -----------
    subject_data : ndarray (x, y, z, n_subjects)
        Electric field magnitude data for all subjects
    effect_sizes : ndarray (n_subjects,)
        Continuous outcome measure for each subject (e.g., effect size, 
        % improvement, behavioral score)
    weights : ndarray (n_subjects,), optional
        Weights for each subject (e.g., sample size for meta-analysis)
    correlation_type : {'pearson', 'spearman'}, optional
        Type of correlation (default: 'pearson')
    verbose : bool
        Print progress information

    Returns:
    --------
    r_values : ndarray (x, y, z)
        Correlation coefficient at each voxel
    t_statistics : ndarray (x, y, z)
        Studentized correlation (t-statistic) at each voxel
    p_values : ndarray (x, y, z)
        P-value at each voxel
    valid_mask : ndarray (x, y, z)
        Boolean mask of valid voxels

    Raises:
    -------
    ValueError
        If effect_sizes/weights length does not match n_subjects, or if
        fewer than 3 subjects are provided.
    """
    if verbose:
        weight_str = " (weighted)" if weights is not None else ""
        print(f"\nPerforming voxelwise {correlation_type.capitalize()} correlation{weight_str} (vectorized)...")

    # Validate inputs
    n_subjects = subject_data.shape[-1]
    if len(effect_sizes) != n_subjects:
        raise ValueError(f"Number of effect sizes ({len(effect_sizes)}) must match "
                        f"number of subjects ({n_subjects})")

    if weights is not None and len(weights) != n_subjects:
        raise ValueError(f"Number of weights ({len(weights)}) must match "
                        f"number of subjects ({n_subjects})")

    if n_subjects < 3:
        raise ValueError(f"Need at least 3 subjects for correlation, got {n_subjects}")

    effect_sizes = np.asarray(effect_sizes, dtype=np.float64)

    shape = subject_data.shape[:3]
    r_values = np.zeros(shape)
    t_statistics = np.zeros(shape)
    p_values = np.ones(shape)  # default p=1 so untested voxels are never "significant"

    # Create mask of valid voxels (non-zero in at least some subjects)
    valid_mask = np.any(subject_data > 0, axis=-1)
    total_voxels = np.sum(valid_mask)

    if verbose:
        print(f"Computing correlations for {total_voxels} valid voxels...")

    # Vectorized extraction: boolean indexing on the first three axes yields a
    # (n_valid, n_subjects) matrix directly, replacing a Python-level loop.
    voxel_data = np.ascontiguousarray(subject_data[valid_mask], dtype=np.float64)

    # Compute correlations for all voxels at once
    r_1d, t_1d, p_1d = correlation(
        voxel_data, effect_sizes,
        correlation_type=correlation_type,
        weights=weights
    )

    # Map results back to the 3D volumes. Boolean-mask assignment fills voxels
    # in C (row-major) order — the same order they were extracted in above.
    r_values[valid_mask] = r_1d
    t_statistics[valid_mask] = t_1d
    p_values[valid_mask] = p_1d

    if verbose:
        if total_voxels > 0:
            print(f"Correlation range: [{np.min(r_values[valid_mask]):.4f}, {np.max(r_values[valid_mask]):.4f}]")
            print(f"Mean |r|: {np.mean(np.abs(r_values[valid_mask])):.4f}")
            sig_05 = np.sum((p_values < 0.05) & valid_mask)
            sig_01 = np.sum((p_values < 0.01) & valid_mask)
            print(f"Voxels with p<0.05 (uncorrected): {sig_05}")
            print(f"Voxels with p<0.01 (uncorrected): {sig_01}")
        else:
            # Guard: min/max on an empty slice would raise
            print("No valid voxels found (all voxels are zero across subjects).")

    return r_values, t_statistics, p_values, valid_mask

generate_correlation_summary

generate_correlation_summary(subject_data, effect_sizes, r_values, sig_mask, cluster_threshold, clusters, atlas_results, output_file, params=None, effect_metric='Effect Size', subject_ids=None, weights=None)

Generate comprehensive summary report for correlation analysis

Parameters:

subject_data : ndarray (x, y, z, n_subjects) Electric field magnitude data effect_sizes : ndarray (n_subjects,) Continuous outcome measures r_values : ndarray (x, y, z) Correlation map sig_mask : ndarray (x, y, z) Binary mask of significant voxels cluster_threshold : float Cluster statistic threshold from permutation clusters : list List of cluster dictionaries atlas_results : dict Atlas overlap results output_file : str Path to output summary file params : dict, optional Analysis parameters effect_metric : str Name of the outcome measure subject_ids : list, optional List of subject IDs weights : ndarray, optional Subject weights (if used)

Source code in tit/stats/reporting.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def generate_correlation_summary(subject_data, effect_sizes, r_values, sig_mask,
                                 cluster_threshold, clusters, atlas_results,
                                 output_file, params=None, effect_metric="Effect Size",
                                 subject_ids=None, weights=None):
    """
    Generate comprehensive summary report for correlation analysis

    Parameters:
    -----------
    subject_data : ndarray (x, y, z, n_subjects)
        Electric field magnitude data
    effect_sizes : ndarray (n_subjects,)
        Continuous outcome measures
    r_values : ndarray (x, y, z)
        Correlation map
    sig_mask : ndarray (x, y, z)
        Binary mask of significant voxels
    cluster_threshold : float
        Cluster statistic threshold from permutation
    clusters : list
        List of cluster dictionaries
    atlas_results : dict
        Atlas overlap results
    output_file : str
        Path to output summary file
    params : dict, optional
        Analysis parameters
    effect_metric : str
        Name of the outcome measure
    subject_ids : list, optional
        List of subject IDs (parallel to effect_sizes)
    weights : ndarray, optional
        Subject weights (parallel to subject_ids, if used)
    """
    if params is None:
        params = {
            'correlation_type': 'pearson',
            'cluster_threshold': 0.05,
            'cluster_stat': 'mass',
            'n_permutations': 1000,
            'alpha': 0.05,
            'use_weights': False
        }

    n_subjects = len(effect_sizes)

    with open(output_file, 'w') as f:
        f.write("="*70 + "\n")
        f.write("CORRELATION-BASED CLUSTER PERMUTATION ANALYSIS SUMMARY\n")
        f.write("(ACES-style analysis for continuous outcomes)\n")
        f.write("="*70 + "\n\n")

        f.write("ANALYSIS DETAILS:\n")
        f.write("-" * 70 + "\n")
        corr_type = params.get('correlation_type', 'pearson').capitalize()
        weighted_str = " (Weighted)" if params.get('use_weights', False) else ""
        f.write(f"Correlation Type: {corr_type}{weighted_str}\n")
        f.write(f"Outcome Measure: {effect_metric}\n")

        cluster_stat = params.get('cluster_stat', 'mass')
        cluster_stat_name = "Cluster Size" if cluster_stat == 'size' else "Cluster Mass"
        f.write(f"Cluster Statistic: {cluster_stat_name}\n")
        f.write(f"Cluster-forming threshold: p < {params.get('cluster_threshold', 0.05)}\n")
        f.write(f"Number of permutations: {params.get('n_permutations', 1000)}\n")
        f.write(f"Cluster-level alpha: {params.get('alpha', 0.05)}\n")

        if cluster_stat == 'size':
            f.write(f"Cluster size threshold: {cluster_threshold:.1f} voxels\n")
        else:
            f.write(f"Cluster mass threshold: {cluster_threshold:.2f}\n")
        f.write("\n")

        f.write("SAMPLE INFORMATION:\n")
        f.write("-" * 70 + "\n")
        f.write(f"Number of Subjects: {n_subjects}\n")
        if subject_ids:
            f.write(f"Subject IDs: {', '.join(str(s) for s in subject_ids)}\n")
        f.write("\n")

        f.write("EFFECT SIZE DISTRIBUTION:\n")
        f.write("-" * 70 + "\n")
        f.write(f"Mean: {np.mean(effect_sizes):.4f}\n")
        f.write(f"Std:  {np.std(effect_sizes):.4f}\n")
        f.write(f"Min:  {np.min(effect_sizes):.4f}\n")
        f.write(f"Max:  {np.max(effect_sizes):.4f}\n")
        f.write(f"Range: {np.max(effect_sizes) - np.min(effect_sizes):.4f}\n")

        if subject_ids:
            f.write("\nPer-Subject Effect Sizes:\n")
            # enumerate(zip(...)) yields the positional index directly; the previous
            # list.index(sid) lookup was O(n) per subject and returned the wrong
            # weight whenever subject IDs were duplicated.
            for idx, (sid, es) in enumerate(zip(subject_ids, effect_sizes)):
                weight_str = ""
                if weights is not None:
                    weight_str = f", weight={weights[idx]:.2f}"
                f.write(f"  {sid}: {es:.4f}{weight_str}\n")
        f.write("\n")

        if weights is not None:
            f.write("WEIGHT DISTRIBUTION:\n")
            f.write("-" * 70 + "\n")
            f.write(f"Mean weight: {np.mean(weights):.4f}\n")
            f.write(f"Min weight:  {np.min(weights):.4f}\n")
            f.write(f"Max weight:  {np.max(weights):.4f}\n\n")

        f.write("RESULTS:\n")
        f.write("-" * 70 + "\n")
        n_sig = np.sum(sig_mask)
        f.write(f"Number of Significant Voxels: {n_sig}\n")
        f.write(f"Number of Significant Clusters: {len(clusters)}\n\n")

        if n_sig > 0:
            # Boolean 3-D mask indexes the spatial axes; trailing subject axis kept.
            sig_bool = sig_mask.astype(bool)
            mean_r = np.mean(r_values[sig_bool])
            max_r = np.max(r_values[sig_bool])
            min_r = np.min(r_values[sig_bool])

            f.write("CORRELATION STATISTICS IN SIGNIFICANT VOXELS:\n")
            f.write("-" * 70 + "\n")
            f.write(f"Mean r: {mean_r:.4f}\n")
            f.write(f"Peak r: {max_r:.4f}\n")
            f.write(f"Min r:  {min_r:.4f}\n\n")

            # E-field statistics in significant voxels
            mean_efield = np.mean(subject_data[sig_bool, :])
            max_efield = np.max(subject_data[sig_bool, :])
            f.write("E-FIELD STATISTICS IN SIGNIFICANT VOXELS:\n")
            f.write("-" * 70 + "\n")
            f.write(f"Mean E-field: {mean_efield:.6f}\n")
            f.write(f"Max E-field:  {max_efield:.6f}\n\n")

        if clusters:
            f.write("SIGNIFICANT CLUSTERS:\n")
            f.write("-" * 70 + "\n")
            for i, c in enumerate(clusters[:20], 1):
                f.write(f"\n{i}. Cluster {c['cluster_id']}: {c['size']} voxels\n")
                f.write(f"   MNI Center: ({c['center_mni'][0]:.1f}, "
                       f"{c['center_mni'][1]:.1f}, {c['center_mni'][2]:.1f})\n")
                if 'mean_r' in c:
                    f.write(f"   Mean r: {c['mean_r']:.4f}\n")
                if 'peak_r' in c:
                    f.write(f"   Peak r: {c['peak_r']:.4f}\n")

        f.write("\n" + "="*70 + "\n")
        f.write("ATLAS OVERLAP ANALYSIS\n")
        f.write("="*70 + "\n")

        for atlas_name, region_counts in atlas_results.items():
            f.write(f"\n{atlas_name}\n")
            f.write("-" * 70 + "\n")

            if region_counts:
                f.write(f"Number of regions with significant voxels: {len(region_counts)}\n\n")
                f.write("Top 20 regions:\n")
                for i, r in enumerate(region_counts[:20], 1):
                    pct = 100 * r['overlap_voxels'] / r['region_size']
                    f.write(f"{i:2d}. Region {r['region_id']:3d}: "
                           f"{r['overlap_voxels']:4d} voxels ({pct:5.1f}% of region)\n")
            else:
                f.write("No overlapping regions found.\n")

        f.write("\n" + "="*70 + "\n")
        f.write("INTERPRETATION NOTES\n")
        f.write("="*70 + "\n\n")
        f.write("This analysis identifies brain regions where electric field magnitude\n")
        f.write(f"correlates with {effect_metric}.\n\n")
        f.write("Positive correlations (r > 0):\n")
        f.write(f"  Higher E-field → Higher {effect_metric}\n\n")
        f.write("Note: This analysis tests positive correlations only (as per ACES).\n")
        f.write("To find regions with negative correlations, invert your effect sizes\n")
        f.write("(multiply by -1) and re-run the analysis.\n\n")

        f.write("References:\n")
        f.write("  - Wischnewski et al. (2021) - ACES approach\n")
        f.write("  - Maris & Oostenveld (2007) - Cluster-based permutation\n")

    print(f"\nSummary written to: {output_file}")

generate_summary

generate_summary(responders, non_responders, sig_mask, correction_threshold, clusters, atlas_results, output_file, correction_method='cluster', params=None, group1_name='Responders', group2_name='Non-Responders', value_metric='Current Intensity', test_type='unpaired', observed_cluster_sizes=None)

Generate comprehensive summary report

Parameters:

responders : ndarray — responder data (group 1).
non_responders : ndarray — non-responder data (group 2).
sig_mask : ndarray — binary mask of significant voxels.
correction_threshold : float — threshold used for multiple comparison correction.
clusters : list — list of cluster dictionaries.
atlas_results : dict — atlas overlap results.
output_file : str — path to output summary file.
correction_method : str — method used: 'cluster' or 'fdr'.
params : dict, optional — analysis parameters (cluster_threshold, n_permutations, alpha, etc.); if None, defaults are used.
group1_name : str — name for the first group (default: "Responders").
group2_name : str — name for the second group (default: "Non-Responders").
value_metric : str — name of the metric being compared (default: "Current Intensity").
test_type : str — type of t-test used: 'paired' or 'unpaired' (default: "unpaired").
observed_cluster_sizes : list, optional — observed cluster sizes (before permutation correction), sorted largest to smallest.

Source code in tit/stats/reporting.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def generate_summary(responders, non_responders, sig_mask, correction_threshold,
                    clusters, atlas_results, output_file,
                    correction_method="cluster",
                    params=None,
                    group1_name="Responders",
                    group2_name="Non-Responders",
                    value_metric="Current Intensity",
                    test_type="unpaired",
                    observed_cluster_sizes=None):
    """
    Generate comprehensive summary report

    Parameters:
    -----------
    responders : ndarray
        Responder data (group 1)
    non_responders : ndarray
        Non-responder data (group 2)
    sig_mask : ndarray
        Binary mask of significant voxels
    correction_threshold : float
        Threshold used for multiple comparison correction
    clusters : list
        List of cluster dictionaries
    atlas_results : dict
        Atlas overlap results
    output_file : str
        Path to output summary file
    correction_method : str
        Method used: 'cluster' or 'fdr'
    params : dict, optional
        Dictionary of analysis parameters (cluster_threshold, n_permutations, alpha, etc.)
        If None, uses defaults
    group1_name : str
        Name for first group (default: "Responders")
    group2_name : str
        Name for second group (default: "Non-Responders")
    value_metric : str
        Name of the metric being compared (default: "Current Intensity")
    test_type : str
        Type of t-test used: 'paired' or 'unpaired' (default: "unpaired")
    observed_cluster_sizes : list, optional
        List of observed cluster sizes (before permutation correction) sorted largest to smallest
    """
    # Set default parameters if not provided
    if params is None:
        params = {
            'cluster_threshold': 0.01,
            'n_permutations': 500,
            'alpha': 0.05
        }

    # Extract parameters with defaults
    cluster_threshold_param = params.get('cluster_threshold', 0.01)
    n_permutations = params.get('n_permutations', 500)
    alpha = params.get('alpha', 0.05)
    cluster_stat = params.get('cluster_stat', 'size')

    with open(output_file, 'w') as f:
        f.write("="*70 + "\n")
        f.write("VOXELWISE STATISTICAL ANALYSIS SUMMARY\n")
        f.write("="*70 + "\n\n")

        f.write("ANALYSIS DETAILS:\n")
        f.write("-" * 70 + "\n")
        test_name = "Paired t-test" if test_type == "paired" else "Unpaired (Independent Samples) t-test"
        f.write(f"Statistical Test: {test_name}\n")

        if correction_method == "cluster":
            # Plain string: no interpolated values (was a needless f-string).
            f.write("Multiple Comparison Correction: Cluster-based Permutation\n")
            cluster_stat_name = "Cluster Size" if cluster_stat == 'size' else "Cluster Mass"
            f.write(f"Cluster statistic: {cluster_stat_name}\n")
            f.write(f"Cluster-forming threshold: p < {cluster_threshold_param} (uncorrected)\n")
            f.write(f"Number of permutations: {n_permutations}\n")
            f.write(f"Cluster-level alpha: {alpha}\n")

            if cluster_stat == 'size':
                f.write(f"Cluster size threshold: {correction_threshold:.1f} voxels\n")
            else:
                f.write(f"Cluster mass threshold: {correction_threshold:.2f}\n")
            if 'n_jobs' in params:
                n_jobs = params['n_jobs']
                if n_jobs == -1:
                    # -1 means "use all cores"; report the actual count.
                    import multiprocessing
                    n_jobs_actual = multiprocessing.cpu_count()
                    f.write(f"Parallel processing: {n_jobs_actual} cores\n")
                elif n_jobs == 1:
                    f.write("Parallel processing: Sequential (1 core)\n")
                else:
                    f.write(f"Parallel processing: {n_jobs} cores\n")
            f.write("\n")
        else:
            f.write("Multiple Comparison Correction: FDR (False Discovery Rate)\n")
            f.write(f"Significance Level: alpha = {alpha}\n")
            f.write(f"FDR-corrected p-value threshold: {correction_threshold:.6f}\n\n")

        # Add observed clusters information if provided
        if observed_cluster_sizes is not None and len(observed_cluster_sizes) > 0:
            f.write("OBSERVED CLUSTERS (BEFORE PERMUTATION CORRECTION):\n")
            f.write("-" * 70 + "\n")
            f.write(f"Total clusters at p < {cluster_threshold_param}: {len(observed_cluster_sizes)}\n")
            f.write(f"Largest observed cluster: {observed_cluster_sizes[0]} voxels\n")
            f.write(f"Total voxels in all clusters: {sum(observed_cluster_sizes)}\n\n")

            f.write("Top 10 Largest Observed Clusters:\n")
            for i, size in enumerate(observed_cluster_sizes[:10], 1):
                f.write(f"  {i:2d}. {size:6d} voxels\n")
            f.write("\n")

        f.write("SAMPLE INFORMATION:\n")
        f.write("-" * 70 + "\n")
        f.write(f"Number of {group1_name}: {responders.shape[-1]}\n")
        f.write(f"Number of {group2_name}: {non_responders.shape[-1]}\n")
        f.write(f"Total Subjects: {responders.shape[-1] + non_responders.shape[-1]}\n\n")

        f.write("RESULTS:\n")
        f.write("-" * 70 + "\n")
        n_sig = np.sum(sig_mask)
        f.write(f"Number of Significant Voxels: {n_sig}\n")

        if n_sig > 0:
            # Calculate mean values in significant voxels
            sig_bool = sig_mask.astype(bool)
            group1_mean = np.mean(responders[sig_bool, :])
            group2_mean = np.mean(non_responders[sig_bool, :])

            f.write(f"\nMean {value_metric} in Significant Voxels:\n")
            f.write(f"  {group1_name}: {group1_mean:.4f}\n")
            f.write(f"  {group2_name}: {group2_mean:.4f}\n")
            f.write(f"  Difference ({group1_name} - {group2_name}): {group1_mean - group2_mean:.4f}\n")

        f.write(f"\nNumber of Clusters: {len(clusters)}\n\n")

        if clusters:
            sort_label = "by statistic" if cluster_stat == 'mass' else "by size"
            f.write(f"TOP 10 CLUSTERS ({sort_label}):\n")
            f.write("-" * 70 + "\n")
            for i, c in enumerate(clusters[:10], 1):
                f.write(f"{i}. Cluster {c['cluster_id']}: {c['size']} voxels")
                if cluster_stat == 'mass' and 'stat_value' in c:
                    f.write(f", mass = {c['stat_value']:.2f}")
                f.write("\n")
                f.write(f"   MNI Center: ({c['center_mni'][0]:.1f}, "
                       f"{c['center_mni'][1]:.1f}, {c['center_mni'][2]:.1f})\n")

        f.write("\n" + "="*70 + "\n")
        f.write("ATLAS OVERLAP ANALYSIS\n")
        f.write("="*70 + "\n\n")

        for atlas_name, region_counts in atlas_results.items():
            f.write(f"\n{atlas_name}\n")
            f.write("-" * 70 + "\n")

            if region_counts:
                f.write(f"Number of regions with significant voxels: {len(region_counts)}\n\n")
                f.write("Top 20 regions:\n")
                for i, r in enumerate(region_counts[:20], 1):
                    pct = 100 * r['overlap_voxels'] / r['region_size']
                    f.write(f"{i:2d}. Region {r['region_id']:3d}: "
                           f"{r['overlap_voxels']:4d} voxels ({pct:5.1f}% of region)\n")
            else:
                f.write("No overlapping regions found.\n")

    print(f"\nSummary written to: {output_file}")

get_path_manager

get_path_manager() -> PathManager

Get the global PathManager singleton instance.

Returns: The global path manager instance

Source code in tit/core/paths.py
680
681
682
683
684
685
686
687
688
689
690
def get_path_manager() -> PathManager:
    """
    Get the global PathManager singleton instance.

    Lazily constructs the singleton on first use; subsequent calls
    return the cached instance.

    Returns:
        The global path manager instance
    """
    global _path_manager_instance
    instance = _path_manager_instance
    if instance is None:
        # First call: create and cache the shared instance.
        instance = PathManager()
        _path_manager_instance = instance
    return instance

load_subject_data

load_subject_data(subject_configs, nifti_file_pattern=None, analysis_type='group_comparison')

Unified data loading for both group comparison and correlation analysis

Parameters:

subject_configs : list of dict — subject configurations (format depends on analysis_type).
nifti_file_pattern : str, optional — pattern for NIfTI files.
analysis_type : str — either 'group_comparison' or 'correlation'.

Returns:

For group_comparison: responders, non_responders, template_img, resp_ids, non_resp_ids For correlation: subject_data, effect_sizes, weights, template_img, subject_ids

Source code in tit/stats/permutation_analysis.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def load_subject_data(subject_configs, nifti_file_pattern=None, analysis_type='group_comparison'):
    """
    Unified data loading for both group comparison and correlation analysis

    Parameters:
    -----------
    subject_configs : list of dict
        Subject configurations (format depends on analysis_type)
    nifti_file_pattern : str, optional
        Pattern for NIfTI files
    analysis_type : str
        Either 'group_comparison' or 'correlation'

    Returns:
    --------
    For group_comparison:
        responders, non_responders, template_img, resp_ids, non_resp_ids
    For correlation:
        subject_data, effect_sizes, weights, template_img, subject_ids
    """
    # Dispatch to the loader matching the requested analysis type;
    # anything else is a caller error.
    if analysis_type == 'group_comparison':
        return load_subject_data_group_comparison(subject_configs, nifti_file_pattern)
    if analysis_type == 'correlation':
        return load_subject_data_correlation(subject_configs, nifti_file_pattern)
    raise ValueError(f"Unknown analysis_type: {analysis_type}")

load_subject_data_correlation

load_subject_data_correlation(subject_configs, nifti_file_pattern=None)

Load subject data for correlation analysis (continuous outcomes)

Source code in tit/stats/permutation_analysis.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def load_subject_data_correlation(subject_configs, nifti_file_pattern=None):
    """
    Load subject data for correlation analysis (continuous outcomes)

    Parameters:
    -----------
    subject_configs : list of dict
        One entry per subject; each must contain 'subject_id',
        'simulation_name' and 'effect_size'. An optional 'weight' field
        (on any subject) enables weighted analysis; missing weights
        default to 1.0.
    nifti_file_pattern : str, optional
        Filename pattern for per-subject NIfTI files; defaults to the
        grey-matter TI-max pattern.

    Returns:
    --------
    subject_data, effect_sizes, weights, template_img, subject_ids
        weights is None when no config provides a 'weight' field.

    Raises:
    -------
    ValueError
        If subject_configs is empty or a required field is missing.
    """
    if nifti_file_pattern is None:
        nifti_file_pattern = "grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz"

    # Fail early with a clear message instead of an IndexError further down.
    if not subject_configs:
        raise ValueError("subject_configs is empty: at least one subject is required")

    # Check for required fields
    required_fields = ['subject_id', 'simulation_name', 'effect_size']
    for config in subject_configs:
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Missing required field '{field}' in subject config")

    # Load all subjects
    subject_data, template_img, subject_ids = nifti.load_group_data_ti_toolbox(
        subject_configs,
        nifti_file_pattern=nifti_file_pattern,
        dtype=np.float32
    )

    # Build a lookup from subject_id to config for successfully loaded subjects
    config_lookup = {c['subject_id']: c for c in subject_configs}

    # Extract effect sizes and weights only for successfully loaded subjects
    effect_sizes = []
    weights_list = []
    # Use weights if ANY config provides one (missing entries fall back to 1.0).
    # Previously only the first config was inspected, silently dropping weights
    # supplied on later subjects.
    has_weights = any('weight' in c for c in subject_configs)

    for sid in subject_ids:
        config = config_lookup[sid]
        effect_sizes.append(config['effect_size'])
        if has_weights:
            weights_list.append(config.get('weight', 1.0))

    effect_sizes = np.array(effect_sizes, dtype=np.float64)
    weights = np.array(weights_list, dtype=np.float64) if has_weights else None

    print(f"\nLoaded {len(subject_ids)} subjects: {subject_ids}")
    print(f"Effect sizes: mean={np.mean(effect_sizes):.3f}, std={np.std(effect_sizes):.3f}")
    print(f"Effect size range: [{np.min(effect_sizes):.3f}, {np.max(effect_sizes):.3f}]")
    if weights is not None:
        print(f"Weights: mean={np.mean(weights):.3f}, range=[{np.min(weights):.3f}, {np.max(weights):.3f}]")
    print(f"Data shape: {subject_data.shape}")

    return subject_data, effect_sizes, weights, template_img, subject_ids

load_subject_data_group_comparison

load_subject_data_group_comparison(subject_configs, nifti_file_pattern=None)

Load subject data for group comparison analysis (binary outcomes)

Source code in tit/stats/permutation_analysis.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def load_subject_data_group_comparison(subject_configs, nifti_file_pattern=None):
    """
    Load subject data for group comparison analysis (binary outcomes)

    Parameters:
    -----------
    subject_configs : list of dict
        One entry per subject; each must contain 'subject_id',
        'simulation_name' and 'response' (1 = responder, 0 = non-responder).
    nifti_file_pattern : str, optional
        Filename pattern for per-subject NIfTI files; defaults to the
        grey-matter TI-max pattern.

    Returns:
    --------
    responders, non_responders, template_img, responder_ids, non_responder_ids

    Raises:
    -------
    ValueError
        If a required field is missing, or either group is empty.
    """
    if nifti_file_pattern is None:
        nifti_file_pattern = "grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz"

    # Validate configs up front (consistent with load_subject_data_correlation)
    # so a missing key raises a clear ValueError instead of a raw KeyError below.
    required_fields = ['subject_id', 'simulation_name', 'response']
    for config in subject_configs:
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Missing required field '{field}' in subject config")

    # Separate configs by response
    responder_configs = [c for c in subject_configs if c['response'] == 1]
    non_responder_configs = [c for c in subject_configs if c['response'] == 0]

    if len(responder_configs) == 0 or len(non_responder_configs) == 0:
        raise ValueError("Need at least one responder and one non-responder")

    # Load responders
    responders, template_img, responder_ids = nifti.load_group_data_ti_toolbox(
        responder_configs,
        nifti_file_pattern=nifti_file_pattern,
        dtype=np.float32
    )

    # Load non-responders
    non_responders, _, non_responder_ids = nifti.load_group_data_ti_toolbox(
        non_responder_configs,
        nifti_file_pattern=nifti_file_pattern,
        dtype=np.float32
    )

    print(f"\nLoaded {len(responder_ids)} responders: {responder_ids}")
    print(f"Loaded {len(non_responder_ids)} non-responders: {non_responder_ids}")
    print(f"Responders shape: {responders.shape}")
    print(f"Non-responders shape: {non_responders.shape}")

    return responders, non_responders, template_img, responder_ids, non_responder_ids

main

main()

Command-line interface for unified permutation analysis

Source code in tit/stats/permutation_analysis.py
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
def main():
    """
    Command-line interface for unified permutation analysis

    Parses CLI arguments, builds the analysis configuration dict, and
    invokes run_analysis(). Returns 0 on success and 1 on failure
    (suitable for sys.exit in the caller).
    """
    import argparse
    import multiprocessing

    # Ensure proper multiprocessing initialization
    if hasattr(multiprocessing, 'set_start_method'):
        try:
            multiprocessing.set_start_method('fork', force=True)
        except RuntimeError:
            # set_start_method raises RuntimeError when a start method cannot
            # be (re)applied; proceeding with the existing method is fine here.
            pass

    parser = argparse.ArgumentParser(
        description='Unified Cluster-Based Permutation Testing for TI-Toolbox',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Group comparison analysis
  python permutation_analysis.py --csv subjects.csv --name my_analysis \\
      --analysis-type group_comparison

  # Correlation analysis
  python permutation_analysis.py --csv subjects.csv --name my_analysis \\
      --analysis-type correlation

Group Comparison CSV Format:
  subject_id,simulation_name,response
  070,ICP_RHIPPO,1
  071,ICP_RHIPPO,0

Correlation CSV Format:
  subject_id,simulation_name,effect_size,weight
  070,ICP_RHIPPO,0.45,25
  071,ICP_RHIPPO,0.32,30

Tissue Types:
  --tissue-type controls which NIfTI files are loaded:
    grey  : grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz
    white : white_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz
    all   : {simulation_name}_TI_MNI_MNI_TI_max.nii.gz (no prefix)
        """
    )

    # Required arguments
    parser.add_argument('--csv', '-c', required=True,
                        help='Path to CSV file with subject configurations')
    parser.add_argument('--name', '-n', required=True,
                        help='Analysis name (used for output directory)')
    parser.add_argument('--analysis-type', required=True,
                        choices=['group_comparison', 'correlation'],
                        help='Type of analysis to perform')

    # Statistical parameters (common)
    parser.add_argument('--cluster-threshold', '-t', type=float, default=0.05,
                        help='P-value threshold for cluster formation (default: 0.05)')
    parser.add_argument('--cluster-stat', choices=['mass', 'size'], default='mass',
                        help='Cluster statistic: mass (sum of t-values) or size (voxel count) (default: mass)')
    parser.add_argument('--n-permutations', '-p', type=int, default=1000,
                        help='Number of permutations (default: 1000)')
    parser.add_argument('--alpha', '-a', type=float, default=0.05,
                        help='Cluster-level significance threshold (default: 0.05)')
    parser.add_argument('--n-jobs', '-j', type=int, default=-1,
                        help='Number of parallel jobs: -1=all cores, 1=sequential (default: -1)')

    # Group comparison specific parameters
    parser.add_argument('--test-type', choices=['unpaired', 'paired'], default='unpaired',
                        help='Statistical test type for group comparison (default: unpaired)')
    parser.add_argument('--alternative', choices=['two-sided', 'greater', 'less'],
                        default='two-sided',
                        help='Alternative hypothesis for group comparison (default: two-sided)')

    # Correlation specific parameters
    parser.add_argument('--correlation-type', choices=['pearson', 'spearman'],
                        default='pearson',
                        help='Type of correlation for correlation analysis (default: pearson)')
    parser.add_argument('--use-weights', action='store_true',
                        help='Use weights from CSV if available (correlation analysis)')

    # Optional parameters
    parser.add_argument('--tissue-type', choices=['grey', 'white', 'all'],
                        default='grey',
                        help='Tissue type for NIfTI files: grey (grey matter), '
                             'white (white matter), or all (all tissues, no prefix). '
                             '(default: grey)')
    parser.add_argument('--nifti-pattern', default=None,
                        help='Custom NIfTI filename pattern (overrides --tissue-type)')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Suppress progress output')

    args = parser.parse_args()

    # Build configuration
    config = {
        'analysis_type': args.analysis_type,
        'cluster_threshold': args.cluster_threshold,
        'cluster_stat': args.cluster_stat,
        'n_permutations': args.n_permutations,
        'alpha': args.alpha,
        'n_jobs': args.n_jobs,
        'tissue_type': args.tissue_type,
        'nifti_file_pattern': args.nifti_pattern,  # None triggers auto-generation
    }

    # Add analysis-specific parameters
    if args.analysis_type == 'group_comparison':
        config.update({
            'test_type': args.test_type,
            'alternative': args.alternative,
        })
    else:  # correlation
        config.update({
            'correlation_type': args.correlation_type,
            'use_weights': args.use_weights,
        })

    # Print header
    if not args.quiet:
        print("="*70)
        print("UNIFIED CLUSTER-BASED PERMUTATION TESTING - TI-TOOLBOX")
        print("="*70)
        print(f"Analysis type: {args.analysis_type}")
        print(f"CSV file: {args.csv}")
        print(f"Analysis name: {args.name}")
        print(f"Tissue type: {args.tissue_type}")
        print(f"Permutations: {args.n_permutations}")
        print(f"Parallel jobs: {args.n_jobs if args.n_jobs != -1 else 'all cores'}")
        print(f"Cluster statistic: {args.cluster_stat}")
        print(f"Cluster threshold: p < {args.cluster_threshold}")
        print("="*70)
        print()

    # Run analysis
    try:
        # run_analysis is defined elsewhere in this module; the CSV path is
        # passed through as subject_configs and parsed downstream.
        results = run_analysis(
            subject_configs=args.csv,
            analysis_name=args.name,
            config=config,
            output_callback=None if args.quiet else print
        )

        # Print summary
        if not args.quiet:
            print()
            print("="*70)
            print("ANALYSIS COMPLETE!")
            print("="*70)
            print(f"Output directory: {results['output_dir']}")
            if 'n_significant_clusters' in results:
                print(f"Significant clusters: {results['n_significant_clusters']}")
            if 'n_significant_voxels' in results:
                print(f"Significant voxels: {results['n_significant_voxels']}")
            print(f"Analysis time: {results['analysis_time']:.1f} seconds")
            print("="*70)

        return 0

    except Exception as e:
        # Broad catch is intentional at this CLI boundary: report and return
        # a non-zero exit status instead of dumping a raw traceback (unless
        # verbose output is enabled). Relies on a module-level `import sys`.
        print(f"\nERROR: {str(e)}", file=sys.stderr)
        if not args.quiet:
            import traceback
            traceback.print_exc()
        return 1

plot_cluster_size_mass_correlation

plot_cluster_size_mass_correlation(cluster_sizes: np.ndarray, cluster_masses: np.ndarray, output_file: str, *, dpi: int = 300) -> str | None

Plot correlation between cluster size and cluster mass from permutation null distribution.

Source code in tit/plotting/stats.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def plot_cluster_size_mass_correlation(
    cluster_sizes: np.ndarray,
    cluster_masses: np.ndarray,
    output_file: str,
    *,
    dpi: int = 300,
) -> str | None:
    """
    Plot correlation between cluster size and cluster mass from permutation null distribution.

    Parameters
    ----------
    cluster_sizes : np.ndarray
        Maximum cluster sizes (voxels) from each permutation.
    cluster_masses : np.ndarray
        Maximum cluster masses (summed t-statistics) from each permutation.
    output_file : str
        Destination path for the figure (saved as PDF).
    dpi : int, optional
        Figure resolution (default 300).

    Returns
    -------
    str | None
        Path of the saved figure, or None when fewer than two non-zero
        (size, mass) pairs exist, in which case no correlation is defined.
    """
    from scipy.stats import pearsonr

    ensure_headless_matplotlib_backend()
    import matplotlib.pyplot as plt

    # Seaborn is optional: fall back to plain matplotlib when it is not
    # installed.  (Previously seaborn was imported unconditionally, which
    # made the `sns is not None` fallback branch below unreachable.)
    try:
        import seaborn as sns
    except ImportError:
        sns = None

    if sns is not None:
        sns.set_style("whitegrid")
        sns.set_context("notebook", font_scale=1.0)

    # Only correlate permutations that produced a non-empty cluster.
    mask = (cluster_sizes > 0) & (cluster_masses > 0)
    sizes_nonzero = cluster_sizes[mask]
    masses_nonzero = cluster_masses[mask]
    if len(sizes_nonzero) < 2:
        return None

    r_value, p_value = pearsonr(sizes_nonzero, masses_nonzero)

    # Linear fit computed once and reused for both the fallback plot and the
    # annotation box (the original recomputed it redundantly).
    z = np.polyfit(sizes_nonzero, masses_nonzero, 1)

    fig, ax = plt.subplots(figsize=(10, 8))

    if sns is not None:
        sns.regplot(
            x=sizes_nonzero,
            y=masses_nonzero,
            ax=ax,
            scatter_kws={"alpha": 0.6, "s": 50, "color": "steelblue", "edgecolors": "black", "linewidths": 0.5},
            line_kws={"color": "red", "linewidth": 2},
        )
    else:
        ax.scatter(sizes_nonzero, masses_nonzero, alpha=0.6, s=50, c="steelblue", edgecolors="black", linewidths=0.5)
        xs = np.linspace(float(np.min(sizes_nonzero)), float(np.max(sizes_nonzero)), 100)
        ax.plot(xs, z[0] * xs + z[1], color="red", linewidth=2)

    ax.set_xlabel("Maximum Cluster Size (voxels)", fontsize=12, fontweight="bold")
    ax.set_ylabel("Maximum Cluster Mass (sum of t-statistics)", fontsize=12, fontweight="bold")
    ax.set_title(
        f"Cluster Size vs Cluster Mass Correlation\nPearson r = {r_value:.3f} (p = {p_value:.2e})",
        fontsize=14,
        fontweight="bold",
    )

    textstr = (
        f"n = {len(sizes_nonzero)} permutations\n"
        f"r = {r_value:.3f}\n"
        f"p = {p_value:.2e}\n"
        f"Linear fit: y = {z[0]:.2f}x + {z[1]:.2f}"
    )
    props = dict(boxstyle="round", facecolor="wheat", alpha=0.8)
    ax.text(0.05, 0.95, textstr, transform=ax.transAxes, fontsize=11, verticalalignment="top", bbox=props)

    ax.grid(True, alpha=0.3)
    fig.tight_layout()

    return savefig_close(fig, output_file, fmt="pdf", opts=SaveFigOptions(dpi=dpi))

plot_permutation_null_distribution

plot_permutation_null_distribution(null_distribution: np.ndarray, threshold: float, observed_clusters: Sequence[Mapping[str, float]], output_file: str, *, alpha: float = 0.05, cluster_stat: str = 'size', dpi: int = 300) -> str

Plot permutation null distribution with threshold and observed clusters.

Source code in tit/plotting/stats.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def plot_permutation_null_distribution(
    null_distribution: np.ndarray,
    threshold: float,
    observed_clusters: Sequence[Mapping[str, float]],
    output_file: str,
    *,
    alpha: float = 0.05,
    cluster_stat: str = "size",
    dpi: int = 300,
) -> str:
    """
    Plot permutation null distribution with threshold and observed clusters.

    Parameters
    ----------
    null_distribution : np.ndarray
        Maximum cluster statistic from each permutation.
    threshold : float
        Significance threshold derived from the null distribution.
    observed_clusters : sequence of mappings
        Each mapping must provide 'stat_value' and may provide 'p_value'.
    output_file : str
        Destination path for the figure (saved as PDF).
    alpha : float, optional
        Alpha level shown in the threshold label (default 0.05).
    cluster_stat : str, optional
        'size' for cluster extent labels; anything else uses cluster-mass labels.
    dpi : int, optional
        Figure resolution (default 300).

    Returns
    -------
    str
        Path of the saved figure.
    """
    ensure_headless_matplotlib_backend()
    import matplotlib.pyplot as plt

    # Seaborn is optional: fall back to plain matplotlib when it is not
    # installed.  (Previously seaborn was imported unconditionally, which
    # made the `sns is not None` fallback branch below unreachable.)
    try:
        import seaborn as sns
    except ImportError:
        sns = None

    if sns is not None:
        sns.set_style("whitegrid")
        sns.set_context("notebook", font_scale=1.0)

    fig, ax = plt.subplots(figsize=(10, 6))

    # Labels based on cluster statistic
    if cluster_stat == "size":
        x_label = "Maximum Cluster Size (voxels)"
        title = "Permutation Null Distribution of Maximum Cluster Sizes"
        threshold_label = f"Discrete Threshold (p<{alpha}): {threshold:.1f} voxels"
    else:
        x_label = "Maximum Cluster Mass (sum of t-statistics)"
        title = "Permutation Null Distribution of Maximum Cluster Mass"
        threshold_label = f"Discrete Threshold (p<{alpha}): {threshold:.2f}"

    # Histogram
    if sns is not None:
        sns.histplot(
            null_distribution,
            bins=200,
            alpha=0.7,
            color="gray",
            edgecolor="black",
            label="Null Distribution",
            ax=ax,
        )
    else:
        ax.hist(null_distribution, bins=200, alpha=0.7, color="gray", edgecolor="black", label="Null Distribution")

    # Threshold line
    ax.axvline(threshold, color="red", linestyle="--", linewidth=2, label=threshold_label)

    # Observed clusters.
    # NOTE(review): significance here is hard-coded at 0.05 rather than using
    # `alpha`; the legend labels assume 0.05 as well — confirm this is
    # intentional before wiring `alpha` through.
    sig_plotted = False
    nonsig_plotted = False
    for cluster in observed_clusters:
        stat_value = float(cluster["stat_value"])
        p_value = cluster.get("p_value", None)
        if p_value is not None:
            is_significant = float(p_value) < 0.05
        else:
            # No per-cluster p-value: fall back to comparing against threshold
            is_significant = stat_value > threshold

        color = "green" if is_significant else "orange"
        label = None
        # Label each category only once so the legend stays compact
        if is_significant and not sig_plotted:
            label = "Significant Clusters (p<0.05)"
            sig_plotted = True
        elif (not is_significant) and (not nonsig_plotted):
            label = "Non-significant Clusters (p≥0.05)"
            nonsig_plotted = True

        ax.axvline(stat_value, color=color, linestyle="-", linewidth=2, alpha=0.7, label=label)

    ax.set_xlabel(x_label, fontsize=12)
    ax.set_ylabel("Frequency", fontsize=12)
    ax.set_title(title, fontsize=14, fontweight="bold")
    ax.legend(loc="upper right", fontsize=10)
    ax.grid(True, alpha=0.3)
    fig.tight_layout()

    return savefig_close(fig, output_file, fmt="pdf", opts=SaveFigOptions(dpi=dpi))

prepare_config_from_csv

prepare_config_from_csv(csv_file, analysis_type='group_comparison')

Load subject configurations from CSV file

Parameters:

csv_file : str — Path to CSV file.
analysis_type : str — Either 'group_comparison' or 'correlation'.

Returns:

list of dict : Subject configurations

Source code in tit/stats/permutation_analysis.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def _normalize_subject_id(raw_id):
    """
    Normalize a subject identifier to a plain string.

    Strips a leading 'sub-' prefix and repairs float artifacts introduced by
    pandas type inference (e.g. 101.0 -> "101" and "101.0" -> "101").
    """
    if isinstance(raw_id, float):
        # Numeric column inferred as float (e.g. 101.0) -> "101"
        return str(int(raw_id))
    sid = str(raw_id).replace('sub-', '')
    # Handle string representations of floats like "101.0"
    if sid.endswith('.0'):
        sid = sid[:-2]
    return sid


def prepare_config_from_csv(csv_file, analysis_type='group_comparison'):
    """
    Load subject configurations from CSV file

    Parameters:
    -----------
    csv_file : str
        Path to CSV file
    analysis_type : str
        Either 'group_comparison' or 'correlation'

    Returns:
    --------
    list of dict : Subject configurations

    Raises:
    -------
    ValueError
        If a required column is missing, no valid rows are found
        (correlation only), or analysis_type is unknown.
    """
    df = pd.read_csv(csv_file)

    if analysis_type == 'group_comparison':
        # Validate required columns for group comparison
        required_cols = ['subject_id', 'simulation_name', 'response']
        for col in required_cols:
            if col not in df.columns:
                raise ValueError(f"CSV file missing required column: '{col}' for group comparison")

        configs = []
        for _, row in df.iterrows():
            configs.append({
                # Accept 'sub-XXX', 'XXX' and float-parsed ids alike
                # (previously only the correlation branch repaired float ids)
                'subject_id': _normalize_subject_id(row['subject_id']),
                'simulation_name': row['simulation_name'],
                'response': int(row['response'])
            })

    elif analysis_type == 'correlation':
        # Validate required columns for correlation
        required_cols = ['subject_id', 'simulation_name', 'effect_size']
        for col in required_cols:
            if col not in df.columns:
                raise ValueError(f"CSV file missing required column: '{col}' for correlation analysis")

        configs = []
        skipped_rows = 0
        for _, row in df.iterrows():
            # Skip rows with missing required fields
            if pd.isna(row['subject_id']) or pd.isna(row['simulation_name']) or pd.isna(row['effect_size']):
                skipped_rows += 1
                continue

            config = {
                'subject_id': _normalize_subject_id(row['subject_id']),
                'simulation_name': str(row['simulation_name']),
                'effect_size': float(row['effect_size'])
            }

            # Add weight if present (row-level NaN means "no weight")
            if 'weight' in df.columns and pd.notna(row.get('weight')):
                config['weight'] = float(row['weight'])

            configs.append(config)

        if skipped_rows > 0:
            print(f"Note: Skipped {skipped_rows} rows with missing required fields")

        if len(configs) == 0:
            raise ValueError("No valid subject configurations found in CSV file")

    else:
        raise ValueError(f"Unknown analysis_type: {analysis_type}")

    return configs

run_analysis

run_analysis(subject_configs, analysis_name, config=None, output_callback=None, callback_handler=None, progress_callback=None, stop_callback=None)

Run unified cluster-based permutation analysis

Parameters:

subject_configs : list of dict or str — Either a list of subject configurations or a path to a CSV file.
analysis_name : str — Name for this analysis (used for the output directory).
config : dict, optional — Configuration dictionary (merged with defaults based on analysis_type).
output_callback : callable, optional — Callback function for status updates (for GUI integration).
callback_handler : logging.Handler, optional — Callback handler for GUI console integration.
progress_callback : callable, optional — Callback function for progress updates.
stop_callback : callable, optional — Callback function to check if the analysis should be stopped.

Returns:

dict : Results dictionary (structure depends on analysis_type)

Source code in tit/stats/permutation_analysis.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
def run_analysis(subject_configs, analysis_name, config=None, output_callback=None,
                 callback_handler=None, progress_callback=None, stop_callback=None):
    """
    Run unified cluster-based permutation analysis

    Parameters:
    -----------
    subject_configs : list of dict or str
        Either a list of subject configurations or path to CSV file
    analysis_name : str
        Name for this analysis (used for output directory)
    config : dict, optional
        Configuration dictionary (merged with defaults based on analysis_type)
    output_callback : callable, optional
        Callback function for status updates (for GUI integration)
    callback_handler : logging.Handler, optional
        Callback handler for GUI console integration
    progress_callback : callable, optional
        Callback function for progress updates.
        NOTE(review): not referenced in this function body — presumably
        consumed by downstream helpers; confirm before removing.
    stop_callback : callable, optional
        Callback function to check if analysis should be stopped

    Returns:
    --------
    dict : Results dictionary (structure depends on analysis_type)

    Raises:
    -------
    ValueError
        If config specifies an unknown analysis_type.
    """
    # Determine analysis type from config or default
    analysis_type = 'group_comparison'  # default
    if config and 'analysis_type' in config:
        analysis_type = config['analysis_type']

    # Merge caller-supplied config over the defaults for this analysis type
    if analysis_type == 'group_comparison':
        CONFIG = DEFAULT_CONFIG_GROUP_COMPARISON.copy()
    elif analysis_type == 'correlation':
        CONFIG = DEFAULT_CONFIG_CORRELATION.copy()
    else:
        raise ValueError(f"Unknown analysis_type: {analysis_type}")

    if config:
        CONFIG.update(config)

    # Ensure analysis_type is set in CONFIG
    CONFIG['analysis_type'] = analysis_type

    # Callback helper: no-op when no output_callback was supplied
    def log_callback(msg):
        if output_callback:
            output_callback(msg)

    # Start timing
    analysis_start_time = time.time()

    # Set up output directory: BIDS-style derivatives layout when a path
    # manager is available, otherwise fall back to the PROJECT_DIR env var.
    pm = get_path_manager() if get_path_manager else None
    if pm:
        # Only the derivatives directory is needed here (the previous
        # `project_dir = pm.project_dir` binding was unused).
        derivatives_dir = pm.get_derivatives_dir()
        output_dir = os.path.join(
            derivatives_dir,
            const.DIR_TI_TOOLBOX,
            'stats',
            analysis_type,
            analysis_name
        )
    else:
        project_dir = os.environ.get('PROJECT_DIR', '/mnt')
        output_dir = os.path.join(
            project_dir,
            'derivatives',
            'tit',
            'stats',
            analysis_type,
            analysis_name
        )

    os.makedirs(output_dir, exist_ok=True)

    # Set up logging (callback_handler defaults to None in setup_logging, so
    # a single call covers both the GUI and CLI cases)
    logger, log_file = setup_logging(output_dir, analysis_type, callback_handler)

    # Log header
    analysis_title = "CLUSTER-BASED PERMUTATION TESTING" if analysis_type == 'group_comparison' else "CORRELATION-BASED CLUSTER PERMUTATION TESTING"
    subtitle = "" if analysis_type == 'group_comparison' else "(ACES-style analysis for continuous outcomes)"

    logger.info("="*70)
    logger.info(analysis_title)
    if subtitle:
        logger.info(subtitle)
    logger.info("="*70)
    logger.info(f"Analysis started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"Analysis name: {analysis_name}")
    logger.info(f"Analysis type: {analysis_type}")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"Log file: {log_file}")
    logger.info("")

    log_callback(f"Starting {analysis_type} analysis: {analysis_name}")

    # Construct nifti_file_pattern from tissue_type when not given explicitly
    if CONFIG.get('nifti_file_pattern') is None:
        tissue_type = CONFIG.get('tissue_type', 'grey')
        if tissue_type == 'grey':
            CONFIG['nifti_file_pattern'] = 'grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz'
        elif tissue_type == 'white':
            CONFIG['nifti_file_pattern'] = 'white_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz'
        elif tissue_type == 'all':
            CONFIG['nifti_file_pattern'] = '{simulation_name}_TI_MNI_MNI_TI_max.nii.gz'
        else:
            raise ValueError(f"Invalid tissue_type: '{tissue_type}'. Must be 'grey', 'white', or 'all'")

    # Log configuration
    logger.info("CONFIGURATION:")
    if analysis_type == 'group_comparison':
        logger.info(f"  Statistical test: {CONFIG['test_type'].capitalize()} t-test")
        alt_text = {
            'two-sided': 'two-sided (≠)',
            'greater': f"one-sided ({CONFIG['group1_name']} > {CONFIG['group2_name']})",
            'less': f"one-sided ({CONFIG['group1_name']} < {CONFIG['group2_name']})"
        }
        logger.info(f"  Alternative hypothesis: {alt_text.get(CONFIG['alternative'], CONFIG['alternative'])}")
    else:
        logger.info(f"  Correlation type: {CONFIG['correlation_type'].capitalize()}")

    cluster_stat_name = "Cluster Size" if CONFIG['cluster_stat'] == 'size' else "Cluster Mass"
    logger.info(f"  Cluster statistic: {cluster_stat_name}")
    logger.info(f"  Cluster threshold: {CONFIG['cluster_threshold']}")
    logger.info(f"  Number of permutations: {CONFIG['n_permutations']}")
    logger.info(f"  Alpha level: {CONFIG['alpha']}")
    logger.info(f"  Parallel jobs: {CONFIG['n_jobs']}")
    logger.info(f"  Tissue type: {CONFIG.get('tissue_type', 'grey')}")
    logger.info(f"  NIfTI pattern: {CONFIG['nifti_file_pattern']}")
    logger.info("")

    # Load subject configurations from CSV when a path was supplied
    if isinstance(subject_configs, str):
        logger.info(f"Loading subject configurations from: {subject_configs}")
        subject_configs = prepare_config_from_csv(subject_configs, analysis_type)

    # Branch based on analysis type
    if analysis_type == 'group_comparison':
        return _run_group_comparison_analysis(
            subject_configs, CONFIG, output_dir, logger, log_callback,
            analysis_start_time, log_file, stop_callback
        )
    else:  # correlation
        return _run_correlation_analysis(
            subject_configs, CONFIG, output_dir, logger, log_callback,
            analysis_start_time, log_file, stop_callback
        )

setup_logging

setup_logging(output_dir, analysis_type='group_comparison', callback_handler=None)

Set up logging for unified analysis

Parameters:

output_dir : str — Directory where the log file will be saved.
analysis_type : str — Type of analysis, used for log naming.
callback_handler : logging.Handler, optional — Callback handler for GUI integration.

Returns:

logger : logging.Logger — Configured logger instance.
log_file : str — Path to the log file.

Source code in tit/stats/permutation_analysis.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def setup_logging(output_dir, analysis_type='group_comparison', callback_handler=None):
    """
    Set up logging for unified analysis

    Parameters:
    -----------
    output_dir : str
        Directory where log file will be saved
    analysis_type : str
        Type of analysis for log naming
    callback_handler : logging.Handler, optional
        Callback handler for GUI integration

    Returns:
    --------
    logger : logging.Logger
        Configured logger instance
    log_file : str
        Path to log file

    Raises:
    -------
    ImportError
        If the logging_util module could not be imported at module load.
    """
    if logging_util is None:
        raise ImportError("logging_util module not available")

    # Timestamped file name keeps repeated runs from clobbering each other
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(output_dir, f"{analysis_type}_analysis_{stamp}.log")

    prefix = 'GroupComparison' if analysis_type == 'group_comparison' else 'Correlation'
    logger = logging_util.get_logger(f"{prefix}Analysis", log_file, overwrite=True)

    if callback_handler:
        # GUI mode: route messages to the callback instead of the console
        logging_util.suppress_console_output(logger)
        logger.addHandler(callback_handler)

    # Keep noisy third-party libraries aligned with our logger configuration
    logging_util.configure_external_loggers(
        ['scipy', 'numpy', 'nibabel', 'pandas', 'matplotlib'], logger
    )

    return logger, log_file

ttest_voxelwise

ttest_voxelwise(responders, non_responders, test_type='unpaired', alternative='two-sided', verbose=True)

Perform vectorized t-test (paired or unpaired) at each voxel.

Uses vectorized computation for optimal performance.

Parameters:

responders : ndarray (x, y, z, n_subjects) — Responder data (group 1).
non_responders : ndarray (x, y, z, n_subjects) — Non-responder data (group 2).
test_type : str — Either 'paired' or 'unpaired' t-test.
alternative : {'two-sided', 'greater', 'less'}, optional — Defines the alternative hypothesis (default: 'two-sided'):
  * 'two-sided': means are different (responders ≠ non-responders)
  * 'greater': responders have higher values (responders > non-responders)
  * 'less': responders have lower values (responders < non-responders)
verbose : bool — Print progress information.

Returns:

p_values : ndarray (x, y, z) — P-value at each voxel.
t_statistics : ndarray (x, y, z) — T-statistic at each voxel.
valid_mask : ndarray (x, y, z) — Boolean mask of valid voxels.

Source code in tit/stats/stats_utils.py
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
def ttest_voxelwise(responders, non_responders, test_type='unpaired', alternative='two-sided', verbose=True):
    """
    Perform vectorized t-test (paired or unpaired) at each voxel.

    Uses vectorized computation for optimal performance.

    Parameters:
    -----------
    responders : ndarray (x, y, z, n_subjects)
        Responder data (group 1)
    non_responders : ndarray (x, y, z, n_subjects)
        Non-responder data (group 2)
    test_type : str
        Either 'paired' or 'unpaired' t-test
    alternative : {'two-sided', 'greater', 'less'}, optional
        Defines the alternative hypothesis (default: 'two-sided'):
        * 'two-sided': means are different (responders ≠ non-responders)
        * 'greater': responders have higher values (responders > non-responders)
        * 'less': responders have lower values (responders < non-responders)
    verbose : bool
        Print progress information

    Returns:
    --------
    p_values : ndarray (x, y, z)
        P-value at each voxel (1.0 where not tested)
    t_statistics : ndarray (x, y, z)
        T-statistic at each voxel (0.0 where not tested)
    valid_mask : ndarray (x, y, z)
        Boolean mask of valid voxels
    """
    if verbose:
        label = "Paired" if test_type == 'paired' else "Unpaired (Independent Samples)"
        suffix = {
            'greater': " (one-sided: responders > non-responders)",
            'less': " (one-sided: responders < non-responders)",
        }.get(alternative, "")
        print(f"\nPerforming voxelwise {label} t-tests{suffix} (vectorized)...")

    # Validate test type
    if test_type not in ('paired', 'unpaired'):
        raise ValueError("test_type must be 'paired' or 'unpaired'")

    # A paired test only makes sense when the two groups line up one-to-one
    if test_type == 'paired' and responders.shape[-1] != non_responders.shape[-1]:
        raise ValueError(f"Paired t-test requires equal sample sizes. "
                       f"Got {responders.shape[-1]} vs {non_responders.shape[-1]} subjects")

    vol_shape = responders.shape[:3]
    p_values = np.ones(vol_shape)
    t_statistics = np.zeros(vol_shape)

    # A voxel is tested if any subject in either group has signal there
    valid_mask = np.any(responders > 0, axis=-1) | np.any(non_responders > 0, axis=-1)

    if verbose:
        print(f"Testing {np.sum(valid_mask)} valid voxels using vectorized approach...")

    # Coordinates of every voxel we will test
    coords = np.argwhere(valid_mask)
    ii, jj, kk = coords[:, 0], coords[:, 1], coords[:, 2]

    n_resp = responders.shape[-1]
    n_non_resp = non_responders.shape[-1]

    # Gather all valid voxels into one (n_valid, n_resp + n_non_resp) table
    # via fancy indexing — avoids per-voxel Python-loop overhead entirely.
    voxel_data = np.concatenate(
        [
            responders[ii, jj, kk, :].astype(np.float32),
            non_responders[ii, jj, kk, :].astype(np.float32),
        ],
        axis=1,
    )

    # Delegate to the vectorized t-test implementations
    if test_type == 'paired':
        t_1d, p_1d = ttest_rel(voxel_data, n_resp, alternative=alternative)
    else:
        t_1d, p_1d = ttest_ind(voxel_data, n_resp, n_non_resp, alternative=alternative)

    # Scatter the flat results back into the 3D volumes
    t_statistics[ii, jj, kk] = t_1d
    p_values[ii, jj, kk] = p_1d

    return p_values, t_statistics, valid_mask

Permutation analysis (tit.stats.permutation_analysis)

Unified Cluster-Based Permutation Testing for TI-Toolbox

This script provides unified cluster-based permutation testing for both: 1. Group comparison analysis (binary responder/non-responder classification) 2. Correlation analysis (continuous outcome measures)

Supports both t-test and correlation-based statistical approaches with cluster-based permutation correction for multiple comparisons.

Usage: from tit.stats import permutation_analysis # Group comparison results = permutation_analysis.run_analysis( subject_configs, analysis_name, analysis_type='group_comparison' ) # Correlation analysis results = permutation_analysis.run_analysis( subject_configs, analysis_name, analysis_type='correlation' )

For GUI usage, see gui/extensions/permutation_analysis.py

load_subject_data

load_subject_data(subject_configs, nifti_file_pattern=None, analysis_type='group_comparison')

Unified data loading for both group comparison and correlation analysis

Parameters:

subject_configs : list of dict Subject configurations (format depends on analysis_type) nifti_file_pattern : str, optional Pattern for NIfTI files analysis_type : str Either 'group_comparison' or 'correlation'

Returns:

For group_comparison: responders, non_responders, template_img, resp_ids, non_resp_ids For correlation: subject_data, effect_sizes, weights, template_img, subject_ids

Source code in tit/stats/permutation_analysis.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def load_subject_data(subject_configs, nifti_file_pattern=None, analysis_type='group_comparison'):
    """
    Unified data loading for both group comparison and correlation analysis

    Parameters:
    -----------
    subject_configs : list of dict
        Subject configurations (format depends on analysis_type)
    nifti_file_pattern : str, optional
        Pattern for NIfTI files
    analysis_type : str
        Either 'group_comparison' or 'correlation'

    Returns:
    --------
    For group_comparison:
        responders, non_responders, template_img, resp_ids, non_resp_ids
    For correlation:
        subject_data, effect_sizes, weights, template_img, subject_ids

    Raises:
    -------
    ValueError
        If analysis_type is not one of the supported values.
    """
    # Flat dispatch with guard clauses instead of an if/elif/else ladder
    if analysis_type == 'group_comparison':
        return load_subject_data_group_comparison(subject_configs, nifti_file_pattern)
    if analysis_type == 'correlation':
        return load_subject_data_correlation(subject_configs, nifti_file_pattern)
    raise ValueError(f"Unknown analysis_type: {analysis_type}")

load_subject_data_correlation

load_subject_data_correlation(subject_configs, nifti_file_pattern=None)

Load subject data for correlation analysis (continuous outcomes)

Source code in tit/stats/permutation_analysis.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def load_subject_data_correlation(subject_configs, nifti_file_pattern=None):
    """
    Load subject data for correlation analysis (continuous outcomes)

    Parameters:
    -----------
    subject_configs : list of dict
        Each dict must contain 'subject_id', 'simulation_name' and
        'effect_size'; an optional 'weight' enables weighted analysis.
    nifti_file_pattern : str, optional
        Filename pattern with a '{simulation_name}' placeholder
        (defaults to the grey-matter TI max pattern).

    Returns:
    --------
    subject_data, effect_sizes, weights, template_img, subject_ids
        `weights` is None when no config carries a 'weight' entry.

    Raises:
    -------
    ValueError
        If subject_configs is empty or a required field is missing.
    """
    # Fail fast on an empty list (previously this crashed later with an
    # IndexError when probing subject_configs[0] for weights).
    if not subject_configs:
        raise ValueError("subject_configs must not be empty")

    if nifti_file_pattern is None:
        nifti_file_pattern = "grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz"

    # Check for required fields
    required_fields = ['subject_id', 'simulation_name', 'effect_size']
    for config in subject_configs:
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Missing required field '{field}' in subject config")

    # Load all subjects
    subject_data, template_img, subject_ids = nifti.load_group_data_ti_toolbox(
        subject_configs,
        nifti_file_pattern=nifti_file_pattern,
        dtype=np.float32
    )

    # Build a lookup from subject_id to config for successfully loaded subjects
    config_lookup = {c['subject_id']: c for c in subject_configs}

    # Weights are present if ANY config carries a 'weight' key.
    # (Previously only the first config was inspected, which silently
    # dropped all weights when the first row happened to lack one.)
    has_weights = any('weight' in c for c in subject_configs)

    # Extract effect sizes and weights only for successfully loaded subjects
    effect_sizes = []
    weights_list = []
    for sid in subject_ids:
        config = config_lookup[sid]
        effect_sizes.append(config['effect_size'])
        if has_weights:
            # Rows without an explicit weight default to 1.0
            weights_list.append(config.get('weight', 1.0))

    effect_sizes = np.array(effect_sizes, dtype=np.float64)
    weights = np.array(weights_list, dtype=np.float64) if has_weights else None

    print(f"\nLoaded {len(subject_ids)} subjects: {subject_ids}")
    print(f"Effect sizes: mean={np.mean(effect_sizes):.3f}, std={np.std(effect_sizes):.3f}")
    print(f"Effect size range: [{np.min(effect_sizes):.3f}, {np.max(effect_sizes):.3f}]")
    if weights is not None:
        print(f"Weights: mean={np.mean(weights):.3f}, range=[{np.min(weights):.3f}, {np.max(weights):.3f}]")
    print(f"Data shape: {subject_data.shape}")

    return subject_data, effect_sizes, weights, template_img, subject_ids

load_subject_data_group_comparison

load_subject_data_group_comparison(subject_configs, nifti_file_pattern=None)

Load subject data for group comparison analysis (binary outcomes)

Source code in tit/stats/permutation_analysis.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def load_subject_data_group_comparison(subject_configs, nifti_file_pattern=None):
    """
    Load subject data for group comparison analysis (binary outcomes)

    Parameters:
    -----------
    subject_configs : list of dict
        Each dict must contain 'subject_id', 'simulation_name' and
        'response' (1 = responder, 0 = non-responder).
    nifti_file_pattern : str, optional
        Filename pattern with a '{simulation_name}' placeholder
        (defaults to the grey-matter TI max pattern).

    Returns:
    --------
    responders, non_responders, template_img, responder_ids, non_responder_ids

    Raises:
    -------
    ValueError
        If a required field is missing, or either group is empty.
    """
    if nifti_file_pattern is None:
        nifti_file_pattern = "grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz"

    # Validate required fields up front, consistent with the correlation
    # loader, so a malformed config raises ValueError instead of KeyError.
    required_fields = ['subject_id', 'simulation_name', 'response']
    for config in subject_configs:
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Missing required field '{field}' in subject config")

    # Separate configs by response
    responder_configs = [c for c in subject_configs if c['response'] == 1]
    non_responder_configs = [c for c in subject_configs if c['response'] == 0]

    if len(responder_configs) == 0 or len(non_responder_configs) == 0:
        raise ValueError("Need at least one responder and one non-responder")

    # Load responders
    responders, template_img, responder_ids = nifti.load_group_data_ti_toolbox(
        responder_configs,
        nifti_file_pattern=nifti_file_pattern,
        dtype=np.float32
    )

    # Load non-responders (template from the first call is reused)
    non_responders, _, non_responder_ids = nifti.load_group_data_ti_toolbox(
        non_responder_configs,
        nifti_file_pattern=nifti_file_pattern,
        dtype=np.float32
    )

    print(f"\nLoaded {len(responder_ids)} responders: {responder_ids}")
    print(f"Loaded {len(non_responder_ids)} non-responders: {non_responder_ids}")
    print(f"Responders shape: {responders.shape}")
    print(f"Non-responders shape: {non_responders.shape}")

    return responders, non_responders, template_img, responder_ids, non_responder_ids

main

main()

Command-line interface for unified permutation analysis

Source code in tit/stats/permutation_analysis.py
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
def main():
    """
    Command-line interface for unified permutation analysis.

    Parses CLI arguments, builds the analysis configuration dict, and
    dispatches to :func:`run_analysis` for either a group-comparison or a
    correlation cluster-based permutation test.

    Returns:
    --------
    int : exit code — 0 on success, 1 on failure (error printed to stderr)
    """
    import argparse
    import multiprocessing

    # Ensure proper multiprocessing initialization.  'fork' keeps the loaded
    # NIfTI arrays shared (copy-on-write) across worker processes instead of
    # re-pickling them.
    if hasattr(multiprocessing, 'set_start_method'):
        try:
            multiprocessing.set_start_method('fork', force=True)
        except (RuntimeError, ValueError):
            # RuntimeError: start method already fixed for this interpreter.
            # ValueError: 'fork' is not available on this platform (e.g.
            # Windows) — fall back to the platform default silently.
            pass

    parser = argparse.ArgumentParser(
        description='Unified Cluster-Based Permutation Testing for TI-Toolbox',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Group comparison analysis
  python permutation_analysis.py --csv subjects.csv --name my_analysis \\
      --analysis-type group_comparison

  # Correlation analysis
  python permutation_analysis.py --csv subjects.csv --name my_analysis \\
      --analysis-type correlation

Group Comparison CSV Format:
  subject_id,simulation_name,response
  070,ICP_RHIPPO,1
  071,ICP_RHIPPO,0

Correlation CSV Format:
  subject_id,simulation_name,effect_size,weight
  070,ICP_RHIPPO,0.45,25
  071,ICP_RHIPPO,0.32,30

Tissue Types:
  --tissue-type controls which NIfTI files are loaded:
    grey  : grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz
    white : white_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz
    all   : {simulation_name}_TI_MNI_MNI_TI_max.nii.gz (no prefix)
        """
    )

    # Required arguments
    parser.add_argument('--csv', '-c', required=True,
                        help='Path to CSV file with subject configurations')
    parser.add_argument('--name', '-n', required=True,
                        help='Analysis name (used for output directory)')
    parser.add_argument('--analysis-type', required=True,
                        choices=['group_comparison', 'correlation'],
                        help='Type of analysis to perform')

    # Statistical parameters (common to both analysis types)
    parser.add_argument('--cluster-threshold', '-t', type=float, default=0.05,
                        help='P-value threshold for cluster formation (default: 0.05)')
    parser.add_argument('--cluster-stat', choices=['mass', 'size'], default='mass',
                        help='Cluster statistic: mass (sum of t-values) or size (voxel count) (default: mass)')
    parser.add_argument('--n-permutations', '-p', type=int, default=1000,
                        help='Number of permutations (default: 1000)')
    parser.add_argument('--alpha', '-a', type=float, default=0.05,
                        help='Cluster-level significance threshold (default: 0.05)')
    parser.add_argument('--n-jobs', '-j', type=int, default=-1,
                        help='Number of parallel jobs: -1=all cores, 1=sequential (default: -1)')

    # Group comparison specific parameters
    parser.add_argument('--test-type', choices=['unpaired', 'paired'], default='unpaired',
                        help='Statistical test type for group comparison (default: unpaired)')
    parser.add_argument('--alternative', choices=['two-sided', 'greater', 'less'],
                        default='two-sided',
                        help='Alternative hypothesis for group comparison (default: two-sided)')

    # Correlation specific parameters
    parser.add_argument('--correlation-type', choices=['pearson', 'spearman'],
                        default='pearson',
                        help='Type of correlation for correlation analysis (default: pearson)')
    parser.add_argument('--use-weights', action='store_true',
                        help='Use weights from CSV if available (correlation analysis)')

    # Optional parameters
    parser.add_argument('--tissue-type', choices=['grey', 'white', 'all'],
                        default='grey',
                        help='Tissue type for NIfTI files: grey (grey matter), '
                             'white (white matter), or all (all tissues, no prefix). '
                             '(default: grey)')
    parser.add_argument('--nifti-pattern', default=None,
                        help='Custom NIfTI filename pattern (overrides --tissue-type)')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Suppress progress output')

    args = parser.parse_args()

    # Build configuration shared by both analysis types
    config = {
        'analysis_type': args.analysis_type,
        'cluster_threshold': args.cluster_threshold,
        'cluster_stat': args.cluster_stat,
        'n_permutations': args.n_permutations,
        'alpha': args.alpha,
        'n_jobs': args.n_jobs,
        'tissue_type': args.tissue_type,
        'nifti_file_pattern': args.nifti_pattern,  # None triggers auto-generation
    }

    # Add analysis-specific parameters
    if args.analysis_type == 'group_comparison':
        config.update({
            'test_type': args.test_type,
            'alternative': args.alternative,
        })
    else:  # correlation
        config.update({
            'correlation_type': args.correlation_type,
            'use_weights': args.use_weights,
        })

    # Print header
    if not args.quiet:
        print("="*70)
        print("UNIFIED CLUSTER-BASED PERMUTATION TESTING - TI-TOOLBOX")
        print("="*70)
        print(f"Analysis type: {args.analysis_type}")
        print(f"CSV file: {args.csv}")
        print(f"Analysis name: {args.name}")
        print(f"Tissue type: {args.tissue_type}")
        print(f"Permutations: {args.n_permutations}")
        print(f"Parallel jobs: {args.n_jobs if args.n_jobs != -1 else 'all cores'}")
        print(f"Cluster statistic: {args.cluster_stat}")
        print(f"Cluster threshold: p < {args.cluster_threshold}")
        print("="*70)
        print()

    # Run analysis; the CSV path is passed through so run_analysis loads it
    try:
        results = run_analysis(
            subject_configs=args.csv,
            analysis_name=args.name,
            config=config,
            output_callback=None if args.quiet else print
        )

        # Print summary
        if not args.quiet:
            print()
            print("="*70)
            print("ANALYSIS COMPLETE!")
            print("="*70)
            print(f"Output directory: {results['output_dir']}")
            if 'n_significant_clusters' in results:
                print(f"Significant clusters: {results['n_significant_clusters']}")
            if 'n_significant_voxels' in results:
                print(f"Significant voxels: {results['n_significant_voxels']}")
            print(f"Analysis time: {results['analysis_time']:.1f} seconds")
            print("="*70)

        return 0

    except Exception as e:
        # Top-level CLI boundary: report the error, optionally with traceback
        print(f"\nERROR: {str(e)}", file=sys.stderr)
        if not args.quiet:
            import traceback
            traceback.print_exc()
        return 1

prepare_config_from_csv

prepare_config_from_csv(csv_file, analysis_type='group_comparison')

Load subject configurations from CSV file

Parameters:

csv_file : str Path to CSV file analysis_type : str Either 'group_comparison' or 'correlation'

Returns:

list of dict : Subject configurations

Source code in tit/stats/permutation_analysis.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def _normalize_subject_id(raw_id):
    """
    Normalize a subject identifier to a plain string.

    Handles 'sub-XXX' prefixes and float-ish IDs produced by pandas type
    inference (101.0 -> "101", "101.0" -> "101").
    """
    if isinstance(raw_id, float):
        # Float from pandas inference: 101.0 -> "101"
        return str(int(raw_id))
    subject_id = str(raw_id).replace('sub-', '')
    # String representation of a float, e.g. "101.0"
    if subject_id.endswith('.0'):
        subject_id = subject_id[:-2]
    return subject_id


def prepare_config_from_csv(csv_file, analysis_type='group_comparison'):
    """
    Load subject configurations from CSV file

    Rows with missing required fields are skipped (with a note), and subject
    IDs are normalized consistently for both analysis types ('sub-070' and
    101.0 both become "070"/"101").

    Parameters:
    -----------
    csv_file : str
        Path to CSV file
    analysis_type : str
        Either 'group_comparison' or 'correlation'

    Returns:
    --------
    list of dict : Subject configurations

    Raises:
    -------
    ValueError
        If a required column is missing, no valid rows remain, or
        analysis_type is unknown.
    """
    df = pd.read_csv(csv_file)

    if analysis_type == 'group_comparison':
        required_cols = ['subject_id', 'simulation_name', 'response']
    elif analysis_type == 'correlation':
        required_cols = ['subject_id', 'simulation_name', 'effect_size']
    else:
        raise ValueError(f"Unknown analysis_type: {analysis_type}")

    # Validate required columns up front
    label = 'group comparison' if analysis_type == 'group_comparison' else 'correlation analysis'
    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"CSV file missing required column: '{col}' for {label}")

    configs = []
    skipped_rows = 0
    for _, row in df.iterrows():
        # Skip rows with missing required fields (int(NaN)/float(NaN) would
        # otherwise raise deep inside the loop)
        if any(pd.isna(row[col]) for col in required_cols):
            skipped_rows += 1
            continue

        config = {
            'subject_id': _normalize_subject_id(row['subject_id']),
            'simulation_name': str(row['simulation_name']),
        }

        if analysis_type == 'group_comparison':
            config['response'] = int(row['response'])
        else:  # correlation
            config['effect_size'] = float(row['effect_size'])
            # Add weight if present
            if 'weight' in df.columns and pd.notna(row.get('weight')):
                config['weight'] = float(row['weight'])

        configs.append(config)

    if skipped_rows > 0:
        print(f"Note: Skipped {skipped_rows} rows with missing required fields")

    if len(configs) == 0:
        raise ValueError("No valid subject configurations found in CSV file")

    return configs

run_analysis

run_analysis(subject_configs, analysis_name, config=None, output_callback=None, callback_handler=None, progress_callback=None, stop_callback=None)

Run unified cluster-based permutation analysis

Parameters:

subject_configs : list of dict or str Either a list of subject configurations or path to CSV file analysis_name : str Name for this analysis (used for output directory) config : dict, optional Configuration dictionary (merged with defaults based on analysis_type) output_callback : callable, optional Callback function for status updates (for GUI integration) callback_handler : logging.Handler, optional Callback handler for GUI console integration progress_callback : callable, optional Callback function for progress updates stop_callback : callable, optional Callback function to check if analysis should be stopped

Returns:

dict : Results dictionary (structure depends on analysis_type)

Source code in tit/stats/permutation_analysis.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
def run_analysis(subject_configs, analysis_name, config=None, output_callback=None,
                 callback_handler=None, progress_callback=None, stop_callback=None):
    """
    Run unified cluster-based permutation analysis

    Entry point shared by the CLI and the GUI: resolves the configuration,
    creates the output directory, sets up logging, then dispatches to the
    group-comparison or correlation implementation.

    Parameters:
    -----------
    subject_configs : list of dict or str
        Either a list of subject configurations or path to CSV file
    analysis_name : str
        Name for this analysis (used for output directory)
    config : dict, optional
        Configuration dictionary (merged with defaults based on analysis_type)
    output_callback : callable, optional
        Callback function for status updates (for GUI integration)
    callback_handler : logging.Handler, optional
        Callback handler for GUI console integration
    progress_callback : callable, optional
        Callback function for progress updates
    stop_callback : callable, optional
        Callback function to check if analysis should be stopped

    Returns:
    --------
    dict : Results dictionary (structure depends on analysis_type)
    """
    # Determine analysis type from config or default
    analysis_type = 'group_comparison'  # default
    if config and 'analysis_type' in config:
        analysis_type = config['analysis_type']

    # Merge config with appropriate defaults.  DEFAULT_CONFIG_* are
    # module-level dicts; .copy() keeps the defaults from being mutated
    # by the .update() below.
    if analysis_type == 'group_comparison':
        CONFIG = DEFAULT_CONFIG_GROUP_COMPARISON.copy()
    elif analysis_type == 'correlation':
        CONFIG = DEFAULT_CONFIG_CORRELATION.copy()
    else:
        raise ValueError(f"Unknown analysis_type: {analysis_type}")

    if config:
        CONFIG.update(config)

    # Ensure analysis_type is set in CONFIG
    CONFIG['analysis_type'] = analysis_type

    # Callback helper: no-op when no output_callback is supplied
    def log_callback(msg):
        if output_callback:
            output_callback(msg)

    # Start timing (wall clock for the whole analysis)
    analysis_start_time = time.time()

    # Set up output directory.  Prefer the project path manager when
    # available; otherwise fall back to $PROJECT_DIR (default '/mnt').
    pm = get_path_manager() if get_path_manager else None
    if pm:
        project_dir = pm.project_dir  # NOTE(review): unused in this branch — verify whether it can be dropped
        derivatives_dir = pm.get_derivatives_dir()
        output_dir = os.path.join(
            derivatives_dir,
            const.DIR_TI_TOOLBOX,
            'stats',
            analysis_type,
            analysis_name
        )
    else:
        project_dir = os.environ.get('PROJECT_DIR', '/mnt')
        output_dir = os.path.join(
            project_dir,
            'derivatives',
            'tit',
            'stats',
            analysis_type,
            analysis_name
        )

    os.makedirs(output_dir, exist_ok=True)

    # Set up logging (callback_handler routes log records to the GUI console)
    if callback_handler:
        logger, log_file = setup_logging(output_dir, analysis_type, callback_handler)
    else:
        logger, log_file = setup_logging(output_dir, analysis_type)

    # Log header
    analysis_title = "CLUSTER-BASED PERMUTATION TESTING" if analysis_type == 'group_comparison' else "CORRELATION-BASED CLUSTER PERMUTATION TESTING"
    subtitle = "" if analysis_type == 'group_comparison' else "(ACES-style analysis for continuous outcomes)"

    logger.info("="*70)
    logger.info(analysis_title)
    if subtitle:
        logger.info(subtitle)
    logger.info("="*70)
    logger.info(f"Analysis started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"Analysis name: {analysis_name}")
    logger.info(f"Analysis type: {analysis_type}")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"Log file: {log_file}")
    logger.info("")

    log_callback(f"Starting {analysis_type} analysis: {analysis_name}")

    # Construct nifti_file_pattern from tissue_type when no explicit pattern
    # was given ('nifti_file_pattern' None triggers auto-generation)
    if CONFIG.get('nifti_file_pattern') is None:
        tissue_type = CONFIG.get('tissue_type', 'grey')
        if tissue_type == 'grey':
            CONFIG['nifti_file_pattern'] = 'grey_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz'
        elif tissue_type == 'white':
            CONFIG['nifti_file_pattern'] = 'white_{simulation_name}_TI_MNI_MNI_TI_max.nii.gz'
        elif tissue_type == 'all':
            CONFIG['nifti_file_pattern'] = '{simulation_name}_TI_MNI_MNI_TI_max.nii.gz'
        else:
            raise ValueError(f"Invalid tissue_type: '{tissue_type}'. Must be 'grey', 'white', or 'all'")

    # Log configuration
    logger.info("CONFIGURATION:")
    if analysis_type == 'group_comparison':
        logger.info(f"  Statistical test: {CONFIG['test_type'].capitalize()} t-test")
        # NOTE(review): 'group1_name'/'group2_name' are assumed to come from
        # DEFAULT_CONFIG_GROUP_COMPARISON — confirm they are always present,
        # otherwise a one-sided alternative raises KeyError here.
        alt_text = {
            'two-sided': 'two-sided (≠)',
            'greater': f"one-sided ({CONFIG['group1_name']} > {CONFIG['group2_name']})",
            'less': f"one-sided ({CONFIG['group1_name']} < {CONFIG['group2_name']})"
        }
        logger.info(f"  Alternative hypothesis: {alt_text.get(CONFIG['alternative'], CONFIG['alternative'])}")
    else:
        logger.info(f"  Correlation type: {CONFIG['correlation_type'].capitalize()}")

    cluster_stat_name = "Cluster Size" if CONFIG['cluster_stat'] == 'size' else "Cluster Mass"
    logger.info(f"  Cluster statistic: {cluster_stat_name}")
    logger.info(f"  Cluster threshold: {CONFIG['cluster_threshold']}")
    logger.info(f"  Number of permutations: {CONFIG['n_permutations']}")
    logger.info(f"  Alpha level: {CONFIG['alpha']}")
    logger.info(f"  Parallel jobs: {CONFIG['n_jobs']}")
    logger.info(f"  Tissue type: {CONFIG.get('tissue_type', 'grey')}")
    logger.info(f"  NIfTI pattern: {CONFIG['nifti_file_pattern']}")
    logger.info("")

    # Load subject configurations when a CSV path (str) was given instead of
    # an already-parsed list of dicts
    if isinstance(subject_configs, str):
        logger.info(f"Loading subject configurations from: {subject_configs}")
        subject_configs = prepare_config_from_csv(subject_configs, analysis_type)

    # Branch based on analysis type.
    # NOTE(review): progress_callback is accepted but not forwarded to either
    # implementation — confirm whether the private runners should receive it.
    if analysis_type == 'group_comparison':
        return _run_group_comparison_analysis(
            subject_configs, CONFIG, output_dir, logger, log_callback,
            analysis_start_time, log_file, stop_callback
        )
    else:  # correlation
        return _run_correlation_analysis(
            subject_configs, CONFIG, output_dir, logger, log_callback,
            analysis_start_time, log_file, stop_callback
        )

setup_logging

setup_logging(output_dir, analysis_type='group_comparison', callback_handler=None)

Set up logging for unified analysis

Parameters:

output_dir : str Directory where log file will be saved analysis_type : str Type of analysis for log naming callback_handler : logging.Handler, optional Callback handler for GUI integration

Returns:

logger : logging.Logger Configured logger instance log_file : str Path to log file

Source code in tit/stats/permutation_analysis.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def setup_logging(output_dir, analysis_type='group_comparison', callback_handler=None):
    """
    Set up logging for unified analysis

    Creates a timestamped log file inside ``output_dir`` and returns a
    configured logger.  When a callback handler is supplied (GUI mode),
    console output is suppressed and records are routed to the handler.

    Parameters:
    -----------
    output_dir : str
        Directory where log file will be saved
    analysis_type : str
        Type of analysis for log naming
    callback_handler : logging.Handler, optional
        Callback handler for GUI integration

    Returns:
    --------
    logger : logging.Logger
        Configured logger instance
    log_file : str
        Path to log file

    Raises:
    -------
    ImportError
        If the project logging_util module could not be imported.
    """
    if logging_util is None:
        raise ImportError("logging_util module not available")

    # Timestamped log file name, e.g. group_comparison_analysis_20240101_120000.log
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(output_dir, f"{analysis_type}_analysis_{stamp}.log")

    # Logger name mirrors the analysis type
    prefix = 'GroupComparison' if analysis_type == 'group_comparison' else 'Correlation'
    logger = logging_util.get_logger(f"{prefix}Analysis", log_file, overwrite=True)

    # GUI mode: silence the console and forward records to the callback handler
    if callback_handler:
        logging_util.suppress_console_output(logger)
        logger.addHandler(callback_handler)

    # Quiet down chatty third-party loggers
    logging_util.configure_external_loggers(
        ['scipy', 'numpy', 'nibabel', 'pandas', 'matplotlib'], logger)

    return logger, log_file