Source code for lbm_caiman_python.postprocessing

"""
postprocessing utilities for caiman results.
"""

from pathlib import Path
from typing import Union

import numpy as np


[docs] def load_ops(ops_input) -> dict: """ load ops from file or return dict as-is. parameters ---------- ops_input : str, Path, dict, or None path to ops.npy file, dictionary, or None. returns ------- dict ops dictionary. """ if ops_input is None: return {} if isinstance(ops_input, dict): return ops_input.copy() if isinstance(ops_input, (str, Path)): ops_path = Path(ops_input) # handle directory input if ops_path.is_dir(): ops_path = ops_path / "ops.npy" if ops_path.exists(): ops = np.load(ops_path, allow_pickle=True) if isinstance(ops, np.ndarray): ops = ops.item() return dict(ops) else: return {} return {}
[docs] def load_planar_results(plane_dir) -> dict: """ load all results from a plane directory. parameters ---------- plane_dir : str or Path path to plane output directory. returns ------- dict dictionary containing ops, estimates, F, dff, etc. """ plane_dir = Path(plane_dir) results = {} # load ops ops_file = plane_dir / "ops.npy" if ops_file.exists(): results["ops"] = load_ops(ops_file) # load estimates estimates_file = plane_dir / "estimates.npy" if estimates_file.exists(): results["estimates"] = np.load(estimates_file, allow_pickle=True).item() # load fluorescence F_file = plane_dir / "F.npy" if F_file.exists(): results["F"] = np.load(F_file) # load dff dff_file = plane_dir / "dff.npy" if dff_file.exists(): results["dff"] = np.load(dff_file) # load spikes spks_file = plane_dir / "spks.npy" if spks_file.exists(): results["spks"] = np.load(spks_file) # load motion correction results shifts_file = plane_dir / "mcorr_shifts.npy" if shifts_file.exists(): results["shifts"] = np.load(shifts_file, allow_pickle=True) template_file = plane_dir / "mcorr_template.npy" if template_file.exists(): results["template"] = np.load(template_file) return results
[docs] def dff_rolling_percentile( F: np.ndarray, window_size: int = None, percentile: int = 20, smooth_window: int = None, fs: float = 30.0, tau: float = 1.0, ) -> np.ndarray: """ compute dF/F using rolling percentile baseline. parameters ---------- F : np.ndarray fluorescence traces, shape (n_cells, n_frames). window_size : int, optional frames for rolling percentile. default: ~10*tau*fs. percentile : int, default 20 percentile for baseline F0. smooth_window : int, optional smoothing window for dF/F. fs : float, default 30.0 frame rate in Hz. tau : float, default 1.0 decay time constant in seconds. returns ------- np.ndarray dF/F traces, same shape as F. """ if F is None or F.size == 0: return np.array([]) # ensure 2d if F.ndim == 1: F = F[np.newaxis, :] n_cells, n_frames = F.shape # auto-calculate window size if window_size is None: window_size = int(10 * tau * fs) window_size = max(10, min(window_size, n_frames // 2)) # compute baseline using rolling percentile from scipy.ndimage import percentile_filter F0 = np.zeros_like(F) for i in range(n_cells): F0[i] = percentile_filter(F[i], percentile, size=window_size) # avoid division by zero F0 = np.maximum(F0, 1e-6) # compute dF/F dff = (F - F0) / F0 # optional smoothing if smooth_window is not None and smooth_window > 1: from scipy.ndimage import uniform_filter1d dff = uniform_filter1d(dff, size=smooth_window, axis=1) return dff
[docs] def compute_roi_stats(plane_dir) -> dict: """ compute roi quality statistics. parameters ---------- plane_dir : str or Path path to plane output directory. returns ------- dict dictionary of roi statistics. """ plane_dir = Path(plane_dir) results = load_planar_results(plane_dir) stats = {} # get estimates estimates = results.get("estimates", {}) A = estimates.get("A") C = estimates.get("C") SNR = estimates.get("SNR_comp") r_values = estimates.get("r_values") if A is not None: from scipy import sparse if sparse.issparse(A): A = A.toarray() n_cells = A.shape[1] stats["n_cells"] = n_cells # compute cell sizes (number of pixels) cell_sizes = np.array([(A[:, i] > 0).sum() for i in range(n_cells)]) stats["cell_sizes"] = cell_sizes stats["mean_cell_size"] = float(np.mean(cell_sizes)) stats["median_cell_size"] = float(np.median(cell_sizes)) if C is not None: n_cells, n_frames = C.shape stats["n_frames"] = n_frames # compute signal statistics stats["mean_signal"] = float(np.mean(C)) stats["max_signal"] = float(np.max(C)) if SNR is not None: stats["snr"] = SNR stats["mean_snr"] = float(np.mean(SNR)) stats["median_snr"] = float(np.median(SNR)) if r_values is not None: stats["r_values"] = r_values stats["mean_r_value"] = float(np.mean(r_values)) # save stats np.save(plane_dir / "roi_stats.npy", stats) return stats
[docs] def get_accepted_cells(plane_dir) -> tuple: """ get indices of accepted and rejected cells. parameters ---------- plane_dir : str or Path path to plane output directory. returns ------- tuple (accepted_indices, rejected_indices) """ plane_dir = Path(plane_dir) estimates_file = plane_dir / "estimates.npy" if not estimates_file.exists(): return np.array([]), np.array([]) estimates = np.load(estimates_file, allow_pickle=True).item() accepted = estimates.get("idx_components") rejected = estimates.get("idx_components_bad") if accepted is None: # if no evaluation done, accept all A = estimates.get("A") if A is not None: accepted = np.arange(A.shape[1]) else: accepted = np.array([]) if rejected is None: rejected = np.array([]) return np.asarray(accepted), np.asarray(rejected)
[docs] def get_contours(plane_dir, threshold: float = 0.5) -> list: """ get cell contours for visualization. parameters ---------- plane_dir : str or Path path to plane output directory. threshold : float, default 0.5 threshold for contour extraction (fraction of max). returns ------- list list of contour coordinates for each cell. """ plane_dir = Path(plane_dir) results = load_planar_results(plane_dir) estimates = results.get("estimates", {}) ops = results.get("ops", {}) A = estimates.get("A") if A is None: return [] from scipy import sparse if sparse.issparse(A): A = A.toarray() Ly = ops.get("Ly", int(np.sqrt(A.shape[0]))) Lx = ops.get("Lx", int(np.sqrt(A.shape[0]))) contours = [] for i in range(A.shape[1]): component = A[:, i].reshape((Ly, Lx), order="F") # find contour from skimage import measure thresh = component.max() * threshold contour_list = measure.find_contours(component, thresh) if contour_list: contours.append(contour_list[0]) else: contours.append(np.array([])) return contours