Skip to content

Optimization (tit.opt)

TI-Toolbox optimization package

Leadfield (tit.opt.leadfield)

Leadfield matrix generator for optimization and other applications. Integrates with SimNIBS to create leadfield matrices.

LeadfieldGenerator

LeadfieldGenerator(subject_dir, electrode_cap='EEG10-10', progress_callback=None, termination_flag=None)

Generate and load leadfield matrices for TI optimization

This class provides a unified interface for leadfield generation and management, supporting both HDF5 and NPY formats with consistent naming conventions.

Initialize leadfield generator

Args: subject_dir: Path to subject directory (m2m folder) or subject_id electrode_cap: Electrode cap type (e.g., 'EEG10-10', 'GSN-256') progress_callback: Optional callback function(message, type) for progress updates termination_flag: Optional callable that returns True if generation should be terminated

Source code in tit/opt/leadfield.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(self, subject_dir, electrode_cap='EEG10-10', progress_callback=None, termination_flag=None):
    """
    Set up a leadfield generator for one subject.

    Args:
        subject_dir: Path to the subject's m2m folder (or subject_id)
        electrode_cap: Electrode cap type (e.g., 'EEG10-10', 'GSN-256')
        progress_callback: Optional callback function(message, type) used for
            progress reporting; when given, no file/console logger is created
        termination_flag: Optional callable returning True when generation
            should be aborted
    """
    self.subject_dir = Path(subject_dir)
    self.electrode_cap = electrode_cap
    self._progress_callback = progress_callback
    self._termination_flag = termination_flag
    self._simnibs_process = None

    # Central path resolution for the toolbox.
    self.pm = get_path_manager()

    # Directory name "m2m_<id>" -> subject id "<id>".
    self.subject_id = self.subject_dir.name.replace('m2m_', '')

    # Populated later by the load/generate methods.
    self.lfm = None  # Leadfield matrix
    self.positions = None  # Electrode positions

    if progress_callback is not None:
        # GUI consumers receive messages through the callback instead of a logger.
        self.logger = None
        return

    # Prefer a project-scoped log file when the project dir is resolved.
    # In unit tests, PathManager is frequently mocked, so avoid filesystem writes.
    log_file: Optional[str] = None
    project_dir = getattr(self.pm, "project_dir", None)
    if isinstance(project_dir, str) and project_dir and os.path.isdir(project_dir):
        logs_dir = os.path.join(project_dir, "derivatives", "ti-toolbox", "logs")
        os.makedirs(logs_dir, exist_ok=True)
        log_file = os.path.join(logs_dir, "leadfield_generator.log")

    self.logger = logging_util.get_logger("LeadfieldGenerator", log_file=log_file, overwrite=False, console=True)
    logging_util.configure_external_loggers(['simnibs', 'mesh_io'], self.logger)

cleanup_old_simulations

cleanup_old_simulations()

Clean up old SimNIBS simulation files, temporary directories, and ROI mesh files.

Source code in tit/opt/leadfield.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def cleanup_old_simulations(self):
    """Clean up old SimNIBS simulation files, temporary directories, and ROI mesh files.

    Removes, from the subject's m2m directory:
      * leftover ``simnibs_simulation*.mat`` files
      * the temporary ``leadfield/`` working directory
      * the ``{subject_id}_ROI.msh`` mesh file

    Deletion failures are logged as warnings and never raised, so a locked
    file cannot abort leadfield generation.
    """
    import shutil

    self._log("Checking for old simulation files...", 'info')

    # Remove old simulation .mat files (pathlib glob instead of glob.glob)
    old_sim_files = sorted(self.subject_dir.glob("simnibs_simulation*.mat"))
    if old_sim_files:
        self._log(f"  Found {len(old_sim_files)} old simulation file(s), cleaning up...", 'info')
        for sim_file in old_sim_files:
            try:
                sim_file.unlink()
                self._log(f"  Removed: {sim_file.name}", 'info')
            except Exception as e:
                self._log(f"  Warning: Could not remove {sim_file.name}: {e}", 'warning')

    # Remove temporary leadfield directory
    temp_leadfield_dir = self.subject_dir / 'leadfield'
    if temp_leadfield_dir.exists():
        self._log("  Removing old temporary leadfield directory...", 'info')
        try:
            shutil.rmtree(temp_leadfield_dir)
            self._log("  Removed: leadfield/", 'info')
        except Exception as e:
            self._log(f"  Warning: Could not remove leadfield directory: {e}", 'warning')

    # Remove ROI mesh file. Use self.subject_id (resolved once in __init__)
    # instead of re-deriving it from the directory name.
    roi_file = self.subject_dir / f"{self.subject_id}_ROI.msh"
    if roi_file.exists():
        self._log("  Removing old ROI mesh file...", 'info')
        try:
            roi_file.unlink()
            self._log(f"  Removed: {roi_file.name}", 'info')
        except Exception as e:
            self._log(f"  Warning: Could not remove {roi_file.name}: {e}", 'warning')

generate_leadfield

generate_leadfield(output_dir=None, tissues=[1, 2], eeg_cap_path=None, cleanup=True)

Generate leadfield matrix using SimNIBS

Args: output_dir: Output directory for leadfield (default: subject_dir) tissues: Tissue types to include [1=GM, 2=WM] eeg_cap_path: Path to EEG cap CSV file (optional, will look in eeg_positions if not provided) cleanup: Whether to clean up old simulation files before running (default: True)

Returns: dict: Dictionary with path {'hdf5': hdf5_path}

