import logging
import time
from datetime import datetime
from pathlib import Path
import os
import traceback
from contextlib import nullcontext
from itertools import product
import copy
import gc
import numpy as np
from lbm_suite2p_python.default_ops import default_ops
from lbm_suite2p_python.postprocessing import (
ops_to_json,
load_planar_results,
load_ops,
dff_rolling_percentile,
apply_filters,
)
from importlib.metadata import version, PackageNotFoundError
def _get_version():
try:
return version("lbm_suite2p_python")
except PackageNotFoundError:
return "0.0.0"
from lbm_suite2p_python.zplane import (
save_pc_panels_and_metrics,
plot_zplane_figures,
plot_filtered_cells,
plot_filter_exclusions,
plot_cell_filter_summary,
)
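# Default ROI filter: keep cells with an equivalent diameter of 4-35 um.
# Override by passing your own list to pipeline()/run_plane(), e.g.
# cell_filters=[{"name": "max_diameter", "min_diameter_um": 3, "max_diameter_um": 50}],
# or cell_filters=[] to disable filtering.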
DEFAULT_CELL_FILTERS = [
{"name": "max_diameter", "min_diameter_um": 4, "max_diameter_um": 35}
]
from mbo_utilities.log import get as get_logger
from mbo_utilities.metadata import (
get_param,
get_voxel_size,
)
logger = get_logger("run_lsp")
from lbm_suite2p_python._benchmarking import get_cpu_percent, get_ram_used
from lbm_suite2p_python.volume import (
plot_volume_diagnostics,
plot_orthoslices,
plot_3d_roi_map,
get_volume_stats,
)
from mbo_utilities.arrays import (
iter_rois,
supports_roi,
_normalize_planes,
_build_output_path,
)
from mbo_utilities._writers import _write_plane
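# Filename prefixes recognized by derive_tag_from_filename (e.g. "plane_01.tif" -> "plane1").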
PIPELINE_TAGS = ("plane", "roi", "z", "plane_", "roi_", "z_")
from lbm_suite2p_python.utils import _is_lazy_array, _get_num_planes
def _get_suite2p_version():
"""Get suite2p version string."""
try:
import suite2p
return getattr(suite2p, "__version__", "unknown")
except ImportError:
return "not installed"
def _add_processing_step(ops, step_name, input_files=None, duration_seconds=None, extra=None):
"""
Add a processing step to ops["processing_history"].
Each step is appended to the history list, preserving previous runs.
This allows tracking of re-runs and incremental processing.
Parameters
----------
ops : dict
The ops dictionary to update.
step_name : str
Name of the processing step (e.g., "binary_write", "registration", "detection").
input_files : list of str, optional
List of input file paths for this step.
duration_seconds : float, optional
How long this step took.
extra : dict, optional
Additional metadata for this step.
Returns
-------
dict
The updated ops dictionary.
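Examples
--------
Append a step record to a fresh ops dict:
>>> ops = _add_processing_step({}, "binary_write", input_files=["plane1.tif"])
>>> ops["processing_history"][0]["step"]
'binary_write'
>>> ops["processing_history"][0]["input_files"]
['plane1.tif']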
"""
if "processing_history" not in ops:
ops["processing_history"] = []
step_record = {
"step": step_name,
"timestamp": datetime.now().isoformat(),
"lbm_suite2p_python_version": _get_version(),
"suite2p_version": _get_suite2p_version(),
}
if input_files is not None:
step_record["input_files"] = [str(f) for f in input_files] if not isinstance(input_files, str) else [input_files]
if duration_seconds is not None:
step_record["duration_seconds"] = round(duration_seconds, 2)
if extra is not None:
step_record.update(extra)
ops["processing_history"].append(step_record)
return ops
add_processing_step = _add_processing_step
def pipeline(
input_data,
save_path: str | Path = None,
ops: dict = None,
planes: list | int = None,
roi_mode: int = None,
keep_reg: bool = True,
keep_raw: bool = False,
force_reg: bool = False,
force_detect: bool = False,
num_timepoints: int = None,
dff_window_size: int = None,
dff_percentile: int = 20,
dff_smooth_window: int = None,
cell_filters: list = None,
accept_all_cells: bool = False,
save_json: bool = False,
reader_kwargs: dict = None,
writer_kwargs: dict = None,
# deprecated parameters
roi: int = None,
num_frames: int = None,
**kwargs,
) -> list[Path]:
"""
Unified Suite2p processing pipeline.
Wrapper around run_volume (for 4D data) and run_plane (for 3D data).
Automatically detects input type and delegates processing.
Parameters
----------
input_data : str, Path, list, or lazy array
Input data source (file, directory, list of files, or array).
save_path : str or Path, optional
Output directory.
ops : dict, optional
Suite2p parameters.
planes : int or list, optional
Planes to process (1-based index).
roi_mode : int, optional
ROI mode for ScanImage data (None=stitch, 0=split, N=single).
keep_reg, keep_raw : bool
Keep binary files.
force_reg, force_detect : bool
Force re-processing.
num_timepoints : int, optional
Limit frames.
dff_window_size, dff_percentile, dff_smooth_window : optional
dF/F parameters.
cell_filters : list, optional
Filters to apply. Default is 4-35um diameter if None. Pass [] to disable.
accept_all_cells : bool
Mark all detected ROIs as accepted.
**kwargs
Additional args passed to sub-functions.
Returns
-------
list[Path]
List of paths to produced ops.npy files.
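Examples
--------
Process a session end-to-end (paths are illustrative):
>>> ops_files = pipeline("session01.tif", save_path="results", planes=[1, 2, 3])  # doctest: +SKIP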
"""
from mbo_utilities import imread
from mbo_utilities.arrays import supports_roi
# 1. Handle Deprecations
if roi is not None:
import warnings
warnings.warn("'roi' is deprecated, use 'roi_mode'", DeprecationWarning, stacklevel=2)
roi_mode = roi
if num_frames is not None:
import warnings
warnings.warn("'num_frames' is deprecated, use 'num_timepoints'", DeprecationWarning, stacklevel=2)
num_timepoints = num_frames
# Normalize kwargs
reader_kwargs = reader_kwargs or {}
writer_kwargs = writer_kwargs or {}
if num_timepoints is not None:
writer_kwargs["num_frames"] = num_timepoints
# 2. Load Input to Determine Dimensionality
# We need to know if it's 3D (single plane) or 4D (volume)
is_list = isinstance(input_data, (list, tuple))
if is_list:
# A list of files is treated as a volume; run_volume handles lists directly.
is_volumetric = True
arr = None
else:
# Load array to check dimensions
# If input is already array, this is fast. If path, it loads lazy array.
if _is_lazy_array(input_data):
arr = input_data
else:
print(f"Loading input to determine dimensions: {input_data}")
arr = imread(input_data, **reader_kwargs)
# Apply ROI mode if applicable, then check dimensions
if roi_mode is not None and supports_roi(arr):
arr.roi = roi_mode
# Check dims
# TZYX (4D) or TYX (3D)
if arr.ndim == 4:
is_volumetric = True
elif arr.ndim == 3:
is_volumetric = False
# A 3D array is treated as a single plane (TYX): imread returns TYX for a
# single-plane TIFF, and the pipeline assumes time is the first dimension.
else:
# Unexpected dimensionality; fall back to single-plane handling.
is_volumetric = False
# 3. Delegate
if is_volumetric:
print("Delegating to run_volume (4D input detected)...")
if arr is not None:
input_arg = arr
else:
input_arg = input_data
return run_volume(
input_data=input_arg,
save_path=save_path,
ops=ops,
planes=planes,
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
dff_window_size=dff_window_size,
dff_percentile=dff_percentile,
dff_smooth_window=dff_smooth_window,
accept_all_cells=accept_all_cells,
cell_filters=cell_filters,
save_json=save_json,
reader_kwargs=reader_kwargs,
writer_kwargs=writer_kwargs,
**kwargs
)
else:
print("Delegating to run_plane (3D input detected)...")
# run_plane returns a single Path, we wrap in list
ops_path = run_plane(
input_data=arr, # Pass the array we loaded (with ROI applied)
save_path=save_path,
ops=ops,
# 'planes' is not forwarded: a 3D input is a single plane; run_plane
# infers the plane number from ops/metadata or the filename tag.
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
dff_window_size=dff_window_size,
dff_percentile=dff_percentile,
dff_smooth_window=dff_smooth_window,
accept_all_cells=accept_all_cells,
cell_filters=cell_filters,
save_json=save_json,
reader_kwargs=reader_kwargs,
writer_kwargs=writer_kwargs,
**kwargs
)
return [ops_path]
def derive_tag_from_filename(path):
"""
Derive a folder tag from a filename based on "planeN", "roiN", or "zN" patterns.
Parameters
----------
path : str or pathlib.Path
File path or name whose stem will be parsed.
Returns
-------
str
If the stem starts with "plane", "roi", or "z" followed by an integer
(optionally separated by "_" or "-"), returns that tag plus the integer
(e.g. "plane3", "roi7", "z2").
Otherwise returns the original stem unchanged.
Examples
--------
>>> derive_tag_from_filename("plane_01.tif")
'plane1'
>>> derive_tag_from_filename("plane2.bin")
'plane2'
>>> derive_tag_from_filename("roi5.raw")
'roi5'
>>> derive_tag_from_filename("ROI_10.dat")
'roi10'
>>> derive_tag_from_filename("res-3.h5")
'res3'
>>> derive_tag_from_filename("assembled_data_1.tiff")
'assembled_data_1'
>>> derive_tag_from_filename("file_12.tif")
'file_12'
"""
name = Path(path).stem
for tag in PIPELINE_TAGS:
low = name.lower()
if low.startswith(tag):
suffix = name[len(tag) :]
if suffix and (suffix[0] in ("_", "-")):
suffix = suffix[1:]
if suffix.isdigit():
return f"{tag}{int(suffix)}"
return name
def get_plane_num_from_tag(tag: str, fallback: int = None) -> int:
"""
Extract the plane number from a tag string like "plane3" or "roi7".
Parameters
----------
tag : str
A tag string (e.g., "plane3", "roi7", "z10") typically from derive_tag_from_filename.
fallback : int, optional
Value to return if no number can be extracted from the tag.
Returns
-------
int
The extracted plane number, or the fallback value if extraction fails.
Examples
--------
>>> get_plane_num_from_tag("plane3")
3
>>> get_plane_num_from_tag("roi7")
7
>>> get_plane_num_from_tag("z10")
10
>>> get_plane_num_from_tag("assembled_data", fallback=0)
0
"""
import re
match = re.search(r"(\d+)$", tag)
if match:
return int(match.group(1))
return fallback
def run_volume(
input_data,
save_path: str | Path = None,
ops: dict | str | Path = None,
planes: list | int = None,
keep_reg: bool = True,
keep_raw: bool = False,
force_reg: bool = False,
force_detect: bool = False,
dff_window_size: int = None,
dff_percentile: int = 20,
dff_smooth_window: int = None,
accept_all_cells: bool = False,
cell_filters: list = None,
save_json: bool = False,
reader_kwargs: dict = None,
writer_kwargs: dict = None,
**kwargs,
):
"""
Processes a full volumetric imaging dataset using Suite2p.
Iterates over the z-planes of the volume, calling run_plane() for each.
Aggregates results into volume_stats.npy and generates volumetric plots.
Parameters
----------
input_data : list, Path, or lazy array
Input data source.
- List of paths: [plane1.tif, plane2.tif, ...]
- Lazy array: 4D array (Time, Z, Y, X)
save_path : str or Path, optional
Base directory to save outputs.
ops : dict, optional
Suite2p parameters.
planes : list or int, optional
Specific planes to process (1-based index).
keep_reg : bool, default True
Keep registered binaries.
keep_raw : bool, default False
Keep raw binaries.
force_reg : bool, default False
Force re-registration.
force_detect : bool, default False
Force detection.
dff_window_size, dff_percentile, dff_smooth_window : optional
dF/F calculation parameters.
accept_all_cells : bool, default False
Mark all ROIs as accepted.
cell_filters : list, optional
Filters to apply (see run_plane).
save_json : bool, default False
Save ops as JSON.
**kwargs
Additional args passed to run_plane.
Returns
-------
list[Path]
List of paths to ops.npy files for processed planes.
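Examples
--------
Process a volume given one file per z-plane (paths are illustrative):
>>> ops_files = run_volume(["plane1.tif", "plane2.tif"], save_path="results")  # doctest: +SKIP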
"""
from mbo_utilities.arrays import _normalize_planes
from mbo_utilities import imread
from lbm_suite2p_python.merging import merge_mrois
# Handle input data
input_arr = None
input_paths = []
if _is_lazy_array(input_data):
input_arr = input_data
if hasattr(input_arr, "filenames"):
# For lazy arrays backed by files, use the first file's parent as default save_path
if save_path is None and input_arr.filenames:
save_path = Path(input_arr.filenames[0]).parent / "suite2p_results"
elif isinstance(input_data, (list, tuple)):
input_paths = [Path(p) for p in input_data]
if save_path is None and input_paths:
save_path = input_paths[0].parent
elif isinstance(input_data, (str, Path)):
# Single path representing a volume (e.g. 4D tiff, zarr)
# We'll load it as an array to iterate planes
input_path = Path(input_data)
if save_path is None:
save_path = input_path.parent / (input_path.stem + "_results")
# Load as array to determine planes
input_arr = imread(input_path, **(reader_kwargs or {}))
else:
raise TypeError(f"Invalid input_data type: {type(input_data)}")
if save_path is None:
raise ValueError("save_path must be specified.")
save_path = Path(save_path)
save_path.mkdir(parents=True, exist_ok=True)
# Determine num_planes
if input_arr is not None:
try:
num_planes = _get_num_planes(input_arr)
except Exception:
num_planes = 1
else:
num_planes = len(input_paths)
# Normalize planes to process
planes_indices = _normalize_planes(planes, num_planes)
print(f"Processing {len(planes_indices)} planes in volume (Total planes: {num_planes})")
print(f"Output: {save_path}")
ops_files = []
# Iterate
for i, plane_idx in enumerate(planes_indices):
plane_num = plane_idx + 1
# Prepare input for run_plane
if input_arr is not None:
# Pass the whole array, run_plane handles extraction via ops['plane']
current_input = input_arr
else:
# List of files: input_paths is assumed to hold one file per plane,
# so plane_idx indexes directly into it.
if plane_idx < len(input_paths):
current_input = input_paths[plane_idx]
else:
# Defensive fallback; unreachable when len(input_paths) == num_planes.
current_input = input_paths[0]
# Prepare ops with plane number
current_ops = load_ops(ops) if ops else default_ops()
current_ops["plane"] = plane_num
current_ops["num_zplanes"] = num_planes # useful info
# Call run_plane
try:
print(f"\n--- Volume Step: Plane {plane_num} ---")
ops_file = run_plane(
input_data=current_input,
save_path=save_path,
ops=current_ops,
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
dff_window_size=dff_window_size,
dff_percentile=dff_percentile,
dff_smooth_window=dff_smooth_window,
accept_all_cells=accept_all_cells,
cell_filters=cell_filters,
save_json=save_json,
reader_kwargs=reader_kwargs,
writer_kwargs=writer_kwargs,
**kwargs
)
ops_files.append(ops_file)
except Exception as e:
print(f"ERROR processing plane {plane_num}: {e}")
traceback.print_exc()
# Post-Loop: Merging and Volume Stats
# Check for multi-ROI merging using metadata (not filename heuristics)
should_merge = False
if input_arr is not None and hasattr(input_arr, "metadata"):
md = input_arr.metadata
roi_mode = md.get("roi_mode")
num_rois = md.get("num_rois") or md.get("num_mrois") or 1
# only merge if roi_mode is "separate" (files written as separate rois)
if roi_mode == "separate" and num_rois > 1:
should_merge = True
if should_merge:
print("Detected mROI data, attempting to merge...")
merged_savepath = save_path / "merged_mrois"
try:
merge_mrois(save_path, merged_savepath)
# Point ops_files at the merged results; merge_mrois does not return
# the new paths, so glob the merged directory for ops.npy files.
merged_ops = sorted(list(merged_savepath.glob("**/ops.npy")))
if merged_ops:
ops_files = merged_ops
print(f"Merged {len(ops_files)} planes to {merged_savepath}")
save_path = merged_savepath # Update save_path for subsequent plots
except Exception as e:
print(f"Merging failed: {e}")
# Generate volume statistics
if ops_files:
print("\nGenering volumetric statistics...")
try:
# Aggregate per-plane results into volume-level statistics
stats_path = get_volume_stats(ops_files, overwrite=True)
# Volumetric plots
try:
plot_volume_diagnostics(ops_files, save_path / "volume_quality_diagnostics.png")
plot_orthoslices(ops_files, save_path / "orthoslices.png")
plot_3d_roi_map(ops_files, save_path / "roi_map_3d.png", color_by="snr")
plot_3d_roi_map(ops_files, save_path / "roi_map_3d_plane.png", color_by="plane")
except Exception as e:
print(f"Warning: Volume plots failed: {e}")
traceback.print_exc()
except Exception as e:
print(f"Warning: Volume statistics failed: {e}")
traceback.print_exc()
return ops_files
def _should_write_bin(ops_path: Path, force: bool = False, *, validate_chan2: bool | None = None, expected_dtype: np.dtype = np.int16) -> bool:
"""
Return True if the raw binary for this plane needs to be (re)written.
Checks that ops.npy exists, that a raw or registered binary is present with
the byte size expected from (nframes, Ly, Lx) at ``expected_dtype``, and that
the file is memory-mappable; channel-2 binaries are validated when
``validate_chan2`` is True (default: inferred from ``align_by_chan``).
"""
if force:
return True
ops_path = Path(ops_path)
if not ops_path.is_file():
return True
raw_path = ops_path.parent / "data_raw.bin"
reg_path = ops_path.parent / "data.bin"
chan2_path = ops_path.parent / "data_chan2.bin"
# If neither raw nor registered binary exists, need to write
if not raw_path.is_file() and not reg_path.is_file():
return True
# Use whichever binary exists for validation (prefer raw)
binary_to_validate = raw_path if raw_path.is_file() else reg_path
try:
ops = np.load(ops_path, allow_pickle=True).item()
if validate_chan2 is None:
validate_chan2 = (ops.get("align_by_chan", 1) == 2)
Ly = ops.get("Ly")
Lx = ops.get("Lx")
nframes_raw = ops.get("nframes_chan1") or ops.get("nframes") or ops.get("num_frames")
if (None in (nframes_raw, Ly, Lx)) or (nframes_raw <= 0 or Ly <= 0 or Lx <= 0):
return True
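# expected bytes = frames * Ly * Lx * itemsize (int16 -> 2 bytes per pixel)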
expected_size_raw = int(nframes_raw) * int(Ly) * int(Lx) * np.dtype(expected_dtype).itemsize
actual_size_raw = binary_to_validate.stat().st_size
if actual_size_raw != expected_size_raw or actual_size_raw == 0:
return True
try:
arr = np.memmap(binary_to_validate, dtype=expected_dtype, mode="r", shape=(int(nframes_raw), int(Ly), int(Lx)))
_ = arr[0, 0, 0]
del arr
except Exception:
return True
if validate_chan2:
nframes_chan2 = ops.get("nframes_chan2")
if (not chan2_path.is_file()) or (nframes_chan2 is None) or (nframes_chan2 <= 0):
return True
expected_size_chan2 = int(nframes_chan2) * int(Ly) * int(Lx) * np.dtype(expected_dtype).itemsize
actual_size_chan2 = chan2_path.stat().st_size
if actual_size_chan2 != expected_size_chan2 or actual_size_chan2 == 0:
return True
try:
arr2 = np.memmap(chan2_path, dtype=expected_dtype, mode="r", shape=(int(nframes_chan2), int(Ly), int(Lx)))
_ = arr2[0, 0, 0]
del arr2
except Exception:
return True
return False
except Exception as e:
print(f"Bin validation failed for {ops_path.parent}: {e}")
return True
def _should_register(ops_path: str | Path) -> bool:
"""
Determine whether Suite2p registration still needs to be performed.
Registration is considered complete if any of the following hold:
- A reference image (refImg) exists and is a valid ndarray
- meanImg exists (Suite2p always produces it post-registration)
- Valid registration offsets (xoff/yoff) are present
Returns True if registration *should* be run, False otherwise.
"""
ops = load_ops(ops_path)
has_ref = isinstance(ops.get("refImg"), np.ndarray)
has_mean = isinstance(ops.get("meanImg"), np.ndarray)
# Check for valid offsets - ensure they are actual arrays, not _NoValue or other sentinels
def _has_valid_offsets(key):
val = ops.get(key)
if val is None or not isinstance(val, np.ndarray):
return False
try:
return np.any(np.isfinite(val))
except (TypeError, ValueError):
return False
has_offsets = _has_valid_offsets("xoff") or _has_valid_offsets("yoff")
has_metrics = any(k in ops for k in ("regDX", "regPC", "regPC1", "regDX1"))
# registration done if any of these are true
registration_done = has_ref or has_mean or has_offsets or has_metrics
return not registration_done
def run_plane_bin(ops) -> bool:
"""
Run Suite2p pipeline on pre-written binary files.
Executes registration, cell detection, and signal extraction on binary
data referenced in ops. Requires data_raw.bin and ops.npy to exist.
Parameters
----------
ops : dict or str or Path
Suite2p ops dictionary or path to ops.npy file.
Returns
-------
bool
True if pipeline completed successfully.
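Examples
--------
Re-run Suite2p on an existing binary (path is illustrative; the folder must
contain data_raw.bin alongside ops.npy):
>>> run_plane_bin("results/plane1/ops.npy")  # doctest: +SKIP
True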
"""
from suite2p.io.binary import BinaryFile
from suite2p.run_s2p import pipeline
ops = load_ops(ops)
# Get Ly and Lx with helpful error message if missing
Ly = ops.get("Ly")
Lx = ops.get("Lx")
if Ly is None or Lx is None:
raise KeyError(
f"Missing required dimension keys in ops: Ly={Ly}, Lx={Lx}. "
f"Ensure the binary was written with proper metadata. "
f"Available keys: {list(ops.keys())}"
)
Ly, Lx = int(Ly), int(Lx)
raw_file = ops.get("raw_file")
n_func = ops.get("nframes_chan1") or ops.get("nframes") or ops.get("n_frames")
if raw_file is None or n_func is None:
raise KeyError("Missing raw_file or nframes_chan1")
n_func = int(n_func)
ops_parent = Path(ops["ops_path"]).parent
ops["save_path"] = ops_parent
reg_file = ops_parent / "data.bin"
ops["reg_file"] = str(reg_file)
chan2_file = ops.get("chan2_file", "")
use_chan2 = bool(chan2_file) and Path(chan2_file).exists()
n_chan2 = int(ops.get("nframes_chan2", 0)) if use_chan2 else 0
n_align = n_func if not use_chan2 else min(n_func, n_chan2)
if n_align <= 0:
raise ValueError("Non-positive frame count after alignment selection.")
if use_chan2 and (n_func != n_chan2):
print(f"[run_plane_bin] Trimming to {n_align} frames (func={n_func}, chan2={n_chan2}).")
ops["functional_chan"] = 1
ops["align_by_chan"] = 2 if use_chan2 else 1
ops["nchannels"] = 2 if use_chan2 else 1
ops["nframes"] = n_align
ops["nframes_chan1"] = n_align
if use_chan2:
ops["nframes_chan2"] = n_align
if "diameter" in ops:
# save user's input diameter before suite2p/cellpose overwrites it
# cellpose estimates actual cell diameters and saves median to ops["diameter"]
ops["diameter_user"] = ops["diameter"]
if ops["diameter"] is not None and np.isnan(ops["diameter"]):
ops["diameter"] = 8
ops["diameter_user"] = 8
if (ops["diameter"] in (None, 0)) and ops.get("anatomical_only", 0) > 0:
ops["diameter"] = 8
ops["diameter_user"] = 8
print("Warning: diameter was not set, defaulting to 8.")
# When running registration, reset detection-derived parameters so that
# compute_enhanced_mean_image() will reinitialize them from diameter.
# This fixes a bug where re-running registration on previously processed data
# would inherit spatscale_pix=0 from a failed Cellpose detection, causing
# meanImgE to be computed with a [1,1] filter (all 0.5 output).
run_registration = bool(ops.get("do_registration", True))
if run_registration:
for key in ["spatscale_pix", "Vcorr", "Vmax", "Vmap", "Vsplit", "ihop"]:
if key in ops:
del ops[key]
reg_file_chan2 = ops_parent / "data_chan2_reg.bin" if use_chan2 else None
ops["anatomical_red"] = False
ops["chan2_thres"] = 0.1
# Memory estimation warning for large datasets
if ops.get("roidetect", True) and ops.get("anatomical_only", 0) > 0:
# Estimate memory usage for Cellpose detection
estimated_gb = (Ly * Lx * n_align * 2) / 1e9 # Rough estimate
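# e.g. Ly = Lx = 1024 with n_align = 30000 frames -> ~63 GB at 2 bytes/pixel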
spatial_scale = ops.get("spatial_scale", 0)
if spatial_scale > 0:
estimated_gb /= (spatial_scale ** 2)
if estimated_gb > 50: # Warn for datasets > 50GB
print(f"Large dataset warning: {estimated_gb:.1f} GB estimated for detection")
if spatial_scale == 0:
print(f" Consider adding 'spatial_scale': 2 to reduce memory usage by 4x")
print(f" Or reduce 'batch_size' (current: {ops.get('batch_size', 500)})")
# When skipping registration, copy data_raw.bin to data.bin and detect valid region
if not run_registration:
print("Registration skipped - copying data_raw.bin to data.bin...")
import shutil
raw_file_path = Path(raw_file)
reg_file_path = Path(reg_file)
# Copy data_raw.bin to data.bin if it doesn't exist or is empty
if raw_file_path.exists():
if not reg_file_path.exists() or reg_file_path.stat().st_size == 0:
print(f" Copying {raw_file_path.name} -> {reg_file_path.name}")
shutil.copy2(raw_file_path, reg_file_path)
else:
print(f" {reg_file_path.name} already exists, skipping copy")
# Detect valid region (exclude dead zones from Suite3D shifts)
# This replicates what Suite2p's registration does via compute_crop()
# IMPORTANT: Skip auto-detection for anatomical_only mode since Cellpose
# returns masks in full image coordinates, not cropped coordinates
use_anatomical = ops.get("anatomical_only", 0) > 0
if "yrange" not in ops or "xrange" not in ops:
if use_anatomical:
# For anatomical detection, always use full image to avoid coordinate mismatch
print(" Using full image dimensions for anatomical detection (avoids cropping issues)")
ops["yrange"] = [0, Ly]
ops["xrange"] = [0, Lx]
else:
print(" Detecting valid region to exclude dead zones...")
with BinaryFile(Ly=Ly, Lx=Lx, filename=str(raw_file_path)) as f:
meanImg_full = f.sampled_mean().astype(np.float32)
# Find regions with valid data (threshold at 1% of max)
threshold = meanImg_full.max() * 0.01
valid_mask = meanImg_full > threshold
valid_rows = np.any(valid_mask, axis=1)
valid_cols = np.any(valid_mask, axis=0)
if valid_rows.sum() > 0 and valid_cols.sum() > 0:
y_indices = np.where(valid_rows)[0]
x_indices = np.where(valid_cols)[0]
yrange = [int(y_indices[0]), int(y_indices[-1] + 1)]
xrange = [int(x_indices[0]), int(x_indices[-1] + 1)]
else:
yrange = [0, Ly]
xrange = [0, Lx]
ops["yrange"] = yrange
ops["xrange"] = xrange
print(f" Valid region: yrange={yrange}, xrange={xrange}")
# Set registration outputs that detection expects
if "badframes" not in ops:
ops["badframes"] = np.zeros(n_align, dtype=bool)
if "xoff" not in ops:
ops["xoff"] = np.zeros(n_align, dtype=np.float32)
if "yoff" not in ops:
ops["yoff"] = np.zeros(n_align, dtype=np.float32)
if "corrXY" not in ops:
ops["corrXY"] = np.ones(n_align, dtype=np.float32)
# Also copy channel 2 if it exists
if use_chan2:
chan2_path = Path(chan2_file)
reg_chan2_path = Path(reg_file_chan2)
if chan2_path.exists():
if not reg_chan2_path.exists() or reg_chan2_path.stat().st_size == 0:
print(f" Copying {chan2_path.name} -> {reg_chan2_path.name}")
shutil.copy2(chan2_path, reg_chan2_path)
else:
print(f" {reg_chan2_path.name} already exists, skipping copy")
with (
BinaryFile(Ly=Ly, Lx=Lx, filename=str(reg_file), n_frames=n_align) as f_reg,
BinaryFile(Ly=Ly, Lx=Lx, filename=str(raw_file), n_frames=n_align) as f_raw,
(BinaryFile(Ly=Ly, Lx=Lx, filename=str(reg_file_chan2), n_frames=n_align) if use_chan2 else nullcontext()) as f_reg_chan2,
(BinaryFile(Ly=Ly, Lx=Lx, filename=str(chan2_file), n_frames=n_align) if use_chan2 else nullcontext()) as f_raw_chan2,
):
ops = pipeline(
f_reg=f_reg,
f_raw=f_raw,
f_reg_chan2=f_reg_chan2 if use_chan2 else None,
f_raw_chan2=f_raw_chan2 if use_chan2 else None,
run_registration=run_registration,
ops=ops,
stat=None,
)
if use_chan2:
ops["reg_file_chan2"] = str(reg_file_chan2)
np.save(ops["ops_path"], ops)
return True
def run_plane(
input_data,
save_path: str | Path | None = None,
ops: dict | str | Path = None,
chan2_file: str | Path | None = None,
keep_raw: bool = False,
keep_reg: bool = True,
force_reg: bool = False,
force_detect: bool = False,
dff_window_size: int = None,
dff_percentile: int = 20,
dff_smooth_window: int = None,
accept_all_cells: bool = False,
cell_filters: list = None,
save_json: bool = False,
plane_name: str | None = None,
reader_kwargs: dict = None,
writer_kwargs: dict = None,
**kwargs,
) -> Path:
"""
Processes a single imaging plane using suite2p.
Handles registration, segmentation, filtering, dF/F calculation, and plotting.
Accepts both file paths and lazy arrays.
Parameters
----------
input_data : str, Path, or lazy array
Input data. Can be a file path or an mbo_utilities lazy array.
save_path : str or Path, optional
Root directory to save the results. A subdirectory will be created based on
the input filename or `plane_name` parameter.
ops : dict, str or Path, optional
Path to or dict of user-supplied ops.npy.
chan2_file : str, optional
Path to structural / anatomical data used for registration.
keep_raw : bool, default False
If True, do not delete the raw binary (`data_raw.bin`) after processing.
keep_reg : bool, default True
If True, keep the registered binary (`data.bin`) after processing.
force_reg : bool, default False
If True, force a new registration.
force_detect : bool, default False
If True, force ROI detection.
dff_window_size : int, optional
Frames for rolling percentile baseline. Default: auto-calculated (~10*tau*fs).
dff_percentile : int, default 20
Percentile for baseline F0.
dff_smooth_window : int, optional
Smoothing window for dF/F. Default: auto-calculated.
accept_all_cells : bool, default False
If True, mark all detected ROIs as accepted cells.
cell_filters : list[dict], optional
Filters to apply to detected ROIs (e.g. diameter, area).
Default: [{"name": "max_diameter", "min_diameter_um": 4, "max_diameter_um": 35}]
Pass [] to disable default filtering.
save_json : bool, default False
Save ops as JSON.
plane_name : str, optional
Custom name for the plane subdirectory.
reader_kwargs : dict, optional
Arguments for mbo_utilities.imread.
writer_kwargs : dict, optional
Arguments for binary writing.
**kwargs : dict
Additional arguments passed to Suite2p.
Returns
-------
Path
Path to the saved ops.npy file.
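Examples
--------
Process a single plane with the default diameter filter disabled (path is
illustrative):
>>> ops_file = run_plane("plane1.tif", save_path="results", cell_filters=[])  # doctest: +SKIP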
"""
from mbo_utilities import imread, imwrite
from mbo_utilities.metadata import get_metadata
from mbo_utilities.arrays import ScanImageArray
if "debug" in kwargs:
logger.setLevel(logging.DEBUG)
logger.info("Debug mode enabled.")
# Validate cell_filters (use default if None)
if cell_filters is None:
cell_filters = DEFAULT_CELL_FILTERS
# Handle input_data type
input_path = None
input_arr = None
if _is_lazy_array(input_data):
input_arr = input_data
# Try to get a filename for naming purposes, otherwise require plane_name
filenames = getattr(input_arr, "filenames", [])
if filenames:
input_path = Path(filenames[0])
elif plane_name is None:
raise ValueError("plane_name is required when input is an array without filenames.")
else:
# dummy path for internal logic compatibility
input_path = Path(f"{plane_name}.tif")
elif isinstance(input_data, (str, Path)):
input_path = Path(input_data)
else:
raise TypeError(f"input_data must be path or lazy array, got {type(input_data)}")
input_parent = input_path.parent
# Save path handling
if save_path is None:
if input_arr is not None:
raise ValueError("save_path is required when input is an array.")
# default to the input's directory; binary inputs with ops.npy run in-place
save_path = input_parent
else:
save_path = Path(save_path)
save_path.mkdir(parents=True, exist_ok=True)
# Directory setup
skip_imwrite = False
is_binary_input = input_path.suffix == ".bin"
binary_with_ops = is_binary_input and (input_path.parent / "ops.npy").exists()
if binary_with_ops and save_path == input_parent:
# Processing existing binary in-place
print(f"Processing existing binary in-place: {input_path}")
plane_dir = input_path.parent
skip_imwrite = True
else:
# Create subdirectory
if plane_name is not None:
subdir_name = plane_name
else:
subdir_name = derive_tag_from_filename(input_path.name)
plane_dir = save_path / subdir_name
plane_dir.mkdir(exist_ok=True)
ops_file = plane_dir / "ops.npy"
ops_default = default_ops()
ops_user = load_ops(ops) if ops else {}
ops = {**ops_default, **ops_user, "data_path": str(input_path.resolve())}
# Normalize kwargs
reader_kwargs = reader_kwargs or {}
writer_kwargs = writer_kwargs or {}
# Handle binary writing vs loading
if skip_imwrite:
file = None
existing_ops = np.load(ops_file, allow_pickle=True).item() if ops_file.exists() else {}
metadata = {k: v for k, v in existing_ops.items() if k in ("plane", "fs", "dx", "dy", "Ly", "Lx", "nframes")}
else:
should_write = _should_write_bin(ops_file, force=force_reg)
if should_write:
if input_arr is not None:
file = input_arr
else:
file = imread(input_path, **reader_kwargs)
if isinstance(file, ScanImageArray) and not kwargs.get("allow_raw_array", False):
# Raw ScanImage arrays are normally routed through pipeline()/run_volume();
# run_plane can still handle one that is already a single-plane (3D) slice.
pass
if hasattr(file, "metadata"):
metadata = file.metadata
else:
metadata = get_metadata(input_path)
else:
file = None
metadata = {} # Metadata loading from ops.npy handled later if file is None
# Plane number handling
if "plane" in ops:
plane = ops["plane"]
elif "plane" in metadata:
plane = metadata["plane"]
ops["plane"] = plane
else:
tag = derive_tag_from_filename(input_path)
plane = get_plane_num_from_tag(tag, fallback=ops.get("plane", None))
ops["plane"] = plane
metadata["plane"] = plane
ops["save_path"] = str(plane_dir.resolve())
# Write binary
if not skip_imwrite and file is not None:
md_combined = {**metadata, **ops}
print(f"Writing binary to {plane_dir}...")
bin_start = time.time()
imwrite(
file,
plane_dir,
ext=".bin",
metadata=md_combined,
register_z=False,
output_name="data_raw.bin",
overwrite=True,
**writer_kwargs,
)
# Record binary write
# Reload ops from disk to get Lx, Ly, and other metadata added by imwrite
if ops_file.exists():
ops = np.load(ops_file, allow_pickle=True).item()
_add_processing_step(
ops,
"binary_write",
input_files=[str(input_path)],
duration_seconds=time.time() - bin_start,
extra={"plane": plane, "shape": list(file.shape)}
)
np.save(ops_file, ops)
# Determine processing needs
needs_detect = False
if force_detect:
needs_detect = True
elif ops["roidetect"]:
stat_file = plane_dir / "stat.npy"
if stat_file.exists():
stat = np.load(stat_file, allow_pickle=True)
if stat is None or len(stat) == 0:
needs_detect = True
else:
needs_detect = False
else:
needs_detect = True
# Check registration needs
if force_reg:
needs_reg = True
elif not ops_file.exists():
needs_reg = True
else:
needs_reg = _should_register(ops_file)
# Update ops logic
if force_reg:
ops["do_registration"] = 1
elif not needs_reg:
ops["do_registration"] = 0
elif "do_registration" not in ops_user:
ops["do_registration"] = 1
if force_detect:
ops["roidetect"] = 1
elif "roidetect" not in ops_user:
ops["roidetect"] = int(needs_detect)
# Channel 2 handling
if chan2_file is not None:
chan2_path = Path(chan2_file)
if chan2_path.exists():
chan2_data = imread(chan2_path, **reader_kwargs)
chan2_md = getattr(chan2_data, "metadata", {})
imwrite(chan2_data, plane_dir, ext=".bin", metadata=chan2_md, register_z=False, structural=True)
ops["chan2_file"] = str((plane_dir / "data_chan2.bin").resolve())
ops["nframes_chan2"] = chan2_data.shape[0] if hasattr(chan2_data, "shape") else 0
ops["nchannels"] = 2
ops["align_by_chan"] = 2
# Run Suite2p
try:
s2p_start = time.time()
processed = run_plane_bin(ops) # This updates ops in-place and saves it
if processed:
updated_ops = load_ops(ops_file)
_add_processing_step(
updated_ops,
"suite2p_pipeline",
duration_seconds=time.time() - s2p_start,
extra={
"do_registration": updated_ops.get("do_registration", 1),
"n_cells": len(np.load(plane_dir / "stat.npy", allow_pickle=True)) if (plane_dir / "stat.npy").exists() else 0
}
)
np.save(ops_file, updated_ops)
except Exception as e:
print(f"Error in run_plane_bin: {e}")
traceback.print_exc()
# Re-raise so caller knows processing failed
raise
if not processed:
return ops_file
# --- Post-Processing ---
# 1. Accept All Cells
if accept_all_cells:
iscell_file = plane_dir / "iscell.npy"
if iscell_file.exists():
iscell = np.load(iscell_file, allow_pickle=True)
np.save(plane_dir / "iscell_suite2p.npy", iscell)
iscell[:, 0] = 1
np.save(iscell_file, iscell)
print(" Marked all ROIs as accepted.")
# 2. Cell Filtering
if cell_filters:
print(f" Applying cell filters: {[f['name'] for f in cell_filters]}")
filter_start = time.time()
try:
iscell_original = np.load(plane_dir / "iscell.npy", allow_pickle=True)
iscell_filtered, removed_mask, filter_results = apply_filters(
plane_dir=plane_dir,
filters=cell_filters,
save=True,
)
updated_ops = load_ops(ops_file)
_add_processing_step(
updated_ops,
"cell_filtering",
duration_seconds=time.time() - filter_start,
extra={"n_removed": int(removed_mask.sum())}
)
# convert filter_results list to dict keyed by filter name
filter_metadata = {}
for r in filter_results:
name = r["name"]
config = r.get("config", {})
info = r.get("info", {})
removed = r.get("removed_mask", np.zeros(0, dtype=bool))
# build params from config (user-specified) or info (computed)
params = {}
for key in ["min_diameter_um", "max_diameter_um", "min_diameter_px", "max_diameter_px",
"min_area_px", "max_area_px", "min_mult", "max_mult", "max_ratio"]:
if key in config and config[key] is not None:
val = config[key]
params[key] = round(val, 1) if isinstance(val, float) else val
if not params:
for key in ["min_px", "max_px", "min_ratio", "max_ratio", "lower_px", "upper_px"]:
if key in info and info[key] is not None:
params[key] = round(info[key], 1)
filter_metadata[name] = {
"params": params,
"n_rejected": int(removed.sum()),
}
updated_ops["filter_metadata"] = filter_metadata
np.save(ops_file, updated_ops)
# Plots
try:
fig = plot_filtered_cells(
plane_dir,
iscell_original,
iscell_filtered,
save_path=plane_dir / "13_filtered_cells.png"
)
import matplotlib.pyplot as plt
plt.close(fig)
plot_filter_exclusions(plane_dir, iscell_filtered, filter_results, save_dir=plane_dir)
plot_cell_filter_summary(plane_dir, save_path=plane_dir / "15_filter_summary.png")
except Exception as e:
print(f" Warning: Filter plots failed: {e}")
except Exception as e:
print(f" Warning: Cell filtering failed: {e}")
# 3. dF/F Calculation
F_file = plane_dir / "F.npy"
Fneu_file = plane_dir / "Fneu.npy"
if F_file.exists() and Fneu_file.exists():
print(" Computing dF/F...")
dff_start = time.time()
F = np.load(F_file)
Fneu = np.load(Fneu_file)
F_corr = F - 0.7 * Fneu # Fixed neucoeff for now, could be parameter
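# Baseline F0 is a rolling percentile of the corrected trace; dF/F follows
# the usual (F_corr - F0) / F0 convention (see dff_rolling_percentile).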
current_ops = load_ops(ops_file)
dff = dff_rolling_percentile(
F_corr,
window_size=dff_window_size,
percentile=dff_percentile,
smooth_window=dff_smooth_window,
fs=current_ops.get("fs", 30.0),
tau=current_ops.get("tau", 1.0)
)
np.save(plane_dir / "dff.npy", dff)
_add_processing_step(
current_ops,
"dff_calculation",
duration_seconds=time.time() - dff_start,
extra={"percentile": dff_percentile}
)
np.save(ops_file, current_ops)
# 4. Plots and Cleanup
try:
plot_zplane_figures(
plane_dir,
dff_percentile=dff_percentile,
dff_window_size=dff_window_size,
dff_smooth_window=dff_smooth_window
)
except Exception as e:
print(f" Warning: Plot generation failed: {e}")
if save_json:
ops_to_json(ops_file)
if not keep_raw:
(plane_dir / "data_raw.bin").unlink(missing_ok=True)
if not keep_reg:
(plane_dir / "data.bin").unlink(missing_ok=True)
# PC metrics
save_pc_panels_and_metrics(ops_file, plane_dir / "pc_metrics")
return ops_file
def grid_search(*args, **kwargs):
"""Run a grid search over Suite2p parameters. See lbm_suite2p_python.grid_search module."""
from lbm_suite2p_python.grid_search import grid_search as _grid_search
return _grid_search(*args, **kwargs)