# Source code for simba.data_processors.timebins_movement_calculator

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import itertools
import os
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from simba.mixins.config_reader import ConfigReader
from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin
from simba.mixins.feature_extraction_supplement_mixin import \
    FeatureExtractionSupplemental
from simba.mixins.plotting_mixin import PlottingMixin
from simba.utils.checks import (
    check_all_file_names_are_represented_in_video_log,
    check_file_exist_and_readable, check_float, check_that_column_exist,
    check_valid_boolean, check_valid_lst, check_valid_tuple)
from simba.utils.enums import TagNames
from simba.utils.errors import FrameRangeError, InvalidInputError, NoDataError
from simba.utils.printing import (SimbaTimer, log_event, stdout_information,
                                  stdout_success)
from simba.utils.read_write import (create_directory,
                                    find_files_of_filetypes_in_directory,
                                    find_time_stamp_from_frame_numbers,
                                    get_fn_ext, read_df)


class TimeBinsMovementCalculator(ConfigReader, FeatureExtractionMixin):
    """
    Compute aggregate movement and/or velocity statistics in user-defined time-bins.

    .. note::
       `Tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/Scenario2.md#part-4--analyze-machine-results>`__.

    .. image:: _static/img/TimeBinsMovementCalculator.png
       :alt: Time Bins Movement Calculator
       :width: 500
       :align: center

    .. seealso::
       For multicore processing, see :class:`simba.data_processors.timebins_movement_calculator_mp.TimeBinsMovementCalculator`.

    :param Union[str, os.PathLike] config_path: Path to SimBA project config file.
    :param Union[int, float] bin_length: Time-bin size in seconds.
    :param Union[List[str], Tuple[str]] body_parts: Body-part names to include in the movement calculations.
    :param Optional[Union[List[Union[str, os.PathLike]], Union[str, os.PathLike]]] data_path: Optional file path, directory path, or list of file paths to process. If ``None``, all outlier-corrected files in the project are used.
    :param bool plots: If ``True``, create per-video movement line plots for each body-part. Default: ``False``.
    :param bool verbose: If ``True``, prints progress messages during processing. Default: ``True``.
    :param float threshold: Confidence threshold used when filtering low-confidence positions. Default: ``0.0``.
    :param bool distance: If ``True``, compute movement distance per time-bin. Default: ``True``.
    :param bool velocity: If ``True``, compute velocity per time-bin. Default: ``True``.
    :param bool transpose: If ``True``, save output in transposed format with one column per time-bin. Default: ``False``.
    :param bool include_timestamp: If ``True``, include start/end timestamps for each time-bin in saved results. Default: ``False``.

    :raises InvalidInputError: If ``body_parts`` is neither a list nor a tuple, if ``data_path`` is of an unsupported type, or if both ``distance`` and ``velocity`` are ``False``.
    :raises NoDataError: If ``data_path`` is ``None`` and the project holds no outlier-corrected files.

    :example:
    >>> calculator = TimeBinsMovementCalculator(config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini', bin_length=0.04, plots=True, body_parts=['Nose_1', 'Nose_2'])
    >>> calculator.run()
    >>> calculator.save()
    """

    def __init__(self,
                 config_path: Union[str, os.PathLike],
                 bin_length: Union[int, float],
                 body_parts: Union[List[str], Tuple[str]],
                 data_path: Optional[Union[List[Union[str, os.PathLike]], Union[str, os.PathLike]]] = None,
                 plots: bool = False,
                 verbose: bool = True,
                 threshold: float = 0.00,
                 distance: bool = True,
                 velocity: bool = True,
                 transpose: bool = False,
                 include_timestamp: bool = False):

        ConfigReader.__init__(self, config_path=config_path)
        # Must stay directly after the parent initialisation: locals() is logged, so no extra
        # local variables may be introduced before this call.
        log_event(logger_name=str(self.__class__.__name__), log_type=TagNames.CLASS_INIT.value, msg=self.create_log_msg_from_init_args(locals=locals()),)
        check_float(name=f"{self.__class__.__name__} TIME BIN", value=bin_length, allow_negative=False, allow_zero=False)
        if isinstance(body_parts, list):
            check_valid_lst(data=body_parts, source=f'{self.__class__.__name__} body_parts', min_len=1, valid_dtypes=(str,), valid_values=self.body_parts_lst)
        elif isinstance(body_parts, tuple):
            check_valid_tuple(x=body_parts, source=f'{self.__class__.__name__} body_parts', minimum_length=1, valid_dtypes=(str,), accepted_values=self.body_parts_lst)
        else:
            raise InvalidInputError(msg='Body-parts has to be a list or tuple of strings', source=f'{self.__class__.__name__} body_parts')

        # Resolve which data files to process.
        if data_path is None:
            if len(self.outlier_corrected_paths) == 0:
                raise NoDataError(msg=f'No data files found in {self.outlier_corrected_dir}', source=self.__class__.__name__)
            self.file_paths = self.outlier_corrected_paths
        elif isinstance(data_path, list):
            _ = [check_file_exist_and_readable(file_path=x, raise_error=True) for x in data_path]
            self.file_paths = data_path
        elif isinstance(data_path, (str, os.PathLike)):
            if os.path.isdir(data_path):
                # Search the directory that was passed in. The previous code passed
                # self.file_paths here, which is not yet defined at this point.
                # NOTE(review): only '.csv' files are collected although run() reads files using
                # self.file_type - confirm this is intended for non-CSV projects.
                self.file_paths = find_files_of_filetypes_in_directory(directory=data_path, extensions=('.csv',), raise_warning=False, raise_error=True, as_dict=False)
            else:
                check_file_exist_and_readable(file_path=data_path, raise_error=True)
                self.file_paths = [data_path]
        else:
            # Fail early instead of leaving self.file_paths undefined.
            raise InvalidInputError(msg=f'data_path has to be None, a file path, a directory path, or a list of file paths. Got {type(data_path)}', source=f'{self.__class__.__name__} data_path')

        check_valid_boolean(value=[plots], source=f'{self.__class__.__name__} plots', raise_error=True)
        check_valid_boolean(value=[verbose], source=f'{self.__class__.__name__} verbose', raise_error=True)
        check_valid_boolean(value=distance, source=f'{self.__class__.__name__} distance', raise_error=True)
        check_valid_boolean(value=velocity, source=f'{self.__class__.__name__} velocity', raise_error=True)
        check_valid_boolean(value=transpose, source=f'{self.__class__.__name__} transpose', raise_error=True)
        check_valid_boolean(value=include_timestamp, source=f'{self.__class__.__name__} include_timestamp', raise_error=True)
        check_float(name=f'{self.__class__.__name__} threshold', value=threshold, allow_negative=False)
        self.verbose, self.distance, self.velocity, self.transpose, self.include_timestamp = verbose, distance, velocity, transpose, include_timestamp
        self.threshold = threshold
        if not distance and not velocity:
            raise InvalidInputError(msg='distance AND velocity are both False. To compute movement metrics, set at least one value to True.', source=self.__class__.__name__)

        # col_headers: every x/y/p column read from each data file.
        # bp_dict: {body-part index: {animal name: [x column, y column, p column]}}.
        self.col_headers, self.bp_dict = [], {}
        for bp_cnt, bp in enumerate(body_parts):
            self.col_headers.extend((f"{bp}_x", f"{bp}_y", f"{bp}_p"))
            animal_name = self.find_animal_name_from_body_part_name(bp_name=bp, bp_dict=self.animal_bp_dict)
            self.bp_dict[bp_cnt] = {animal_name: [f"{bp}_x", f"{bp}_y", f"{bp}_p"]}
        self.animal_combinations = list(itertools.combinations(self.animal_bp_dict, 2))
        self.bin_length, self.plots = bin_length, plots
        if verbose:
            stdout_information(msg=f"Processing {len(self.file_paths)} video(s) for time-bins movement data...")

    def __create_plots(self) -> None:
        """
        Save one line plot per video and body-part showing the distance moved in each time-bin.

        Reads the ``Movement (cm)`` rows of ``self.results`` and writes PNG files to a
        ``time_bin_movement_plots_<datetime>`` directory inside the project ``logs`` directory.
        All plots share the same y-axis maximum (the largest movement value across all videos).
        """
        timer = SimbaTimer(start=True)
        stdout_information(msg="Creating time-bin movement plots...")
        plots_dir = os.path.join(self.project_path, "logs", f"time_bin_movement_plots_{self.datetime}")
        create_directory(paths=plots_dir, overwrite=True)
        # Work on an explicit copy so the dtype casts below never write into a slice of self.results.
        movement_df = self.results.loc[self.results["MEASUREMENT"] == "Movement (cm)"].copy()
        movement_df["VALUE"] = movement_df["VALUE"].astype(float)
        movement_df["TIME BIN #"] = movement_df["TIME BIN #"].astype(int)
        # Shared y-axis limit across every video / body-part plot.
        y_max = movement_df["VALUE"].max()
        for video_name in movement_df["VIDEO"].unique():
            video_df = movement_df[movement_df["VIDEO"] == video_name]
            for body_part in video_df["BODY-PART"].unique():
                body_part_df = video_df[video_df["BODY-PART"] == body_part].reset_index(drop=True).sort_values(by=["TIME BIN #"])
                bin_values = body_part_df["VALUE"].values
                _ = PlottingMixin.make_line_plot(data=[bin_values],
                                                 colors=['Green'],
                                                 save_path=os.path.join(plots_dir, f"{video_name}_{body_part}.png"),
                                                 title=video_name,
                                                 y_max=int(y_max),
                                                 line_opacity=0.8,
                                                 y_lbl="DISTANCE (CM)",
                                                 x_lbl=f"TIME BIN # (BIN LENGTH {self.bin_length}s)",
                                                 x_tick_lbls_as_int=True,
                                                 y_tick_lbls_as_int=True,
                                                 x_tick_cnt=bin_values.shape[0]+1)
        timer.stop_timer()
        stdout_success(msg=f"Time bin movement plots saved in {plots_dir}", elapsed_time=timer.elapsed_time_str, source=self.__class__.__name__)

    def _remove_low_confidence_positions(self, arr: np.ndarray, threshold: float) -> np.ndarray:
        """
        Return the rows of ``arr`` whose last column is greater than or equal to ``threshold``.

        :param np.ndarray arr: 2D array; the last column is compared against ``threshold``.
        :param float threshold: Minimum value of the last column for a row to be kept.
        :return: The filtered array (possibly with zero rows).
        """
        return arr[arr[:, -1] >= threshold]

    def run(self):
        """
        Compute per-time-bin movement and/or velocity for every data file in ``self.file_paths``.

        One dataframe per video is appended to ``self.out_df_lst``; call :meth:`save` afterwards
        to write the results to disk.

        :raises FrameRangeError: If ``bin_length`` multiplied by the video FPS is less than one frame.
        """
        self.out_df_lst = []
        self.movement_dict = {}
        self.save_path = os.path.join(self.project_path, "logs", f"Time_bins_{self.bin_length}s_movement_results_{self.datetime}.csv")
        check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.file_paths)
        for file_cnt, file_path in enumerate(self.file_paths):
            video_timer = SimbaTimer(start=True)
            _, video_name, _ = get_fn_ext(file_path)
            if self.verbose:
                stdout_information(msg=f"Processing time-bin movements ({self.bin_length}s) for video {video_name} ({str(file_cnt+1)}/{str(len(self.file_paths))})...")
            _, px_per_mm, fps = self.read_video_info(video_name=video_name)
            fps, self.movement_cols, self.velocity_cols = int(fps), set(), set()
            bin_length_frames = int(fps * self.bin_length)
            if bin_length_frames == 0:
                raise FrameRangeError(msg=f"The specified time-bin length of {self.bin_length} is TOO SHORT for video {video_name} which has a specified FPS of {fps}. This results in time bins that are LESS THAN a single frame.", source=self.__class__.__name__,)
            self.data_df = read_df(file_path, self.file_type)
            check_that_column_exist(df=self.data_df, column_name=self.col_headers, file_name=file_path)
            self.data_df, results = self.data_df[self.col_headers], []
            self.shifted_df = self.create_shifted_df(df=self.data_df)
            for bp_entry in self.bp_dict.values():
                animal_name, animal_bps = list(bp_entry.keys())[0], list(bp_entry.values())[0]
                bp_time_1 = self.shifted_df[animal_bps].values[:, :2]
                bp_time_2 = self.shifted_df[[f"{animal_bps[0]}_shifted", f"{animal_bps[1]}_shifted"]].values
                # NOTE(review): keyed by video only, so with several body-parts only the last
                # body-part's frame-wise distances are retained - confirm this is intended.
                self.movement_dict[video_name] = pd.DataFrame(self.framewise_euclidean_distance(location_1=bp_time_1.astype(np.float64), location_2=bp_time_2.astype(np.float64), px_per_mm=np.float64(px_per_mm), centimeter=True), columns=["VALUE"])
                bp_data = self.data_df[animal_bps].values.astype(np.float32)
                # Split the x/y/p rows into consecutive chunks of bin_length_frames frames.
                movement_lists = [bp_data[i : i + bin_length_frames] for i in range(0, bp_data.shape[0], bin_length_frames)]
                last_valid_xy = None
                for bin_idx, movement_bin_positions in enumerate(movement_lists):
                    bin_times = find_time_stamp_from_frame_numbers(start_frame=int(bin_length_frames*bin_idx), end_frame=min(int(bin_length_frames*(bin_idx+1)), len(self.data_df)), fps=fps)
                    if self.threshold > 0.0:
                        movement_bin_positions = self._remove_low_confidence_positions(arr=movement_bin_positions, threshold=self.threshold)
                    # Drop the probability column, keeping x and y only.
                    movement_bin_positions = movement_bin_positions[:, :2]
                    # Prepend the last retained position of an earlier bin so that the step
                    # across the bin boundary is included in this bin.
                    if movement_bin_positions.shape[0] > 0 and last_valid_xy is not None:
                        movement_bin_positions = np.vstack((last_valid_xy, movement_bin_positions))
                    if movement_bin_positions.shape[0] > 0:
                        last_valid_xy = movement_bin_positions[-1:].copy()
                    # NOTE(review): a bin can hold zero rows here when every position falls below
                    # the threshold - confirm distance_and_velocity handles an empty array.
                    movement, velocity = FeatureExtractionSupplemental.distance_and_velocity(x=movement_bin_positions, fps=fps, pixels_per_mm=px_per_mm, centimeters=True)
                    # animal_bps[0] is "<body-part>_x"; strip the "_x" suffix for the body-part name.
                    if self.distance:
                        results.append({"VIDEO": video_name, "TIME BIN #": bin_idx, "START TIME": bin_times[0], "END TIME": bin_times[1], "ANIMAL": animal_name, "BODY-PART": animal_bps[0][:-2], "MEASUREMENT": "Movement (cm)", "VALUE": movement})
                    if self.velocity:
                        results.append({"VIDEO": video_name, "TIME BIN #": bin_idx, "START TIME": bin_times[0], "END TIME": bin_times[1], "ANIMAL": animal_name, "BODY-PART": animal_bps[0][:-2], "MEASUREMENT": "Velocity (cm/s)", "VALUE": velocity})
            results = pd.DataFrame(results).reset_index(drop=True)
            self.out_df_lst.append(results)
            video_timer.stop_timer()
            if self.verbose:
                stdout_information(msg=f"Time-bin movement calculations for video {video_name} complete...", elapsed_time=video_timer.elapsed_time_str)

    def save(self):
        """
        Concatenate the per-video results computed by :meth:`run` and write them to ``self.save_path`` as CSV.

        Depending on the settings passed at initialisation, the start/end time columns are dropped,
        movement plots are created, and/or the table is pivoted to one column per time-bin before saving.

        :raises NoDataError: If :meth:`run` has not produced any results.
        """
        if not getattr(self, "out_df_lst", None):
            raise NoDataError(msg="No time-bin movement results to save. Call run() before save().", source=self.__class__.__name__)
        self.results = pd.concat(self.out_df_lst, axis=0).sort_values(by=["VIDEO", "TIME BIN #", "MEASUREMENT", "ANIMAL"])[["VIDEO", "TIME BIN #", "START TIME", "END TIME", "ANIMAL", "BODY-PART", "MEASUREMENT", "VALUE"]]
        if not self.include_timestamp:
            self.results = self.results.drop(["START TIME", "END TIME"], axis=1)
        if self.plots:
            self.__create_plots()
        if self.transpose:
            # NOTE(review): the pivot keeps only the index columns and VALUE, so START TIME / END TIME
            # are not part of the transposed output even when include_timestamp is True - confirm.
            self.results = self.results.pivot_table(index=["VIDEO", "ANIMAL", "BODY-PART", "MEASUREMENT"], columns="TIME BIN #", values="VALUE").reset_index()
        self.results.set_index("VIDEO").to_csv(self.save_path)
        self.timer.stop_timer()
        if self.verbose:
            stdout_success(msg=f"Movement time-bins results saved at {self.save_path}", elapsed_time=self.timer.elapsed_time_str, source=self.__class__.__name__)