Source code in tit/opt/leadfield.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def generate_leadfield(self, output_dir=None, tissues=None, eeg_cap_path=None, cleanup=True):
    """
    Generate leadfield matrix using SimNIBS

    Args:
        output_dir: Output directory for leadfield (default: subject_dir)
        tissues: Tissue types to include [1=GM, 2=WM]; defaults to [1, 2]
        eeg_cap_path: Path to EEG cap CSV file (optional, will look in eeg_positions if not provided)
        cleanup: Whether to clean up old simulation files before running (default: True)

    Returns:
        dict: Dictionary with path {'hdf5': hdf5_path}

    Raises:
        FileNotFoundError: If no head mesh is found, a requested EEG cap file
            is missing, or SimNIBS produced no HDF5 output.
        InterruptedError: If termination_flag reports cancellation.
    """
    from simnibs import sim_struct
    import simnibs

    # Use a None sentinel instead of a mutable default argument ([1, 2]
    # as a default would be shared across calls).
    if tissues is None:
        tissues = [1, 2]

    if output_dir is None:
        # Use PathManager to get leadfield directory
        output_dir = self.pm.path_optional("leadfields", subject_id=self.subject_id)
        if output_dir is None:
            # Fallback to manual construction if PathManager doesn't find it
            output_dir = self.subject_dir.parent / "leadfields"
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Clean up old simulation files in output directory
    if cleanup:
        self._cleanup_output_dir(output_dir)

    # Setup SimNIBS leadfield calculation
    tdcs_lf = sim_struct.TDCSLEADFIELD()

    # Find mesh file - try multiple naming conventions
    # Common patterns: {subject_id}.msh, m2m_{subject_id}.msh, {subject_dir_name}.msh
    subject_id = self.subject_dir.name.replace('m2m_', '')  # Extract subject ID

    possible_mesh_names = [
        f"{subject_id}.msh",  # Most common: 101.msh
        f"{self.subject_dir.name}.msh",  # m2m_101.msh
        "final.msh",  # Sometimes used
    ]

    mesh_file = None
    for mesh_name in possible_mesh_names:
        candidate = self.subject_dir / mesh_name
        if candidate.exists():
            mesh_file = candidate
            self._log(f"Found mesh file: {mesh_file}", 'info')
            break

    if mesh_file is None:
        # List available .msh files to help debug
        msh_files = list(self.subject_dir.glob("*.msh"))
        error_msg = f"Mesh file not found in {self.subject_dir}\n"
        error_msg += f"Tried: {', '.join(possible_mesh_names)}\n"
        if msh_files:
            error_msg += f"Available .msh files: {', '.join([f.name for f in msh_files])}"
        else:
            error_msg += "No .msh files found in directory"
        raise FileNotFoundError(error_msg)

    tdcs_lf.fnamehead = str(mesh_file)
    tdcs_lf.subpath = str(self.subject_dir)
    tdcs_lf.pathfem = str(output_dir)
    tdcs_lf.interpolation = None
    tdcs_lf.map_to_surf = False
    tdcs_lf.tissues = tissues

    # Set EEG cap path if provided
    if eeg_cap_path:
        if not Path(eeg_cap_path).exists():
            raise FileNotFoundError(f"EEG cap file not found: {eeg_cap_path}")
        tdcs_lf.eeg_cap = str(eeg_cap_path)
        self._log(f"Using EEG cap: {Path(eeg_cap_path).name}", 'info')
    elif self.electrode_cap and self.electrode_cap != 'EEG10-10':
        # Try to find in eeg_positions directory using PathManager
        eeg_positions_dir = self.pm.path_optional("eeg_positions", subject_id=self.subject_id)
        if eeg_positions_dir and os.path.exists(eeg_positions_dir):
            cap_file = Path(eeg_positions_dir) / f"{self.electrode_cap}.csv"
            if cap_file.exists():
                tdcs_lf.eeg_cap = str(cap_file)
                self._log(f"Found EEG cap: {cap_file.name}", 'info')

    # Clean up old files in the subject directory if requested
    # (separate from the output-dir cleanup above)
    if cleanup:
        self.cleanup_old_simulations()

    self._log(f"Generating leadfield matrix for {self.subject_dir.name}...", 'info')
    self._log(f"Electrode cap: {self.electrode_cap if self.electrode_cap else 'Default'}", 'info')
    self._log(f"Tissues: {tissues} (1=GM, 2=WM)", 'info')
    self._log(f"Mesh file: {mesh_file.name}", 'info')
    self._log("Setting up SimNIBS leadfield calculation...", 'info')

    # Redirect SimNIBS output to GUI console via callback (not terminal)
    import sys
    from io import StringIO
    import logging

    # Suppress SimNIBS console output by redirecting stdout/stderr to StringIO
    old_stdout = sys.stdout
    old_stderr = sys.stderr
    stdout_capture = StringIO()
    stderr_capture = StringIO()
    sys.stdout = stdout_capture
    sys.stderr = stderr_capture

    # Configure SimNIBS logger to use callback if available
    simnibs_logger = logging.getLogger('simnibs')
    old_simnibs_handlers = simnibs_logger.handlers[:]

    if self._progress_callback:
        # Remove console handlers from SimNIBS logger
        logging_util.suppress_console_output(simnibs_logger)

        # Add callback handler to redirect to GUI
        logging_util.add_callback_handler(simnibs_logger, self._progress_callback, logging.INFO)

    # Run SimNIBS with termination checks
    simnibs_error = None
    try:
        # Check for termination before starting
        if self._termination_flag and self._termination_flag():
            self._log("Leadfield generation cancelled before starting", 'warning')
            raise InterruptedError("Leadfield generation was cancelled before starting")

        # Note: SimNIBS runs MPI processes that cannot be interrupted mid-execution
        # The termination check will take effect after SimNIBS completes
        self._log("Running SimNIBS (this cannot be interrupted mid-execution)...", 'info')
        simnibs.run_simnibs(tdcs_lf)

        # Check for termination after SimNIBS finishes
        if self._termination_flag and self._termination_flag():
            self._log("Leadfield generation cancelled after SimNIBS execution", 'warning')
            raise InterruptedError("Leadfield generation was cancelled")

    except Exception as e:
        # Defer the error so stdout/stderr and logger handlers are always restored
        simnibs_error = e
    finally:
        # Restore stdout/stderr
        sys.stdout = old_stdout
        sys.stderr = old_stderr

        # Restore SimNIBS logger handlers
        if self._progress_callback:
            simnibs_logger.handlers = old_simnibs_handlers

        # Send any captured stdout/stderr to callback (fallback for print statements)
        if self._progress_callback:
            stdout_text = stdout_capture.getvalue()
            stderr_text = stderr_capture.getvalue()
            if stdout_text.strip():
                for line in stdout_text.strip().split('\n'):
                    if line.strip():
                        self._log(line, 'info')
            if stderr_text.strip():
                for line in stderr_text.strip().split('\n'):
                    if line.strip():
                        self._log(line, 'warning')

    # Re-raise any error that occurred during SimNIBS run
    if simnibs_error:
        raise simnibs_error

    self._log("SimNIBS leadfield computation completed", 'info')

    # Find generated HDF5 file
    self._log("Processing generated leadfield files...", 'info')
    hdf5_files = list(output_dir.glob('*.hdf5'))
    if not hdf5_files:
        raise FileNotFoundError(f"No HDF5 leadfield file found in {output_dir}")

    hdf5_path = hdf5_files[0]

    # Use the filename that SimNIBS generated (simplified naming - no renaming)
    self._log(f"Leadfield generated: {hdf5_path}", 'success')

    result = {'hdf5': str(hdf5_path)}

    return result

get_electrode_names_from_cap

get_electrode_names_from_cap(cap_name=None)

Extract electrode names from an EEG cap CSV file using simnibs csv_reader.

Args: cap_name: Name of EEG cap (will look in subject_dir/eeg_positions/). If None, uses self.electrode_cap.

Returns: list: List of electrode names

