def run_pipeline(
    project_dir: str,
    subject_ids: Iterable[str],
    *,
    convert_dicom: bool = False,
    run_recon: bool = False,
    parallel_recon: bool = False,
    parallel_cores: int | None = None,
    create_m2m: bool = False,
    run_tissue_analysis: bool = False,
    run_qsiprep: bool = False,
    run_qsirecon: bool = False,
    qsiprep_config: dict | None = None,
    qsi_recon_config: dict | None = None,
    extract_dti: bool = False,
    run_subcortical_segmentations: bool = False,
    debug: bool = False,
    stop_event: object | None = None,
    logger_callback: Callable | None = None,
    runner: CommandRunner | None = None,
) -> int:
    """Run the preprocessing pipeline for one or more subjects.

    When ``parallel_recon`` and ``run_recon`` are both set and more than one
    subject is given, the work is split into stages: DICOM conversion and m2m
    creation run subject by subject, recon-all then runs concurrently in a
    thread pool, and the remaining enabled steps (tissue analysis, QSI/DTI,
    subcortical segmentations) run subject by subject afterwards. Otherwise
    every enabled step is run in a single per-subject call.

    After all processing finishes, an HTML report is generated per subject
    listing every requested step as completed.

    Parameters
    ----------
    project_dir : str
        BIDS project root.
    subject_ids : iterable of str
        Subject identifiers without the `sub-` prefix. Entries are converted
        with ``str`` and stripped; blank entries are dropped. A single bare
        string is treated as one subject rather than iterated per character.
    convert_dicom : bool, optional
        Run DICOM to NIfTI conversion.
    run_recon : bool, optional
        Run FreeSurfer recon-all.
    parallel_recon : bool, optional
        Run recon-all in parallel across subjects.
    parallel_cores : int, optional
        Max parallel subjects for recon-all. Falls back to ``os.cpu_count()``
        (or 1) when not given, and is capped at the number of subjects.
    create_m2m : bool, optional
        Run SimNIBS charm (also runs subject_atlas for .annot files).
    run_tissue_analysis : bool, optional
        Run tissue analysis pipeline.
    run_qsiprep : bool, optional
        Run QSIPrep DWI preprocessing via Docker.
    run_qsirecon : bool, optional
        Run QSIRecon reconstruction via Docker.
    qsiprep_config : dict, optional
        Passed through unchanged to every per-subject pipeline call.
    qsi_recon_config : dict, optional
        Passed through unchanged to every per-subject pipeline call.
    extract_dti : bool, optional
        Extract DTI tensor for SimNIBS anisotropic conductivity.
    run_subcortical_segmentations : bool, optional
        Run thalamic nuclei and hippocampal subfield segmentations (standalone).
    debug : bool, optional
        Enable verbose logging.
    stop_event : object, optional
        Event used to cancel running steps. It is handed to a newly created
        runner, or assigned onto a supplied ``runner`` that holds a different
        event.
    logger_callback : callable, optional
        Callback used by GUI to capture log lines. Called here as
        ``logger_callback(message, "info")`` once per generated report.
    runner : CommandRunner, optional
        Subprocess runner used to stream output. A new one is created when
        omitted.

    Returns
    -------
    int
        0 once all requested steps and report generation have finished. This
        function never returns a non-zero code itself; failures surface as
        exceptions.

    Raises
    ------
    PreprocessError
        If ``subject_ids`` contains no non-blank identifiers.
    Exception
        Anything raised by a per-subject step or by report generation
        propagates to the caller, including errors from the recon-all worker
        threads (re-raised by ``Future.result()``).
    """
    # A bare string is itself an iterable of characters; without this guard
    # subject "101" would be processed as subjects "1", "0" and "1".
    if isinstance(subject_ids, str):
        subject_ids = [subject_ids]
    subject_list = [
        sid for sid in (str(raw).strip() for raw in subject_ids) if sid
    ]
    if not subject_list:
        raise PreprocessError("No subjects provided.")

    # The return value is not used here. NOTE(review): the call is kept in
    # case it has side effects (e.g. registering the project) — confirm
    # before removing.
    get_path_manager(project_dir)

    for sid in subject_list:
        ensure_subject_dirs(project_dir, sid)

    datasets = {"ti-toolbox"}
    if run_recon:
        datasets.add("freesurfer")
    if create_m2m:
        datasets.add("simnibs")
    ensure_dataset_descriptions(project_dir, datasets)

    if runner is None:
        runner = CommandRunner(stop_event=stop_event)
    elif stop_event is not None and runner.stop_event is not stop_event:
        # Make a caller-supplied runner honour the caller's cancellation event.
        runner.stop_event = stop_event

    # Every step flag defaults to off; each stage switches on only what it runs.
    step_flags_off = {
        "convert_dicom": False,
        "run_recon": False,
        "create_m2m": False,
        "run_tissue": False,
        "run_qsiprep_step": False,
        "run_qsirecon_step": False,
        "extract_dti_step": False,
        "run_subcortical": False,
    }

    def run_stage(sid, *, recon_parallel_flag=parallel_recon, **enabled_steps):
        """Run the enabled steps for one subject with the shared settings."""
        return _run_subject_pipeline(
            project_dir,
            sid,
            parallel_recon=recon_parallel_flag,
            qsiprep_config=qsiprep_config,
            qsi_recon_config=qsi_recon_config,
            debug=debug,
            runner=runner,
            callback=logger_callback,
            **{**step_flags_off, **enabled_steps},
        )

    if parallel_recon and run_recon and len(subject_list) > 1:
        # Stage 1: conversion and m2m creation, one subject at a time.
        for sid in subject_list:
            run_stage(sid, convert_dicom=convert_dicom, create_m2m=create_m2m)

        # Stage 2: recon-all only, fanned out across a thread pool.
        max_workers = min(
            parallel_cores or os.cpu_count() or 1, len(subject_list)
        )
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(
                    run_stage, sid, recon_parallel_flag=True, run_recon=True
                )
                for sid in subject_list
            ]
            # result() re-raises a worker's exception; leaving the `with`
            # block still waits for the remaining submitted jobs.
            for future in as_completed(futures):
                future.result()

        # Stage 3: tissue analysis.
        if run_tissue_analysis:
            for sid in subject_list:
                run_stage(sid, run_tissue=True)

        # Stage 4: QSI steps after tissue analysis (if enabled).
        if run_qsiprep or run_qsirecon or extract_dti:
            for sid in subject_list:
                run_stage(
                    sid,
                    run_qsiprep_step=run_qsiprep,
                    run_qsirecon_step=run_qsirecon,
                    extract_dti_step=extract_dti,
                )

        # Stage 5: subcortical segmentations.
        if run_subcortical_segmentations:
            for sid in subject_list:
                run_stage(sid, run_subcortical=True)
    else:
        # Sequential mode: one call per subject with every requested step.
        for sid in subject_list:
            run_stage(
                sid,
                convert_dicom=convert_dicom,
                run_recon=run_recon,
                create_m2m=create_m2m,
                run_tissue=run_tissue_analysis,
                run_qsiprep_step=run_qsiprep,
                run_qsirecon_step=run_qsirecon,
                extract_dti_step=extract_dti,
                run_subcortical=run_subcortical_segmentations,
            )

    # Generate HTML reports for each subject. Imported here, after
    # processing, exactly where the original deferred import sat.
    from tit.reporting import PreprocessingReportGenerator

    # (requested?, step name, description) in the order shown in the report.
    report_steps = (
        (convert_dicom, "DICOM Conversion", "Convert DICOM files to NIfTI format"),
        (create_m2m, "SimNIBS charm", "Create head mesh model for simulations"),
        (
            create_m2m,
            "Subject Atlas Segmentation",
            "Generate atlas-based parcellation",
        ),
        (run_recon, "FreeSurfer recon-all", "Cortical surface reconstruction"),
        (run_tissue_analysis, "Tissue Analysis", "Tissue segmentation and analysis"),
        (run_qsiprep, "QSIPrep", "Diffusion MRI preprocessing"),
        (run_qsirecon, "QSIRecon", "Diffusion MRI reconstruction"),
        (
            extract_dti,
            "DTI Tensor Extraction",
            "Extract DTI tensors for anisotropic conductivity",
        ),
        (
            run_subcortical_segmentations,
            "Subcortical Segmentations",
            "Thalamic nuclei and hippocampal subfield segmentations",
        ),
    )
    requested_steps = [
        (step_name, description)
        for requested, step_name, description in report_steps
        if requested
    ]

    for sid in subject_list:
        report_gen = PreprocessingReportGenerator(
            project_dir=project_dir,
            subject_id=sid,
        )
        # Reaching this point means no step raised, so every requested step
        # is recorded as completed.
        for step_name, description in requested_steps:
            report_gen.add_processing_step(
                step_name=step_name,
                description=description,
                status="completed",
            )
        report_gen.scan_for_data()
        report_path = report_gen.generate()
        if logger_callback:
            logger_callback(f"Report generated: {report_path}", "info")

    return 0