import argparse
import multiprocessing
import os
import sys
from typing import Optional, Union
import numpy as np
import pandas as pd
from simba.data_processors.find_animal_blob_location import (
get_blob_vertices_from_video, get_left_right_points,
get_nose_tail_from_vertices, stabilize_body_parts)
from simba.mixins.geometry_mixin import GeometryMixin
from simba.utils.checks import (check_if_dir_exists, check_instance, check_int,
check_nvidea_gpu_available, check_valid_dict)
from simba.utils.data import (resample_geometry_vertices, savgol_smoother,
terminate_cpu_pool)
from simba.utils.enums import Defaults
from simba.utils.errors import SimBAGPUError
from simba.utils.lookups import get_current_time
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import (find_core_cnt, get_video_meta_data,
read_pickle, remove_files, write_df)
from simba.video_processors.video_processing import (video_bg_subtraction,
video_bg_subtraction_mp)
CENTER_X = 'center_x'
CENTER_Y = 'center_y'
NOSE_X = 'nose_x'
NOSE_Y = 'nose_y'
TAIL_X = 'tail_x'
TAIL_Y = 'tail_y'
LEFT_X = 'left_x'
LEFT_Y = 'left_y'
RIGHT_X = 'right_x'
RIGHT_Y = 'right_y'
VERTICES = 'vertices'
IN_DIR = 'input_dir'
OUT_DIR = 'output_dir'
VIDEO_DATA = 'video_data'
VERTICE_CNT = 'vertice_cnt'
REFERENCE = 'reference'
CORE_CNT = 'core_cnt'
CLOSING_KERNEL = 'close_kernel'
CLOSING_ITS = 'close_iterations'
OPENING_ITS = 'open_iterations'
OPENING_KERNEL = 'open_kernel'
SAVE_BG_VIDEOS = 'save_bg_videos'
GPU = 'gpu'
METHOD = 'method'
THRESHOLD = 'threshold'
REQUIRED_KEYS = (IN_DIR, OUT_DIR, GPU, CORE_CNT, VIDEO_DATA, VERTICE_CNT, SAVE_BG_VIDEOS, CLOSING_ITS, OPENING_ITS)
[docs]class BlobTrackingExecutor():
"""
Perform animal blob tracking from video data using background subtraction, blob location detection,
and geometry-based processing. The class handles the processing of video frames in parallel, tracks blob locations
across frames, and saves the results to CSV files.
The ``data`` parameter should be a dictionary or path to a pickle file containing a dictionary with the following required keys:
- ``input_dir``: Input directory containing videos
- ``output_dir``: Output directory for results
- ``gpu``: Boolean indicating whether to use GPU acceleration
- ``core_cnt``: Number of CPU cores to use
- ``video_data``: Dictionary mapping video names to their configuration (threshold, reference video, etc.)
- ``vertice_cnt``: Number of vertices to use when sampling blob geometry
- ``save_bg_videos``: Boolean indicating whether to save background-subtracted videos
- ``close_iterations``: Number of iterations for morphological closing
- ``open_iterations``: Number of iterations for morphological opening
.. seealso::
For GUI used to create ``data``, see :func:`simba.video_processors.batch_process_menus.BatchProcessFrame`.
This is a wrapper for calling background subtraction through :func:`simba.video_processors.video_processing.video_bg_subtraction_mp` or :func:`~simba.video_processors.video_processing.video_bg_subtraction`.
For finding the animal, this class wraps :func:`simba.data_processors.find_animal_blob_location.get_blob_vertices_from_video`.
For GPU based bg subtraction methods, see :func:`~simba.data_processors.cuda.image.bg_subtraction_cuda` or :func:`~simba.data_processors.cuda.image.bg_subtraction_cupy`.
:param Union[dict, str, os.PathLike] data: Path to a pickle file or a dictionary containing the necessary parameters for blob tracking. The configuration should include keys for the video data, input/output directories, and other processing settings.
:param Optional[int] batch_size: The batch size for blob location detection. If None, automatically calculated based on available RAM. Default: None.
:param bool rostrocaudal: If True, compute nose and tail coordinates. Default: True.
:param bool mediolateral: If True, compute left and right point coordinates. Default: True.
:param bool center: If True, compute center coordinates. Default: True.
:example:
>>> tracker = BlobTrackingExecutor(data=r"C:/troubleshooting/mitra/test/.temp/blob_definitions.pickle")
>>> tracker.run()
>>> tracker = BlobTrackingExecutor(data=r"C:/troubleshooting/mitra/test/.temp/blob_definitions.pickle", batch_size=5000)
>>> tracker.run()
"""
def __init__(self,
data: Union[dict, str, os.PathLike],
batch_size: Optional[int] = None,
rostrocaudal : bool = True,
mediolateral: bool = True,
center: bool = True):
self.timer = SimbaTimer(start=True)
check_instance(source=f'{self.__class__.__name__} data', instance=data, accepted_types=(str, os.PathLike, dict), raise_error=True)
if isinstance(data, (str, os.PathLike)):
data = read_pickle(data_path=data)
check_valid_dict(x=data, valid_key_dtypes=(str,), required_keys=REQUIRED_KEYS)
check_if_dir_exists(in_dir=data[IN_DIR], source=self.__class__.__name__, raise_error=True)
check_if_dir_exists(in_dir=data[OUT_DIR], source=self.__class__.__name__, raise_error=True)
check_int(name=f'{self.__class__.__name__} core_cnt', value=data[CORE_CNT], min_value=1, max_value=find_core_cnt()[0], raise_error=True)
if batch_size is not None: check_int(name=f'{self.__class__.__name__} batch_size', value=batch_size, min_value=1, raise_error=True)
self.data, self.gpu, self.core_cnt, self.batch_size, self.vertice_cnt, self.closing_iterations = data, data[GPU], data[CORE_CNT], batch_size, data[VERTICE_CNT], data[CLOSING_ITS]
self.save_bg_videos, self.save_dir, self.opening_iterations = data[SAVE_BG_VIDEOS], data[OUT_DIR], data[OPENING_ITS]
self.rostrocaudal, self.mediolateral, self.center = rostrocaudal, mediolateral, center
if self.gpu and not check_nvidea_gpu_available():
raise SimBAGPUError(msg='GPU is set to True, but SImBA could not find a GPU on the machine', source=self.__class__.__name__)
self.vertice_col_names = []
self.video_cnt = len(list(self.data[VIDEO_DATA].keys()))
for i in range(self.vertice_cnt):
self.vertice_col_names.append(f"vertice_{i}_x"); self.vertice_col_names.append(f"vertice_{i}_y")
def run(self):
self.pool = multiprocessing.Pool(self.core_cnt, maxtasksperchild=Defaults.MAXIMUM_MAX_TASK_PER_CHILD.value) if self.core_cnt > 1 else None
self._remove_bgs()
self._find_blobs()
self.timer.stop_timer()
if self.pool is not None: terminate_cpu_pool(pool=self.pool)
stdout_success(msg=f'Animal tracking complete. Results save din directory {self.data[OUT_DIR]}', elapsed_time=self.timer.elapsed_time_str)
def _interpolate_vertices(self, arr: np.ndarray) -> np.ndarray:
df = pd.DataFrame(arr.reshape(arr.shape[0], -1))
df = df.fillna(0).clip(lower=0).astype(np.int32)
for c in df.columns:
bp_df = df[[c]].astype(np.float32)
missing_idx = bp_df[bp_df[c] <= 0.0].index
bp_df.loc[missing_idx, c] = np.nan
if bp_df[c].notna().sum() >= 2:
bp_df[c] = bp_df[c].interpolate(method='nearest', axis=0).ffill().bfill()
else:
bp_df[c] = bp_df[c].fillna(0)
df.update(bp_df)
return df.values.reshape(arr.shape).astype(np.int32)
def _remove_bgs(self):
bg_timer = SimbaTimer(start=True)
self.bg_video_paths = []
for video_cnt, (video_name, video_data) in enumerate(self.data[VIDEO_DATA].items()):
video_meta = get_video_meta_data(video_path=video_data['video_path'])
bg_video_path = os.path.join(self.save_dir, f'{video_name}.mp4')
self.bg_video_paths.append(bg_video_path)
close_kernel_size = None if video_data[CLOSING_KERNEL] is None else tuple(video_data[CLOSING_KERNEL])
opening_kernel_size = None if video_data[OPENING_KERNEL] is None else tuple(video_data[OPENING_KERNEL])
print(f'Starting background subtraction on video {video_name} (video {video_cnt+1}/{self.video_cnt}, frame count: {video_meta["frame_count"]}, cores: {self.core_cnt}, gpu: {self.gpu}, {get_current_time()})')
if (self.core_cnt == 1):
video_bg_subtraction(video_path=video_data['video_path'],
bg_video_path=video_data[REFERENCE],
threshold=video_data[THRESHOLD],
save_path=bg_video_path,
verbose=True,
fg_color=(255, 255, 255),
closing_kernel_size=close_kernel_size,
closing_iterations=self.closing_iterations,
opening_kernel_size=opening_kernel_size,
opening_iterations=self.opening_iterations)
else:
video_bg_subtraction_mp(video_path=video_data['video_path'],
bg_video_path=video_data[REFERENCE],
threshold=video_data[THRESHOLD],
save_path=bg_video_path,
verbose=True,
gpu=self.gpu,
core_cnt=self.core_cnt,
fg_color=(255, 255, 255),
closing_kernel_size=close_kernel_size,
closing_iterations=self.closing_iterations,
opening_kernel_size=opening_kernel_size,
opening_iterations=self.opening_iterations,
pool=self.pool)
bg_timer.stop_timer()
print(f'Background subtraction COMPLETE: videos saved at {self.save_dir}, (elapsed time: {bg_timer.elapsed_time_str}s), {get_current_time()}')
def _find_blobs(self):
blob_timer = SimbaTimer(start=True)
for video_cnt, (video_name, video_data) in enumerate(self.data[VIDEO_DATA].items()):
video_timer, geometries = SimbaTimer(start=True), None
video_meta = get_video_meta_data(video_path=video_data['video_path'])
temp_video_path = os.path.join(self.save_dir, f'{video_name}.mp4')
save_path = os.path.join(self.data[OUT_DIR], f'{video_meta["video_name"]}.csv')
inclusion_zone = None if 'inclusion_zones' not in video_data.keys() else video_data['inclusion_zones']
window_size = None if 'window_size' not in video_data.keys() else video_data['window_size']
vertices = get_blob_vertices_from_video(video_path=temp_video_path,
pool=self.pool,
gpu=self.gpu,
verbose=True,
core_cnt=self.core_cnt,
batch_size=self.batch_size,
inclusion_zone=inclusion_zone,
window_size=window_size,
convex_hull=False,
vertice_cnt=self.vertice_cnt)
vertices = self._interpolate_vertices(arr=vertices)
results = pd.DataFrame()
if video_data['buffer_size'] is not None:
geometries = GeometryMixin.bodyparts_to_polygon(data=vertices, parallel_offset=video_data['buffer_size'], convex_hull=False, simplify_tolerance=0.1)
vertices = [np.array(x.exterior.coords).astype(np.int32) for x in geometries]
vertices = resample_geometry_vertices(vertices=vertices, vertice_cnt=self.vertice_cnt)
if video_data['smoothing_time'] is not None:
vertices = savgol_smoother(data=vertices.reshape(vertices.shape[0], -1), fps=video_meta['fps'], time_window=video_data['smoothing_time'], source=self.__class__.__name__).astype(np.int32)
vertices = vertices.reshape(vertices.shape[0], self.vertice_cnt, 2)
if self.center or self.mediolateral or self.rostrocaudal:
geometries = geometries if geometries != None else GeometryMixin.bodyparts_to_polygon(data=vertices, convex_hull=False, simplify_tolerance=0.1)
centers = GeometryMixin.get_center(shape=geometries).astype(np.int32)
results[CENTER_X], results[CENTER_Y] = centers[:, 0], centers[:, 1]
if self.rostrocaudal or self.mediolateral:
nose, tail = get_nose_tail_from_vertices(vertices=vertices, fps=video_meta['fps'], smooth_factor=1.0)
#nose, tail = stabilize_body_parts(bp_1=nose, bp_2=tail, center_positions=centers)
results[NOSE_X], results[NOSE_Y] = nose[:, 0], nose[:, 1]
results[TAIL_X], results[TAIL_Y] = tail[:, 0], tail[:, 1]
if self.mediolateral:
left, right = get_left_right_points(hull_vertices=vertices, anterior=nose, center=centers, posterior=tail)
results[LEFT_X], results[LEFT_Y] = left[:, 0], left[:, 1]
results[RIGHT_X], results[RIGHT_Y] = right[:, 0], right[:, 1]
vertices = pd.DataFrame(vertices.reshape(vertices.shape[0], (self.vertice_cnt*2)).astype(np.int32), columns=self.vertice_col_names)
results = pd.concat([results, vertices], axis=1).reset_index(drop=True).fillna(0)
write_df(df=results, file_type='csv', save_path=save_path)
video_timer.stop_timer()
print(f'Animal blob tracking data for video {video_meta["video_name"]} saved at {save_path}, (elapsed time: {video_timer.elapsed_time_str}s).')
blob_timer.stop_timer()
if not self.save_bg_videos:
remove_files(file_paths=self.bg_video_paths, raise_error=False)
print(f'Blob tracking COMPLETE: data saved at {self.save_dir}, (elapsed time: {blob_timer.elapsed_time_str}s')
# if __name__ == "__main__" and not hasattr(sys, 'ps1'):
# parser = argparse.ArgumentParser(description="Execute Blob tracking in SimBA.")
# parser.add_argument('--data', type=str, required=True, help='Path to the pickle holding the parameters for performing blob tracking')
# args = parser.parse_args()
# tracker = BlobTrackingExecutor(data=args.data)
# tracker.run()
# DATA_PATH = r"D:\troubleshooting\maplight_ri\project_folder\blob\data\blob_definitions.pickle"
# tracker = BlobTrackingExecutor(data=DATA_PATH)
# tracker.run()
# #DATA_PATH = r"/mnt/d/open_field_3/sample/.temp/blob_definitions.h5"
# #DATA_PATH = r"D:\open_field_3\sample\blob_data\blob_definitions.json"
# # DATA_PATH = r"D:\EPM\sampled\.temp\blob_definitions.h5"
# #DATA_PATH = r"D:\EPM\sample_2\.temp\blob_definitions.h5"
#
# #
# DATA_PATH = r"D:\open_field_2\sample\clipped_10min\data\blob_definitions.json"
#
#
# #DATA_PATH = r"D:\open_field_4\data\blob_definitions.json"
# DATA_PATH = r"D:\open_field\data\blob_definitions.json"
#
# DATA_PATH = r"D:\EPM_3\out\blob_definitions.json"
# # # # # # # # # #
# # # # # # # # # #
# tracker = BlobTrackingExecutor(data=DATA_PATH)
# tracker.run()
# # # #
# # #
# #
# data = {'input_dir': r'C:\\troubleshooting\\mitra\\test',
# 'output_dir': r'C:\\troubleshooting\\mitra\\test\\blob_data',
# 'gpu': False,
# 'core_cnt': 32,
# 'video_data': {r'C:\\troubleshooting\\mitra\\test\\501_MA142_Gi_Saline_0515.mp4': {'threshold': 50, 'method': 'absolute', 'window_size': 'None', 'window_weight': 'None', 'reference': r'C:/troubleshooting/mitra/test\background_dir\501_MA142_Gi_Saline_0515.mp4', 'visualize': False, 'exclusion_zones': None,
# 'inclusion_zones': {'inclusion_zone': {'Video': '501_MA142_Gi_Saline_0515', 'Shape_type': 'rectangle', 'Name': 'inclusion_zone', 'Color name': 'Red', 'Color BGR': (0, 0, 255), 'Thickness': 5, 'Center_X': 365, 'Center_Y': 293, 'topLeftX': 204, 'topLeftY': 131, 'Bottom_right_X': 526, 'Bottom_right_Y': 455, 'width': 322, 'height': 324, 'width_cm': 32.2, 'height_cm': 32.4, 'area_cm': 1043.28, 'Tags': {'Center tag': (365, 293), 'Top left tag': (204, 131), 'Bottom right tag': (526, 455), 'Top right tag': (526, 131), 'Bottom left tag': (204, 455), 'Top tag': (365, 131), 'Right tag': (526, 293), 'Left tag': (204, 293), 'Bottom tag': (365, 455)}, 'Ear_tag_size': 15}}}}}
#
# data = {'input_dir': r'C:\\troubleshooting\\mitra\\test',
# 'output_dir': r'C:\\troubleshooting\\mitra\\test\\blob_data',
# 'gpu': False,
# 'core_cnt': 32,
# 'video_data': {r'C:\\troubleshooting\\mitra\\test\\501_MA142_Gi_Saline_0515': {'threshold': 30, 'method': 'absolute', 'window_size': 'None', 'window_weight': 'None', 'reference': 'C:/troubleshooting/mitra/test\\501_MA142_Gi_Saline_0515.mp4', 'visualize': False, 'exclusion_zones': None, 'inclusion_zones': None},
# r'C:\\troubleshooting\\mitra\\test\\502_MA141_Gi_CNO_0514': {'threshold': 30, 'method': 'absolute', 'window_size': 'None', 'window_weight': 'None', 'reference': 'C:/troubleshooting/mitra/test\\502_MA141_Gi_CNO_0514.mp4', 'visualize': False, 'exclusion_zones': None, 'inclusion_zones': None},
# r'C:\\troubleshooting\\mitra\\test\\503_MA109_Gi_CNO_0521': {'threshold': 30, 'method': 'absolute', 'window_size': 'None', 'window_weight': 'None', 'reference': 'C:/troubleshooting/mitra/test\\503_MA109_Gi_CNO_0521.mp4', 'visualize': False, 'exclusion_zones': None, 'inclusion_zones': None}}}
#
#data = read_pickle(data_path=r"D:\troubleshooting\maplight_ri\project_folder\blob\batch_out_6")
# # data['gpu'] = True
# # data['core_cnt'] = 16
# tracker = BlobTrackingExecutor(data=data)
# tracker.run()