Source code for simba.data_processors.timebins_movement_calculator_mp

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import functools
import itertools
import multiprocessing
import os
import platform
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from simba.mixins.config_reader import ConfigReader
from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin
from simba.mixins.feature_extraction_supplement_mixin import \
    FeatureExtractionSupplemental
from simba.mixins.plotting_mixin import PlottingMixin
from simba.utils.checks import (
    check_all_file_names_are_represented_in_video_log, check_float, check_int,
    check_that_column_exist, check_valid_boolean, check_valid_lst,
    check_valid_tuple)
from simba.utils.data import get_cpu_pool, terminate_cpu_pool
from simba.utils.enums import TagNames
from simba.utils.errors import FrameRangeError, InvalidInputError, NoDataError
from simba.utils.printing import (SimbaTimer, log_event, stdout_information,
                                  stdout_success)
from simba.utils.read_write import (create_directory, find_core_cnt,
                                    find_files_of_filetypes_in_directory,
                                    find_time_stamp_from_frame_numbers,
                                    get_fn_ext, read_df, read_video_info)


def _time_bin_movement_helper(data: list,
                              bin_length: int,
                              verbose: bool,
                              video_info_df: pd.DataFrame,
                              bp_headers: list,
                              bp_dict: dict,
                              threshold: float,
                              distance: bool,
                              velocity: bool):


    def _remove_low_confidence_positions(arr, threshold):
        return arr[arr[:, -1] >= threshold]

    batch_id, file_paths = data
    video_dict, movement_dict, batch_results = {}, {}, []
    for file_cnt, file_path in enumerate(file_paths):
        video_timer = SimbaTimer(start=True)
        _, video_name, _ = get_fn_ext(file_path)
        if verbose: print(f"Processing time-bin movements ({bin_length}s) for video {video_name} ({str(file_cnt+1)}/{str(len(file_paths))}, core batch: {batch_id})...")
        video_dict[video_name] = {}
        video_settings, px_per_mm, fps = read_video_info(video_name=video_name, video_info_df=video_info_df)
        fps, movement_cols, velocity_cols = fps, set(), set()
        bin_length_frames = int(fps * bin_length)
        if bin_length_frames == 0:
            raise FrameRangeError(msg=f"The specified time-bin length of {bin_length} is TOO SHORT for video {video_name} which has a specified FPS of {fps}. This results in time bins that are LESS THAN a single frame.", source=_time_bin_movement_helper.__name__,)
        data_df = read_df(file_path)
        check_that_column_exist(df=data_df, column_name=bp_headers, file_name=file_path)
        data_df = data_df[bp_headers]
        data_df = FeatureExtractionMixin().create_shifted_df(df=data_df)
        video_results = []
        for animal_data in bp_dict.values():
            name, animal_bps = list(animal_data.keys())[0], list(animal_data.values())[0]
            bp_time_1, bp_time_2 = (data_df[animal_bps].values, data_df[[f"{animal_bps[0]}_shifted", f"{animal_bps[1]}_shifted"]].values,)
            movement_dict[video_name] = pd.DataFrame(FeatureExtractionMixin().framewise_euclidean_distance(location_1=bp_time_1.astype(np.float64)[:, :2], location_2=bp_time_2.astype(np.float64)[:, :2], px_per_mm=np.float64(px_per_mm), centimeter=True), columns=["VALUE"])
            animal_data = data_df[animal_bps].values.astype(np.float32)
            movement_lists = [animal_data[i: i + bin_length_frames] for i in range(0, animal_data.shape[0], bin_length_frames)]
            last_valid_xy = None
            for bin, movement_bin_positions in enumerate(movement_lists):
                bin_times = find_time_stamp_from_frame_numbers(start_frame=int(bin_length_frames * bin), end_frame=min(int(bin_length_frames * (bin + 1)), len(data_df)), fps=fps)
                if threshold > 0.0:
                    movement_bin_positions = _remove_low_confidence_positions(arr=movement_bin_positions, threshold=threshold)
                movement_bin_positions = movement_bin_positions[:, :2]
                if movement_bin_positions.shape[0] > 0 and last_valid_xy is not None:
                    movement_bin_positions = np.vstack((last_valid_xy, movement_bin_positions))
                if movement_bin_positions.shape[0] > 0:
                    last_valid_xy = movement_bin_positions[-1:].copy()
                #movement_bin_positions_shifted = FeatureExtractionMixin.create_shifted_array(data=movement_bin_positions, periods=1)
                #movement_df = pd.DataFrame(FeatureExtractionMixin.framewise_euclidean_distance(location_1=movement_bin_positions.astype(np.float64), location_2=movement_bin_positions_shifted.astype(np.float64), px_per_mm=np.float64(px_per_mm), centimeter=True), columns=["VALUE"])
                movement_data, velocity_data = (FeatureExtractionSupplemental.distance_and_velocity(x=movement_bin_positions, fps=fps, pixels_per_mm=px_per_mm, centimeters=True))
                if distance:
                    video_results.append({"VIDEO": video_name,"TIME BIN #": bin, "START TIME": bin_times[0], "END TIME": bin_times[1], "ANIMAL": name,"BODY-PART": animal_bps[0][:-2],"MEASUREMENT": "Movement (cm)","VALUE": movement_data})
                if velocity:
                    video_results.append({"VIDEO": video_name,"TIME BIN #": bin, "START TIME": bin_times[0], "END TIME": bin_times[1], "ANIMAL": name,"BODY-PART": animal_bps[0][:-2],"MEASUREMENT": "Velocity (cm/s)","VALUE": velocity_data})
        results = pd.DataFrame(video_results).reset_index(drop=True)
        batch_results.append(results)
        video_timer.stop_timer()
        if verbose:
            stdout_information(msg=f"Time-bin movement calculations for video {video_name} complete (elapsed time: {video_timer.elapsed_time_str}s)...")

    return batch_results, video_dict, movement_dict