# test = TimeBinsMovementCalculator(config_path=r"E:\troubleshooting\mitra_pbn\mitra_pbn\project_folder\project_config.ini",
#                                   body_parts= ('center',),
#                                   bin_length=60,
#                                   transpose=True,
#                                   threshold=0.784,
#                                   velocity=False,
#                                   plots=False)
# test.run()
# test.save()

# test = TimeBinsMovementCalculator(config_path=r"E:\troubleshooting\mitra_emergence_hour\project_folder\project_config.ini",
#                                   body_parts=['center'],
#                                   bin_length=600,
#                                   plots=False,
#                                   include_timestamp=False,
#                                   transpose=True,
#                                   velocity=False)
# test.run()
# test.save()

# test = TimeBinsMovementCalculator(config_path=r"C:\troubleshooting\mitra\project_folder\project_config.ini",
#                                   body_parts=['Nose'], #['Simon CENTER OF GRAVITY', 'JJ CENTER OF GRAVITY', 'Animal_1 CENTER OF GRAVITY']
#                                   bin_length=10, plots=True)
# test.run()
# test.save()

# test = TimeBinsMovementCalculator(config_path=r"C:\troubleshooting\two_black_animals_14bp\project_folder\project_config.ini",
#                                   bin_length=0.1,
#                                   plots=True,
#                                   body_parts=['Nose_1'])
# test.run()
# test.save()