Source code for lbm_suite2p_python.volume

import glob
import os
import subprocess
from pathlib import Path

import cv2
import numpy as np
from matplotlib import pyplot as plt
from rastermap.utils import bin1d

from lbm_suite2p_python import load_ops, get_common_path


[docs]
def update_ops_paths(ops_files: str | list):
    """
    Update save_path, save_path0, and save_folder in an ops dictionary based on its current location.

    Use after moving an ops_file or batch of ops_files.
    """
    if isinstance(ops_files, (str, Path)):
        ops_files = [ops_files]
    for ops_file in ops_files:
        ops = np.load(ops_file, allow_pickle=True).item()
        ops_path = Path(ops_file)
        plane0_folder = ops_path.parent
        plane_folder = plane0_folder.parent
        ops["save_path"] = str(plane0_folder)
        ops["save_path0"] = str(plane_folder)
        ops["save_folder"] = plane_folder.name
        ops["ops_path"] = ops_path
        np.save(ops_file, ops)

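# Usage sketch (hypothetical paths, not part of the module): refresh the cached
# paths stored in each ops.npy after relocating a batch of processed plane folders.
#
#     from lbm_suite2p_python.volume import update_ops_paths
#     ops_files = sorted(str(p) for p in Path("D:/moved_results").rglob("ops.npy"))
#     update_ops_paths(ops_files)
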
[docs]
def load_results_dict(ops_filepath, apply_zscore=True, z_plane=None) -> dict:
    """
    Load the stat, iscell, and spks files and return them as a dict.

    Parameters
    ----------
    ops_filepath : str or Path
        Path to the ops.npy file.
    apply_zscore : bool, optional
        Whether to apply a z-score to the traces (default is True).
    z_plane : int or None, optional
        The z-plane index for this file. If provided, it is stored in the output.

    Returns
    -------
    dict
        Dictionary with keys:
        - 'output_ops': dict loaded from the ops file,
        - 'spks': spks (2D array: neurons x time),
        - 'stats': stats loaded from stat.npy,
        - 'iscell': boolean array from iscell.npy,
        - 'xy': x-y positions from stats,
        - 'mean_image': mean image from the ops file,
        - 'z_plane': array of shape (n_neurons,) with the provided z_plane index.
    """
    from pathlib import Path
    from scipy.stats import zscore

    ops_filepath = Path(ops_filepath)
    output_ops = load_ops(ops_filepath)
    save_path = Path(output_ops['save_path'])
    mean_img = output_ops["meanImg"]

    stats_file = save_path.joinpath('stat.npy')
    spks_file = save_path.joinpath('spks.npy')
    iscell_file = save_path.joinpath('iscell.npy')

    # keep only ROIs classified as cells
    iscell = np.load(iscell_file, allow_pickle=True)[:, 0].astype(bool)
    stats = np.load(stats_file, allow_pickle=True)[iscell]
    spks = np.load(spks_file, allow_pickle=True)[iscell]

    if apply_zscore:
        spks = zscore(spks, axis=1)

    xy = np.array([s["med"] for s in stats])
    n_neurons = spks.shape[0]
    if z_plane is None:
        # if not provided, assign a default of 0
        z_plane_arr = np.zeros(n_neurons, dtype=int)
    else:
        z_plane_arr = np.full(n_neurons, z_plane, dtype=int)

    return {
        'output_ops': output_ops,
        'spks': spks,
        'stats': stats,
        'iscell': iscell,
        'xy': xy,
        'mean_image': mean_img,
        'z_plane': z_plane_arr,
    }

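# Usage sketch (hypothetical plane folder): load the curated results for a single
# z-plane; traces are z-scored and the plane index is broadcast per neuron.
#
#     from lbm_suite2p_python.volume import load_results_dict
#     results = load_results_dict("results/plane01/ops.npy", apply_zscore=True, z_plane=1)
#     spks, xy = results["spks"], results["xy"]  # (n_neurons, n_timepoints), (n_neurons, 2)
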
[docs]
def plot_rastermap(
    spks,
    model,
    neuron_bin_size=None,
    fps=17,
    vmin=0,
    vmax=0.8,
    xmin=0,
    xmax=None,
    save_path=None,
    title=None,
    title_kwargs={},
    fig_text=None,
):
    n_neurons, n_timepoints = spks.shape
    if neuron_bin_size is None:
        neuron_bin_size = max(1, n_neurons // 500)  # default internal rastermap binning
        print(f"Neuron binning factor (default): {neuron_bin_size}")

    if neuron_bin_size > 1:
        sn = bin1d(spks[model.isort], neuron_bin_size, axis=0)
        ntype = "superneurons"
    else:
        sn = spks[model.isort]
        ntype = "neurons"

    if xmax is None or xmax < xmin or xmax > sn.shape[1]:
        xmax = sn.shape[1]

    sn = sn[:, xmin:xmax]
    current_time = np.round((xmax - xmin) / fps, 1)
    current_neurons = sn.shape[0]

    fig, ax = plt.subplots(figsize=(6, 3), dpi=200)
    print(f"Plotting from {xmin} : {xmax}")
    img = ax.imshow(sn, cmap="gray_r", vmin=vmin, vmax=vmax, aspect="auto")

    fig.patch.set_facecolor("black")
    ax.set_facecolor("black")
    ax.tick_params(axis='both', labelbottom=False, labelleft=False, length=0)
    for spine in ax.spines.values():
        spine.set_visible(False)

    heatmap_pos = ax.get_position()

    # horizontal time scalebar spanning 10% of the heatmap width
    scalebar_length = heatmap_pos.width * 0.1
    scalebar_duration = np.round(current_time * 0.1)  # 10% of the displayed time in heatmap
    x_start = heatmap_pos.x1 - scalebar_length
    x_end = heatmap_pos.x1
    y_position = heatmap_pos.y0
    fig.lines.append(plt.Line2D([x_start, x_end], [y_position - 0.03, y_position - 0.03],
                                transform=fig.transFigure, color='white', linewidth=2,
                                solid_capstyle='butt'))
    fig.text(
        x=(x_start + x_end) / 2,
        y=y_position - 0.045,  # slightly below the scalebar
        s=f"{scalebar_duration:.0f} s",
        ha="center", va="top", color="white", fontsize=6,
    )

    # horizontal colorbar aligned with the heatmap's left edge
    axins = fig.add_axes([
        heatmap_pos.x0,            # exactly aligned with heatmap's left edge
        heatmap_pos.y0 - 0.03,     # slightly below the heatmap
        heatmap_pos.width * 0.1,   # 10% width of heatmap
        0.015,                     # height of the colorbar
    ])
    cbar = fig.colorbar(img, cax=axins, orientation="horizontal", ticks=[vmin, vmax])
    cbar.ax.tick_params(labelsize=5, colors="white", pad=2)
    cbar.outline.set_edgecolor("white")
    fig.text(
        heatmap_pos.x0,
        heatmap_pos.y0 - 0.1,  # below the colorbar with spacing
        "z-scored",
        ha="left", va="top", color="white", fontsize=6,
    )

    # vertical scalebar marking 10% of the displayed neurons
    scalebar_neurons = int(0.1 * current_neurons)
    x_position = heatmap_pos.x1 + 0.01  # slightly right of heatmap
    y_start = heatmap_pos.y0
    y_end = y_start + (heatmap_pos.height * scalebar_neurons / current_neurons)
    line = plt.Line2D(
        [x_position, x_position], [y_start, y_end],
        transform=fig.transFigure, color='white', linewidth=2,
    )
    line.set_figure(fig)
    fig.lines.append(line)
    fig.text(
        x=x_position + 0.008,
        y=y_start,
        s=f"{scalebar_neurons} {ntype}",
        ha="left", va="bottom", color="white", fontsize=6, rotation=90,
    )

    if fig_text is None:
        fig_text = (
            f"Neurons: {spks.shape[0]}, Superneurons: {sn.shape[0]}, "
            f"n_clusters: {model.n_clusters}, n_PCs: {model.n_PCs}, "
            f"locality: {model.locality}"
        )
    fig.text(
        x=(heatmap_pos.x0 + heatmap_pos.x1) / 2,
        y=y_start - 0.085,  # vertically between existing scalebars
        s=fig_text,
        ha="center", va="top", color="white", fontsize=6,
    )

    if title is not None:
        plt.suptitle(title, **title_kwargs)
    if save_path is not None:
        save_path = Path(save_path)
        save_path.parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(save_path, dpi=200, facecolor="black", bbox_inches="tight")
    return fig, ax

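# Workflow sketch (parameter values are illustrative, not package defaults): fit a
# Rastermap model on the z-scored traces, then pass the traces and the fitted model
# to plot_rastermap.
#
#     from rastermap import Rastermap
#     model = Rastermap(n_clusters=100, n_PCs=128, locality=0.75).fit(spks)
#     fig, ax = plot_rastermap(spks, model, fps=17, xmin=0, xmax=3000,
#                              save_path="rastermap.png", title="Plane 1")
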
[docs]
def plot_execution_time(filepath, savepath):
    """
    Plots the execution time for each processing step per z-plane.

    This function loads execution timing data from a `.npy` file and visualizes the
    runtime of different processing steps as a stacked bar plot with a black background.

    Parameters
    ----------
    filepath : str or Path
        Path to the `.npy` file containing the volume timing stats.
    savepath : str or Path
        Path to save the generated figure.

    Notes
    -----
    - The `.npy` file should contain structured data with `plane`, `registration`,
      `detection`, `extraction`, `classification`, `deconvolution`, and
      `total_plane_runtime` fields.
    """
    plane_stats = np.load(filepath)
    planes = plane_stats["plane"]
    reg_time = plane_stats["registration"]
    detect_time = plane_stats["detection"]
    extract_time = plane_stats["extraction"]
    total_time = plane_stats["total_plane_runtime"]

    plt.figure(figsize=(10, 6), facecolor="black")
    ax = plt.gca()
    ax.set_facecolor("black")

    plt.xlabel("Z-Plane", fontsize=14, fontweight="bold", color="white")
    plt.ylabel("Execution Time (s)", fontsize=14, fontweight="bold", color="white")
    plt.title("Execution Time per Processing Step", fontsize=16, fontweight="bold", color="white")

    plt.bar(planes, reg_time, label="Registration", alpha=0.8, color="#FF5733")
    plt.bar(planes, detect_time, label="Detection", alpha=0.8, bottom=reg_time, color="#33FF57")
    bars3 = plt.bar(planes, extract_time, label="Extraction", alpha=0.8,
                    bottom=reg_time + detect_time, color="#3357FF")

    for bar, total in zip(bars3, total_time):
        height = bar.get_y() + bar.get_height()
        if total > 1:  # only label if execution time is large enough to be visible
            plt.text(bar.get_x() + bar.get_width() / 2, height + 2, f"{int(total)}",
                     ha="center", va="bottom", fontsize=12, color="white", fontweight="bold")

    plt.xticks(planes, fontsize=12, fontweight="bold", color="white")
    plt.yticks(fontsize=12, fontweight="bold", color="white")
    plt.grid(axis="y", linestyle="--", alpha=0.4, color="white")
    ax.spines["bottom"].set_color("white")
    ax.spines["left"].set_color("white")
    ax.spines["top"].set_color("white")
    ax.spines["right"].set_color("white")

    plt.legend(fontsize=12, facecolor="black", edgecolor="white", labelcolor="white",
               loc="upper left", bbox_to_anchor=(1, 1))
    plt.savefig(savepath, bbox_inches="tight", facecolor="black")
    plt.show()

[docs]
def plot_volume_signal(zstats, savepath):
    """
    Plots the mean fluorescence signal per z-plane with standard deviation error bars.

    This function loads signal statistics from a `.npy` file and visualizes the mean
    fluorescence signal per z-plane, with error bars representing the standard deviation.

    Parameters
    ----------
    zstats : str or Path
        Path to the `.npy` file containing the volume stats. The output of `get_zstats()`.
    savepath : str or Path
        Path to save the generated figure.

    Notes
    -----
    - The `.npy` file should contain structured data with `plane`, `mean_trace`, and `std_trace` fields.
    - Error bars represent the standard deviation of the fluorescence signal.
    """
    plane_stats = np.load(zstats)
    planes = plane_stats["plane"]
    mean_signal = plane_stats["mean_trace"]
    std_signal = plane_stats["std_trace"]

    plt.figure(figsize=(10, 6), facecolor="black")
    ax = plt.gca()
    ax.set_facecolor("black")

    plt.xlabel("Z-Plane", fontsize=14, fontweight="bold", color="white")
    plt.ylabel("Mean Raw Signal", fontsize=14, fontweight="bold", color="white")
    plt.title("Mean Fluorescence Signal per Z-Plane", fontsize=16, fontweight="bold", color="white")

    plt.errorbar(planes, mean_signal, yerr=std_signal, fmt='o-', color="cyan",
                 ecolor="lightblue", elinewidth=2, capsize=4, markersize=6,
                 alpha=0.8, label="Mean ± STD")

    plt.xticks(planes, fontsize=12, fontweight="bold", color="white")
    plt.yticks(fontsize=12, fontweight="bold", color="white")
    plt.grid(axis="y", linestyle="--", alpha=0.4, color="white")
    ax.spines["bottom"].set_color("white")
    ax.spines["left"].set_color("white")
    ax.spines["top"].set_color("white")
    ax.spines["right"].set_color("white")

    plt.legend(fontsize=12, facecolor="black", edgecolor="white", labelcolor="white")
    plt.savefig(savepath, bbox_inches="tight", facecolor="black")
    plt.show()

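# Both volume-level plots above read the same structured zstats.npy array written
# by get_volume_stats() below; a brief sketch with hypothetical paths:
#
#     zstats = "results/zstats.npy"
#     plot_execution_time(zstats, "results/execution_time.png")
#     plot_volume_signal(zstats, "results/mean_signal.png")
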
def plot_volume_neuron_counts(zstats, savepath):
    """
    Plots the number of accepted and rejected neurons per z-plane.

    This function loads neuron count data from a `.npy` file and visualizes the
    accepted vs. rejected neurons as a stacked bar plot with a black background.

    Parameters
    ----------
    zstats : str or Path
        Full path to the zstats.npy file.
    savepath : str or Path
        Path to the directory where the generated figure will be saved.

    Notes
    -----
    - The `.npy` file should contain structured data with `plane`, `accepted`, and `rejected` fields.
    """
    zstats = Path(zstats)
    if not zstats.is_file():
        raise FileNotFoundError(f"{zstats} is not a valid zstats.npy file.")

    plane_stats = np.load(zstats)
    savepath = Path(savepath)

    planes = plane_stats["plane"]
    accepted = plane_stats["accepted"]
    rejected = plane_stats["rejected"]
    # use totals across planes in the filename; the fields themselves are per-plane arrays
    savename = savepath.joinpath(f"all_neurons_{int(np.sum(accepted))}acc_{int(np.sum(rejected))}rej.png")

    plt.figure(figsize=(10, 6), facecolor="black")
    ax = plt.gca()
    ax.set_facecolor("black")

    plt.xlabel("Z-Plane", fontsize=14, fontweight="bold", color="white")
    plt.ylabel("Number of Neurons", fontsize=14, fontweight="bold", color="white")
    plt.title("Accepted vs. Rejected Neurons per Z-Plane", fontsize=16, fontweight="bold", color="white")

    bars1 = plt.bar(planes, accepted, label="Accepted Neurons", alpha=0.8, color="#4CAF50")  # light green
    bars2 = plt.bar(planes, rejected, label="Rejected Neurons", alpha=0.8, bottom=accepted, color="#F57C00")  # light orange

    for bar in bars1:
        height = bar.get_height()
        if height > 0:
            plt.text(bar.get_x() + bar.get_width() / 2, height / 2, f"{int(height)}",
                     ha="center", va="center", fontsize=12, color="white", fontweight="bold")
    for bar1, bar2 in zip(bars1, bars2):
        height1 = bar1.get_height()
        height2 = bar2.get_height()
        if height2 > 0:
            plt.text(bar2.get_x() + bar2.get_width() / 2, height1 + height2 / 2, f"{int(height2)}",
                     ha="center", va="center", fontsize=12, color="white", fontweight="bold")

    plt.xticks(planes, fontsize=12, fontweight="bold", color="white")
    plt.yticks(fontsize=12, fontweight="bold", color="white")
    plt.grid(axis="y", linestyle="--", alpha=0.4, color="white")
    ax.spines["bottom"].set_color("white")
    ax.spines["left"].set_color("white")
    ax.spines["top"].set_color("white")
    ax.spines["right"].set_color("white")

    plt.legend(fontsize=12, facecolor="black", edgecolor="white", labelcolor="white")
    plt.savefig(savename, bbox_inches="tight", facecolor="black")


def get_volume_stats(ops_files: list[str | Path], overwrite: bool = True):
    """
    Computes per-plane summary statistics for a volumetric run and saves them to zstats.npy.

    For each z-plane, this collects the number of accepted and rejected neurons, the mean
    and standard deviation of the raw fluorescence traces, and the per-step processing
    times, then writes them as a structured array alongside the per-plane results.

    Parameters
    ----------
    ops_files : list of str or Path
        Each item in the list should be a path pointing to a z-plane's `ops.npy` file.
        The number of items in this list should match the number of z-planes in your session.
    overwrite : bool
        If a zstats.npy file already exists, it will be overwritten. Defaults to True.

    Returns
    -------
    str
        Path to the saved zstats.npy file.

    Notes
    -----
    - The saved `.npy` file contains structured data with `plane`, `accepted`, `rejected`,
      `mean_trace`, `std_trace`, per-step timing, and `filepath` fields.
    """
    if ops_files is None:
        print('No ops files found.')
        return None

    plane_stats = {}
    for i, file in enumerate(ops_files):
        output_ops = load_ops(file)
        iscell = np.load(Path(output_ops['save_path']).joinpath('iscell.npy'), allow_pickle=True)[:, 0].astype(bool)
        traces = np.load(Path(output_ops['save_path']).joinpath('F.npy'), allow_pickle=True)
        mean_trace = np.mean(traces)
        std_trace = np.std(traces)
        num_accepted = np.sum(iscell)
        num_rejected = np.sum(~iscell)
        timing = output_ops['timing']
        plane_stats[i + 1] = (num_accepted, num_rejected, mean_trace, std_trace, timing, file)

    # edge case: the common path will be ops.npy if there's only a single file
    common_path = get_common_path(ops_files)
    plane_save = os.path.join(common_path, "zstats.npy")

    plane_stats_npy = np.array(
        [(plane, accepted, rejected, mean_trace, std_trace,
          timing["registration"], timing["detection"], timing["extraction"],
          timing["classification"], timing["deconvolution"], timing["total_plane_runtime"],
          filepath)
         for plane, (accepted, rejected, mean_trace, std_trace, timing, filepath) in plane_stats.items()],
        dtype=[
            ("plane", "i4"),
            ("accepted", "i4"),
            ("rejected", "i4"),
            ("mean_trace", "f8"),
            ("std_trace", "f8"),
            ("registration", "f8"),
            ("detection", "f8"),
            ("extraction", "f8"),
            ("classification", "f8"),
            ("deconvolution", "f8"),
            ("total_plane_runtime", "f8"),
            ("filepath", "U255"),
        ]
    )

    # if the file doesn't exist, save it
    if not Path(plane_save).is_file():
        np.save(plane_save, plane_stats_npy)
    # if the file does exist, only save if overwrite is true
    elif Path(plane_save).is_file() and overwrite:
        np.save(plane_save, plane_stats_npy)
    else:
        print(f"File {plane_save} already exists. Skipping.")
    return plane_save

[docs]
def save_images_to_movie(image_input, savepath, duration=None, format=".mp4"):
    """
    Convert a sequence of saved images into a movie.

    TODO: move to mbo_utilities.

    Parameters
    ----------
    image_input : str, Path, or list
        Directory containing saved segmentation images or a list of image file paths.
    savepath : str or Path
        Path to save the video file.
    duration : int, optional
        Desired total video duration in seconds. If None, defaults to 1 FPS (1 image per second).
    format : str, optional
        Video format: ".mp4" (PowerPoint-compatible), ".avi" (lossless), ".mov" (ProRes).
        Default is ".mp4".

    Examples
    --------
    >>> import mbo_utilities as mbo
    >>> import lbm_suite2p_python as lsp

    Get all png files autosaved during LBM-Suite2p-Python `run_volume()`

    >>> segmentation_pngs = mbo.get_files("path/suite3d/results/", "segmentation.png", max_depth=3)
    >>> lsp.save_images_to_movie(segmentation_pngs, "path/to/save/segmentation.mp4", format=".mp4")
    """
    savepath = Path(savepath).with_suffix(format)  # ensure correct file extension
    temp_video = savepath.with_suffix(".avi")  # temporary AVI file for MP4/MOV conversion
    savepath.parent.mkdir(parents=True, exist_ok=True)

    if isinstance(image_input, (str, Path)):
        image_dir = Path(image_input)
        image_files = sorted(glob.glob(str(image_dir / "*.png")) +
                             glob.glob(str(image_dir / "*.jpg")) +
                             glob.glob(str(image_dir / "*.tif")))
    elif isinstance(image_input, list):
        image_files = sorted(map(str, image_input))
    else:
        raise ValueError("image_input must be a directory path or a list of file paths.")

    if not image_files:
        return

    first_image = cv2.imread(image_files[0])
    height, width, _ = first_image.shape
    fps = len(image_files) / duration if duration else 1

    if format == ".mp4":
        # write an intermediate AVI first, then convert to H.264 MP4 with ffmpeg
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        video_path = temp_video
    elif format == ".avi":
        fourcc = cv2.VideoWriter_fourcc(*'HFYU')
        video_path = savepath
    elif format == ".mov":
        fourcc = cv2.VideoWriter_fourcc(*'HFYU')
        video_path = temp_video
    else:
        raise ValueError("Invalid format. Use '.mp4', '.avi', or '.mov'.")

    video_writer = cv2.VideoWriter(str(video_path), fourcc, max(fps, 1), (width, height))
    for image_file in image_files:
        frame = cv2.imread(image_file)
        video_writer.write(frame)
    video_writer.release()

    if format == ".mp4":
        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-i", str(temp_video),
            "-vcodec", "libx264",
            "-acodec", "aac",
            "-preset", "slow",
            "-crf", "18",
            str(savepath),
        ]
        subprocess.run(ffmpeg_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        temp_video.unlink()
        print(f"MP4 saved at {savepath}")
    elif format == ".mov":
        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-i", str(temp_video),
            "-c:v", "prores_ks",  # Apple ProRes codec
            "-profile:v", "3",  # ProRes 422 HQ
            "-pix_fmt", "yuv422p10le",
            str(savepath),
        ]
        subprocess.run(ffmpeg_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        temp_video.unlink()

def get_fcells_list(ops_list: list):
    if not isinstance(ops_list, list):
        raise ValueError("`ops_list` must be a list")
    f_cells_list = []
    for ops in ops_list:
        ops = load_ops(ops)
        f_cells = np.load(Path(ops['save_path']).joinpath('F.npy'))
        f_cells_list.append(f_cells)
    return f_cells_list


def collect_result_png(ops_list):
    if not isinstance(ops_list, list):
        raise ValueError("`ops_list` must be a list")
    png_list = []
    for ops in ops_list:
        ops = load_ops(ops)
        # read the saved segmentation image; np.load cannot open PNG files
        png = cv2.imread(str(Path(ops['save_path']).joinpath('segmentation.png')))
        png_list.append(png)
    return png_list

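# Usage sketch (hypothetical results directory): gather the raw fluorescence
# traces from every plane for downstream pooling across the volume.
#
#     ops_files = sorted(str(p) for p in Path("results").rglob("ops.npy"))
#     f_cells_per_plane = get_fcells_list(ops_files)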