import json
from collections.abc import Sequence
from io import StringIO
from itertools import product
import re
from pathlib import Path
import numpy as np
from icecream import ic
import dask.array as da
from tifffile import TiffFile
from . import log
from .metadata import is_raw_scanimage
try:
from zarr import open as zarr_open
from zarr.storage import FsspecStore
from fsspec.implementations.reference import ReferenceFileSystem
HAS_ZARR = True
except ImportError:
HAS_ZARR = False
zarr_open = None
ReferenceFileSystem = None
FsspecStore = None
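# Default per-axis dask chunk spec (dask semantics: 1 = one element per chunk
# along that axis, "auto" = let dask choose, -1 = a single chunk spanning the axis).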
CHUNKS = {0: 1, 1: "auto", 2: -1, 3: -1}
SAVE_AS_TYPES = [".tiff", ".bin", ".h5", ".zarr"]
logger = log.get("file_io")
PIPELINE_TAGS = ("plane", "roi", "z", "plane_", "roi_", "z_")
def load_ops(ops_input: str | Path | dict | list[str | Path]):
    """Simple utility to load a suite2p ops dict from an ``ops.npy`` file, or pass an existing dict through."""
if isinstance(ops_input, (str, Path)):
return np.load(ops_input, allow_pickle=True).item()
elif isinstance(ops_input, dict):
return ops_input
print("Warning: No valid ops file provided, returning None.")
return {}
def write_ops(metadata, raw_filename):
"""
Write metadata to an ops file alongside the given filename.
metadata must contain
'shape'
'pixel_resolution',
'frame_rate' keys.
"""
logger.info(f"Writing ops file for {raw_filename} with metadata: {metadata}")
assert isinstance(raw_filename, (str, Path)), "filename must be a string or Path object"
filename = Path(raw_filename).expanduser().resolve()
    # input may be either a file (ops.npy is written next to it)
    # or a directory (ops.npy is written inside it)
if filename.is_file():
root = filename.parent
else:
root = filename
ops_path = root.joinpath("ops.npy")
logger.debug(f"Writing ops file to {ops_path}.")
shape = metadata["shape"]
nt = shape[0]
Lx = shape[-2]
Ly = shape[-1]
if "pixel_resolution" not in metadata:
logger.warning("No pixel resolution found in metadata, using default [2, 2].")
if "fs" not in metadata:
if "frame_rate" in metadata:
metadata["fs"] = metadata["frame_rate"]
elif "framerate" in metadata:
metadata["fs"] = metadata["framerate"]
else:
logger.debug("No frame rate found in metadata; defaulting fs=10")
metadata["fs"] = 10
dx, dy = metadata.get("pixel_resolution", [2, 2])
ops = {
# suite2p needs these
"Ly": Ly,
"Lx": Lx,
"fs": metadata['fs'],
"nframes": nt,
"dx": dx,
"dy": dy,
"ops_path": str(ops_path),
# and dump the rest of the metadata
**metadata,
}
np.save(ops_path, ops)
logger.debug(f"Ops file written to {ops_path} with metadata:\n"
f" {ops}")
def normalize_file_url(path):
"""
Derive a folder tag from a filename based on “planeN”, “roiN”, or "tagN" patterns.
Parameters
----------
path : str or pathlib.Path
File path or name whose stem will be parsed.
Returns
-------
str
If the stem starts with “plane”, “roi”, or “res” followed by an integer,
returns that tag plus the integer (e.g. “plane3”, “roi7”, “res2”).
Otherwise returns the original stem unchanged.
Examples
--------
>>> normalize_file_url("plane_01.tif")
'plane1'
>>> normalize_file_url("plane2.bin")
'plane2'
>>> normalize_file_url("roi5.raw")
'roi5'
>>> normalize_file_url("ROI_10.dat")
'roi10'
>>> normalize_file_url("res-3.h5")
'res3'
>>> normalize_file_url("assembled_data_1.tiff")
'assembled_data_1'
>>> normalize_file_url("file_12.tif")
'file_12'
"""
    name = Path(path).stem
    low = name.lower()
    for tag in PIPELINE_TAGS:
        if low.startswith(tag):
suffix = name[len(tag):]
if suffix and (suffix[0] in ("_", "-")):
suffix = suffix[1:]
if suffix.isdigit():
return f"{tag}{int(suffix)}"
return name
def npy_to_dask(files, name="", axis=1, astype=None):
"""
Creates a Dask array that lazily stacks multiple .npy files along a specified axis without fully loading them into memory.
Taken from suite3d for convenience
https://github.com/alihaydaroglu/suite3d/blob/py310/suite3d/utils.py
To avoid the unnessessary import. Very nice function, thanks Ali!
Parameters
----------
files : list of str or Path
A list of file paths pointing to .npy files containing array data. Each file must have the same shape except
possibly along the concatenation axis.
name : str, optional
A string to be appended to a base name ("from-npy-stack-") to label the resulting Dask array. Default is an empty string.
axis : int, optional
The axis along which to stack/concatenate the arrays from the provided files. Default is 1.
astype : numpy.dtype, optional
If provided, the resulting Dask array will be cast to this data type. Otherwise, the data type is inferred
from the first file.
Returns
-------
dask.array.Array
Examples
--------
>>> # https://www.fastplotlib.org/
>>> import fastplotlib as fpl
>>> import mbo_utilities as mbo
>>> files = mbo.get_files("path/to/images/", 'fused', 3) # suite3D output
>>> arr = npy_to_dask(files, name="stack", axis=1)
>>> print(arr.shape)
    (nz, nt, ny, nx)
>>> # Optionally, cast the array to float32
>>> arr = npy_to_dask(files, axis=1, astype=np.float32)
>>> fpl.ImageWidget(arr.transpose(1, 0, 2, 3)).show()
"""
sample_mov = np.load(files[0], mmap_mode="r")
file_ts = [np.load(f, mmap_mode="r").shape[axis] for f in files]
nz, nt_sample, ny, nx = sample_mov.shape
dtype = sample_mov.dtype
chunks = [(nz,), (nt_sample,), (ny,), (nx,)]
chunks[axis] = tuple(file_ts)
chunks = tuple(chunks)
name = "from-npy-stack-%s" % name
keys = list(product([name], *[range(len(c)) for c in chunks]))
values = [(np.load, files[i], "r") for i in range(len(chunks[axis]))]
dsk = dict(zip(keys, values, strict=False))
arr = da.Array(dsk, name, chunks, dtype)
if astype is not None:
arr = arr.astype(astype)
return arr
def expand_paths(paths: str | Path | Sequence[str | Path]) -> list[Path]:
"""
Expand a path, list of paths, or wildcard pattern into a sorted list of actual files.
This is a handy wrapper for loading images or data files when you’ve got a folder,
some wildcards, or a mix of both.
Parameters
----------
paths : str, Path, or list of (str or Path)
Can be a single path, a wildcard pattern like '*.tif', a folder, or a list of those.
Returns
-------
list of Path
Sorted list of full paths to matching files.
Examples
--------
>>> expand_paths("data/*.tif")
[Path('data/img_000.tif'), Path('data/img_001.tif'), ...]
>>> expand_paths(Path("data"))
[Path('data/img_000.tif'), Path('data/img_001.tif'), ...]
>>> expand_paths(["data/*.tif", Path("more_data")])
[Path('data/img_000.tif'), Path('more_data/img_050.tif'), ...]
"""
if isinstance(paths, (str, Path)):
paths = [paths]
elif not isinstance(paths, (list, tuple)):
raise TypeError(f"Expected str, Path, or sequence of them, got {type(paths)}")
result = []
for p in paths:
p = Path(p)
if "*" in str(p):
result.extend(p.parent.glob(p.name))
elif p.is_dir():
result.extend(p.glob("*"))
elif p.exists() and p.is_file():
result.append(p)
return sorted(p.resolve() for p in result if p.is_file())
def _tiff_to_fsspec(tif_path: Path, base_dir: Path) -> dict:
"""
Create a kerchunk reference for a single TIFF file.
Parameters
----------
tif_path : Path
Path to the TIFF file on disk.
base_dir : Path
Directory representing the “root” URI for the reference.
Returns
-------
refs : dict
A kerchunk reference dict (in JSON form) for this single TIFF.
"""
with TiffFile(str(tif_path.expanduser().resolve())) as tif:
with StringIO() as f:
store = tif.aszarr()
store.write_fsspec(f, url=base_dir.as_uri())
refs = json.loads(f.getvalue()) # type: ignore
return refs
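# A minimal sketch of opening the returned refs lazily, using the optional
# zarr/fsspec imports at the top of this module (requires HAS_ZARR; the path
# below is a placeholder):
#
#     tif = Path("plane1.tif").resolve()
#     refs = _tiff_to_fsspec(tif, tif.parent)
#     fs = ReferenceFileSystem(refs)
#     arr = zarr_open(FsspecStore(fs), mode="r")  # chunks are read on demand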
def _multi_tiff_to_fsspec(tif_files: list[Path], base_dir: Path) -> dict:
    """
    Combine kerchunk references from multiple TIFF files into a single reference
    dict, concatenating along axis 0. All files must match in shape (beyond
    axis 0), chunking, and dtype.
    """
    assert len(tif_files) > 1, "Need at least two TIFF files to combine."
combined_refs: dict[str, str] = {}
per_file_refs = []
total_shape = None
total_chunks = None
zarr_meta = {}
for tif_path in tif_files:
# Create a json reference for each TIFF file
inner_refs = _tiff_to_fsspec(tif_path, base_dir)
zarr_meta = json.loads(inner_refs.pop(".zarray"))
inner_refs.pop(".zattrs", None)
shape = zarr_meta["shape"]
chunks = zarr_meta["chunks"]
if total_shape is None:
total_shape = shape.copy()
total_chunks = chunks
else:
assert shape[1:] == total_shape[1:], f"Shape mismatch in {tif_path}"
assert chunks == total_chunks, f"Chunk mismatch in {tif_path}"
total_shape[0] += shape[0] # accumulate along axis 0
per_file_refs.append((inner_refs, shape))
combined_zarr_meta = {
"shape": total_shape, # total shape tracks the full-assembled image shape
"chunks": total_chunks,
"dtype": zarr_meta["dtype"],
"compressor": zarr_meta["compressor"],
"filters": zarr_meta.get("filters", None),
"order": zarr_meta["order"],
"zarr_format": zarr_meta["zarr_format"],
"fill_value": zarr_meta.get("fill_value", 0),
}
combined_refs[".zarray"] = json.dumps(combined_zarr_meta)
combined_refs[".zattrs"] = json.dumps(
{"_ARRAY_DIMENSIONS": ["T", "C", "Y", "X"][:len(total_shape)]}
)
axis0_offset = 0
# since we are combining along axis 0, we need to adjust the keys
# in the inner_refs to account for the offset along that axis.
for inner_refs, shape in per_file_refs:
chunksize0 = total_chunks[0]
for key, val in inner_refs.items():
idx = list(map(int, key.strip("/").split(".")))
idx[0] += axis0_offset // chunksize0
new_key = ".".join(map(str, idx))
combined_refs[new_key] = val
axis0_offset += shape[0]
return combined_refs
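# Illustration of the key re-indexing above (hypothetical numbers): if every TIFF
# holds 100 frames chunked one frame per chunk along axis 0, the second file's
# chunk key "0.0.0" becomes "100.0.0", the third file's "200.0.0", and so on,
# so the combined store exposes a single contiguous axis 0.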
def read_scan(
pathnames,
dtype=np.int16,
roi=None,
fix_phase: bool = True,
phasecorr_method: str = "frame",
border: int | tuple[int, int, int, int] = 3,
upsample: int = 1,
max_offset: int = 4,
):
"""
Reads a ScanImage scan from a given file or set of file paths and returns a
ScanMultiROIReordered object with lazy-loaded data.
Parameters
----------
pathnames : str, Path, or sequence of str/Path
        A single path, a wildcard pattern (e.g. ``*.tif``), or a list of paths
        specifying the ScanImage TIFF files to read.
    roi : int, optional
        Index of a single ROI to export (1-based). Defaults to None, which
        exports the pre-assembled (tiled) ROIs.
    fix_phase : bool, optional
        If True, applies phase correction to the scan data. Default is True.
    phasecorr_method : str, optional
        The method to use for phase correction. Options include 'subpix',
        'two_step', and 'frame'. Default is 'frame'.
    border : int or tuple of int, optional
        The border size to use for phase correction. If an int, applies the same
        border to all sides. If a tuple, specifies (top, bottom, left, right) borders.
    upsample : int, optional
        Upsample factor for subpixel phase correction. A value of 1 restricts the
        correction to whole-pixel shifts. Default is 1.
    max_offset : int, optional
        The maximum allowed phase offset in pixels. If the computed offset exceeds
        this value, it is clamped to the maximum. Default is 4.
dtype : numpy.dtype, optional
The data type to use when reading the scan data. Default is np.int16.
Returns
-------
mbo_utilities.array_types.MboRawArray
A scan object with metadata and lazily loaded data. Raises FileNotFoundError
if no files match the specified path(s).
Notes
-----
If the provided path string appears to include escaped characters (for example,
unintentional backslashes), a warning message is printed suggesting the use of a
raw string (r'...') or double backslashes.
Examples
--------
>>> import mbo_utilities as mbo
>>> import matplotlib.pyplot as plt
>>> scan = mbo.read_scan(r"D:\\demo\\raw")
>>> plt.imshow(scan[0, 5, 0, 0], cmap='gray') # First frame of z-plane 6
>>> scan = mbo.read_scan(r"D:\\demo\\raw", roi=1) # First ROI
>>> plt.imshow(scan[0, 5, 0, 0], cmap='gray') # indexing works the same
"""
filenames = expand_paths(pathnames)
if len(filenames) == 0:
error_msg = f"Pathname(s) {pathnames} do not match any files in disk."
raise FileNotFoundError(error_msg)
if not is_raw_scanimage(filenames[0]):
raise ValueError(
f"The file {filenames[0]} does not appear to be a raw ScanImage TIFF file."
)
    # MboRawArray is imported lazily here; the module path is taken from the
    # documented return type (mbo_utilities.array_types.MboRawArray).
    from .array_types import MboRawArray

    scan = MboRawArray(
        roi=roi,
        fix_phase=fix_phase,
        phasecorr_method=phasecorr_method,
        border=border,
        upsample=upsample,
        max_offset=max_offset,
    )
    scan.read_data(filenames, dtype=dtype)
    return scan
def get_files(
    base_dir, str_contains="", max_depth=1, sort_ascending=True, exclude_dirs=None
) -> list[str]:
"""
Recursively search for files in a specified directory whose names contain a given substring,
limiting the search to a maximum subdirectory depth. Optionally, the resulting list of file paths
is sorted in ascending order using numeric parts of the filenames when available.
Parameters
----------
base_dir : str or Path
The base directory where the search begins. This path is expanded (e.g., '~' is resolved)
and converted to an absolute path.
str_contains : str, optional
A substring that must be present in a file's name for it to be included in the result.
If empty, all files are matched.
max_depth : int, optional
The maximum number of subdirectory levels (relative to the base directory) to search.
Defaults to 1. If set to 0, it is automatically reset to 1.
sort_ascending : bool, optional
If True (default), the matched file paths are sorted in ascending alphanumeric order.
The sort key extracts numeric parts from filenames so that, for example, "file2" comes
before "file10".
    exclude_dirs : iterable of str or Path, optional
        An iterable of directory names to exclude from the search. By default,
        ".venv", ".git", and "__pycache__" are excluded.
Returns
-------
list of str
A list of full file paths (as strings) for files within the base directory (and its
subdirectories up to the specified depth) that contain the provided substring.
Raises
------
FileNotFoundError
If the base directory does not exist.
NotADirectoryError
If the specified base_dir is not a directory.
Examples
--------
>>> import mbo_utilities as mbo
>>> # Get all files that contain "ops.npy" in their names by searching up to 3 levels deep:
>>> ops_files = mbo.get_files("path/to/files", "ops.npy", max_depth=3)
>>> # Get only files containing "tif" in the current directory (max_depth=1):
>>> tif_files = mbo.get_files("path/to/files", "tif")
"""
base_path = Path(base_dir).expanduser().resolve()
if not base_path.exists():
raise FileNotFoundError(f"Directory '{base_path}' does not exist.")
if not base_path.is_dir():
raise NotADirectoryError(f"'{base_path}' is not a directory.")
if max_depth == 0:
ic("Max-depth of 0 is not allowed. Setting to 1.")
max_depth = 1
base_depth = len(base_path.parts)
pattern = f"*{str_contains}*" if str_contains else "*"
if exclude_dirs is None:
exclude_dirs = [".venv", ".git", "__pycache__"]
def is_excluded(path):
return any(excl in path.parts for excl in exclude_dirs)
files = [
file
for file in base_path.rglob(pattern)
if len(file.parts) - base_depth <= max_depth
and file.is_file()
and not is_excluded(file)
]
if sort_ascending:
def numerical_sort_key(path):
match = re.search(r"\d+", path.name)
return int(match.group()) if match else float("inf")
files.sort(key=numerical_sort_key)
return [str(file) for file in files]
def _is_arraylike(obj) -> bool:
"""
    Checks if the object is array-like:
    has `__getitem__`, `shape`, and `ndim` attributes.
"""
for attr in ["__getitem__", "shape", "ndim"]:
if not hasattr(obj, attr):
return False
return True
def _get_mbo_project_root() -> Path:
"""Return the root path of the mbo_utilities repository (based on this file)."""
return Path(__file__).resolve().parent.parent
def get_mbo_dirs() -> dict:
"""
Ensure ~/mbo and its subdirectories exist.
    Returns a dict with paths to the base directory and its subdirectories
    (imgui, cache, logs, assets, settings, data, tests).
"""
base = Path.home().joinpath("mbo")
imgui = base.joinpath("imgui")
cache = base.joinpath("cache")
logs = base.joinpath("logs")
tests = base.joinpath("tests")
data = base.joinpath("data")
assets = imgui.joinpath("assets")
settings = assets.joinpath("app_settings")
    for d in (base, imgui, cache, logs, assets, settings, data, tests):
        d.mkdir(parents=True, exist_ok=True)
return {
"base": base,
"imgui": imgui,
"cache": cache,
"logs": logs,
"assets": assets,
"settings": settings,
"data": data,
"tests": tests,
}
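# Example: grab a specific directory from the returned mapping, e.g.
#
#     log_dir = get_mbo_dirs()["logs"]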
def _convert_range_to_slice(k):
return slice(k.start, k.stop, k.step) if isinstance(k, range) else k