Source code in tit/opt/leadfield.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def get_electrode_names_from_cap(self, cap_name=None):
    """
    Return the sorted electrode names for an EEG cap.

    Tries the SimNIBS csv_reader first; when SimNIBS is unavailable (e.g.
    in unit tests), falls back to a minimal CSV parse of the cap file in
    ``subject_dir/eeg_positions/``.

    Args:
        cap_name: Name of the EEG cap. Defaults to ``self.electrode_cap``.

    Returns:
        list: Sorted, de-duplicated electrode names.
    """
    cap_name = self.electrode_cap if cap_name is None else cap_name

    # Prefer SimNIBS reader when available; fall back to a lightweight CSV parse for tests.
    try:
        from simnibs.utils.csv_reader import eeg_positions  # type: ignore[import-not-found]

        return sorted(eeg_positions(str(self.subject_dir), cap_name=cap_name))
    except Exception:
        filename = str(cap_name)
        if not filename.lower().endswith(".csv"):
            filename = f"{filename}.csv"
        cap_path = self.subject_dir / "eeg_positions" / filename
        if not cap_path.exists():
            raise OSError(f"Could not find EEG cap file: {cap_path}")

        # dict keys give stable uniqueness; sorted() below fixes the order anyway.
        seen = {}
        with cap_path.open("r", encoding="utf-8", errors="ignore") as fh:
            for raw in fh:
                fields = [chunk.strip() for chunk in raw.strip().split(",")]
                # expected SimNIBS format: Type,x,y,z,label
                if len(fields) < 2:
                    continue
                if fields[0] not in ("Electrode", "ReferenceElectrode"):
                    continue
                label = fields[-1]
                if label:
                    seen[label] = None
        return sorted(seen)

list_available_leadfields

list_available_leadfields(subject_id=None)

List available leadfield HDF5 files for a subject.

Args: subject_id: Subject ID (optional, will use self.subject_id if not provided)

Returns: list: List of tuples (net_name, hdf5_path, file_size_gb)

Source code in tit/opt/leadfield.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def list_available_leadfields(self, subject_id=None):
    """
    List available leadfield HDF5 files for a subject.

    Args:
        subject_id: Subject ID (optional, will use self.subject_id if not provided)

    Returns:
        list: List of tuples (net_name, hdf5_path, file_size_gb), sorted by net name.
              Empty when the leadfields directory cannot be resolved or does not exist.
    """
    if subject_id is None:
        subject_id = self.subject_id

    # Use PathManager to get leadfield directory
    leadfields_dir = self.pm.path_optional("leadfields", subject_id=subject_id)

    leadfields = []
    if leadfields_dir and os.path.exists(leadfields_dir):
        suffix = "_leadfield.hdf5"
        for item in Path(leadfields_dir).iterdir():
            # Only files matching the pattern: {net_name}_leadfield.hdf5.
            # is_file() already proves existence, so no extra exists() check.
            if not (item.is_file() and item.name.endswith(suffix)):
                continue
            # Slice off the trailing suffix instead of str.replace(), which
            # would also strip accidental interior occurrences of the pattern.
            net_name = item.name[:-len(suffix)]
            # Get file size in GB
            try:
                file_size = item.stat().st_size / (1024**3)  # GB
            except OSError:
                file_size = 0.0
            leadfields.append((net_name, str(item), file_size))

    return sorted(leadfields, key=lambda x: x[0])

list_available_leadfields_hdf5

list_available_leadfields_hdf5(subject_id=None)

List available leadfield HDF5 files for a subject.

Args: subject_id: Subject ID (optional, will use self.subject_id if not provided)

Returns: list: List of tuples (net_name, hdf5_path, file_size_gb)

Source code in tit/opt/leadfield.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def list_available_leadfields_hdf5(self, subject_id=None):
    """
    List available leadfield HDF5 files for a subject.

    Accepts a range of SimNIBS naming patterns (``{net}_leadfield.hdf5``,
    ``{sid}_leadfield_{net}.hdf5``, or any ``*.hdf5`` containing
    "leadfield") and extracts a best-effort net name from each.

    Args:
        subject_id: Subject ID (optional, will use self.subject_id if not provided)

    Returns:
        list: List of tuples (net_name, hdf5_path, file_size_gb), sorted by net name
    """
    sid = self.subject_id if subject_id is None else subject_id

    def derive_net_name(filename):
        # Strip the various SimNIBS leadfield naming patterns down to the net name.
        if "_leadfield_" in filename:
            pieces = filename.split("_leadfield_")
            if len(pieces) == 2:
                name = pieces[1].replace(".hdf5", "")
            else:
                name = filename.replace("_leadfield_", "").replace(".hdf5", "")
        elif filename.endswith("_leadfield.hdf5"):
            # Standard pattern: {net_name}_leadfield.hdf5
            name = filename.replace("_leadfield.hdf5", "")
        else:
            # Fallback: just drop the extension
            name = filename.replace(".hdf5", "")

        # Drop a leading subject-id prefix, then tidy stray underscores.
        if name.startswith(f"{sid}_"):
            name = name.replace(f"{sid}_", "", 1)
        elif name.startswith(f"{sid}"):
            name = name.replace(f"{sid}", "", 1)
        name = name.strip("_")
        return name if name else "unknown"

    lf_dir = self.pm.path_optional("leadfields", subject_id=sid)

    found = []
    if lf_dir and os.path.exists(lf_dir):
        for entry in Path(lf_dir).iterdir():
            # Flexible match: any HDF5 file whose name mentions "leadfield".
            if not entry.is_file():
                continue
            if not (entry.name.endswith(".hdf5") and "leadfield" in entry.name.lower()):
                continue
            if not entry.exists():
                continue
            # File size in GB (0.0 when stat fails)
            try:
                size_gb = entry.stat().st_size / (1024**3)
            except OSError:
                size_gb = 0.0
            found.append((derive_net_name(entry.name), str(entry), size_gb))

    return sorted(found, key=lambda t: t[0])

Flex search (tit.opt.flex)

Flex-search optimization package for TI stimulation.

This package provides flexible optimization for temporal interference (TI) stimulation with support for different ROI definitions and optimization goals.

Modules: flex_config: Configuration and optimization setup flex_log: Logging utilities and progress tracking multi_start: Multi-start optimization logic and result management flex: Main optimization orchestration script

Note: roi module is now located at core.roi (shared across optimization approaches)

Main API (tit.opt.flex.flex)

Main flex-search script for TI stimulation optimization.

This script orchestrates the flexible search optimization process for temporal interference (TI) stimulation. It supports: - Multiple ROI methods (spherical, atlas-based, subcortical) - Multiple optimization goals (mean, max, focality) - Multi-start optimization for robust results - Optional electrode mapping to EEG cap positions

main

main() -> int

Main entry point for flex-search optimization.

Returns: Exit code (0 for success, 1 for failure)

