Source code for simba.roi_tools.ROI_time_bin_calculator

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import itertools
import os
from typing import List, Optional, Union

import numpy as np
import pandas as pd

from simba.data_processors.timebins_movement_calculator import \
    TimeBinsMovementCalculator
from simba.mixins.config_reader import ConfigReader
from simba.mixins.feature_extraction_supplement_mixin import \
    FeatureExtractionSupplemental
from simba.roi_tools.ROI_analyzer import ROIAnalyzer
from simba.utils.checks import check_float
from simba.utils.errors import (BodypartColumnNotFoundError, DuplicationError,
                                FrameRangeError, ROICoordinatesNotFoundError)
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import get_fn_ext, read_data_paths, read_df


[docs]class ROITimebinCalculator(ConfigReader): """ Compute how much time and how many entries animals are making into user-defined ROIs within user-defined time bins. Also compute the average velocity and distance moved within user-defined ROIs split by time-bins. Results are stored in the ``project_folder/logs`` directory of the SimBA project. :param Union[str, os.PathLike] config_path: path to SimBA project config file in Configparser format :param float bin_length: length of time bins in seconds. :param List[str] body_parts: List of body-parts to use as proxy of animal locations. :param float threshold: Filter pose-estimation data detected below defined threshold. :param Optional[bool] movement: If True, compute the distances and velocities within time-bins. Default False. .. important:: Legacy. Use :func:`simba.roi_tools.roi_time_bins_analyzer.ROITimebinAnalyzer`. .. note:: `Example anticipated ROI time-bins entry results <https://github.com/sgoldenlab/simba/blob/master/misc/ROI_time_bins_5.2s_entry_data_20240331125343.csv>`__. `Example anticipated ROI time-bins latency results <https://github.com/sgoldenlab/simba/blob/master/misc/ROI_time_bins_5.2s_time_data_20240331125343.csv>`__. `Example anticipated movement results <https://github.com/sgoldenlab/simba/blob/master/misc/Time_bins_0.5s_movement_results_20240330143150.csv>`__. :example: >>> calculator = ROITimebinCalculator(config_path=r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini", bin_length=1.0, body_parts=['Nose_1'], threshold=0.00, movement=True) >>> calculator.run() >>> calculator.save() """ def __init__(self, config_path: Union[str, os.PathLike], bin_length: float, body_parts: List[str], data_path: Optional[Union[str, os.PathLike, List[str]]] = None, threshold: Optional[float] = 0.0, movement: Optional[bool] = False): ConfigReader.__init__(self, config_path=config_path) if not os.path.isfile(self.roi_coordinates_path): raise ROICoordinatesNotFoundError(expected_file_path=self.roi_coordinates_path) check_float(name="bin_length", value=bin_length, min_value=10e-6) check_float(name="threshold", value=threshold, min_value=0.0, max_value=1.0) self.data_paths = read_data_paths(path=data_path, default=self.outlier_corrected_paths, default_name=self.outlier_corrected_dir, file_type=self.file_type) self.read_roi_data() self.bin_length, self.body_parts, self.threshold = (bin_length, body_parts, threshold) self.save_path_time = os.path.join(self.logs_path, f"ROI_time_bins_{bin_length}s_time_data_{self.datetime}.csv") self.save_path_entries = os.path.join(self.logs_path, f"ROI_time_bins_{bin_length}s_entry_data_{self.datetime}.csv") for bp in body_parts: if bp not in self.body_parts_lst: raise BodypartColumnNotFoundError(msg=f'The body-part {bp} is not a valid body-part in the SimBA project. Options: {self.body_parts_lst}', source=self.__class__.__name__) if len(set(body_parts)) != len(body_parts): raise DuplicationError(msg=f'All body-part entries have to be unique. Got {body_parts}', source=self.__class__.__name__) self.roi_analyzer = ROIAnalyzer(config_path=self.config_path, data_path=self.outlier_corrected_dir, calculate_distances=False, threshold=threshold, body_parts=body_parts, detailed_bout_data=True) self.roi_analyzer.run() self.animal_names = list(self.roi_analyzer.bp_dict.keys()) self.bp_dict = self.roi_analyzer.bp_dict self.entries_exits_df = self.roi_analyzer.detailed_df self.movement = movement if movement: self.save_path_movement_velocity = os.path.join(self.logs_path, f"ROI_time_bins_{bin_length}s_movement_velocity_data_{self.datetime}.csv") self.movement_timebins = TimeBinsMovementCalculator(config_path=config_path, bin_length=bin_length, body_parts=body_parts, plots=False) self.movement_timebins.run() def run(self): self.results_entries = pd.DataFrame(columns=["VIDEO","SHAPE","ANIMAL","BODY-PART","TIME BIN #","ENTRY COUNT",]) self.results_time = pd.DataFrame(columns=["VIDEO","SHAPE","ANIMAL","BODY-PART","TIME BIN #","TIME INSIDE SHAPE (S)"]) self.results_movement_velocity = pd.DataFrame(columns=["VIDEO","SHAPE","ANIMAL","BODY-PART","TIME BIN #","DISTANCE (CM)","VELOCITY (CM/S)"]) print(f"Analyzing time-bin data for {len(self.data_paths)} video(s)...") for file_cnt, file_path in enumerate(self.data_paths): video_timer = SimbaTimer(start=True) _, self.video_name, _ = get_fn_ext(filepath=file_path) _, px_per_mm, fps = self.read_video_info(video_name=self.video_name) print(f'Analyzing time-bins for {self.video_name} (Video {file_cnt+1}/{len(self.data_paths)})...') frames_per_bin = int(fps * self.bin_length) if frames_per_bin == 0: raise FrameRangeError(msg=f"The specified time-bin length of {self.bin_length} is TOO SHORT for video {self.video_name} which has a specified FPS of {fps}. This results in time bins that are LESS THAN a single frame.", source=self.__class__.__name__) video_frms = list(range(0, len(read_df(file_path=file_path, file_type=self.file_type)))) frame_bins = [video_frms[i : i + (frames_per_bin)] for i in range(0, len(video_frms), frames_per_bin)] self.video_data = self.entries_exits_df[self.entries_exits_df["VIDEO"] == self.video_name] for animal_name, shape_name in list(itertools.product(self.animal_names, self.shape_names)): data_df = self.video_data.loc[(self.video_data["SHAPE NAME"] == shape_name) & (self.video_data["ANIMAL"] == animal_name)] body_part = self.bp_dict[animal_name][0][:-2] entry_frms = list(data_df["START FRAME"]) inside_shape_frms = [list(range(x, y)) for x, y in zip(list(data_df["START FRAME"].astype(int)), list(data_df["END FRAME"].astype(int) + 1))] inside_shape_frms = [i for s in inside_shape_frms for i in s] for bin_cnt, bin_frms in enumerate(frame_bins): frms_inside_roi_in_timebin = [x for x in inside_shape_frms if x in bin_frms] entry_roi_in_timebin = [x for x in entry_frms if x in bin_frms] self.results_time.loc[len(self.results_time)] = [self.video_name,shape_name,animal_name,body_part,bin_cnt,len(frms_inside_roi_in_timebin) / fps] self.results_entries.loc[len(self.results_entries)] = [self.video_name,shape_name,animal_name,body_part,bin_cnt,len(entry_roi_in_timebin)] if self.movement: if len(frms_inside_roi_in_timebin) > 0: bin_move = (self.movement_timebins.movement_dict[self.video_name].iloc[frms_inside_roi_in_timebin].values.flatten().astype(np.float32)) movement, velocity = (FeatureExtractionSupplemental.distance_and_velocity(x=bin_move,fps=fps, pixels_per_mm=1, centimeters=False)) self.results_movement_velocity.loc[len(self.results_movement_velocity)] = [self.video_name, shape_name, animal_name, body_part, bin_cnt, movement, velocity] else: self.results_movement_velocity.loc[len(self.results_movement_velocity)] = [self.video_name, shape_name, animal_name, body_part, bin_cnt, 0, "None"] video_timer.stop_timer() print(f"Video {self.video_name} complete (elapsed time {video_timer.elapsed_time_str}s)") def save(self): self.results_time.sort_values(by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"]).set_index("VIDEO").to_csv(self.save_path_time) self.results_entries.sort_values(by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"]).set_index("VIDEO").to_csv(self.save_path_entries) self.timer.stop_timer() stdout_success(msg=f"ROI time bin entry data saved at {self.save_path_entries}", elapsed_time=self.timer.elapsed_time_str) stdout_success(msg=f"ROI time bin time data saved at {self.save_path_time}", elapsed_time=self.timer.elapsed_time_str) if self.movement: self.results_movement_velocity.sort_values(by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"]).set_index("VIDEO").to_csv(self.save_path_movement_velocity) stdout_success(msg=f"ROI time-bin movement data saved at {self.save_path_movement_velocity}", elapsed_time=self.timer.elapsed_time_str)
# test = ROITimebinCalculator(config_path=r"C:\troubleshooting\spontenous_alternation\project_folder\project_config.ini", # bin_length=0.5, # body_parts=['nose'], # threshold=0.00, # movement=True) # test.run() # test.save() # test = ROITimebinCalculator(config_path=r"C:\troubleshooting\ROI_movement_test\project_folder\project_config.ini", # bin_length=0.5, # body_parts=['Head'], # threshold=0.00, # movement=True) # test.run() # test.save() # test = ROITimebinCalculator(config_path=r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini", # bin_length=1, # body_parts=['Nose_1'], # threshold=0.00, # movement=True) # test.run() # test.save()