# Source code for lbm_suite2p_python.utils

import os
import subprocess

import numpy as np
from pathlib import Path

import tifffile
from matplotlib import patches

import matplotlib.pyplot as plt
import matplotlib as mpl

import suite2p
from scipy.ndimage import percentile_filter, gaussian_filter1d, uniform_filter1d

# Global plotting defaults: hide top/right spines, pack subplots tightly,
# and use a large default figure size suited to multi-panel summaries.
mpl.rcParams.update({
    'axes.spines.left': True,
    'axes.spines.bottom': True,
    'axes.spines.top': False,
    'axes.spines.right': False,
    'legend.frameon': False,
    'figure.subplot.wspace': .01,
    'figure.subplot.hspace': .01,
    'figure.figsize': (18, 13),
    'ytick.major.left': True,
})

# Work on a *copy* of the colormap: calling `set_bad` on the registered
# instance mutates global state and raises on matplotlib >= 3.7, and
# `mpl.cm.get_cmap` itself was removed in matplotlib 3.9.
try:
    jet = mpl.colormaps['jet'].copy()  # matplotlib >= 3.5
except AttributeError:
    jet = mpl.cm.get_cmap('jet')  # fallback for older matplotlib
jet.set_bad(color='k')  # NaN pixels render as black


def smooth_video(input_path, output_path, target_fps=60):
    """Re-encode a video with motion-interpolated frame upsampling via ffmpeg.

    Parameters
    ----------
    input_path : str or Path
        Source video file.
    output_path : str or Path
        Destination for the re-encoded video.
    target_fps : int, optional
        Output frame rate produced by motion interpolation (default 60).

    Raises
    ------
    subprocess.CalledProcessError
        If ffmpeg exits with a non-zero status (its stderr is printed first).
    """
    interp_filter = (
        f"minterpolate=fps={target_fps}:mi_mode=mci:"
        f"mc_mode=aobmc:me=umh:vsbmc=1"
    )
    command = [
        "ffmpeg",
        "-y",                      # overwrite output without prompting
        "-i", input_path,
        "-vf", interp_filter,      # motion-compensated interpolation
        "-fps_mode", "cfr",        # constant frame rate output
        "-r", str(target_fps),
        "-c:v", "libx264",
        "-crf", "18",              # visually-lossless quality
        "-preset", "slow",
        output_path,
    ]

    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if proc.returncode != 0:
        print("FFmpeg error:", proc.stderr)
        raise subprocess.CalledProcessError(proc.returncode, command, output=proc.stdout, stderr=proc.stderr)