Source code in tit/opt/flex/flex.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def main() -> int:
    """Main entry point for flex-search optimization.

    Orchestrates (optionally multi-start) flex-search runs: builds the
    optimization, runs each start in a numbered sub-folder, then promotes
    the best result into the base output folder and writes a summary file.

    Returns:
        Exit code (0 for success, 1 for failure)
    """
    # Parse arguments
    args = flex_config.parse_arguments()

    # Track total session time
    start_time = time.time()

    # Multi-start optimization setup
    n_multistart = args.n_multistart
    optim_funvalue_list = np.zeros(n_multistart)

    # Build base optimization to get the output folder structure
    opt_base = flex_config.build_optimization(args)
    base_output_folder = opt_base.output_folder

    # Setup logger
    logger = flex_log.setup_logger(base_output_folder, args.subject)

    # Configure SimNIBS related loggers
    configure_external_loggers(['simnibs', 'mesh_io', 'sim_struct', 'opt_struct'], logger)

    logger.debug(f"Base output directory: {base_output_folder}")
    logger.debug(f"Command: {' '.join(sys.argv)}")

    if n_multistart > 1:
        logger.debug(f"Running multi-start optimization with {n_multistart} runs")
    else:
        logger.debug("Running single optimization")

    # Create output folder list for each run
    output_folder_list = [
        os.path.join(base_output_folder, f"{i_opt:02d}")
        for i_opt in range(n_multistart)
    ]

    # Log optimization start
    flex_log.log_optimization_start(
        args.subject, args.goal, args.postproc,
        args.roi_method, n_multistart, logger
    )

    # Run multiple optimizations
    for i_opt in range(n_multistart):
        run_start_time = time.time()

        # Define the step name BEFORE the try block: the except handler below
        # references it, and an early failure (e.g. build_optimization raising)
        # would otherwise hit an unbound name.
        step_name = f"Optimization run {i_opt + 1}/{n_multistart}" if n_multistart > 1 else "Optimization"

        try:
            # Build optimization for this specific run
            opt = flex_config.build_optimization(args)
            opt.output_folder = output_folder_list[i_opt]
            os.makedirs(opt.output_folder, exist_ok=True)

            # Log optimization parameters (only for first run)
            if i_opt == 0:
                flex_log.log_optimization_config(args, n_multistart, logger)

            # Configure optimizer options
            flex_config.configure_optimizer_options(opt, args, logger)

            # Log run start
            cpus_to_pass = args.cpus  # may be None (auto-select)
            if n_multistart > 1:
                logger.debug(f"Starting optimization run {i_opt + 1}/{n_multistart}...")
            else:
                logger.debug("Starting optimization...")

            flex_log.log_optimization_step_start(step_name, logger)

            # Run optimization
            optimization_start_time = time.time()
            optim_funvalue_list[i_opt] = multi_start.run_single_optimization(
                opt, cpus_to_pass, logger
            )
            optimization_end_time = time.time()

            # Log results (inf marks a failed run)
            if optim_funvalue_list[i_opt] != float('inf'):
                optimization_duration = optimization_end_time - optimization_start_time
                run_duration = time.time() - run_start_time
                flex_log.log_run_details(
                    i_opt, n_multistart, opt.output_folder, opt,
                    optimization_duration, run_duration, logger
                )
                flex_log.log_optimization_step_complete(step_name, "", logger)

                # Log final electrode simulation if enabled
                run_final_sim = (
                    args.run_final_electrode_simulation and
                    not args.skip_final_electrode_simulation
                )
                if run_final_sim:
                    flex_log.log_optimization_step_start("Final electrode simulation", logger)
                    flex_log.log_optimization_step_complete("Final electrode simulation", "", logger)
            else:
                # Optimization failed
                flex_log.log_optimization_step_failed(step_name, "See logs for details", logger)

        except Exception as exc:
            # Unexpected error: record as failed and keep going with the next start
            run_duration = time.time() - run_start_time
            logger.error(f"Unexpected error in run {i_opt + 1} after {run_duration:.1f} seconds: {exc}")
            optim_funvalue_list[i_opt] = float('inf')
            flex_log.log_optimization_step_failed(
                step_name, f"Unexpected error: {type(exc).__name__}", logger
            )

    # Post-processing
    if n_multistart > 1:
        logger.debug("=" * 80)
        logger.debug("MULTI-START OPTIMIZATION POST-PROCESSING")
        logger.debug("=" * 80)

        flex_log.log_optimization_step_start("Post-processing", logger)

        # Select best solution
        best_opt_idx, valid_runs, failed_runs = multi_start.select_best_solution(
            optim_funvalue_list, n_multistart, logger
        )

        if best_opt_idx == -1:
            flex_log.log_optimization_step_failed(
                "Post-processing", "No valid optimization results found", logger
            )
            flex_log.log_optimization_complete(
                args.subject, success=False, n_multistart=n_multistart, logger=logger
            )
            return 1

        best_run_number = best_opt_idx + 1
        best_folder = output_folder_list[best_opt_idx]

        # Copy best solution
        if not multi_start.copy_best_solution(best_folder, base_output_folder, logger):
            flex_log.log_optimization_step_failed(
                "Post-processing", "Failed to copy best solution", logger
            )
            flex_log.log_optimization_complete(
                args.subject, success=False, n_multistart=n_multistart, logger=logger
            )
            return 1

        # Create summary file
        multistart_summary_file = os.path.join(base_output_folder, "multistart_optimization_summary.txt")
        try:
            multi_start.create_multistart_summary_file(
                multistart_summary_file, args, n_multistart,
                optim_funvalue_list, best_opt_idx, valid_runs,
                failed_runs, start_time
            )
            logger.debug(f"Multi-start summary saved to: {multistart_summary_file}")
        except Exception as e:
            logger.warning(f"Failed to create multi-start summary file: {e}")

        # Clean up
        multi_start.cleanup_temporary_directories(
            output_folder_list, n_multistart, logger
        )

        logger.debug("MULTI-START OPTIMIZATION COMPLETED SUCCESSFULLY")
        logger.debug(f"Final results available in: {base_output_folder}")

        flex_log.log_optimization_step_complete(
            "Post-processing",
            f"{len(valid_runs)}/{n_multistart} runs successful",
            logger
        )

        flex_log.log_optimization_complete(
            args.subject, success=True, output_path=base_output_folder,
            n_multistart=n_multistart, best_run=best_run_number, logger=logger
        )

    else:
        # Single optimization run
        if optim_funvalue_list[0] == float('inf'):
            logger.error("Single optimization run failed")
            flex_log.log_optimization_complete(
                args.subject, success=False, logger=logger
            )
            return 1
        else:
            logger.debug("SINGLE OPTIMIZATION COMPLETED SUCCESSFULLY")
            logger.debug(f"Final function value: {optim_funvalue_list[0]:.6f}")

            single_run_folder = output_folder_list[0]

            logger.debug("FINALIZING RESULTS:")
            logger.debug(f"Moving results from: {single_run_folder}")
            logger.debug(f"Moving results to: {base_output_folder}")

            # Copy results
            if not multi_start.copy_best_solution(single_run_folder, base_output_folder, logger):
                flex_log.log_optimization_complete(
                    args.subject, success=False, logger=logger
                )
                return 1

            # Create summary file
            single_summary_file = os.path.join(base_output_folder, "optimization_summary.txt")
            try:
                multi_start.create_single_optimization_summary_file(
                    single_summary_file, args, optim_funvalue_list[0], start_time
                )
                logger.debug(f"Optimization summary saved to: {single_summary_file}")
            except Exception as e:
                logger.warning(f"Failed to create optimization summary file: {e}")

            # Clean up (brief pause lets any file handles settle before rmtree)
            time.sleep(0.1)
            logger.debug("CLEANING UP TEMPORARY DIRECTORY:")

            for attempt in range(2):
                try:
                    if os.path.exists(single_run_folder):
                        shutil.rmtree(single_run_folder)
                    logger.debug("✓ Removed temporary directory")
                    break
                except Exception as exc:
                    if attempt == 0:
                        time.sleep(0.2)
                        continue
                    else:
                        logger.warning(f"✗ Failed to remove temporary directory: {single_run_folder} - {exc}")
                        # Fixed mojibake ("âš ") in the original message below.
                        logger.warning("⚠ Temporary directory could not be removed (results still valid)")

            logger.debug(f"Results available in: {base_output_folder}")

            flex_log.log_optimization_complete(
                args.subject, success=True, output_path=base_output_folder,
                n_multistart=n_multistart, logger=logger
            )

    # Log session footer
    total_duration = time.time() - start_time
    flex_log.log_session_footer(args.subject, n_multistart, total_duration, logger)

    return 0

