import json
from collections import defaultdict
from collections.abc import Sequence
from io import StringIO
import re
from pathlib import Path
import numpy as np
import dask.array as da
from tifffile import TiffFile, tifffile
from . import log
try:
from zarr import open as zarr_open
from zarr.storage import FsspecStore
from fsspec.implementations.reference import ReferenceFileSystem
HAS_ZARR = True
except ImportError:
HAS_ZARR = False
zarr_open = None
ReferenceFileSystem = None
FsspecStore = None
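# Default chunking used for 4D data below: one chunk per frame along axis 0,
# automatic chunking along axis 1, and whole planes along the last two axes
# (-1 means "the full axis" in dask).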
CHUNKS = {0: 1, 1: "auto", 2: -1, 3: -1}
MBO_SUPPORTED_FTYPES = [".tiff", ".zarr", ".bin", ".h5"]
PIPELINE_TAGS = ("plane", "roi", "z", "plane_", "roi_", "z_")
logger = log.get("file_io")
def load_ops(ops_input: str | Path | dict):
"""Load a Suite2p ops.npy file, or return an existing ops dict unchanged."""
if isinstance(ops_input, (str, Path)):
return np.load(ops_input, allow_pickle=True).item()
elif isinstance(ops_input, dict):
return ops_input
logger.warning("No valid ops file provided, returning empty dict.")
return {}
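# Usage sketch (illustrative; the path below is hypothetical):
# >>> ops = load_ops("session1/plane01/ops.npy")
# >>> isinstance(ops, dict)
# True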
def write_ops(metadata, raw_filename, **kwargs):
"""
Write metadata to an ops.npy file alongside the given filename.
metadata must contain a 'shape' key; 'pixel_resolution' and 'frame_rate'
are used when present and fall back to defaults (with a warning) otherwise.
"""
logger.debug(f"Writing ops file for {raw_filename} with metadata: {metadata}")
if not isinstance(raw_filename, (str, Path)):
raise TypeError(f"raw_filename must be str or Path, got {type(raw_filename)}")
filename = Path(raw_filename).expanduser().resolve()
structural = kwargs.get("structural", False)
chan = 2 if structural or "data_chan2.bin" in str(filename) else 1
logger.debug(f"Detected channel {chan}")
root = filename.parent if filename.is_file() else filename
ops_path = root / "ops.npy"
logger.info(f"Writing ops file to {ops_path}")
shape = metadata["shape"]
nt, Ly, Lx = shape[0], shape[-2], shape[-1]  # shape is (T, Y, X): [-2] = Ly, [-1] = Lx
# Check if num_frames was explicitly set (takes precedence over shape)
if "num_frames" in metadata:
nt_metadata = int(metadata["num_frames"])
if nt_metadata != shape[0]:
raise ValueError(
f"Inconsistent frame count in metadata!\n"
f"metadata['num_frames'] = {nt_metadata}\n"
f"metadata['shape'][0] = {shape[0]}\n"
f"These must match. Check your data and metadata."
)
nt = nt_metadata
logger.debug(f"Using explicit num_frames={nt} from metadata")
elif "nframes" in metadata:
nt_metadata = int(metadata["nframes"])
if nt_metadata != shape[0]:
raise ValueError(
f"Inconsistent frame count in metadata!\n"
f"metadata['nframes'] = {nt_metadata}\n"
f"metadata['shape'][0] = {shape[0]}\n"
f"These must match. Check your data and metadata."
)
nt = nt_metadata
logger.debug(f"Using explicit nframes={nt} from metadata")
if "pixel_resolution" not in metadata:
logger.warning("No pixel resolution found in metadata, using default [2, 2].")
if "fs" not in metadata:
if "frame_rate" in metadata:
metadata["fs"] = metadata["frame_rate"]
elif "framerate" in metadata:
metadata["fs"] = metadata["framerate"]
else:
logger.warning("No frame rate found; defaulting fs=10")
metadata["fs"] = 10
dx, dy = metadata.get("pixel_resolution", [2, 2])
# Load or initialize ops
if ops_path.exists():
ops = np.load(ops_path, allow_pickle=True).item()
else:
from .metadata import default_ops
ops = default_ops()
# Update shared core fields
ops.update(
{
"Ly": Ly,
"Lx": Lx,
"fs": metadata["fs"],
"dx": dx,
"dy": dy,
"ops_path": str(ops_path),
}
)
# Channel-specific entries
# Use the potentially overridden nt (from num_frames or nframes)
if chan == 1:
ops["nframes_chan1"] = nt
ops["raw_file"] = str(filename)
else:
ops["nframes_chan2"] = nt
ops["chan2_file"] = str(filename)
ops["align_by_chan"] = chan
# Set top-level nframes to match the written channel
# This ensures consistency between nframes and nframes_chan1/chan2
ops["nframes"] = nt
# Merge extra metadata, but DON'T overwrite nframes fields
# This prevents inconsistency between nframes and nframes_chan1
for key, value in metadata.items():
if key not in ["nframes", "nframes_chan1", "nframes_chan2", "num_frames"]:
ops[key] = value
np.save(ops_path, ops)
logger.debug(
f"Ops file written to {ops_path} with nframes={ops['nframes']}, nframes_chan1={ops.get('nframes_chan1')}"
)
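# Usage sketch for write_ops (illustrative; paths and values are hypothetical).
# Only "shape" is strictly required; pixel_resolution and frame_rate fall back
# to defaults (with a warning) when absent:
# >>> md = {"shape": (1000, 512, 448), "pixel_resolution": [2, 2], "frame_rate": 17.0}
# >>> write_ops(md, "session1/plane01/data_raw.bin")
# writes/updates session1/plane01/ops.npy with Ly=512, Lx=448, fs=17.0
# (assuming the .bin file exists, so its parent directory is used)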
def files_to_dask(files: list[str | Path], astype=None, chunk_t=250):
"""
Lazily build a Dask array or list of arrays depending on filename tags.
- "plane", "z", or "chan" → stacked along Z (TZYX)
- "roi" → list of 3D (T,Y,X) arrays, one per ROI
- otherwise → concatenate all files in time (T)
"""
files = [Path(f) for f in files]
if not files:
raise ValueError("No input files provided.")
has_plane = any(re.search(r"(plane|z|chan)[_-]?\d+", f.stem, re.I) for f in files)
has_roi = any(re.search(r"roi[_-]?\d+", f.stem, re.I) for f in files)
# lazy-load utility inline
def load_lazy(f):
if f.suffix == ".npy":
arr = np.load(f, mmap_mode="r")
elif f.suffix in (".tif", ".tiff"):
arr = tifffile.memmap(f, mode="r")
else:
raise ValueError(f"Unsupported file type: {f}")
chunks = (min(chunk_t, arr.shape[0]),) + arr.shape[1:]
return da.from_array(arr, chunks=chunks)
if has_roi:
roi_groups = defaultdict(list)
for f in files:
m = re.search(r"roi[_-]?(\d+)", f.stem, re.I)
roi_idx = int(m.group(1)) if m else 0
roi_groups[roi_idx].append(f)
roi_arrays = []
for roi_idx, group in sorted(roi_groups.items()):
arrays = [load_lazy(f) for f in sorted(group)]
darr = da.concatenate(arrays, axis=0) # concat in time
if astype:
darr = darr.astype(astype)
roi_arrays.append(darr)
return roi_arrays
# Plane or Z grouping case
if has_plane:
plane_groups = defaultdict(list)
for f in files:
m = re.search(r"(plane|z|chan)[_-]?(\d+)", f.stem, re.I)
plane_idx = int(m.group(2)) if m else 0
plane_groups[plane_idx].append(f)
plane_stacks = []
for z, group in sorted(plane_groups.items()):
arrays = [load_lazy(f) for f in sorted(group)]
plane = da.concatenate(arrays, axis=0)
plane_stacks.append(plane)
full = da.stack(plane_stacks, axis=1) # (T,Z,Y,X)
return full.astype(astype) if astype else full
# Default: concatenate along time
arrays = [load_lazy(f) for f in sorted(files)]
full = da.concatenate(arrays, axis=0) # (T,Y,X)
return full.astype(astype) if astype else full
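# Usage sketch for files_to_dask (illustrative; filenames are hypothetical):
# >>> vol = files_to_dask(["plane_01.tif", "plane_02.tif"])        # -> (T, Z, Y, X)
# >>> rois = files_to_dask(["roi_1.tif", "roi_2.tif"])             # -> [(T, Y, X), ...]
# >>> movie = files_to_dask(["part_00001.tif", "part_00002.tif"])  # -> (T, Y, X)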
def expand_paths(paths: str | Path | Sequence[str | Path]) -> list[Path]:
"""
Expand a path, list of paths, or wildcard pattern into a sorted list of actual files.
This is a handy wrapper for loading images or data files when you’ve got a folder,
some wildcards, or a mix of both.
Parameters
----------
paths : str, Path, or list of (str or Path)
Can be a single path, a wildcard pattern like "\\*.tif", a folder, or a list of those.
Returns
-------
list of Path
Sorted list of full paths to matching files.
Examples
--------
>>> expand_paths("data/\\*.tif")
[Path("data/img_000.tif"), Path("data/img_001.tif"), ...]
>>> expand_paths(Path("data"))
[Path("data/img_000.tif"), Path("data/img_001.tif"), ...]
>>> expand_paths(["data/\\*.tif", Path("more_data")])
[Path("data/img_000.tif"), Path("more_data/img_050.tif"), ...]
"""
if isinstance(paths, (str, Path)):
paths = [paths]
elif not isinstance(paths, (list, tuple)):
raise TypeError(f"Expected str, Path, or sequence of them, got {type(paths)}")
result = []
for p in paths:
p = Path(p)
if "*" in str(p):
result.extend(p.parent.glob(p.name))
elif p.is_dir():
result.extend(p.glob("*"))
elif p.exists() and p.is_file():
result.append(p)
return sorted(p.resolve() for p in result if p.is_file())
def _tiff_to_fsspec(tif_path: Path, base_dir: Path) -> dict:
"""
Create a kerchunk reference for a single TIFF file.
Parameters
----------
tif_path : Path
Path to the TIFF file on disk.
base_dir : Path
Directory representing the “root” URI for the reference.
Returns
-------
refs : dict
A kerchunk reference dict (in JSON form) for this single TIFF.
"""
with TiffFile(str(tif_path.expanduser().resolve())) as tif:
with StringIO() as f:
store = tif.aszarr()
store.write_fsspec(f, url=base_dir.as_uri())
refs = json.loads(f.getvalue()) # type: ignore
return refs
def save_fsspec(filenames):
base_dir = Path(filenames[0]).parent
combined_json_path = base_dir / "combined_refs.json"
if combined_json_path.is_file():
# delete it; it's cheap to regenerate
logger.debug(f"Removing existing combined reference file: {combined_json_path}")
combined_json_path.unlink()
logger.debug(f"Generating combined kerchunk reference for {len(filenames)} files…")
combined_refs = _multi_tiff_to_fsspec(tif_files=filenames, base_dir=base_dir)
with open(combined_json_path, "w") as _f:
json.dump(combined_refs, _f) # type: ignore
logger.info(f"Combined kerchunk reference written to {combined_json_path}")
return combined_json_path
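# Usage sketch (illustrative, not the module's own API): the combined reference
# JSON can be opened lazily via fsspec's ReferenceFileSystem; the exact zarr
# wiring depends on the installed zarr/fsspec versions, so treat this as an
# assumption rather than a guaranteed recipe.
# >>> refs_path = save_fsspec(tif_files)
# >>> fs = ReferenceFileSystem(str(refs_path))   # imported at the top when available
# >>> arr = da.from_zarr(fs.get_mapper(""))      # lazy (T, Y, X) dask array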
def _multi_tiff_to_fsspec(tif_files: list[Path], base_dir: Path) -> dict:
if len(tif_files) <= 1:
raise ValueError("Need at least two TIFF files to combine.")
combined_refs: dict[str, str] = {}
per_file_refs = []
total_shape = None
total_chunks = None
zarr_meta = {}
for tif_path in tif_files:
# Create a json reference for each TIFF file
inner_refs = _tiff_to_fsspec(tif_path, base_dir)
zarr_meta = json.loads(inner_refs.pop(".zarray"))
inner_refs.pop(".zattrs", None)
shape = zarr_meta["shape"]
chunks = zarr_meta["chunks"]
if total_shape is None:
total_shape = shape.copy()
total_chunks = chunks
else:
if shape[1:] != total_shape[1:]:
raise ValueError(f"Shape mismatch in {tif_path}: {shape[1:]} != {total_shape[1:]}")
if chunks != total_chunks:
raise ValueError(f"Chunk mismatch in {tif_path}: {chunks} != {total_chunks}")
total_shape[0] += shape[0] # accumulate along axis 0
per_file_refs.append((inner_refs, shape))
combined_zarr_meta = {
"shape": total_shape, # total shape tracks the full-assembled image shape
"chunks": total_chunks,
"dtype": zarr_meta["dtype"],
"compressor": zarr_meta["compressor"],
"filters": zarr_meta.get("filters", None),
"order": zarr_meta["order"],
"zarr_format": zarr_meta["zarr_format"],
"fill_value": zarr_meta.get("fill_value", 0),
}
combined_refs[".zarray"] = json.dumps(combined_zarr_meta)
combined_refs[".zattrs"] = json.dumps(
{"_ARRAY_DIMENSIONS": ["T", "C", "Y", "X"][: len(total_shape)]}
)
axis0_offset = 0
# since we are combining along axis 0, we need to adjust the keys
# in the inner_refs to account for the offset along that axis.
for inner_refs, shape in per_file_refs:
chunksize0 = total_chunks[0]
for key, val in inner_refs.items():
idx = list(map(int, key.strip("/").split(".")))
idx[0] += axis0_offset // chunksize0
new_key = ".".join(map(str, idx))
combined_refs[new_key] = val
axis0_offset += shape[0]
return combined_refs
def sort_by_si_filename(filename):
"""
Sort ScanImage files by the last number in the filename (e.g., _00001, _00002, etc.).
"""
numbers = re.findall(r"\d+", str(filename))
return int(numbers[-1]) if numbers else 0
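# Example (illustrative):
# >>> sorted(["scan_00010.tif", "scan_00002.tif"], key=sort_by_si_filename)
# ['scan_00002.tif', 'scan_00010.tif']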
def is_excluded(path, exclude_dirs=()):
return any(excl in path.parts for excl in exclude_dirs)
def get_files(
base_dir, str_contains="", max_depth=1, sort_ascending=True, exclude_dirs=None
) -> list[Path]:
"""
Recursively search for files in a specified directory whose names contain a given substring,
limiting the search to a maximum subdirectory depth. Optionally, the resulting list of file paths
is sorted in ascending order using numeric parts of the filenames when available.
Parameters
----------
base_dir : str or Path
The base directory where the search begins. This path is expanded (e.g., '~' is resolved)
and converted to an absolute path.
str_contains : str, optional
A substring that must be present in a file's name for it to be included in the result.
If empty, all files are matched.
max_depth : int, optional
The maximum number of subdirectory levels (relative to the base directory) to search.
Defaults to 1. If set to 0, it is automatically reset to 1.
sort_ascending : bool, optional
If True (default), the matched file paths are sorted in ascending alphanumeric order.
The sort key extracts numeric parts from filenames so that, for example, "file2" comes
before "file10".
exclude_dirs : iterable of str, optional
Directory names to exclude from the search. Defaults to
[".venv", ".git", "__pycache__"].
Returns
-------
list of Path
A list of paths to files within the base directory (and its subdirectories
up to the specified depth) whose names contain the provided substring.
Raises
------
FileNotFoundError
If the base directory does not exist.
NotADirectoryError
If the specified base_dir is not a directory.
Examples
--------
>>> import mbo_utilities as mbo
>>> # Get all files that contain "ops.npy" in their names by searching up to 3 levels deep:
>>> ops_files = mbo.get_files("path/to/files", "ops.npy", max_depth=3)
>>> # Get only files containing "tif" in the current directory (max_depth=1):
>>> tif_files = mbo.get_files("path/to/files", "tif")
"""
base_path = Path(base_dir).expanduser().resolve()
if not base_path.exists():
raise FileNotFoundError(f"Directory '{base_path}' does not exist.")
if not base_path.is_dir():
raise NotADirectoryError(f"'{base_path}' is not a directory.")
if max_depth == 0:
max_depth = 1
base_depth = len(base_path.parts)
pattern = f"*{str_contains}*" if str_contains else "*"
if exclude_dirs is None:
exclude_dirs = [".venv", ".git", "__pycache__"]
files = [
file
for file in base_path.rglob(pattern)
if len(file.parts) - base_depth <= max_depth
and file.is_file()
and not is_excluded(file, exclude_dirs)
]
if sort_ascending:
files.sort(key=sort_by_si_filename)
return files
def get_plane_from_filename(path, fallback=None):
path = Path(path)
for part in path.stem.lower().split("_"):
if part.startswith("plane"):
suffix = part[5:]
if suffix.isdigit():
return int(suffix.lstrip("0") or "0")
if fallback is not None:
return fallback
raise ValueError(f"Could not extract plane number from filename: {path.name}")
def derive_tag_from_filename(path):
"""
Derive a folder tag from a filename based on "planeN", "roiN", or "zN" patterns.
Parameters
----------
path : str or pathlib.Path
File path or name whose stem will be parsed.
Returns
-------
str
If the stem starts with "plane", "roi", or "z" followed by an integer
(optionally separated by "_" or "-"), returns that tag plus the integer
(e.g. "plane3", "roi7", "z2").
Otherwise returns the original stem unchanged.
Examples
--------
>>> derive_tag_from_filename("plane_01.tif")
'plane1'
>>> derive_tag_from_filename("plane2.bin")
'plane2'
>>> derive_tag_from_filename("roi5.raw")
'roi5'
>>> derive_tag_from_filename("ROI_10.dat")
'roi10'
>>> derive_tag_from_filename("res-3.h5")
'res3'
>>> derive_tag_from_filename("assembled_data_1.tiff")
'assembled_data_1'
>>> derive_tag_from_filename("file_12.tif")
'file_12'
"""
name = Path(path).stem
for tag in PIPELINE_TAGS:
low = name.lower()
if low.startswith(tag):
suffix = name[len(tag) :]
if suffix and (suffix[0] in ("_", "-")):
suffix = suffix[1:]
if suffix.isdigit():
return f"{tag}{int(suffix)}"
return name
def group_plane_rois(input_dir):
input_dir = Path(input_dir)
grouped = defaultdict(list)
for d in input_dir.iterdir():
if (
d.is_dir()
and not d.name.endswith(".zarr") # exclude zarr dirs
and d.stem.startswith("plane")
and "_roi" in d.stem
):
parts = d.stem.split("_")
if len(parts) == 2 and parts[1].startswith("roi"):
plane = parts[0] # e.g. "plane01"
grouped[plane].append(d)
return grouped
def merge_zarr_rois(input_dir, output_dir=None, overwrite=True):
"""
Concatenate roi1 + roi2 .zarr stores for each plane into a single planeXX.zarr.
Parameters
----------
input_dir : Path or str
Directory containing planeXX_roi1 and planeXX_roi2 Zarr stores, one pair per plane.
output_dir : Path or str, optional
Where to write the merged planeXX.zarr stores. Defaults to a sibling
directory named "<input_dir>_merged".
overwrite : bool
If True, existing outputs are replaced.
"""
z_merged = None
input_dir = Path(input_dir)
output_dir = (
Path(output_dir)
if output_dir
else input_dir.parent / (input_dir.name + "_merged")
)
output_dir.mkdir(exist_ok=True)
logger.debug(f"Saving merged zarrs to {output_dir}")
roi1_dirs = sorted(input_dir.glob("*plane*_roi1*"))
roi2_dirs = sorted(input_dir.glob("*plane*_roi2*"))
if not roi1_dirs or not roi2_dirs:
logger.critical("No roi1 or roi2 in input dir")
return None
if len(roi1_dirs) != len(roi2_dirs):
raise ValueError(f"Mismatched ROI dirs: {len(roi1_dirs)} roi1 dirs vs {len(roi2_dirs)} roi2 dirs")
for roi1, roi2 in zip(roi1_dirs, roi2_dirs):
zplane = roi1.stem.split("_")[0] # "plane01"
out_path = output_dir / f"{zplane}.zarr"
if out_path.exists():
if overwrite:
logger.info(f"Overwriting {out_path}")
import shutil
shutil.rmtree(out_path)
else:
logger.info(f"Skipping {zplane}, {out_path} exists")
continue
# lazily open each ROI's Zarr store
z1 = da.from_zarr(roi1)
z2 = da.from_zarr(roi2)
if z1.shape[0] != z2.shape[0]:
raise ValueError(f"Frame count mismatch for {zplane}: {z1.shape[0]} != {z2.shape[0]}")
if z1.shape[1] != z2.shape[1]:
raise ValueError(f"Height mismatch for {zplane}: {z1.shape[1]} != {z2.shape[1]}")
# concatenate along width (axis=2)
z_merged = da.concatenate([z1, z2], axis=2)
z_merged.to_zarr(out_path, overwrite=overwrite)
if z_merged is not None:
logger.info(f"Merged shape: {z_merged.shape}")
return None
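# Usage sketch (illustrative; the directory layout is hypothetical):
# input_dir/plane01_roi1.zarr, input_dir/plane01_roi2.zarr, ...
# >>> merge_zarr_rois("input_dir")
# writes <parent>/input_dir_merged/plane01.zarr with the two ROIs joined along width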
def load_zarr_grouped(
input_dir,
):
"""
Discover and lazily concatenate ROI .zarr stores per plane.
Returns
-------
dict[str, dask.array.Array]
Mapping plane tag -> concatenated dask array
"""
"""
Lazily load multiple planeN_roiN.zarr stores into a single 4D array (T, Z, Y, X).
Each plane's ROIs are concatenated horizontally (along X),
and all planes are stacked along the Z dimension.
Returns
-------
dask.array.Array
Lazy 4D array with shape (T, Z, Y, X_total)
"""
input_dir = Path(input_dir)
grouped = defaultdict(list)
# collect plane-roi groups
for d in sorted(input_dir.glob("plane*_roi*.zarr")):
if not d.is_dir():
continue
parts = d.stem.split("_")
if len(parts) == 2 and parts[1].startswith("roi"):
grouped[parts[0]].append(d)
if not grouped:
raise ValueError(f"No plane*_roi*.zarr directories in {input_dir}")
planes = []
for plane, roi_dirs in sorted(grouped.items(), key=lambda kv: kv[0]):
roi_dirs = sorted(roi_dirs, key=lambda p: p.stem.lower())
arrays = [da.from_zarr(p, chunks=None) for p in roi_dirs]
base_shape = arrays[0].shape
for a in arrays[1:]:
if a.shape[:2] != base_shape[:2]:
raise ValueError(
f"Shape mismatch in {plane}: {a.shape} vs {base_shape}"
)
merged_plane = da.concatenate(arrays, axis=2) # concat horizontally
planes.append(merged_plane)
logger.info(f"{plane}: concatenated {len(arrays)} ROIs → {merged_plane.shape}")
arr_4d = da.stack(planes, axis=1) # stack planes along Z
logger.info(f"Final 4D array shape: {arr_4d.shape} (T, Z, Y, X)")
return arr_4d
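# Usage sketch (illustrative; the directory name is hypothetical):
# >>> vol = load_zarr_grouped("assembled_session")   # lazy (T, Z, Y, X_total)
# >>> frames = vol[:100, 0].compute()                # first 100 frames of plane 1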
def _is_arraylike(obj) -> bool:
"""
Check whether the object is array-like:
it must expose __getitem__, shape, and ndim attributes.
"""
for attr in ["__getitem__", "shape", "ndim"]:
if not hasattr(obj, attr):
return False
return True
def _get_mbo_project_root() -> Path:
"""Return the root path of the mbo_utilities repository (based on this file)."""
return Path(__file__).resolve().parent.parent
def get_mbo_dirs() -> dict:
"""
Ensure ~/mbo and its subdirectories exist.
Returns a dict mapping names to the root directory and each subdirectory path.
"""
base = Path.home().joinpath("mbo")
imgui = base.joinpath("imgui")
cache = base.joinpath("cache")
logs = base.joinpath("logs")
tests = base.joinpath("tests")
data = base.joinpath("data")
assets = imgui.joinpath("assets")
settings = assets.joinpath("app_settings")
for d in (base, imgui, cache, logs, assets, settings, data, tests):
d.mkdir(exist_ok=True)
return {
"base": base,
"imgui": imgui,
"cache": cache,
"logs": logs,
"assets": assets,
"settings": settings,
"data": data,
"tests": tests,
}
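# Usage sketch (illustrative; the exact path is platform-dependent):
# >>> dirs = get_mbo_dirs()
# >>> dirs["cache"]
# PosixPath('/home/<user>/mbo/cache')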
def get_last_savedir_path() -> Path:
"""Return path to settings file tracking last saved folder."""
return Path.home().joinpath("mbo", "settings", "last_savedir.json")
def load_last_savedir(default=None) -> Path:
"""Load last saved directory path if it exists."""
f = get_last_savedir_path()
if f.is_file():
try:
path = Path(json.loads(f.read_text()).get("last_savedir", ""))
if path.exists():
return path
except Exception as e:
logger.debug(f"Failed to load last savedir from {f}: {e}")
return Path(default or Path.cwd())
def save_last_savedir(path: Path):
"""Persist the most recent save directory path."""
f = get_last_savedir_path()
f.parent.mkdir(parents=True, exist_ok=True)
f.write_text(json.dumps({"last_savedir": str(path)}))
def _convert_range_to_slice(k):
return slice(k.start, k.stop, k.step) if isinstance(k, range) else k
def print_tree(path, max_depth=1, prefix=""):
path = Path(path)
if not path.is_dir():
print(path)
return
entries = sorted([p for p in path.iterdir() if p.is_dir()])
for i, entry in enumerate(entries):
connector = "└── " if i == len(entries) - 1 else "├── "
print(prefix + connector + entry.name + "/")
if max_depth > 1:
extension = " " if i == len(entries) - 1 else "│ "
print_tree(entry, max_depth=max_depth - 1, prefix=prefix + extension)
def merge_zarr_zplanes(
zarr_paths: list[str | Path],
output_path: str | Path,
*,
suite2p_dirs: list[str | Path] | None = None,
metadata: dict | None = None,
overwrite: bool = True,
compression_level: int = 1,
) -> Path:
"""
Merge multiple single z-plane Zarr files into a single OME-Zarr volume.
Creates an OME-NGFF v0.5 compliant Zarr store with shape (T, Z, Y, X) by
stacking individual z-plane Zarr files. Optionally includes Suite2p segmentation
masks as OME-Zarr labels.
Parameters
----------
zarr_paths : list of str or Path
List of paths to single-plane Zarr stores. Should be ordered by z-plane.
Each Zarr should have shape (T, Y, X).
output_path : str or Path
Path for the output merged Zarr store.
suite2p_dirs : list of str or Path, optional
List of Suite2p output directories corresponding to each z-plane.
If provided, ROI masks will be added as OME-Zarr labels.
Must match length of zarr_paths.
metadata : dict, optional
Comprehensive metadata dictionary. Coordinate-related keys are used for
OME-NGFF transformations, while additional keys are preserved as custom
metadata. Supported keys:
**Coordinate transformations:**
- pixel_resolution : tuple (x, y) in micrometers
- frame_rate : float, Hz (or 'fs')
- dz : float, z-step in micrometers (or 'z_step')
- name : str, volume name
**ScanImage metadata:**
- si : dict, complete ScanImage metadata structure
- roi_groups : list, ROI definitions with scanfield info
- objective_resolution : float, ScanImage objectiveResolution setting
- zoom_factor : float
**Acquisition metadata:**
- acquisition_date : str, ISO format
- experimenter : str
- description : str
- specimen : str
**Microscope metadata:**
- objective : str, objective name
- emission_wavelength : float, nm
- excitation_wavelength : float, nm
- numerical_aperture : float
**Processing metadata:**
- fix_phase : bool
- phasecorr_method : str
- use_fft : bool
- register_z : bool
**OMERO rendering:**
- channel_names : list of str
- num_planes : int, number of channels/planes
All metadata is organized into structured groups (scanimage, acquisition,
microscope, processing) in the output OME-Zarr attributes.
overwrite : bool, default=True
If True, overwrite existing output Zarr store.
compression_level : int, default=1
Gzip compression level (0-9). Higher = better compression, slower.
Returns
-------
Path
Path to the created OME-Zarr store.
Raises
------
ValueError
If zarr_paths is empty or shapes are incompatible.
FileNotFoundError
If any input Zarr or Suite2p directory doesn't exist.
Examples
--------
Merge z-plane Zarr files into a volume:
>>> zarr_files = [
... "session1/plane01.zarr",
... "session1/plane02.zarr",
... "session1/plane03.zarr",
... ]
>>> merge_zarr_zplanes(zarr_files, "session1/volume.zarr")
Include Suite2p segmentation masks:
>>> s2p_dirs = [
... "session1/plane01_suite2p",
... "session1/plane02_suite2p",
... "session1/plane03_suite2p",
... ]
>>> merge_zarr_zplanes(
... zarr_files,
... "session1/volume.zarr",
... suite2p_dirs=s2p_dirs,
... metadata={"pixel_resolution": (0.5, 0.5), "frame_rate": 30.0, "dz": 5.0}
... )
See Also
--------
imwrite : Write imaging data to various formats including OME-Zarr
"""
if not HAS_ZARR:
raise ImportError("zarr package required. Install with: pip install zarr")
import zarr
from zarr.codecs import BytesCodec, GzipCodec
zarr_paths = [Path(p) for p in zarr_paths]
output_path = Path(output_path)
if not zarr_paths:
raise ValueError("zarr_paths cannot be empty")
# Validate all input Zarrs exist
for zp in zarr_paths:
if not zp.exists():
raise FileNotFoundError(f"Zarr store not found: {zp}")
# Validate suite2p_dirs if provided
if suite2p_dirs is not None:
suite2p_dirs = [Path(p) for p in suite2p_dirs]
if len(suite2p_dirs) != len(zarr_paths):
raise ValueError(
f"suite2p_dirs length ({len(suite2p_dirs)}) must match "
f"zarr_paths length ({len(zarr_paths)})"
)
for s2p_dir in suite2p_dirs:
if not s2p_dir.exists():
raise FileNotFoundError(f"Suite2p directory not found: {s2p_dir}")
# Read first Zarr to get dimensions
logger.info(f"Reading first Zarr to determine dimensions: {zarr_paths[0]}")
z0 = zarr.open(str(zarr_paths[0]), mode="r")
logger.debug(f"Zarr type: {type(z0)}")
if hasattr(z0, "shape"):
# Direct array
T, Y, X = z0.shape
dtype = z0.dtype
logger.debug(f"Detected direct array with shape {(T, Y, X)}, dtype {dtype}")
else:
# Group - look for "0" array (OME-Zarr)
logger.debug(f"Detected group with keys: {list(z0.keys())}")
if "0" in z0:
arr = z0["0"]
T, Y, X = arr.shape
dtype = arr.dtype
logger.debug(f"Using '0' subarray with shape {(T, Y, X)}, dtype {dtype}")
else:
raise ValueError(
f"Cannot determine shape of {zarr_paths[0]}. "
f"Expected direct array or group with '0' subarray. "
f"Got group with keys: {list(z0.keys())}"
)
Z = len(zarr_paths)
logger.info(f"Creating merged Zarr volume with shape (T={T}, Z={Z}, Y={Y}, X={X})")
if output_path.exists() and overwrite:
import shutil
shutil.rmtree(output_path)
root = zarr.open_group(str(output_path), mode="w", zarr_format=3)
image_codecs = [BytesCodec(), GzipCodec(level=compression_level)]
image = zarr.create(
store=root.store,
path="0",
shape=(T, Z, Y, X),
chunks=(1, 1, Y, X), # Chunk by frame and z-plane
dtype=dtype,
codecs=image_codecs,
overwrite=True,
)
logger.info("Copying z-plane data...")
for zi, zpath in enumerate(zarr_paths):
logger.debug(f"Reading z-plane {zi + 1}/{Z} from {zpath}")
z_arr = zarr.open(str(zpath), mode="r")
# Handle both direct arrays and OME-Zarr groups
if hasattr(z_arr, "shape"):
plane_data = z_arr[:]
logger.debug(f" Read direct array with shape {plane_data.shape}")
elif "0" in z_arr:
plane_data = z_arr["0"][:]
logger.debug(f" Read '0' subarray with shape {plane_data.shape}")
else:
raise ValueError(
f"Cannot read data from {zpath}. "
f"Got group with keys: {list(z_arr.keys()) if hasattr(z_arr, 'keys') else 'N/A'}"
)
if plane_data.shape != (T, Y, X):
raise ValueError(
f"Shape mismatch at z={zi} (file: {zpath.name}): "
f"expected {(T, Y, X)}, got {plane_data.shape}"
)
logger.debug(f" Writing to output volume at z={zi}")
image[:, zi, :, :] = plane_data
logger.info(f"Copied z-plane {zi + 1}/{Z} from {zpath.name}")
# Add Suite2p labels if provided
if suite2p_dirs is not None:
logger.info("Adding Suite2p segmentation masks as labels...")
_add_suite2p_labels(root, suite2p_dirs, T, Z, Y, X, dtype, compression_level)
metadata = metadata or {}
ome_attrs = _build_rich_ome_metadata(
shape=(T, Z, Y, X),
dtype=dtype,
metadata=metadata,
)
for key, value in ome_attrs.items():
root.attrs[key] = value
# Add napari-specific scale metadata to the array for proper volumetric viewing
pixel_resolution = metadata.get("pixel_resolution", [1.0, 1.0])
frame_rate = metadata.get("frame_rate", metadata.get("fs", 1.0))
dz = metadata.get("dz", metadata.get("z_step", 1.0))
if isinstance(pixel_resolution, (list, tuple)) and len(pixel_resolution) >= 2:
pixel_x, pixel_y = float(pixel_resolution[0]), float(pixel_resolution[1])
else:
pixel_x = pixel_y = 1.0
time_scale = 1.0 / float(frame_rate) if frame_rate else 1.0
# napari reads scale from array attributes for volumetric viewing
# Scale order: (T, Z, Y, X) in physical units
image.attrs["scale"] = [time_scale, float(dz), pixel_y, pixel_x]
logger.info(f"Successfully created merged OME-Zarr at {output_path}")
logger.info(f"Napari scale (t,z,y,x): {image.attrs['scale']}")
return output_path
def _build_rich_ome_metadata(
shape: tuple,
dtype,
metadata: dict,
) -> dict:
"""
Build comprehensive OME-NGFF v0.5 metadata from ScanImage and other metadata.
Creates OMERO rendering settings, custom metadata fields, and proper
coordinate transformations based on available metadata.
Parameters
----------
shape : tuple
Shape of the array (T, Z, Y, X)
dtype : np.dtype
Data type of the array
metadata : dict
Metadata dictionary with optional keys
Returns
-------
dict
Complete OME-NGFF v0.5 metadata attributes
"""
T, Z, Y, X = shape
pixel_resolution = metadata.get("pixel_resolution", [1.0, 1.0])
frame_rate = metadata.get("frame_rate", metadata.get("fs", 1.0))
dz = metadata.get("dz", metadata.get("z_step", 1.0))
if isinstance(pixel_resolution, (list, tuple)) and len(pixel_resolution) >= 2:
pixel_x, pixel_y = float(pixel_resolution[0]), float(pixel_resolution[1])
else:
pixel_x = pixel_y = 1.0
time_scale = 1.0 / float(frame_rate) if frame_rate else 1.0
# Build OME-NGFF v0.5 multiscales
axes = [
{"name": "t", "type": "time", "unit": "second"},
{"name": "z", "type": "space", "unit": "micrometer"},
{"name": "y", "type": "space", "unit": "micrometer"},
{"name": "x", "type": "space", "unit": "micrometer"},
]
scale_values = [time_scale, float(dz), pixel_y, pixel_x]
datasets = [
{
"path": "0",
"coordinateTransformations": [{"type": "scale", "scale": scale_values}],
}
]
multiscales = [
{
"version": "0.5",
"name": metadata.get("name", "volume"),
"axes": axes,
"datasets": datasets,
}
]
# Build OME content
ome_content = {
"version": "0.5",
"multiscales": multiscales,
}
# Add OMERO rendering metadata
omero_metadata = _build_omero_metadata(
shape=shape,
dtype=dtype,
metadata=metadata,
)
if omero_metadata:
ome_content["omero"] = omero_metadata
result = {"ome": ome_content}
# Add custom metadata fields (ScanImage, acquisition info, etc.)
custom_meta = {}
# Add ScanImage metadata
if "si" in metadata:
si = metadata["si"]
custom_meta["scanimage"] = {
"version": f"{si.get('VERSION_MAJOR', 'unknown')}.{si.get('VERSION_MINOR', 0)}",
"imaging_system": si.get("imagingSystem", "unknown"),
"objective_resolution": si.get(
"objectiveResolution", metadata.get("objective_resolution")
),
"scan_mode": si.get("hScan2D", {}).get("scanMode", "unknown"),
}
# Add beam/laser info
if "hBeams" in si:
custom_meta["scanimage"]["laser_power"] = si["hBeams"].get("powers", 0)
custom_meta["scanimage"]["power_fraction"] = si["hBeams"].get(
"powerFractions", 0
)
# Add ROI info
if "hRoiManager" in si:
roi_mgr = si["hRoiManager"]
custom_meta["scanimage"]["roi"] = {
"scan_zoom": roi_mgr.get("scanZoomFactor", metadata.get("zoom_factor")),
"lines_per_frame": roi_mgr.get("linesPerFrame"),
"pixels_per_line": roi_mgr.get("pixelsPerLine"),
"line_period": roi_mgr.get("linePeriod"),
"bidirectional": si.get("hScan2D", {}).get("bidirectional", True),
}
# Add ROI groups information
if "roi_groups" in metadata:
custom_meta["roi_groups"] = metadata["roi_groups"]
# Add acquisition metadata
acq_meta = {}
for key in ["acquisition_date", "experimenter", "description", "specimen"]:
if key in metadata:
acq_meta[key] = metadata[key]
if acq_meta:
custom_meta["acquisition"] = acq_meta
# Add microscope metadata
microscope_meta = {}
for key in [
"objective",
"emission_wavelength",
"excitation_wavelength",
"numerical_aperture",
]:
if key in metadata:
microscope_meta[key] = metadata[key]
if microscope_meta:
custom_meta["microscope"] = microscope_meta
# Add processing metadata
processing_meta = {}
for key in ["fix_phase", "phasecorr_method", "use_fft", "register_z"]:
if key in metadata:
processing_meta[key] = metadata[key]
if processing_meta:
custom_meta["processing"] = processing_meta
# Add file info
if "file_paths" in metadata or "num_files" in metadata:
custom_meta["source_files"] = {
"num_files": metadata.get("num_files"),
"num_frames": metadata.get("num_frames"),
"frames_per_file": metadata.get("frames_per_file"),
}
# Add all serializable custom metadata
for key, value in custom_meta.items():
try:
json.dumps(value)
result[key] = value
except (TypeError, ValueError):
logger.debug(f"Skipping non-serializable metadata key: {key}")
# Add any other simple metadata fields
for key, value in metadata.items():
if (
key
not in [
"pixel_resolution",
"frame_rate",
"fs",
"dz",
"z_step",
"name",
"si",
"roi_groups",
"acquisition_date",
"experimenter",
"description",
"specimen",
"objective",
"emission_wavelength",
"excitation_wavelength",
"numerical_aperture",
"fix_phase",
"phasecorr_method",
"use_fft",
"register_z",
"file_paths",
"num_files",
"num_frames",
"frames_per_file",
]
and key not in result
):
try:
json.dumps(value)
result[key] = value
except (TypeError, ValueError):
pass
return result
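# Shape of the returned metadata (descriptive, based on the code above):
# {"ome": {"version": "0.5", "multiscales": [...], "omero": {...}},
#  "scanimage": {...}, "acquisition": {...}, ...}   # custom groups only when present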
def _build_omero_metadata(shape: tuple, dtype, metadata: dict) -> dict:
"""
Build OMERO rendering metadata for OME-NGFF.
Parameters
----------
shape : tuple
Shape of the array (T, Z, Y, X)
dtype : np.dtype
Data type of the array
metadata : dict
Metadata dictionary
Returns
-------
dict
OMERO metadata or empty dict if not enough info
"""
T, Z, Y, X = shape
# Determine data range for window settings
if np.issubdtype(dtype, np.integer):
info = np.iinfo(dtype)
data_min, data_max = info.min, info.max
else:
data_min, data_max = 0.0, 1.0
# Build channel metadata
channels = []
# Get channel names from metadata
channel_names = metadata.get("channel_names")
num_channels = metadata.get("num_planes", 1)
if channel_names is None:
# Generate default channel names
if num_channels == 1:
channel_names = ["Channel 1"]
else:
channel_names = [f"Z-plane {i + 1}" for i in range(num_channels)]
# Default colors (cycle through common microscopy colors)
default_colors = [
"00FF00", # Green
"FF0000", # Red
"0000FF", # Blue
"FFFF00", # Yellow
"FF00FF", # Magenta
"00FFFF", # Cyan
"FFFFFF", # White
]
for i, name in enumerate(channel_names[:num_channels]):
channel = {
"active": True,
"coefficient": 1.0,
"color": default_colors[i % len(default_colors)],
"family": "linear",
"inverted": False,
"label": name,
"window": {
"end": float(data_max),
"max": float(data_max),
"min": float(data_min),
"start": float(data_min),
},
}
channels.append(channel)
if not channels:
return {}
omero = {
"channels": channels,
"rdefs": {
"defaultT": 0,
"defaultZ": Z // 2, # Middle z-plane
"model": "greyscale",
},
"version": "0.5",
}
return omero
def _add_suite2p_labels(
root_group,
suite2p_dirs: list[Path],
T: int,
Z: int,
Y: int,
X: int,
dtype,
compression_level: int,
):
"""
Add Suite2p segmentation masks as OME-Zarr labels.
Creates a 'labels' subgroup with ROI masks from Suite2p stat.npy files.
Follows OME-NGFF v0.5 labels specification.
Parameters
----------
root_group : zarr.Group
Root Zarr group to add labels to.
suite2p_dirs : list of Path
Suite2p output directories for each z-plane.
T, Z, Y, X : int
Dimensions of the volume.
dtype : np.dtype
Data type for label array.
compression_level : int
Gzip compression level.
"""
import zarr
from zarr.codecs import BytesCodec, GzipCodec
logger.info("Creating labels array from Suite2p masks...")
# Create labels subgroup
labels_group = root_group.create_group("labels", overwrite=True)
# Create ROI masks array (static across time, just Z, Y, X)
label_codecs = [BytesCodec(), GzipCodec(level=compression_level)]
masks = zarr.create(
store=labels_group.store,
path="labels/0",
shape=(Z, Y, X),
chunks=(1, Y, X),
dtype=np.uint32, # uint32 for up to 4 billion ROIs
codecs=label_codecs,
overwrite=True,
)
# Process each z-plane
roi_id = 1 # Start ROI IDs at 1 (0 = background)
for zi, s2p_dir in enumerate(suite2p_dirs):
stat_path = s2p_dir / "stat.npy"
iscell_path = s2p_dir / "iscell.npy"
if not stat_path.exists():
logger.warning(f"stat.npy not found in {s2p_dir}, skipping z={zi}")
continue
# Load Suite2p data
stat = np.load(stat_path, allow_pickle=True)
# Load iscell if available to filter
if iscell_path.exists():
iscell = np.load(iscell_path, allow_pickle=True)[:, 0].astype(bool)
else:
iscell = np.ones(len(stat), dtype=bool)
# Create mask for this z-plane
plane_mask = np.zeros((Y, X), dtype=np.uint32)
for roi_stat, is_cell in zip(stat, iscell):
if not is_cell:
continue
# Get pixel coordinates for this ROI
ypix = roi_stat.get("ypix", [])
xpix = roi_stat.get("xpix", [])
if len(ypix) == 0 or len(xpix) == 0:
continue
# Ensure coordinates are within bounds
ypix = np.clip(ypix, 0, Y - 1)
xpix = np.clip(xpix, 0, X - 1)
# Assign unique ROI ID
plane_mask[ypix, xpix] = roi_id
roi_id += 1
# Write to Zarr
masks[zi, :, :] = plane_mask
logger.debug(
f"Added {(plane_mask > 0).sum()} labeled pixels for z-plane {zi + 1}/{Z}"
)
# Add OME-NGFF labels metadata
labels_metadata = {
"version": "0.5",
"labels": ["0"], # Path to the label array
}
# Add metadata for label array
label_array_meta = {
"version": "0.5",
"image-label": {
"version": "0.5",
"colors": [], # Can add color LUT here if desired
"source": {"image": "../../0"}, # Reference to main image
},
}
labels_group.attrs.update(labels_metadata)
labels_group["0"].attrs.update(label_array_meta)
logger.info(f"Added {roi_id - 1} total ROIs across {Z} z-planes")