import os
import re
import traceback
from pathlib import Path
from contextlib import nullcontext
from icecream import ic
import numpy as np
from scipy.ndimage import uniform_filter1d
import tifffile
import suite2p
from lbm_suite2p_python.utils import dff_percentile
import mbo_utilities as mbo # noqa
try:
from suite2p.io.binary import BinaryFile
except ImportError:
BinaryFile = None
from lbm_suite2p_python.zplane import (
plot_traces,
plot_projection,
plot_noise_distribution,
load_planar_results,
load_ops,
)
from . import dff_shot_noise
from .volume import (
plot_execution_time,
plot_volume_signal,
plot_volume_neuron_counts,
get_volume_stats,
save_images_to_movie,
)
if mbo.is_running_jupyter():
from tqdm.notebook import tqdm
else:
from tqdm import tqdm
try:
    from rastermap import Rastermap

    HAS_RASTERMAP = True
except ImportError:
    Rastermap = None
    HAS_RASTERMAP = False
if HAS_RASTERMAP:
from lbm_suite2p_python.zplane import plot_rastermap
def _normalize_plane_folder(path):
name = Path(path).stem
m = re.search(r"plane[_-](\d+)", name, re.IGNORECASE)
if not m:
raise ValueError(f"invalid plane name: {name}")
return f"plane{int(m.group(1))}"
def _write_raw_binary(tiff_path, out_path):
    if BinaryFile is None:
        raise ImportError("suite2p is required to write raw binaries (BinaryFile import failed)")
    data = tifffile.memmap(tiff_path)
    out_path = Path(out_path).with_suffix(".bin")
    if data.ndim != 3:
        raise ValueError("Input must be an assembled, 3D (T, Y, X) tiff")
    # shape is (T, Y, X): unpack as (nframes, Ly, Lx) so the axes are not swapped
    nframes, Ly, Lx = data.shape
    bf = BinaryFile(
        Ly=Ly, Lx=Lx, filename=str(out_path), n_frames=nframes, dtype=np.int16
    )
    bf[:] = data
    bf.close()
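# Minimal usage sketch for _write_raw_binary (paths are hypothetical); the
# (T, Y, X) TIFF is memmapped and written out as int16, which assumes the
# source data fits in int16:
#
#     _write_raw_binary("assembled_plane_03.tif", "plane3/data_raw")
#     # -> writes plane3/data_raw.bin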
def _build_ops(metadata: dict, raw_bin: Path) -> dict:
nt, Ly, Lx = metadata["shape"]
dx, dy = metadata.get("pixel_resolution", [2, 2])
return {
"Ly": Ly,
"Lx": Lx,
"fs": round(metadata["frame_rate"], 2),
"nframes": nt,
"raw_file": str(raw_bin),
# "reg_file": str(raw_bin),
"dx": dx,
"dy": dy,
"metadata": metadata,
"input_format": "binary",
"do_regmetrics": True,
"delete_bin": False,
"move_bin": False,
}
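# Sketch of the ops dict _build_ops returns, assuming a 1000-frame 512x512
# plane at ~9.6 Hz with 2 um pixels (all values illustrative):
#
#     {"Ly": 512, "Lx": 512, "fs": 9.61, "nframes": 1000,
#      "raw_file": ".../plane3/data_raw.bin", "dx": 2, "dy": 2,
#      "input_format": "binary", "do_regmetrics": True, "delete_bin": False,
#      "move_bin": False, "metadata": {...}}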
def run_volume(
    input_files: list,
    save_path: str | Path | None = None,
    ops: dict | str | Path | None = None,
    keep_reg: bool = True,
    keep_raw: bool = True,
    force_reg: bool = False,
    force_detect: bool = False,
    replot: bool = False,
):
"""
Processes a full volumetric imaging dataset using Suite2p, handling plane-wise registration,
segmentation, plotting, and aggregation of volumetric statistics and visualizations.
Parameters
----------
input_files : list of str or Path
List of TIFF file paths, each representing a single imaging plane.
save_path : str or Path, optional
Base directory to save all outputs.
If none, will create a "volume" directory in the parent of the first input file.
ops : dict or list, optional
Dictionary of Suite2p parameters to use for each imaging plane.
save_path : str, optional
Subdirectory name within `save_path` for saving results (default: None).
keep_raw : bool, default false
if true, do not delete the raw binary (`data_raw.bin`) after processing.
keep_reg : bool, default false
if true, do not delete the registered binary (`data.bin`) after processing.
force_reg : bool, default false
if true, force a new registration even if existing shifts are found in ops.npy.
force_detect : bool, default false
if true, force roi detection even if an existing stat.npy is present.
replot : bool, optional
If True, regenerate all summary plots even if they already exist (default: False).
Returns
-------
list of str
List of paths to `ops.npy` files for each plane.
Raises
------
Exception
If volumetric summary statistics or any visualization fails to generate.
Example
-------
>> input_files = mbo.get_files(assembled_path, str_contains='tif', max_depth=3)
>> ops = mbo.params_from_metadata(mbo.get_metadata(input_files[0]), suite2p.default_ops())
Run volume
>> output_ops_list = lsp.run_volume(ops, input_files, save_path)
Notes
-----
At the root of `save_path` will be a folder for each z-plane with all suite2p results, as well as
volumetric outputs at the base of this folder.
Each z-plane folder contains:
- Registration, Segmentation and Extraction results (ops, spks, iscell)
- Summary statistics: execution time, signal strength, acceptance rates
- Optional rastermap model for visualization of activity across the volume
Each save_path root contains:
- Accepted/Rejected histogram, neuron-count x z-plane (acc_rej_bar.png)
- Execution time for each step in each z-plane (execution_time.png)
- Mean/Max images, with and without segmentation masks, in GIF/MP4
- Traces animation over time and neurons
- Optional rastermap clustering results
"""
if save_path is None:
save_path = Path(input_files[0]).parent
all_ops = []
for file in tqdm(input_files, desc="Processing Planes"):
print(f"Processing {file} ---------------")
output_ops = run_plane(
input_path=file,
save_path=str(save_path),
ops=ops,
keep_reg=keep_reg,
keep_raw=keep_raw,
force_reg=force_reg,
force_detect=force_detect,
replot=replot,
)
all_ops.append(output_ops)
    # batch was run; collect the ops path from each plane
    if isinstance(all_ops[0], dict):
        all_ops = [o["ops_path"] for o in all_ops]
try:
zstats_file = get_volume_stats(all_ops, overwrite=True)
all_segs = mbo.get_files(save_path, "segmentation.png", 4)
all_means = mbo.get_files(save_path, "mean_image.png", 4)
all_maxs = mbo.get_files(save_path, "max_projection_image.png", 4)
all_traces = mbo.get_files(save_path, "traces.png", 4)
save_images_to_movie(
all_segs, os.path.join(save_path, "segmentation_volume.mp4")
)
save_images_to_movie(
all_means, os.path.join(save_path, "mean_images_volume.mp4")
)
save_images_to_movie(all_maxs, os.path.join(save_path, "max_images_volume.mp4"))
save_images_to_movie(all_traces, os.path.join(save_path, "traces_volume.mp4"))
plot_volume_neuron_counts(zstats_file, save_path)
plot_volume_signal(
zstats_file, os.path.join(save_path, "mean_volume_signal.png")
)
plot_execution_time(zstats_file, os.path.join(save_path, "execution_time.png"))
res_z = [
load_planar_results(ops_path, z_plane=i)
for i, ops_path in enumerate(all_ops)
]
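        # stack neurons from every z-plane along axis 0 so the volume-wide
        # rastermap below sorts activity across planes (spks: neurons x time)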
all_spks = np.concatenate([res["spks"] for res in res_z], axis=0)
# all_iscell = np.stack([res['iscell'] for res in res_z], axis=-1)
if HAS_RASTERMAP:
model = Rastermap(
n_clusters=100,
n_PCs=100,
locality=0.75,
time_lag_window=15,
).fit(all_spks)
np.save(os.path.join(save_path, "model.npy"), model)
title_kwargs = {"fontsize": 8, "y": 0.95}
plot_rastermap(
all_spks,
model,
neuron_bin_size=20,
xmax=min(2000, all_spks.shape[1]),
save_path=os.path.join(save_path, "rastermap.png"),
title_kwargs=title_kwargs,
title="Rastermap Sorted Activity",
)
else:
print("No rastermap is available.")
except Exception:
print("Volume statistics failed.")
print("Traceback: ", traceback.format_exc())
print(f"Processing completed for {len(input_files)} files.")
return all_ops
def run_plane_bin(plane_dir):
plane_dir = Path(plane_dir)
ops_path = plane_dir / "ops.npy"
if ops_path.exists():
_ = ic(f"Loading ops from existing file: {ops_path}")
ops = load_ops(str(ops_path))
else:
raise ValueError(f"Invalid ops path: {ops_path}")
# ops.update(input_format="binary", delete_bin=False, move_bin=False)
if "nframes" in ops and "n_frames" not in ops:
ops["n_frames"] = ops["nframes"]
if "n_frames" not in ops:
raise KeyError("run_plane_bin: missing frame count (nframes or n_frames)")
n_frames = ops["n_frames"]
Ly, Lx = ops["Ly"], ops["Lx"]
ops["raw_file"] = str((plane_dir / "data_raw.bin").resolve())
ops["reg_file"] = str((plane_dir / "data.bin").resolve())
with suite2p.io.BinaryFile(Ly=Ly, Lx=Lx, filename=ops["reg_file"], n_frames=n_frames) as f_reg, \
suite2p.io.BinaryFile(Ly=Ly, Lx=Lx, filename=ops["raw_file"], n_frames=n_frames) \
if "raw_file" in ops and ops["raw_file"] is not None else nullcontext() as f_raw:
ops = suite2p.pipeline(f_reg, f_raw, None, None, True, ops, stat=None)
return ops
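# Minimal sketch of calling run_plane_bin directly (directory is hypothetical);
# it assumes the plane folder already contains an ops.npy with Ly/Lx and a
# frame count, plus a data_raw.bin written by run_plane/_write_raw_binary:
#
#     ops = run_plane_bin("D:/data/outputs/plane3")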
def run_plane(
input_path: str | Path,
save_path: str | Path | None = None,
    ops: dict | str | Path | None = None,
keep_raw: bool = False,
keep_reg: bool = True,
force_reg: bool = False,
force_detect: bool = False,
**kwargs,
):
"""
Processes a single imaging plane using suite2p, handling registration, segmentation,
and plotting of results.
Parameters
----------
input_path : str or Path
Full path to the file to process, with the file extension.
save_path : str or Path, optional
Directory to save the results.
ops : dict, str or Path, optional
Path to or dict of user‐supplied ops.npy. If given, it overrides any existing or generated ops.
keep_raw : bool, default false
if true, do not delete the raw binary (`data_raw.bin`) after processing.
keep_reg : bool, default false
if true, do not delete the registered binary (`data.bin`) after processing.
force_reg : bool, default false
if true, force a new registration even if existing shifts are found in ops.npy.
force_detect : bool, default false
if true, force roi detection even if an existing stat.npy is present.
Returns
-------
dict
Processed ops dictionary containing results.
Raises
------
FileNotFoundError
If `input_tiff` does not exist.
TypeError
If `save_folder` is not a string.
Exception
If plotting functions fail.
Notes
-----
- ops supplied to the function via `ops_file` will take precendence over previously saved ops.npy files.
Example
-------
>> import mbo_utilities as mbo
>> import lbm_suite2p_python as lsp
Get a list of z-planes in Txy format
>> input_files = mbo.get_files(assembled_path, str_contains='tif', max_depth=3)
>> metadata = mbo.get_metadata(input_files[0])
>> ops = suite2p.default_ops()
Automatically fill in metadata needed for processing (frame rate, pixel resolution, etc..)
>> mbo_ops = mbo.params_from_metadata(metadata, ops) # handles framerate, Lx/Ly, etc
Run a single z-plane through suite2p, keeping raw and registered files.
>> output_ops = lsp.run_plane(input_files[0], save_path="D://data//outputs", keep_raw=True, keep_registered=True, force_reg=True, force_detect=True)
"""
if isinstance(input_path, list):
raise ValueError(
f"input_path should be a pathlib.Path or string, not: {type(input_path)}"
)
if "debug" in kwargs:
ic.enable()
else:
ic.disable()
p = Path(input_path)
if p.is_dir():
raise ValueError(f"Input path must be a file, not a directory: {p}")
save_root = Path(save_path) if save_path is not None else p.parent
    save_root.mkdir(parents=True, exist_ok=True)
ops0 = suite2p.default_ops()
if p.suffix.lower() in (".tif", ".tiff"):
metadata = mbo.get_metadata(p)
folder = _normalize_plane_folder(p)
plane_dir = save_root / folder
plane_dir.mkdir(exist_ok=True)
raw_bin = plane_dir / "data_raw.bin"
if not raw_bin.exists() or force_reg:
# if the raw binary does not exist, or we are forcing registration, write it
print(f"Writing raw binary to {raw_bin}")
_write_raw_binary(p, raw_bin)
ops1 = _build_ops(metadata, raw_bin)
ops0 = {**ops0, **ops1}
elif p.suffix.lower() in (".bin", "bin"):
plane_dir = p.parent
else:
raise ValueError(
f"Unsupported file type: {p.suffix}. Only .tif/.tiff or .bin files are supported."
)
ops_path = plane_dir / "ops.npy"
saved_ops = load_ops(ops_path) if ops_path.exists() else {}
user_ops = load_ops(ops) if ops else {}
print(f"Applying user ops: {user_ops}")
ops = {**ops0, **saved_ops, **user_ops}
needs_reg = (
force_reg
or (keep_reg and not (plane_dir / "data.bin").exists())
or "yoff" not in ops
)
needs_detect = force_detect or not (plane_dir / "stat.npy").exists()
ops["zplane"] = int(plane_dir.stem.removeprefix("plane"))
ops["do_registration"] = int(needs_reg)
ops["roidetect"] = int(needs_detect)
if "save_path" not in ops.keys():
ops["save_path"] = str(plane_dir)
if "nframes" not in ops and "shape" in ops.get("metadata", {}):
ic(ops["metadata"]["shape"])
ops["nframes"] = ops["metadata"]["shape"][0]
ops["ops_path"] = str(ops_path)
np.save(ops_path, ops)
output_ops = run_plane_bin(plane_dir)
# cleanup ourselves
if not keep_raw:
(plane_dir / "data_raw.bin").unlink(missing_ok=True)
if not keep_reg:
(plane_dir / "data.bin").unlink(missing_ok=True)
expected_files = {
"ops": plane_dir / "ops.npy",
"stat": plane_dir / "stat.npy",
"iscell": plane_dir / "iscell.npy",
"registration": plane_dir / "registration.png",
"segmentation": plane_dir / "segmentation.png",
"max_proj": plane_dir / "max_projection_image.png",
"traces": plane_dir / "traces.png",
"noise": plane_dir / "shot_noise_distrubution.png",
"model": plane_dir / "model.npy",
"rastermap": plane_dir / "rastermap.png",
}
try:
if not all(
expected_files[key].is_file()
for key in ["registration", "segmentation", "traces"]
):
print(f"Generating missing plots for {plane_dir.stem}...")
def safe_delete(file_path):
if file_path.exists():
try:
file_path.unlink()
except PermissionError:
print(
f"Error: Cannot delete {file_path}. Ensure it is not open elsewhere."
)
for key in ["registration", "segmentation", "traces"]:
safe_delete(expected_files[key])
if expected_files["stat"].is_file():
res = load_planar_results(output_ops)
iscell = res["iscell"]
f = res["F"][iscell]
dff = dff_percentile(f, percentile=2) * 100
dff = uniform_filter1d(dff, size=3, axis=1)
dff_noise = dff_shot_noise(dff, output_ops["fs"])
ncells = min(30, dff.shape[0])
if ncells < 10:
print(f"Too few cells to plot traces for {plane_dir.stem}.")
else:
print("Plotting traces...")
plot_traces(
dff, save_path=expected_files["traces"], num_neurons=ncells
)
print("Plotting noise distribution...")
plot_noise_distribution(dff_noise, save_path=expected_files["noise"])
if HAS_RASTERMAP:
spks = res["spks"][iscell]
n_neurons = spks.shape[0]
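                # heuristic: small populations (<200 neurons) are sorted
                # per-neuron (no clustering, fewer PCs, mild locality); larger
                # ones use 100 clusters with grid upsampling for a smoother map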
if n_neurons < 200:
params = {
"n_clusters": None,
"n_PCs": min(64, n_neurons - 1),
"locality": 0.1,
"time_lag_window": 15,
"grid_upsample": 0,
}
else:
params = {
"n_clusters": 100,
"n_PCs": 128,
"locality": 0.0,
"grid_upsample": 10,
}
print("Computing rastermap model...")
model = Rastermap(**params).fit(spks)
np.save(expected_files["model"], model)
neuron_bin_size = (
1 if n_neurons < 200 else 5 if n_neurons < 500 else 10
)
xmax = min(spks.shape[1], int(2000 * (200 / n_neurons) ** 0.5))
plot_rastermap(
spks,
model,
neuron_bin_size=neuron_bin_size,
xmax=xmax,
save_path=expected_files["rastermap"],
title_kwargs={"fontsize": 8, "y": 0.95},
title="Rastermap Sorted Activity",
)
else:
print("No rastermap is available.")
fig_label = kwargs.get("fig_label", plane_dir.stem)
plot_projection(
output_ops,
expected_files["segmentation"],
fig_label=fig_label,
display_masks=True,
add_scalebar=True,
proj="meanImg",
)
plot_projection(
output_ops,
expected_files["max_proj"],
fig_label=fig_label,
display_masks=False,
add_scalebar=True,
proj="max_proj",
)
print("Plots generated successfully.")
except Exception:
traceback.print_exc()
return output_ops
def run_grid_search(
base_ops: dict,
grid_search_dict: dict,
input_file: Path | str,
save_root: Path | str,
):
"""
Run a grid search over all combinations of the input suite2p parameters.
Parameters
----------
base_ops : dict
Dictionary of default Suite2p ops to start from. Each parameter combination will override values in this dictionary.
grid_search_dict : dict
Dictionary mapping parameter names (str) to a list of values to grid search.
Each combination of values across parameters will be run once.
input_file : str or Path
Path to the input data file, currently only supports tiff.
save_root : str or Path
Root directory where each parameter combination's output will be saved.
A subdirectory will be created for each run using a short parameter tag.
Notes
-----
- Subfolder names for each parameter are abbreviated to 3-character keys and truncated/rounded values.
Examples
--------
>>> import lbm_suite2p_python as lsp
>>> import suite2p
>>> base_ops = suite2p.default_ops()
>>> base_ops["anatomical_only"] = 3
>>> base_ops["diameter"] = 6
>>> lsp.run_grid_search(
... base_ops,
... {"threshold_scaling": [1.0, 1.2], "tau": [0.1, 0.15]},
... input_file="/mnt/data/assembled_plane_03.tiff",
... save_root="/mnt/grid_search/"
... )
    This will create the following output directory structure::

        /mnt/grid_search/
        ├── thr1.00_tau0.10/
        │   └── suite2p output for threshold_scaling=1.0, tau=0.1
        ├── thr1.00_tau0.15/
        ├── thr1.20_tau0.10/
        └── thr1.20_tau0.15/
See Also
--------
[suite2p parameters](http://suite2p.readthedocs.io/en/latest/parameters.html)
"""
    from itertools import product
    import copy
save_root = Path(save_root)
    save_root.mkdir(parents=True, exist_ok=True)
print(f"Saving grid-search in {save_root}")
param_names = list(grid_search_dict.keys())
param_values = list(grid_search_dict.values())
param_combos = list(product(*param_values))
for combo in param_combos:
ops = copy.deepcopy(base_ops)
combo_dict = dict(zip(param_names, combo))
ops.update(combo_dict)
tag_parts = [
f"{k[:3]}{v:.2f}" if isinstance(v, float) else f"{k[:3]}{v}"
for k, v in combo_dict.items()
]
tag = "_".join(tag_parts)
print(f"Running grid search in: {save_root.joinpath(tag)}")
        run_plane(
            input_path=input_file,
            save_path=save_root / tag,
            ops=ops,
        )