Config (tit.opt.flex.flex_config)

Configuration and optimization setup for flex-search.

This module handles: - Argument parsing - Optimization object configuration - Electrode setup - Output directory structure

build_optimization

build_optimization(args: argparse.Namespace) -> opt_struct.TesFlexOptimization

Set up optimization object with all parameters.

Args: args: Parsed command line arguments

Returns: Configured SimNIBS optimization object

Raises: SystemExit: If required environment variables or files are missing

Source code in tit/opt/flex/flex_config.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def build_optimization(args: argparse.Namespace) -> opt_struct.TesFlexOptimization:
    """Set up optimization object with all parameters.

    Builds a fully configured SimNIBS ``TesFlexOptimization`` from the parsed
    CLI arguments: paths, goal/thresholds, post-processing, electrode layout
    (two pairs for TI), optional EEG-net mapping, and the ROI.

    Args:
        args: Parsed command line arguments

    Returns:
        Configured SimNIBS optimization object

    Raises:
        SystemExit: If required environment variables or files are missing
    """
    opt = opt_struct.TesFlexOptimization()

    # Docker-first: resolve paths via PathManager templates (centralized conventions).
    from tit.core import get_path_manager

    pm = get_path_manager()
    opt.subpath = pm.path("m2m", subject_id=args.subject)
    opt.output_folder = pm.path("flex_search_run", subject_id=args.subject, search_name=utils.roi_dirname(args))
    os.makedirs(opt.output_folder, exist_ok=True)

    # Configure goals and thresholds
    opt.goal = args.goal
    if args.goal == "focality":
        # Allow "dynamic" focality thresholds:
        # - If --thresholds is omitted (None/empty) or explicitly set to "dynamic"/"auto",
        #   do NOT set opt.threshold and let SimNIBS handle threshold adaptation.
        thr_raw = (args.thresholds or "").strip()
        if thr_raw and thr_raw.lower() not in {"dynamic", "auto"}:
            try:
                vals = [float(v) for v in thr_raw.split(",")]
            except Exception as exc:
                raise SystemExit(f"Invalid --thresholds value: {args.thresholds!r}. Expected float(s) or 'dynamic'.") from exc
            # A single value is passed as a scalar, multiple values as a list.
            opt.threshold = vals if len(vals) > 1 else vals[0]
        if not args.non_roi_method:
            raise SystemExit("--non-roi-method required for focality goal")

    opt.e_postproc = args.postproc
    opt.open_in_gmsh = False  # Never auto-launch GUI

    # Final electrode simulation control
    # --skip-final-electrode-simulation always overrides --run-final-electrode-simulation.
    opt.run_final_electrode_simulation = (
        args.run_final_electrode_simulation and
        not args.skip_final_electrode_simulation
    )

    # Detailed results control
    if hasattr(args, 'detailed_results') and args.detailed_results:
        opt.detailed_results = True

    # Skin visualization control
    if hasattr(args, 'visualize_valid_skin_region') and args.visualize_valid_skin_region:
        opt.visualize_valid_skin_region = True

    # Configure mapping
    if args.enable_mapping:
        opt.map_to_net_electrodes = True
        eeg_dir = pm.path("eeg_positions", subject_id=args.subject)
        opt.net_electrode_file = os.path.join(eeg_dir, f"{args.eeg_net}.csv")
        if not os.path.isfile(opt.net_electrode_file):
            raise SystemExit(f"EEG net file not found: {opt.net_electrode_file}")
        if hasattr(opt, "run_mapped_electrodes_simulation") and not args.disable_mapping_simulation:
            opt.run_mapped_electrodes_simulation = True
    else:
        # Initialize electrode_mapping to None when mapping is disabled
        # This prevents AttributeError in SimNIBS logging code
        opt.electrode_mapping = None

    # Configure skin visualization net file (separate from mapping)
    # NOTE(review): this reuses opt.net_electrode_file and therefore silently
    # overrides the file chosen by --enable-mapping above when both options
    # are given -- confirm that precedence is intended.
    if hasattr(args, 'skin_visualization_net') and args.skin_visualization_net:
        opt.net_electrode_file = args.skin_visualization_net
        if not os.path.isfile(opt.net_electrode_file):
            raise SystemExit(f"Skin visualization EEG net file not found: {opt.net_electrode_file}")

    # Configure electrodes
    c_A = args.current / 1000.0  # mA → A
    electrode_shape = args.electrode_shape
    dimensions = [float(x) for x in args.dimensions.split(',')]
    thickness = args.thickness

    # Calculate effective radius from dimensions for ElectrodeArrayPair layout
    # For circular electrodes, use average of dimensions; for rectangular, use max dimension
    # (the rectangular value is computed but never applied below: radius is set
    # to 0 for rectangles -- see the shape branch in the loop).
    if electrode_shape == "ellipse":
        effective_radius = (dimensions[0] + dimensions[1]) / 4.0  # Average dimension / 2
    else:  # rectangle
        effective_radius = max(dimensions) / 2.0  # Max dimension / 2

    # Create electrode pairs for TI stimulation (2 pairs)
    electrode_pairs = []
    for _ in range(2):  # Two pairs for TI
        electrode_pair = ElectrodeArrayPair()

        # Set electrode shape and dimensions for plotting
        if electrode_shape == "ellipse":
            electrode_pair.radius = [effective_radius]
            electrode_pair.dimensions = [dimensions[0], dimensions[1]]
        else:  # rectangle
            electrode_pair.radius = [0]  # No radius for rectangular
            electrode_pair.length_x = [dimensions[0]]
            electrode_pair.length_y = [dimensions[1]]

        # Anode/cathode carry the same magnitude with opposite sign.
        electrode_pair.current = [c_A, -c_A]
        electrode_pairs.append(electrode_pair)

    # Add to optimization
    opt.electrode = electrode_pairs

    # Configure ROI
    utils.configure_roi(opt, args)

    return opt

configure_optimizer_options

configure_optimizer_options(opt: opt_struct.TesFlexOptimization, args: argparse.Namespace, logger) -> None

Configure optimizer options for the optimization object.

Args: opt: SimNIBS optimization object args: Parsed command line arguments logger: Logger instance

Source code in tit/opt/flex/flex_config.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def _set_mutation_option(options: dict, raw: str, logger) -> None:
    """Parse the --mutation value (single float or 'min,max' range) into *options*.

    Logs a warning and leaves *options* untouched on malformed input.
    """
    text = raw.strip()
    try:
        values = [float(part.strip()) for part in text.split(',')]
    except ValueError as e:
        logger.warning(f"Failed to parse mutation parameter '{text}': {e}")
        return
    if len(values) == 1:
        # Single value: stored as a scalar.
        options["mutation"] = values[0]
        logger.debug(f"Set mutation to {values[0]}")
    elif len(values) == 2:
        # Two values: stored as a [min, max] range.
        options["mutation"] = values
        logger.debug(f"Set mutation to {values}")
    else:
        logger.warning(f"Invalid mutation format: {text}. Expected single value or 'min,max'")


