"""
imwrite - Write lazy imaging arrays to disk.
This module provides the imwrite() function for writing imaging data to
various file formats with support for ROI selection, z-plane registration,
chunked streaming, and format conversion.
"""
from __future__ import annotations

import contextlib
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np

from mbo_utilities import log
from mbo_utilities._writers import _try_generic_writers, add_processing_step
from mbo_utilities.arrays._registration import (
    compute_axial_shifts,
    validate_axial_shifts,
)
from mbo_utilities.metadata import RoiMode, get_param

if TYPE_CHECKING:
    from collections.abc import Callable, Sequence
# Module-level logger, obtained from the package's ``log`` helper under the name "writer".
logger = log.get("writer")
# Public entry point of this module: imwrite()
def imwrite(
    lazy_array,
    outpath: str | Path,
    ext: str = ".tiff",
    planes: list | tuple | int | None = None,
    frames: list | tuple | int | None = None,
    channels: list | tuple | int | None = None,
    num_frames: int | None = None,
    register_z: bool = False,
    roi_mode: RoiMode | str = RoiMode.concat_y,
    roi: int | Sequence[int] | None = None,
    metadata: dict | None = None,
    overwrite: bool = False,
    order: list | tuple | None = None,
    target_chunk_mb: int = 100,
    progress_callback: Callable | None = None,
    debug: bool = False,
    show_progress: bool = True,
    output_name: str | None = None,
    output_suffix: str | None = None,
    dataset_name: str | None = None,
    dim_order: str | Sequence[str] | None = None,
    **kwargs,
):
    """
    Write a supported lazy imaging array to disk.

    This function handles writing multi-dimensional imaging data to various formats,
    with support for ROI selection, z-plane registration, chunked streaming, and
    format conversion. Use with `imread()` to load and convert imaging data.

    Parameters
    ----------
    lazy_array : object
        A lazy array from `imread()` or a numpy array. Any object with `.shape`,
        `.dtype`, and `_imwrite()` method is supported. Use `mbo formats` CLI
        command to list all supported input formats. Raw numpy arrays are
        wrapped in ``NumpyArray`` before writing; objects without `_imwrite()`
        fall back to the generic writers.
    outpath : str or Path
        Target directory to write output files. The directory itself is created
        if it doesn't exist, but its parent directory must already exist.
        Files are named automatically based on plane/ROI (e.g., `plane01_roi1.tiff`).
    ext : str, default=".tiff"
        Output format extension. Supported formats:

        - `.tiff`, `.tif` : Multi-page TIFF (BigTIFF for >4GB)
        - `.bin` : Suite2p-compatible binary format with ops.npy metadata
        - `.zarr` : Zarr v3 array store
        - `.h5`, `.hdf5` : HDF5 format
    planes : list | tuple | int | None, optional
        Z-planes to export (1-based indexing). Options:

        - None (default) : Export all planes
        - int : Single plane, e.g. `planes=7` exports only plane 7
        - list/tuple : Specific planes, e.g. `planes=[1, 7, 14]`
    frames : list | tuple | int | None, optional
        Timepoints to export (1-based indexing). Options:

        - None (default) : Export all frames
        - int : Single frame, e.g. `frames=100` exports only frame 100
        - list/tuple : Specific frames, e.g. `frames=[1, 50, 100]`
        - range : Range of frames, e.g. `frames=list(range(1, 101))`
    channels : list | tuple | int | None, optional
        Color channels to export (1-based indexing). Only applies to arrays
        with a C dimension (e.g., multi-color imaging). Options:

        - None (default) : Export all channels
        - int : Single channel, e.g. `channels=1`
        - list/tuple : Specific channels, e.g. `channels=[1, 2]`
    num_frames : int, optional
        Number of frames to export. If None (default), exports all frames.
        When given, it is also recorded in the output metadata under
        ``"num_frames"`` and ``"nframes"``.
    register_z : bool, default=False
        Compute per-plane rigid shifts via phase correlation and store
        them in ``metadata["plane_shifts"]``. The shifts are not applied
        to the output pixels; viewers consume ``plane_shifts`` to align
        planes at render time (e.g. napari layer ``translate`` or
        ``AxiallyAlignedView``). Shifts already present (and valid) in the
        metadata are reused. When a subset of planes is exported, only the
        matching shifts are kept, re-referenced to the first written plane.
        Optional tunables via kwargs: ``max_frames`` (subsample count,
        default 200), ``chunk_frames`` (streaming batch, default 10),
        ``max_reg_xy`` (search radius in pixels, default 30). GPU is
        used automatically when cupy + CUDA are available.
    roi_mode : RoiMode | str, default=RoiMode.concat_y
        Mode for handling multi-ROI data. Options:

        - RoiMode.concat_y : Horizontally concatenate ROIs into single FOV (default)
        - RoiMode.separate : Write each ROI to separate files

        String values are accepted (case-insensitive): "concat_y", "separate".
    roi : int | Sequence[int] | None, optional
        Specific ROI(s) to export when roi_mode=RoiMode.separate. Options:

        - None (default) : Export all ROIs
        - int > 0 : Export specific ROI, e.g. `roi=1` exports only ROI 1
        - list/tuple : Export specific ROIs, e.g. `roi=[1, 3]`

        Note: When roi_mode=RoiMode.concat_y, this parameter is ignored.
    metadata : dict, optional
        Additional metadata to merge into output file headers/attributes.
        Values here override same-named keys from the array's own metadata.
    overwrite : bool, default=False
        Whether to overwrite existing output files.
    order : list | tuple, optional
        Reorder planes before writing. Must have same length as `planes`;
        each entry is a 0-based position into `planes`. If `planes` is None,
        all planes of the array are enumerated first (this requires the array
        to expose its plane count).
    target_chunk_mb : int, optional
        Target chunk size in MB for streaming writes. Default is 100 MB.
    progress_callback : Callable, optional
        Callback function for progress updates: `callback(progress, current_plane)`.
    debug : bool, default=False
        Enable verbose logging for troubleshooting (sets this module's logger
        to INFO; otherwise it is set to WARNING).
    show_progress : bool, default=True
        Show tqdm progress bar during writing. Set to False in notebooks
        when you don't want progress output cluttering the display.
    output_name : str, optional
        Filename for binary output when ext=".bin".
    output_suffix : str, optional
        Custom suffix to append to output filenames. If None (default), files are
        named with "_stitched" for multi-ROI data when roi is None, or "_roiN"
        for specific ROIs. Examples: "_stitched", "_processed", "_session1".
        The suffix is automatically sanitized (illegal characters removed, double
        extensions prevented, underscore prefix added if missing).
    dataset_name : str, optional
        Name of the HDF5 dataset to write under (only applies when writing
        ``.h5`` / ``.hdf5``). Default is ``"mov"`` — matches suite2p / caiman
        convention and the auto-detect order in :class:`H5Array`. Pass
        ``"data"`` for the legacy mbo name, or any other key for custom
        consumers. Ignored for non-h5 formats.
    dim_order : str, optional
        Axis labels for an in-memory numpy array of ndim 3/4/5, drawn from
        ``"TCZYX"``. The array is permuted and singleton-padded to canonical
        5D TCZYX before writing. Examples: ``"TYX"``, ``"TZYX"``, ``"TCYX"``,
        ``"TCZYX"``. Ignored when ``lazy_array`` is already a lazy array.
    **kwargs
        Additional format-specific options passed to writer backends.
        Zarr-specific options (ext=".zarr"):
        ``sharded`` (bool, default True) uses Zarr v3 sharding codec,
        ``compressor`` (str, default "none") one of "none", "gzip",
        "zstd", "blosc-lz4", "blosc-zstd". Default is no compression so
        interactive scrubbing doesn't pay decompression cost per frame;
        pass an explicit value when storage size matters more than
        random-access speed.
        ``compression_level`` (int) compressor-specific level (gzip 0–9,
        zstd 1–22, blosc 1–9). Ignored when ``compressor="none"``.
        ``ome`` (bool, default True) writes OME-NGFF v0.5 metadata,
        ``pyramid`` (bool, default False) generates multi-resolution pyramid,
        ``pyramid_max_layers`` (int, default 4) sets max resolution levels,
        ``pyramid_method`` (str, default "mean") sets downsampling method.

    Returns
    -------
    Path
        The value returned by the array's ``_imwrite()`` (typically the path
        to the output directory containing written files), or ``outpath``
        when the generic writers are used.

    Raises
    ------
    TypeError
        If `outpath` is neither a string nor a Path.
    ValueError
        If the parent of `outpath` is not an existing directory, if `order`
        does not match `planes` (wrong length or out-of-range index), if
        `order` is given without `planes` and the plane count cannot be
        determined, or if `metadata` is not a dict.

    Examples
    --------
    >>> from mbo_utilities import imread, imwrite
    >>> data = imread("path/to/raw/*.tiff")
    >>> imwrite(data, "output/session1")  # Stitch all ROIs (default roi_mode)
    >>> # Save specific planes
    >>> imwrite(data, "output/session1", planes=[1, 7, 14])
    >>> # Same planes, written in a different order (0-based positions)
    >>> imwrite(data, "output/session1", planes=[1, 7, 14], order=[2, 0, 1])
    >>> # Split ROIs into separate files
    >>> imwrite(data, "output/session1", roi_mode="separate")
    >>> # Only ROI 1
    >>> imwrite(data, "output/session1", roi_mode="separate", roi=1)
    >>> # Z-plane registration
    >>> imwrite(data, "output/registered", register_z=True)
    >>> # Convert to Suite2p binary
    >>> imwrite(data, "output/suite2p", ext=".bin")
    >>> # Save to Zarr
    >>> imwrite(data, "output/zarr_store", ext=".zarr")
    """
    if debug:
        logger.setLevel(logging.INFO)
        logger.info("Debug mode enabled; setting log level to INFO.")
        logger.propagate = True
    else:
        logger.setLevel(logging.WARNING)
        logger.propagate = False

    # normalize roi_mode to enum
    if isinstance(roi_mode, str):
        roi_mode = RoiMode.from_string(roi_mode)

    # save path: only the leaf directory is created, the parent must exist
    if not isinstance(outpath, (str, Path)):
        raise TypeError(
            f"`outpath` must be a string or Path, got {type(outpath)} instead."
        )
    outpath = Path(outpath)
    if not outpath.parent.is_dir():
        raise ValueError(
            f"{outpath} is not inside a valid directory."
            f" Please create the directory first."
        )
    outpath.mkdir(exist_ok=True)

    # auto-wrap raw numpy arrays so the full ext-aware writer pipeline runs
    if isinstance(lazy_array, np.ndarray):
        from mbo_utilities.arrays.numpy import NumpyArray

        lazy_array = NumpyArray(lazy_array, dim_order=dim_order)
    elif dim_order is not None:
        logger.debug("dim_order ignored: lazy_array is not a raw numpy array")

    # handle roi based on roi_mode
    # ROI support detected via duck typing: hasattr(arr, 'roi_mode')
    if roi_mode == RoiMode.separate:
        # separate mode: set roi on array if specified
        if roi is not None and hasattr(lazy_array, "roi_mode"):
            lazy_array.roi = roi
    elif roi_mode == RoiMode.concat_y:
        # concat mode: roi parameter is ignored, use None (stitch all)
        if roi is not None:
            logger.debug(
                f"roi={roi} ignored when roi_mode=concat_y. "
                f"All ROIs will be concatenated."
            )
        roi = None

    if order is not None:
        planes = _reorder_planes(planes, order, lazy_array)

    # array metadata first, then user overrides
    existing_meta = getattr(lazy_array, "metadata", None)
    file_metadata = dict(existing_meta or {})
    if metadata:
        if not isinstance(metadata, dict):
            raise ValueError(f"metadata must be a dict, got {type(metadata)}")
        file_metadata.update(metadata)

    # store roi_mode in metadata as string
    file_metadata["roi_mode"] = roi_mode.value
    if num_frames is not None:
        file_metadata["num_frames"] = int(num_frames)
        file_metadata["nframes"] = int(num_frames)
    _push_metadata(lazy_array, file_metadata)

    # axial registration knobs (all optional). defaults match compute_axial_shifts.
    # popped unconditionally so they never leak into the writer backend kwargs.
    axial_max_frames = int(kwargs.pop("max_frames", 200))
    axial_chunk_frames = int(kwargs.pop("chunk_frames", 10))
    axial_max_reg_xy = int(kwargs.pop("max_reg_xy", 30))

    if register_z:
        total_planes = (
            int(lazy_array._shape5d()[2])
            if hasattr(lazy_array, "_shape5d")
            else get_param(file_metadata, "nplanes")
        )
        if validate_axial_shifts(file_metadata, total_planes):
            logger.info("using plane_shifts already present in metadata.")
            if progress_callback:
                progress_callback(1.0, "Using cached plane shifts")
        else:
            logger.info("computing axial plane shifts...")
            compute_axial_shifts(
                lazy_array,
                metadata=file_metadata,
                max_frames=axial_max_frames,
                chunk_frames=axial_chunk_frames,
                max_reg_xy=axial_max_reg_xy,
                progress_callback=progress_callback,
            )

        # shifts are cumulative over the full stack; when exporting a plane
        # subset keep only the matching rows (re-referenced to the first
        # written plane) so the output metadata carries one shift per written
        # plane — with_axial_shifts requires len(plane_shifts) == output Z.
        shifts = file_metadata.get("plane_shifts")
        # explicit None/length test instead of truthiness: the container type
        # of plane_shifts is not fixed here, and an ndarray has no truth value
        have_shifts = shifts is not None and len(shifts) > 0
        exporting_subset = planes is not None and have_shifts
        if exporting_subset:
            sel = list(planes) if isinstance(planes, (list, tuple)) else [planes]
            sel0 = [p - 1 for p in sel]  # 1-based plane numbers -> 0-based rows
            if (
                sel0  # empty selection: nothing to re-reference
                and len(shifts) != len(sel0)
                and all(0 <= i < len(shifts) for i in sel0)
            ):
                sub = [list(shifts[i]) for i in sel0]
                y0, x0 = sub[0]
                file_metadata["plane_shifts"] = [[y - y0, x - x0] for y, x in sub]
        out_planes = len(sel0) if exporting_subset else total_planes

        if not validate_axial_shifts(file_metadata, out_planes):
            logger.error(
                "axial registration did not produce valid plane_shifts."
            )
        _push_metadata(lazy_array, file_metadata)

    # Collect input files for processing history
    input_files = getattr(lazy_array, "filenames", None)
    if input_files:
        # handle single path or list of paths
        if isinstance(input_files, (str, Path)):
            input_files = [str(input_files)]
        else:
            input_files = [str(f) for f in input_files]

    # Extract scan-phase correction parameters if available (ScanImageArray)
    scan_phase_params = {
        name: getattr(lazy_array, name)
        for name in ("fix_phase", "use_fft", "phasecorr_method")
        if hasattr(lazy_array, name)
    }

    # Start timing for processing history
    write_start_time = time.perf_counter()
    if hasattr(lazy_array, "_imwrite"):
        write_kwargs = kwargs.copy()
        if num_frames is not None:
            write_kwargs["num_frames"] = num_frames
        if dataset_name is not None:
            write_kwargs["dataset_name"] = dataset_name
        result = lazy_array._imwrite(
            outpath,
            overwrite=overwrite,
            target_chunk_mb=target_chunk_mb,
            ext=ext,
            progress_callback=progress_callback,
            planes=planes,
            frames=frames,
            channels=channels,
            debug=debug,
            show_progress=show_progress,
            output_name=output_name,
            output_suffix=output_suffix,
            roi_mode=roi_mode,
            **write_kwargs,
        )
    else:
        logger.info(f"Falling back to generic writers for {type(lazy_array)}.")
        _try_generic_writers(
            lazy_array,
            outpath,
            overwrite=overwrite,
            dataset_name=dataset_name,
        )
        result = outpath

    # Record processing step in metadata
    write_duration = time.perf_counter() - write_start_time

    # Build extra info for processing history
    processing_extra = {
        "input_format": type(lazy_array).__name__,
        "output_format": ext,
        "num_frames": file_metadata.get("num_frames") or file_metadata.get("nframes"),
        "shape": list(lazy_array.shape) if hasattr(lazy_array, "shape") else None,
    }
    # Add scan-phase correction info if present
    if scan_phase_params:
        processing_extra["scan_phase_correction"] = scan_phase_params
    if register_z:
        # plane_shifts may be absent (or None) if registration failed; the
        # data is already on disk at this point, so never raise here
        recorded_shifts = file_metadata.get("plane_shifts")
        processing_extra["z_registration"] = {
            "enabled": True,
            "n_planes": len(recorded_shifts) if recorded_shifts is not None else 0,
            "params": file_metadata.get("plane_shifts_params"),
        }
    # Add ROI / plane / frame / channel selections if specified
    if roi is not None:
        processing_extra["roi"] = roi
    if planes is not None:
        processing_extra["planes"] = _as_listing(planes)
    if frames is not None:
        processing_extra["frames"] = _as_listing(frames)
    if channels is not None:
        processing_extra["channels"] = _as_listing(channels)

    # Collect output files
    output_files = None
    if result and isinstance(result, Path):
        if result.is_dir():
            # List files in output directory
            out_files = list(result.glob(f"*{ext}"))
            if out_files:
                output_files = [str(f) for f in out_files[:20]]  # Limit to first 20
        else:
            output_files = [str(result)]

    add_processing_step(
        file_metadata,
        step_name="imwrite",
        input_files=input_files,
        output_files=output_files,
        duration_seconds=write_duration,
        extra=processing_extra,
    )
    # Update lazy_array metadata with processing history if possible
    _push_metadata(lazy_array, file_metadata)
    logger.debug(f"Processing step recorded: imwrite to {ext} in {write_duration:.2f}s")
    return result


