import logging
import logging.handlers
import multiprocessing as mp
import os
import pickle
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import traceback
import copy
import numpy as np
from lbm_suite2p_python.default_ops import default_ops
from lbm_suite2p_python.db_settings import save_ops_db_settings, reconcile_detection_keys
from lbm_suite2p_python.postprocessing import (
ops_to_json,
load_ops,
dff_rolling_percentile,
zscore_trace,
apply_filters,
)
from importlib.metadata import version, PackageNotFoundError
def _call_upstream_pipeline(ops, f_reg, f_raw, f_reg_chan2, f_raw_chan2,
run_registration, stat=None):
"""Compat wrapper around upstream `suite2p.run_s2p.pipeline` that
preserves the fork's `ops`-in / `ops`-out contract.
Upstream's pipeline now takes a nested `settings` dict, requires
`save_path` positionally, and returns a 12-tuple of discrete outputs
instead of a merged ops. This helper:
1. Splits the flat fork ops into (db, settings) via db_settings helpers.
2. Resolves torch_device from ops if set.
3. Calls pipeline with the upstream signature.
4. Folds reg_outputs / detect_outputs dicts back into the flat ops
dict so downstream fork code (which reads e.g. ops['meanImg'],
ops['yrange']) keeps working.
"""
from suite2p.run_s2p import pipeline as _upstream_pipeline
from suite2p import default_settings as _us_default_settings
from lbm_suite2p_python.db_settings import ops_to_db_settings
save_path = ops.get("save_path") or ops.get("ops_path")
if save_path is None:
raise ValueError("ops must carry 'save_path' or 'ops_path' to call upstream pipeline")
save_path = str(Path(save_path).parent) if str(save_path).endswith("ops.npy") else str(save_path)
# build settings by overlaying ops-derived values onto the full upstream
# default schema. upstream frequently grows new required keys
# (upsample_meanImg, batch_size, ...) that won't exist in an ops dict
# built by the fork — falling back to upstream defaults keeps us safe
# as upstream evolves without a db_settings.py patch on every release.
_, derived = ops_to_db_settings(ops)
settings = _us_default_settings()
def _deep_merge(base, overlay):
for k, v in overlay.items():
if isinstance(v, dict) and isinstance(base.get(k), dict):
_deep_merge(base[k], v)
else:
base[k] = v
_deep_merge(settings, derived)
# resolve torch device (default cuda, fall back to cpu on allocation)
try:
import torch
dev_name = ops.get("torch_device") or settings.get("torch_device") or "cuda"
try:
device = torch.device(dev_name)
# probe a tiny allocation to validate cuda is actually usable
if device.type == "cuda":
torch.zeros(1, device=device)
except Exception:
device = torch.device("cpu")
except ImportError:
device = None
(reg_outputs, detect_outputs, stat, F, Fneu, F_chan2, Fneu_chan2,
spks, iscell, redcell, zcorr, plane_times) = _upstream_pipeline(
save_path=save_path,
f_reg=f_reg,
f_raw=f_raw,
f_reg_chan2=f_reg_chan2,
f_raw_chan2=f_raw_chan2,
run_registration=run_registration,
settings=settings,
badframes=ops.get("badframes"),
stat=stat,
device=device if device is not None else None,
)
# fold outputs back into flat ops for fork consumers
if isinstance(reg_outputs, dict):
for k, v in reg_outputs.items():
ops[k] = v
if isinstance(detect_outputs, dict):
from lbm_suite2p_python.db_settings import _ensure_diameter_array
for k, v in detect_outputs.items():
if k == "diameter" and v is not None:
ops[k] = _ensure_diameter_array(v)
elif k != "diameter":
ops[k] = v
ops["plane_times"] = plane_times
return ops
def compute_enhanced_mean_image(img, ops):
"""Compat shim for upstream's removal of `compute_enhanced_mean_image`.
Upstream suite2p replaced `compute_enhanced_mean_image(img, ops)` with
`highpass_mean_image(I, aspect=1.0)` — the ops dict is no longer read
(aspect is now the only knob, pulled from ops by the caller). Some
fork call sites pass `None` for `img` and rely on ops["meanImg"]; we
reproduce that fallback here.
"""
from suite2p.registration import highpass_mean_image
if img is None:
img = ops["meanImg"]
aspect = ops.get("aspect", 1.0) or 1.0
return highpass_mean_image(np.asarray(img).astype(np.float32), aspect=float(aspect))
def _get_version():
try:
return version("lbm_suite2p_python")
except PackageNotFoundError:
return "0.0.0"
# canonical set of frame-count alias keys that mbo_utilities writes to
# ops.npy. lbm_suite2p_python and mbo_utilities must keep these in
# lockstep — when one updates the count (e.g. dual-channel trim), the
# others must follow or downstream readers see contradictory values.
_FRAME_COUNT_ALIASES = (
"nframes",
"num_frames",
"num_timepoints",
"n_frames",
"T",
"nt",
"timepoints",
)
def _set_frame_count_aliases(ops: dict, n: int) -> None:
"""Set every frame-count alias in `ops` to `n`.
Centralizes the fan-out so adding a new alias only requires
updating `_FRAME_COUNT_ALIASES`. Channel-specific keys
(`nframes_chan1`, `nframes_chan2`) are NOT touched here — those
have their own semantics and must be set explicitly by the caller.
"""
n = int(n)
for key in _FRAME_COUNT_ALIASES:
ops[key] = n
# detection outputs produced by suite2p in the plane dir. copied into a
# new plane_dir alongside ops.npy so figure regeneration works when the
# user reruns against a different save_path.
_DETECTION_OUTPUT_FILES = (
"stat.npy", "iscell.npy", "F.npy", "Fneu.npy",
"spks.npy", "norm_traces.npy", "redcell.npy", "zcorr.npy",
"reg_outputs.npy", "detect_outputs.npy",
"F_chan2.npy", "Fneu_chan2.npy",
"roi_stats.npy",
)
def _resolve_source_plane_dir(input_arr, input_path, target_plane, source_plane_num=None):
"""Find the source directory for a specific plane.
Raises instead of silently falling back to ``input_path.parent`` when
a match can't be found uniquely. Previously a missed lookup caused
plane 1's files to be copied into plane N's output dir (see the
'Copying data.bin from zplane01 -> zplane02' bug).
Parameters
----------
input_arr : lazy array or None
Lazy array from ``mbo.imread`` — expected to carry a ``filenames``
attribute for volumetric Suite2p arrays.
input_path : Path
Fallback path (used only for single-plane inputs).
target_plane : int
1-based plane number we're trying to process.
source_plane_num : int, optional
1-based source-plane identity when the loaded array represents a
plane subset (e.g. every-other). Takes precedence over
``target_plane`` for the zplane-name match.
Returns
-------
Path
Source plane directory.
"""
search_plane = source_plane_num if source_plane_num is not None else target_plane
if input_arr is not None and hasattr(input_arr, "filenames") and input_arr.filenames:
matches = []
for fname in input_arr.filenames:
fpath = Path(fname)
if f"zplane{search_plane:02d}" in fpath.parent.name:
matches.append(fpath.parent)
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
raise RuntimeError(
f"Ambiguous source directory for plane {search_plane}: "
f"multiple matches in input_arr.filenames: {matches}. "
"Narrow the input to a single plane or clean up the source tree."
)
# zero matches — but if the input only contains a single plane
# whose name doesn't carry a zplane number (e.g. processed
# single-plane), and that's the plane we're looking for, fall
# back to that single filename's parent.
if len(input_arr.filenames) == 1:
return Path(input_arr.filenames[0]).parent
raise FileNotFoundError(
f"No source directory for plane {search_plane} in input_arr. "
f"Searched filenames: {[str(p) for p in input_arr.filenames]}. "
"The per-plane lookup scans for 'zplane{NN:02d}' in each "
"filename's parent dir name."
)
# no lazy array — input was a direct path. only safe when the input
# itself is already a plane dir.
return input_path.parent
def _copy_if_needed(src: Path, dst: Path, *, overwrite_empty: bool = True) -> bool:
"""Copy src -> dst when dst is missing (or empty, if overwrite_empty).
Returns True when a copy actually happened. Never overwrites a
non-empty destination.
"""
import shutil
if not src.exists():
return False
if dst.exists():
if not (overwrite_empty and dst.stat().st_size == 0):
return False
shutil.copy2(src, dst)
return True
def _stage_source_into_plane_dir(
src_dir: Path,
plane_dir: Path,
ops: dict,
) -> None:
"""Stage files from a source plane dir into ``plane_dir``.
copies ops.npy and detection outputs into ``plane_dir`` so figures
and post-processing read locally. binaries (data.bin, data_raw.bin,
data_chan2.bin) are NOT copied — ops['raw_file'] / ops['reg_file'] /
ops['chan2_file'] are repointed at the source paths so the new
output dir stays small and can be copied around without dragging
the binary along. source binaries are read-only from this plane's
perspective.
``ops`` is mutated in place to point at the source binaries.
"""
src_dir = Path(src_dir)
plane_dir = Path(plane_dir)
plane_dir.mkdir(parents=True, exist_ok=True)
print(f" Staging plane outputs: {src_dir.name} -> {plane_dir.name}")
_copy_if_needed(src_dir / "ops.npy", plane_dir / "ops.npy")
for fname in _DETECTION_OUTPUT_FILES:
_copy_if_needed(src_dir / fname, plane_dir / fname)
for ops_key, fname in (
("raw_file", "data_raw.bin"),
("reg_file", "data.bin"),
("chan2_file", "data_chan2.bin"),
):
src_bin = src_dir / fname
if src_bin.exists():
ops[ops_key] = str(src_bin.resolve())
def _apply_reactive_metadata(
ops: dict,
source_metadata: dict | None,
source_shape: tuple | None = None,
frame_indices: list[int] | None = None,
plane_indices: list[int] | None = None,
logger=None,
) -> None:
"""Reactively scale fs/dz in `ops` from a source metadata dict and
the user's stride selections, in place.
Uses `mbo_utilities.metadata.OutputMetadata` so the scaling rules
stay aligned with the writer side. Without this, every-other-
timepoint produces ops.npy with raw source `fs` (factor of 2
wrong), and every-other-plane produces ops.npy with raw source `dz`.
Parameters
----------
ops : dict
ops dict to mutate. Existing keys are NOT overwritten unless
the reactive value differs (so user-set explicit overrides win
over the reactive computation).
source_metadata : dict | None
Source acquisition metadata (must contain `fs`, `dz` etc. for
scaling to work). If None, function is a no-op.
source_shape : tuple | None
Source shape tuple (T, C, Z, Y, X). If None, OutputMetadata
falls back to whatever it can infer.
frame_indices : list[int] | None
0-based timepoint indices the user selected. The implicit
stride scales `fs`. None means no T-stride scaling.
plane_indices : list[int] | None
0-based z-plane indices the user selected (FULL list, not just
the current plane). The implicit stride scales `dz`. None
means no Z-stride scaling.
logger : optional
If provided, a missing `fs` produces a `.warning(...)` call.
"""
if not source_metadata:
return
try:
from mbo_utilities.metadata import OutputMetadata, get_param
except ImportError:
# mbo_utilities not available — caller's choice to handle this
return
if logger is not None and get_param(source_metadata, "fs") is None:
logger.warning(
"Source metadata has no 'fs' field. Frame rate in ops.npy "
"may not reflect the acquisition rate. Set fs via the "
"metadata editor or fix the source TIFF metadata."
)
selections: dict = {}
if frame_indices is not None:
selections["T"] = list(frame_indices)
if plane_indices is not None:
selections["Z"] = list(plane_indices)
if not selections:
return
# OutputMetadata's accessors crash on `source_shape=None`. Default
# to an empty tuple — the reactive scaling for fs/dz only needs the
# selections, not the absolute shape.
out_meta = OutputMetadata(
source=dict(source_metadata),
source_shape=source_shape if source_shape is not None else (),
source_dims=("T", "C", "Z", "Y", "X"),
selections=selections,
)
scaled = out_meta.to_dict()
# Split-responsibility scaling:
# - `fs` is owned by mbo's writer (`_imwrite_base` builds its own
# OutputMetadata from `frames=` and scales fs there). If we
# also scaled fs here, the writer would re-scale our already-
# scaled value and end up with fs/4 instead of fs/2.
# - `dz` (and dx/dy) are owned by lbm because mbo's writer only
# sees the per-plane data, not the FULL plane selection list,
# so it cannot compute the implicit z-stride.
# We also propagate `_metadata_provenance` so the next pass (a
# re-run of this ops through OutputMetadata) reads the stamped
# base+stride and doesn't double-scale dz.
if plane_indices is not None:
for key in ("dz", "dx", "dy", "z_step",
"umPerPixZ", "umPerPixX", "umPerPixY",
"_metadata_provenance"):
if key in scaled and scaled[key] is not None:
ops[key] = scaled[key]
from lbm_suite2p_python.zplane import (
plot_zplane_figures,
plot_filtered_cells,
plot_filter_exclusions,
plot_cell_filter_summary,
plot_volume_accepted_rejected_overlay,
)
DEFAULT_CELL_FILTERS = []
from mbo_utilities.log import get as get_logger
from mbo_utilities.metadata import get_voxel_size
logger = get_logger("run_lsp")
_external_logging_attached = False
def _attach_external_loggers(level: int = logging.INFO) -> None:
"""Surface suite2p / cellpose progress messages on stdout.
Mainline suite2p (`suite2p.detection.anatomical`,
`suite2p.registration.*`) and cellpose (`cellpose.models`) use plain
`logging.getLogger(__name__)` loggers with no handlers attached. Their
`logger.info(...)` calls — registration progress, ">>>> CELLPOSE finding
masks", median-diameter reports — propagate to root and get silently
dropped by the default WARNING-level lastResort handler. Without this
hookup the user can't tell whether detection is running or what params
cellpose actually picked. Idempotent, called once per pipeline entry.
"""
global _external_logging_attached
if _external_logging_attached:
return
fmt = logging.Formatter("%(name)s: %(message)s")
for name in ("suite2p", "cellpose"):
lg = logging.getLogger(name)
if lg.level == logging.NOTSET or lg.level > level:
lg.setLevel(level)
has_stream = any(
isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
for h in lg.handlers
)
if not has_stream:
h = logging.StreamHandler()
h.setFormatter(fmt)
h.setLevel(level)
lg.addHandler(h)
lg.propagate = False
_external_logging_attached = True
def _is_valid_torch_checkpoint(path) -> bool:
"""Cheap integrity check for a cellpose/torch checkpoint (a zip archive).
A download interrupted mid-write leaves a truncated file with no zip
end-of-central-directory record; is_zipfile catches that case.
"""
import zipfile
try:
return zipfile.is_zipfile(path)
except OSError:
return False
def _prewarm_cellpose_model(ops) -> None:
"""Download the cellpose model once, in the parent, before workers fan out.
cellpose's cache_CPSAM_model_path downloads to a temp file then renames
with no cross-process lock. Multiple workers hitting an empty cache at once
race: one wins the rename, the rest fail (Windows WinError 32/183) or read a
half-written file (PytorchStreamReader miniz error). Warming here serializes
the download so workers only ever read a complete file. A corrupt leftover
from a prior failed run is removed and re-downloaded.
"""
if not (ops.get("roidetect", True) and ops.get("anatomical_only", 0) > 0):
return
try:
from cellpose import models as cp_models
except Exception:
return # cellpose absent; detection falls back to functional mode
cached = Path(cp_models.MODEL_DIR) / "cpsam"
if cached.exists() and not _is_valid_torch_checkpoint(cached):
print(f"Cellpose model cache corrupt, re-downloading: {cached}")
try:
cached.unlink()
except OSError:
pass
try:
cp_models.cache_CPSAM_model_path()
except Exception as exc:
print(
f"Warning: could not pre-download cellpose model ({exc}); "
"workers may race on first download"
)
from lbm_suite2p_python.volume import (
plot_volume_diagnostics,
plot_orthoslices,
plot_3d_roi_map,
plot_3d_rastermap_clusters,
plot_volume_trace_figures,
get_volume_stats,
)
PIPELINE_TAGS = ("plane", "roi", "z", "plane_", "roi_", "z_")
from lbm_suite2p_python.utils import _is_lazy_array, _get_num_planes
def _get_suite2p_version():
"""Get suite2p version string."""
try:
return version("suite2p")
except PackageNotFoundError:
return "not installed"
def _resolve_gpu_env() -> None:
"""Honor MBO_GPU -> CUDA_VISIBLE_DEVICES before torch/cupy/cellpose init.
Lets ``MBO_GPU=0 lsp ...`` (or programmatic use) force CPU across suite2p
and cellpose without per-call device args; ``MBO_GPU=N`` pins a device.
No-op when MBO_GPU is unset. Entry points call this first so the env is
set before torch initializes CUDA, and before workers are spawned (they
inherit it).
"""
raw = os.environ.get("MBO_GPU")
if raw is None:
return
try:
from mbo_utilities.gpu import apply_gpu_policy
apply_gpu_policy(raw)
return
except Exception:
pass
token = raw.strip().lower()
if token in ("0", "off", "false", "no", "cpu", "none"):
os.environ["CUDA_VISIBLE_DEVICES"] = ""
elif token and token.replace(",", "").isdigit():
os.environ["CUDA_VISIBLE_DEVICES"] = token
def _apply_thread_limits(threads_per_worker: int | None) -> None:
"""Cap BLAS / OMP / numba / torch thread counts per process.
Sets env vars so any child process spawned later inherits the cap,
and calls torch.set_num_threads for the current process (where the
BLAS env vars may have been read at import time and cannot be
changed without threadpoolctl). No-op when value is None or <= 0.
"""
if not threads_per_worker or threads_per_worker <= 0:
return
n = str(int(threads_per_worker))
for var in (
"OMP_NUM_THREADS",
"MKL_NUM_THREADS",
"OPENBLAS_NUM_THREADS",
"NUMEXPR_NUM_THREADS",
"NUMBA_NUM_THREADS",
):
os.environ[var] = n
try:
import torch
torch.set_num_threads(int(threads_per_worker))
except Exception:
pass
def _add_processing_step(
ops, step_name, input_files=None, duration_seconds=None, extra=None
):
"""
Add a processing step to ops["processing_history"].
Each step is appended to the history list, preserving previous runs.
This allows tracking of re-runs and incremental processing.
Parameters
----------
ops : dict
The ops dictionary to update.
step_name : str
Name of the processing step (e.g., "binary_write", "registration", "detection").
input_files : list of str, optional
List of input file paths for this step.
duration_seconds : float, optional
How long this step took.
extra : dict, optional
Additional metadata for this step.
Returns
-------
dict
The updated ops dictionary.
"""
if "processing_history" not in ops:
ops["processing_history"] = []
step_record = {
"step": step_name,
"timestamp": datetime.now().isoformat(),
"lbm_suite2p_python_version": _get_version(),
"suite2p_version": _get_suite2p_version(),
}
if input_files is not None:
step_record["input_files"] = (
[str(f) for f in input_files]
if not isinstance(input_files, str)
else [input_files]
)
if duration_seconds is not None:
step_record["duration_seconds"] = round(duration_seconds, 2)
if extra is not None:
step_record.update(extra)
ops["processing_history"].append(step_record)
return ops
add_processing_step = _add_processing_step
def _extract_rastermap_section(rastermap_kwargs, section):
"""Pull the sub-dict for "planar" or "volumetric" from a unified rastermap_kwargs.
Returns None when the section was not requested. Returns an empty dict
(meaning "use defaults") when the key is present with a None / empty value.
"""
if not rastermap_kwargs or section not in rastermap_kwargs:
return None
sub = rastermap_kwargs[section]
return {} if sub is None else dict(sub)
def _extract_planar_rastermap_kwargs(rastermap_kwargs):
return _extract_rastermap_section(rastermap_kwargs, "planar")
def _extract_volumetric_rastermap_kwargs(rastermap_kwargs):
return _extract_rastermap_section(rastermap_kwargs, "volumetric")
def _resolve_input_source(input_arr, input_data):
"""Return a picklable source spec for parallel workers to imread().
Workers re-open the data themselves rather than receiving a pickled
lazy array (which may not survive spawn). Preference order:
1. Original input_data if it's a str/Path/list of paths.
2. input_arr.filenames for mbo lazy arrays backed by files.
Raises ValueError when neither is available.
"""
if isinstance(input_data, (str, Path)):
return str(input_data)
if isinstance(input_data, (list, tuple)) and all(
isinstance(p, (str, Path)) for p in input_data
):
return [str(p) for p in input_data]
if input_arr is not None and hasattr(input_arr, "filenames"):
fns = list(input_arr.filenames) if input_arr.filenames else []
if fns:
return [str(p) for p in fns]
raise ValueError(
"parallel mode (workers != 1) requires a path-based input. "
"Pass a file path, list of paths, or a lazy array loaded from disk, "
"or set workers=1 to process in-memory arrays sequentially."
)
def _resolve_worker_count(workers, num_planes):
"""Resolve `workers` kwarg into a concrete worker count.
workers=1: sequential (caller short-circuits before calling this).
workers in (None, <=0): auto = min(num_planes, cpu_count//2, 8).
workers>1: clamp to num_planes.
"""
if workers is None or (isinstance(workers, int) and workers <= 0):
cpu = os.cpu_count() or 1
return max(1, min(num_planes, cpu // 2 or 1, 8))
return max(1, min(int(workers), num_planes))
class _QueueStreamRedirect:
"""File-like that forwards writes to a logger so print()/tqdm output
from a worker process flows through the multiprocessing log queue."""
def __init__(self, logger_, level=logging.INFO):
self._logger = logger_
self._level = level
self._buf = ""
def write(self, msg):
if not msg:
return
self._buf += msg
while "\n" in self._buf:
line, _, self._buf = self._buf.partition("\n")
if line:
self._logger.log(self._level, line)
def flush(self):
if self._buf:
self._logger.log(self._level, self._buf)
self._buf = ""
def isatty(self):
return False
def _plane_worker(input_source, current_ops, save_path, run_plane_kwargs,
reader_kwargs, log_queue):
"""Process one zplane in a child process. Returns (plane_num, ops_path).
Module-level so it pickles for ProcessPoolExecutor. The fully-prepared
`current_ops` is built in the main process; this worker only re-opens
the input via imread() and calls run_plane().
"""
plane_num = current_ops.get("plane")
root = logging.getLogger()
root.handlers.clear()
root.addHandler(logging.handlers.QueueHandler(log_queue))
root.setLevel(logging.INFO)
# suite2p / cellpose loggers set propagate=False, so route them directly.
for name in ("suite2p", "cellpose"):
lg = logging.getLogger(name)
lg.handlers.clear()
lg.addHandler(logging.handlers.QueueHandler(log_queue))
lg.setLevel(logging.INFO)
lg.propagate = False
global _external_logging_attached
_external_logging_attached = True
plane_logger = logging.getLogger(f"plane{plane_num:02d}")
plane_logger.setLevel(logging.INFO)
_orig_stdout, _orig_stderr = sys.stdout, sys.stderr
sys.stdout = _QueueStreamRedirect(plane_logger, logging.INFO)
sys.stderr = _QueueStreamRedirect(plane_logger, logging.INFO)
try:
from mbo_utilities import imread
arr = imread(input_source, **(reader_kwargs or {}))
ops_path = run_plane(
input_data=arr,
save_path=save_path,
ops=current_ops,
**run_plane_kwargs,
)
return plane_num, ops_path
finally:
try:
sys.stdout.flush()
sys.stderr.flush()
except Exception:
pass
sys.stdout, sys.stderr = _orig_stdout, _orig_stderr
def _prepare_plane_ops(*, base_ops, plane_idx, num_planes, input_arr,
volume_voxel, volume_source_meta, volume_source_shape,
z_selection, frame_indices):
"""Build a per-plane ops dict from base ops.
Centralizes the per-plane prep so both sequential and parallel paths
apply identical voxel-size propagation and reactive metadata. Mirrors
the inline prep that previously lived in run_volume's loop.
"""
plane_num = plane_idx + 1
current_ops = copy.deepcopy(load_ops(base_ops)) if base_ops else default_ops()
reconcile_detection_keys(current_ops)
current_ops["plane"] = plane_num
current_ops["num_zplanes"] = num_planes
if input_arr is not None and hasattr(input_arr, "metadata"):
_source_idx = (input_arr.metadata or {}).get("selected_planes_0based")
if _source_idx is not None and plane_idx < len(_source_idx):
current_ops["source_plane_num"] = int(_source_idx[plane_idx]) + 1
if volume_voxel is not None:
if volume_voxel.dz is not None:
current_ops.setdefault("dz", volume_voxel.dz)
current_ops.setdefault("z_step", volume_voxel.dz)
if volume_voxel.dx != 1.0:
current_ops.setdefault("dx", volume_voxel.dx)
if volume_voxel.dy != 1.0:
current_ops.setdefault("dy", volume_voxel.dy)
_apply_reactive_metadata(
ops=current_ops,
source_metadata=volume_source_meta,
source_shape=volume_source_shape,
frame_indices=frame_indices,
plane_indices=z_selection,
logger=logger,
)
return current_ops
[docs]
def pipeline(
input_data,
save_path: str | Path = None,
ops: dict = None,
db: dict | None = None,
settings: dict | None = None,
planes: list | int = None,
roi_mode: int = None,
keep_reg: bool = True,
keep_raw: bool = False,
force_reg: bool = False,
force_detect: bool = False,
num_timepoints: int = None,
frame_indices: list | None = None,
dff_window_size: int = None,
dff_percentile: int = 20,
dff_smooth_window: int = None,
norm_method: str = "zscore",
correct_neuropil: bool = True,
cell_filters: list = None,
accept_all_cells: bool = False,
rastermap_kwargs: dict = None,
save_json: bool = False,
reader_kwargs: dict = None,
writer_kwargs: dict = None,
workers: int | None = 1,
skip_volumetric: bool = False,
threads_per_worker: int | None = None,
# deprecated parameters
roi: int = None,
num_frames: int = None,
**kwargs,
) -> list[Path]:
"""
Unified Suite2p processing pipeline.
Wrapper around run_volume (for 4D data) and run_plane (for 3D data).
Automatically detects input type and delegates processing.
Parameters
----------
input_data : str, Path, list, or lazy array
Input data source. Can be a file path, directory, list of files,
or a lazy array from mbo_utilities.
save_path : str or Path, optional
Output directory for results. If None, saves next to input file.
Required when input_data is an array without filenames.
ops : dict, optional
Flat Suite2p parameter overrides (the fork's ops shape). If None,
uses suite2p defaults with metadata auto-populated from input.
Detection is selectable either way and kept consistent: suite2p's
``"algorithm"`` ("cellpose" / "sparsery" / "sourcery") or the fork's
``"anatomical_only"`` (0=functional, 1=max_proj/mean, 2=mean,
4=max_proj). Setting one fills in the other; invalid or legacy
values (e.g. ``anatomical_only=3``) are warned and mapped to what
suite2p accepts.
db, settings : dict, optional
suite2p's nested ``db`` / ``settings`` dicts (the new upstream
shape). Flattened and merged into ``ops``; explicit ``ops`` keys win.
planes : int or list, optional
Which z-planes to process (1-indexed). None processes all planes.
roi_mode : int, optional
ROI handling for multi-ROI ScanImage data:
- None: stitch all ROIs into single FOV (default)
- 0: process each ROI separately
- N > 0: process only ROI N (1-indexed)
keep_reg : bool, default True
Keep registered binary (data.bin) after processing.
keep_raw : bool, default False
Keep raw binary (data_raw.bin) after processing.
force_reg : bool, default False
Force re-registration even if already complete.
force_detect : bool, default False
Force ROI detection even if stat.npy exists.
num_timepoints : int, optional
Limit processing to first N frames (truncation only). For an
explicit set of frames or a strided selection, use
``frame_indices`` instead.
frame_indices : list[int], optional
Explicit 0-based frame indices to process. Supports stride
(e.g. ``list(range(0, 1574, 2))`` for every other frame).
When provided, the implicit stride is used by `OutputMetadata`
to reactively scale `fs` (e.g. stride of 2 → `fs / 2` in the
output ops.npy). Takes precedence over ``num_timepoints``.
dff_window_size : int, optional
Window size for rolling percentile dF/F baseline (frames).
If None, auto-calculated as ~10 * tau * fs.
dff_percentile : int, default 20
Percentile for baseline F0 estimation.
dff_smooth_window : int, optional
Temporal smoothing window for dF/F traces (frames).
If None, auto-calculated. Set to 1 to disable.
norm_method : str, default "zscore"
Normalization for the saved norm_traces.npy. "dff" uses the rolling
percentile ΔF/F (dff_window_size / dff_percentile / dff_smooth_window
apply); "zscore" uses per-ROI (F - mean) / std. Quality metrics
(SNR / skew / shot-noise) always use ΔF/F regardless of this setting.
correct_neuropil : bool, default True
If True, the norm trace is computed on F - 0.7 * Fneu. If False, on
raw F. Affects norm_traces.npy, the trace plots, and trace-quality
scoring.
cell_filters : list, optional
Filters to apply to detected ROIs. Default is no filters (off).
Currently supports diameter bounds in microns or pixels.
Example: [{"name": "max_diameter", "min_diameter_um": 4, "max_diameter_um": 35}]
accept_all_cells : bool, default False
If True, mark all detected ROIs as accepted, overriding
Suite2p's built-in classifier results.
rastermap_kwargs : dict, optional
Rastermap configuration. Default None (rastermap disabled).
Pass a dict with optional ``"planar"`` and ``"volumetric"`` keys
to enable each independently — presence of the key turns it on.
Each sub-dict holds ``rastermap.Rastermap()`` overrides; an empty
dict (or None) means use built-in defaults.
Example::
rastermap_kwargs={
"planar": {"n_clusters": 50, "n_PCs": 64},
"volumetric": {"n_clusters": 40},
}
save_json : bool, default False
Save ops as JSON in addition to .npy format.
reader_kwargs : dict, optional
Arguments passed to mbo_utilities.imread().
writer_kwargs : dict, optional
Arguments passed to binary writer (e.g., output_format).
workers : int or None, default 1
Number of zplane worker processes for volumetric input. ``1``
keeps the sequential code path. ``None`` (or any value <= 0)
auto-picks ``min(num_planes, cpu_count // 2, 8)``. Parallel mode
requires a path-based input; per-plane outputs go to disjoint
subdirectories. Cellpose on GPU may OOM with multiple workers —
reduce ``workers`` or switch cellpose to CPU when this happens.
Ignored for planar (single-plane) inputs.
skip_volumetric : bool, default False
When True, return per-plane ops_files immediately after the
per-plane loop, skipping merge_mrois, volume_stats, and
volumetric plots. Useful when farming planes across machines
and aggregating later.
plane_name : str, optional
Name for output directory when input is an array without
filenames. Passed via kwargs.
roi : int, optional
Deprecated. Use roi_mode instead.
num_frames : int, optional
Deprecated. Use num_timepoints instead.
**kwargs
Additional arguments passed to run_plane or run_volume.
Returns
-------
list[Path]
Paths to ops.npy files for each processed plane.
Examples
--------
>>> import lbm_suite2p_python as lsp
>>> ops = {"anatomical_only": 4} # max-projection
>>> results = lsp.pipeline("D:/data/volume.zarr", planes=[1, 5, 10], ops=ops)
>>> # process array, accept all ROIs without filtering
>>> results = lsp.pipeline(
... arr, save_path="D:/results", plane_name="plane0",
... accept_all_cells=True, cell_filters=[], ops=ops
... )
See Also
--------
run_plane : lower-level single-plane processing
run_volume : process list of files
"""
from mbo_utilities import imread
from mbo_utilities.arrays import supports_roi
_attach_external_loggers()
_resolve_gpu_env()
_apply_thread_limits(threads_per_worker)
# 1. Handle Deprecations
if roi is not None:
import warnings
warnings.warn(
"'roi' is deprecated, use 'roi_mode'", DeprecationWarning, stacklevel=2
)
roi_mode = roi
if num_frames is not None:
import warnings
warnings.warn(
"'num_frames' is deprecated, use 'num_timepoints'",
DeprecationWarning,
stacklevel=2,
)
num_timepoints = num_frames
# flatten (db, settings) into ops so downstream run_volume / run_plane
# don't each need to forward the pair. explicit ops keys still win.
if db is not None or settings is not None:
from lbm_suite2p_python.db_settings import merge_db_settings_into_ops
ops = merge_db_settings_into_ops(ops if isinstance(ops, dict) else None, db, settings)
reader_kwargs = reader_kwargs or {}
writer_kwargs = writer_kwargs or {}
if num_timepoints is not None:
writer_kwargs["num_frames"] = num_timepoints
# 1-based frame numbers.
if frame_indices is not None:
writer_kwargs["frames"] = [int(i) + 1 for i in frame_indices]
# don't double-pass num_frames; len(frame_indices) is implicit
writer_kwargs.pop("num_frames", None)
# Always load array to check dimensions and ensure downstream functions have the array shape
# If input is already array, this is fast. If path or list of paths, it loads lazy array.
if _is_lazy_array(input_data):
arr = input_data
else:
print(f"Loading input to determine dimensions...")
arr = imread(input_data, **reader_kwargs)
# Apply ROI mode if applicable check dimensions
if roi_mode is not None and supports_roi(arr):
arr.roi = roi_mode
# check if data is volumetric (multiple Z planes)
# mbo_utilities arrays expose nz; legacy arrays use num_planes or shape inference
is_volumetric = _get_num_planes(arr) > 1
# 3. Delegate
if is_volumetric:
print("Delegating to run_volume (volumetric input detected)...")
if arr is not None:
input_arg = arr
else:
input_arg = input_data
return run_volume(
input_data=input_arg,
save_path=save_path,
ops=ops,
planes=planes,
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
frame_indices=frame_indices,
dff_window_size=dff_window_size,
dff_percentile=dff_percentile,
dff_smooth_window=dff_smooth_window,
norm_method=norm_method,
correct_neuropil=correct_neuropil,
accept_all_cells=accept_all_cells,
cell_filters=cell_filters,
rastermap_kwargs=rastermap_kwargs,
save_json=save_json,
reader_kwargs=reader_kwargs,
writer_kwargs=writer_kwargs,
workers=workers,
skip_volumetric=skip_volumetric,
threads_per_worker=threads_per_worker,
**kwargs,
)
else:
if workers != 1:
logger.warning(
"workers=%r ignored: input is planar (single zplane), no parallelism applicable",
workers,
)
# run_plane is planar-only — extract just the planar sub-dict
planar_kwargs = _extract_planar_rastermap_kwargs(rastermap_kwargs)
# run_plane returns a single Path, we wrap in list
ops_path = run_plane(
input_data=arr, # Pass the array we loaded (with ROI applied)
save_path=save_path,
ops=ops,
# planes argument is ignored for single plane, or used to validate?
# run_plane infers using 'plane' in ops/metadata.
# If user passed 'planes', we should check if they asked for something valid?
# For 3D input, 'planes' argument implies iterating? But it's 3D...
# We'll rely on run_plane to extract metadata.
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
frame_indices=frame_indices,
dff_window_size=dff_window_size,
dff_percentile=dff_percentile,
dff_smooth_window=dff_smooth_window,
norm_method=norm_method,
correct_neuropil=correct_neuropil,
accept_all_cells=accept_all_cells,
cell_filters=cell_filters,
rastermap_kwargs=planar_kwargs,
save_json=save_json,
reader_kwargs=reader_kwargs,
writer_kwargs=writer_kwargs,
**kwargs,
)
return [ops_path]
def derive_tag_from_filename(path):
"""
Derive a folder tag from a filename based on “planeN”, “roiN”, or "tagN" patterns.
Parameters
----------
path : str or pathlib.Path
File path or name whose stem will be parsed.
Returns
-------
str
If the stem starts with “plane”, “roi”, or “res” followed by an integer,
returns that tag plus the integer (e.g. “plane3”, “roi7”, “res2”).
Otherwise returns the original stem unchanged.
Examples
--------
>>> derive_tag_from_filename("plane_01.tif")
'plane1'
>>> derive_tag_from_filename("plane2.bin")
'plane2'
>>> derive_tag_from_filename("roi5.raw")
'roi5'
>>> derive_tag_from_filename("ROI_10.dat")
'roi10'
>>> derive_tag_from_filename("res-3.h5")
'res3'
>>> derive_tag_from_filename("assembled_data_1.tiff")
'assembled_data_1'
>>> derive_tag_from_filename("file_12.tif")
'file_12'
"""
name = Path(path).stem
for tag in PIPELINE_TAGS:
low = name.lower()
if low.startswith(tag):
suffix = name[len(tag) :]
if suffix and (suffix[0] in ("_", "-")):
suffix = suffix[1:]
if suffix.isdigit():
return f"{tag}{int(suffix)}"
return name
def get_plane_num_from_tag(tag: str, fallback: int = None) -> int:
"""
Extract the plane number from a tag string like "plane3" or "roi7".
Parameters
----------
tag : str
A tag string (e.g., "plane3", "roi7", "z10") typically from derive_tag_from_filename.
fallback : int, optional
Value to return if no number can be extracted from the tag.
Returns
-------
int
The extracted plane number, or the fallback value if extraction fails.
Examples
--------
>>> get_plane_num_from_tag("plane3")
3
>>> get_plane_num_from_tag("roi7")
7
>>> get_plane_num_from_tag("z10")
10
>>> get_plane_num_from_tag("assembled_data", fallback=0)
0
"""
import re
match = re.search(r"(\d+)$", tag)
if match:
return int(match.group(1))
return fallback
def generate_plane_dirname(
plane: int,
nframes: int = None,
frame_start: int = 1,
frame_stop: int = None,
suffix: str = None,
) -> str:
"""
generate a descriptive directory name for a plane's outputs.
uses mbo_utilities tag conventions for consistent, self-documenting names.
format: zplaneNN[_tpSTART-STOP][_suffix]
parameters
----------
plane : int
z-plane number (1-based)
nframes : int, optional
total number of frames. if provided and > 1, adds timepoint range.
frame_start : int, default 1
first frame (1-based)
frame_stop : int, optional
last frame (1-based). defaults to nframes if not provided.
suffix : str, optional
additional suffix (e.g., "stitched", "roi1")
returns
-------
str
directory name like "zplane01", "zplane03_tp00001-05000", etc.
examples
--------
>>> generate_plane_dirname(3)
'zplane03'
>>> generate_plane_dirname(3, nframes=5000)
'zplane03_tp00001-05000'
>>> generate_plane_dirname(3, nframes=5000, suffix="stitched")
'zplane03_tp00001-05000_stitched'
>>> generate_plane_dirname(1, frame_start=100, frame_stop=500)
'zplane01_tp00100-00500'
"""
from mbo_utilities.arrays.features._dim_tags import TAG_REGISTRY, DimensionTag
parts = []
# z-plane tag (always present)
z_def = TAG_REGISTRY["Z"]
z_tag = DimensionTag(z_def, start=plane, stop=None, step=1)
parts.append(z_tag.to_string())
# timepoint tag (if multiple frames)
if nframes is not None and nframes > 1:
t_def = TAG_REGISTRY["T"]
stop = frame_stop if frame_stop is not None else nframes
t_tag = DimensionTag(t_def, start=frame_start, stop=stop, step=1)
parts.append(t_tag.to_string())
# optional suffix
if suffix:
parts.append(suffix)
return "_".join(parts)
[docs]
def run_volume(
input_data,
save_path: str | Path = None,
ops: dict | str | Path = None,
planes: list | int = None,
keep_reg: bool = True,
keep_raw: bool = False,
force_reg: bool = False,
force_detect: bool = False,
frame_indices: list | None = None,
dff_window_size: int = None,
dff_percentile: int = 20,
dff_smooth_window: int = None,
norm_method: str = "zscore",
correct_neuropil: bool = True,
accept_all_cells: bool = False,
cell_filters: list = None,
rastermap_kwargs: dict = None,
save_json: bool = False,
reader_kwargs: dict = None,
writer_kwargs: dict = None,
workers: int | None = 1,
skip_volumetric: bool = False,
threads_per_worker: int | None = None,
**kwargs,
):
"""
Processes a full volumetric imaging dataset using Suite2p.
Iterates over 3D planes (z-stacks) within the volume, calling run_plane() for each.
Aggregates results into volume_stats.npy and generates volumetric plots.
Parameters
----------
input_data : list, Path, or lazy array
Input data source.
- List of paths: [plane1.tif, plane2.tif, ...]
- Lazy array: 4D array (Time, Z, Y, X)
save_path : str or Path, optional
Base directory to save outputs.
ops : dict, optional
Suite2p parameters.
planes : list or int, optional
Specific planes to process (1-based index).
keep_reg : bool, default True
Keep registered binaries.
keep_raw : bool, default False
Keep raw binaries.
force_reg : bool, default False
Force re-registration.
force_detect : bool, default False
Force detection.
frame_indices : list, default None
List of frame indices to process.
dff_window_size, dff_percentile, dff_smooth_window : optional
dF/F calculation parameters (used when norm_method="dff").
norm_method : str, default "zscore"
Normalization for norm_traces.npy: "dff" or "zscore". See pipeline().
accept_all_cells : bool, default False
Mark all ROIs as accepted.
cell_filters : list, optional
Filters to apply (see run_plane).
rastermap_kwargs : dict, optional
Unified rastermap config. Default None (off). Keys ``"planar"`` /
``"volumetric"`` independently enable each mode; their values hold
rastermap.Rastermap() overrides (empty dict / None = defaults).
See pipeline() for the full schema.
save_json : bool, default False
Save ops as JSON.
workers : int or None, default 1
Number of zplane worker processes. ``1`` (default) keeps the
sequential path. ``None`` or any value <= 0 auto-picks
``min(num_planes, cpu_count // 2, 8)``. Values > 1 run that many
workers (clamped to num_planes). Parallel mode requires a
path-based input. Workers reload input via ``imread`` and write
to disjoint per-plane subdirectories.
Caveat: cellpose on GPU may OOM with multiple workers; reduce
``workers`` or switch cellpose to CPU when this happens.
skip_volumetric : bool, default False
When True, return per-plane ops_files immediately after the
per-plane loop, skipping merge_mrois, volume_stats, and all
volumetric plots. Useful when farming planes across machines.
**kwargs
Additional args passed to run_plane.
Returns
-------
list[Path]
List of paths to ops.npy files for processed planes.
"""
from mbo_utilities.arrays import _normalize_planes
from mbo_utilities import imread
from lbm_suite2p_python.merging import merge_mrois
_resolve_gpu_env()
_apply_thread_limits(threads_per_worker)
# Handle input data
input_arr = None
input_paths = []
if _is_lazy_array(input_data):
input_arr = input_data
if hasattr(input_arr, "filenames"):
# For lazy arrays backed by files, use the first file's parent as default save_path
if save_path is None and input_arr.filenames:
save_path = Path(input_arr.filenames[0]).parent / "suite2p_results"
elif isinstance(input_data, (list, tuple)):
input_paths = [Path(p) for p in input_data]
if save_path is None and input_paths:
save_path = input_paths[0].parent
# load as array to get actual plane count (files may contain interleaved planes)
input_arr = imread(input_paths, **(reader_kwargs or {}))
elif isinstance(input_data, (str, Path)):
# Single path representing a volume (e.g. 4D tiff, zarr)
# We'll load it as an array to iterate planes
input_path = Path(input_data)
if save_path is None:
save_path = input_path.parent / (input_path.stem + "_results")
# Load as array to determine planes
input_arr = imread(input_path, **(reader_kwargs or {}))
else:
raise TypeError(f"Invalid input_data type: {type(input_data)}")
if save_path is None:
raise ValueError("save_path must be specified.")
save_path = Path(save_path)
save_path.mkdir(parents=True, exist_ok=True)
# Determine num_planes
if input_arr is not None:
try:
num_planes = _get_num_planes(input_arr)
except:
num_planes = 1
else:
num_planes = len(input_paths)
# Extract voxel size from input array metadata for propagation to per-plane ops
_volume_voxel = None
if input_arr is not None and hasattr(input_arr, "metadata"):
_volume_voxel = get_voxel_size(input_arr.metadata)
# Normalize planes to process
planes_indices = _normalize_planes(planes, num_planes)
# Build the FULL Z selection (0-based) — we pass this to every
# run_plane call so its OutputMetadata layer can compute the
# implicit z-stride and reactively scale dz. Without the full list
# in scope, each per-plane ops.npy would carry the unscaled source
# dz even though only every Nth plane was being processed.
_volume_z_selection = list(planes_indices) if planes_indices else None
# Snapshot the source acquisition metadata once — used by the
# reactive helper inside the per-plane loop to scale dz/fs.
_volume_source_meta = (
dict(input_arr.metadata)
if input_arr is not None and hasattr(input_arr, "metadata")
else None
)
_volume_source_shape = (
tuple(input_arr._shape5d())
if input_arr is not None and hasattr(input_arr, "_shape5d")
else None
)
_volume_start = time.time()
print(
f"Processing {len(planes_indices)} planes in volume (Total planes: {num_planes})"
)
print(f"Output: {save_path}")
progress_callback = kwargs.pop("progress_callback", None)
# decompose unified rastermap_kwargs into planar (per-plane) and
# volumetric (post-loop). Each is None when the user did not enable
# that mode, an empty dict when they enabled it without overrides.
planar_rastermap_kwargs = _extract_planar_rastermap_kwargs(rastermap_kwargs)
volumetric_rastermap_kwargs = _extract_volumetric_rastermap_kwargs(rastermap_kwargs)
ops_files = []
# shared run_plane kwargs (identical for sequential and parallel paths)
_run_plane_kwargs = dict(
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
frame_indices=frame_indices,
dff_window_size=dff_window_size,
dff_percentile=dff_percentile,
dff_smooth_window=dff_smooth_window,
norm_method=norm_method,
correct_neuropil=correct_neuropil,
accept_all_cells=accept_all_cells,
cell_filters=cell_filters,
rastermap_kwargs=planar_rastermap_kwargs,
save_json=save_json,
reader_kwargs=reader_kwargs,
writer_kwargs=writer_kwargs,
**kwargs,
)
run_parallel = workers != 1
if run_parallel:
n_workers = _resolve_worker_count(workers, len(planes_indices))
if not run_parallel:
# Iterate sequentially (original behavior)
for i, plane_idx in enumerate(planes_indices):
plane_num = plane_idx + 1
if progress_callback:
progress_callback(
plane=i,
total_planes=len(planes_indices),
step="plane_start",
message=f"Plane {plane_num}",
)
# Prepare input for run_plane
if input_arr is not None:
# Pass the whole array, run_plane handles extraction via ops['plane']
current_input = input_arr
else:
# List of files - map plane_num to file index (assuming 1-to-1 if no explicit mapping)
# If input_paths corresponds to ALL planes, then plane_idx indexes into it
if plane_idx < len(input_paths):
current_input = input_paths[plane_idx]
else:
# Fallback or error? Assuming input_files length matches num_planes
current_input = input_paths[0] # Should not happen if logic is correct
current_ops = _prepare_plane_ops(
base_ops=ops,
plane_idx=plane_idx,
num_planes=num_planes,
input_arr=input_arr,
volume_voxel=_volume_voxel,
volume_source_meta=_volume_source_meta,
volume_source_shape=_volume_source_shape,
z_selection=_volume_z_selection,
frame_indices=frame_indices,
)
# Call run_plane
try:
print(f"\n--- Volume Step: Plane {plane_num} ---")
_plane_start = time.time()
ops_file = run_plane(
input_data=current_input,
save_path=save_path,
ops=current_ops,
**_run_plane_kwargs,
)
ops_files.append(ops_file)
print(
f"--- Plane {plane_num} elapsed: {time.time() - _plane_start:.1f}s ---"
)
if progress_callback:
progress_callback(
plane=i,
total_planes=len(planes_indices),
step="plane_done",
message=f"Plane {plane_num} complete",
)
except Exception as e:
print(f"ERROR processing plane {plane_num}: {e}")
traceback.print_exc()
else:
# Parallel: resolve a picklable input source, pre-build all per-plane
# ops in the main process, then run planes in a ProcessPoolExecutor.
try:
input_source = _resolve_input_source(input_arr, input_data)
except ValueError:
raise
prepared = [] # list of (plane_num, current_ops)
for plane_idx in planes_indices:
current_ops = _prepare_plane_ops(
base_ops=ops,
plane_idx=plane_idx,
num_planes=num_planes,
input_arr=input_arr,
volume_voxel=_volume_voxel,
volume_source_meta=_volume_source_meta,
volume_source_shape=_volume_source_shape,
z_selection=_volume_z_selection,
frame_indices=frame_indices,
)
prepared.append((plane_idx + 1, current_ops))
# Eager picklability check on the first plane's ops so users see
# a clear error at submission rather than a swallowed future.
try:
pickle.dumps(prepared[0][1])
pickle.dumps(_run_plane_kwargs)
except Exception as exc:
raise RuntimeError(
f"parallel mode cannot pickle the prepared per-plane ops or "
f"run_plane kwargs ({exc!r}). Use workers=1 or remove the "
f"unpicklable item."
) from exc
print(
f"Running {len(planes_indices)} planes across {n_workers} worker process(es)..."
)
# Download the cellpose model once before fanning out; concurrent
# first-time downloads race on the shared cache file. Use a prepared
# per-plane ops (already reconciled) so algorithm="cellpose" without
# anatomical_only still triggers the warm-up.
_prewarm_cellpose_model(prepared[0][1] if prepared else ops)
manager = mp.Manager()
log_queue = manager.Queue()
stdout_handler = logging.StreamHandler()
stdout_handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
listener = logging.handlers.QueueListener(
log_queue, stdout_handler, respect_handler_level=False
)
listener.start()
if progress_callback:
for i, (plane_num, _) in enumerate(prepared):
progress_callback(
plane=i,
total_planes=len(planes_indices),
step="plane_start",
message=f"Plane {plane_num}",
)
try:
with ProcessPoolExecutor(max_workers=n_workers) as pool:
future_to_plane = {}
plane_start_times = {}
for plane_num, current_ops in prepared:
fut = pool.submit(
_plane_worker,
input_source,
current_ops,
save_path,
_run_plane_kwargs,
reader_kwargs,
log_queue,
)
future_to_plane[fut] = plane_num
plane_start_times[plane_num] = time.time()
done_count = 0
for fut in as_completed(future_to_plane):
plane_num = future_to_plane[fut]
try:
result_plane_num, ops_file = fut.result()
ops_files.append((result_plane_num, ops_file))
done_count += 1
print(
f"--- Plane {result_plane_num} elapsed: "
f"{time.time() - plane_start_times[plane_num]:.1f}s ---"
)
if progress_callback:
progress_callback(
plane=done_count - 1,
total_planes=len(planes_indices),
step="plane_done",
message=f"Plane {result_plane_num} complete",
)
except Exception as e:
print(f"ERROR processing plane {plane_num}: {e}")
traceback.print_exc()
finally:
listener.stop()
# Sort results back into z-order — volumetric stats/plots rely on
# list order matching plane order.
ops_files = [p for _, p in sorted(ops_files, key=lambda t: t[0])]
if not ops_files:
raise RuntimeError(
"run_volume failed: All planes resulted in exceptions during processing."
)
if skip_volumetric:
logger.info(
"skip_volumetric=True; returning per-plane ops_files without aggregation"
)
return ops_files
# Post-Loop: Merging and Volume Stats
# Check for multi-ROI merging using metadata (not filename heuristics)
should_merge = False
if input_arr is not None and hasattr(input_arr, "metadata"):
md = input_arr.metadata
roi_mode = md.get("roi_mode")
num_rois = md.get("num_rois") or md.get("num_mrois") or 1
# only merge if roi_mode is "separate" (files written as separate rois)
if roi_mode == "separate" and num_rois > 1:
should_merge = True
if should_merge:
print("Detected mROI data, attempting to merge...")
merged_savepath = save_path / "merged_mrois"
try:
merge_mrois(save_path, merged_savepath)
# Update ops_files to point to merged results
# We need to find the new ops files
# mbo_utilities check_and_merge_mrois returned list, but we don't have it.
# We'll glob the new directory
merged_ops = sorted(list(merged_savepath.glob("**/ops.npy")))
if merged_ops:
ops_files = merged_ops
print(f"Merged {len(ops_files)} planes to {merged_savepath}")
save_path = merged_savepath # Update save_path for subsequent plots
except Exception as e:
print(f"Merging failed: {e}")
# Generate volume statistics
if ops_files:
print("\nGenering volumetric statistics...")
try:
# aggregate per-plane ops into volume_stats.npy
get_volume_stats(ops_files, overwrite=True)
# Volumetric plots
try:
plot_volume_diagnostics(
ops_files, save_path / "volume_quality_diagnostics.png"
)
plot_orthoslices(ops_files, save_path / "orthoslices.png")
plot_3d_roi_map(ops_files, save_path / "roi_map_3d.png", color_by="snr")
plot_3d_roi_map(
ops_files, save_path / "roi_map_3d_plane.png", color_by="plane"
)
plot_volume_accepted_rejected_overlay(
ops_files, save_path / "volume_segmentation_overlay.png"
)
except Exception as e:
print(f"Warning: Volume plots failed: {e}")
traceback.print_exc()
if volumetric_rastermap_kwargs is not None:
try:
plot_3d_rastermap_clusters(
save_path,
save_path=save_path / "rastermap_3d.png",
rastermap_kwargs=volumetric_rastermap_kwargs,
)
except Exception as e:
print(f"Warning: Volumetric rastermap failed: {e}")
traceback.print_exc()
# volume-level trace figures (trace analysis, top-N raw/dF/F,
# sorted-activity rastermap.png). runs after plot_3d_rastermap_clusters
# so the cached rastermap_model.npy can be reused for the heatmap.
try:
plot_volume_trace_figures(
ops_files,
save_path,
rastermap_kwargs=volumetric_rastermap_kwargs,
correct_neuropil=correct_neuropil,
norm_method=norm_method,
)
except Exception as e:
print(f"Warning: Volume trace figures failed: {e}")
traceback.print_exc()
except Exception as e:
print(f"Warning: Volume statistics failed: {e}")
traceback.print_exc()
_volume_elapsed = time.time() - _volume_start
_h, _rem = divmod(int(_volume_elapsed), 3600)
_m, _s = divmod(_rem, 60)
print(
f"\nVolume total elapsed: {_h:02d}:{_m:02d}:{_s:02d} "
f"({_volume_elapsed:.1f}s) across {len(ops_files)} plane(s)"
)
return ops_files
def _should_write_bin(
ops_path: Path,
force: bool = False,
*,
validate_chan2: bool | None = None,
expected_dtype: np.dtype = np.int16,
expected_nframes: int | None = None,
expected_ly: int | None = None,
expected_lx: int | None = None,
) -> bool:
if force:
return True
ops_path = Path(ops_path)
if not ops_path.is_file():
return True
raw_path = ops_path.parent / "data_raw.bin"
reg_path = ops_path.parent / "data.bin"
chan2_path = ops_path.parent / "data_chan2.bin"
# If neither raw nor registered binary exists, need to write
if not raw_path.is_file() and not reg_path.is_file():
return True
# Use whichever binary exists for validation (prefer raw)
binary_to_validate = raw_path if raw_path.is_file() else reg_path
try:
ops = np.load(ops_path, allow_pickle=True).item()
if validate_chan2 is None:
validate_chan2 = ops.get("align_by_chan", 1) == 2
Ly = ops.get("Ly")
Lx = ops.get("Lx")
nframes_raw = (
ops.get("nframes_chan1") or ops.get("nframes") or ops.get("num_frames")
)
if (None in (nframes_raw, Ly, Lx)) or (nframes_raw <= 0 or Ly <= 0 or Lx <= 0):
return True
# invalidate cache if requested dimensions differ from cached
if expected_nframes is not None and int(expected_nframes) != int(nframes_raw):
print(
f"Cache invalidated: cached nframes={nframes_raw}, requested={expected_nframes}"
)
return True
if expected_ly is not None and int(expected_ly) != int(Ly):
print(f"Cache invalidated: cached Ly={Ly}, requested={expected_ly}")
return True
if expected_lx is not None and int(expected_lx) != int(Lx):
print(f"Cache invalidated: cached Lx={Lx}, requested={expected_lx}")
return True
expected_size_raw = (
int(nframes_raw) * int(Ly) * int(Lx) * np.dtype(expected_dtype).itemsize
)
actual_size_raw = binary_to_validate.stat().st_size
if actual_size_raw != expected_size_raw or actual_size_raw == 0:
return True
try:
arr = np.memmap(
binary_to_validate,
dtype=expected_dtype,
mode="r",
shape=(int(nframes_raw), int(Ly), int(Lx)),
)
_ = arr[0, 0, 0]
del arr
except Exception:
return True
if validate_chan2:
nframes_chan2 = ops.get("nframes_chan2")
if (
(not chan2_path.is_file())
or (nframes_chan2 is None)
or (nframes_chan2 <= 0)
):
return True
expected_size_chan2 = (
int(nframes_chan2)
* int(Ly)
* int(Lx)
* np.dtype(expected_dtype).itemsize
)
actual_size_chan2 = chan2_path.stat().st_size
if actual_size_chan2 != expected_size_chan2 or actual_size_chan2 == 0:
return True
try:
arr2 = np.memmap(
chan2_path,
dtype=expected_dtype,
mode="r",
shape=(int(nframes_chan2), int(Ly), int(Lx)),
)
_ = arr2[0, 0, 0]
del arr2
except Exception:
return True
return False
except Exception as e:
print(f"Bin validation failed for {ops_path.parent}: {e}")
return True
def _should_register(ops_path: str | Path) -> bool:
"""
Determine whether Suite2p registration still needs to be performed.
Returns True if registration *should* be run, False otherwise.
Decision rule:
1. If the plane's data.bin (suite2p's registered output) is missing,
registration has not run yet on this plane — return True
unconditionally. ops.npy field state is not trusted here because
write_ops merges the source array's metadata into a freshly-written
ops.npy, and source metadata can carry suite2p-output-shaped keys
(e.g. when imread of a directory pulls in a sibling ops.npy) that
falsely advertise "registration done" on what is actually raw data.
2. With data.bin present, fall back to ops.npy field inspection
(refImg / meanImg / xoff / yoff / regDX / regPC) to decide whether
the previous registration is complete enough to reuse.
"""
ops_path = Path(ops_path)
reg_bin = ops_path.parent / "data.bin"
if not reg_bin.exists():
return True
ops = load_ops(ops_path)
has_ref = isinstance(ops.get("refImg"), np.ndarray)
has_mean = isinstance(ops.get("meanImg"), np.ndarray)
# Check for valid offsets - ensure they are actual arrays, not _NoValue or other sentinels
def _has_valid_offsets(key):
val = ops.get(key)
if val is None or not isinstance(val, np.ndarray):
return False
try:
return np.any(np.isfinite(val))
except (TypeError, ValueError):
return False
has_offsets = _has_valid_offsets("xoff") or _has_valid_offsets("yoff")
has_metrics = any(
isinstance(ops.get(k), np.ndarray) and ops[k].size > 0
for k in ("regDX", "regPC", "regPC1", "regDX1")
)
# registration done if any of these are true
registration_done = has_ref or has_mean or has_offsets or has_metrics
return not registration_done
def run_plane_bin(ops) -> bool:
"""
Run Suite2p pipeline on pre-written binary files.
Executes registration, cell detection, and signal extraction on binary
data referenced in ops. Requires data_raw.bin and ops.npy to exist.
Parameters
----------
ops : dict or str or Path
Suite2p ops dictionary or path to ops.npy file.
Returns
-------
bool
True if pipeline completed successfully.
"""
from contextlib import nullcontext
from suite2p.io.binary import BinaryFile
ops = load_ops(ops)
# fixup stale absolute paths when data has been moved to a new location.
# ops_path is the ground truth; derive all other paths from its parent.
ops_path_stored = Path(ops.get("ops_path", ""))
if ops_path_stored.name == "ops.npy" and ops_path_stored.parent.exists():
ops_parent = ops_path_stored.parent
elif "save_path" in ops and Path(ops["save_path"]).exists():
ops_parent = Path(ops["save_path"])
ops["ops_path"] = str(ops_parent / "ops.npy")
else:
raise FileNotFoundError(
f"Cannot locate plane directory. ops_path={ops.get('ops_path')}, "
f"save_path={ops.get('save_path')}"
)
# resolve path keys. rewrite stale absolutes (ops was moved) but
# preserve an already-valid absolute path (staging points raw_file/
# reg_file/chan2_file at the source dir — overwriting those would
# break the link).
ops["save_path"] = str(ops_parent)
ops["ops_path"] = str(ops_parent / "ops.npy")
def _repoint_if_stale(ops_key: str, default_name: str) -> None:
current = ops.get(ops_key)
if not current:
return
current_path = Path(current)
if current_path.exists():
return # live pointer (local or linked source) — keep it
local = ops_parent / current_path.name
if local.exists():
ops[ops_key] = str(local)
return
fallback = ops_parent / default_name
if fallback.exists():
ops[ops_key] = str(fallback)
return
# no live path available — clear the stale pointer so downstream
# `if raw_file is None` / `reg_file.exists()` checks fire cleanly
# instead of marching a bad path into suite2p.
ops.pop(ops_key, None)
_repoint_if_stale("raw_file", "data_raw.bin")
_repoint_if_stale("chan2_file", "data_chan2.bin")
_repoint_if_stale("reg_file", "data.bin")
# ensure image arrays are numpy ndarrays (can become lists after dict merges)
for key in ("meanImg", "meanImgE", "refImg", "max_proj", "Vcorr"):
val = ops.get(key)
if val is not None and not isinstance(val, np.ndarray):
ops[key] = np.array(val)
# Get Ly and Lx with helpful error message if missing
Ly = ops.get("Ly")
Lx = ops.get("Lx")
if Ly is None or Lx is None:
raise KeyError(
f"Missing required dimension keys in ops: Ly={Ly}, Lx={Lx}. "
f"Ensure the binary was written with proper metadata. "
f"Available keys: {list(ops.keys())}"
)
Ly, Lx = int(Ly), int(Lx)
# run_registration governs whether we actually need data_raw.bin.
# compute it here (up from the block that previously defined it) so
# the raw_file check below can be relaxed when registration is off.
run_registration = bool(ops.get("do_registration", True))
run_detection = bool(ops.get("roidetect", True))
raw_file = ops.get("raw_file")
n_func = ops.get("nframes_chan1") or ops.get("nframes") or ops.get("n_frames")
if n_func is None:
raise KeyError("Missing nframes_chan1 / nframes / n_frames in ops")
# Graceful fallback for "reload a completed plane" workflow:
# - keep_raw=False (default) deletes data_raw.bin after a run.
# - When the user reloads that plane_dir and clicks Run again, the
# GUI hands us an ops where raw_file is missing but reg_file (the
# registered data.bin) is present. Re-registration needs raw data,
# but detection/extraction can still run against data.bin.
# - Instead of crashing, downgrade to detection-only and log why.
if run_registration and raw_file is None:
_existing_reg = ops.get("reg_file")
if _existing_reg and Path(_existing_reg).exists():
print(
"NOTE: raw_file missing but reg_file exists — downgrading "
"do_registration=1 → 0 (detection/extraction only against "
f"{Path(_existing_reg).name}). Pass keep_raw=True on the "
"first run if you want to re-register a reloaded plane."
)
ops["do_registration"] = 0
run_registration = False
else:
raise KeyError(
"Missing raw_file in ops — required when do_registration=1. "
"Set do_registration=0 to run detection-only against an "
"existing data.bin."
)
n_func = int(n_func)
# reg_file may already point at a linked source binary; only pin it
# to plane_dir/data.bin when ops has no pointer yet.
existing_reg = ops.get("reg_file")
if existing_reg and Path(existing_reg).exists():
reg_file = Path(existing_reg)
else:
reg_file = ops_parent / "data.bin"
ops["reg_file"] = str(reg_file)
chan2_file = ops.get("chan2_file", "")
use_chan2 = bool(chan2_file) and Path(chan2_file).exists()
n_chan2 = int(ops.get("nframes_chan2", 0)) if use_chan2 else 0
n_align = n_func if not use_chan2 else min(n_func, n_chan2)
if n_align <= 0:
raise ValueError("Non-positive frame count after alignment selection.")
if use_chan2 and (n_func != n_chan2):
print(
f"[run_plane_bin] Trimming to {n_align} frames (func={n_func}, chan2={n_chan2})."
)
ops["functional_chan"] = 1
ops["align_by_chan"] = 2 if use_chan2 else 1
ops["nchannels"] = 2 if use_chan2 else 1
# Fan out the trimmed frame count to every alias mbo_utilities also
# writes (num_timepoints, num_frames, n_frames, T, nt, timepoints).
# Without this, dual-channel trimming leaves nframes=n_align but the
# other aliases stuck at the chan1 source value, and downstream
# readers get inconsistent answers from the same ops dict.
_set_frame_count_aliases(ops, n_align)
ops["nframes_chan1"] = n_align
if use_chan2:
ops["nframes_chan2"] = n_align
if "diameter" in ops:
# save user's input diameter for provenance — cellpose later overwrites
# ops["diameter"] with the estimated median from detection.
# do not default/coerce here: db_settings._ensure_diameter_list
# converts to [dy, dx] at the upstream-pipeline boundary, and mainline
# suite2p / cellpose handle None / scalar / list / aspect-pair natively.
ops["diameter_user"] = ops["diameter"]
# reset detection-derived parameters when re-running registration or detection
# so compute_enhanced_mean_image() reinitializes them from diameter.
# prevents inheriting spatscale_pix=0 from a failed run, which causes
# meanImgE to be computed with a [1,1] filter (all 0.5 output).
# (run_registration / run_detection were resolved earlier, near the
# raw_file check — keep them in scope here.)
if run_registration:
# full reset: clear both registration and detection intermediates.
# registration outputs (badframes/xoff/yoff/corrXY) are cleared too —
# otherwise a prior divergent run leaves badframes=True for most
# frames and detection silently excludes them on the rerun even
# though the new registration succeeded.
for key in [
"spatscale_pix", "Vcorr", "Vmax", "Vmap", "Vsplit", "ihop",
"badframes", "xoff", "yoff", "corrXY",
"xoff1", "yoff1", "corrXY1",
]:
if key in ops:
del ops[key]
elif run_detection:
# detection only: clear detection intermediates but keep registration outputs
for key in ["spatscale_pix", "Vmax", "Vmap", "Vsplit", "ihop"]:
if key in ops:
del ops[key]
reg_file_chan2 = ops_parent / "data_chan2_reg.bin" if use_chan2 else None
# NOTE: previous versions hard-coded `ops["anatomical_red"] = False`
# and `ops["chan2_thres"] = 0.1` here. Both were silently overwriting
# the user's settings and the suite2p schema defaults
# (detection.chan2_threshold = 0.25). They've been removed — the
# user's chan2 threshold now flows through unchanged. If a downstream
# consumer needs anatomical_red / chan2_thres, set them explicitly
# in the caller's ops dict.
if ops.get("roidetect", True) and ops.get("anatomical_only", 0) > 0:
# Estimate memory usage for Cellpose detection
estimated_gb = (Ly * Lx * n_align * 2) / 1e9 # Rough estimate
spatial_scale = ops.get("spatial_scale", 0)
if spatial_scale > 0:
estimated_gb /= spatial_scale**2
if estimated_gb > 50: # Warn for datasets > 50GB
print(
f"Large dataset warning: {estimated_gb:.1f} GB estimated for detection"
)
if spatial_scale == 0:
print(
f" Consider adding 'spatial_scale': 2 to reduce memory usage by 4x"
)
print(f" Or reduce 'batch_size' (current: {ops.get('batch_size', 500)})")
# When skipping registration, make sure data.bin is available and
# synthesize the reg-output fields detection expects. The registered
# binary may already exist locally, be linked to a source dir, or
# need to be materialized from data_raw.bin.
if not run_registration:
import shutil
reg_file_path = Path(reg_file)
raw_file_path = Path(raw_file) if raw_file else None
# materialize data.bin from data_raw.bin only when necessary
if raw_file_path and raw_file_path.exists() and (
not reg_file_path.exists() or reg_file_path.stat().st_size == 0
):
print(f" Materializing {reg_file_path.name} from {raw_file_path.name}")
shutil.copy2(raw_file_path, reg_file_path)
if not reg_file_path.exists():
raise FileNotFoundError(
f"Registration was skipped but no usable registered binary is "
f"available. reg_file={reg_file_path}, raw_file={raw_file}. "
"Either turn registration back on, or point reg_file at an "
"existing data.bin."
)
if "yrange" not in ops or "xrange" not in ops:
use_anatomical = ops.get("anatomical_only", 0) > 0
if use_anatomical:
print(
" Using full image dimensions for anatomical detection (avoids cropping issues)"
)
ops["yrange"] = [0, Ly]
ops["xrange"] = [0, Lx]
else:
# prefer data_raw.bin for detection of valid region (it's
# closer to acquisition), fall back to data.bin.
detect_from = (
raw_file_path
if raw_file_path and raw_file_path.exists()
else reg_file_path
)
print(f" Detecting valid region from {detect_from.name}...")
with BinaryFile(Ly=Ly, Lx=Lx, filename=str(detect_from)) as f:
meanImg_full = f.sampled_mean().astype(np.float32)
threshold = meanImg_full.max() * 0.01
valid_mask = meanImg_full > threshold
valid_rows = np.any(valid_mask, axis=1)
valid_cols = np.any(valid_mask, axis=0)
if valid_rows.sum() > 0 and valid_cols.sum() > 0:
y_indices = np.where(valid_rows)[0]
x_indices = np.where(valid_cols)[0]
yrange = [int(y_indices[0]), int(y_indices[-1] + 1)]
xrange = [int(x_indices[0]), int(x_indices[-1] + 1)]
else:
yrange = [0, Ly]
xrange = [0, Lx]
ops["yrange"] = yrange
ops["xrange"] = xrange
print(f" Valid region: yrange={yrange}, xrange={xrange}")
# registration outputs that detection expects
if "badframes" not in ops:
ops["badframes"] = np.zeros(n_align, dtype=bool)
if "xoff" not in ops:
ops["xoff"] = np.zeros(n_align, dtype=np.float32)
if "yoff" not in ops:
ops["yoff"] = np.zeros(n_align, dtype=np.float32)
if "corrXY" not in ops:
ops["corrXY"] = np.ones(n_align, dtype=np.float32)
# Also materialize channel 2 registered binary if source chan2 exists
if use_chan2:
chan2_path = Path(chan2_file)
reg_chan2_path = Path(reg_file_chan2)
if chan2_path.exists():
if not reg_chan2_path.exists() or reg_chan2_path.stat().st_size == 0:
print(f" Copying {chan2_path.name} -> {reg_chan2_path.name}")
shutil.copy2(chan2_path, reg_chan2_path)
else:
print(f" {reg_chan2_path.name} already exists, skipping copy")
# ensure summary images exist even when registration was skipped.
# meanImg is needed by detection; meanImgE is only computed during
# registration so we derive it here when missing.
if not run_registration:
if "meanImg" not in ops:
with BinaryFile(
Ly=Ly, Lx=Lx, filename=str(reg_file), n_frames=n_align
) as f_mean:
ops["meanImg"] = f_mean.sampled_mean().astype(np.float32)
print(" Computed meanImg from binary")
if "meanImgE" not in ops and "meanImg" in ops:
ops["meanImgE"] = compute_enhanced_mean_image(
ops["meanImg"].astype(np.float32), ops
)
print(" Computed meanImgE from meanImg")
# for very short recordings, suite2p's bin_movie crashes when
# bin_size > nframes (produces empty array). pre-compute meanImg so
# detection_wrapper skips the failing reduction, and clamp tau so
# bin_size <= nframes.
tau_fs = np.round(ops.get("tau", 0.7) * ops.get("fs", 30))
if n_align < max(tau_fs, 2):
with BinaryFile(
Ly=Ly, Lx=Lx, filename=str(reg_file), n_frames=n_align
) as f_pre:
all_frames = f_pre.data[:n_align]
ops["meanImg"] = all_frames.mean(axis=0).astype(np.float32)
ops["max_proj"] = all_frames.max(axis=0).astype(np.float32)
# set tau so bin_size = max(1, n//nbinned, round(tau*fs)) <= nframes
ops["tau"] = 0
# upstream's BinaryFile defaults to write=False (read-only) and errors on
# missing files. reg_file / reg_file_chan2 are registration DESTINATIONS
# — they may not exist yet on a fresh run — so we pass write=True to pick
# up the "r+ if exists else w+" behavior the fork used to rely on.
# raw_file / chan2_file are input sources (already produced by mbo's
# imwrite or upstream copy) and stay read-only.
# f_raw is only needed when registration runs; open it conditionally
# so detection-only runs don't require data_raw.bin at all.
_open_raw = (
raw_file is not None
and Path(raw_file).exists()
and run_registration
)
with (
BinaryFile(Ly=Ly, Lx=Lx, filename=str(reg_file), n_frames=n_align, write=True) as f_reg,
(
BinaryFile(Ly=Ly, Lx=Lx, filename=str(raw_file), n_frames=n_align)
if _open_raw
else nullcontext()
) as f_raw,
(
BinaryFile(Ly=Ly, Lx=Lx, filename=str(reg_file_chan2), n_frames=n_align, write=True)
if use_chan2
else nullcontext()
) as f_reg_chan2,
(
BinaryFile(Ly=Ly, Lx=Lx, filename=str(chan2_file), n_frames=n_align)
if use_chan2
else nullcontext()
) as f_raw_chan2,
):
ops = _call_upstream_pipeline(
ops,
f_reg, f_raw,
f_reg_chan2 if use_chan2 else None,
f_raw_chan2 if use_chan2 else None,
run_registration=run_registration,
stat=None,
)
# ensure meanImgE is always present in final ops (safety net)
if "meanImgE" not in ops and "meanImg" in ops:
ops["meanImgE"] = compute_enhanced_mean_image(
ops["meanImg"].astype(np.float32), ops
)
if use_chan2:
ops["reg_file_chan2"] = str(reg_file_chan2)
save_ops_db_settings(ops["ops_path"], ops)
return True
def _cleanup_bin_files(plane_dir, keep_raw, keep_reg):
"""delete bin files if user doesn't want them, swallowing OS errors."""
if not keep_raw:
try:
(plane_dir / "data_raw.bin").unlink(missing_ok=True)
except OSError:
pass
if not keep_reg:
try:
(plane_dir / "data.bin").unlink(missing_ok=True)
except OSError:
pass
[docs]
def run_plane(
input_data,
save_path: str | Path | None = None,
ops: dict | str | Path = None,
db: dict | None = None,
settings: dict | None = None,
chan2_file: str | Path | None = None,
keep_raw: bool = False,
keep_reg: bool = True,
force_reg: bool = False,
force_detect: bool = False,
frame_indices: list | None = None,
dff_window_size: int = None,
dff_percentile: int = 20,
dff_smooth_window: int = None,
norm_method: str = "zscore",
correct_neuropil: bool = True,
accept_all_cells: bool = False,
cell_filters: list = None,
rastermap_kwargs: dict = None,
save_json: bool = False,
plane_name: str | None = None,
reader_kwargs: dict = None,
writer_kwargs: dict = None,
**kwargs,
) -> Path:
"""
Processes a single imaging plane using suite2p.
Handles registration, segmentation, filtering, dF/F calculation, and plotting.
Now accepts both file paths and lazy arrays.
Parameters
----------
input_data : str, Path, or lazy array
Input data. Can be a file path or an mbo_utilities lazy array.
save_path : str or Path, optional
Root directory to save the results. A subdirectory will be created based on
the input filename or `plane_name` parameter.
ops : dict, str or Path, optional
Path to or dict of user‐supplied ops (flat fork shape). Detection
accepts suite2p's ``"algorithm"`` or the fork's ``"anatomical_only"``
interchangeably (kept consistent; invalid/legacy values warned and
mapped to suite2p-valid ones). Pass ``db`` / ``settings`` for the
nested upstream shape.
chan2_file : str, optional
Path to structural / anatomical data used for registration.
keep_raw : bool, default False
If True, do not delete the raw binary (`data_raw.bin`) after processing.
keep_reg : bool, default True
If True, keep the registered binary (`data.bin`) after processing.
force_reg : bool, default False
If True, force a new registration.
force_detect : bool, default False
If True, force ROI detection.
dff_window_size : int, optional
Frames for rolling percentile baseline. Default: auto-calculated (~10*tau*fs).
dff_percentile : int, default 20
Percentile for baseline F0.
dff_smooth_window : int, optional
Smoothing window for dF/F. Default: auto-calculated.
norm_method : str, default "zscore"
Normalization for norm_traces.npy: "dff" (rolling percentile ΔF/F)
or "zscore" (per-ROI (F - mean) / std). Quality metrics always use
ΔF/F regardless of this setting.
accept_all_cells : bool, default False
If True, mark all detected ROIs as accepted cells.
cell_filters : list[dict], optional
Filters to apply to detected ROIs (e.g. diameter, area).
Default is no filters.
Example: [{"name": "max_diameter", "min_diameter_um": 4, "max_diameter_um": 35}]
rastermap_kwargs : dict, optional
Per-plane rastermap config. Default None (off). Pass a dict
(possibly empty) to enable; contents override the cell-count-aware
defaults passed to rastermap.Rastermap().
Example: {"n_clusters": 50, "n_PCs": 64}.
save_json : bool, default False
Save ops as JSON.
frame_indices : list[int], optional
Explicit 0-based timepoint indices. Supports stride
(e.g. ``list(range(0, 1574, 2))`` for every other frame).
When provided, the binary on disk contains exactly these
frames, and `OutputMetadata` reactively scales `fs` in ops.npy
based on the implicit stride. Takes precedence over any
``num_frames`` in ``writer_kwargs``.
plane_name : str, optional
Custom name for the plane subdirectory.
reader_kwargs : dict, optional
Arguments for mbo_utilities.imread.
writer_kwargs : dict, optional
Arguments for binary writing.
**kwargs : dict
Additional arguments passed to Suite2p.
Returns
-------
Path
Path to the saved ops.npy file.
"""
from mbo_utilities import imread, imwrite
from mbo_utilities.metadata import get_metadata
_resolve_gpu_env()
progress_callback = kwargs.pop("progress_callback", None)
if "debug" in kwargs:
logger.setLevel(logging.DEBUG)
logger.info("Debug mode enabled.")
# Validate cell_filters (use default if None)
if cell_filters is None:
cell_filters = DEFAULT_CELL_FILTERS
# Handle input_data type
input_path = None
input_arr = None
if _is_lazy_array(input_data):
input_arr = input_data
# Try to get a filename for naming purposes, otherwise require plane_name
filenames = getattr(input_arr, "filenames", [])
if filenames:
input_path = Path(filenames[0])
elif plane_name is None:
raise ValueError(
"plane_name is required when input is an array without filenames."
)
else:
# dummy path for internal logic compatibility
input_path = Path(f"{plane_name}.tif")
elif isinstance(input_data, (str, Path)):
input_path = Path(input_data)
else:
raise TypeError(
f"input_data must be path or lazy array, got {type(input_data)}"
)
input_parent = input_path.parent
# Save path handling
if save_path is None:
if input_arr is not None:
raise ValueError("save_path is required when input is an array.")
# binary inputs with ops.npy are processed in-place
is_binary_input = input_path.suffix == ".bin"
binary_with_ops = is_binary_input and (input_path.parent / "ops.npy").exists()
if binary_with_ops:
save_path = input_parent
else:
save_path = input_parent
else:
save_path = Path(save_path)
save_path.mkdir(parents=True, exist_ok=True)
# Directory setup
skip_imwrite = False
is_binary_input = input_path.suffix == ".bin"
binary_with_ops = is_binary_input and (input_path.parent / "ops.npy").exists()
# load ops defaults and user settings first (needed for plane number).
# accepts EITHER ops=... (legacy flat dict) OR db=... / settings=... (new
# upstream-shaped dicts). when both are supplied, db/settings are
# flattened first and any explicit ops keys win the merge.
from lbm_suite2p_python.db_settings import merge_db_settings_into_ops
ops_default = default_ops()
ops_user = load_ops(ops) if ops else {}
if db is not None or settings is not None:
ops_user = merge_db_settings_into_ops(ops_user, db, settings)
# keep the flat (anatomical_only/sparse_mode) and suite2p (algorithm)
# detection spellings consistent + valid before they merge into ops.
reconcile_detection_keys(ops_user)
ops = {**ops_default, **ops_user, "data_path": str(input_path.resolve())}
# normalize kwargs
reader_kwargs = reader_kwargs or {}
writer_kwargs = writer_kwargs or {}
# determine if we're processing existing binary in-place
if binary_with_ops and (
input_path.parent == save_path or save_path == input_parent
):
# resolve the plane-specific source dir. _resolve_source_plane_dir
# raises on ambiguity / no match instead of silently using input_path.parent.
target_plane = ops.get("plane", 1)
source_plane_num = ops.get("source_plane_num")
plane_dir = _resolve_source_plane_dir(
input_arr, input_path, target_plane, source_plane_num=source_plane_num
)
print(f"Processing existing binary in-place: {plane_dir}")
skip_imwrite = True
ops_file = plane_dir / "ops.npy"
existing_ops = (
np.load(ops_file, allow_pickle=True).item() if ops_file.exists() else {}
)
metadata = {
k: v
for k, v in existing_ops.items()
if k in ("plane", "fs", "dx", "dy", "dz", "z_step", "Ly", "Lx", "nframes")
}
file = None
# merge: defaults < existing (registration results, images, etc.) < user overrides
ops = {
**ops_default,
**existing_ops,
**ops_user,
"data_path": str(input_path.resolve()),
}
elif binary_with_ops:
# binary input with ops but different save_path: stage detection
# outputs + ops.npy into the new plane_dir, repoint raw_file /
# reg_file / chan2_file at the source binaries. binaries stay
# at the source so the new dir is small and copy-pasteable.
skip_imwrite = True
file = None
# resolve the plane-specific source dir (raises on ambiguity / miss).
target_plane = ops.get("plane", 1)
source_plane_num = ops.get("source_plane_num")
src_dir = _resolve_source_plane_dir(
input_arr, input_path, target_plane, source_plane_num=source_plane_num
)
existing_ops = np.load(src_dir / "ops.npy", allow_pickle=True).item()
# remember the original acquisition source before the merge below
# overwrites data_path with the staged binary path. needed by
# force_reg path when the source has no data_raw.bin.
original_data_path = existing_ops.get("data_path")
metadata = {
k: v
for k, v in existing_ops.items()
if k in ("plane", "fs", "dx", "dy", "dz", "z_step", "Ly", "Lx", "nframes")
}
# determine plane number for directory naming
if "plane" in ops:
plane = ops["plane"]
elif "plane" in existing_ops:
plane = existing_ops["plane"]
else:
tag = derive_tag_from_filename(input_path)
plane = get_plane_num_from_tag(tag, fallback=1)
# source_plane_num (set by run_volume when the loaded array came
# from a plane-subset save) overrides `plane` for dir naming only.
# keeps slicing (which uses ops["plane"] as a local index) intact.
naming_plane = ops.get("source_plane_num", plane)
if plane_name is not None:
subdir_name = plane_name
else:
nframes_hint = existing_ops.get("nframes_chan1") or existing_ops.get(
"nframes"
)
subdir_name = generate_plane_dirname(
plane=naming_plane,
nframes=nframes_hint,
frame_start=(int(min(frame_indices)) + 1) if frame_indices else 1,
frame_stop=(int(max(frame_indices)) + 1) if frame_indices else None,
)
plane_dir = save_path / subdir_name
plane_dir.mkdir(exist_ok=True)
ops_file = plane_dir / "ops.npy"
# merge existing ops with user overrides BEFORE staging, so the
# stage call can mutate the path pointers on the final ops dict
# without them being wiped by a later merge.
ops = {
**ops_default,
**existing_ops,
**ops_user,
"data_path": str(input_path.resolve()),
}
if not force_reg:
# registration writes to data.bin, but data.bin lives in the
# source dir — running it would overwrite the user's source
# binary. force detection-only; user can re-register from the
# original tiff/zarr by passing force_reg=True (Force in the
# GUI), which routes writes into the new plane_dir.
if ops_user.get("do_registration", 1):
logger.warning(
"do_registration ignored: the source directory already "
"contains a data.bin and running registration here would "
"overwrite it. Forcing do_registration=0 (detection-only). "
"Select Force in the Registration column (force_reg=True) "
"to re-register from the original input into the new "
"save_path."
)
ops["do_registration"] = 0
_stage_source_into_plane_dir(src_dir, plane_dir, ops)
if force_reg:
# writes go to plane_dir/data.bin — discard the source pointer
# that _stage_source_into_plane_dir set.
ops.pop("reg_file", None)
# existing_ops carries a `raw_file` path from the original
# run; if keep_raw=False removed the file, the string is
# still present but stale. existence-check, don't trust the
# string alone.
_raw = ops.get("raw_file")
if not _raw or not Path(_raw).exists():
import shutil
target_raw = plane_dir / "data_raw.bin"
# prefer the original acquisition (tiff/zarr) when it's
# still on disk — avoids re-registering already-
# registered data. Skip when it points at the same .bin
# the user just loaded (no improvement, and imread of a
# stale data_path could be wrong).
use_original = (
bool(original_data_path)
and Path(original_data_path).exists()
and Path(original_data_path).resolve() != input_path.resolve()
)
if use_original:
print(
f" Force registration: rewriting data_raw.bin from "
f"{Path(original_data_path).name}"
)
file = imread(Path(original_data_path), **reader_kwargs)
if hasattr(file, "metadata"):
metadata = dict(file.metadata)
else:
metadata = get_metadata(Path(original_data_path))
skip_imwrite = False
else:
# original acquisition is gone (or IS the input).
# seed data_raw.bin from the staged data.bin so
# registration has frames to work with. Re-registers
# already-registered data (near-zero shifts) — fine
# for detection / bad-frame re-runs.
print(
f" Force registration: original acquisition "
f"unavailable; seeding data_raw.bin from "
f"{input_path.name} (re-registering already-"
f"registered data)"
)
shutil.copy2(input_path, target_raw)
ops["raw_file"] = str(target_raw)
else:
skip_imwrite = False
# determine plane number for directory naming (from ops or input filename)
if "plane" in ops:
plane = ops["plane"]
else:
tag = derive_tag_from_filename(input_path)
plane = get_plane_num_from_tag(tag, fallback=1)
# source_plane_num (set by run_volume when the loaded array came
# from a plane-subset save) overrides `plane` for dir naming only.
naming_plane = ops.get("source_plane_num", plane)
# compute plane_dir early so we can check if binaries already exist
if plane_name is not None:
subdir_name = plane_name
else:
# prefer the user-specified frame limit over raw array shape
nframes_hint = (
writer_kwargs.get("num_frames")
or ops.get("nframes")
)
if not nframes_hint and input_arr is not None and hasattr(input_arr, "shape"):
nframes_hint = input_arr.shape[0]
# if input was a path, peek at metadata for frame count
if not nframes_hint and input_arr is None and input_path is not None:
try:
from mbo_utilities import get_metadata
md = get_metadata(input_path)
nframes_hint = (
md.get("num_timepoints")
or md.get("nframes")
or md.get("num_frames")
)
except Exception:
nframes_hint = None
subdir_name = generate_plane_dirname(
plane=naming_plane,
nframes=nframes_hint,
frame_start=(int(min(frame_indices)) + 1) if frame_indices else 1,
frame_stop=(int(max(frame_indices)) + 1) if frame_indices else None,
)
plane_dir = save_path / subdir_name
plane_dir.mkdir(exist_ok=True)
ops_file = plane_dir / "ops.npy"
# extract expected dims from input for cache validation
# honors writer_kwargs["num_frames"] limit if user requested fewer frames
exp_nframes = writer_kwargs.get("num_frames")
exp_ly = exp_lx = None
if input_arr is not None and hasattr(input_arr, "shape"):
if exp_nframes is None:
exp_nframes = input_arr.shape[0]
exp_ly = input_arr.shape[-2]
exp_lx = input_arr.shape[-1]
# check if binaries already exist in the actual output directory
should_write = _should_write_bin(
ops_file,
force=force_reg,
expected_nframes=exp_nframes,
expected_ly=exp_ly,
expected_lx=exp_lx,
)
if should_write:
if input_arr is not None:
file = input_arr
else:
file = imread(input_path, **reader_kwargs)
if hasattr(file, "metadata"):
metadata = file.metadata
else:
metadata = get_metadata(input_path)
else:
print(
f"Skipping data_raw.bin write, already exists and passes data validation checks."
)
file = None
# load metadata from existing ops if available
if ops_file.exists():
existing_ops = np.load(ops_file, allow_pickle=True).item()
metadata = {
k: v
for k, v in existing_ops.items()
if k in ("plane", "fs", "dx", "dy", "dz", "z_step", "Ly", "Lx", "nframes")
}
# ensure registration metadata from previous runs is preserved
ops = {
**ops_default,
**existing_ops,
**ops_user,
"data_path": str(input_path.resolve()),
}
else:
metadata = {}
# plane number handling (finalize)
if "plane" in ops:
plane = ops["plane"]
elif "plane" in metadata:
plane = metadata["plane"]
ops["plane"] = plane
else:
tag = derive_tag_from_filename(input_path)
plane = get_plane_num_from_tag(tag, fallback=ops.get("plane", None))
ops["plane"] = plane
metadata["plane"] = plane
ops["save_path"] = str(plane_dir.resolve())
# propagate voxel size from metadata into ops (user ops take precedence via setdefault)
vs = get_voxel_size(metadata)
if vs.dz is not None:
ops.setdefault("dz", vs.dz)
ops.setdefault("z_step", vs.dz)
if vs.dx != 1.0:
ops.setdefault("dx", vs.dx)
if vs.dy != 1.0:
ops.setdefault("dy", vs.dy)
# Propagate fs from source metadata into ops the same way dz/dx/dy
# are propagated above. WITHOUT this, lbm's `default_ops()` ships
# `fs=10.0` and that hardcoded default wins over the actual source
# acquisition fs when the metadata kwarg merge happens inside
# mbo's `imwrite` (the kwarg takes precedence over arr.metadata).
# Result: any downstream reactive scaling — including the writer's
# OutputMetadata fs/stride scaling — divides 10 instead of the
# real fs, producing nonsense like fs=5 from a 30Hz source +
# every-other-frame stride.
src_fs = metadata.get("fs") if metadata else None
if src_fs is None and file is not None and hasattr(file, "metadata"):
src_fs = (file.metadata or {}).get("fs")
if src_fs is not None:
# only override the lbm default — don't overwrite a value the
# user explicitly set in their ops_user dict.
if ops.get("fs") in (None, 10.0) or "fs" not in ops_user:
ops["fs"] = float(src_fs)
# When called directly (not via run_volume), the caller may pass
# frame_indices for stride scaling. run_volume already invoked the
# reactive helper before reaching us, so re-running it here is a
# no-op when there's no T stride. When there IS a T stride from
# this entry point, scale fs from the source metadata via the
# reactive helper. plane_indices is None here because we only see
# one plane at a time at this layer.
if frame_indices is not None and file is not None:
_src_meta = (
dict(file.metadata) if hasattr(file, "metadata") else None
)
_src_shape = (
tuple(file._shape5d()) if hasattr(file, "_shape5d") else None
)
_apply_reactive_metadata(
ops=ops,
source_metadata=_src_meta,
source_shape=_src_shape,
frame_indices=frame_indices,
plane_indices=None, # this layer only sees one plane
logger=logger,
)
# store source filename info in ops for traceability
ops["source_dirname"] = plane_dir.name
ops["source_input"] = str(input_path.name)
# Write binary
if progress_callback:
progress_callback(step="writing_binary", message="Writing binary...")
if not skip_imwrite and file is not None:
md_combined = {**metadata, **ops}
print(f"Writing binary to {plane_dir}...")
bin_start = time.time()
# extract single plane if data is volumetric
write_planes = [plane] if _get_num_planes(file) > 1 else None
write_kw = dict(writer_kwargs)
# If the caller gave us explicit frame indices, pass them as
# `frames=` (1-based) to imwrite. This wins over any stale
# `num_frames` truncation in writer_kwargs — strided semantics
# require an explicit index list, not a count.
if frame_indices is not None:
write_kw["frames"] = [int(i) + 1 for i in frame_indices]
write_kw.pop("num_frames", None)
imwrite(
file,
plane_dir,
ext=".bin",
metadata=md_combined,
output_name="data_raw.bin",
overwrite=True,
planes=write_planes,
show_progress=False,
**write_kw,
)
# Record binary write
# Reload ops from disk to get Lx, Ly, and other metadata added by imwrite
if ops_file.exists():
ops = np.load(ops_file, allow_pickle=True).item()
_add_processing_step(
ops,
"binary_write",
input_files=[str(input_path)],
duration_seconds=time.time() - bin_start,
extra={"plane": plane, "shape": list(file.shape)},
)
save_ops_db_settings(ops_file, ops)
# Determine processing needs.
# explicit user toggles win over file-state inference: when the caller
# turned off registration AND detection (and didn't pass a force flag),
# honor the intent and skip suite2p entirely regardless of what's on
# disk. post-processing below still runs and guards its own inputs.
user_skip_reg = (not ops.get("do_registration", 1)) and not force_reg
user_skip_detect = (not ops.get("roidetect", 1)) and not force_detect
stat_file = plane_dir / "stat.npy"
# If we just wrote a fresh data_raw.bin in this run, any stat.npy /
# ops.npy left in plane_dir from a prior run is stale and must not
# short-circuit suite2p. fresh raw data needs registration and (unless
# user-disabled) detection regardless of leftover artifacts.
fresh_binary_written = (not skip_imwrite) and (file is not None) and should_write
if user_skip_reg and user_skip_detect:
needs_reg = False
needs_detect = False
else:
# detection: needed when stat.npy is missing or empty, unless the
# user explicitly disabled roidetect (file state can't fabricate
# detection results).
needs_detect = False
if force_detect:
needs_detect = True
elif fresh_binary_written:
needs_detect = not user_skip_detect
elif not stat_file.exists():
needs_detect = not user_skip_detect
elif ops.get("roidetect", 1):
stat = np.load(stat_file, allow_pickle=True)
if stat is None or len(stat) == 0:
needs_detect = True
# registration: needed when ops is missing or _should_register says
# so, unless the user explicitly disabled do_registration.
if force_reg:
needs_reg = True
elif user_skip_reg:
needs_reg = False
elif fresh_binary_written:
needs_reg = True
elif not ops_file.exists():
needs_reg = True
else:
needs_reg = _should_register(ops_file)
# Update ops logic
if force_reg:
ops["do_registration"] = 1
elif not needs_reg:
ops["do_registration"] = 0
elif "do_registration" not in ops_user:
ops["do_registration"] = 1
if force_detect:
ops["roidetect"] = 1
elif "roidetect" not in ops_user:
ops["roidetect"] = int(needs_detect)
# Channel 2 handling
if chan2_file is not None:
chan2_path = Path(chan2_file)
if chan2_path.exists():
chan2_data = imread(chan2_path, **reader_kwargs)
chan2_md = getattr(chan2_data, "metadata", {})
imwrite(
chan2_data,
plane_dir,
ext=".bin",
metadata=chan2_md,
structural=True,
show_progress=False,
)
ops["chan2_file"] = str((plane_dir / "data_chan2.bin").resolve())
# Use _shape5d() (the always-5D contract) so natural-rank
# arrays (BinArray, a 4D TZYX _ChannelView) still report the
# real T dimension at index 0.
if hasattr(chan2_data, "_shape5d"):
ops["nframes_chan2"] = int(chan2_data._shape5d()[0])
elif hasattr(chan2_data, "shape"):
ops["nframes_chan2"] = int(chan2_data.shape[0])
else:
ops["nframes_chan2"] = 0
ops["nchannels"] = 2
ops["align_by_chan"] = 2
# ensure ops paths point to current plane_dir before running suite2p.
# handles cases where data was moved from its original location.
ops["ops_path"] = str(ops_file)
ops["save_path"] = str(plane_dir.resolve())
raw_bin = plane_dir / "data_raw.bin"
if raw_bin.exists():
ops["raw_file"] = str(raw_bin)
reg_bin = plane_dir / "data.bin"
if reg_bin.exists():
ops["reg_file"] = str(reg_bin)
# Run Suite2p (skip entirely when nothing needs to be done)
skip_suite2p = not needs_reg and not needs_detect
# Persistence policy:
# - When suite2p will run, save ops/db/settings together as a normal
# record of the run.
# - When skipping (cache-hit / user-skip), still persist ops.npy so
# the path repoint above (save_path / ops_path / raw_file /
# reg_file) reaches disk — otherwise post-processing reloads the
# stale paths copied in by _stage_source_into_plane_dir and reads
# F.npy/etc. from the wrong location. settings.npy / db.npy are
# left untouched so they keep being a faithful record of how the
# on-disk artifacts were produced.
if not skip_suite2p:
save_ops_db_settings(ops_file, ops)
else:
np.save(ops_file, ops, allow_pickle=True)
if skip_suite2p:
if user_skip_reg and user_skip_detect:
print(" Suite2p disabled by user toggles; regenerating figures only.")
else:
print(" Registration and detection already complete, skipping suite2p.")
print(" Re-generating post-processing and figures...")
# when the user toggled everything off and there's nothing to
# regenerate from, bail cleanly instead of letting each
# post-processing step emit its own "file not found" warning.
if user_skip_reg and user_skip_detect and not stat_file.exists():
print(
f" Skipping plane: {plane_dir.name} has no stat.npy — "
"nothing to regenerate with registration and detection off."
)
return ops_file
else:
if progress_callback:
progress_callback(step="suite2p", message="Running suite2p...")
try:
s2p_start = time.time()
processed = run_plane_bin(ops)
if processed:
updated_ops = load_ops(ops_file)
_add_processing_step(
updated_ops,
"suite2p_pipeline",
duration_seconds=time.time() - s2p_start,
extra={
"do_registration": updated_ops.get("do_registration", 1),
"n_cells": len(
np.load(plane_dir / "stat.npy", allow_pickle=True)
)
if (plane_dir / "stat.npy").exists()
else 0,
},
)
save_ops_db_settings(ops_file, updated_ops)
except Exception as e:
print(f"Error in run_plane_bin: {e}")
traceback.print_exc()
_cleanup_bin_files(plane_dir, keep_raw, keep_reg)
raise
if not processed:
_cleanup_bin_files(plane_dir, keep_raw, keep_reg)
return ops_file
# post-processing
if progress_callback:
progress_callback(step="postprocessing", message="Post-processing...")
# Load original iscell first so we have the true suite2p classification
iscell_original = None
iscell_file = plane_dir / "iscell.npy"
if iscell_file.exists():
iscell_original = np.load(iscell_file, allow_pickle=True)
# 1. Accept All Cells
if accept_all_cells:
if iscell_original is not None:
np.save(plane_dir / "iscell_suite2p.npy", iscell_original.copy())
iscell = iscell_original.copy()
iscell[:, 0] = 1
np.save(iscell_file, iscell)
print(" Marked all ROIs as accepted.")
# 2. Cell Filtering (off by default — empty list)
if cell_filters:
print(f" Applying cell filters: {[f['name'] for f in cell_filters]}")
filter_start = time.time()
try:
iscell_filtered, removed_mask, filter_results = apply_filters(
plane_dir=plane_dir,
filters=cell_filters,
save=True,
)
updated_ops = load_ops(ops_file)
_add_processing_step(
updated_ops,
"cell_filtering",
duration_seconds=time.time() - filter_start,
extra={"n_removed": int(removed_mask.sum())},
)
# flatten filter_results into per-filter metadata for ops.npy
filter_metadata = {}
for r in filter_results:
name = r["name"]
config = r.get("config", {})
info = r.get("info", {})
removed = r.get("removed_mask", np.zeros(0, dtype=bool))
params = {}
for key in [
"min_diameter_um",
"max_diameter_um",
"min_diameter_px",
"max_diameter_px",
"correct_neuropil",
"percentile",
"window_size",
"reject_negative_F0",
"min_F0_abs",
"min_F0_rel",
]:
if key in config and config[key] is not None:
val = config[key]
params[key] = (
round(val, 3) if isinstance(val, float) else val
)
if not params:
for key in ["min_px", "max_px"]:
if key in info and info[key] is not None:
params[key] = round(info[key], 1)
filter_metadata[name] = {
"params": params,
"n_rejected": int(removed.sum()),
}
updated_ops["filter_metadata"] = filter_metadata
save_ops_db_settings(ops_file, updated_ops)
try:
fig = plot_filtered_cells(
plane_dir,
iscell_original,
iscell_filtered,
save_path=plane_dir / "13_filtered_cells.png",
)
import matplotlib.pyplot as plt
plt.close(fig)
plot_filter_exclusions(
plane_dir, iscell_filtered, filter_results, save_dir=plane_dir
)
plot_cell_filter_summary(
plane_dir, save_path=plane_dir / "15_filter_summary.png"
)
except Exception as e:
print(f" Warning: Filter plots failed: {e}")
except Exception as e:
print(f" Warning: Cell filtering failed: {e}")
# 3. Normalized-trace calculation (norm_traces.npy)
F_file = plane_dir / "F.npy"
Fneu_file = plane_dir / "Fneu.npy"
if F_file.exists() and Fneu_file.exists():
print(f" Computing norm_traces ({norm_method})...")
norm_start = time.time()
F = np.load(F_file)
Fneu = np.load(Fneu_file)
if correct_neuropil:
F_corr = F - 0.7 * Fneu
else:
F_corr = F
current_ops = load_ops(ops_file)
if norm_method == "zscore":
norm = zscore_trace(
F_corr,
smooth_window=dff_smooth_window,
fs=current_ops.get("fs", 30.0),
tau=current_ops.get("tau", 1.0),
)
else:
norm = dff_rolling_percentile(
F_corr,
window_size=dff_window_size,
percentile=dff_percentile,
smooth_window=dff_smooth_window,
fs=current_ops.get("fs", 30.0),
tau=current_ops.get("tau", 1.0),
)
np.save(plane_dir / "norm_traces.npy", norm)
_add_processing_step(
current_ops,
"norm_calculation",
duration_seconds=time.time() - norm_start,
extra={"method": norm_method, "percentile": dff_percentile},
)
save_ops_db_settings(ops_file, current_ops)
# 3b. Persist post-processing kwargs to ops.npy unconditionally.
# These live as top-level ops keys (NOT in the suite2p settings
# schema), so settings.npy / db.npy stay a record of the suite2p
# stages only — ops.npy carries the lsp / GUI-tunable knobs. Done
# outside the F.npy/Fneu.npy gate above so detection-skipped runs
# still leave a record of which knobs the caller passed in. Callers
# like mbo_utilities / mbo studio diff these against their dataclass
# defaults to flag "modified" parameters in the GUI.
if ops_file.exists():
try:
_post_ops = load_ops(ops_file)
_post_ops["dff_window_size"] = dff_window_size
_post_ops["dff_percentile"] = dff_percentile
_post_ops["dff_smooth_window"] = dff_smooth_window
_post_ops["norm_method"] = norm_method
_post_ops["correct_neuropil"] = bool(correct_neuropil)
_post_ops["accept_all_cells"] = bool(accept_all_cells)
_post_ops["save_json"] = bool(save_json)
# cell_filters: list[dict] (or None). Stored as-is so a reload
# can reconstruct each criterion's enabled/value pair.
_post_ops["cell_filters"] = cell_filters
# rastermap_kwargs: nested dict {"planar": {...}, "volumetric":
# {...}} or None. Presence of a key is the per-mode enable
# signal; sub-dict contents override Rastermap() defaults.
_post_ops["rastermap_kwargs"] = rastermap_kwargs
save_ops_db_settings(ops_file, _post_ops)
except Exception as _e:
print(f" Warning: persisting post-processing kwargs failed: {_e}")
# 3b. ROI statistics
try:
from lbm_suite2p_python.postprocessing import compute_roi_stats
print(" Computing ROI statistics...")
compute_roi_stats(plane_dir)
except Exception as e:
print(f" Warning: ROI stats computation failed: {e}")
# 4. Plots and Cleanup
try:
plot_zplane_figures(
plane_dir,
dff_percentile=dff_percentile,
dff_window_size=dff_window_size,
dff_smooth_window=dff_smooth_window,
norm_method=norm_method,
correct_neuropil=correct_neuropil,
run_rastermap=rastermap_kwargs is not None,
rastermap_kwargs=rastermap_kwargs,
)
except Exception as e:
print(f" Warning: Plot generation failed: {e}")
if save_json:
ops_to_json(ops_file)
_cleanup_bin_files(plane_dir, keep_raw, keep_reg)
if progress_callback:
progress_callback(step="done", message="Plane complete")
return ops_file
def grid_search(*args, **kwargs):
"""Run a grid search over Suite2p parameters. See lbm_suite2p_python.grid_search module."""
from lbm_suite2p_python.grid_search import grid_search as _grid_search
return _grid_search(*args, **kwargs)