def configure_optimizer_options(
    opt: "opt_struct.TesFlexOptimization",
    args: argparse.Namespace,
    logger
) -> None:
    """Configure optimizer options for the optimization object.

    Writes user-supplied differential-evolution settings into
    ``opt._optimizer_options_std`` (a private SimNIBS dict that is forwarded
    to the optimizer). Options left as None on *args* are not touched.

    Args:
        opt: SimNIBS optimization object
        args: Parsed command line arguments
        logger: Logger instance
    """
    # The options dict is a private SimNIBS attribute; bail out gracefully if
    # this SimNIBS version does not expose it (or exposes something else).
    if not hasattr(opt, '_optimizer_options_std') or not isinstance(opt._optimizer_options_std, dict):
        logger.warning("opt._optimizer_options_std not found or not a dict, cannot configure optimizer options.")
        return

    options = opt._optimizer_options_std

    # Simple pass-through options: (dict key, argparse attribute, log label).
    for key, attr, label in (
        ("maxiter", "max_iterations", "max iterations"),
        ("popsize", "population_size", "population size"),
        ("tol", "tolerance", "tolerance"),
        ("recombination", "recombination", "recombination"),
    ):
        value = getattr(args, attr, None)
        if value is not None:
            options[key] = value
            logger.debug(f"Set {label} to {value}")

    # Mutation needs parsing (single float or 'min,max' range).
    if getattr(args, 'mutation', None) is not None:
        _set_mutation_option(options, args.mutation, logger)

parse_arguments

parse_arguments() -> argparse.Namespace

Parse command line arguments.

Returns: Parsed arguments namespace