[docs]class TimeBinsMovementCalculatorMultiprocess(ConfigReader, FeatureExtractionMixin): """ Computes aggregate movement statistics in user-defined time-bins using multiprocessing for improved performance. .. note:: `Tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/Scenario2.md#part-4--analyze-machine-results>`__. .. note:: On macOS (Darwin), multiprocessing start method is automatically set to 'spawn' for compatibility. .. image:: _static/img/TimeBinsMovementCalculator.png :alt: Time Bins Movement Calculator :width: 500 :align: center .. seealso:: For single core class, see :func:`simba.data_processors.timebins_movement_calculator.TimeBinsMovementCalculator`. :param Union[str, os.PathLike] config_path: Path to SimBA project config file in Configparser format. :param Union[int, float] bin_length: Time bin size in seconds. Must be greater than 0. :param List[str] body_parts: List of body part names to calculate movement for (e.g., ['Nose_1', 'Nose_2']). Body parts must exist in the project's body part configuration. :param Optional[List[Union[str, os.PathLike]]] data_path: Optional list of specific file paths to process. If None, processes all files in the project's outlier corrected directory. Can also be a single file path or a directory path containing CSV files. :param Optional[bool] plots: If True, creates time-bin line plots representing the movement in each time-bin per video. Results are saved in the ``project_folder/logs/`` sub-directory. Default: False. :param bool verbose: If True, prints progress information during processing. Default: True. :param int core_cnt: Number of CPU cores to use for multiprocessing. If -1, uses all available cores. If greater than available cores, uses all available cores. Must be greater than 0. Default: -1. :example: >>> calculator = TimeBinsMovementCalculatorMultiprocess( ... config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini', ... bin_length=5.0, ... body_parts=['Nose_1', 'Nose_2'], ... plots=True, ... core_cnt=4 ... ) >>> calculator.run() >>> calculator.save() """ def __init__(self, config_path: Union[str, os.PathLike], bin_length: Union[int, float], body_parts: Union[List[str], Tuple[str]], data_path: Optional[Union[List[Union[str, os.PathLike]], str, os.PathLike]] = None, plots: bool = False, verbose: bool = True, core_cnt: int = -1, distance: bool = True, velocity: bool = True, threshold: float = 0.00, transpose: bool = False, include_timestamp: bool = False): ConfigReader.__init__(self, config_path=config_path) log_event(logger_name=str(self.__class__.__name__), log_type=TagNames.CLASS_INIT.value, msg=self.create_log_msg_from_init_args(locals=locals()),) check_float(name=f"{self.__class__.__name__} TIME BIN", value=bin_length, min_value=10e-6) if isinstance(body_parts, list): check_valid_lst(data=body_parts, source=f'{self.__class__.__name__} body_parts', min_len=1, valid_dtypes=(str,), valid_values=self.body_parts_lst) elif isinstance(body_parts, tuple): check_valid_tuple(x=body_parts, source=f'{self.__class__.__name__} body_parts', minimum_length=1, valid_dtypes=(str,), accepted_values=self.body_parts_lst) else: raise InvalidInputError(msg='Body-parts has to be a list of tuple of strings', source=f'{self.__class__.__name__} body_parts') if data_path is None: if len(self.outlier_corrected_paths) == 0: raise NoDataError(msg=f'No data files found in {self.outlier_corrected_dir}', source=self.__class__.__name__) self.file_paths = self.outlier_corrected_paths elif os.path.isfile(data_path): self.file_paths = [data_path] elif os.path.isdir(data_path): self.file_paths = find_files_of_filetypes_in_directory(directory=self.file_paths, extensions=('.csv',), raise_warning=False, raise_error=True, as_dict=False) else: self.file_paths = data_path check_valid_boolean(value=[plots], source=f'{self.__class__.__name__} plots', raise_error=True) check_valid_boolean(value=[verbose], source=f'{self.__class__.__name__} verbose', raise_error=True) check_valid_boolean(value=distance, source=f'{self.__class__.__name__} distance', raise_error=True) check_valid_boolean(value=velocity, source=f'{self.__class__.__name__} velocity', raise_error=True) check_valid_boolean(value=transpose, source=f'{self.__class__.__name__} transpose', raise_error=True) check_valid_boolean(value=include_timestamp, source=f'{self.__class__.__name__} include_timestamp', raise_error=True) check_int(f'{self.__class__.__name__} core_cnt', value=core_cnt, min_value=-1, unaccepted_vals=[0], raise_error=True) check_float(name=f'{self.__class__.__name__} threshold', value=threshold, allow_negative=False) self.core_cnt = find_core_cnt()[0] if core_cnt == -1 or core_cnt > find_core_cnt()[0] else core_cnt self.verbose, self.distance, self.velocity, self.transpose, self.include_timestamp, self.threshold = verbose, distance, velocity, transpose, include_timestamp, threshold if not distance and not velocity: raise InvalidInputError(msg='distance AND velocity are both False. To compute movement metrics, set at least one value to True.', source=self.__class__.__name__) self.col_headers, self.bp_dict = [], {} for bp_cnt, bp in enumerate(body_parts): self.col_headers.extend((f"{bp}_x", f"{bp}_y", f'{bp}_p')) animal_name = self.find_animal_name_from_body_part_name(bp_name=bp, bp_dict=self.animal_bp_dict) self.bp_dict[bp_cnt] = {animal_name: [f"{bp}_x", f"{bp}_y", f'{bp}_p']} self.animal_combinations = list(itertools.combinations(self.animal_bp_dict, 2)) self.bin_length, self.plots = bin_length, plots if platform.system() == "Darwin": multiprocessing.set_start_method("spawn", force=True) if verbose: stdout_information(msg=f"Processing {len(self.file_paths)} video(s) for time-bins movement data...") def __create_plots(self): timer = SimbaTimer(start=True) stdout_information(msg="Creating time-bin movement plots...") plots_dir = os.path.join( self.project_path, "logs", f"time_bin_movement_plots_{self.datetime}") create_directory(paths=plots_dir, overwrite=True) y_max = -np.inf for video_name in self.results["VIDEO"].unique(): video_df = self.results.loc[(self.results["VIDEO"] == video_name) & (self.results["MEASUREMENT"] == "Movement (cm)")] for body_part in video_df["BODY-PART"].unique(): body_part_df = (video_df[video_df["BODY-PART"] == body_part].reset_index(drop=True).sort_values(by=["TIME BIN #"])) body_part_df["VALUE"] = body_part_df["VALUE"].astype(float) y_max = max(y_max, np.max(body_part_df["VALUE"])) for video_name in self.results["VIDEO"].unique(): video_df = self.results.loc[(self.results["VIDEO"] == video_name) & (self.results["MEASUREMENT"] == "Movement (cm)")] video_df["TIME BIN #"] = video_df["TIME BIN #"].astype(int) for body_part in video_df["BODY-PART"].unique(): body_part_df = (video_df[video_df["BODY-PART"] == body_part] .reset_index(drop=True) .sort_values(by=["TIME BIN #"])) body_part_df[f"Time bin # (bin length {self.bin_length}s)"] = (body_part_df["TIME BIN #"]) body_part_df["VALUE"] = body_part_df["VALUE"].astype(float) _ = PlottingMixin.make_line_plot(data=[body_part_df["VALUE"].astype(float).values], colors=['Green'], save_path=os.path.join(plots_dir, f"{video_name}_{body_part}.png"), title=video_name, y_max=int(y_max), line_opacity=0.8, y_lbl="DISTANCE (CM)", x_lbl=f"TIME BIN # (BIN LENGTH {self.bin_length}s)", x_tick_lbls_as_int=True, y_tick_lbls_as_int=True, x_tick_cnt=body_part_df["VALUE"].astype(float).values.shape[0]+1) timer.stop_timer() stdout_success(msg=f"Time bin movement plots saved in {plots_dir}", elapsed_time=timer.elapsed_time_str, source=self.__class__.__name__)
[docs] def run(self): self.video_dict, self.movement_dict, self.out_df_lst = {}, {}, [] self.save_path = os.path.join( self.project_path, "logs", f"Time_bins_{self.bin_length}s_movement_results_{self.datetime}.csv") check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.file_paths) split_data_paths = [self.file_paths[i * len(self.file_paths) // self.core_cnt: (i + 1) * len(self.file_paths) // self.core_cnt] for i in range(self.core_cnt)] split_data_paths = [(x, i) for x, i in enumerate(split_data_paths)] pool = get_cpu_pool(core_cnt=self.core_cnt, verbose=True, source=self.__class__.__name__) constants = functools.partial(_time_bin_movement_helper, bin_length=self.bin_length, verbose=self.verbose, video_info_df=self.video_info_df, bp_headers=self.col_headers, bp_dict=self.bp_dict, threshold=self.threshold, distance=self.distance, velocity=self.velocity) for cnt, result in enumerate(pool.imap(constants, split_data_paths, chunksize=self.multiprocess_chunksize)): results_df, video_dict, movement_dict = result self.out_df_lst.append(results_df) self.movement_dict.update(movement_dict) self.video_dict.update(video_dict) print(self.velocity) self.out_df_lst = [item for sub in self.out_df_lst for item in sub] terminate_cpu_pool(pool=pool, verbose=True, source=self.__class__.__name__)
[docs] def save(self): self.results = pd.concat(self.out_df_lst, axis=0).sort_values(by=["VIDEO", "TIME BIN #", "MEASUREMENT", "ANIMAL"])[["VIDEO", "TIME BIN #", "START TIME", "END TIME", "ANIMAL", "BODY-PART", "MEASUREMENT", "VALUE"]] if not self.include_timestamp: self.results = self.results.drop(["START TIME", "END TIME"], axis=1) if self.plots: self.__create_plots() if self.transpose: self.results = self.results.pivot_table(index=["VIDEO", "ANIMAL", "BODY-PART", "MEASUREMENT"], columns="TIME BIN #", values="VALUE").reset_index() self.results.set_index("VIDEO").to_csv(self.save_path) self.timer.stop_timer() stdout_success(msg=f"Movement time-bins results for {len(self.file_paths)} videos ({self.bin_length}s bin size) saved at {self.save_path}", elapsed_time=self.timer.elapsed_time_str, source=self.__class__.__name__)
# if __name__ == "__main__": # test = TimeBinsMovementCalculatorMultiprocess(config_path=r"/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/project_config.ini", # body_parts=['Nose'], #['Simon CENTER OF GRAVITY', 'JJ CENTER OF GRAVITY', 'Animal_1 CENTER OF GRAVITY'] # bin_length=0.5, # plots=True) # test.run() # test.save() # # if __name__ == "__main__": # test = TimeBinsMovementCalculatorMultiprocess(config_path=r"E:\troubleshooting\mitra_pbn\mitra_pbn\project_folder\project_config.ini", # bin_length=60, # plots=False, # core_cnt=10, # velocity=False, # transpose=True, # threshold=0.784, # body_parts=['center']) # test.run() # test.save()