Source code for mbo_utilities.writer

"""
imwrite - Write lazy imaging arrays to disk.

This module provides the imwrite() function for writing imaging data to
various file formats with support for ROI selection, z-plane registration,
chunked streaming, and format conversion.
"""

from __future__ import annotations

import logging
import time
from pathlib import Path


from mbo_utilities import log
from mbo_utilities._writers import _try_generic_writers, add_processing_step
from mbo_utilities.arrays._registration import (
    compute_axial_shifts,
    validate_axial_shifts,
)
from mbo_utilities.metadata import RoiMode, get_param
from typing import TYPE_CHECKING
import contextlib

import numpy as np

if TYPE_CHECKING:
    from collections.abc import Callable, Sequence

# Module-level logger shared by everything in this file; obtained from the
# package's `log` helper under the name "writer". imwrite() adjusts this
# logger's level and propagation on every call based on its `debug` flag.
logger = log.get("writer")


def _as_plane_list(planes):
    """Return ``planes`` as a list, wrapping a single integer in a list."""
    if isinstance(planes, (int, np.integer)):
        return [planes]
    return list(planes)


def imwrite(
    lazy_array,
    outpath: str | Path,
    ext: str = ".tiff",
    planes: list | tuple | int | None = None,
    frames: list | tuple | int | None = None,
    channels: list | tuple | int | None = None,
    num_frames: int | None = None,
    register_z: bool = False,
    roi_mode: RoiMode | str = RoiMode.concat_y,
    roi: int | Sequence[int] | None = None,
    metadata: dict | None = None,
    overwrite: bool = False,
    order: list | tuple | None = None,
    target_chunk_mb: int = 100,
    progress_callback: Callable | None = None,
    debug: bool = False,
    show_progress: bool = True,
    output_name: str | None = None,
    output_suffix: str | None = None,
    dataset_name: str | None = None,
    dim_order: str | Sequence[str] | None = None,
    **kwargs,
):
    """
    Write a supported lazy imaging array to disk.

    This function handles writing multi-dimensional imaging data to various
    formats, with support for ROI selection, z-plane registration, chunked
    streaming, and format conversion. Use with `imread()` to load and convert
    imaging data.

    Parameters
    ----------
    lazy_array : object
        A lazy array from `imread()` or a numpy array. Any object with
        `.shape`, `.dtype`, and `_imwrite()` method is supported. Raw numpy
        arrays are wrapped in ``NumpyArray`` before writing. Use `mbo formats`
        CLI command to list all supported input formats.
    outpath : str or Path
        Target directory to write output files. Will be created if it doesn't
        exist, but its parent directory must already exist. Files are named
        automatically based on plane/ROI (e.g., `plane01_roi1.tiff`).
    ext : str, default=".tiff"
        Output format extension. Supported formats:

        - `.tiff`, `.tif` : Multi-page TIFF (BigTIFF for >4GB)
        - `.bin` : Suite2p-compatible binary format with ops.npy metadata
        - `.zarr` : Zarr v3 array store
        - `.h5`, `.hdf5` : HDF5 format
    planes : list | tuple | int | None, optional
        Z-planes to export (1-based indexing). Options:

        - None (default) : Export all planes
        - int : Single plane, e.g. `planes=7` exports only plane 7
        - list/tuple : Specific planes, e.g. `planes=[1, 7, 14]`
    frames : list | tuple | int | None, optional
        Timepoints to export (1-based indexing). Options:

        - None (default) : Export all frames
        - int : Single frame, e.g. `frames=100` exports only frame 100
        - list/tuple : Specific frames, e.g. `frames=[1, 50, 100]`
        - range : Range of frames, e.g. `frames=list(range(1, 101))`
    channels : list | tuple | int | None, optional
        Color channels to export (1-based indexing). Only applies to arrays
        with a C dimension (e.g., multi-color imaging). Options:

        - None (default) : Export all channels
        - int : Single channel, e.g. `channels=1`
        - list/tuple : Specific channels, e.g. `channels=[1, 2]`
    roi_mode : RoiMode | str, default=RoiMode.concat_y
        Mode for handling multi-ROI data. Options:

        - RoiMode.concat_y : Horizontally concatenate ROIs into single FOV
          (default)
        - RoiMode.separate : Write each ROI to separate files

        String values are accepted (case-insensitive): "concat_y", "separate".
    roi : int | Sequence[int] | None, optional
        Specific ROI(s) to export when roi_mode=RoiMode.separate. Options:

        - None (default) : Export all ROIs
        - int > 0 : Export specific ROI, e.g. `roi=1` exports only ROI 1
        - list/tuple : Export specific ROIs, e.g. `roi=[1, 3]`

        Note: When roi_mode=RoiMode.concat_y, this parameter is ignored.
    num_frames : int, optional
        Number of frames to export. If None (default), exports all frames.
    register_z : bool, default=False
        Compute per-plane rigid shifts via phase correlation and store them in
        ``metadata["plane_shifts"]``. The shifts are not applied to the output
        pixels; viewers consume ``plane_shifts`` to align planes at render
        time (e.g. napari layer ``translate`` or ``AxiallyAlignedView``).
        Optional tunables via kwargs: ``max_frames`` (subsample count, default
        200), ``chunk_frames`` (streaming batch, default 10), ``max_reg_xy``
        (search radius in pixels, default 30). GPU is used automatically when
        cupy + CUDA are available.
    metadata : dict, optional
        Additional metadata to merge into output file headers/attributes.
    overwrite : bool, default=False
        Whether to overwrite existing output files.
    order : list | tuple, optional
        Reorder planes before writing. Must have the same length as `planes`;
        each entry is a 0-based position into `planes` (so
        ``planes=[1, 7, 14], order=[2, 0, 1]`` writes planes 14, 1, 7).
        Requires an explicit `planes` selection.
    target_chunk_mb : int, optional
        Target chunk size in MB for streaming writes. Default is 100 MB.
    progress_callback : Callable, optional
        Callback function for progress updates:
        `callback(progress, current_plane)`.
    debug : bool, default=False
        Enable verbose logging for troubleshooting.
    show_progress : bool, default=True
        Show tqdm progress bar during writing. Set to False in notebooks when
        you don't want progress output cluttering the display.
    output_name : str, optional
        Filename for binary output when ext=".bin".
    output_suffix : str, optional
        Custom suffix to append to output filenames. If None (default), files
        are named with "_stitched" for multi-ROI data when roi is None, or
        "_roiN" for specific ROIs. Examples: "_stitched", "_processed",
        "_session1". The suffix is automatically sanitized (illegal characters
        removed, double extensions prevented, underscore prefix added if
        missing).
    dataset_name : str, optional
        Name of the HDF5 dataset to write under (only applies when writing
        ``.h5`` / ``.hdf5``). Default is ``"mov"`` — matches suite2p / caiman
        convention and the auto-detect order in :class:`H5Array`. Pass
        ``"data"`` for the legacy mbo name, or any other key for custom
        consumers. Ignored for non-h5 formats.
    dim_order : str, optional
        Axis labels for an in-memory numpy array of ndim 3/4/5, drawn from
        ``"TCZYX"``. The array is permuted and singleton-padded to canonical
        5D TCZYX before writing. Examples: ``"TYX"``, ``"TZYX"``, ``"TCYX"``,
        ``"TCZYX"``. Ignored when ``lazy_array`` is already a lazy array.
    **kwargs
        Additional format-specific options passed to writer backends.

        Zarr-specific options (ext=".zarr"): ``sharded`` (bool, default True)
        uses Zarr v3 sharding codec, ``compressor`` (str, default "none") one
        of "none", "gzip", "zstd", "blosc-lz4", "blosc-zstd". Default is no
        compression so interactive scrubbing doesn't pay decompression cost
        per frame; pass an explicit value when storage size matters more than
        random-access speed. ``compression_level`` (int) compressor-specific
        level (gzip 0–9, zstd 1–22, blosc 1–9). Ignored when
        ``compressor="none"``. ``ome`` (bool, default True) writes OME-NGFF
        v0.5 metadata, ``pyramid`` (bool, default False) generates
        multi-resolution pyramid, ``pyramid_max_layers`` (int, default 4) sets
        max resolution levels, ``pyramid_method`` (str, default "mean") sets
        downsampling method.

    Returns
    -------
    Path
        The value returned by ``lazy_array._imwrite(...)`` (typically the
        output path). When the array has no ``_imwrite`` method and the
        generic writers are used instead, ``outpath`` is returned.

    Raises
    ------
    TypeError
        If `outpath` is not a string or Path.
    ValueError
        If the parent of `outpath` is not an existing directory, if `order`
        is given without `planes`, has a different length than `planes`, or
        contains out-of-range indices, or if `metadata` is not a dict.

    Examples
    --------
    >>> from mbo_utilities import imread, imwrite
    >>> data = imread("path/to/raw/*.tiff")
    >>> imwrite(data, "output/session1", roi=None)  # Stitch all ROIs

    >>> # Save specific planes
    >>> imwrite(data, "output/session1", planes=[1, 7, 14])

    >>> # Split ROIs (roi is ignored unless roi_mode is "separate")
    >>> imwrite(data, "output/session1", roi_mode="separate")

    >>> # Z-plane registration
    >>> imwrite(data, "output/registered", register_z=True)

    >>> # Convert to Suite2p binary
    >>> imwrite(data, "output/suite2p", ext=".bin")

    >>> # Save to Zarr
    >>> imwrite(data, "output/zarr_store", ext=".zarr")
    """
    # NOTE: this mutates the shared module-level logger on every call.
    if debug:
        logger.setLevel(logging.INFO)
        logger.info("Debug mode enabled; setting log level to INFO.")
        logger.propagate = True
    else:
        logger.setLevel(logging.WARNING)
        logger.propagate = False

    # normalize roi_mode to enum
    if isinstance(roi_mode, str):
        roi_mode = RoiMode.from_string(roi_mode)

    # save path
    if not isinstance(outpath, (str, Path)):
        raise TypeError(
            f"`outpath` must be a string or Path, got {type(outpath)} instead."
        )
    outpath = Path(outpath)
    if not outpath.parent.is_dir():
        raise ValueError(
            f"{outpath} is not inside a valid directory."
            f" Please create the directory first."
        )
    outpath.mkdir(exist_ok=True)

    # auto-wrap raw numpy arrays so the full ext-aware writer pipeline runs
    if isinstance(lazy_array, np.ndarray):
        from mbo_utilities.arrays.numpy import NumpyArray

        lazy_array = NumpyArray(lazy_array, dim_order=dim_order)
    elif dim_order is not None:
        logger.debug("dim_order ignored: lazy_array is not a raw numpy array")

    # handle roi based on roi_mode
    # ROI support detected via duck typing: hasattr(arr, 'roi_mode')
    if roi_mode == RoiMode.separate:
        # separate mode: set roi on array if specified
        if roi is not None and hasattr(lazy_array, "roi_mode"):
            lazy_array.roi = roi
    elif roi_mode == RoiMode.concat_y:
        # concat mode: roi parameter is ignored, use None (stitch all)
        if roi is not None:
            logger.debug(
                f"roi={roi} ignored when roi_mode=concat_y. "
                f"All ROIs will be concatenated."
            )
        roi = None

    if order is not None:
        # `order` holds positions into `planes`, so there must be an explicit
        # selection to index; otherwise len(None) would raise an opaque
        # TypeError.
        if planes is None:
            raise ValueError(
                "`order` requires an explicit `planes` selection to reorder."
            )
        plane_list = _as_plane_list(planes)
        if len(order) != len(plane_list):
            raise ValueError(
                f"The length of the `order` ({len(order)}) does not match "
                f"the number of planes ({len(plane_list)})."
            )
        if any(i < 0 or i >= len(plane_list) for i in order):
            raise ValueError(
                f"order indices must be in range [0, {len(plane_list) - 1}], "
                f"got {order}"
            )
        planes = [plane_list[i] for i in order]

    # start from a copy of the array's metadata so user overrides never
    # mutate the source dict in place
    existing_meta = getattr(lazy_array, "metadata", None)
    file_metadata = dict(existing_meta or {})

    if metadata:
        if not isinstance(metadata, dict):
            raise ValueError(f"metadata must be a dict, got {type(metadata)}")
        file_metadata.update(metadata)

    # store roi_mode in metadata as string
    file_metadata["roi_mode"] = roi_mode.value

    if num_frames is not None:
        file_metadata["num_frames"] = int(num_frames)
        file_metadata["nframes"] = int(num_frames)

    # best-effort: arrays with a read-only `metadata` property are left alone
    if hasattr(lazy_array, "metadata"):
        with contextlib.suppress(AttributeError):
            lazy_array.metadata = file_metadata

    # axial registration knobs (all optional). defaults match
    # compute_axial_shifts. They are popped unconditionally so they never
    # leak into the writer backend kwargs.
    axial_max_frames = int(kwargs.pop("max_frames", 200))
    axial_chunk_frames = int(kwargs.pop("chunk_frames", 10))
    axial_max_reg_xy = int(kwargs.pop("max_reg_xy", 30))

    if register_z:
        total_planes = (
            int(lazy_array._shape5d()[2])
            if hasattr(lazy_array, "_shape5d")
            else get_param(file_metadata, "nplanes")
        )
        if validate_axial_shifts(file_metadata, total_planes):
            logger.info("using plane_shifts already present in metadata.")
            if progress_callback:
                progress_callback(1.0, "Using cached plane shifts")
        else:
            logger.info("computing axial plane shifts...")
            compute_axial_shifts(
                lazy_array,
                metadata=file_metadata,
                max_frames=axial_max_frames,
                chunk_frames=axial_chunk_frames,
                max_reg_xy=axial_max_reg_xy,
                progress_callback=progress_callback,
            )

        # shifts are cumulative over the full stack; when exporting a plane
        # subset keep only the matching rows (re-referenced to the first
        # written plane) so the output metadata carries one shift per written
        # plane — with_axial_shifts requires len(plane_shifts) == output Z.
        shifts = file_metadata.get("plane_shifts")
        subset_requested = planes is not None and bool(shifts)
        sel0 = []
        if subset_requested:
            # planes are 1-based; convert to 0-based row indices
            sel0 = [p - 1 for p in _as_plane_list(planes)]
            if (
                sel0
                and len(shifts) != len(sel0)
                and all(0 <= i < len(shifts) for i in sel0)
            ):
                sub = [list(shifts[i]) for i in sel0]
                y0, x0 = sub[0]
                file_metadata["plane_shifts"] = [[y - y0, x - x0] for y, x in sub]

        out_planes = len(sel0) if subset_requested else total_planes
        if not validate_axial_shifts(file_metadata, out_planes):
            logger.error(
                "axial registration did not produce valid plane_shifts."
            )

        if hasattr(lazy_array, "metadata"):
            with contextlib.suppress(AttributeError):
                lazy_array.metadata = file_metadata

    # Collect input files for processing history
    input_files = getattr(lazy_array, "filenames", None)
    if input_files:
        # handle single path or list of paths
        if isinstance(input_files, (str, Path)):
            input_files = [str(input_files)]
        else:
            input_files = [str(f) for f in input_files]

    # Extract scan-phase correction parameters if available (ScanImageArray)
    scan_phase_params = {}
    if hasattr(lazy_array, "fix_phase"):
        scan_phase_params["fix_phase"] = getattr(lazy_array, "fix_phase", False)
    if hasattr(lazy_array, "use_fft"):
        scan_phase_params["use_fft"] = getattr(lazy_array, "use_fft", False)
    if hasattr(lazy_array, "phasecorr_method"):
        scan_phase_params["phasecorr_method"] = getattr(
            lazy_array, "phasecorr_method", None
        )

    # Start timing for processing history
    write_start_time = time.perf_counter()

    if hasattr(lazy_array, "_imwrite"):
        write_kwargs = kwargs.copy()
        if num_frames is not None:
            write_kwargs["num_frames"] = num_frames
        if dataset_name is not None:
            write_kwargs["dataset_name"] = dataset_name
        result = lazy_array._imwrite(
            outpath,
            overwrite=overwrite,
            target_chunk_mb=target_chunk_mb,
            ext=ext,
            progress_callback=progress_callback,
            planes=planes,
            frames=frames,
            channels=channels,
            debug=debug,
            show_progress=show_progress,
            output_name=output_name,
            output_suffix=output_suffix,
            roi_mode=roi_mode,
            **write_kwargs,
        )
    else:
        logger.info(f"Falling back to generic writers for {type(lazy_array)}.")
        _try_generic_writers(
            lazy_array,
            outpath,
            overwrite=overwrite,
            dataset_name=dataset_name,
        )
        result = outpath

    # Record processing step in metadata
    write_duration = time.perf_counter() - write_start_time

    # Build extra info for processing history
    processing_extra = {
        "input_format": type(lazy_array).__name__,
        "output_format": ext,
        "num_frames": file_metadata.get("num_frames") or file_metadata.get("nframes"),
        "shape": list(lazy_array.shape) if hasattr(lazy_array, "shape") else None,
    }

    # Add scan-phase correction info if present
    if scan_phase_params:
        processing_extra["scan_phase_correction"] = scan_phase_params

    if register_z:
        processing_extra["z_registration"] = {
            "enabled": True,
            "n_planes": len(file_metadata.get("plane_shifts", []))
            if "plane_shifts" in file_metadata
            else 0,
            "params": file_metadata.get("plane_shifts_params"),
        }

    # Add ROI info if specified
    if roi is not None:
        processing_extra["roi"] = roi

    # Add planes info if specified
    if planes is not None:
        processing_extra["planes"] = (
            list(planes) if hasattr(planes, "__iter__") else planes
        )

    # Add frames info if specified
    if frames is not None:
        processing_extra["frames"] = (
            list(frames) if hasattr(frames, "__iter__") else frames
        )

    # Add channels info if specified
    if channels is not None:
        processing_extra["channels"] = (
            list(channels) if hasattr(channels, "__iter__") else channels
        )

    # Collect output files
    output_files = None
    if result and isinstance(result, Path):
        if result.is_dir():
            # List files in output directory
            out_files = list(result.glob(f"*{ext}"))
            if out_files:
                output_files = [str(f) for f in out_files[:20]]  # Limit to first 20
        else:
            output_files = [str(result)]

    add_processing_step(
        file_metadata,
        step_name="imwrite",
        input_files=input_files,
        output_files=output_files,
        duration_seconds=write_duration,
        extra=processing_extra,
    )

    # Update lazy_array metadata with processing history if possible
    if hasattr(lazy_array, "metadata"):
        with contextlib.suppress(AttributeError):
            lazy_array.metadata = file_metadata

    logger.debug(f"Processing step recorded: imwrite to {ext} in {write_duration:.2f}s")

    return result