Source code in tit/opt/flex/flex_config.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def parse_arguments() -> argparse.Namespace:
    """Build the flex-search argument parser and parse ``sys.argv``.

    Returns:
        Parsed arguments namespace
    """
    parser = argparse.ArgumentParser(
        prog="flex-search",
        description="Optimise TI stimulation and (optionally) map final "
                    "electrodes to the nearest EEG-net nodes.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # --- Required core parameters -------------------------------------
    parser.add_argument("--subject", "-sub", required=True, help="Subject ID")
    parser.add_argument("--goal", choices=["mean", "max", "focality"], required=True,
                        help="Optimization goal")
    parser.add_argument("--postproc", choices=["max_TI", "dir_TI_normal", "dir_TI_tangential"],
                        required=True, help="Post-processing method")
    parser.add_argument("--eeg-net", "-eeg",
                        help="CSV filename in eeg_positions (without .csv). Required when --enable-mapping is used.")
    parser.add_argument("--current", type=float, required=True,
                        help="Electrode current in mA")
    parser.add_argument("--electrode-shape", choices=["rect", "ellipse"], required=True,
                        help="Electrode shape (rect or ellipse)")
    parser.add_argument("--dimensions", type=str, required=True,
                        help="Electrode dimensions in mm (x,y format, e.g., '8,8')")
    parser.add_argument("--thickness", type=float, required=True,
                        help="Electrode thickness in mm")
    parser.add_argument("--roi-method", choices=["spherical", "atlas", "subcortical"],
                        required=True, help="ROI definition method")

    # --- Focality goal only -------------------------------------------
    parser.add_argument("--thresholds",
                        help="Focality threshold(s). Provide a single value or two comma-separated values. "
                             "If omitted (or set to 'dynamic'), SimNIBS will use its dynamic thresholding/adaptation.")
    parser.add_argument("--non-roi-method", choices=["everything_else", "specific"],
                        help="Non-ROI definition method (required for focality goal)")

    # --- EEG-net mapping (off unless requested) -----------------------
    parser.add_argument("--enable-mapping", action="store_true",
                        help="Map optimal electrodes to nearest EEG-net nodes")
    parser.add_argument("--disable-mapping-simulation", action="store_true",
                        help="Skip extra simulation with mapped electrodes")

    # --- Final-simulation toggles -------------------------------------
    parser.add_argument("--run-final-electrode-simulation", action="store_true", default=False,
                        help="Run final simulation with optimal electrodes (default: False)")
    parser.add_argument("--skip-final-electrode-simulation", action="store_true",
                        help="Skip final simulation with optimal electrodes")

    # --- Stability / performance --------------------------------------
    parser.add_argument("--n-multistart", type=int, default=1,
                        help="Number of optimization runs (multi-start). Best result will be kept.")
    parser.add_argument("--max-iterations", type=int,
                        help="Maximum optimization iterations for differential_evolution")
    parser.add_argument("--population-size", type=int,
                        help="Population size for differential_evolution")
    parser.add_argument("--cpus", type=int,
                        help="Number of CPU cores to utilize")

    # --- Differential-evolution tuning --------------------------------
    parser.add_argument("--tolerance", type=float,
                        help="Tolerance for differential_evolution convergence (tol parameter)")
    parser.add_argument("--mutation", type=str,
                        help="Mutation parameter for differential_evolution (single value or 'min,max' range)")
    parser.add_argument("--recombination", type=float,
                        help="Recombination parameter for differential_evolution")

    # --- Extra outputs / visualization --------------------------------
    parser.add_argument("--detailed-results", action="store_true",
                        help="Enable detailed results output (creates additional visualization and debug files)")
    parser.add_argument("--visualize-valid-skin-region", action="store_true",
                        help="Create visualizations of valid skin region for electrode placement (requires --detailed-results)")
    parser.add_argument("--skin-visualization-net",
                        help="EEG net CSV file to use for skin visualization (shows electrode positions on valid/invalid skin regions)")

    return parser.parse_args()

Ex search (tit.opt.ex)

TI Exhaustive Search Module

A streamlined implementation for TI exhaustive search simulations.

calculate_total_combinations

calculate_total_combinations(e1_plus, e1_minus, e2_plus, e2_minus, current_ratios, all_combinations)

Calculate total number of montage combinations to be tested.

Args: e1_plus (list): E1+ electrode names e1_minus (list): E1- electrode names e2_plus (list): E2+ electrode names e2_minus (list): E2- electrode names current_ratios (list): List of (ch1_current, ch2_current) tuples all_combinations (bool): If True, test all valid electrode combinations

Returns: int: Total number of combinations to test

Source code in tit/opt/ex/logic.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def calculate_total_combinations(e1_plus, e1_minus, e2_plus, e2_minus, current_ratios, all_combinations):
    """Calculate total number of montage combinations to be tested.

    Args:
        e1_plus (list): E1+ electrode names
        e1_minus (list): E1- electrode names
        e2_plus (list): E2+ electrode names
        e2_minus (list): E2- electrode names
        current_ratios (list): List of (ch1_current, ch2_current) tuples
        all_combinations (bool): If True, test all valid electrode combinations

    Returns:
        int: Total number of combinations to test
    """
    if all_combinations:
        # Count ordered 4-tuples from the E1+ pool with all four electrodes
        # distinct, without materializing the full list in memory.
        # NOTE(review): only e1_plus is sampled in this mode (this mirrors
        # generate_montage_combinations) -- confirm the other pools are
        # intentionally ignored here.
        n_electrode_combos = sum(
            1 for combo in product(e1_plus, repeat=4) if len(set(combo)) == 4
        )
        return n_electrode_combos * len(current_ratios)
    # Default mode: simple Cartesian product of the four pools and the ratios.
    return len(e1_plus) * len(e1_minus) * len(e2_plus) * len(e2_minus) * len(current_ratios)

create_roi_from_coordinates

create_roi_from_coordinates(subject_id: str, roi_name: str, x: float, y: float, z: float) -> Tuple[bool, str]

Create an ROI from custom coordinates.

Args: subject_id: Subject identifier roi_name: Name for the ROI file (without .csv extension) x, y, z: Coordinates in subject space (RAS)

Returns: Tuple of (success: bool, message: str)

Source code in tit/opt/ex/roi_utils.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def create_roi_from_coordinates(
    subject_id: str,
    roi_name: str,
    x: float,
    y: float,
    z: float
) -> Tuple[bool, str]:
    """
    Create an ROI from custom coordinates.

    Writes a single-point ROI CSV into the subject's ROI directory and
    registers the new file in roi_list.txt.

    Args:
        subject_id: Subject identifier
        roi_name: Name for the ROI file (without .csv extension)
        x, y, z: Coordinates in subject space (RAS)

    Returns:
        Tuple of (success: bool, message: str)
    """
    try:
        pm = get_path_manager()
        roi_dir = pm.path("m2m_rois", subject_id=subject_id)
        os.makedirs(roi_dir, exist_ok=True)

        # Ensure .csv extension
        if not roi_name.endswith('.csv'):
            roi_name += '.csv'

        roi_file = Path(roi_dir) / roi_name

        # Save ROI file
        ROICoordinateHelper.save_roi_to_csv([x, y, z], str(roi_file))

        # Update roi_list.txt so downstream tools can discover the new ROI
        _update_roi_list_file(roi_dir, roi_name)

        return True, f"ROI '{roi_name}' created successfully at ({x:.2f}, {y:.2f}, {z:.2f})"

    except Exception as e:
        # Broad catch is deliberate: this UI-facing helper reports failure as
        # a (False, message) result instead of raising.
        return False, f"Failed to create ROI: {str(e)}"

create_roi_from_preset

create_roi_from_preset(subject_id: str, roi_name: str, preset_key: str, presets: Optional[Dict[str, Dict]] = None) -> Tuple[bool, str]

Create an ROI from a preset.

Args: subject_id: Subject identifier roi_name: Name for the ROI file (without .csv extension) preset_key: Key of the preset to use presets: Optional preset dictionary (will load if not provided)

Returns: Tuple of (success: bool, message: str)

Source code in tit/opt/ex/roi_utils.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def create_roi_from_preset(
    subject_id: str,
    roi_name: str,
    preset_key: str,
    presets: Optional[Dict[str, Dict]] = None
) -> Tuple[bool, str]:
    """
    Create an ROI file from a named preset.

    Looks up the preset's MNI coordinates, transforms them into the
    subject's native space, writes the ROI CSV, and registers it in
    roi_list.txt.

    Args:
        subject_id: Subject identifier
        roi_name: Name for the ROI file (without .csv extension)
        preset_key: Key of the preset to use
        presets: Optional preset dictionary (will load if not provided)

    Returns:
        Tuple of (success: bool, message: str)
    """
    if presets is None:
        presets = load_roi_presets()

    if preset_key not in presets:
        return False, f"Preset '{preset_key}' not found"

    try:
        path_manager = get_path_manager()
        m2m_dir = path_manager.path("m2m", subject_id=subject_id)
        roi_dir = path_manager.path("m2m_rois", subject_id=subject_id)
        os.makedirs(roi_dir, exist_ok=True)

        # Normalize the file name to a .csv extension.
        filename = roi_name if roi_name.endswith('.csv') else roi_name + '.csv'
        target_file = Path(roi_dir) / filename

        # Presets store MNI-space coordinates; convert to subject space.
        subject_coords = ROICoordinateHelper.transform_mni_to_subject(
            presets[preset_key]['mni'], m2m_dir
        )
        x = subject_coords[0]
        y = subject_coords[1]
        z = subject_coords[2]

        # Persist the ROI and register it for discovery by downstream tools.
        ROICoordinateHelper.save_roi_to_csv([x, y, z], str(target_file))
        _update_roi_list_file(roi_dir, filename)

        return True, f"ROI '{filename}' created successfully at ({x:.2f}, {y:.2f}, {z:.2f})"

    except Exception as e:
        return False, f"Failed to create ROI: {str(e)}"

delete_roi

delete_roi(subject_id: str, roi_name: str) -> Tuple[bool, str]

Delete an ROI file.

Args: subject_id: Subject identifier roi_name: Name of the ROI file to delete

Returns: Tuple of (success: bool, message: str)

Source code in tit/opt/ex/roi_utils.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def delete_roi(subject_id: str, roi_name: str) -> Tuple[bool, str]:
    """
    Delete an ROI file.

    Removes the ROI CSV (if present) and deregisters it from roi_list.txt.
    Deletion is idempotent: a missing file is not treated as an error.

    Args:
        subject_id: Subject identifier
        roi_name: Name of the ROI file to delete

    Returns:
        Tuple of (success: bool, message: str)
    """
    try:
        pm = get_path_manager()
        roi_dir = pm.path("m2m_rois", subject_id=subject_id)

        # Ensure .csv extension
        if not roi_name.endswith('.csv'):
            roi_name += '.csv'

        roi_file = Path(roi_dir) / roi_name
        roi_list_file = Path(roi_dir) / "roi_list.txt"

        # Remove the ROI file (missing file is fine -- see docstring)
        if roi_file.exists():
            roi_file.unlink()

        # Drop the first matching entry from roi_list.txt, preserving the
        # order of the remaining entries.
        if roi_list_file.exists():
            existing_rois = [line.strip() for line in roi_list_file.read_text().splitlines()]
            if roi_name in existing_rois:
                existing_rois.remove(roi_name)
                roi_list_file.write_text("".join(f"{roi}\n" for roi in existing_rois))

        return True, f"ROI '{roi_name}' deleted successfully"

    except Exception as e:
        # Broad catch is deliberate: failure is reported as a (False, message)
        # result for UI consumption rather than raised.
        return False, f"Failed to delete ROI: {str(e)}"

generate_current_ratios

generate_current_ratios(total_current, current_step, channel_limit)

Generate valid current ratio combinations for TI stimulation.

Args: total_current (float): Total current in milliamps current_step (float): Step size for current increments in milliamps channel_limit (float): Maximum current per channel in milliamps

Returns: tuple: (ratios, channel_limit_exceeded) - ratios: List of (ch1_current, ch2_current) tuples - channel_limit_exceeded: Boolean indicating if limit was exceeded

Source code in tit/opt/ex/logic.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def generate_current_ratios(total_current, current_step, channel_limit):
    """Generate valid current ratio combinations for TI stimulation.

    Args:
        total_current (float): Total current in milliamps
        current_step (float): Step size for current increments in milliamps
        channel_limit (float): Maximum current per channel in milliamps

    Returns:
        tuple: (ratios, channel_limit_exceeded)
            - ratios: List of (ch1_current, ch2_current) tuples
            - channel_limit_exceeded: Boolean indicating if limit was exceeded
    """
    # epsilon absorbs float rounding error when stepping in current_step increments.
    ratios, epsilon = [], current_step * 0.01
    # Smallest ch1 value that keeps ch2 = total_current - ch1 within
    # channel_limit, clamped to at least one step.
    min_current = max(total_current - channel_limit, current_step)
    # NOTE(review): this branch is unreachable -- min_current is already
    # max(..., current_step) above, so it can never be below
    # current_step - epsilon. As written, channel_limit_exceeded is always
    # False; confirm the intended trigger (e.g. comparing
    # total_current - channel_limit against channel_limit before clamping).
    if min_current < current_step - epsilon:
        min_current = current_step
        channel_limit_exceeded = True
    else:
        channel_limit_exceeded = False

    # Sweep ch1 downward from the per-channel limit; keep only splits where
    # BOTH channels lie within [current_step, channel_limit] (± epsilon).
    current_ch1 = channel_limit
    while current_ch1 >= min_current - epsilon:
        current_ch2 = total_current - current_ch1
        if (current_ch1 <= channel_limit + epsilon and
            current_ch2 <= channel_limit + epsilon and
            current_ch1 >= current_step - epsilon and
            current_ch2 >= current_step - epsilon):
            ratios.append((current_ch1, current_ch2))
        current_ch1 -= current_step

    return ratios, channel_limit_exceeded

generate_montage_combinations

generate_montage_combinations(e1_plus, e1_minus, e2_plus, e2_minus, current_ratios, all_combinations)

Generate electrode montage combinations for testing.

Args: e1_plus (list): E1+ electrode names e1_minus (list): E1- electrode names e2_plus (list): E2+ electrode names e2_minus (list): E2- electrode names current_ratios (list): List of (ch1_current, ch2_current) tuples all_combinations (bool): If True, generate all valid electrode combinations

Yields: tuple: (e1_plus, e1_minus, e2_plus, e2_minus, current_ch1, current_ch2)

Source code in tit/opt/ex/logic.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def generate_montage_combinations(e1_plus, e1_minus, e2_plus, e2_minus, current_ratios, all_combinations):
    """Generate electrode montage combinations for testing.

    Args:
        e1_plus (list): E1+ electrode names
        e1_minus (list): E1- electrode names
        e2_plus (list): E2+ electrode names
        e2_minus (list): E2- electrode names
        current_ratios (list): List of (ch1_current, ch2_current) tuples
        all_combinations (bool): If True, generate all valid electrode combinations

    Yields:
        tuple: (e1_plus, e1_minus, e2_plus, e2_minus, (current_ch1, current_ch2))
            -- a 5-tuple whose last element is the current-ratio pair.
    """
    if all_combinations:
        # Stream the combinations instead of materializing the full list;
        # the yield order (product order) is unchanged.
        # NOTE(review): only the E1+ pool is sampled in this mode (matching
        # calculate_total_combinations) -- confirm the other pools are
        # intentionally ignored.
        for electrode_combo in product(e1_plus, repeat=4):
            if len(set(electrode_combo)) != 4:
                continue  # all four electrodes must be pairwise distinct
            for current_ratio in current_ratios:
                yield (*electrode_combo, current_ratio)
    else:
        yield from product(e1_plus, e1_minus, e2_plus, e2_minus, current_ratios)

get_available_rois

get_available_rois(subject_id: str) -> List[str]

Get list of available ROIs for a subject.

Source code in tit/opt/ex/roi_utils.py
40
41
42
43
44
45
46
47
48
49
def get_available_rois(subject_id: str) -> List[str]:
    """Get list of available ROIs for a subject.

    Args:
        subject_id: Subject identifier

    Returns:
        Sorted list of ROI CSV filenames; empty if the subject has no ROI
        directory.
    """
    pm = get_path_manager()
    roi_dir = pm.path_optional("m2m_rois", subject_id=subject_id)

    # Guard against a missing ROI directory: Path("") resolves to the current
    # working directory, so globbing the old empty-string fallback would have
    # silently listed unrelated CSV files from cwd.
    if not roi_dir:
        return []

    return sorted(p.name for p in Path(roi_dir).glob("*.csv"))

get_roi_coordinates

get_roi_coordinates(subject_id: str, roi_name: str) -> Optional[Tuple[float, float, float]]

Get coordinates for an ROI.

Args: subject_id: Subject identifier roi_name: Name of the ROI file

Returns: Tuple of (x, y, z) coordinates or None if not found

Source code in tit/opt/ex/roi_utils.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def get_roi_coordinates(subject_id: str, roi_name: str) -> Optional[Tuple[float, float, float]]:
    """
    Get coordinates for an ROI.

    Args:
        subject_id: Subject identifier
        roi_name: Name of the ROI file

    Returns:
        Tuple of (x, y, z) coordinates or None if not found
    """
    try:
        pm = get_path_manager()
        roi_dir = pm.path("m2m_rois", subject_id=subject_id)

        # Ensure .csv extension
        if not roi_name.endswith('.csv'):
            roi_name += '.csv'

        roi_file = Path(roi_dir) / roi_name

        coords = ROICoordinateHelper.load_roi_from_csv(str(roi_file))
        if coords is not None:
            # Coerce to plain floats so callers get a uniform tuple type.
            return (float(coords[0]), float(coords[1]), float(coords[2]))

        return None

    except Exception:
        # Deliberately silent: callers treat None as "ROI unavailable".
        return None

load_roi_presets

load_roi_presets() -> Dict[str, Dict]

Load ROI presets from the roi_presets.json file.

Source code in tit/opt/ex/roi_utils.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def load_roi_presets() -> Dict[str, Dict]:
    """Load ROI presets from the roi_presets.json file.

    Searches the known candidate locations in order and returns the
    'regions' mapping from the first readable, parseable file.

    Returns:
        Mapping of region key -> preset dict; empty dict if no usable
        presets file is found.
    """
    # Try multiple possible locations for the presets file
    possible_paths = [
        Path(__file__).parent.parent / "roi_presets.json",  # tit/opt/roi_presets.json
        Path(__file__).parent.parent.parent / "resources" / "roi_presets.json",  # resources/roi_presets.json
    ]

    for preset_path in possible_paths:
        if not preset_path.exists():
            continue
        try:
            with open(preset_path, 'r') as f:
                data = json.load(f)
            return data.get('regions', {})
        except (OSError, json.JSONDecodeError):
            # OSError covers a file that vanished or became unreadable between
            # exists() and open(); JSONDecodeError covers malformed content.
            # Either way, fall through to the next candidate. (The previous
            # KeyError clause was dead code: dict.get never raises it.)
            continue

    # Return empty dict if no presets found
    return {}

Main (tit.opt.ex.main)

Runner (tit.opt.ex.runner)