import logging
import os
import traceback
from contextlib import nullcontext
from itertools import product
from pathlib import Path
import copy
import numpy as np
import suite2p
from lbm_suite2p_python.utils import dff_rolling_percentile
import mbo_utilities as mbo # noqa
logger = mbo.log.get("run_lsp")
from lbm_suite2p_python.zplane import (
plot_traces,
plot_projection,
plot_noise_distribution,
load_planar_results,
load_ops,
suite2p_roi_overlay, plot_traces_noise
)
from . import dff_shot_noise
from .volume import (
plot_execution_time,
plot_volume_signal,
plot_volume_neuron_counts,
get_volume_stats,
save_images_to_movie,
)
if mbo.is_running_jupyter():
from tqdm.notebook import tqdm
else:
from tqdm import tqdm
try:
    from rastermap import Rastermap
    HAS_RASTERMAP = True
except ImportError:
    Rastermap = None
    HAS_RASTERMAP = False
if HAS_RASTERMAP:
    from lbm_suite2p_python.zplane import plot_rastermap
PIPELINE_TAGS = ("plane", "roi", "z", "plane_", "roi_", "z_")
def normalize_folder(path):
"""
Derive a folder tag from a filename based on “planeN”, “roiN”, or "tagN" patterns.
Parameters
----------
path : str or pathlib.Path
File path or name whose stem will be parsed.
Returns
-------
str
If the stem starts with “plane”, “roi”, or “res” followed by an integer,
returns that tag plus the integer (e.g. “plane3”, “roi7”, “res2”).
Otherwise returns the original stem unchanged.
Examples
--------
>>> normalize_folder("plane_01.tif")
'plane1'
>>> normalize_folder("plane2.bin")
'plane2'
>>> normalize_folder("roi5.raw")
'roi5'
>>> normalize_folder("ROI_10.dat")
'roi10'
>>> normalize_folder("res-3.h5")
'res3'
>>> normalize_folder("assembled_data_1.tiff")
'assembled_data_1'
>>> normalize_folder("file_12.tif")
'file_12'
"""
    name = Path(path).stem
    low = name.lower()
    for tag in PIPELINE_TAGS:
        if low.startswith(tag):
            suffix = name[len(tag):]
            if suffix and suffix[0] in ("_", "-"):
                suffix = suffix[1:]
            if suffix.isdigit():
                return f"{tag}{int(suffix)}"
    return name
def get_missing_ops_keys(ops: dict) -> list[str]:
    """Return the required ops keys that are missing from `ops` or set to None."""
    required = ["Ly", "Lx", "fs", "nframes", "raw_file", "input_format"]
    return [k for k in required if k not in ops or ops[k] is None]
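# Hedged usage sketch for get_missing_ops_keys (values here are hypothetical):
#
#     >>> get_missing_ops_keys({"Ly": 512, "Lx": 512, "fs": 17.0})
#     ['nframes', 'raw_file', 'input_format']
#
# An empty list means the ops dict carries everything run_plane_bin needs to
# locate and shape the binary data.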
def run_volume(
input_files: list,
save_path: str | Path = None,
ops: dict | str | Path = None,
keep_reg: bool = True,
keep_raw: bool = True,
force_reg: bool = False,
force_detect: bool = False,
replot: bool = False,
):
"""
Processes a full volumetric imaging dataset using Suite2p, handling plane-wise registration,
segmentation, plotting, and aggregation of volumetric statistics and visualizations.
Parameters
----------
input_files : list of str or Path
List of TIFF file paths, each representing a single imaging plane.
    save_path : str or Path, optional
        Base directory to save all outputs. If None, outputs are saved next to
        the inputs, in the parent directory of the first input file.
    ops : dict, str or Path, optional
        Suite2p parameters to use for each imaging plane.
    keep_reg : bool, default True
        If True, do not delete the registered binary (`data.bin`) after processing.
    keep_raw : bool, default True
        If True, do not delete the raw binary (`data_raw.bin`) after processing.
    force_reg : bool, default False
        If True, force a new registration even if existing shifts are found in ops.npy.
    force_detect : bool, default False
        If True, force ROI detection even if an existing stat.npy is present.
    replot : bool, default False
        If True, regenerate all summary plots even if they already exist.
Returns
-------
list of str
List of paths to `ops.npy` files for each plane.
    Raises
    ------
    Exception
        Any error raised during per-plane processing (`run_plane`). Failures in
        the volumetric summary and visualization step are caught and printed
        rather than raised.
    Example
    -------
    >> input_files = mbo.get_files(assembled_path, str_contains='tif', max_depth=3)
    >> ops = mbo.params_from_metadata(mbo.get_metadata(input_files[0]), suite2p.default_ops())

    Run the volume:

    >> output_ops_list = lsp.run_volume(input_files, save_path, ops)
Notes
-----
At the root of `save_path` will be a folder for each z-plane with all suite2p results, as well as
volumetric outputs at the base of this folder.
Each z-plane folder contains:
- Registration, Segmentation and Extraction results (ops, spks, iscell)
- Summary statistics: execution time, signal strength, acceptance rates
- Optional rastermap model for visualization of activity across the volume
Each save_path root contains:
- Accepted/Rejected histogram, neuron-count x z-plane (acc_rej_bar.png)
- Execution time for each step in each z-plane (execution_time.png)
- Mean/Max images, with and without segmentation masks, in GIF/MP4
- Traces animation over time and neurons
- Optional rastermap clustering results
"""
if save_path is None:
save_path = Path(input_files[0]).parent
all_ops = []
for file in tqdm(input_files, desc="Processing Planes"):
print(f"Processing {file} ---------------")
output_ops = run_plane(
input_path=file,
save_path=str(save_path),
ops=ops,
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
replot=replot,
)
all_ops.append(output_ops)
    # the batch has run; collect the ops.npy path from each plane's results
    if isinstance(all_ops[0], dict):
        all_ops = [ops["ops_path"] for ops in all_ops]
try:
zstats_file = get_volume_stats(all_ops, overwrite=True)
all_segs = mbo.get_files(save_path, "segmentation.png", 4)
all_means = mbo.get_files(save_path, "mean_image.png", 4)
all_maxs = mbo.get_files(save_path, "max_projection_image.png", 4)
all_traces = mbo.get_files(save_path, "traces.png", 4)
save_images_to_movie(
all_segs, os.path.join(save_path, "segmentation_volume.mp4")
)
save_images_to_movie(
all_means, os.path.join(save_path, "mean_images_volume.mp4")
)
save_images_to_movie(all_maxs, os.path.join(save_path, "max_images_volume.mp4"))
save_images_to_movie(all_traces, os.path.join(save_path, "traces_volume.mp4"))
plot_volume_neuron_counts(zstats_file, save_path)
plot_volume_signal(
zstats_file, os.path.join(save_path, "mean_volume_signal.png")
)
plot_execution_time(zstats_file, os.path.join(save_path, "execution_time.png"))
res_z = [
load_planar_results(ops_path, z_plane=i)
for i, ops_path in enumerate(all_ops)
]
        all_spks = np.concatenate([res["spks"] for res in res_z], axis=0)
if HAS_RASTERMAP:
model = Rastermap(
n_clusters=100,
n_PCs=100,
locality=0.75,
time_lag_window=15,
).fit(all_spks)
np.save(os.path.join(save_path, "model.npy"), model)
title_kwargs = {"fontsize": 8, "y": 0.95}
plot_rastermap(
all_spks,
model,
neuron_bin_size=20,
xmax=min(2000, all_spks.shape[1]),
save_path=os.path.join(save_path, "rastermap.png"),
title_kwargs=title_kwargs,
title="Rastermap Sorted Activity",
)
        else:
            print("Rastermap is not installed; skipping volume-level clustering.")
except Exception:
print("Volume statistics failed.")
print("Traceback: ", traceback.format_exc())
print(f"Processing completed for {len(input_files)} files.")
return all_ops
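# Hedged usage sketch: run_volume returns one ops.npy path per plane, so the
# results can be reloaded later without re-running the pipeline. Paths below
# are hypothetical.
#
#     ops_paths = run_volume(input_files, save_path="/data/volume")
#     for p in ops_paths:
#         plane_ops = np.load(p, allow_pickle=True).item()
#         print(p, plane_ops.get("nframes"))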
def should_write_ops(ops_path, ops, force=False):
    """Return True when ops.npy should be (re)written: the file is missing,
    `force` is set, or the existing ops lack registration or detection results."""
if force or not ops_path.exists():
return True
try:
existing_ops = np.load(ops_path, allow_pickle=True).item()
has_registration = "xoff" in existing_ops and "meanImg" in existing_ops
has_detection = "stat" in ops or (ops_path.parent / "stat.npy").exists()
return not (has_registration and has_detection)
except Exception:
return True
def run_plane_bin(ops):
ops = load_ops(ops)
if "nframes" in ops and "n_frames" not in ops:
ops["n_frames"] = ops["nframes"]
if "n_frames" not in ops:
raise KeyError("run_plane_bin: missing frame count (nframes or n_frames)")
n_frames = ops["n_frames"]
Ly, Lx = ops["Ly"], ops["Lx"]
prior_ops = {}
if Path(ops["ops_path"]).exists():
prior_ops = np.load(ops["ops_path"], allow_pickle=True).item()
with (
suite2p.io.BinaryFile(
Ly=Ly, Lx=Lx, filename=ops["reg_file"], n_frames=n_frames
) as f_reg,
(
suite2p.io.BinaryFile(
Ly=Ly, Lx=Lx, filename=ops["raw_file"], n_frames=n_frames
)
if "raw_file" in ops and ops["raw_file"] is not None
else nullcontext()
) as f_raw,
):
ops = suite2p.pipeline(
f_reg, f_raw, None, None, ops["do_registration"], ops, stat=None
)
# merge in any non-conflicting prior fields
merged_ops = {**ops, **{k: v for k, v in prior_ops.items() if k not in ops}}
np.save(ops["ops_path"], merged_ops)
return merged_ops
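# Hedged sketch of the minimal ops contract run_plane_bin relies on; run_plane
# normally assembles this dict on top of suite2p.default_ops(), and the values
# shown are hypothetical:
#
#     ops = {
#         **suite2p.default_ops(),
#         "Ly": 512, "Lx": 512, "nframes": 1000,   # frame geometry / count
#         "raw_file": "plane0/data_raw.bin",       # optional raw binary
#         "reg_file": "plane0/data.bin",           # registered binary
#         "ops_path": "plane0/ops.npy",            # where results are merged
#         "do_registration": 1,
#     }
#     merged = run_plane_bin(ops)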
def run_plane(
input_path: str | Path,
save_path: str | Path | None = None,
ops: dict | str | Path = None,
keep_raw: bool = False,
keep_reg: bool = True,
force_reg: bool = False,
force_detect: bool = False,
dff_window_size: int = 500,
dff_percentile: int = 20,
**kwargs,
):
"""
Processes a single imaging plane using suite2p, handling registration, segmentation,
and plotting of results.
Parameters
----------
input_path : str or Path
Full path to the file to process, with the file extension.
save_path : str or Path, optional
Directory to save the results.
    ops : dict, str or Path, optional
        Path to or dict of user-supplied ops. If given, these values override
        any existing or generated ops.
    keep_raw : bool, default False
        If True, do not delete the raw binary (`data_raw.bin`) after processing.
    keep_reg : bool, default True
        If True, do not delete the registered binary (`data.bin`) after processing.
    force_reg : bool, default False
        If True, force a new registration even if existing shifts are found in ops.npy.
    force_detect : bool, default False
        If True, force ROI detection even if an existing stat.npy is present.
    dff_window_size : int, default 500
        Size of the rolling window (in frames) for calculating dF/F traces.
    dff_percentile : int, default 20
        Percentile to use for baseline F₀ estimation in dF/F calculation.
    **kwargs : dict, optional
        Additional options, e.g. ``debug=True`` for verbose logging,
        ``fig_label`` for plot titles, or ``force_save`` to rewrite ops.npy.
Returns
-------
dict
Processed ops dictionary containing results.
    Raises
    ------
    ValueError
        If `input_path` does not exist or `save_path` lacks a valid parent directory.
    AssertionError
        If `input_path` or `save_path` is not a str or Path.

    Notes
    -----
    - ops supplied via the `ops` argument take precedence over previously saved ops.npy values.
    - Plotting failures are caught and printed rather than raised.
    Example
    -------
    >> import mbo_utilities as mbo
    >> import lbm_suite2p_python as lsp

    Get a list of z-planes in Txy format:

    >> input_files = mbo.get_files(assembled_path, str_contains='tif', max_depth=3)
    >> metadata = mbo.get_metadata(input_files[0])
    >> ops = suite2p.default_ops()

    Automatically fill in metadata needed for processing (frame rate, pixel resolution, etc.):

    >> mbo_ops = mbo.params_from_metadata(metadata, ops)  # handles framerate, Lx/Ly, etc.

    Run a single z-plane through suite2p, keeping the raw and registered binaries:

    >> output_ops = lsp.run_plane(input_files[0], save_path="D:/data/outputs", keep_raw=True, keep_reg=True, force_reg=True, force_detect=True)
"""
if "debug" in kwargs:
logger.setLevel(logging.DEBUG)
logger.info("Debug mode enabled.")
assert isinstance(
input_path, (Path, str)
), f"input_path should be a pathlib.Path or string, not: {type(input_path)}"
input_path = Path(input_path)
if not input_path.is_file():
raise ValueError(f"Input file does not exist: {input_path}")
input_parent = input_path.parent
assert isinstance(
save_path, (Path, str, type(None))
), f"save_path should be a pathlib.Path or string, not: {type(save_path)}"
if save_path is None:
logger.debug(f"save_path is None, using parent of input file: {input_parent}")
save_path = input_parent
else:
save_path = Path(save_path)
if not save_path.parent.is_dir():
raise ValueError(
f"save_path does not have a valid parent directory: {save_path}"
)
save_path.mkdir(exist_ok=True)
ops_default = suite2p.default_ops()
ops_user = load_ops(ops) if ops else {}
ops = {**ops_default, **ops_user}
file = mbo.imread(input_path)
metadata = file.metadata
if "plane" in ops:
plane = ops["plane"]
metadata["plane"] = plane
elif "plane" in metadata:
plane = metadata["plane"]
ops["plane"] = plane
else:
# get the plane from the filename
plane = mbo.get_plane_from_filename(input_path, ops.get("plane", None))
ops["plane"] = plane
metadata["plane"] = plane
plane_dir = save_path
needs_detect = force_detect or not (plane_dir / "stat.npy").exists()
ops_file = plane_dir / "ops.npy"
reg_data_file = plane_dir / "data.bin"
reg_data_file2 = plane_dir / "reg_tif"
if should_write_ops(ops_file, ops, force=kwargs.get("force_save", False)):
mbo.imwrite(file, plane_dir, ext=".bin", metadata=metadata)
else:
print(f"Skipping ops.npy save: {ops_file.name} already contains results.")
    existing_ops = (
        np.load(ops_file, allow_pickle=True).item() if ops_file.exists() else {}
    )
    # if either registered output exists, assume registration is already done
    reg_exists = reg_data_file.exists() or reg_data_file2.exists()
    needs_reg = force_reg or not reg_exists
    ops = {
        **ops,
        **existing_ops,  # merge any existing ops
        "ops_path": str(ops_file),
        "do_registration": int(needs_reg),
        "roidetect": int(needs_detect),
        "save_path": str(plane_dir),
        "raw_file": str((plane_dir / "data_raw.bin").resolve()),
        "reg_file": str((plane_dir / "data.bin").resolve()),
    }
if "nframes" not in ops and "shape" in ops.get("metadata", {}):
ops["nframes"] = ops["metadata"]["shape"][0]
ops = run_plane_bin(ops)
output_ops = load_ops(ops_file)
    # clean up intermediate binaries unless asked to keep them
if not keep_raw:
(plane_dir / "data_raw.bin").unlink(missing_ok=True)
if not keep_reg:
(plane_dir / "data.bin").unlink(missing_ok=True)
expected_files = {
"ops": plane_dir / "ops.npy",
"stat": plane_dir / "stat.npy",
"iscell": plane_dir / "iscell.npy",
"registration": plane_dir / "registration.png",
"segmentation": plane_dir / "segmentation.png",
"segmentation_traces": plane_dir / "segmentation_match_traces.png",
"max_proj": plane_dir / "max_projection_image.png",
"meanImg": plane_dir / "mean_image.png",
"meanImgE": plane_dir / "mean_image_enhanced.png",
"traces": plane_dir / "traces.png",
"traces_noise": plane_dir / "traces_noise.png",
"noise": plane_dir / "shot_noise_distrubution.png",
"model": plane_dir / "model.npy",
"rastermap": plane_dir / "rastermap.png",
}
try:
if not all(
expected_files[key].is_file()
for key in ["registration", "segmentation", "traces"]
):
print(f"Generating missing plots for {plane_dir.stem}...")
def safe_delete(file_path):
if file_path.exists():
try:
file_path.unlink()
except PermissionError:
print(
f"Error: Cannot delete {file_path}. Ensure it is not open elsewhere."
)
for key in ["registration", "segmentation", "traces"]:
safe_delete(expected_files[key])
model = None
colors = None
if expected_files["stat"].is_file():
res = load_planar_results(output_ops)
iscell = res["iscell"]
spks = res["spks"][iscell]
n_neurons = spks.shape[0]
if iscell.ndim == 2:
iscell = iscell[:, 0]
stat = res["stat"]
f = res["F"][iscell]
if f.shape[0] < 10:
print(f"Too few cells to plot traces for {plane_dir.stem}.")
return output_ops
if expected_files["model"].is_file():
print("Loading cached rastermap model...")
model = np.load(expected_files["model"], allow_pickle=True).item()
else:
if n_neurons < 200:
params = {
"n_clusters": None,
"n_PCs": min(64, n_neurons - 1),
"locality": 0.1,
"time_lag_window": 15,
"grid_upsample": 0,
}
else:
params = {
"n_clusters": 100,
"n_PCs": 128,
"locality": 0.0,
"grid_upsample": 10,
}
print("Computing rastermap model...")
model = Rastermap(**params).fit(spks)
np.save(expected_files["model"], model)
plot_rastermap(
spks,
model,
neuron_bin_size=0,
save_path=expected_files["rastermap"],
title_kwargs={"fontsize": 8, "y": 0.95},
title="Rastermap Sorted Activity",
)
if model is not None:
print("Sorting neurons by rastermap model...")
isort = np.where(iscell == 1)[0][model.isort]
output_ops["isort"] = isort # now global to stat, not local
percentile = output_ops.get("dff_percentile", dff_percentile)
win_size = output_ops.get("dff_window_size", dff_window_size)
dff = dff_rolling_percentile(
f,
percentile=percentile,
window_size=win_size
) * 100 # convert to percentage
dff_noise = dff_shot_noise(dff, output_ops["fs"])
if n_neurons < 10:
print(f"Too few cells to plot traces for {plane_dir.stem}.")
else:
print("Plotting traces...")
fig, colors = plot_traces(
dff,
save_path=expected_files["traces"],
num_neurons=n_neurons,
signal_units="dffp",
return_color=True
)
plot_traces_noise(
dff_noise,
ncells=n_neurons,
savepath=expected_files["traces_noise"]
)
print("Plotting noise distribution...")
plot_noise_distribution(dff_noise, save_path=expected_files["noise"])
suite2p_roi_overlay(
output_ops,
stat,
iscell,
"max_proj",
plot_indices=None,
savepath=expected_files["segmentation"],
)
            cell_indices = output_ops.get("isort", np.arange(n_neurons))[:n_neurons]
suite2p_roi_overlay(
output_ops,
stat,
iscell,
"max_proj",
plot_indices=cell_indices,
savepath=expected_files["segmentation_traces"],
color_mode="colormap",
                colors=colors,
)
fig_label = kwargs.get("fig_label", plane_dir.stem)
for key in ["meanImg", "max_proj", "meanImgE"]:
if key not in output_ops:
continue
plot_projection(
output_ops,
expected_files[key],
fig_label=fig_label,
display_masks=False,
add_scalebar=True,
proj=key,
)
print("Plots generated successfully.")
except Exception:
traceback.print_exc()
return output_ops
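# Hedged usage sketch: a per-plane run with custom dF/F baseline settings; the
# window and percentile feed dff_rolling_percentile above (paths hypothetical):
#
#     out_ops = run_plane(
#         "assembled/plane_01.tif",
#         save_path="out/plane01",
#         dff_window_size=300,
#         dff_percentile=10,
#     )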
def run_grid_search(
base_ops: dict,
grid_search_dict: dict,
input_file: Path | str,
save_root: Path | str,
):
"""
Run a grid search over all combinations of the input suite2p parameters.
Parameters
----------
base_ops : dict
Dictionary of default Suite2p ops to start from. Each parameter combination will override values in this dictionary.
grid_search_dict : dict
Dictionary mapping parameter names (str) to a list of values to grid search.
Each combination of values across parameters will be run once.
input_file : str or Path
Path to the input data file, currently only supports tiff.
save_root : str or Path
Root directory where each parameter combination's output will be saved.
A subdirectory will be created for each run using a short parameter tag.
Notes
-----
- Subfolder names for each parameter are abbreviated to 3-character keys and truncated/rounded values.
Examples
--------
>>> import lbm_suite2p_python as lsp
>>> import suite2p
>>> base_ops = suite2p.default_ops()
>>> base_ops["anatomical_only"] = 3
>>> base_ops["diameter"] = 6
>>> lsp.run_grid_search(
... base_ops,
... {"threshold_scaling": [1.0, 1.2], "tau": [0.1, 0.15]},
... input_file="/mnt/data/assembled_plane_03.tiff",
... save_root="/mnt/grid_search/"
... )
    This will create the following output directory structure::

        /mnt/grid_search/
        ├── thr1.00_tau0.10/
        │   └── suite2p output for threshold_scaling=1.0, tau=0.1
        ├── thr1.00_tau0.15/
        ├── thr1.20_tau0.10/
        └── thr1.20_tau0.15/
See Also
--------
[suite2p parameters](http://suite2p.readthedocs.io/en/latest/parameters.html)
"""
save_root = Path(save_root)
save_root.mkdir(exist_ok=True)
print(f"Saving grid-search in {save_root}")
param_names = list(grid_search_dict.keys())
param_values = list(grid_search_dict.values())
param_combos = list(product(*param_values))
for combo in param_combos:
ops = copy.deepcopy(base_ops)
combo_dict = dict(zip(param_names, combo))
ops.update(combo_dict)
tag_parts = [
f"{k[:3]}{v:.2f}" if isinstance(v, float) else f"{k[:3]}{v}"
for k, v in combo_dict.items()
]
tag = "_".join(tag_parts)
print(f"Running grid search in: {save_root.joinpath(tag)}")
save_path = save_root / tag
run_plane(
input_path=input_file,
save_path=save_path,
ops=ops,
keep_reg=True,
keep_raw=True,
force_reg=True,
force_detect=True,
)
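# Hedged follow-up sketch: compare ROI counts across grid-search runs. Assumes
# each run wrote stat.npy under save_root/<tag>/, as run_plane does above; the
# root path is hypothetical.
#
#     from pathlib import Path
#     import numpy as np
#     for run_dir in sorted(Path("/mnt/grid_search").iterdir()):
#         stat_file = run_dir / "stat.npy"
#         if stat_file.exists():
#             n_rois = len(np.load(stat_file, allow_pickle=True))
#             print(f"{run_dir.name}: {n_rois} ROIs")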