Skip to content

utils

tit.pre.utils

Utility helpers for the tit.pre package.

Provides subject discovery, BIDS directory scaffolding, dataset-description management, subprocess execution with cancellation support, and shared exception classes.

Public API

discover_subjects : Return sorted subject IDs found in a BIDS project tree. check_m2m_exists : Check whether a SimNIBS m2m directory exists.

See Also

tit.pre : Package-level overview and convenience re-exports.

PreprocessError

Bases: RuntimeError

Raised when a preprocessing step fails.

See Also

PreprocessCancelled : Raised specifically when cancelled by a stop event.

PreprocessCancelled

Bases: RuntimeError

Raised when a preprocessing run is cancelled via a stop event.

See Also

PreprocessError : General preprocessing failure.

CommandRunner

CommandRunner(stop_event: Event | None = None)

Run subprocesses with cancellation and streaming log output.

Wraps subprocess.Popen to stream stdout/stderr line-by-line to a logger while honouring a threading.Event for cancellation.

Parameters

stop_event : threading.Event or None, optional Event that, when set, causes any running command to be terminated. A fresh event is created if None.

Attributes

stop_event : threading.Event The cancellation event.

See Also

PreprocessCancelled : Exception raised when a command is cancelled.

Source code in tit/pre/utils.py
def __init__(self, stop_event: threading.Event | None = None) -> None:
    self.stop_event = stop_event or threading.Event()
    self._lock = threading.Lock()
    self._processes: set[subprocess.Popen] = set()
    self.last_output_lines: list[str] = []

request_stop

request_stop() -> None

Signal cancellation and terminate all running processes.

Source code in tit/pre/utils.py
def request_stop(self) -> None:
    """Flag the run as cancelled, then stop every tracked child process."""
    # Set the flag before terminating so the flag is already visible
    # by the time any child process is stopped.
    cancel_flag = self.stop_event
    cancel_flag.set()
    self.terminate_all()

terminate_all

terminate_all() -> None

Terminate all tracked child processes.

Source code in tit/pre/utils.py
def terminate_all(self) -> None:
    """Terminate every child process that is currently being tracked."""
    # Copy the set while holding the lock, then terminate outside it so
    # the lock is not held during the termination calls.
    with self._lock:
        snapshot = tuple(self._processes)
    for child in snapshot:
        _terminate_process(child)

run

run(cmd: Sequence[str], *, logger: Logger, cwd: str | None = None, env: dict | None = None) -> int

Execute cmd and stream its output to logger.

Parameters

cmd : sequence of str — Command and arguments. logger : logging.Logger — Logger that receives each output line at INFO level. cwd : str or None, optional — Working directory for the subprocess. env : dict or None, optional — Environment variables for the subprocess.

Returns

int Process exit code.

Raises

PreprocessCancelled : If stop_event is set before or during execution. ValueError : If cmd is empty.

Source code in tit/pre/utils.py
def run(
    self,
    cmd: Sequence[str],
    *,
    logger: logging.Logger,
    cwd: str | None = None,
    env: dict | None = None,
) -> int:
    """Execute *cmd* and stream its output to *logger*.

    Parameters
    ----------
    cmd : sequence of str
        Command and arguments.
    logger : logging.Logger
        Logger that receives each output line at INFO level.
    cwd : str or None, optional
        Working directory for the subprocess.
    env : dict or None, optional
        Environment variables for the subprocess.

    Returns
    -------
    int
        Process exit code.

    Raises
    ------
    PreprocessCancelled
        If ``stop_event`` is set before or during execution.
    ValueError
        If *cmd* is empty.
    """
    if self.stop_event.is_set():
        raise PreprocessCancelled("Pre-processing cancelled before command start.")

    if not cmd:
        raise ValueError("Command is empty.")

    logger.debug(f"Command: {' '.join(cmd)}")
    output_tail: deque[str] = deque(maxlen=20)
    self.last_output_lines = []

    preexec_fn = os.setsid if os.name != "nt" else None
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        cwd=cwd,
        env=env,
        preexec_fn=preexec_fn,
    )

    with self._lock:
        self._processes.add(proc)

    try:
        if proc.stdout:
            for line in iter(proc.stdout.readline, ""):
                if self.stop_event.is_set():
                    _terminate_process(proc)
                    raise PreprocessCancelled("Pre-processing cancelled.")
                line = line.strip()
                if line:
                    output_tail.append(line)
                    logger.info(line)
        returncode = proc.wait()
    finally:
        self.last_output_lines = list(output_tail)
        with self._lock:
            self._processes.discard(proc)

    return returncode

