Source code for simba.data_processors.pup_retrieval_calculator

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import glob
import os
from datetime import datetime
from typing import Dict, Union

import pandas as pd
import seaborn as sns

from simba.mixins.config_reader import ConfigReader
from simba.utils.checks import check_if_filepath_list_is_empty
from simba.utils.data import detect_bouts
from simba.utils.enums import ConfigKey, Dtypes, Paths
from simba.utils.read_write import (get_fn_ext, read_config_entry,
                                    read_config_file, read_df,
                                    read_project_path_and_file_type,
                                    read_video_info_csv)


class PupRetrieverCalculator(ConfigReader):
    """
    Pup retrieval calculator used in ``Winters et al., `Sci Reports`, 2022``.

    :param str config_path: path to SimBA project config file in Configparser format
    :param dict settings: user-defined setting for pup retrieval. The keys read by this class are
        ``pup_track_p``, ``dam_track_p``, ``start_distance_criterion``, ``carry_time``, ``core_nest``,
        ``nest``, ``dam_name``, ``pup_name``, ``smooth_function``, ``smooth_factor``, ``max_time``,
        ``clf_carry``, ``clf_approach``, ``clf_dig``, ``distance_plots``, ``log`` and ``swarm_plot``.
        ``carry_time`` and ``max_time`` are multiplied by the video fps to obtain frame counts.

    .. note::
       `Documentation <https://github.com/sgoldenlab/simba/blob/master/docs/add_on_pup_ret.md>`_

    .. image:: _static/img/pup_retrieval.webp
       :alt: Pup retrieval
       :width: 400
       :align: center

    :examples:
    >>> settings = {'pup_track_p': 0.025, 'dam_track_p': 0.5, 'start_distance_criterion': 80.0, 'carry_time': 90.0,
    ...             'core_nest': 'corenest', 'nest': 'nest', 'dam_name': '1_mother', 'pup_name': '2_pup',
    ...             'smooth_function': 'gaussian', 'smooth_factor': 5, 'max_time': 90.0, 'clf_carry': 'carry',
    ...             'clf_approach': 'approach', 'clf_dig': 'digging', 'distance_plots': True, 'log': True, 'swarm_plot': True}
    >>> config_path = '/Users/simon/Downloads/Automated PRT_test/project_folder/project_config.ini'
    >>> calculator = PupRetrieverCalculator(config_path=config_path, settings=settings)
    >>> calculator.run()

    :references:
    .. [1] Winters, Carmen, Wim Gorssen, Victoria A. Ossorio-Salazar, Simon Nilsson, Sam Golden, and Rudi D’Hooge.
           “Automated Procedure to Assess Pup Retrieval in Laboratory Mice.” Scientific Reports 12, no. 1 (2022): 1663.
           https://doi.org/10.1038/s41598-022-05641-w.
    """

    def __init__(self, config_path: str, settings: Dict[str, Union[float, str, bool]]):
        # The base initializer is called unbound, so the instance has to be
        # passed explicitly; omitting it raises TypeError on construction.
        ConfigReader.__init__(self, config_path=config_path)
        self.config = read_config_file(config_path=config_path)
        self.project_path, self.file_type = read_project_path_and_file_type(
            config=self.config
        )
        self.settings = settings
        # Timestamp shared by every file name written during this session.
        self.datetime = datetime.now().strftime("%Y%m%d%H%M%S")
        self.animal_cnt = read_config_entry(
            self.config,
            ConfigKey.GENERAL_SETTINGS.value,
            ConfigKey.ANIMAL_CNT.value,
            Dtypes.INT.value,
        )
        self.clf_lst = [
            settings["clf_approach"],
            settings["clf_carry"],
            settings["clf_dig"],
        ]
        # Names of the columns looked up in each data file, built from the
        # user-supplied zone and animal names.
        core_nest, nest = settings["core_nest"], settings["nest"]
        pup_name, dam_name = settings["pup_name"], settings["dam_name"]
        self.distance_pup_core_field = f"{core_nest} {pup_name} distance"
        self.distance_dam_core_field = f"{core_nest} {dam_name} distance"
        self.pup_in_core_field = f"{core_nest} {pup_name} in zone"
        self.pup_in_nest_field = f"{nest} {pup_name} in zone"
        machine_results_path = os.path.join(
            self.project_path, Paths.MACHINE_RESULTS_DIR.value
        )
        self.logs_dir_path = os.path.join(self.project_path, "logs")
        self.data_files = glob.glob(machine_results_path + "/*." + self.file_type)
        check_if_filepath_list_is_empty(
            filepaths=self.data_files,
            error_msg="SIMBA ERROR: NO FILES FOUND IN {}".format(machine_results_path),
        )
        self.vid_info_df = read_video_info_csv(
            os.path.join(self.project_path, Paths.VIDEO_INFO.value)
        )

    def __get_max_frames(self):
        """Truncate ``self.data_df`` to the first ``fps * settings['max_time']`` frames."""
        self.max_frames = int(self.fps * self.settings["max_time"])
        if self.max_frames < len(self.data_df):
            self.data_df = self.data_df.head(self.max_frames)

    def __check_column_names(self):
        """
        Verify that the four zone columns built in ``__init__`` exist in ``self.data_df``.

        Not invoked by ``run`` in this class.

        :raises ValueError: if any of the anticipated columns is missing.
        """
        for c in [
            self.distance_pup_core_field,
            self.distance_dam_core_field,
            self.pup_in_core_field,
            self.pup_in_nest_field,
        ]:
            if c not in self.data_df.columns:
                msg = f"SIMBA ERROR: Could not find anticipated column named {c} in {self.file_path}"
                print(msg)
                # Carry the message on the exception too, so it is not lost
                # when stdout is not being watched.
                raise ValueError(msg)

    def correct_in_nest_frames(self):
        """
        Clear leading pup-in-nest frames that were not preceded by a carry event.

        Walks the frames where the pup-in-nest column equals 1, in index order.
        A frame is reset to 0 when the carry classifier sums to 0 over the
        ``self.carry_frames`` frames ending at that frame. The walk stops at the
        first frame that does have carry activity in its window.
        """
        carry_col = self.settings["clf_carry"]
        in_nest = self.data_df[self.pup_in_nest_field] == 1
        for nest_frame in self.data_df[in_nest].index.tolist():
            window = self.data_df[carry_col].loc[
                nest_frame - self.carry_frames + 1 : nest_frame
            ]
            if sum(window.tolist()) != 0:
                break
            self.data_df.at[nest_frame, self.pup_in_nest_field] = 0

    def __create_log(self):
        """Write the session timestamp, the number of data files and all settings to the logs directory."""
        log = {
            "Datetime": self.datetime,
            "Videos_#": len(self.data_files),
            **self.settings,
        }
        log_df = pd.DataFrame.from_dict(log, orient="index").rename(
            columns={0: "VALUES"}
        )
        log_save_path = os.path.join(
            self.logs_dir_path, f"Log_pup_retrieval_{self.datetime}"
        )
        log_df.to_csv(log_save_path)
        print("Pup retreival log saved at {}...".format(log_save_path))

    def __generate_figure(
        self,
        data: pd.DataFrame,
        y_col: str,
        x_lbl: str,
        y_lbl: str,
        title: str,
        hue: str,
        video_name: str,
    ):
        """
        Save a scatter plot of ``data[y_col]`` against the frame index.

        The image is written to ``<logs>/<video_name>/<title> <video_name> <datetime>.png``.

        :param pd.DataFrame data: frame-wise data to plot.
        :param str y_col: column plotted on the y-axis.
        :param str x_lbl: x-axis label.
        :param str y_lbl: y-axis label.
        :param str title: plot title, also used in the file name.
        :param str hue: column used to colour the points.
        :param str video_name: name of the video; used as sub-directory and in the file name.
        """
        current_figure = sns.scatterplot(
            x=data.index, y=data[y_col], hue=data[hue], legend=False, palette="Set1"
        )
        current_figure.set(xlabel=x_lbl, ylabel=y_lbl, title=title)
        save_plot_name = f"{title} {video_name} {self.datetime}.png"
        save_plot_path = os.path.join(self.logs_dir_path, video_name)
        os.makedirs(save_plot_path, exist_ok=True)
        image_save_path = os.path.join(save_plot_path, save_plot_name)
        current_figure.figure.savefig(image_save_path, bbox_inches="tight")
        # Clear the axes so the next plot does not draw on top of this one.
        current_figure.clear()

    def __create_swarm_plot(self):
        """Save a swarm plot of the ``PUP IN NEST (S)`` column of ``self.out_df`` to the logs directory."""
        figure_df = self.out_df.copy()
        # Constant column so that all videos fall in a single swarm.
        figure_df["Experiment"] = 1
        swarm_plot = sns.swarmplot(
            x="Experiment", y="PUP IN NEST (S)", data=figure_df, color="grey"
        )
        swarm_plot.set(
            xlabel="",
            ylabel="Pup in nest (s)",
            title="Summary - pup retrieval time (s)",
        )
        swarm_plot_name = f"Summary_pup_retrieval_times_{self.datetime}.png"
        save_plot_path = os.path.join(self.logs_dir_path, swarm_plot_name)
        swarm_plot.figure.savefig(save_plot_path, bbox_inches="tight")
        swarm_plot.clear()
        print(f"Swarm plot saved @ {save_plot_path}...")

    def run(self):
        """
        Compute pup-retrieval statistics for every data file found in ``__init__``.

        For each file the data is truncated to ``max_time``, classifier and zone
        columns are cleaned using the tracking-probability thresholds, distances
        are optionally smoothed, and one row of summary statistics is produced.
        The rows are concatenated into ``self.out_df``. Depending on the settings,
        distance plots, a settings log and a swarm plot are written to disk.

        :raises IndexError: if the pup-to-core-nest distance never exceeds
            ``settings['start_distance_criterion']`` in a file.
        """
        self.out = []
        for file_path in self.data_files:
            self.results = {}
            self.file_path = file_path
            _, video_name, _ = get_fn_ext(filepath=file_path)
            _, _, self.fps = self.read_video_info(video_name=video_name)
            # ``.ffill()`` replaces the deprecated ``fillna(method="ffill")``.
            self.data_df = read_df(
                file_path=file_path, file_type=self.file_type
            ).ffill()
            self.__get_max_frames()
            self.carry_frames = int(self.fps * self.settings["carry_time"])
            self.data_df["mean_p_mother"] = self.data_df[
                self.animal_bp_dict[self.settings["dam_name"]]["P_bps"]
            ].mean(axis=1)
            self.data_df["pup_p_mean"] = self.data_df[
                self.animal_bp_dict[self.settings["pup_name"]]["P_bps"]
            ].mean(axis=1)
            self.data_df["cumsum_nest_pup"] = self.data_df[
                self.pup_in_nest_field
            ].cumsum()

            if self.settings["distance_plots"]:
                self.__generate_figure(
                    data=self.data_df,
                    y_col=self.distance_dam_core_field,
                    x_lbl="frame number",
                    y_lbl="distance (mm)",
                    title="distance between mother and corenest - BEFORE pre-processing",
                    hue="cumsum_nest_pup",
                    video_name=video_name,
                )
                self.__generate_figure(
                    data=self.data_df,
                    y_col=self.distance_pup_core_field,
                    x_lbl="frame number",
                    y_lbl="distance (mm)",
                    title="distance between pup and corenest - BEFORE pre-processing",
                    hue="cumsum_nest_pup",
                    video_name=video_name,
                )

            # Discard classifications in frames where the dam is poorly tracked.
            for clf in self.clf_lst:
                self.data_df.loc[
                    self.data_df["mean_p_mother"] < self.settings["dam_track_p"], clf
                ] = 0

            # First frame where the pup is further from the core-nest than the
            # start criterion; everything up to it is treated as "not in zone".
            beyond_start = self.data_df[
                self.data_df[self.distance_pup_core_field]
                > self.settings["start_distance_criterion"]
            ].index
            if len(beyond_start) == 0:
                raise IndexError(
                    f"SIMBA ERROR: {self.distance_pup_core_field} never exceeds the "
                    f"start_distance_criterion ({self.settings['start_distance_criterion']}) "
                    f"in {self.file_path}"
                )
            first_row = beyond_start[0]
            self.data_df.loc[0:first_row, self.distance_pup_core_field] = (
                self.data_df.loc[first_row, self.distance_pup_core_field]
            )
            self.data_df.loc[0:first_row, self.pup_in_core_field] = 0
            self.data_df.loc[0:first_row, self.pup_in_nest_field] = 0
            self.correct_in_nest_frames()

            # Zone membership is not trusted in frames where the pup is poorly tracked.
            rows_with_low_mean_pup_prob = self.data_df[
                self.data_df["pup_p_mean"] < self.settings["pup_track_p"]
            ].index.tolist()
            self.data_df.loc[rows_with_low_mean_pup_prob, self.pup_in_core_field] = 0
            self.data_df.loc[rows_with_low_mean_pup_prob, self.pup_in_nest_field] = 0

            if self.settings["smooth_function"] == "gaussian":
                for distance_field in (
                    self.distance_pup_core_field,
                    self.distance_dam_core_field,
                ):
                    # Centered rolling windows leave NaN at the edges; those
                    # are filled back in with the unsmoothed values.
                    self.data_df[distance_field] = (
                        self.data_df[distance_field]
                        .rolling(window=int(self.fps), win_type="gaussian", center=True)
                        .mean(std=self.settings["smooth_factor"])
                        .fillna(self.data_df[distance_field])
                    )

            if self.settings["distance_plots"]:
                self.__generate_figure(
                    data=self.data_df,
                    y_col=self.distance_dam_core_field,
                    x_lbl="frame number",
                    y_lbl="distance (mm)",
                    title="distance between mother and corenest - AFTER pre-processing",
                    hue="cumsum_nest_pup",
                    video_name=video_name,
                )
                self.__generate_figure(
                    data=self.data_df,
                    y_col=self.distance_pup_core_field,
                    x_lbl="frame number",
                    y_lbl="distance (mm)",
                    title="distance between pup and core-nest - AFTER pre-processing",
                    hue="cumsum_nest_pup",
                    video_name=video_name,
                )

            closest_dist_between_pup_and_zone = round(
                self.data_df[self.distance_pup_core_field].min(), 3
            )

            # Frame of first nest entry, or the number of analysed frames when
            # the pup never enters the nest.
            if 1 in self.data_df[self.pup_in_nest_field].values:
                frame_when_pup_is_in_zone = self.data_df[
                    self.data_df[self.pup_in_nest_field] == 1
                ].index.min()
                reason_zone = "Pup in nest"
            else:
                frame_when_pup_is_in_zone = len(self.data_df)
                reason_zone = "Pup not retrieved"
            time_seconds_until_zone = round(frame_when_pup_is_in_zone / self.fps, 3)

            # Same for the core-nest.
            if 1 in self.data_df[self.pup_in_core_field].values:
                frame_when_pup_is_in_core_nest = self.data_df[
                    self.data_df[self.pup_in_core_field] == 1
                ].index.min()
                reason_corenest = "Pup in core-nest"
            else:
                frame_when_pup_is_in_core_nest = len(self.data_df)
                reason_corenest = "Pup not in core-nest"
            time_seconds_until_corenest = round(
                frame_when_pup_is_in_core_nest / self.fps, 3
            )

            latencies, total_times, before_retrieval_time = {}, {}, {}
            for clf in self.clf_lst:
                total_times[clf] = round(self.data_df[clf].sum() / self.fps, 3)
                # NOTE(review): computed but never written to the results;
                # see the "(BEFORE RETRIEVAL)" entry further down.
                before_retrieval_time[clf] = round(
                    self.data_df.loc[0:frame_when_pup_is_in_zone, clf].sum()
                    / self.fps,
                    3,
                )
                latencies[clf] = round(
                    self.data_df[self.data_df[clf] == 1].index.min() / self.fps, 3
                )

            event_counter, time_between_events, mean_duration = {}, {}, {}
            bouts = detect_bouts(
                data_df=self.data_df, target_lst=self.clf_lst, fps=self.fps
            )
            for clf in self.clf_lst:
                clf_bouts = bouts[bouts["Event"] == clf].reset_index(drop=True)
                event_counter[clf] = len(clf_bouts)
                # NOTE(review): "Bout_time" is used without dividing by fps
                # further down (and reported as seconds) - confirm against
                # detect_bouts whether the divisions by fps here are intended.
                mean_duration[clf] = round(clf_bouts["Bout_time"].mean() / self.fps, 3)
                # Pair each bout's end with the start of the following bout.
                clf_bouts["Start_time"] = clf_bouts["Start_time"].shift(-1)
                clf_bouts.drop(clf_bouts.tail(1).index, inplace=True)
                clf_bouts["TIME BETWEEN"] = (
                    clf_bouts["Start_time"] - clf_bouts["End Time"]
                )
                time_between_events[clf] = round(
                    clf_bouts["TIME BETWEEN"].mean() / self.fps, 3
                )

            before_enter_core_df = self.data_df.loc[
                0 : frame_when_pup_is_in_core_nest - 1
            ]
            before_core_bouts = detect_bouts(
                data_df=before_enter_core_df, target_lst=self.clf_lst, fps=self.fps
            )
            event_counter_before_corenest, mean_event_length_before_corenest = {}, {}
            for clf in self.clf_lst:
                clf_bouts = before_core_bouts[
                    before_core_bouts["Event"] == clf
                ].reset_index(drop=True)
                mean_event_length_before_corenest[clf] = round(
                    clf_bouts["Bout_time"].mean(), 3
                )
                event_counter_before_corenest[clf] = len(clf_bouts)

            # The retrieval frame was previously overwritten with a hard-coded
            # 160. Use the per-video value computed above, which falls back to
            # the number of analysed frames when the pup is never retrieved.
            retrieval_frame = frame_when_pup_is_in_zone
            first_approach = self.data_df[
                self.data_df[self.settings["clf_approach"]] == 1
            ].index.min()
            dig_bouts = detect_bouts(
                data_df=before_enter_core_df,
                target_lst=[self.settings["clf_dig"]],
                fps=self.fps,
            )
            dig_bouts_in_window = dig_bouts.loc[
                (dig_bouts["End_frame"] > first_approach)
                & (dig_bouts["End_frame"] < retrieval_frame)
            ]
            dig_bouts_in_window_seconds = round(
                dig_bouts_in_window["Bout_time"].sum(), 3
            )

            self.results["VIDEO"] = video_name
            self.results["PUP IN NEST (FRAME)"] = frame_when_pup_is_in_zone
            self.results["PUP IN NEST (S)"] = time_seconds_until_zone
            self.results["MINIMUM DISTANCE (PUP TO CORENEST)"] = (
                closest_dist_between_pup_and_zone
            )
            self.results["REASON (PUP IN NEST)"] = reason_zone
            self.results["PUP IN CORE-NEST (FRAME)"] = frame_when_pup_is_in_core_nest
            self.results["PUP IN CORE-NEST (S)"] = time_seconds_until_corenest
            self.results["REASON (PUP IN CORE-NEST)"] = reason_corenest
            for clf in self.clf_lst:
                self.results[clf + " (TOTAL TIME)"] = total_times[clf]
                # NOTE(review): this stores the number of events before the
                # pup enters the core-nest, not ``before_retrieval_time`` -
                # confirm which quantity is intended.
                self.results[clf + " (BEFORE RETRIEVAL)"] = (
                    event_counter_before_corenest[clf]
                )
                self.results[clf + " (LATENCY TO FIRST EVENT)"] = latencies[clf]
                self.results[clf + " (EVENT COUNT)"] = event_counter[clf]
                self.results[clf + " (MEAN DURATION)"] = mean_duration[clf]
                self.results[clf + " (MEAN INTERVAL)"] = time_between_events[clf]
            self.results["DIG TIME AFTER APPROACH AND BEFORE RETRIEVAL (S)"] = (
                dig_bouts_in_window_seconds
            )
            self.results["DIG EVENTS AFTER APPROACH"] = len(dig_bouts_in_window)
            self.results["MEAN DIG DURATION AFTER APPROACH"] = round(
                dig_bouts_in_window["Bout_time"].mean(), 3
            )
            self.out.append(pd.DataFrame.from_dict(self.results, orient="index").T)

        self.out_df = pd.concat(self.out, axis=0).reset_index(drop=True)
        if self.settings["log"]:
            self.__create_log()
        if self.settings["swarm_plot"]:
            self.__create_swarm_plot()

    def save_results(self):
        """Write ``self.out_df`` (created by ``run``) as a CSV file in the logs directory."""
        file_path = os.path.join(
            self.logs_dir_path, f"Log_pup_retrieval_{self.datetime}.csv"
        )
        self.out_df.to_csv(file_path)
        print(f"SIMBA COMPLETE: Summary data saved at {file_path}.")
# settings = {'pup_track_p': 0.025, 'dam_track_p': 0.5, 'start_distance_criterion': 80.0, 'carry_frames': 90.0,
#             'core_nest': 'corenest', 'nest': 'nest', 'dam_name': '1_mother', 'pup_name': '2_pup',
#             'smooth_function': 'gaussian', 'smooth_factor': 5, 'max_time': 90.0, 'clf_carry': 'carry',
#             'clf_approach': 'approach', 'clf_dig': 'digging', 'distance_plots': True, 'log': True, 'swarm_plot': True}
# config_path = '/Users/simon/Downloads/Automated PRT_test/project_folder/project_config.ini'
#
# test = PupRetrieverCalculator(config_path=config_path, settings=settings)
# test.run()