Source code for mbo_utilities.file_io

from __future__ import annotations

import os
import re
from collections.abc import Sequence
from itertools import product
from pathlib import Path

import dask
import dask.array as da
import ffmpeg
import numpy as np
import tifffile
from matplotlib import cm

from .metadata import is_raw_scanimage
from .scanreader import scans
from .scanreader.multiroi import ROI
from .util import norm_minmax

CHUNKS = {0: 1, 1: "auto", 2: -1, 3: -1}


def zarr_to_dask(zarr_parent):
    """Convert directory of zarr arrays into a Z-stack."""
    # search 3 dirs deep for arrays within our zarr group
    files = get_files(zarr_parent, ".zarray", 3)
    return da.stack([da.from_zarr(Path(x).parent) for x in files], axis=1)

[docs] def npy_to_dask(files, name="", axis=1, astype=None): """ Creates a Dask array that lazily stacks multiple .npy files along a specified axis without fully loading them into memory. Parameters ---------- files : list of str or Path A list of file paths pointing to .npy files containing array data. Each file must have the same shape except possibly along the concatenation axis. name : str, optional A string to be appended to a base name ("from-npy-stack-") to label the resulting Dask array. Default is an empty string. axis : int, optional The axis along which to stack/concatenate the arrays from the provided files. Default is 1. astype : numpy.dtype, optional If provided, the resulting Dask array will be cast to this data type. Otherwise, the data type is inferred from the first file. Returns ------- dask.array.Array Examples -------- >>> # https://www.fastplotlib.org/ >>> import fastplotlib as fpl >>> import mbo_utilities as mbo >>> files = mbo.get_files("path/to/images/", 'fused', 3) # suite3D output >>> arr = npy_to_dask(files, name="stack", axis=1) >>> print(arr.shape) (nz, nt, ny, nx ) >>> # Optionally, cast the array to float32 >>> arr = npy_to_dask(files, axis=1, astype=np.float32) >>> fpl.ImageWidget(arr.transpose(1, 0, 2, 3)).show() """ sample_mov = np.load(files[0], mmap_mode="r") file_ts = [np.load(f, mmap_mode="r").shape[axis] for f in files] nz, nt_sample, ny, nx = sample_mov.shape dtype = sample_mov.dtype chunks = [(nz,), (nt_sample,), (ny,), (nx,)] chunks[axis] = tuple(file_ts) chunks = tuple(chunks) name = "from-npy-stack-%s" % name keys = list(product([name], *[range(len(c)) for c in chunks])) values = [(np.load, files[i], "r") for i in range(len(chunks[axis]))] dsk = dict(zip(keys, values, strict=False)) arr = da.Array(dsk, name, chunks, dtype) if astype is not None: arr = arr.astype(astype) return arr
def is_escaped_string(path: str) -> bool: return bool(re.search(r"\\[a-zA-Z]", path)) def _make_json_serializable(obj): """Convert metadata to JSON serializable format.""" if isinstance(obj, dict): return {k: _make_json_serializable(v) for k, v in obj.items()} if isinstance(obj, list): return [_make_json_serializable(v) for v in obj] if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, (np.integer, np.floating)): return obj.item() return obj def expand_paths(paths: str | Path | Sequence[str | Path]) -> list[Path]: """ Expand a path, list of paths, or wildcard pattern into a sorted list of actual files. This is a handy wrapper for loading images or data files when you’ve got a folder, some wildcards, or a mix of both. Parameters ---------- paths : str, Path, or list of (str or Path) Can be a single path, a wildcard pattern like '*.tif', a folder, or a list of those. Returns ------- list of Path Sorted list of full paths to matching files. Examples -------- >>> expand_paths("data/*.tif") [Path('data/img_000.tif'), Path('data/img_001.tif'), ...] >>> expand_paths(Path("data")) [Path('data/img_000.tif'), Path('data/img_001.tif'), ...] >>> expand_paths(["data/*.tif", Path("more_data")]) [Path('data/img_000.tif'), Path('more_data/img_050.tif'), ...] """ if isinstance(paths, (str, Path)): paths = [paths] elif not isinstance(paths, (list, tuple)): raise TypeError(f"Expected str, Path, or sequence of them, got {type(paths)}") result = [] for p in paths: p = Path(p) if "*" in str(p): result.extend(p.parent.glob(p.name)) elif p.is_dir(): result.extend(p.glob("*")) elif p.exists() and p.is_file(): result.append(p) return sorted(p.resolve() for p in result if p.is_file())
[docs] def read_scan(pathnames, dtype=np.int16): """ Reads a ScanImage scan from a given file or set of file paths and returns a ScanMultiROIReordered object with lazy-loaded data. Parameters ---------- pathnames : str, Path, or sequence of str/Path A single path, a wildcard pattern (e.g. ``*.tif``), or a list of paths specifying the ScanImage TIFF files to read. dtype : numpy.dtype, optional The data type to use when reading the scan data. Default is np.int16. Returns ------- ScanMultiROIReordered A scan object with metadata and lazily loaded data. Raises FileNotFoundError if no files match the specified path(s). Notes ----- If the provided path string appears to include escaped characters (for example, unintentional backslashes), a warning message is printed suggesting the use of a raw string (r'...') or double backslashes. Examples -------- >>> import mbo_utilities as mbo >>> import matplotlib.pyplot as plt >>> scan = mbo.read_scan(r"C:\\path\to\\scan\\*.tif") >>> plt.imshow(scan[0, 5, 0, 0], cmap='gray') # First frame of z-plane 6 """ if isinstance(pathnames, str) and is_escaped_string(pathnames): print("Detected possible escaped characters in the path." " Use a raw string (r'...') or double backslashes.") filenames = expand_paths(pathnames) if len(filenames) == 0: error_msg = f"Pathname(s) {pathnames} do not match any files in disk." raise FileNotFoundError(error_msg) scan = ScanMultiROIReordered(join_contiguous=True) scan.read_data(filenames, dtype=dtype) return scan
class ScanMultiROIReordered(scans.ScanMultiROI): """ A subclass of ScanMultiROI that ignores the num_fields dimension and reorders the output to [time, z, x, y]. """ def __getitem__(self, key): if not isinstance(key, tuple): key = (key,) key = tuple(list(k) if isinstance(k, range) else k for k in key) # Call the parent class's __getitem__ with the reordered key item = super().__getitem__((0, key[2], key[3], key[1], key[0])) if item.ndim == 2: return item if item.ndim == 3: return np.transpose(item, (2, 0, 1)) if item.ndim == 4: return np.transpose(item, (3, 2, 0, 1)) raise ValueError(f"Unexpected number of dimensions: {item.ndim}") @property def shape(self): return self.num_frames, self.num_channels, self.field_heights[0], self.field_widths[0] @property def ndim(self): return 4 @property def size(self): return self.num_frames * self.num_channels * self.field_heights[0] * self.field_widths[0] @property def scanning_depths(self): """ We override this because LBM should always be at a single scanning depth. """ return [0] def _create_rois(self): """ Create scan rois from the configuration file. Override the base method to force ROI's that have multiple 'zs' to a single depth. """ try: roi_infos = self.tiff_files[0].scanimage_metadata["RoiGroups"]["imagingRoiGroup"]["rois"] except KeyError: raise RuntimeError("This file is not a raw-scanimage tiff or is missing tiff.scanimage_metadata.") roi_infos = roi_infos if isinstance(roi_infos, list) else [roi_infos] roi_infos = list(filter(lambda r: isinstance(r["zs"], (int, float, list)), roi_infos)) # discard empty/malformed ROIs for roi_info in roi_infos: # LBM uses a single depth that is not stored in metadata, so force this to be 0 roi_info["zs"] = [0] rois = [ROI(roi_info) for roi_info in roi_infos] return rois
[docs] def get_files(base_dir, str_contains="", max_depth=1, sort_ascending=True, exclude_dirs=None) -> list | Path: """ Recursively search for files in a specified directory whose names contain a given substring, limiting the search to a maximum subdirectory depth. Optionally, the resulting list of file paths is sorted in ascending order using numeric parts of the filenames when available. Parameters ---------- base_dir : str or Path The base directory where the search begins. This path is expanded (e.g., '~' is resolved) and converted to an absolute path. str_contains : str, optional A substring that must be present in a file's name for it to be included in the result. If empty, all files are matched. max_depth : int, optional The maximum number of subdirectory levels (relative to the base directory) to search. Defaults to 1. If set to 0, it is automatically reset to 1. sort_ascending : bool, optional If True (default), the matched file paths are sorted in ascending alphanumeric order. The sort key extracts numeric parts from filenames so that, for example, "file2" comes before "file10". exclude_dirs : iterable of str or Path, optional An iterable of directories to exclude from the resulting list of file paths. By default will exclude ".venv/", "__pycache__/", ".git" and ".github"]. Returns ------- list of str A list of full file paths (as strings) for files within the base directory (and its subdirectories up to the specified depth) that contain the provided substring. Raises ------ FileNotFoundError If the base directory does not exist. NotADirectoryError If the specified base_dir is not a directory. Examples -------- >>> import mbo_utilities as mbo >>> # Get all files that contain "ops.npy" in their names by searching up to 3 levels deep: >>> ops_files = mbo.get_files("path/to/files", "ops.npy", max_depth=3) >>> # Get only files containing "tif" in the current directory (max_depth=1): >>> tif_files = mbo.get_files("path/to/files", "tif") """ base_path = Path(base_dir).expanduser().resolve() if not base_path.exists(): raise FileNotFoundError(f"Directory '{base_path}' does not exist.") if not base_path.is_dir(): raise NotADirectoryError(f"'{base_path}' is not a directory.") if max_depth == 0: print("Max-depth of 0 is not allowed. Setting to 1.") max_depth = 1 base_depth = len(base_path.parts) pattern = f"*{str_contains}*" if str_contains else "*" if exclude_dirs is None: exclude_dirs = [".venv", ".git", "__pycache__"] def is_excluded(path): return any(excl in path.parts for excl in exclude_dirs) files = [ file for file in base_path.rglob(pattern) if len(file.parts) - base_depth <= max_depth and file.is_file() and not is_excluded(file) ] # files = [ # file for file in base_path.rglob(pattern) # if len(file.parts) - base_depth <= max_depth and file.is_file() # ] if sort_ascending: import re def numerical_sort_key(path): match = re.search(r"\d+", path.name) return int(match.group()) if match else float("inf") files.sort(key=numerical_sort_key) return [str(file) for file in files]
[docs] def zstack_from_files(files: list, proj="mean"): """ Creates a Z-Stack image by applying a projection to each TIFF file in the provided list and stacking the results into a NumPy array. Parameters ---------- files : list of str or Path A list of file paths to TIFF images. Files whose extensions are not '.tif' or '.tiff' are ignored. proj : str, optional The type of projection to apply to each TIFF image. Valid options are 'mean', 'max', and 'std'. Default is 'mean'. Returns ------- numpy.ndarray A stacked array of projected images with the new dimension corresponding to the file order. For example, for N input files, the output shape will be (N, height, width). Raises ------ ValueError If an unsupported projection type is provided. Examples -------- >>> import mbo_utilities as mbo >>> files = mbo.get_files("/path/to/files", "tif") >>> z_stack = mbo.zstack_from_files(files, proj="max") >>> z_stack.shape # (3, height, width) """ lazy_arrays = [] for file in files: if Path(file).suffix not in [".tif", ".tiff"]: continue arr = tifffile.memmap(file) if proj == "mean": img = np.mean(arr, axis=0) elif proj == "max": img = np.max(arr, axis=0) elif proj == "std": img = np.std(arr, axis=0) else: raise ValueError(f"Unsupported projection '{proj}'") lazy_arrays.append(img) return np.stack(lazy_arrays, axis=0)
[docs] def save_png(fname, data): """ Saves a given image array as a PNG file using Matplotlib. Parameters ---------- fname : str or Path The file name (or full path) where the PNG image will be saved. data : array-like The image data to be visualized and saved. Can be any 2D or 3D array that Matplotlib can display. Examples -------- >>> import mbo_utilities as mbo >>> import tifffile >>> data = tifffile.memmap("path/to/plane_0.tiff") >>> frame = data[0, ...] >>> mbo.save_png("plane_0_frame_1.png", frame) """ # TODO: move this to a separate module that imports matplotlib import matplotlib.pyplot as plt plt.imshow(data) plt.axis("tight") plt.axis("off") plt.tight_layout() plt.savefig(fname, dpi=300, bbox_inches="tight") print(f"Saved data to {fname}")
[docs] def save_mp4( fname: str | Path | np.ndarray, images, framerate=60, speedup=1, chunk_size=100, cmap="gray", win=7, vcodec="libx264", normalize=True ): """ Save a video from a 3D array or TIFF stack to `.mp4`. Parameters ---------- fname : str Output video file name. images : numpy.ndarray or str Input 3D array (T x H x W) or a file path to a TIFF stack. framerate : int, optional Original framerate of the video, by default 60. speedup : int, optional Factor to increase the playback speed, by default 1 (no speedup). chunk_size : int, optional Number of frames to process and write in a single chunk, by default 100. cmap : str, optional Colormap to apply to the video frames, by default "gray". Must be a valid Matplotlib colormap name. win : int, optional Temporal averaging window size. If `win > 1`, frames are averaged over the specified window using convolution. By default, 7. vcodec : str, optional Video codec to use, by default 'libx264'. normalize : bool, optional Flag to min-max normalize the video frames, by default True. Raises ------ FileNotFoundError If the input file does not exist when `images` is provided as a file path. ValueError If `images` is not a valid 3D NumPy array or a file path to a TIFF stack. Notes ----- - The input array `images` must have the shape (T, H, W), where T is the number of frames, H is the height, and W is the width. - The `win` parameter performs temporal smoothing by averaging over adjacent frames. Examples -------- Save a video from a 3D NumPy array with a gray colormap and 2x speedup: >>> import numpy as np >>> images = np.random.rand(100, 600, 576) * 255 >>> save_mp4('output.mp4', images, framerate=17, cmap='gray', speedup=2) Save a video with temporal averaging applied over a 5-frame window at 4x speed: >>> save_mp4('output_smoothed.mp4', images, framerate=30, speedup=4, cmap='gray', win=5) Save a video from a TIFF stack: >>> save_mp4('output.mp4', 'path/to/stack.tiff', framerate=60, cmap='gray') """ if isinstance(images, (str, Path)): print(f"Loading TIFF stack from {images}") if Path(images).is_file(): images = tifffile.memmap(images) else: raise FileNotFoundError(f"File not found: {images}") T, height, width = images.shape colormap = cm.get_cmap(cmap) if normalize: print("Normalizing mp4 images to [0, 1]") images = norm_minmax(images) if win and win > 1: print(f"Applying temporal averaging with window size {win}") kernel = np.ones(win) / win images = np.apply_along_axis(lambda x: np.convolve(x, kernel, mode="same"), axis=0, arr=images) print(f"Saving {T} frames to {fname}") output_framerate = int(framerate * speedup) process = ( ffmpeg .input("pipe:", format="rawvideo", pix_fmt="rgb24", s=f"{width}x{height}", framerate=output_framerate) .output(str(fname), pix_fmt="yuv420p", vcodec=vcodec, r=output_framerate) .overwrite_output() .run_async(pipe_stdin=True) ) for start in range(0, T, chunk_size): end = min(start + chunk_size, T) chunk = images[start:end] colored_chunk = (colormap(chunk)[:, :, :, :3] * 255).astype(np.uint8) for frame in colored_chunk: process.stdin.write(frame.tobytes()) process.stdin.close() process.wait() print(f"Video saved to {fname}")
def _is_arraylike(obj) -> bool: """ Checks if the object is array-like. For now just checks if obj has `__getitem__()` """ for attr in ["__getitem__", "shape", "ndim"]: if not hasattr(obj, attr): return False return True def to_lazy_array(data_in: os.PathLike | np.ndarray | list[os.PathLike | np.ndarray]): """ Convencience function to resolve various data_in variants into lazy arrays. """ if _is_arraylike(data_in): return data_in if isinstance(data_in, list): if is_raw_scanimage(data_in[0]): return read_scan(data_in) else: return zstack_from_files(data_in) if isinstance(data_in, (str, Path)): data_in = Path(data_in).expanduser().resolve() if data_in.is_file(): # check suffix if data_in.suffix in [".tif", ".tiff"]: return tifffile.memmap(data_in) elif data_in.suffix == ".npy": return np.memmap(data_in) else: raise TypeError(f"Invalid type {type(data_in)}")