[docs] def load_ops(ops_input: str | Path | list[str | Path]): """ Simple utility load a suite2p npy file""" if isinstance(ops_input, (str, Path)): return np.load(ops_input, allow_pickle=True).item() elif isinstance(ops_input, dict): return ops_input
def resize_to_max_proj(mask, target_shape):
    """Center ``mask`` within ``target_shape``, cropping or padding per axis.

    Each axis is handled independently, so a mask that is taller but
    narrower than the target is cropped vertically and padded horizontally.
    (The original implementation mishandled that mixed case: a single
    crop-or-pad branch produced negative slice starts.)

    Parameters
    ----------
    mask : np.ndarray
        2D input mask.
    target_shape : tuple of int
        Desired (height, width).

    Returns
    -------
    np.ndarray
        Array of shape ``target_shape`` with the mask centered in it.
    """
    sy, sx = mask.shape
    ty, tx = target_shape

    # Centered crop on any axis that is too large.
    crop_y = max((sy - ty) // 2, 0)
    crop_x = max((sx - tx) // 2, 0)
    cropped = mask[crop_y:crop_y + min(sy, ty), crop_x:crop_x + min(sx, tx)]

    # Centered placement (padding) on any axis that is too small.
    out = np.zeros(target_shape, dtype=mask.dtype)
    off_y = max((ty - sy) // 2, 0)
    off_x = max((tx - sx) // 2, 0)
    out[off_y:off_y + cropped.shape[0], off_x:off_x + cropped.shape[1]] = cropped
    return out


def convert_to_rgba(zstack):
    """Convert a grayscale Z-stack to an RGBA stack.

    Parameters
    ----------
    zstack : np.ndarray
        Input grayscale Z-stack with shape (num_slices, height, width).

    Returns
    -------
    np.ndarray
        RGBA Z-stack with shape (num_slices, height, width, 4); RGB channels
        are the normalized grayscale values in [0, 1] and alpha is 1.0.
    """
    # Normalize to [0, 1]; a constant stack previously divided by zero.
    lo, hi = zstack.min(), zstack.max()
    rng = hi - lo
    if rng > 0:
        normalized = (zstack - lo) / rng
    else:
        normalized = np.zeros_like(zstack, dtype=np.float32)

    rgba_stack = np.zeros((*zstack.shape, 4), dtype=np.float32)
    # Repeat grayscale across the R, G, B channels.
    rgba_stack[..., :3] = np.repeat(normalized[..., np.newaxis], 3, axis=-1)
    rgba_stack[..., 3] = 1.0  # fully opaque
    return rgba_stack
def load_traces(ops):
    """Load fluorescence, neuropil, and spike traces for accepted cells.

    Reads ``F.npy``, ``Fneu.npy``, ``spks.npy``, and ``iscell.npy`` from the
    directory named by ``ops['save_path']`` and keeps only the rows where the
    first column of ``iscell`` is truthy.

    Parameters
    ----------
    ops : dict
        Must contain the key 'save_path': the directory holding the suite2p
        output files listed above.

    Returns
    -------
    F_valid : ndarray
        Fluorescence traces for valid cells (n_valid x n_timepoints).
    Fneu_valid : ndarray
        Neuropil traces for valid cells (n_valid x n_timepoints).
    spks_valid : ndarray
        Spike traces for valid cells (n_valid x n_timepoints).
    """
    out_dir = Path(ops['save_path'])

    fluo = np.load(out_dir / 'F.npy')
    neuropil = np.load(out_dir / 'Fneu.npy')
    spikes = np.load(out_dir / 'spks.npy')
    # First column of iscell holds the accepted/rejected flag.
    accepted = np.load(out_dir / 'iscell.npy', allow_pickle=True)[:, 0].astype(bool)

    return fluo[accepted], neuropil[accepted], spikes[accepted]
def plot_projection(
        ops,
        savepath=None,
        fig_label=None,
        vmin=None,
        vmax=None,
        add_scalebar=False,
        proj="meanImg",
        display_masks=False,
        accepted_only=False
):
    """Plot a suite2p image projection, optionally overlaying ROI masks.

    Parameters
    ----------
    ops : dict
        Suite2p ops dictionary; must contain the projection key named by
        ``proj`` and, when ``display_masks`` is True, 'save_path', 'Ly', 'Lx'.
    savepath : str or Path, optional
        If given, the figure is saved there (parent dirs created); otherwise
        the figure is shown interactively.
    fig_label : str, optional
        Y-axis label; underscores/hyphens/dots are replaced by spaces.
    vmin, vmax : float, optional
        Display range; defaults to the 2nd / 98th percentile of the data.
    add_scalebar : bool, optional
        Draw a 100-micron scale bar (requires 'dx' in ops, microns/pixel —
        presumably; TODO confirm units against the acquisition metadata).
    proj : str, optional
        One of 'meanImg', 'max_proj', 'meanImgE'.
    display_masks : bool, optional
        Overlay accepted (green) and rejected (magenta) ROI masks.
    accepted_only : bool, optional
        When overlaying masks, skip the rejected-cell overlay.

    Raises
    ------
    ValueError
        If ``proj`` is not one of the supported projection keys.
    """
    if proj == "meanImg":
        txt = "Mean-Image"
    elif proj == "max_proj":
        txt = "Max-Projection"
    elif proj == "meanImgE":
        txt = "Mean-Image (Enhanced)"
    else:
        raise ValueError("Unknown projection type. Options are ['meanImg', 'max_proj', 'meanImgE']")

    if savepath:
        savepath = Path(savepath)

    data = ops[proj]
    shape = data.shape

    fig, ax = plt.subplots(figsize=(6, 6), facecolor='black')

    # Default to a robust percentile window; clamp a degenerate range so
    # imshow never receives vmin == vmax.
    vmin = np.nanpercentile(data, 2) if vmin is None else vmin
    vmax = np.nanpercentile(data, 98) if vmax is None else vmax
    if vmax - vmin < 1e-6:
        vmax = vmin + 1e-6

    ax.imshow(data, cmap='gray', vmin=vmin, vmax=vmax)

    # move projection title higher if masks are displayed to avoid overlap.
    proj_title_y = 1.07 if display_masks else 1.02
    ax.text(0.5, proj_title_y, txt, transform=ax.transAxes, fontsize=14, fontweight='bold',
            fontname="Courier New", color='white', ha='center', va='bottom')

    if fig_label:
        fig_label = fig_label.replace("_", " ").replace("-", " ").replace(".", " ")
        ax.set_ylabel(fig_label, color='white', fontweight='bold', fontsize=12)

    ax.set_xticks([])
    ax.set_yticks([])

    if display_masks:
        # Load ROI stats/flags saved by suite2p next to this ops file.
        stats_file = Path(ops['save_path']).joinpath('stat.npy')
        iscell = np.load(Path(ops['save_path']).joinpath('iscell.npy'), allow_pickle=True)[:, 0].astype(bool)
        stats = np.load(stats_file, allow_pickle=True)

        # One labeled 2D plane per ROI; zero (background) becomes NaN so it
        # stays transparent under the colormap overlays.
        im = suite2p.ROI.stats_dicts_to_3d_array(stats, Ly=ops['Ly'], Lx=ops['Lx'], label_id=True)
        im[im == 0] = np.nan

        accepted_cells = np.sum(iscell)
        rejected_cells = np.sum(~iscell)

        # Accepted ROIs: flatten to one plane and resize to the projection's
        # shape (max_proj can be cropped relative to Ly x Lx).
        cell_rois = resize_to_max_proj(
            np.nanmax(im[iscell], axis=0) if np.any(iscell)
            else np.zeros_like(im[0]), shape)

        # Opaque green overlay wherever an accepted ROI is present.
        green_overlay = np.zeros((*shape, 4), dtype=np.float32)
        green_overlay[..., 1] = 1
        green_overlay[..., 3] = (cell_rois > 0) * 1.0
        ax.imshow(green_overlay)

        if not accepted_only:
            non_cell_rois = resize_to_max_proj(
                np.nanmax(im[~iscell], axis=0) if np.any(~iscell)
                else np.zeros_like(im[0]), shape)

            # Half-transparent magenta overlay for rejected ROIs.
            magenta_overlay = np.zeros((*shape, 4), dtype=np.float32)
            magenta_overlay[..., 0] = 1
            magenta_overlay[..., 2] = 1
            magenta_overlay[..., 3] = (non_cell_rois > 0) * 0.5
            ax.imshow(magenta_overlay)

        ax.text(0.37, 1.02, f"Accepted: {accepted_cells:03d}", transform=ax.transAxes, fontsize=14,
                fontweight='bold', fontname="Courier New", color='lime', ha='right', va='bottom')
        ax.text(0.63, 1.02, f"Rejected: {rejected_cells:03d}", transform=ax.transAxes, fontsize=14,
                fontweight='bold', fontname="Courier New", color='magenta', ha='left', va='bottom')

    if add_scalebar and 'dx' in ops:
        pixel_size = ops['dx']
        scale_bar_length = 100 / pixel_size  # 100 um expressed in pixels
        scalebar_x = shape[1] * 0.05
        scalebar_y = shape[0] * 0.90
        ax.add_patch(patches.Rectangle(
            (scalebar_x, scalebar_y), scale_bar_length, 5,
            edgecolor='white', facecolor='white'))
        ax.text(scalebar_x + scale_bar_length / 2, scalebar_y - 10,
                "100 μm", color='white', fontsize=10, ha='center', fontweight='bold')

    # remove the spines that will show up as white bars
    for spine in ax.spines.values():
        spine.set_visible(False)

    plt.tight_layout()

    if savepath:
        savepath.parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(savepath, dpi=300, facecolor='black')
    else:
        plt.show()
def gaussian(x, mu, sigma):
    """Evaluate the normal probability density at ``x`` for mean ``mu`` and std ``sigma``."""
    z = (x - mu) / sigma
    return np.exp(-0.5 * z ** 2) / (sigma * np.sqrt(2 * np.pi))
def dff_percentile(f_trace, window_size=300, percentile=8):
    """Compute ΔF/F₀ using a rolling percentile baseline.

    Parameters:
    -----------
    f_trace : np.ndarray
        (N_neurons, N_frames) fluorescence traces.
    window_size : int
        Size of the rolling window (in frames).
    percentile : int
        Percentile to use for baseline F₀ estimation.

    Returns:
    --------
    dff : np.ndarray
        (N_neurons, N_frames) ΔF/F₀ traces.
    """
    # Rolling-percentile baseline, one filtered trace per neuron.
    baselines = [
        percentile_filter(trace, percentile, size=window_size, mode='nearest')
        for trace in f_trace
    ]
    f0 = np.asarray(baselines)

    # Epsilon keeps the division finite when the baseline hits zero.
    return (f_trace - f0) / (f0 + 1e-6)
def dff_maxmin(f_trace, fps, smooth_window=5):
    """Compute ΔF/F₀ via a 5 s Gaussian smooth followed by a rolling max-min ('maxmin') baseline."""
    win = int(5 * fps)  # 5-second window in frames

    # Step 1: heavy Gaussian smoothing for an initial baseline estimate.
    smoothed = gaussian_filter1d(f_trace, sigma=win, axis=1)

    # Step 2: approximate the rolling baseline as the midpoint of rolling
    # means of the smoothed (min proxy) and raw (max proxy) traces.
    rolling_min = uniform_filter1d(smoothed, size=win, axis=1, mode="nearest")
    rolling_max = uniform_filter1d(f_trace, size=win, axis=1, mode="nearest")
    baseline = (rolling_min + rolling_max) / 2

    # Step 3: ΔF/F₀ with an epsilon to keep the division finite.
    dff = (f_trace - baseline) / (baseline + 1e-8)

    # Step 4: rescale each trace to [0, 1] for visualization.
    row_min = np.min(dff, axis=1, keepdims=True)
    row_max = np.max(dff, axis=1, keepdims=True)
    dff_norm = (dff - row_min) / (row_max - row_min + 1e-8)

    # Light final smoothing along time.
    return uniform_filter1d(dff_norm, size=smooth_window, axis=1)
[docs] def get_common_path(ops_files: list | tuple): """ Find the common path of all files in `ops_files`. If there is a single file or no common path, return the first non-empty path. """ if not isinstance(ops_files, (list, tuple)): ops_files = [ops_files] if len(ops_files) == 1: path = Path(ops_files[0]).parent while path.exists() and len(list(path.iterdir())) <= 1: # Traverse up if only one item exists path = path.parent return path else: return Path(os.path.commonpath(ops_files))
def combine_tiffs(files):
    """
    Combines multiple TIFF files into a single stacked TIFF.

    Parameters
    ----------
    files : list of str or Path
        List of file paths to the TIFF files to be combined, in order.

    Returns
    -------
    np.ndarray
        A 3D NumPy array of shape `(T_total, Y, X)`, where `T_total` is the
        sum of the frame counts of all inputs.

    Raises
    ------
    ValueError
        If `files` is empty or the stacks' spatial dimensions differ.

    Notes
    -----
    - Input TIFFs must have identical spatial dimensions (`Y x X`).
    - Unlike the previous implementation, inputs may have *differing* numbers
      of frames; the old code pre-allocated assuming every file matched the
      first file's frame count and silently relied on that.
    """
    if not files:
        raise ValueError("combine_tiffs requires at least one input file")

    stacks = [tifffile.imread(f) for f in files]

    # Validate spatial shapes up front so the error names the offending file.
    spatial = stacks[0].shape[-2:]
    for f, stack in zip(files, stacks):
        if stack.shape[-2:] != spatial:
            raise ValueError(
                f"TIFF {f} has spatial shape {stack.shape[-2:]}, expected {spatial}"
            )

    # Concatenate along the time axis; handles unequal frame counts per file.
    return np.concatenate(stacks, axis=0)