Source code for mbo_utilities.file_io

import json
from collections.abc import Sequence
from io import StringIO
from itertools import product
import re

from pathlib import Path
import numpy as np

from icecream import ic

import dask.array as da
from tifffile import TiffFile

from . import log
from .metadata import is_raw_scanimage

try:
    from zarr import open as zarr_open
    from zarr.storage import FsspecStore
    from fsspec.implementations.reference import ReferenceFileSystem
    HAS_ZARR = True
except ImportError:
    HAS_ZARR = False
    zarr_open = None
    ReferenceFileSystem = None
    FsspecStore = None

CHUNKS = {0: 1, 1: "auto", 2: -1, 3: -1}

SAVE_AS_TYPES = [".tiff", ".bin", ".h5", ".zarr"]

logger = log.get("file_io")

PIPELINE_TAGS = ("plane", "roi", "z", "plane_", "roi_", "z_")

[docs]
def load_ops(ops_input: str | Path | list[str | Path]):
    """Simple utility to load a suite2p ops.npy file."""
    if isinstance(ops_input, (str, Path)):
        return np.load(ops_input, allow_pickle=True).item()
    elif isinstance(ops_input, dict):
        return ops_input
    print("Warning: No valid ops file provided, returning an empty dict.")
    return {}

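
# Hedged usage sketch (illustrative, not part of the original module): loading a
# suite2p ops.npy next to its binary. The directory layout below is hypothetical.
def _example_load_ops():
    ops = load_ops(Path("~/suite2p/plane0/ops.npy").expanduser())
    return ops.get("Ly"), ops.get("Lx"), ops.get("fs")
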
[docs]
def write_ops(metadata, raw_filename):
    """
    Write metadata to an ops file alongside the given filename.

    ``metadata`` must contain the 'shape', 'pixel_resolution', and 'frame_rate' keys.
    """
    logger.info(f"Writing ops file for {raw_filename} with metadata: {metadata}")
    assert isinstance(raw_filename, (str, Path)), "filename must be a string or Path object"
    filename = Path(raw_filename).expanduser().resolve()

    # this convention means the input can be either a file or a directory
    if filename.is_file():
        root = filename.parent
    else:
        root = filename
    ops_path = root.joinpath("ops.npy")
    logger.debug(f"Writing ops file to {ops_path}.")

    shape = metadata["shape"]
    nt = shape[0]
    Lx = shape[-2]
    Ly = shape[-1]

    if "pixel_resolution" not in metadata:
        logger.warning("No pixel resolution found in metadata, using default [2, 2].")
    if "fs" not in metadata:
        if "frame_rate" in metadata:
            metadata["fs"] = metadata["frame_rate"]
        elif "framerate" in metadata:
            metadata["fs"] = metadata["framerate"]
        else:
            logger.debug("No frame rate found in metadata; defaulting fs=10")
            metadata["fs"] = 10

    dx, dy = metadata.get("pixel_resolution", [2, 2])
    ops = {
        # suite2p needs these
        "Ly": Ly,
        "Lx": Lx,
        "fs": metadata["fs"],
        "nframes": nt,
        "dx": dx,
        "dy": dy,
        "ops_path": str(ops_path),
        # and dump the rest of the metadata
        **metadata,
    }
    np.save(ops_path, ops)
    logger.debug(f"Ops file written to {ops_path} with metadata:\n {ops}")

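
# Hedged usage sketch (illustrative, not part of the original module): the minimal
# metadata that write_ops consumes. The shape, rates, and output path are made up;
# the frame rate may also be supplied as "fs" or "framerate".
def _example_write_ops():
    metadata = {
        "shape": (1000, 14, 512, 512),   # nframes first, Y/X last (assumed layout)
        "pixel_resolution": [2, 2],      # falls back to [2, 2] when missing
        "frame_rate": 17.0,              # copied into "fs" if "fs" is absent
    }
    # ops.npy is written next to the given file, or inside the given directory
    write_ops(metadata, "path/to/plane_01")
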
def normalize_file_url(path):
    """
    Derive a folder tag from a filename based on "planeN", "roiN", or "zN" patterns.

    Parameters
    ----------
    path : str or pathlib.Path
        File path or name whose stem will be parsed.

    Returns
    -------
    str
        If the stem starts with "plane", "roi", or "z" followed by an integer,
        returns that tag plus the integer (e.g. "plane3", "roi7", "z2").
        Otherwise returns the original stem unchanged.

    Examples
    --------
    >>> normalize_file_url("plane_01.tif")
    'plane1'
    >>> normalize_file_url("plane2.bin")
    'plane2'
    >>> normalize_file_url("roi5.raw")
    'roi5'
    >>> normalize_file_url("ROI_10.dat")
    'roi10'
    >>> normalize_file_url("z-3.h5")
    'z3'
    >>> normalize_file_url("assembled_data_1.tiff")
    'assembled_data_1'
    >>> normalize_file_url("file_12.tif")
    'file_12'
    """
    name = Path(path).stem
    for tag in PIPELINE_TAGS:
        low = name.lower()
        if low.startswith(tag):
            suffix = name[len(tag):]
            if suffix and (suffix[0] in ("_", "-")):
                suffix = suffix[1:]
            if suffix.isdigit():
                return f"{tag}{int(suffix)}"
    return name

[docs]
def npy_to_dask(files, name="", axis=1, astype=None):
    """
    Creates a Dask array that lazily stacks multiple .npy files along a specified
    axis without fully loading them into memory.

    Taken from suite3d for convenience, to avoid the unnecessary import:
    https://github.com/alihaydaroglu/suite3d/blob/py310/suite3d/utils.py
    Very nice function, thanks Ali!

    Parameters
    ----------
    files : list of str or Path
        A list of file paths pointing to .npy files containing array data.
        Each file must have the same shape except possibly along the
        concatenation axis.
    name : str, optional
        A string appended to the base name ("from-npy-stack-") to label the
        resulting Dask array. Default is an empty string.
    axis : int, optional
        The axis along which to stack/concatenate the arrays from the provided
        files. Default is 1.
    astype : numpy.dtype, optional
        If provided, the resulting Dask array will be cast to this data type.
        Otherwise, the data type is inferred from the first file.

    Returns
    -------
    dask.array.Array

    Examples
    --------
    >>> # https://www.fastplotlib.org/
    >>> import fastplotlib as fpl
    >>> import mbo_utilities as mbo
    >>> files = mbo.get_files("path/to/images/", 'fused', 3)  # suite3D output
    >>> arr = npy_to_dask(files, name="stack", axis=1)
    >>> print(arr.shape)
    (nz, nt, ny, nx)
    >>> # Optionally, cast the array to float32
    >>> arr = npy_to_dask(files, axis=1, astype=np.float32)
    >>> fpl.ImageWidget(arr.transpose(1, 0, 2, 3)).show()
    """
    sample_mov = np.load(files[0], mmap_mode="r")
    file_ts = [np.load(f, mmap_mode="r").shape[axis] for f in files]
    nz, nt_sample, ny, nx = sample_mov.shape
    dtype = sample_mov.dtype

    chunks = [(nz,), (nt_sample,), (ny,), (nx,)]
    chunks[axis] = tuple(file_ts)
    chunks = tuple(chunks)

    name = "from-npy-stack-%s" % name
    keys = list(product([name], *[range(len(c)) for c in chunks]))
    values = [(np.load, files[i], "r") for i in range(len(chunks[axis]))]
    dsk = dict(zip(keys, values, strict=False))

    arr = da.Array(dsk, name, chunks, dtype)
    if astype is not None:
        arr = arr.astype(astype)
    return arr

[docs]
def expand_paths(paths: str | Path | Sequence[str | Path]) -> list[Path]:
    """
    Expand a path, list of paths, or wildcard pattern into a sorted list of actual files.

    This is a handy wrapper for loading images or data files when you've got a
    folder, some wildcards, or a mix of both.

    Parameters
    ----------
    paths : str, Path, or list of (str or Path)
        Can be a single path, a wildcard pattern like '*.tif', a folder,
        or a list of those.

    Returns
    -------
    list of Path
        Sorted list of full paths to matching files.

    Examples
    --------
    >>> expand_paths("data/*.tif")
    [Path('data/img_000.tif'), Path('data/img_001.tif'), ...]
    >>> expand_paths(Path("data"))
    [Path('data/img_000.tif'), Path('data/img_001.tif'), ...]
    >>> expand_paths(["data/*.tif", Path("more_data")])
    [Path('data/img_000.tif'), Path('more_data/img_050.tif'), ...]
    """
    if isinstance(paths, (str, Path)):
        paths = [paths]
    elif not isinstance(paths, (list, tuple)):
        raise TypeError(f"Expected str, Path, or sequence of them, got {type(paths)}")

    result = []
    for p in paths:
        p = Path(p)
        if "*" in str(p):
            result.extend(p.parent.glob(p.name))
        elif p.is_dir():
            result.extend(p.glob("*"))
        elif p.exists() and p.is_file():
            result.append(p)

    return sorted(p.resolve() for p in result if p.is_file())

def _tiff_to_fsspec(tif_path: Path, base_dir: Path) -> dict:
    """
    Create a kerchunk reference for a single TIFF file.

    Parameters
    ----------
    tif_path : Path
        Path to the TIFF file on disk.
    base_dir : Path
        Directory representing the "root" URI for the reference.

    Returns
    -------
    refs : dict
        A kerchunk reference dict (in JSON form) for this single TIFF.
    """
    with TiffFile(str(tif_path.expanduser().resolve())) as tif:
        with StringIO() as f:
            store = tif.aszarr()
            store.write_fsspec(f, url=base_dir.as_uri())
            refs = json.loads(f.getvalue())  # type: ignore
    return refs


def _multi_tiff_to_fsspec(tif_files: list[Path], base_dir: Path) -> dict:
    assert len(tif_files) > 1, "Need at least two TIFF files to combine."

    combined_refs: dict[str, str] = {}
    per_file_refs = []
    total_shape = None
    total_chunks = None
    zarr_meta = {}

    for tif_path in tif_files:
        # Create a json reference for each TIFF file
        inner_refs = _tiff_to_fsspec(tif_path, base_dir)

        zarr_meta = json.loads(inner_refs.pop(".zarray"))
        inner_refs.pop(".zattrs", None)

        shape = zarr_meta["shape"]
        chunks = zarr_meta["chunks"]

        if total_shape is None:
            total_shape = shape.copy()
            total_chunks = chunks
        else:
            assert shape[1:] == total_shape[1:], f"Shape mismatch in {tif_path}"
            assert chunks == total_chunks, f"Chunk mismatch in {tif_path}"
            total_shape[0] += shape[0]  # accumulate along axis 0

        per_file_refs.append((inner_refs, shape))

    combined_zarr_meta = {
        "shape": total_shape,  # total shape tracks the fully-assembled image shape
        "chunks": total_chunks,
        "dtype": zarr_meta["dtype"],
        "compressor": zarr_meta["compressor"],
        "filters": zarr_meta.get("filters", None),
        "order": zarr_meta["order"],
        "zarr_format": zarr_meta["zarr_format"],
        "fill_value": zarr_meta.get("fill_value", 0),
    }
    combined_refs[".zarray"] = json.dumps(combined_zarr_meta)
    combined_refs[".zattrs"] = json.dumps(
        {"_ARRAY_DIMENSIONS": ["T", "C", "Y", "X"][:len(total_shape)]}
    )

    axis0_offset = 0
    # since we are combining along axis 0, we need to adjust the keys
    # in the inner_refs to account for the offset along that axis.
    for inner_refs, shape in per_file_refs:
        chunksize0 = total_chunks[0]
        for key, val in inner_refs.items():
            idx = list(map(int, key.strip("/").split(".")))
            idx[0] += axis0_offset // chunksize0
            new_key = ".".join(map(str, idx))
            combined_refs[new_key] = val
        axis0_offset += shape[0]

    return combined_refs


def read_scan(
    pathnames,
    dtype=np.int16,
    roi=None,
    fix_phase: bool = True,
    phasecorr_method: str = "frame",
    border: int | tuple[int, int, int, int] = 3,
    upsample: int = 1,
    max_offset: int = 4,
):
    """
    Reads a ScanImage scan from a given file or set of file paths and returns a
    ScanMultiROIReordered object with lazy-loaded data.

    Parameters
    ----------
    pathnames : str, Path, or sequence of str/Path
        A single path, a wildcard pattern (e.g. ``*.tif``), or a list of paths
        specifying the ScanImage TIFF files to read.
    roi : int, optional
        Specify the ROI to export if only exporting a single ROI. 1-based.
        Defaults to None, which exports pre-assembled (tiled) ROIs.
    fix_phase : bool, optional
        If True, applies phase correction to the scan data. Default is True.
    phasecorr_method : str, optional
        The method to use for phase correction. Options include 'subpix',
        'two_step', and 'frame'. Default is 'frame'.
    border : int or tuple of int, optional
        The border size to use for phase correction. If an int, applies the same
        border to all sides. If a tuple, specifies (top, bottom, left, right) borders.
    upsample : int, optional
        Upsample factor used for subpixel phase correction. A value of 1 clamps
        to whole-pixel shifts. Default is 1.
    max_offset : int, optional
        The maximum allowed phase offset in pixels. If the computed offset exceeds
        this value, it is clamped to the maximum. Default is 4.
    dtype : numpy.dtype, optional
        The data type to use when reading the scan data. Default is np.int16.

    Returns
    -------
    mbo_utilities.array_types.MboRawArray
        A scan object with metadata and lazily loaded data.

    Raises
    ------
    FileNotFoundError
        If no files match the specified path(s).

    Notes
    -----
    If the provided path string appears to include escaped characters (for example,
    unintentional backslashes), a warning message is printed suggesting the use of
    a raw string (r'...') or double backslashes.

    Examples
    --------
    >>> import mbo_utilities as mbo
    >>> import matplotlib.pyplot as plt
    >>> scan = mbo.read_scan(r"D:\\demo\\raw")
    >>> plt.imshow(scan[0, 5, 0, 0], cmap='gray')  # First frame of z-plane 6
    >>> scan = mbo.read_scan(r"D:\\demo\\raw", roi=1)  # First ROI
    >>> plt.imshow(scan[0, 5, 0, 0], cmap='gray')  # indexing works the same
    """
    filenames = expand_paths(pathnames)
    if len(filenames) == 0:
        error_msg = f"Pathname(s) {pathnames} do not match any files in disk."
        raise FileNotFoundError(error_msg)
    if not is_raw_scanimage(filenames[0]):
        raise ValueError(
            f"The file {filenames[0]} does not appear to be a raw ScanImage TIFF file."
        )
    # scan = MboRawArray(
    #     roi=roi,
    #     fix_phase=fix_phase,
    #     phasecorr_method=phasecorr_method,
    #     border=border,
    #     upsample=upsample,
    #     max_offset=max_offset,
    # )
    # scan.read_data(filenames, dtype=dtype)
    # return scan

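
# Minimal sketch of the axis-0 key shift performed in _multi_tiff_to_fsspec above,
# shown on made-up chunk keys. With 100 frames per chunk along axis 0, a file whose
# data begins at frame 300 has the leading index of each "i.j.k" key bumped by
# 300 // 100 = 3.
def _example_shift_chunk_keys():
    inner_refs = {"0.0.0": "chunk-a", "1.0.0": "chunk-b"}  # hypothetical per-file refs
    chunksize0, axis0_offset = 100, 300
    shifted = {}
    for key, val in inner_refs.items():
        idx = list(map(int, key.split(".")))
        idx[0] += axis0_offset // chunksize0
        shifted[".".join(map(str, idx))] = val
    return shifted  # {"3.0.0": "chunk-a", "4.0.0": "chunk-b"}
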
[docs]
def get_files(
    base_dir, str_contains="", max_depth=1, sort_ascending=True, exclude_dirs=None
) -> list[str]:
    """
    Recursively search for files in a specified directory whose names contain a given
    substring, limiting the search to a maximum subdirectory depth. Optionally, the
    resulting list of file paths is sorted in ascending order using numeric parts of
    the filenames when available.

    Parameters
    ----------
    base_dir : str or Path
        The base directory where the search begins. This path is expanded
        (e.g., '~' is resolved) and converted to an absolute path.
    str_contains : str, optional
        A substring that must be present in a file's name for it to be included
        in the result. If empty, all files are matched.
    max_depth : int, optional
        The maximum number of subdirectory levels (relative to the base directory)
        to search. Defaults to 1. If set to 0, it is automatically reset to 1.
    sort_ascending : bool, optional
        If True (default), the matched file paths are sorted in ascending
        alphanumeric order. The sort key extracts numeric parts from filenames
        so that, for example, "file2" comes before "file10".
    exclude_dirs : iterable of str or Path, optional
        An iterable of directories to exclude from the resulting list of file paths.
        By default, '.venv', '.git', and '__pycache__' are excluded.

    Returns
    -------
    list of str
        A list of full file paths (as strings) for files within the base directory
        (and its subdirectories up to the specified depth) that contain the
        provided substring.

    Raises
    ------
    FileNotFoundError
        If the base directory does not exist.
    NotADirectoryError
        If the specified base_dir is not a directory.

    Examples
    --------
    >>> import mbo_utilities as mbo
    >>> # Get all files that contain "ops.npy" in their names by searching up to 3 levels deep:
    >>> ops_files = mbo.get_files("path/to/files", "ops.npy", max_depth=3)
    >>> # Get only files containing "tif" in the current directory (max_depth=1):
    >>> tif_files = mbo.get_files("path/to/files", "tif")
    """
    base_path = Path(base_dir).expanduser().resolve()
    if not base_path.exists():
        raise FileNotFoundError(f"Directory '{base_path}' does not exist.")
    if not base_path.is_dir():
        raise NotADirectoryError(f"'{base_path}' is not a directory.")

    if max_depth == 0:
        ic("Max-depth of 0 is not allowed. Setting to 1.")
        max_depth = 1

    base_depth = len(base_path.parts)
    pattern = f"*{str_contains}*" if str_contains else "*"

    if exclude_dirs is None:
        exclude_dirs = [".venv", ".git", "__pycache__"]

    def is_excluded(path):
        return any(excl in path.parts for excl in exclude_dirs)

    files = [
        file
        for file in base_path.rglob(pattern)
        if len(file.parts) - base_depth <= max_depth
        and file.is_file()
        and not is_excluded(file)
    ]

    if sort_ascending:
        def numerical_sort_key(path):
            match = re.search(r"\d+", path.name)
            return int(match.group()) if match else float("inf")

        files.sort(key=numerical_sort_key)

    return [str(file) for file in files]

def _is_arraylike(obj) -> bool:
    """
    Checks if the object is array-like.

    Requires ``__getitem__``, ``shape``, and ``ndim`` attributes.
    """
    for attr in ["__getitem__", "shape", "ndim"]:
        if not hasattr(obj, attr):
            return False
    return True


def _get_mbo_project_root() -> Path:
    """Return the root path of the mbo_utilities repository (based on this file)."""
    return Path(__file__).resolve().parent.parent

[docs]
def get_mbo_dirs() -> dict:
    """
    Ensure ~/mbo and its subdirectories exist.

    Returns a dict with paths to the root, settings, and cache directories.
    """
    base = Path.home().joinpath("mbo")
    imgui = base.joinpath("imgui")
    cache = base.joinpath("cache")
    logs = base.joinpath("logs")
    tests = base.joinpath("tests")
    data = base.joinpath("data")
    assets = imgui.joinpath("assets")
    settings = assets.joinpath("app_settings")

    # parents are created before children; settings is included so every
    # returned path exists on disk
    for d in (base, imgui, cache, logs, assets, settings, data, tests):
        d.mkdir(exist_ok=True)

    return {
        "base": base,
        "imgui": imgui,
        "cache": cache,
        "logs": logs,
        "assets": assets,
        "settings": settings,
        "data": data,
        "tests": tests,
    }

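
# Hedged usage sketch (illustrative): resolving a per-user cache file under the
# directories created by get_mbo_dirs. The "session.json" filename is hypothetical.
def _example_cache_file():
    dirs = get_mbo_dirs()
    return dirs["cache"].joinpath("session.json")
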
def _convert_range_to_slice(k):
    """Convert a ``range`` key to an equivalent ``slice``; return other keys unchanged."""
    return slice(k.start, k.stop, k.step) if isinstance(k, range) else k