
Analyzer (tit.analyzer)

Mesh analyzer (tit.analyzer.mesh_analyzer)

MeshAnalyzer: A tool for analyzing mesh-based neuroimaging data

This module provides functionality for analyzing field data in specific regions of interest, including spherical ROIs and cortical regions defined by an atlas. It works with SimNIBS mesh files and uses the cortical middle-layer surface mesh (via msh2cortex) for cortical/surface analyses.

Inputs:
- SimNIBS mesh files (.msh) containing field data
- Subject directory containing m2m files for atlas mapping
- ROI specifications (coordinates, regions)

Outputs:
- Statistical measures (mean, min, max)
- ROI masks
- Surface meshes for visualization
- Visualization files (optional)

Example Usage:
```python
# Initialize analyzer
analyzer = MeshAnalyzer(
    field_mesh_path="/path/to/field.msh",
    field_name="normE",
    subject_dir="/path/to/m2m_subject",
    output_dir="/path/to/output"
)

# Analyze a spherical ROI
sphere_results = analyzer.analyze_sphere(
    center_coordinates=[0, 0, 0],
    radius=10
)

# Analyze a cortical region
cortex_results = analyzer.analyze_cortex(
    atlas_type="DK40",
    target_region="superiorfrontal",
    visualize=True
)

# Analyze whole head
whole_head_results = analyzer.analyze_whole_head(
    atlas_type="DK40",
    visualize=True
)
```

Dependencies:
- numpy
- simnibs
- subprocess (for msh2cortex operations)

MeshAnalyzer

MeshAnalyzer(field_mesh_path: str, field_name: str, subject_dir: str, output_dir: str, logger=None)

A class for analyzing mesh-based data from 3D models.

This class provides methods for analyzing field data in specific regions of interest, including spherical ROIs and cortical regions defined by an atlas. It works with SimNIBS mesh files and uses cortical middle-layer surface meshes for analysis.

Attributes:
- field_mesh_path (str): Path to the mesh file containing field data
- field_name (str): Name of the field to analyze in the mesh
- subject_dir (str): Directory containing subject data (m2m folder)
- output_dir (str): Directory where analysis results will be saved
- visualizer (MeshVisualizer): Instance of visualizer for generating plots
- _temp_dir (tempfile.TemporaryDirectory): Temporary directory for surface mesh
- _surface_mesh_path (str): Path to the generated surface mesh

Initialize the MeshAnalyzer class.

Args:
- field_mesh_path (str): Path to the mesh file containing the field data
- field_name (str): Name of the field to analyze
- subject_dir (str): Directory containing subject data (m2m folder)
- output_dir (str): Directory where analysis results will be saved
- logger: Optional logger instance to use. If None, creates its own.

Source code in tit/analyzer/mesh_analyzer.py (lines 97-154)
def __init__(self, field_mesh_path: str, field_name: str, subject_dir: str, output_dir: str, logger=None):
    """
    Initialize the MeshAnalyzer class.

    Args:
        field_mesh_path (str): Path to the mesh file containing the field data
        field_name (str): Name of the field to analyze
        subject_dir (str): Directory containing subject data (m2m folder)
        output_dir (str): Directory where analysis results will be saved
        logger: Optional logger instance to use. If None, creates its own.
    """
    self.field_mesh_path = field_mesh_path
    self.field_name = field_name
    self.subject_dir = subject_dir
    self.output_dir = output_dir

    # Validate early (before logger/file handlers) so missing input paths fail
    # with the intended error rather than a secondary logging error.
    if not os.path.exists(field_mesh_path):
        raise FileNotFoundError(f"Field mesh file not found: {field_mesh_path}")

    # Set up logger - use provided logger or create a new one
    if logger is not None:
        # Create a child logger to distinguish mesh analyzer logs
        self.logger = logger.getChild('mesh_analyzer')
    else:
        # Create our own logger if none provided
        time_stamp = time.strftime('%Y%m%d_%H%M%S')

        # Extract subject ID from subject_dir (e.g., m2m_subject -> subject)
        subject_id = os.path.basename(self.subject_dir).split('_')[1] if '_' in os.path.basename(self.subject_dir) else os.path.basename(self.subject_dir)

        # Create derivatives/ti-toolbox/logs/sub-* directory structure
        pm = get_path_manager()
        log_dir = pm.path("ti_logs", subject_id=subject_id)
        os.makedirs(log_dir, exist_ok=True)

        # Create log file in the new directory
        log_file = os.path.join(log_dir, f'mesh_analyzer_{time_stamp}.log')
        self.logger = logging_util.get_logger('mesh_analyzer', log_file, overwrite=True)

    # Initialize visualizer with logger
    self.visualizer = MeshVisualizer(output_dir, self.logger)

    # Initialize temporary directory and surface mesh path
    self._temp_dir = None
    self._surface_mesh_path = None

    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        self.logger.debug(f"Creating output directory: {output_dir}")
        os.makedirs(output_dir)

    self.logger.debug(f"Mesh analyzer initialized successfully")
    self.logger.debug(f"Field mesh path: {field_mesh_path}")
    self.logger.debug(f"Field name: {field_name}")
    self.logger.debug(f"Subject directory: {subject_dir}")
    self.logger.debug(f"Output directory: {output_dir}")

__del__

__del__()

Cleanup temporary directory when the analyzer is destroyed.

Source code in tit/analyzer/mesh_analyzer.py (lines 277-285)
def __del__(self):
    """Cleanup temporary directory when the analyzer is destroyed."""
    if self._temp_dir is not None:
        try:
            self.logger.info("Cleaning up temporary directory...")
            self._temp_dir.cleanup()
        except (OSError, PermissionError):
            # Best-effort cleanup - directory may already be deleted or inaccessible
            pass

analyze_cortex

analyze_cortex(atlas_type, target_region, visualize=False)

Analyze a cortical region defined by an atlas.

Args:
- atlas_type: Type of atlas to use (e.g., 'DK40', 'HCP_MMP1')
- target_region: Name of the region to analyze
- visualize: Whether to generate visualization files

Returns: Dictionary containing analysis results
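A minimal usage sketch (the import path follows the module location shown above; the file paths and region name are placeholders):

```python
from tit.analyzer.mesh_analyzer import MeshAnalyzer

# Placeholder paths for illustration only
analyzer = MeshAnalyzer(
    field_mesh_path="/path/to/field.msh",
    field_name="normE",
    subject_dir="/path/to/m2m_subject",
    output_dir="/path/to/output",
)

# Analyze one DK40 region; statistics are None if the region has no positive field values
results = analyzer.analyze_cortex(
    atlas_type="DK40",
    target_region="superiorfrontal",
    visualize=False,
)
if results["mean_value"] is not None:
    print(f"ROI mean: {results['mean_value']:.4f}, focality: {results['focality']}")
```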

Source code in tit/analyzer/mesh_analyzer.py (lines 803-981)
def analyze_cortex(self, atlas_type, target_region, visualize=False):
    """
    Analyze a cortical region defined by an atlas.

    Args:
        atlas_type: Type of atlas to use (e.g., 'DK40', 'HCP_MMP1')
        target_region: Name of the region to analyze
        visualize: Whether to generate visualization files

    Returns:
        Dictionary containing analysis results
    """
    self.logger.info(f"Starting cortical analysis for region '{target_region}' using {atlas_type} atlas")

    try:
        surface_mesh_path = self._generate_surface_mesh()
        self.logger.info("Loading surface mesh...")
        gm_surf = simnibs.read_msh(surface_mesh_path)

        # Load the atlas
        self.logger.info(f"Loading atlas {atlas_type}...")
        atlas = simnibs.subject_atlas(atlas_type, self.subject_dir)

        # Verify region exists in atlas
        if target_region not in atlas:
            available_regions = sorted(atlas.keys())
            raise ValueError(f"Region '{target_region}' not found in {atlas_type} atlas. Available regions: {available_regions}")

        # Get ROI mask for this region
        roi_mask = atlas[target_region]

        # Check if we have any nodes in the ROI
        roi_nodes_count = np.sum(roi_mask)
        if roi_nodes_count == 0:
            self.logger.warning(f"No nodes found in the specified region '{target_region}'")
            results = {
                'mean_value': None,
                'max_value': None,
                'min_value': None,
                'focality': None,
                'normal_mean_value': None,
                'normal_max_value': None,
                'normal_min_value': None,
                'normal_focality': None
            }

            # Save results to CSV even if empty
            self.visualizer.save_results_to_csv(results, 'cortical', target_region, 'node')

            return results

        # Get the field values within the ROI
        field_values = gm_surf.field[self.field_name].value
        field_values_in_roi = field_values[roi_mask]

        # Filter for positive values in ROI (matching voxel analyzer behavior)
        positive_mask = field_values_in_roi > 0
        field_values_positive = field_values_in_roi[positive_mask]

        # Check if we have any positive values in the ROI
        positive_count = len(field_values_positive)
        if positive_count == 0:
            self.logger.warning(f"Warning: Region {target_region} has no positive values")
            results = {
                'mean_value': None,
                'max_value': None,
                'min_value': None,
                'focality': None,
                'normal_mean_value': None,
                'normal_max_value': None,
                'normal_min_value': None,
                'normal_focality': None
            }

            # Save results to CSV even if empty
            self.visualizer.save_results_to_csv(results, 'cortical', target_region, 'node')

            return results

        min_value = np.min(field_values_positive)
        node_areas = gm_surf.nodes_areas()
        if hasattr(node_areas, 'value'):
            node_areas = node_areas.value
        node_areas = np.asarray(node_areas)
        positive_node_areas = node_areas[roi_mask][positive_mask]
        whole_brain_positive_mask = field_values > 0
        whole_brain_node_areas = node_areas[whole_brain_positive_mask]

        metrics = calculate_roi_metrics(
            field_values_positive,
            positive_node_areas,
            ti_field_gm=field_values[whole_brain_positive_mask],
            gm_volumes=whole_brain_node_areas,
        )
        mean_value = metrics["TImean_ROI"]
        max_value = metrics["TImax_ROI"]
        focality = metrics.get("Focality")
        whole_brain_average = metrics.get("TImean_GM")

        # Log the whole brain average for debugging
        if whole_brain_average is not None:
            self.logger.info(f"Whole brain average (denominator for focality): {whole_brain_average:.6f}")

        # Prepare the results with TI_max values
        results = {
            'mean_value': mean_value,
            'max_value': max_value,
            'min_value': min_value,
            'focality': focality
        }

        # Extract TI_normal values from normal mesh
        self.logger.info("Extracting TI_normal values from normal mesh...")
        normal_field_values_positive, normal_statistics = self._extract_normal_field_values(roi_mask)

        # Add TI_normal statistics to results if available
        if normal_statistics is not None:
            results.update(normal_statistics)
            self.logger.debug("TI_normal values successfully added to results")
        else:
            self.logger.warning("TI_normal values not available - results will only contain TI_max values")

        # Generate visualization if requested
        if visualize:
            self.logger.info("Generating visualizations...")
            # Generate focality histogram
            try:
                self.logger.info(f"Generating focality histogram for region: {target_region}")
                # Get node areas for volume weighting
                node_areas = gm_surf.nodes_areas()
                if hasattr(node_areas, 'value'):
                    node_areas = node_areas.value
                node_areas = np.asarray(node_areas)
                positive_node_areas = node_areas[roi_mask][positive_mask]

                self.visualizer.generate_focality_histogram(
                    whole_head_field_data=field_values,
                    roi_field_data=field_values_positive,
                    whole_head_element_sizes=node_areas,
                    roi_element_sizes=positive_node_areas,
                    region_name=target_region,
                    roi_field_value=mean_value,
                    data_type='node'
                )
            except Exception as e:
                self.logger.warning(f"Could not generate focality histogram for {target_region}: {str(e)}")

            # Create region mesh visualization
            # This creates a mesh file with the ROI highlighted
            region_mesh_file = self.visualizer.visualize_cortex_roi(
                gm_surf=gm_surf,
                roi_mask=roi_mask,
                target_region=target_region,
                field_values=field_values,
                max_value=max_value,
                output_dir=self.output_dir,
                surface_mesh_path=self._surface_mesh_path
            )

        # Calculate and save extra focality information for entire grey matter
        self.logger.info("Calculating focality metrics for entire grey matter...")
        focality_info = self._calculate_focality_metrics(
            field_values,  # Use entire surface data, not just ROI
            node_areas,    # Use all node areas, not just ROI
            target_region
        )

        # Save results to CSV
        self.visualizer.save_results_to_csv(results, 'cortical', target_region, 'node')

        # Save extra info CSV with focality data
        if focality_info:
            self.visualizer.save_extra_info_to_csv(focality_info, 'cortical', target_region, 'node')

        return results

    except Exception as e:
        self.logger.error(f"Error in cortical analysis: {str(e)}")
        raise

analyze_sphere

analyze_sphere(center_coordinates, radius, visualize=False, save_results=True)

Analyze a spherical region of interest from a mesh.

Args:
- center_coordinates: List of [x, y, z] coordinates for the center of the sphere
- radius: Radius of the sphere in mm
- visualize: Whether to generate visualization files
- save_results: Whether to save individual CSV files (default: True; set False for batch processing)

Returns: Dictionary containing analysis results or None if no nodes found
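A usage sketch, assuming the analyzer constructed in the module example above (the coordinates are placeholders):

```python
# Placeholder subject-space coordinates; analyze_sphere returns None
# when the ROI captures no grey-matter surface nodes
results = analyzer.analyze_sphere(
    center_coordinates=[-37.0, -21.0, 58.0],
    radius=5,
    visualize=False,
    save_results=False,  # skip per-ROI CSV output, e.g. during batch processing
)
if results is None:
    print("No grey-matter surface nodes found in the ROI")
else:
    print(results["mean_value"], results["nodes_in_roi"])
```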

Source code in tit/analyzer/mesh_analyzer.py (lines 635-801)
def analyze_sphere(self, center_coordinates, radius, visualize=False, save_results=True):
    """
    Analyze a spherical region of interest from a mesh.

    Args:
        center_coordinates: List of [x, y, z] coordinates for the center of the sphere
        radius: Radius of the sphere in mm
        visualize: Whether to generate visualization files
        save_results: Whether to save individual CSV files (default: True, set False for batch processing)

    Returns:
        Dictionary containing analysis results or None if no nodes found
    """
    self.logger.info(f"Starting spherical ROI analysis (radius={radius}mm) at coordinates {center_coordinates}")

    try:
        surface_mesh_path = self._generate_surface_mesh()
        gm_surf = simnibs.read_msh(surface_mesh_path)

        # Check if field exists
        if self.field_name not in gm_surf.field:
            available_fields = list(gm_surf.field.keys())
            raise ValueError(f"Field '{self.field_name}' not found in surface mesh. Available fields: {available_fields}")

        # Get field values from surface mesh
        field_values = gm_surf.field[self.field_name].value

        # Create spherical ROI on surface mesh (use canonical helper)
        self.logger.info(f"Creating spherical ROI at {center_coordinates} with radius {radius}mm...")
        node_coords = gm_surf.nodes.node_coord
        roi_indices = ROICoordinateHelper.find_voxels_in_sphere(node_coords, center_coordinates, radius)
        roi_mask = np.zeros(len(node_coords), dtype=bool)
        roi_mask[roi_indices] = True

        # Check if we have any nodes in the ROI
        roi_nodes_count = np.sum(roi_mask)
        if roi_nodes_count == 0:
            self.logger.error(f"Analysis Failed: No nodes found in ROI at [{center_coordinates[0]}, {center_coordinates[1]}, {center_coordinates[2]}], r={radius}mm")
            self.logger.error("ROI is not capturing any grey matter surface nodes")
            self.logger.error("Suggestion: Adjust coordinates/radius or verify using freeview")
            return None

        self.logger.info(f"Found {roi_nodes_count} nodes in the ROI")
        self.logger.info("Calculating statistics...")

        # Get the field values within the ROI
        field_values_in_roi = field_values[roi_mask]

        # Filter for positive values in ROI (matching voxel analyzer behavior)
        positive_mask = field_values_in_roi > 0
        field_values_positive = field_values_in_roi[positive_mask]

        # Check if we have any positive values in the ROI
        positive_count = len(field_values_positive)
        if positive_count == 0:
            self.logger.warning(f"Warning: No positive values found in spherical ROI")
            return None

        self.logger.info(f"Found {positive_count} nodes with positive values in the ROI")
        self.logger.info("Calculating statistics...")

        # Calculate statistics using canonical ROI metric helper
        min_value = np.min(field_values_positive)
        node_areas = gm_surf.nodes_areas()
        if hasattr(node_areas, 'value'):
            node_areas = node_areas.value
        node_areas = np.asarray(node_areas)
        positive_node_areas = node_areas[roi_mask][positive_mask]

        whole_brain_positive_mask = field_values > 0
        whole_brain_node_areas = node_areas[whole_brain_positive_mask]
        metrics = calculate_roi_metrics(
            field_values_positive,
            positive_node_areas,
            ti_field_gm=field_values[whole_brain_positive_mask],
            gm_volumes=whole_brain_node_areas,
        )
        mean_value = metrics["TImean_ROI"]
        max_value = metrics["TImax_ROI"]
        focality = metrics.get("Focality")
        whole_brain_average = metrics.get("TImean_GM")

        # Log the whole brain average for debugging
        if whole_brain_average is not None:
            self.logger.info(f"Whole brain average (denominator for focality): {whole_brain_average:.6f}")

        # Create results dictionary with TI_max values
        results = {
            'mean_value': mean_value,
            'max_value': max_value,
            'min_value': min_value,
            'nodes_in_roi': roi_nodes_count,
            'focality': focality
        }

        # Extract TI_normal values from normal mesh
        self.logger.info("Extracting TI_normal values from normal mesh...")
        normal_field_values_positive, normal_statistics = self._extract_normal_field_values(roi_mask)

        # Add TI_normal statistics to results if available
        if normal_statistics is not None:
            results.update(normal_statistics)
            self.logger.debug("TI_normal values successfully added to results")
        else:
            self.logger.warning("TI_normal values not available - results will only contain TI_max values")

        # Generate visualizations if requested
        if visualize:
            if self.visualizer is not None:
                self.logger.info("Generating visualizations...")

                # Generate focality histogram
                try:
                    self.logger.info("Generating focality histogram for spherical ROI...")
                    self.visualizer.generate_focality_histogram(
                        whole_head_field_data=field_values,
                        roi_field_data=field_values_positive,
                        whole_head_element_sizes=node_areas,
                        roi_element_sizes=positive_node_areas,
                        region_name=f"sphere_x{center_coordinates[0]:.2f}_y{center_coordinates[1]:.2f}_z{center_coordinates[2]:.2f}_r{radius}",
                        roi_field_value=mean_value,
                        data_type='node'
                    )
                except Exception as e:
                    self.logger.warning(f"Could not generate focality histogram for spherical ROI: {str(e)}")

                # Create spherical ROI overlay visualization
                # This creates a mesh file with the ROI highlighted
                self.logger.info("Creating spherical ROI overlay visualization...")
                viz_file = self.visualizer.visualize_spherical_roi(
                    gm_surf=gm_surf,
                    roi_mask=roi_mask,
                    center_coords=center_coordinates,
                    radius=radius,
                    field_values=field_values,
                    max_value=max_value,
                    output_dir=self.output_dir,
                    surface_mesh_path=surface_mesh_path
                )
                results['visualization_file'] = viz_file
            else:
                self.logger.warning("Visualization requested but MeshVisualizer not available")

        # Calculate and save extra focality information for entire grey matter
        self.logger.info("Calculating focality metrics for entire grey matter...")
        focality_info = self._calculate_focality_metrics(
            field_values,  # Use entire surface data, not just ROI
            node_areas,    # Use all node areas, not just ROI
            f"sphere_x{center_coordinates[0]:.2f}_y{center_coordinates[1]:.2f}_z{center_coordinates[2]:.2f}_r{radius}"
        )

        # Save results to CSV (only if save_results=True)
        if save_results and self.visualizer is not None:
            region_name = f"sphere_x{center_coordinates[0]:.2f}_y{center_coordinates[1]:.2f}_z{center_coordinates[2]:.2f}_r{radius}"
            self.visualizer.save_results_to_csv(results, 'spherical', region_name, 'node')

            # Save extra info CSV with focality data
            if focality_info:
                self.visualizer.save_extra_info_to_csv(focality_info, 'spherical', region_name, 'node')
        elif save_results:
            self.logger.warning("Cannot save results to CSV - MeshVisualizer not available")

        return results

    except Exception as e:
        self.logger.error(f"Error in spherical analysis: {str(e)}")
        raise

analyze_whole_head

analyze_whole_head(atlas_type='HCP_MMP1', visualize=False)

Analyze all regions in the specified atlas.
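The return value is a dictionary keyed by atlas region name, with per-region statistics (None for regions without positive field values). A minimal sketch, assuming the analyzer from the module example above:

```python
results = analyzer.analyze_whole_head(atlas_type="DK40", visualize=False)

# Each entry holds mean/max/min/focality (plus normal_* values when available)
for region, stats in sorted(results.items()):
    if stats["mean_value"] is not None:
        print(f"{region}: mean={stats['mean_value']:.4f}")
```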

Source code in tit/analyzer/mesh_analyzer.py (lines 402-632)
def analyze_whole_head(self, atlas_type='HCP_MMP1', visualize=False):
    """
    Analyze all regions in the specified atlas.
    """
    start_time = time.time()
    self.logger.info(f"Starting whole head analysis using {atlas_type} atlas")

    try:
        surface_mesh_path = self._generate_surface_mesh()
        self.logger.info("Loading surface mesh...")
        gm_surf = simnibs.read_msh(surface_mesh_path)

        # Load the atlas
        self.logger.info(f"Loading atlas {atlas_type}...")
        atlas = simnibs.subject_atlas(atlas_type, self.subject_dir)

        # Dictionary to store results for each region
        results = {}

        # Analyze each region in the atlas
        for region_name in atlas.keys():
            try:
                self.logger.info(f"Processing region: {region_name}")

                # Create a directory for this region in the main output directory
                region_dir = os.path.join(self.output_dir, region_name)
                os.makedirs(region_dir, exist_ok=True)

                # Get ROI mask for this region
                try:
                    roi_mask = atlas[region_name]
                    # Check if roi_mask is actually a mask array
                    if callable(roi_mask):
                        self.logger.error(f"Region {region_name}: atlas[region_name] returned a callable object, not a mask")
                        raise TypeError(f"Region {region_name}: Expected mask array, got callable object")
                    elif not hasattr(roi_mask, '__len__'):
                        self.logger.error(f"Region {region_name}: atlas[region_name] returned non-array object: {type(roi_mask)}")
                        raise TypeError(f"Region {region_name}: Expected mask array, got {type(roi_mask)}")
                except Exception as e:
                    self.logger.error(f"Failed to get ROI mask for region {region_name}: {str(e)}")
                    raise

                # Check if we have any nodes in the ROI
                roi_nodes_count = np.sum(roi_mask)
                if roi_nodes_count == 0:
                    self.logger.warning(f"Warning: No nodes found in the specified region '{region_name}'")
                    region_results = {
                        'mean_value': None,
                        'max_value': None,
                        'min_value': None,
                        'focality': None,
                        'nodes_in_roi': 0,
                        'normal_mean_value': None,
                        'normal_max_value': None,
                        'normal_min_value': None,
                        'normal_focality': None
                    }

                    # Store in the overall results
                    results[region_name] = region_results

                    continue

                self.logger.info(f"Getting field values within the ROI...")
                # Get the field values within the ROI
                try:
                    field_values = gm_surf.field[self.field_name].value
                    if callable(field_values):
                        self.logger.error(f"Region {region_name}: field values returned a callable object")
                        raise TypeError(f"Region {region_name}: Expected field values array, got callable object")
                    field_values_in_roi = field_values[roi_mask]
                except Exception as e:
                    self.logger.error(f"Failed to get field values for region {region_name}: {str(e)}")
                    self.logger.error(f"Field values type: {type(field_values) if 'field_values' in locals() else 'undefined'}")
                    self.logger.error(f"ROI mask type: {type(roi_mask)}")
                    raise

                # Filter for positive values in ROI (matching voxel analyzer behavior)
                positive_mask = field_values_in_roi > 0
                field_values_positive = field_values_in_roi[positive_mask]

                # Check if we have any positive values in the ROI
                positive_count = len(field_values_positive)
                if positive_count == 0:
                    self.logger.warning(f"Warning: Region {region_name} has no positive values")
                    region_results = {
                        'mean_value': None,
                        'max_value': None,
                        'min_value': None,
                        'focality': None,
                        'nodes_in_roi': 0,
                        'normal_mean_value': None,
                        'normal_max_value': None,
                        'normal_min_value': None,
                        'normal_focality': None
                    }

                    # Store in the overall results
                    results[region_name] = region_results

                    continue

                # Calculate statistics on positive values only
                min_value = np.min(field_values_positive)
                max_value = np.max(field_values_positive)

                # Calculate mean value using node areas for proper averaging (only positive values)
                try:
                    node_areas = gm_surf.nodes_areas()
                    if hasattr(node_areas, 'value'):
                        node_areas = node_areas.value
                    node_areas = np.asarray(node_areas)
                    if callable(node_areas):
                        self.logger.error(f"Region {region_name}: nodes_areas() returned a callable object")
                        raise TypeError(f"Region {region_name}: Expected node areas array, got callable object")
                    positive_node_areas = node_areas[roi_mask][positive_mask]
                except Exception as e:
                    self.logger.error(f"Failed to get node areas for region {region_name}: {str(e)}")
                    self.logger.error(f"Node areas type: {type(node_areas) if 'node_areas' in locals() else 'undefined'}")
                    raise
                # Calculate mean + focality using canonical ROI metric helper
                whole_brain_positive_mask = field_values > 0
                whole_brain_node_areas = node_areas[whole_brain_positive_mask]
                metrics = calculate_roi_metrics(
                    field_values_positive,
                    positive_node_areas,
                    ti_field_gm=field_values[whole_brain_positive_mask],
                    gm_volumes=whole_brain_node_areas,
                )
                mean_value = metrics["TImean_ROI"]
                focality = metrics.get("Focality")
                whole_brain_average = metrics.get("TImean_GM")

                # Log the whole brain average for debugging
                if whole_brain_average is not None:
                    self.logger.info(f"Whole brain average (denominator for focality): {whole_brain_average:.6f}")

                # Create result dictionary for this region with TI_max values
                region_results = {
                    'mean_value': mean_value,
                    'max_value': max_value,
                    'min_value': min_value,
                    'focality': focality,
                    'nodes_in_roi': positive_count
                }

                # Extract TI_normal values from normal mesh for this region
                self.logger.debug(f"Extracting TI_normal values for region: {region_name}")
                normal_field_values_positive, normal_statistics = self._extract_normal_field_values(roi_mask)

                # Add TI_normal statistics to region results if available
                if normal_statistics is not None:
                    region_results.update(normal_statistics)
                    self.logger.debug(f"TI_normal values successfully added for region: {region_name}")
                else:
                    self.logger.warning(f"TI_normal values not available for region: {region_name}")

                # Store in the overall results
                results[region_name] = region_results

                # Generate visualizations if requested
                if visualize:
                    self.logger.info(f"Generating 3D visualization for region: {region_name}")
                    # Generate 3D visualization and save directly to the region directory
                    self.visualizer.visualize_cortex_roi(
                        gm_surf=gm_surf,
                        roi_mask=roi_mask,
                        target_region=region_name,
                        field_values=field_values,
                        max_value=max_value,
                        output_dir=region_dir,
                        surface_mesh_path=self._surface_mesh_path
                    )

                    # Generate focality histogram for this region
                    if len(field_values_positive) > 0:
                        try:
                            self.logger.info(f"Generating focality histogram for region: {region_name}")
                            # Get node areas for volume weighting
                            node_areas = gm_surf.nodes_areas()
                            if hasattr(node_areas, 'value'):
                                node_areas = node_areas.value
                            node_areas = np.asarray(node_areas)
                            positive_node_areas = node_areas[roi_mask][positive_mask]

                            # Create a custom visualizer just for this region with the region directory as output
                            # Pass the main logger to avoid creating derivatives folder in region directory
                            region_visualizer = MeshVisualizer(region_dir, self.logger)

                            region_visualizer.generate_focality_histogram(
                                whole_head_field_data=field_values,
                                roi_field_data=field_values_positive,
                                whole_head_element_sizes=node_areas,
                                roi_element_sizes=positive_node_areas,
                                region_name=region_name,
                                roi_field_value=mean_value,
                                data_type='node'
                            )
                        except Exception as e:
                            self.logger.warning(f"Could not generate focality histogram for {region_name}: {str(e)}")

            except Exception as e:
                self.logger.warning(f"Warning: Failed to analyze region {region_name}: {str(e)}")
                results[region_name] = {
                    'mean_value': None,
                    'max_value': None,
                    'min_value': None,
                    'focality': None,
                    'nodes_in_roi': 0,
                    'normal_mean_value': None,
                    'normal_max_value': None,
                    'normal_min_value': None,
                    'normal_focality': None
                }

        # Global visualization plots are disabled - only per-region histograms are generated

        # Always generate and save summary CSV after all regions are processed
        self.logger.info("Saving whole-head analysis summary to CSV...")
        self.visualizer.save_whole_head_results_to_csv(results, atlas_type, 'node')

        return results

    finally:
        # Clean up
        try:
            del gm_surf
            del atlas
        except NameError:
            # Variables may not be defined if cleanup failed earlier
            pass

get_grey_matter_statistics

get_grey_matter_statistics()

Calculate grey matter field statistics from the GM surface (central layer).

For mesh analysis, this tries to use the same GM surface as the other analyses for consistency, and falls back to the original mesh if surface generation fails.

Returns: dict: Dictionary containing grey matter statistics (mean, max, min)
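A minimal sketch, assuming the analyzer from the module example above:

```python
gm_stats = analyzer.get_grey_matter_statistics()
# Returns zeros (rather than raising) when no positive values are found
print(gm_stats["grey_mean"], gm_stats["grey_max"], gm_stats["grey_min"])
```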

Source code in tit/analyzer/mesh_analyzer.py (lines 1074-1164)
def get_grey_matter_statistics(self):
    """
    Calculate grey matter field statistics from the GM surface (central layer).

    For mesh analysis, tries to use the same GM surface as other analyses for consistency.
    Falls back to original mesh if surface generation fails to ensure robustness.

    Returns:
        dict: Dictionary containing grey matter statistics (mean, max, min)
    """
    self.logger.info("Calculating grey matter field statistics from GM surface (central layer)...")

    try:
        # Try to generate and load GM surface mesh (same as other mesh analyses)
        surface_mesh_path = self._generate_surface_mesh()
        gm_surf = simnibs.read_msh(surface_mesh_path)

        # Check if field exists
        if self.field_name not in gm_surf.field:
            available_fields = list(gm_surf.field.keys())
            raise ValueError(f"Field '{self.field_name}' not found in GM surface. Available fields: {available_fields}")

        # Get field values from GM surface (same source as other mesh analyses)
        field_values = gm_surf.field[self.field_name].value

        # Filter for positive values only (matching ROI analysis behavior)
        positive_mask = field_values > 0
        field_values_positive = field_values[positive_mask]

        # Check if we have any positive values
        if len(field_values_positive) == 0:
            self.logger.warning("No positive values found in GM surface")
            return {'grey_mean': 0.0, 'grey_max': 0.0, 'grey_min': 0.0}

        # Calculate statistics using area-weighted averaging (consistent with other mesh analyses)
        node_areas = gm_surf.nodes_areas()
        if hasattr(node_areas, 'value'):
            node_areas = node_areas.value
        node_areas = np.asarray(node_areas)
        positive_node_areas = node_areas[positive_mask]
        grey_mean = np.average(field_values_positive, weights=positive_node_areas)
        grey_max = np.max(field_values_positive)
        grey_min = np.min(field_values_positive)

        self.logger.info(f"Grey matter statistics from GM surface for field '{self.field_name}' (positive values only): "
                       f"mean={grey_mean:.6f}, max={grey_max:.6f}, min={grey_min:.6f}")
        self.logger.info(f"Total nodes with positive values: {len(field_values_positive)}")

        return {
            'grey_mean': float(grey_mean),
            'grey_max': float(grey_max),
            'grey_min': float(grey_min)
        }

    except Exception as e:
        self.logger.warning(f"GM surface generation failed: {str(e)}")
        self.logger.info("Falling back to original mesh for grey matter statistics...")

        # Fallback: Use original mesh if surface generation fails
        try:
            field_mesh = simnibs.read_msh(self.field_mesh_path)

            if self.field_name not in field_mesh.field:
                available_fields = list(field_mesh.field.keys())
                raise ValueError(f"Field '{self.field_name}' not found in original mesh. Available fields: {available_fields}")

            field_values = field_mesh.field[self.field_name].value
            positive_mask = field_values > 0
            field_values_positive = field_values[positive_mask]

            if len(field_values_positive) == 0:
                self.logger.warning("No positive values found in original mesh")
                return {'grey_mean': 0.0, 'grey_max': 0.0, 'grey_min': 0.0}

            # Use simple averaging for volume mesh fallback
            grey_mean = np.mean(field_values_positive)
            grey_max = np.max(field_values_positive)
            grey_min = np.min(field_values_positive)

            self.logger.info(f"Grey matter statistics from original mesh (fallback) for field '{self.field_name}': "
                           f"mean={grey_mean:.6f}, max={grey_max:.6f}, min={grey_min:.6f}")

            return {
                'grey_mean': float(grey_mean),
                'grey_max': float(grey_max),
                'grey_min': float(grey_min)
            }

        except Exception as fallback_error:
            self.logger.error(f"Fallback to original mesh also failed: {str(fallback_error)}")
            return {'grey_mean': 0.0, 'grey_max': 0.0, 'grey_min': 0.0}

Voxel analyzer (tit.analyzer.voxel_analyzer)

VoxelAnalyzer: A tool for analyzing voxel-based neuroimaging data

This module provides functionality for analyzing voxel-based data from medical imaging, particularly focusing on field analysis in specific regions of interest (ROIs).

Inputs:
- NIfTI files containing field data
- Atlas files (NIfTI/MGZ) for cortical parcellation
- ROI specifications (coordinates, regions)

Outputs:
- Statistical measures (mean, min, max)
- ROI masks
- Visualization files (optional)

Example Usage:
```python
# Initialize analyzer
analyzer = VoxelAnalyzer(
    field_nifti="/path/to/field.nii.gz",
    subject_dir="/path/to/subject",
    output_dir="/path/to/output"
)

# Analyze a spherical ROI
sphere_results = analyzer.analyze_sphere(
    center_coordinates=[0, 0, 0],
    radius=10,
    visualize=True
)

# Analyze a cortical region
cortex_results = analyzer.analyze_cortex(
    atlas_file="/path/to/atlas.mgz",
    target_region="Left-Hippocampus"
)

# Analyze whole head
whole_head_results = analyzer.analyze_whole_head(
    atlas_file="/path/to/atlas.mgz",
    visualize=True
)
```

Dependencies:
- numpy
- nibabel
- subprocess (for FreeSurfer operations)

VoxelAnalyzer

VoxelAnalyzer(field_nifti: str, subject_dir: str, output_dir: str, logger=None, quiet=False)

A class for analyzing voxel-based data from medical imaging.

This class provides methods for analyzing field data in specific regions of interest, including spherical ROIs and cortical regions defined by an atlas.

Attributes:
- field_nifti (str): Path to the NIfTI file containing field data
- subject_dir (str): Directory containing subject data
- output_dir (str): Directory where analysis results will be saved
- visualizer (VoxelVisualizer): Instance of visualizer for generating plots

Initialize the VoxelAnalyzer with paths to required data.

Args:
- field_nifti (str): Path to the NIfTI file containing field data
- subject_dir (str): Directory containing subject data
- output_dir (str): Directory where analysis results will be saved
- logger: Optional logger instance to use. If None, creates its own.
- quiet (bool): If True, suppress verbose logging messages

Raises:
- FileNotFoundError: If field_nifti file does not exist
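A construction sketch (the paths are placeholders); quiet=True suppresses per-step info messages while warnings and errors are still logged:

```python
from tit.analyzer.voxel_analyzer import VoxelAnalyzer

analyzer = VoxelAnalyzer(
    field_nifti="/path/to/field.nii.gz",
    subject_dir="/path/to/subject",
    output_dir="/path/to/output",
    quiet=True,  # suppress verbose progress logging
)
```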

Source code in tit/analyzer/voxel_analyzer.py (lines 87-146)
def __init__(self, field_nifti: str, subject_dir: str, output_dir: str, logger=None, quiet=False):
    """
    Initialize the VoxelAnalyzer with paths to required data.

    Args:
        field_nifti (str): Path to the NIfTI file containing field data
        subject_dir (str): Directory containing subject data
        output_dir (str): Directory where analysis results will be saved
        logger: Optional logger instance to use. If None, creates its own.
        quiet (bool): If True, suppress verbose logging messages

    Raises:
        FileNotFoundError: If field_nifti file does not exist
    """
    # Check for required dependencies
    if nib is None:
        raise ImportError("nibabel is required for voxel analysis but is not installed")

    self.field_nifti = field_nifti
    self.subject_dir = subject_dir
    self.output_dir = output_dir
    self.quiet = quiet

    # Validate early (before logger/file handlers) so missing input paths fail
    # with the intended error rather than a secondary logging error.
    if not os.path.exists(field_nifti):
        raise FileNotFoundError(f"Field file not found: {field_nifti}")

    # Set up logger - use provided logger or create a new one
    if logger is not None:
        # Create a child logger to distinguish voxel analyzer logs
        self.logger = logger.getChild('voxel_analyzer')
    else:
        # Create our own logger if none provided
        time_stamp = time.strftime('%Y%m%d_%H%M%S')

        # Extract subject ID from subject_dir (e.g., m2m_subject -> subject)
        subject_id = os.path.basename(self.subject_dir).split('_')[1] if '_' in os.path.basename(self.subject_dir) else os.path.basename(self.subject_dir)

        # Create derivatives/ti-toolbox/logs/sub-* directory structure
        pm = get_path_manager()
        log_dir = pm.path("ti_logs", subject_id=subject_id)
        os.makedirs(log_dir, exist_ok=True)

        # Create log file in the new directory
        log_file = os.path.join(log_dir, f'voxel_analyzer_{time_stamp}.log')
        self.logger = logging_util.get_logger('voxel_analyzer', log_file, overwrite=True)

    # Initialize visualizer with logger
    self.visualizer = VoxelVisualizer(output_dir, self.logger)

    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        self.logger.debug(f"Creating output directory: {output_dir}")
        os.makedirs(output_dir)

    self.logger.debug(f"Voxel analyzer initialized successfully")
    self.logger.debug(f"Field NIfTI path: {field_nifti}")
    self.logger.debug(f"Subject directory: {subject_dir}")
    self.logger.debug(f"Output directory: {output_dir}")

analyze_cortex

analyze_cortex(atlas_file, target_region, region_info=None, atlas_data=None, field_data=None, visualize=False)

Analyze a field scan within a specific cortical region defined in an atlas.
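A usage sketch, assuming the analyzer from the module example above (the atlas path and region name are placeholders):

```python
results = analyzer.analyze_cortex(
    atlas_file="/path/to/atlas.mgz",
    target_region="Left-Hippocampus",
    visualize=False,
)
# mean/max/min/focality are None when the region contains no positive voxels
if results["mean_value"] is not None:
    print(f"ROI mean: {results['mean_value']:.4f} ({results['voxels_in_roi']} voxels)")
```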

Source code in tit/analyzer/voxel_analyzer.py (lines 737-943)
def analyze_cortex(self, atlas_file, target_region, region_info=None, atlas_data=None, field_data=None, visualize=False):
    """
    Analyze a field scan within a specific cortical region defined in an atlas.
    """
    # Extract atlas type from filename
    atlas_type = self._extract_atlas_type(atlas_file)

    # Load the atlas and field data if not provided
    if atlas_data is None:
        self.logger.debug(f"Loading atlas from {atlas_file}...")
        atlas_tuple = self.load_brain_image(atlas_file)
        atlas_img, atlas_arr = atlas_tuple
    else:
        # Unpack the tuple
        atlas_img, atlas_arr = atlas_data

    if field_data is None:
        self.logger.debug(f"Loading field from {self.field_nifti}...")
        field_tuple = self.load_brain_image(self.field_nifti)
        field_img, field_arr = field_tuple
    else:
        # Unpack the tuple
        field_img, field_arr = field_data

    # Handle 4D field data (extract first volume if multiple volumes)
    if len(field_arr.shape) == 4:
        if not self.quiet:
            self.logger.info(f"Detected 4D field data with shape {field_arr.shape}")
        field_shape_3d = field_arr.shape[:3]
        # If time dimension is 1, we can simply reshape to 3D
        if field_arr.shape[3] == 1:
            if not self.quiet:
                self.logger.info("Reshaping 4D field data to 3D")
            field_arr = field_arr[:,:,:,0]
        else:
            self.logger.warning(f"4D field has {field_arr.shape[3]} volumes. Using only the first volume.")
            field_arr = field_arr[:,:,:,0]
    else:
        field_shape_3d = field_arr.shape

    # Compare spatial dimensions for atlas and field
    if atlas_arr.shape != field_shape_3d:
        if not self.quiet:
            self.logger.info("Atlas and field dimensions don't match, attempting to resample...")
        self.logger.debug(f"Atlas shape: {atlas_arr.shape}")
        self.logger.debug(f"Field shape: {field_arr.shape}")

        # Resample the atlas to match the field data, passing atlas_file
        atlas_img, atlas_arr = self.resample_to_match(
            atlas_img,
            field_shape_3d,  # Use only the spatial dimensions
            field_img.affine,
            source_path=atlas_file  # Pass the atlas file path
        )

        # Verify the resampling worked
        if atlas_arr.shape != field_shape_3d:
            raise ValueError(f"Failed to resample atlas to match field dimensions: {atlas_arr.shape} vs {field_shape_3d}")
    else:
        if not self.quiet:
            self.logger.info("Atlas and field dimensions already match - skipping resampling")

    # Load region information if not provided
    if region_info is None:
        region_info = self.get_atlas_regions(atlas_file)

    # Determine region ID based on target_region
    if not self.quiet:
        self.logger.info(f"Finding region information for {target_region}...")
    region_id, region_name = self.find_region(target_region, region_info)
    if not self.quiet:
        self.logger.info(f"Processing region: {region_name} (ID: {region_id})")

    # Create mask for this region
    region_mask = (atlas_arr == region_id)

    # Check if the mask contains any voxels
    mask_count = np.sum(region_mask)
    if mask_count == 0:
        self.logger.warning(f"Warning: Region {region_name} (ID: {region_id}) contains 0 voxels in the atlas")
        results = {
            'mean_value': None,
            'max_value': None,
            'min_value': None,
            'focality': None
        }

        # Save results to CSV even if empty
        self.visualizer.save_results_to_csv(results, 'cortical', region_name, 'voxel')

        return results

    # Filter for voxels with positive values
    value_mask = (field_arr > 0)
    combined_mask = region_mask & value_mask

    # Extract field values after filtering
    field_values = field_arr[combined_mask]

    # Check if any voxels remain after filtering
    filtered_count = len(field_values)
    if filtered_count == 0:
        self.logger.warning(f"Warning: Region {region_name} (ID: {region_id}) has no voxels with positive values")
        results = {
            'mean_value': None,
            'max_value': None,
            'min_value': None,
            'focality': None
        }

        # Save results to CSV even if empty
        self.visualizer.save_results_to_csv(results, 'cortical', region_name, 'voxel')

        return results

    if not self.quiet:
        self.logger.info("Calculating statistics...")
    max_value = np.max(field_values)
    min_value = np.min(field_values)

    whole_brain_positive_mask = field_arr > 0
    gm_values = field_arr[whole_brain_positive_mask]
    metrics = calculate_roi_metrics(
        field_values,
        np.ones(len(field_values), dtype=float),
        ti_field_gm=gm_values,
        gm_volumes=np.ones(len(gm_values), dtype=float),
    )
    mean_value = metrics["TImean_ROI"]
    focality = metrics.get("Focality")
    whole_brain_average = metrics.get("TImean_GM")

    # Log the whole brain average for debugging
    if not self.quiet:
        self.logger.info(f"Whole brain average (denominator for focality): {whole_brain_average:.6f}")

    # Prepare results dictionary
    results = {
        'mean_value': mean_value,
        'max_value': max_value,
        'min_value': min_value,
        'focality': focality,
        'voxels_in_roi': filtered_count
    }

    # Generate visualization if requested
    if visualize:
        if not self.quiet:
            self.logger.info("Generating visualizations...")

        # Generate focality histogram
        try:
            if not self.quiet:
                self.logger.info(f"Generating focality histogram for region: {region_name}")
            # Get voxel dimensions from the field image
            voxel_dims = field_img.header.get_zooms()[:3]

            # Filter out zero values from whole head data for histogram
            whole_head_positive_mask = field_arr > 0
            whole_head_filtered = field_arr[whole_head_positive_mask]

            self.visualizer.generate_focality_histogram(
                whole_head_field_data=whole_head_filtered,
                roi_field_data=field_values,
                region_name=region_name,
                roi_field_value=mean_value,
                data_type='voxel',
                voxel_dims=voxel_dims
            )
        except Exception as e:
            self.logger.warning(f"Could not generate focality histogram for {region_name}: {str(e)}")

        # Create visualization NIfTI file
        viz_file = self.visualizer.create_cortex_nifti(
            atlas_img=atlas_img,
            atlas_arr=atlas_arr,
            field_arr=field_arr,
            region_id=region_id,
            region_name=region_name
        )

    # Calculate and save extra focality information for entire field
    if not self.quiet:
        self.logger.info("Calculating focality metrics for entire field...")

    # Get voxel dimensions for volume calculations
    voxel_dims = field_img.header.get_zooms()[:3]
    voxel_volume = np.prod(voxel_dims)  # Volume of one voxel in mm³

    # Use entire field data (positive values only) for focality calculation
    entire_field_positive = field_arr[field_arr > 0]

    focality_info = self._calculate_focality_metrics(
        entire_field_positive,  # Use entire field, not just ROI
        voxel_volume, 
        region_name
    )

    # Save results to CSV
    self.visualizer.save_results_to_csv(results, 'cortical', region_name, 'voxel')

    # Save extra info CSV with focality data
    if focality_info:
        self.visualizer.save_extra_info_to_csv(focality_info, 'cortical', region_name, 'voxel')

    # Return analysis results
    return results

analyze_sphere

analyze_sphere(center_coordinates, radius, visualize=False)

Analyze a spherical region of interest from voxel data.

Returns: Dictionary containing analysis results or None if no valid voxels found
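A usage sketch, assuming the analyzer from the module example above (coordinates are world-space placeholders):

```python
results = analyzer.analyze_sphere(
    center_coordinates=[0, 0, 0],
    radius=10,
    visualize=False,
)
if results is None:
    print("No valid (non-zero) voxels found in the ROI")
else:
    print(results)  # statistics dictionary (mean/max/min, focality, ...)
```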

Source code in tit/analyzer/voxel_analyzer.py (lines 393-559)
    def analyze_sphere(self, center_coordinates, radius, visualize=False):
        """
        Analyze a spherical region of interest from voxel data.

        Returns:
            Dictionary containing analysis results or None if no valid voxels found
        """
        if not self.quiet:
            self.logger.info(f"Starting spherical ROI analysis (radius={radius}mm) at coordinates {center_coordinates}")

        # Load the NIfTI data
        if not self.quiet:
            self.logger.info("Loading field data...")
        img = nib.load(self.field_nifti)
        field_data = img.get_fdata()

        # Handle 4D field data (use only the first volume)
        if len(field_data.shape) == 4:
            if field_data.shape[3] > 1:
                self.logger.warning(f"4D field has {field_data.shape[3]} volumes. Using only the first volume.")
            field_data = field_data[:,:,:,0]

        # Get voxel dimensions (for proper distance calculation)
        voxel_size = np.array(img.header.get_zooms()[:3])

        # Get affine transformation matrix
        affine = img.affine

        # Convert world coordinates to voxel coordinates if needed
        if not self.quiet:
            self.logger.info("Converting coordinates and creating ROI mask...")
        inv_affine = np.linalg.inv(affine)
        voxel_coords = np.dot(inv_affine, np.append(center_coordinates, 1))[:3]

        # Create coordinate grids for the entire volume
        x_size, y_size, z_size = field_data.shape
        x, y, z = np.ogrid[:x_size, :y_size, :z_size]

        # Calculate distance from center voxel (using voxel dimensions to account for anisotropy)
        dist_from_center = np.sqrt(
            ((x - voxel_coords[0])**2 * voxel_size[0]**2) +
            ((y - voxel_coords[1])**2 * voxel_size[1]**2) +
            ((z - voxel_coords[2])**2 * voxel_size[2]**2)
        )

        # Create the spherical mask
        roi_mask = dist_from_center <= radius

        # Create mask for non-zero values
        nonzero_mask = field_data > 0

        # Combine masks to get only non-zero values within ROI
        combined_mask = roi_mask & nonzero_mask

        # Count voxels in ROI
        roi_voxels_count = np.sum(combined_mask)
        total_roi_voxels = np.sum(roi_mask)

        # Check if we have any voxels in the ROI
        if roi_voxels_count == 0:
            # Determine the tissue type from the field NIfTI name
            field_name = os.path.basename(self.field_nifti).lower()
            tissue_type = "grey matter" if "grey" in field_name else "white matter" if "white" in field_name else "brain tissue"

            warning_msg = f"""
\033[93m⚠️  WARNING: Analysis Failed ⚠️
• No valid voxels found in ROI at [{center_coordinates[0]}, {center_coordinates[1]}, {center_coordinates[2]}], r={radius}mm
{"ROI not intersecting " + tissue_type if total_roi_voxels == 0 else "ROI contains only zero/invalid values"}
{"Adjust coordinates/radius" if total_roi_voxels == 0 else "Check field data"} or verify using freeview\033[0m"""
            self.logger.warning(warning_msg)

            return None

        if not self.quiet:
            self.logger.info("Calculating statistics...")
        # Get the field values within the ROI
        roi_values = field_data[combined_mask]

        min_value = np.min(roi_values)
        whole_brain_positive_mask = field_data > 0
        gm_values = field_data[whole_brain_positive_mask]
        metrics = calculate_roi_metrics(
            roi_values,
            np.ones(len(roi_values), dtype=float),
            ti_field_gm=gm_values,
            gm_volumes=np.ones(len(gm_values), dtype=float),
        )
        mean_value = metrics["TImean_ROI"]
        max_value = metrics["TImax_ROI"]
        focality = metrics.get("Focality")
        whole_brain_average = metrics.get("TImean_GM")

        # Log the whole brain average for debugging
        if not self.quiet:
            self.logger.info(f"Whole brain average (denominator for focality): {whole_brain_average:.6f}")

        # Create results dictionary
        results = {
            'mean_value': mean_value,
            'max_value': max_value,
            'min_value': min_value,
            'focality': focality,
            'voxels_in_roi': roi_voxels_count
        }

        # Generate visualization if requested
        if visualize:
            region_name = f"sphere_x{center_coordinates[0]:.2f}_y{center_coordinates[1]:.2f}_z{center_coordinates[2]:.2f}_r{radius}"

            # Create visualization overlay (showing field values only within the sphere)
            vis_arr = np.zeros_like(field_data)
            vis_arr[combined_mask] = field_data[combined_mask]

            # Save as NIfTI directly to the output directory
            vis_img = nib.Nifti1Image(vis_arr, affine)
            output_filename = os.path.join(self.output_dir, f"sphere_overlay_{region_name}.nii.gz")
            nib.save(vis_img, output_filename)
            if not self.quiet:
                self.logger.info(f"Created visualization overlay: {output_filename}")

            # Beyond the overlay above, only focality-histogram generation is supported for visualization

            # Generate focality histogram if visualizer is available
            if self.visualizer is not None:
                try:
                    if not self.quiet:
                        self.logger.info("Generating focality histogram for spherical ROI...")
                    # Get voxel dimensions from the loaded image
                    voxel_dims = img.header.get_zooms()[:3]

                    # Filter out zero values from whole head data for histogram
                    whole_head_positive_mask = field_data > 0
                    whole_head_filtered = field_data[whole_head_positive_mask]

                    self.visualizer.generate_focality_histogram(
                        whole_head_field_data=whole_head_filtered,
                        roi_field_data=roi_values,
                        region_name=region_name,
                        roi_field_value=mean_value,
                        data_type='voxel',
                        voxel_dims=voxel_dims
                    )
                except Exception as e:
                    self.logger.warning(f"Could not generate focality histogram for spherical ROI: {str(e)}")

        # Calculate and save extra focality information for entire volume
        if not self.quiet:
            self.logger.info("Calculating focality metrics for entire volume...")
        region_name = f"sphere_x{center_coordinates[0]:.2f}_y{center_coordinates[1]:.2f}_z{center_coordinates[2]:.2f}_r{radius}"
        focality_info = self._calculate_focality_metrics(
            field_data,  # Use entire volume data
            np.prod(voxel_size),  # Voxel volume
            region_name
        )

        # Save results to CSV if visualizer is available
        if self.visualizer is not None:
            self.visualizer.save_results_to_csv(results, 'spherical', region_name, 'voxel')

            # Save extra info CSV with focality data
            if focality_info:
                self.visualizer.save_extra_info_to_csv(focality_info, 'spherical', region_name, 'voxel')
        else:
            self.logger.warning("Cannot save results to CSV - VoxelVisualizer not available")

        return results
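
A minimal usage sketch for the voxel-space spherical analysis. The class name, import path, and constructor arguments below are assumptions for illustration; only analyze_sphere itself is documented above.

```python
# Hypothetical usage sketch - class name and constructor arguments are assumed, not confirmed above
from tit.analyzer.voxel_analyzer import VoxelAnalyzer  # assumed class name in the module documented here

analyzer = VoxelAnalyzer(
    field_nifti="/path/to/grey_matter_TI.nii.gz",  # field volume, referenced as self.field_nifti above
    output_dir="/path/to/output",
)

# 10 mm spherical ROI around a world-space coordinate
results = analyzer.analyze_sphere(center_coordinates=[-37.0, -21.0, 58.0], radius=10, visualize=True)
if results is not None:
    print(results['mean_value'], results['max_value'], results['focality'], results['voxels_in_roi'])
```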

analyze_whole_head

analyze_whole_head(atlas_file, visualize=False)

Analyze all regions in the specified atlas.

Source code in tit/analyzer/voxel_analyzer.py
def analyze_whole_head(self, atlas_file, visualize=False):
    """
    Analyze all regions in the specified atlas.
    """
    start_time = time.time()
    if not self.quiet:
        self.logger.info(f"Starting whole head analysis of atlas: {atlas_file}")

    # Extract atlas type from filename
    atlas_type = self._extract_atlas_type(atlas_file)
    self.logger.debug(f"Detected atlas type: {atlas_type}")

    try:
        # Load region information once
        region_info = self.get_atlas_regions(atlas_file)

        # Load atlas and field data once
        self.logger.debug(f"Loading atlas from {atlas_file}...")
        atlas_tuple = self.load_brain_image(atlas_file)
        atlas_img, atlas_arr = atlas_tuple

        self.logger.debug(f"Loading field from {self.field_nifti}...")
        field_tuple = self.load_brain_image(self.field_nifti)
        field_img, field_arr = field_tuple

        # Handle 4D field data (extract first volume if multiple volumes)
        if len(field_arr.shape) == 4:
            self.logger.debug(f"Detected 4D field data with shape {field_arr.shape}")
            field_shape_3d = field_arr.shape[:3]
            # If time dimension is 1, we can simply reshape to 3D
            if field_arr.shape[3] == 1:
                self.logger.debug("Reshaping 4D field data to 3D")
                field_arr = field_arr[:,:,:,0]
            else:
                self.logger.warning(f"4D field has {field_arr.shape[3]} volumes. Using only the first volume.")
                field_arr = field_arr[:,:,:,0]
        else:
            field_shape_3d = field_arr.shape

        # Check if resampling is needed and do it once if necessary
        if atlas_arr.shape != field_shape_3d:
            self.logger.debug("Atlas and field dimensions don't match, attempting to resample...")
            self.logger.debug(f"Atlas shape: {atlas_arr.shape}")
            self.logger.debug(f"Field shape: {field_arr.shape}")

            # Resample the atlas to match the field data, passing atlas_file
            atlas_img, atlas_arr = self.resample_to_match(
                atlas_img,
                field_shape_3d,  # Use only the spatial dimensions
                field_img.affine,
                source_path=atlas_file  # Pass the atlas file path
            )

            # Verify the resampling worked
            if atlas_arr.shape != field_shape_3d:
                raise ValueError(f"Failed to resample atlas to match field dimensions: {atlas_arr.shape} vs {field_shape_3d}")
        else:
            self.logger.debug("Atlas and field dimensions already match - skipping resampling")

        field_tuple = (field_img, field_arr)

        # Dictionary to store results for each region
        results = {}

        # Analyze each region in the atlas
        for region_id, info in region_info.items():
            region_name = info['name']
            try:
                self.logger.debug(f"Processing region: {region_name}")

                # Create a directory for this region in the main output directory
                region_dir = os.path.join(self.output_dir, region_name)
                os.makedirs(region_dir, exist_ok=True)

                # Create mask for this region
                region_mask = (atlas_arr == region_id)

                # Check if the mask contains any voxels
                mask_count = np.sum(region_mask)
                if mask_count == 0:
                    self.logger.warning(f"Warning: Region {region_name} (ID: {region_id}) contains 0 voxels in the atlas")
                    region_results = {
                        'mean_value': None,
                        'max_value': None,
                        'min_value': None,
                        'focality': None,
                        'voxels_in_roi': 0
                    }

                    # Store in the overall results
                    results[region_name] = region_results

                    continue

                # Filter for voxels with positive values
                value_mask = (field_arr > 0)
                combined_mask = region_mask & value_mask

                # Extract field values after filtering
                field_values = field_arr[combined_mask]

                # Check if any voxels remain after filtering
                filtered_count = len(field_values)
                if filtered_count == 0:
                    self.logger.warning(f"Warning: Region {region_name} (ID: {region_id}) has no voxels with positive values")
                    region_results = {
                        'mean_value': None,
                        'max_value': None,
                        'min_value': None,
                        'focality': None,
                        'voxels_in_roi': 0
                    }

                    # Store in the overall results
                    results[region_name] = region_results

                    continue

                # Calculate statistics
                max_value = np.max(field_values)
                min_value = np.min(field_values)

                whole_brain_positive_mask = field_arr > 0
                gm_values = field_arr[whole_brain_positive_mask]
                metrics = calculate_roi_metrics(
                    field_values,
                    np.ones(len(field_values), dtype=float),
                    ti_field_gm=gm_values,
                    gm_volumes=np.ones(len(gm_values), dtype=float),
                )
                mean_value = metrics["TImean_ROI"]
                focality = metrics.get("Focality")
                whole_brain_average = metrics.get("TImean_GM")

                # Log the whole brain average for debugging
                if not self.quiet:
                    self.logger.info(f"Whole brain average (denominator for focality): {whole_brain_average:.6f}")

                # Create result dictionary for this region
                region_results = {
                    'mean_value': mean_value,
                    'max_value': max_value,
                    'min_value': min_value,
                    'focality': focality,
                    'voxels_in_roi': filtered_count  # Store the number of voxels
                }

                # Store in the overall results
                results[region_name] = region_results

                # Generate visualizations if requested
                if visualize:
                    # Create visualization NIfTI file directly in the region directory
                    viz_file = self.visualizer._generate_region_visualization(
                        atlas_img=atlas_img,
                        atlas_arr=atlas_arr,
                        field_arr=field_arr,
                        region_id=region_id,
                        region_name=region_name,
                        output_dir=region_dir
                    )

                    # Generate focality histogram for this region
                    if len(field_values) > 0:
                        try:
                            if not self.quiet:
                                self.logger.info(f"Generating focality histogram for region: {region_name}")
                            # Get voxel dimensions from the field image
                            field_img_tuple = self.load_brain_image(self.field_nifti)
                            voxel_dims = field_img_tuple[0].header.get_zooms()[:3]

                            # Filter out zero values from whole head data for histogram
                            whole_head_positive_mask = field_arr > 0
                            whole_head_filtered = field_arr[whole_head_positive_mask]

                            self.visualizer.generate_focality_histogram(
                                whole_head_field_data=whole_head_filtered,
                                roi_field_data=field_values,
                                region_name=region_name,
                                roi_field_value=mean_value,
                                data_type='voxel',
                                voxel_dims=voxel_dims
                            )
                        except Exception as e:
                            self.logger.warning(f"Could not generate focality histogram for {region_name}: {str(e)}")

            except Exception as e:
                self.logger.warning(f"Warning: Failed to analyze region {region_name}: {str(e)}")
                results[region_name] = {
                    'mean_value': None,
                    'max_value': None,
                    'min_value': None,
                    'focality': None,
                    'voxels_in_roi': 0
                }

        # Global visualization plots are disabled - only per-region histograms are generated above

        # Generate and save summary CSV
        self.visualizer.save_whole_head_results_to_csv(results, atlas_type, 'voxel')

        return results

    finally:
        # Clean up all temporary data
        try:
            del atlas_arr
            del field_arr
            del region_info
            if 'atlas_tuple' in locals():
                del atlas_tuple
            if 'field_tuple' in locals():
                del field_tuple
        except NameError:
            # Variables may not be defined if cleanup failed earlier
            pass
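
A hedged sketch of consuming the per-region results dictionary. The atlas path is illustrative, and analyzer is assumed to be a voxel-analyzer instance constructed as in the sketch after analyze_sphere above.

```python
# Illustrative only - atlas path and analyzer instance are assumptions
results = analyzer.analyze_whole_head("/path/to/atlas/aparc+aseg.nii.gz", visualize=False)

# Rank regions by mean field value, skipping regions with no valid voxels (their values are None)
ranked = sorted(
    ((name, r['mean_value']) for name, r in results.items() if r['mean_value'] is not None),
    key=lambda item: item[1],
    reverse=True,
)
for name, mean_value in ranked[:5]:
    print(f"{name}: mean = {mean_value:.4f}")
```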

find_region

find_region(target_region, region_info)

Find region ID and name based on input.

Parameters:

- target_region (str or int): Target region name or ID. Required.
- region_info (dict): Dictionary with region information. Required.

Returns:

- tuple: (region_id, region_name)

Source code in tit/analyzer/voxel_analyzer.py
def find_region(self, target_region, region_info):
    """Find region ID and name based on input.

    Parameters
    ----------
    target_region : str or int
        Target region name or ID
    region_info : dict
        Dictionary with region information

    Returns
    -------
    tuple
        (region_id, region_name)
    """
    # Check if target_region is an ID (int) or can be converted to one
    try:
        region_id = int(target_region)
        # If it's an ID, get the name from region_info if available
        if region_info and region_id in region_info:
            region_name = region_info[region_id]['name']
        else:
            region_name = f"Region {region_id}"
        return region_id, region_name
    except ValueError:
        # target_region is a string name, need to find the corresponding ID
        if not region_info:
            raise ValueError("Region labels are required to look up regions by name")

        # Search for the region name (case-insensitive)
        target_lower = target_region.lower()
        for region_id, info in region_info.items():
            if target_lower in info['name'].lower():
                return region_id, info['name']

        # If we get here, region name was not found
        raise ValueError(f"Region name '{target_region}' not found in region labels")

get_atlas_regions

get_atlas_regions(atlas_file)

Extract region information from atlas file using FreeSurfer's mri_segstats.

Parameters:

- atlas_file (str): Path to the atlas file (.nii or .mgz). Required.

Returns:

- dict: Dictionary mapping region IDs to region information

Source code in tit/analyzer/voxel_analyzer.py
def get_atlas_regions(self, atlas_file):
    """Extract region information from atlas file using FreeSurfer's mri_segstats.

    Parameters
    ----------
    atlas_file : str
        Path to the atlas file (.nii or .mgz)

    Returns
    -------
    dict
        Dictionary mapping region IDs to region information
    """
    region_info = {}

    # Get the atlas name from the file path
    atlas_name = os.path.basename(atlas_file)
    atlas_name = os.path.splitext(atlas_name)[0]  # Remove extension
    if atlas_name.endswith('.nii'):  # Handle .nii.gz case
        atlas_name = os.path.splitext(atlas_name)[0]

    # Get the FreeSurfer subject directory (parent of mri directory)
    mri_dir = os.path.dirname(atlas_file)
    freesurfer_dir = os.path.dirname(mri_dir)

    # Define the output file path in the mri directory
    output_file = os.path.join(mri_dir, f"{atlas_name}_labels.txt")

    # Create mri directory if it doesn't exist
    os.makedirs(mri_dir, exist_ok=True)

    # Function to generate the labels file
    def generate_labels_file():
        cmd = [
            'mri_segstats',
            '--seg', atlas_file,
            '--excludeid', '0',  # Exclude background
            '--ctab-default',    # Use default color table
            '--sum', output_file
        ]

        try:
            if not self.quiet:
                self.logger.info(f"Running: {' '.join(cmd)}")
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            self.logger.warning(f"Warning: Could not extract region information using mri_segstats: {str(e)}")
            self.logger.debug(f"Command output: {e.stdout.decode() if e.stdout else ''}")
            self.logger.debug(f"Command error: {e.stderr.decode() if e.stderr else ''}")
            return False

    # Function to parse the labels file
    def parse_labels_file():
        try:
            with open(output_file, 'r') as f:
                # Skip header lines (all lines starting with #)
                in_header = True
                for line in f:
                    # Check if we've reached the end of the header
                    if in_header and not line.startswith('#'):
                        in_header = False

                    # Process data lines (non-header)
                    if not in_header and line.strip():
                        parts = line.strip().split()

                        # The format is:
                        # Index SegId NVoxels Volume_mm3 StructName
                        # We need at least 5 columns
                        if len(parts) >= 5:
                            try:
                                region_id = int(parts[1])  # SegId is the second column
                                n_voxels = int(parts[2])   # NVoxels is the third column

                                # Structure name can contain spaces, so join the remaining parts
                                region_name = ' '.join(parts[4:])

                                # Generate a random color based on region_id for visualization
                                # This creates a consistent color for each region
                                import random
                                random.seed(region_id)
                                r = random.uniform(0.2, 0.8)
                                g = random.uniform(0.2, 0.8)
                                b = random.uniform(0.2, 0.8)

                                region_info[region_id] = {
                                    'name': region_name,
                                    'voxel_count': n_voxels,
                                    'color': (r, g, b)
                                }
                            except (ValueError, IndexError) as e:
                                self.logger.warning(f"Warning: Could not parse line: {line.strip()}")
                                self.logger.debug(f"Error: {str(e)}")

            return len(region_info) > 0
        except Exception as e:
            self.logger.error(f"Error reading labels file: {str(e)}")
            return False

    # Try to use existing file first
    if os.path.exists(output_file):
        if not self.quiet:
            self.logger.info(f"Found existing labels file: {output_file}")
        if parse_labels_file():
            if not self.quiet:
                self.logger.info(f"Successfully parsed {len(region_info)} regions from existing file")
            return region_info
        else:
            if not self.quiet:
                self.logger.info("Existing file is invalid or empty, regenerating...")

    # Generate new file if needed
    if generate_labels_file():
        if parse_labels_file():
            if not self.quiet:
                self.logger.info(f"Successfully generated and parsed {len(region_info)} regions")
            return region_info

    self.logger.warning("Warning: Could not get region information from atlas file")
    return region_info
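
A short sketch of inspecting the returned dictionary. The atlas path is an example, and FreeSurfer's mri_segstats must be available on the PATH if a labels file has to be generated.

```python
# Example only - atlas path is illustrative
region_info = analyzer.get_atlas_regions("/path/to/freesurfer/mri/aparc+aseg.mgz")

# Each entry maps a segmentation ID to its name, voxel count, and a deterministic display color
for region_id, info in sorted(region_info.items()):
    print(f"{region_id:>5}  {info['name']:<40}  {info['voxel_count']} voxels")
```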

get_grey_matter_statistics

get_grey_matter_statistics()

Calculate grey matter field statistics from the NIfTI data.

Returns: dict: Dictionary containing grey matter statistics (mean, max, min)

Source code in tit/analyzer/voxel_analyzer.py
def get_grey_matter_statistics(self):
    """
    Calculate grey matter field statistics from the NIfTI data.

    Returns:
        dict: Dictionary containing grey matter statistics (mean, max, min)
    """
    if not self.quiet:
        self.logger.info("Calculating grey matter field statistics...")

    try:
        # Load the field data
        img = nib.load(self.field_nifti)
        field_data = img.get_fdata()

        # Handle 4D field data (use only the first volume)
        if len(field_data.shape) == 4:
            if field_data.shape[3] > 1:
                self.logger.warning(f"4D field has {field_data.shape[3]} volumes. Using only the first volume.")
            field_data = field_data[:,:,:,0]

        # For voxel analysis, we assume the entire field is grey matter
        # (since we're typically working with grey matter NIfTI files)
        # Filter for positive values only (matching ROI analysis behavior)
        positive_mask = field_data > 0
        field_data_positive = field_data[positive_mask]

        # Check if we have any positive values
        if len(field_data_positive) == 0:
            self.logger.warning("No positive values found in grey matter data")
            return {'grey_mean': 0.0, 'grey_max': 0.0, 'grey_min': 0.0}

        # Calculate statistics on positive values only
        grey_mean = np.mean(field_data_positive)
        grey_max = np.max(field_data_positive)
        grey_min = np.min(field_data_positive)

        if not self.quiet:
            self.logger.info(f"Grey matter statistics for field '{os.path.basename(self.field_nifti)}' (positive values only): "
                           f"mean={grey_mean:.6f}, max={grey_max:.6f}, min={grey_min:.6f}")

        return {
            'grey_mean': float(grey_mean),
            'grey_max': float(grey_max),
            'grey_min': float(grey_min)
        }

    except Exception as e:
        self.logger.error(f"Error calculating grey matter statistics: {str(e)}")
        return {'grey_mean': 0.0, 'grey_max': 0.0, 'grey_min': 0.0}
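
Usage is a single call; the returned keys are shown below (values are illustrative):

```python
stats = analyzer.get_grey_matter_statistics()
print(stats)  # e.g. {'grey_mean': 0.2134, 'grey_max': 1.8421, 'grey_min': 0.0003}
```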

load_brain_image

load_brain_image(file_path)

Load brain image data from file, handling different formats.

Parameters:

- file_path (str): Path to the image file. Required.

Returns:

- tuple: (nibabel image object, numpy array of data)

Source code in tit/analyzer/voxel_analyzer.py
def load_brain_image(self, file_path):
    """Load brain image data from file, handling different formats.

    Parameters
    ----------
    file_path : str
        Path to the image file

    Returns
    -------
    tuple
        (nibabel image object, numpy array of data)
    """
    file_ext = Path(file_path).suffix.lower()

    if file_ext == '.mgz':
        # Try to use nibabel directly first
        try:
            img = nib.load(file_path)
            data = img.get_fdata()
            return img, data
        except Exception as e:
            self.logger.warning(f"Could not load MGZ file directly: {str(e)}")

            # Try to convert MGZ to NIfTI using mri_convert
            try:
                # Create a temporary file for the converted image
                with tempfile.NamedTemporaryFile(suffix='.nii.gz', delete=False) as temp:
                    temp_path = temp.name

                # Run mri_convert to convert MGZ to NIfTI
                cmd = ['mri_convert', file_path, temp_path]
                subprocess.run(cmd, check=True)

                # Load the converted file
                img = nib.load(temp_path)
                data = img.get_fdata()

                # Clean up
                os.unlink(temp_path)

                return img, data
            except Exception as e2:
                raise RuntimeError(f"Failed to convert MGZ file: {str(e2)}")
    else:
        # For NIfTI and other formats, use nibabel directly
        img = nib.load(file_path)
        data = img.get_fdata()
        return img, data
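
A brief usage sketch (the path is an example; MGZ files additionally require FreeSurfer's mri_convert when nibabel cannot read them directly):

```python
# Example path only
img, data = analyzer.load_brain_image("/path/to/field.nii.gz")
print(data.shape)   # e.g. (256, 256, 256)
print(img.affine)   # 4x4 voxel-to-world transform, used elsewhere for coordinate conversion
```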

resample_to_match

resample_to_match(source_img, target_shape, target_affine, source_path=None)

Resample source image to match target dimensions and affine using FreeSurfer's mri_convert.

If a resampled version already exists, it will be loaded instead of generating a new one.

Parameters:

- source_img (Nifti1Image): Source image to resample. Required.
- target_shape (tuple): Target shape (x, y, z) or (x, y, z, t). Required.
- target_affine (ndarray): Target affine transformation matrix. Required.
- source_path (str): Path to the source file. If provided, will be used to generate a resampled file name. Default: None.

Returns:

- tuple: (resampled nibabel image, resampled data array)

Source code in tit/analyzer/voxel_analyzer.py
def resample_to_match(self, source_img, target_shape, target_affine, source_path=None):
    """Resample source image to match target dimensions and affine using FreeSurfer's mri_convert.

    If a resampled version already exists, it will be loaded instead of generating a new one.

    Parameters
    ----------
    source_img : nibabel.Nifti1Image
        Source image to resample
    target_shape : tuple
        Target shape (x, y, z) or (x, y, z, t)
    target_affine : numpy.ndarray
        Target affine transformation matrix
    source_path : str, optional
        Path to the source file. If provided, will be used to generate a resampled file name.

    Returns
    -------
    tuple
        (resampled nibabel image, resampled data array)
    """
    if not self.quiet:
        self.logger.info(f"Resampling image from shape {source_img.shape} to {target_shape}")

    # If source_path is provided, try to find an existing resampled file
    if source_path:
        resampled_path = self._get_resampled_atlas_filename(source_path, target_shape)
        if os.path.exists(resampled_path):
            if not self.quiet:
                self.logger.info(f"Found existing resampled atlas: {resampled_path}")
            try:
                resampled_img = nib.load(resampled_path)
                resampled_data = resampled_img.get_fdata()

                # Check if the resampled image has the correct shape
                if resampled_data.shape[:3] == target_shape[:3]:
                    if not self.quiet:
                        self.logger.info("Loaded previously resampled atlas")

                    # If target is 4D but resampled is 3D, reshape it to match
                    is_target_4d = len(target_shape) == 4
                    if is_target_4d and len(resampled_data.shape) == 3:
                        if not self.quiet:
                            self.logger.info(f"Reshaping 3D data {resampled_data.shape} to match 4D target {target_shape}")
                        # Add a dimension to match the 4D target shape
                        resampled_data = np.expand_dims(resampled_data, axis=3)

                        # Create a new 4D NIfTI image
                        new_header = resampled_img.header.copy()
                        new_header.set_data_shape(resampled_data.shape)
                        resampled_img = nib.Nifti1Image(resampled_data, resampled_img.affine, header=new_header)

                    return resampled_img, resampled_data
                else:
                    self.logger.warning(f"Existing resampled atlas has wrong shape: {resampled_data.shape[:3]} vs expected {target_shape[:3]}")
            except Exception as e:
                self.logger.error(f"Error loading existing resampled atlas: {str(e)}")
                if not self.quiet:
                    self.logger.info("Will generate a new one")

    if not self.quiet:
        self.logger.info("Generating new resampled atlas...")

    # If target shape is 4D but source is 3D, we need to handle this specially
    is_target_4d = len(target_shape) == 4
    spatial_shape = target_shape[:3]  # Extract just the spatial dimensions

    # Create a temporary file for the target template
    with tempfile.NamedTemporaryFile(suffix='.nii.gz', delete=False) as temp_template:
        template_path = temp_template.name

    # Create a temporary file for the resampled output
    with tempfile.NamedTemporaryFile(suffix='.nii.gz', delete=False) as temp_output:
        output_path = temp_output.name

    try:
        # Save the source image to a temporary file if not provided
        if source_path and os.path.exists(source_path):
            # Use the provided source path
            temp_source_created = False
        else:
            # Save the source image to a temporary file
            with tempfile.NamedTemporaryFile(suffix='.nii.gz', delete=False) as temp_source:
                source_path = temp_source.name
                nib.save(source_img, source_path)
                temp_source_created = True

        # Create a template image with target dimensions (3D only)
        template_img = nib.Nifti1Image(np.zeros(spatial_shape), target_affine)
        nib.save(template_img, template_path)

        # Run mri_convert to resample the image
        cmd = [
            'mri_convert',
            '--reslice_like', template_path,  # Use template for resampling
            source_path,                      # Source image
            output_path                       # Output image
        ]

        if not self.quiet:
            self.logger.info(f"Running: {' '.join(cmd)}")
        result = subprocess.run(cmd, check=True, capture_output=True)

        # Load the resampled image
        resampled_img = nib.load(output_path)
        resampled_data = resampled_img.get_fdata()

        # If target is 4D but resampled is 3D, reshape it to match
        if is_target_4d and len(resampled_data.shape) == 3:
            if not self.quiet:
                self.logger.info(f"Reshaping 3D data {resampled_data.shape} to match 4D target {target_shape}")
            # Add a dimension to match the 4D target shape
            resampled_data = np.expand_dims(resampled_data, axis=3)

            # Create a new 4D NIfTI image
            new_header = resampled_img.header.copy()
            new_header.set_data_shape(resampled_data.shape)
            resampled_img = nib.Nifti1Image(resampled_data, resampled_img.affine, header=new_header)

        # Save the resampled atlas for future use if source_path was provided and not temporary
        if source_path and not temp_source_created:
            resampled_save_path = self._get_resampled_atlas_filename(source_path, target_shape)
            if not self.quiet:
                self.logger.info(f"Saving resampled atlas for future use: {resampled_save_path}")
            nib.save(resampled_img, resampled_save_path)

        if not self.quiet:
            self.logger.info("Resampling complete")
        return resampled_img, resampled_data

    finally:
        # Clean up temporary files
        for temp_file in [template_path, output_path]:
            try:
                os.unlink(temp_file)
            except (OSError, FileNotFoundError):
                # Best-effort cleanup - file may already be deleted
                pass

        # Also remove temporary source file if we created it
        if 'temp_source_created' in locals() and temp_source_created and 'source_path' in locals():
            try:
                os.unlink(source_path)
            except (OSError, FileNotFoundError):
                # Best-effort cleanup - file may already be deleted
                pass
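
A sketch of the typical call pattern, mirroring how analyze_whole_head brings an atlas onto the field grid (paths are illustrative):

```python
# Illustrative paths
atlas_path = "/path/to/atlas.nii.gz"
atlas_img, atlas_arr = analyzer.load_brain_image(atlas_path)
field_img, field_arr = analyzer.load_brain_image("/path/to/field.nii.gz")

if atlas_arr.shape != field_arr.shape[:3]:
    # Passing source_path lets the resampled atlas be cached alongside the original for reuse
    atlas_img, atlas_arr = analyzer.resample_to_match(
        atlas_img,
        field_arr.shape[:3],
        field_img.affine,
        source_path=atlas_path,
    )
```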

Group analyzer (tit.analyzer.group_analyzer)

Group Analyzer Script

Each subject's analysis will be saved under: derivatives/SimNIBS/<subject_id>/Simulations/<montage_name>/Analyses/<Mesh|Voxel>/<analysis_name>

Within each subject's Analyses folder, we also capture: - overlays & plots (via --visualize) - CSV summary (handled by main_analyzer.py) - centralized group analysis log file (all subjects logged together)

build_main_analyzer_command

build_main_analyzer_command(args, subject_args: List[str], subject_output_dir: str) -> List[str]

Build the command to run main_analyzer.py for a single subject, matching the exact structure used in analyzer_tab.py.

Source code in tit/analyzer/group_analyzer.py
def build_main_analyzer_command(
    args,
    subject_args: List[str],
    subject_output_dir: str
) -> List[str]:
    """
    Build the command to run main_analyzer.py for a single subject,
    matching the exact structure used in analyzer_tab.py.
    """
    global group_logger

    subject_id = subject_args[0]
    m2m_path = subject_args[1]
    field_path = subject_args[2]

    # Locate main_analyzer.py (assume it sits next to this script)
    script_dir = Path(__file__).parent
    main_analyzer_path = script_dir / "main_analyzer.py"

    # Use `simnibs_python` as the interpreter (matching analyzer_tab.py)
    cmd = ["simnibs_python", str(main_analyzer_path)]

    # Add arguments in the same order as analyzer_tab.py
    cmd += ["--m2m_subject_path", m2m_path]

    # For mesh analysis, field_path is the montage name directly
    # For voxel analysis, field_path is the actual file path
    if args.space == 'mesh':
        # field_path is already the montage name
        cmd += ["--montage_name", field_path]
    else:
        cmd += ["--field_path", field_path]

    cmd += ["--space", args.space]
    cmd += ["--analysis_type", args.analysis_type]

    # Analysis-type-specific flags
    if args.analysis_type == 'spherical':
        # Pass coordinates and coordinate space - let main_analyzer.py handle transformation
        cmd += ["--coordinates"] + [str(c) for c in args.coordinates]
        cmd += ["--coordinate-space", args.coordinate_space]
        cmd += ["--radius", str(args.radius)]

    else:  # cortical
        # For cortical, add atlas info first
        if args.space == 'mesh':
            cmd += ["--atlas_name", args.atlas_name]
        else:  # voxel
            atlas_path = subject_args[3]
            cmd += ["--atlas_path", atlas_path]

        # Then add region or whole_head flag
        if args.whole_head:
            cmd += ["--whole_head"]
        else:
            cmd += ["--region", args.region]

    # Field name is now hardcoded in main_analyzer.py, no need to pass it

    # Add output directory
    cmd += ["--output_dir", subject_output_dir]

    # Always enable visualizations (matching the request)
    cmd += ["--visualize"]

    # Add group analysis log file path if available
    if group_logger and hasattr(group_logger, 'handlers') and group_logger.handlers:
        # Get the log file path from the first handler
        for handler in group_logger.handlers:
            if hasattr(handler, 'baseFilename'):
                log_file_path = getattr(handler, 'baseFilename')
                cmd += ["--log_file", log_file_path]
                break

    # Add quiet flag if requested
    if args.quiet:
        cmd += ["--quiet"]

    return cmd

collect_analysis_paths

collect_analysis_paths(successful_dirs: List[str]) -> List[str]

Each entry in successful_dirs is exactly the folder we passed to --output_dir for main_analyzer.py. We check that it contains at least one .csv before returning it.

Source code in tit/analyzer/group_analyzer.py
def collect_analysis_paths(successful_dirs: List[str]) -> List[str]:
    """
    Each entry in successful_dirs is exactly the folder we passed to --output_dir
    for main_analyzer.py. We check that it contains at least one .csv before returning it.
    """
    global group_logger

    analysis_paths = []
    for d in successful_dirs:
        if os.path.isdir(d):
            csv_list = [f for f in os.listdir(d) if f.lower().endswith(".csv")]
            if csv_list:
                analysis_paths.append(d)
            else:
                if group_logger:
                    group_logger.debug(f"No CSV found under {d}")
        else:
            if group_logger:
                group_logger.debug(f"Expected analysis directory not found: {d}")

    return analysis_paths

compute_subject_output_dir

compute_subject_output_dir(args, subject_args: List[str]) -> str

Create a consistent output directory structure for analysis results. Structure: output_dir/sub-{subject_id}/Simulations/{montage_name}/Analyses/{Mesh|Voxel}/{analysis_name}

Source code in tit/analyzer/group_analyzer.py
def compute_subject_output_dir(args, subject_args: List[str]) -> str:
    """
    Create a consistent output directory structure for analysis results.
    Structure: output_dir/sub-{subject_id}/Simulations/{montage_name}/Analyses/{Mesh|Voxel}/{analysis_name}
    """
    subject_id = subject_args[0]

    # For mesh analysis, field_path is the montage name
    # For voxel analysis, we need to extract montage name from field path
    if args.space == 'mesh':
        montage_name = subject_args[2]  # field_path is montage name for mesh
    else:
        # For voxel analysis, extract montage name from path structure
        field_path = subject_args[2]
        try:
            path_parts = Path(field_path).parts
            sim_idx = path_parts.index('Simulations')
            montage_name = path_parts[sim_idx + 1]
        except (ValueError, IndexError):
            # If we can't parse the path, use a default montage name
            montage_name = 'unknown_montage'

    # Build consistent directory structure
    space_dir = 'Mesh' if args.space == 'mesh' else 'Voxel'
    base_dir = os.path.join(args.output_dir, f'sub-{subject_id}', 'Simulations', montage_name, 'Analyses', space_dir)

    # Create analysis-specific directory name
    if args.analysis_type == 'spherical':
        coord_space_suffix = f"_{args.coordinate_space}"
        coords = [float(c) for c in args.coordinates]
        analysis_name = f"sphere_x{coords[0]:.2f}_y{coords[1]:.2f}_z{coords[2]:.2f}_r{args.radius}{coord_space_suffix}"
    else:  # cortical
        if args.whole_head:
            if args.space == 'mesh':
                analysis_name = f"whole_head_{args.atlas_name}"
            else:
                atlas_path = subject_args[3]
                atlas_name = os.path.basename(atlas_path).split('.')[0]
                analysis_name = f"whole_head_{atlas_name}"
        else:
            analysis_name = f"region_{args.region}"

    output_dir = os.path.join(base_dir, analysis_name)
    os.makedirs(output_dir, exist_ok=True)
    return output_dir
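
An illustrative call for a voxel-space spherical analysis. The argparse.Namespace fields mirror the flags defined by setup_parser below, and all paths are made up; note that the function also creates the directory it returns.

```python
from argparse import Namespace

from tit.analyzer.group_analyzer import compute_subject_output_dir

# Hypothetical arguments and subject entry
args = Namespace(space='voxel', analysis_type='spherical',
                 output_dir='/mnt/myproject/derivatives/SimNIBS',
                 coordinates=['-37', '-21', '58'], coordinate_space='MNI', radius=10.0)
subject_args = ['101', '/path/to/m2m_101',
                '/mnt/myproject/derivatives/SimNIBS/sub-101/Simulations/F3_F4/TI/niftis/grey_TI.nii.gz']

out_dir = compute_subject_output_dir(args, subject_args)
# -> .../sub-101/Simulations/F3_F4/Analyses/Voxel/sphere_x-37.00_y-21.00_z58.00_r10.0_MNI
```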

create_group_output_directory

create_group_output_directory(first_subject_path: str) -> str

Create centralized group analysis output directory.

Args: first_subject_path (str): Path from the first subject to extract project name

Returns: str: Path to the created group analysis directory

Source code in tit/analyzer/group_analyzer.py
def create_group_output_directory(first_subject_path: str) -> str:
    """
    Create centralized group analysis output directory.

    Args:
        first_subject_path (str): Path from the first subject to extract project name

    Returns:
        str: Path to the created group analysis directory
    """
    global group_logger

    # Extract project name from the first subject's path
    project_name = _extract_project_name(first_subject_path)

    # No longer create centralized group analysis directory under SimNIBS
    # Individual subject analyses are stored in their respective subject directories
    # Group comparisons will be stored in the user-specified output directory

    if group_logger:
        group_logger.debug(f"Using project: {project_name}")

    # Return a placeholder path - not actually used since we don't create central directories
    return f"/mnt/{project_name}/derivatives/SimNIBS"

determine_group_subfolder_name

determine_group_subfolder_name(args, first_subject_args: List[str]) -> str

Determine the group subfolder name in format: {montage}_{roi}

Args: args: Command line arguments first_subject_args: First subject's arguments to extract montage info

Returns: str: Subfolder name like "montage_roi"

Source code in tit/analyzer/group_analyzer.py
def determine_group_subfolder_name(args, first_subject_args: List[str]) -> str:
    """
    Determine the group subfolder name in format: {montage}_{roi}

    Args:
        args: Command line arguments
        first_subject_args: First subject's arguments to extract montage info

    Returns:
        str: Subfolder name like "montage_roi"
    """
    # Extract montage name from field path
    field_path = first_subject_args[2]
    path_parts = Path(field_path).parts

    try:
        # Locate "Simulations" in the file path
        sim_idx = path_parts.index('Simulations')
        # Montage name is the folder immediately after "Simulations"
        montage_name = path_parts[sim_idx + 1]
    except (ValueError, IndexError):
        # If "Simulations" not found in path, try to extract from filename
        field_filename = Path(field_path).stem
        if field_filename.endswith('_TINormal'):
            montage_name = field_filename.replace('_TINormal', 'Normal')
        elif field_filename.endswith('_TI_Normal'):
            montage_name = field_filename.replace('_TI_Normal', '_Normal')
        elif field_filename.endswith('_TI'):
            montage_name = field_filename.replace('_TI', '')
        else:
            montage_name = field_filename

    # Determine ROI description
    if args.analysis_type == 'spherical':
        roi_desc = f"sphere_r{args.radius}"
    else:  # cortical
        if args.whole_head:
            if args.space == 'mesh':
                roi_desc = f"whole_head_{args.atlas_name}"
            else:  # voxel
                atlas_path = first_subject_args[3] if len(first_subject_args) > 3 else "unknown_atlas"
                atlas_name = os.path.basename(atlas_path).split('.')[0] if atlas_path != "unknown_atlas" else "unknown_atlas"
                roi_desc = f"whole_head_{atlas_name}"
        else:
            roi_desc = args.region

    return f"{montage_name}_{roi_desc}"

format_duration

format_duration(total_seconds)

Format duration in human-readable format.

Source code in tit/analyzer/group_analyzer.py
def format_duration(total_seconds):
    """Format duration in human-readable format."""
    total_seconds = int(total_seconds)
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60

    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    elif minutes > 0:
        return f"{minutes}m {seconds}s"
    else:
        return f"{seconds}s"

log_group_analysis_complete

log_group_analysis_complete(num_successful, num_total, output_path='')

Log the completion of group analysis.

Source code in tit/analyzer/group_analyzer.py
def log_group_analysis_complete(num_successful, num_total, output_path=""):
    """Log the completion of group analysis."""
    global _group_start_time, SUMMARY_MODE

    if _group_start_time is not None:
        total_duration = time.time() - _group_start_time
        duration_str = format_duration(total_duration)

        if SUMMARY_MODE:
            if num_successful == num_total:
                print(f"└─ Group analysis completed ({num_successful}/{num_total} subjects successful, Total: {duration_str})")
            else:
                print(f"└─ Group analysis completed with failures ({num_successful}/{num_total} subjects successful, Total: {duration_str})")
            if output_path:
                # Show relative path from /mnt/ for cleaner display
                display_path = output_path.replace('/mnt/', '') if output_path.startswith('/mnt/') else output_path
                print(f"   Group results saved to: {display_path}")

log_group_analysis_start

log_group_analysis_start(num_subjects, analysis_description)

Log the start of group analysis.

Source code in tit/analyzer/group_analyzer.py
def log_group_analysis_start(num_subjects, analysis_description):
    """Log the start of group analysis."""
    global _group_start_time, SUMMARY_MODE
    _group_start_time = time.time()

    if SUMMARY_MODE:
        print(f"\nBeginning group analysis for {num_subjects} subjects ({analysis_description})")

log_group_subject_status

log_group_subject_status(subject_id, status, duration_str, error_msg='')

Log the status of a subject in group analysis.

Source code in tit/analyzer/group_analyzer.py
def log_group_subject_status(subject_id, status, duration_str, error_msg=""):
    """Log the status of a subject in group analysis."""
    global SUMMARY_MODE

    if SUMMARY_MODE:
        if status == "complete":
            print(f"├─ Subject {subject_id}: ✓ Complete ({duration_str})")
        else:
            print(f"├─ Subject {subject_id}: ✗ Failed ({duration_str}) - {error_msg}")

run_comprehensive_group_analysis

run_comprehensive_group_analysis(analysis_paths: List[str], project_name: Optional[str] = None) -> str

Run comprehensive group analysis using all available comparison methods.

Args: analysis_paths (List[str]): List of paths to individual subject analysis directories project_name (str, optional): Project name. If None, extracted from first path.

Returns: str: Path to the group analysis output directory

Source code in tit/analyzer/group_analyzer.py
def run_comprehensive_group_analysis(analysis_paths: List[str], project_name: Optional[str] = None) -> str:
    """
    Run comprehensive group analysis using all available comparison methods.

    Args:
        analysis_paths (List[str]): List of paths to individual subject analysis directories
        project_name (str, optional): Project name. If None, extracted from first path.

    Returns:
        str: Path to the group analysis output directory
    """
    global group_logger

    if len(analysis_paths) == 0:
        if group_logger:
            group_logger.debug("No analysis paths provided for group comparison.")
        return ""

    if group_logger:
        group_logger.debug(f"\n=== Running comprehensive group analysis on {len(analysis_paths)} analyses ===")

    try:
        # Use the comprehensive comparison function from compare_analyses.py
        group_output_dir = run_all_group_comparisons(analysis_paths, project_name)
        if group_logger:
            group_logger.debug(f"✔ Comprehensive group analysis completed successfully.")
            group_logger.debug(f"  All results saved to: {group_output_dir}")
        return group_output_dir
    except Exception as e:
        if group_logger:
            group_logger.error(f"✖ Group analysis failed with error: {e}")
        return ""

run_subject_analysis

run_subject_analysis(args, subject_args: List[str]) -> Tuple[bool, str]

Run analysis for a single subject and return (success, output_dir). All output is logged to the centralized group analysis log file.

Source code in tit/analyzer/group_analyzer.py
def run_subject_analysis(args, subject_args: List[str]) -> Tuple[bool, str]:
    """
    Run analysis for a single subject and return (success, output_dir).
    All output is logged to the centralized group analysis log file.
    """
    global group_logger

    subject_id = subject_args[0]
    m2m_path = subject_args[1]
    field_path = subject_args[2]

    # Compute the target output folder for this subject
    subject_output_dir = compute_subject_output_dir(args, subject_args)

    # Build the command
    cmd = build_main_analyzer_command(args, subject_args, subject_output_dir)

    if group_logger:
        group_logger.debug(f"Starting analysis for subject: {subject_id}")

    # Track timing for subject analysis
    import time
    start_time = time.time()

    # Run main_analyzer.py with real-time output streaming
    if args.quiet:
        # In summary mode, stream output in real-time to show task steps as they happen
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, 
                               text=True, bufsize=1, universal_newlines=True)

        # Stream output in real-time
        output_lines = []
        while True:
            line = proc.stdout.readline()
            if not line and proc.poll() is not None:
                break
            if line:
                line = line.strip()
                if line:
                    output_lines.append(line)
                    # Display task steps in real-time
                    if line.startswith('Beginning analysis for subject:'):
                        # This is the subject start message, display it with proper indentation
                        print(f"  {line}")
                    elif line.startswith('├─ ') or line.startswith('└─ '):
                        # This is a task step, display it with proper indentation
                        clean_line = line[2:]  # Remove the tree symbols
                        print(f"  ├─ {clean_line}")
                    elif 'Starting...' in line and 'Subject' in line:
                        # Skip the "Subject X: Starting..." line as it's already shown
                        continue
                    elif '✓ Complete' in line or '✗ Failed' in line:
                        # Skip completion lines as they're handled separately
                        continue
                    elif 'Analysis completed successfully' in line:
                        # Skip the final completion message as it's handled separately
                        continue

        # Wait for process to complete
        return_code = proc.wait()
        stdout_output = '\n'.join(output_lines)
    else:
        # In debug mode, capture output as before
        proc = subprocess.run(cmd, capture_output=True, text=True)
        return_code = proc.returncode
        stdout_output = proc.stdout

    end_time = time.time()
    duration = int(end_time - start_time)
    duration_str = format_duration(duration)

    if return_code == 0:
        if group_logger:
            group_logger.debug(f"Subject {subject_id} analysis completed successfully")

        # Log subject status for summary
        if args.quiet:
            log_group_subject_status(subject_id, "complete", duration_str)

        return True, subject_output_dir
    else:
        if group_logger:
            group_logger.error(f"Subject {subject_id} analysis failed")
            # Log any additional error output that might not have been captured by main_analyzer.py
            if stdout_output.strip():
                group_logger.error(f"stdout: {stdout_output.strip()}")
            # For streaming mode, stderr is redirected to stdout, so no separate stderr handling needed

        # Extract meaningful error message for summary
        error_msg = ""
        if stdout_output.strip():
            # Look for error patterns in stdout
            stdout_lines = stdout_output.strip().split('\n')
            for line in stdout_lines:
                if any(keyword in line.lower() for keyword in ['error:', 'failed', 'exception', 'critical']):
                    error_msg = line.strip()
                    break

        # Log subject status for summary
        if args.quiet:
            log_group_subject_status(subject_id, "failed", duration_str, error_msg)

        return False, ""
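
As a usage sketch (not taken from the source), the per-subject runner can be driven in a simple loop. Here `args` is assumed to be a validated `argparse.Namespace` produced by `setup_parser()`/`validate_args()` below, and the subject specifications are placeholders.

```python
# Hypothetical driver loop over the --subject specifications.
# `args` is assumed to be a validated argparse.Namespace; paths are placeholders.
analysis_paths = []
for subject_args in args.subject:  # e.g. ["sub-001", "/path/to/m2m_sub-001", "/path/to/sub-001_field.msh"]
    success, output_dir = run_subject_analysis(args, subject_args)
    if success:
        analysis_paths.append(output_dir)
    elif group_logger:
        group_logger.error(f"Excluding {subject_args[0]} from group comparison")
```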

setup_parser

setup_parser()

Set up command line argument parser.

Source code in tit/analyzer/group_analyzer.py
def setup_parser():
    """Set up command line argument parser."""
    parser = argparse.ArgumentParser(description="Run analysis across multiple subjects and compare results")

    # Analysis parameters (same as main_analyzer.py)
    parser.add_argument("--space", required=True, choices=['mesh', 'voxel'],
                        help="Analysis space: mesh or voxel")
    parser.add_argument("--analysis_type", required=True, choices=['spherical', 'cortical'],
                        help="Type of analysis to perform")

    # Analysis-specific parameters
    parser.add_argument("--atlas_name",
                        help="Atlas name for mesh-based cortical analysis (e.g., DK40)")
    parser.add_argument("--coordinates", nargs=3,
                        help="x y z coordinates for spherical analysis")
    parser.add_argument("--coordinate-space", choices=['MNI', 'subject'],
                        help="Coordinate space of the input coordinates (MNI or subject) - required for spherical analysis")
    parser.add_argument("--radius", type=float,
                        help="Radius for spherical analysis")
    parser.add_argument("--region",
                        help="Region name for cortical analysis")
    parser.add_argument("--whole_head", action="store_true",
                        help="Analyze the whole head instead of a specific region")

    # Subject specification; for voxel-cortical analysis we expect an extra atlas_path
    parser.add_argument("--subject", action="append", nargs='+', metavar="ARG",
                        help="Subject specification: subject_id m2m_path field_path [atlas_path] "
                             "(atlas_path required for voxel-based cortical analysis)")

    # Output and comparison options
    parser.add_argument("--output_dir", required=True,
                        help="Directory for legacy group analysis outputs (comprehensive results go to centralized location)")
    parser.add_argument("--quiet", action="store_true",
                        help="Enable summary mode for clean output (non-debug mode)")
    parser.add_argument("--no-compare", action="store_true",
                        help="Skip comparison analysis after all subjects are processed (comparison runs by default)")
    parser.add_argument("--visualize", action="store_true",
                        help="Generate visualization outputs for each analysis")

    return parser
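
To illustrate the expected command-line shape, the sketch below parses a typical voxel-based spherical group analysis in-process. All paths, coordinates, and the radius are placeholder values.

```python
# Hypothetical argument list for a voxel-based spherical group analysis.
# Every path and numeric value below is a placeholder.
parser = setup_parser()
args = parser.parse_args([
    "--space", "voxel",
    "--analysis_type", "spherical",
    "--coordinates", "42", "18", "30",
    "--coordinate-space", "MNI",
    "--radius", "10",
    "--output_dir", "/path/to/group_output",
    "--subject", "sub-001", "/path/to/m2m_sub-001", "/path/to/sub-001_field.nii.gz",
    "--subject", "sub-002", "/path/to/m2m_sub-002", "/path/to/sub-002_field.nii.gz",
    "--quiet",
])
```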

validate_args

validate_args(args)

Validate command line arguments.

Source code in tit/analyzer/group_analyzer.py
def validate_args(args):
    """Validate command line arguments."""
    if not args.subject or len(args.subject) == 0:
        raise ValueError("At least one --subject must be specified")

    # Validate analysis-specific arguments
    if args.analysis_type == 'spherical':
        if not args.coordinates:
            raise ValueError("Coordinates are required for spherical analysis")
        if args.radius is None:
            raise ValueError("Radius is required for spherical analysis")
        if not args.coordinate_space:
            raise ValueError("Coordinate space (--coordinate-space) is required for spherical analysis")

        # Spherical: each subject must supply exactly (id, m2m_path, field_path)
        for i, subject_args in enumerate(args.subject):
            if len(subject_args) != 3:
                raise ValueError(
                    f"Subject {i+1}: Spherical analysis requires exactly 3 arguments: "
                    "subject_id m2m_path field_path"
                )

    else:  # cortical
        if args.space == 'mesh':
            if not args.atlas_name:
                raise ValueError("Atlas name is required for mesh-based cortical analysis")
            # Mesh-cortical: each subject still has (id, m2m_path, field_path)
            for i, subject_args in enumerate(args.subject):
                if len(subject_args) != 3:
                    raise ValueError(
                        f"Subject {i+1}: Mesh cortical analysis requires exactly 3 arguments: "
                        "subject_id m2m_path field_path"
                    )
        else:
            # Voxel-cortical: each subject must supply (id, m2m_path, field_path, atlas_path)
            for i, subject_args in enumerate(args.subject):
                if len(subject_args) != 4:
                    raise ValueError(
                        f"Subject {i+1}: Voxel cortical analysis requires exactly 4 arguments: "
                        "subject_id m2m_path field_path atlas_path"
                    )

        if not args.whole_head and not args.region:
            raise ValueError("Either --whole_head flag or --region must be specified for cortical analysis")

    # Space-specific validation: the mesh field name is hardcoded to "TI_max",
    # so no explicit field-name validation is needed here

    # Validate existence of provided paths
    for subject_args in args.subject:
        subject_id = subject_args[0]
        m2m_path = subject_args[1]
        field_path = subject_args[2]

        if not os.path.isdir(m2m_path):
            raise ValueError(f"Subject {subject_id} m2m directory not found: {m2m_path}")
        # For mesh analysis, field_path is actually a montage name, not a file path
        # Skip file existence check for mesh analysis
        if args.space != 'mesh' and not os.path.exists(field_path):
            raise ValueError(f"Subject {subject_id} field file not found: {field_path}")

        if args.space == 'voxel' and args.analysis_type == 'cortical':
            atlas_path = subject_args[3]
            if not os.path.exists(atlas_path):
                raise ValueError(f"Subject {subject_id} atlas file not found: {atlas_path}")
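
Finally, a minimal entry-point sketch tying the parser and validation together; the use of `parser.error` to report a `ValueError` and the exit behavior are illustrative choices, not taken from the source.

```python
# Hypothetical entry point; error handling and exit behavior are assumptions.
import sys

def main() -> int:
    parser = setup_parser()
    args = parser.parse_args()
    try:
        validate_args(args)
    except ValueError as exc:
        parser.error(str(exc))  # prints the message and exits with status 2
    # ... per-subject analyses and the group comparison would follow here
    return 0

if __name__ == "__main__":
    sys.exit(main())
```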