def _push_metadata(lazy_array, file_metadata):
    """Best-effort: assign ``file_metadata`` to ``lazy_array.metadata`` if it has one."""
    if hasattr(lazy_array, "metadata"):
        # read-only metadata properties raise AttributeError on assignment
        with contextlib.suppress(AttributeError):
            lazy_array.metadata = file_metadata


def _as_listing(selection):
    """Return ``list(selection)`` for iterables, or ``selection`` itself for scalars."""
    return list(selection) if hasattr(selection, "__iter__") else selection


def _reorder_planes(planes, order, lazy_array):
    """Return the plane selection rearranged by the 0-based positions in ``order``.

    ``planes`` may be None (all planes), a single int, or a sequence of
    1-based plane numbers. Raises ValueError when ``order`` does not match
    the selection, or when ``planes`` is None and the array does not expose
    its plane count.
    """
    if planes is None:
        # "all planes": enumerate them (1-based) so they can be reordered
        if not hasattr(lazy_array, "_shape5d"):
            raise ValueError(
                "`order` was given without `planes` and the number of planes "
                "could not be determined; pass `planes` explicitly."
            )
        planes = list(range(1, int(lazy_array._shape5d()[2]) + 1))
    elif isinstance(planes, (int, np.integer)):
        planes = [planes]
    else:
        planes = list(planes)

    if len(order) != len(planes):
        raise ValueError(
            f"The length of the `order` ({len(order)}) does not match "
            f"the number of planes ({len(planes)})."
        )
    if any(i < 0 or i >= len(planes) for i in order):
        raise ValueError(
            f"order indices must be in range [0, {len(planes) - 1}], got {order}"
        )
    return [planes[i] for i in order]