Source code for lbm_caiman_python.batch

import re as regex
from pathlib import Path
from typing import Union
import lbm_mc as mc
from contextlib import contextmanager
import pathlib

COMPUTE_BACKEND_SUBPROCESS = "subprocess"  #: subprocess backend
COMPUTE_BACKEND_SLURM = "slurm"  #: SLURM backend
COMPUTE_BACKEND_LOCAL = "local"

COMPUTE_BACKENDS = [
    COMPUTE_BACKEND_SUBPROCESS,
    COMPUTE_BACKEND_SLURM,
    COMPUTE_BACKEND_LOCAL,
]

DATAFRAME_COLUMNS = [
    "algo",
    "item_name",
    "input_movie_path",
    "params",
    "outputs",
    "added_time",
    "ran_time",
    "algo_duration",
    "comments",
    "uuid",
]


@contextmanager
def _set_posix_windows():
    posix_backup = pathlib.PosixPath
    try:
        pathlib.PosixPath = pathlib.WindowsPath
        yield
    finally:
        pathlib.PosixPath = posix_backup


@contextmanager
def _set_windows_posix():
    """
    Set the Path class to WindowsPath on a POSIX system.
    """
    windows_backup = pathlib.WindowsPath
    try:
        pathlib.WindowsPath = pathlib.PosixPath
        yield
    finally:
        pathlib.WindowsPath = windows_backup


[docs] def load_batch(batch_path: str | Path): """ Load a batch after transfering it from a Windows to a POSIX system or vice versa. Parameters ---------- batch_path : str, Path The path to the batch file. Returns ------- pandas.DataFrame The loaded batch. """ try: with _set_windows_posix(): return mc.load_batch(batch_path) except Exception: with _set_posix_windows(): return mc.load_batch(batch_path)
[docs] def clean_batch(df): """ Clean a batch of DataFrame entries by removing unsuccessful df from storage. This function iterates over the df of the given DataFrame, identifies df where the 'outputs' column is either `None` or a dictionary containing a 'success' key with a `False` value. For each such row, the corresponding item is removed using the `df.caiman.remove_item()` method, and the removal is saved to disk. Parameters ---------- df : pandas.DataFrame The DataFrame to be cleaned. It must have a 'uuid' column for identification and an 'outputs' column containing a dictionary with a 'success' key. Returns ------- pandas.DataFrame The DataFrame reloaded from disk after unsuccessful items have been removed. Notes ----- - If 'outputs' is None or does not contain 'success' as a key with a value of `False`, the row will be removed. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({ ... 'uuid': ['123', '456', '789'], ... 'outputs': [{'success': True}, {'success': False}, None] ... }) >>> cleaned_df = clean_batch(df) Removing unsuccessful batch row 1. Row 1 deleted. Removing unsuccessful batch row 2. Row 2 deleted. """ for index, row in df.iterrows(): # Check if 'outputs' is a dictionary and has 'success' key with value False if isinstance(row["outputs"], dict) and row["outputs"].get("success") is False or row["outputs"] is None: uuid = row["uuid"] print(f"Removing unsuccessful batch row {row.index}.") df.caiman.remove_item(uuid, remove_data=True, safe_removal=False) print(f"Row {row.index} deleted.") df.caiman.save_to_disk() return df.caiman.reload_from_disk()
[docs] def delete_batch_rows(df, rows_delete, remove_data=False, safe_removal=True): rows_delete = [rows_delete] if isinstance(rows_delete, int) else rows_delete uuids_delete = [row.uuid for i, row in df.iterrows() if i in rows_delete] for uuid in uuids_delete: df.caiman.remove_item(uuid, remove_data=remove_data, safe_removal=safe_removal) df.caiman.save_to_disk() return df
def validate_path(path: Union[str, Path]): if not regex.match("^[A-Za-z0-9@\/\\\:._-]*$", str(path)): raise ValueError( "Paths must only contain alphanumeric characters, " "hyphens ( - ), underscores ( _ ) or periods ( . )" ) return path
[docs] def remove_batch_duplicates(df): """ Remove duplicate items from a batch DataFrame. Parameters ---------- df : pandas.DataFrame The batch DataFrame to remove duplicates from. Returns ------- None """ import hashlib df["hash"] = df.apply(lambda row: hashlib.sha256(row.mcorr.get_output().tobytes()).hexdigest(), axis=1) uuids_to_remove = [] for _, group in df.groupby("hash"): if len(group) > 1: for idx in group.index[1:]: uuid = df.loc[idx, "uuid"] uuids_to_remove.append(uuid) if not uuids_to_remove: print("No duplicates found.") return for uuid in uuids_to_remove: print(f"Removing duplicate item {uuid}.") df.caiman.remove_item(uuid, remove_data=True, safe_removal=False) df.drop(columns="hash", inplace=True) df.caiman.save_to_disk() return df
[docs] def get_batch_from_path(batch_path): """ Load or create a batch at the given batch_path. """ try: df = mc.load_batch(batch_path) print(f"Batch found at {batch_path}") except (IsADirectoryError, FileNotFoundError): print(f"Creating batch at {batch_path}") df = mc.create_batch(batch_path) return df