discover_subjects

discover_subjects(project_dir: str | None) -> list[str]

Return sorted, deduplicated subject IDs found in a BIDS project tree.

Returns an empty list when project_dir is None (project not configured).

Discovery order:

  1. sourcedata/sub-*/T1w/ or T2w/ -- any subdir, NIfTI, DICOM, or supported DICOM archive (.zip, .tar, .tar.gz, .tgz).
  2. sourcedata/sub-*/*.tgz (compressed bundles at top level).
  3. sub-*/anat/*T1w*.nii[.gz] or *T2w*.nii[.gz] at project root.
Parameters

project_dir : str or None — BIDS project root directory; None means no project is configured.

Returns

list[str] Sorted list of subject identifiers (without the sub- prefix).

See Also

check_m2m_exists : Check whether a subject's m2m directory exists.

Source code in tit/pre/utils.py
def discover_subjects(project_dir: str | None) -> list[str]:
    """Return sorted, deduplicated subject IDs found in a BIDS project tree.

    Returns an empty list when *project_dir* is ``None`` (project not
    configured).

    Discovery order:

    1. ``sourcedata/sub-*/T1w/`` or ``T2w/`` -- any subdir, NIfTI, DICOM,
       or supported DICOM archive (``.zip``, ``.tar``, ``.tar.gz``, ``.tgz``).
    2. ``sourcedata/sub-*/*.tgz`` (compressed bundles at top level).
    3. ``sub-*/anat/*T1w*.nii[.gz]`` or ``*T2w*.nii[.gz]`` at project root.

    Parameters
    ----------
    project_dir : str
        BIDS project root directory.

    Returns
    -------
    list[str]
        Sorted list of subject identifiers (without the ``sub-`` prefix).

    See Also
    --------
    check_m2m_exists : Check whether a subject's m2m directory exists.
    """
    if project_dir is None:
        return []

    found: list[str] = []

    sourcedata_dir = os.path.join(project_dir, "sourcedata")
    if os.path.exists(sourcedata_dir):
        for subj_dir in glob.glob(os.path.join(sourcedata_dir, "sub-*")):
            if os.path.isdir(subj_dir):
                t1w_dir = os.path.join(subj_dir, "T1w")
                t2w_dir = os.path.join(subj_dir, "T2w")

                supported_modality_files = (
                    ".dcm",
                    ".dicom",
                    ".zip",
                    ".tar",
                    ".tar.gz",
                    ".tgz",
                    ".json",
                    ".nii",
                    ".nii.gz",
                )
                has_valid_structure = (
                    (
                        os.path.exists(t1w_dir)
                        and (
                            any(
                                os.path.isdir(os.path.join(t1w_dir, d))
                                for d in os.listdir(t1w_dir)
                            )
                            or any(
                                f.lower().endswith(supported_modality_files)
                                for f in os.listdir(t1w_dir)
                            )
                        )
                    )
                    or (
                        os.path.exists(t2w_dir)
                        and (
                            any(
                                os.path.isdir(os.path.join(t2w_dir, d))
                                for d in os.listdir(t2w_dir)
                            )
                            or any(
                                f.lower().endswith(supported_modality_files)
                                for f in os.listdir(t2w_dir)
                            )
                        )
                    )
                    or any(f.endswith(".tgz") for f in os.listdir(subj_dir))
                )

                if has_valid_structure:
                    subject_id = os.path.basename(subj_dir).replace("sub-", "")
                    found.append(subject_id)

    for subj_dir in glob.glob(os.path.join(project_dir, "sub-*")):
        if os.path.isdir(subj_dir):
            subject_id = os.path.basename(subj_dir).replace("sub-", "")
            if subject_id in found:
                continue
            anat_dir = os.path.join(subj_dir, "anat")
            if os.path.exists(anat_dir):
                has_nifti = any(
                    f.endswith((".nii", ".nii.gz")) and ("T1w" in f or "T2w" in f)
                    for f in os.listdir(anat_dir)
                )
                if has_nifti:
                    found.append(subject_id)

    return sorted(found)

check_m2m_exists

check_m2m_exists(project_dir: str, subject_id: str) -> bool

Return True if the SimNIBS m2m directory already exists.

Checks for <project_dir>/derivatives/SimNIBS/sub-<subject_id>/m2m_<subject_id>.

Parameters

project_dir : str BIDS project root directory. subject_id : str Subject identifier without the sub- prefix.

Returns

bool True if the m2m directory exists on disk.

See Also

discover_subjects : Find all subject IDs in a BIDS project. run_charm : Generate the m2m head mesh.

Source code in tit/pre/utils.py
def check_m2m_exists(project_dir: str, subject_id: str) -> bool:
    """Report whether a subject's SimNIBS m2m path is present on disk.

    The path tested is
    ``<project_dir>/derivatives/SimNIBS/sub-<subject_id>/m2m_<subject_id>``.

    Parameters
    ----------
    project_dir : str
        BIDS project root directory.
    subject_id : str
        Subject identifier without the ``sub-`` prefix.

    Returns
    -------
    bool
        ``True`` if that path exists, ``False`` otherwise.

    See Also
    --------
    discover_subjects : Find all subject IDs in a BIDS project.
    run_charm : Generate the m2m head mesh.
    """
    relative_parts = (
        "derivatives",
        "SimNIBS",
        f"sub-{subject_id}",
        f"m2m_{subject_id}",
    )
    return os.path.exists(os.path.join(project_dir, *relative_parts))

ensure_subject_dirs

ensure_subject_dirs(project_dir: str, subject_id: str) -> None

Create the standard BIDS directory scaffold for a subject.

Source code in tit/pre/utils.py
def ensure_subject_dirs(project_dir: str, subject_id: str) -> None:
    """Create the standard BIDS directory scaffold for a subject.

    Paths are obtained from the project's path manager and passed to its
    ``ensure`` method: the ``T1w`` and ``T2w`` sourcedata DICOM locations,
    the BIDS ``anat`` location, the subject location, and the toolbox
    location.

    Parameters
    ----------
    project_dir : str
        BIDS project root directory.
    subject_id : str
        Subject identifier passed to the path manager.
    """
    manager = get_path_manager(project_dir)

    for modality in ("T1w", "T2w"):
        dicom_dir = manager.sourcedata_dicom(subject_id, modality)
        manager.ensure(dicom_dir)

    anat_dir = manager.bids_anat(subject_id)
    manager.ensure(anat_dir)

    subject_dir = manager.sub(subject_id)
    manager.ensure(subject_dir)

    toolbox_dir = manager.ti_toolbox()
    manager.ensure(toolbox_dir)

ensure_dataset_descriptions

ensure_dataset_descriptions(project_dir: str, datasets: Iterable[str]) -> None

Create or update dataset_description.json for each dataset.

Source code in tit/pre/utils.py
def ensure_dataset_descriptions(project_dir: str, datasets: Iterable[str]) -> None:
    """Create or update ``dataset_description.json`` for each dataset.

    Datasets without an entry in ``DATASET_TEMPLATES`` are skipped. A
    missing description file is seeded from the matching template under
    ``resources/dataset_descriptions`` when that template exists, otherwise
    from a built-in default. Empty ``Name``, ``URI`` and ``DatasetLinks``
    fields are then filled in and the file is written back.

    Parameters
    ----------
    project_dir : str
        BIDS project root directory; its final path component is used as
        the project name.
    datasets : iterable of str
        Dataset keys to process.
    """
    project_name = Path(project_dir).name
    today = date.today().strftime("%Y-%m-%d")
    uri_value = f"bids:{project_name}@{today}"
    assets_dir = (
        Path(__file__).resolve().parents[2] / "resources" / "dataset_descriptions"
    )

    for dataset in datasets:
        template_name = DATASET_TEMPLATES.get(dataset)
        if not template_name:
            continue

        description_file = _dataset_description_target(project_dir, dataset)
        description_file.parent.mkdir(parents=True, exist_ok=True)

        if not description_file.exists():
            template_file = assets_dir / template_name
            if template_file.exists():
                seed_text = template_file.read_text(encoding="utf-8")
            else:
                # No packaged template: fall back to a minimal default.
                default_description = {
                    "Name": f"{dataset} derivatives",
                    "BIDSVersion": "1.10.0",
                    "DatasetType": "derivative",
                    "SourceDatasets": [{"URI": ""}],
                    "DatasetLinks": {},
                }
                seed_text = json.dumps(default_description, indent=2)
            description_file.write_text(seed_text, encoding="utf-8")

        description = json.loads(description_file.read_text(encoding="utf-8"))

        if not description.get("Name"):
            description["Name"] = project_name

        # Only blank URIs are filled in; existing values are left alone.
        sources = description.get("SourceDatasets")
        if isinstance(sources, list) and sources:
            first_source = sources[0]
            if isinstance(first_source, dict) and not first_source.get("URI"):
                first_source["URI"] = uri_value
        elif "URI" in description and not description.get("URI"):
            description["URI"] = uri_value

        links = description.get("DatasetLinks")
        if isinstance(links, dict) and not links:
            description["DatasetLinks"] = {project_name: "../../"}

        description_file.write_text(
            json.dumps(description, indent=2), encoding="utf-8"
        )

build_logger

build_logger(step_name: str, subject_id: str, project_dir: str, *, log_file: str | None = None, console: bool = True) -> Logger

Create a named logger with a file handler for a preprocessing step.

Source code in tit/pre/utils.py
def build_logger(
    step_name: str,
    subject_id: str,
    project_dir: str,
    *,
    log_file: str | None = None,
    console: bool = True,
) -> logging.Logger:
    """Create a named logger with a file handler for a preprocessing step.

    Parameters
    ----------
    step_name : str
        Step name; used in the logger name and the default log file name.
    subject_id : str
        Subject identifier; used in the logger name and to locate the
        subject's log directory.
    project_dir : str
        Project root handed to the path manager.
    log_file : str or None, optional
        Explicit log file path. When ``None``, a timestamped
        ``<step_name>_<YYYYmmdd_HHMMSS>.log`` file in the subject's log
        directory is used.
    console : bool, optional
        Accepted for API compatibility; not referenced in this function.

    Returns
    -------
    logging.Logger
        The logger named ``tit.pre.<step_name>.<subject_id>``.
    """
    from tit.logger import add_file_handler

    # The log directory is created even when an explicit log_file is given.
    subject_log_dir = get_path_manager(project_dir).logs(subject_id)
    os.makedirs(subject_log_dir, exist_ok=True)

    if log_file is None:
        stamp = time.strftime("%Y%m%d_%H%M%S")
        log_file = os.path.join(subject_log_dir, f"{step_name}_{stamp}.log")

    name = f"tit.pre.{step_name}.{subject_id}"
    add_file_handler(log_file, logger_name=name)
    return logging.getLogger(name)