# Source code for simba.data_processors.timebins_clf_calculator

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import os
from collections import defaultdict
from copy import deepcopy
from typing import List, Optional, Union

import numpy as np
import pandas as pd

try:
    from typing import Literal
except:
    from typing_extensions import Literal

from simba.mixins.config_reader import ConfigReader
from simba.utils.checks import (
    check_all_file_names_are_represented_in_video_log,
    check_file_exist_and_readable, check_int, check_that_column_exist,
    check_valid_boolean, check_valid_lst)
from simba.utils.data import detect_bouts
from simba.utils.errors import NoDataError
from simba.utils.printing import stdout_information, stdout_success
from simba.utils.read_write import (find_files_of_filetypes_in_directory,
                                    find_time_stamp_from_frame_numbers,
                                    get_fn_ext, read_df)

# Labels of the columns written to the output table.
VIDEO = "VIDEO"
TIME_BIN_ID = "TIME BIN #"
START_TIME = "START TIME"
END_TIME = "END TIME"
CLASSIFIER = "CLASSIFIER"
MEASUREMENT = "MEASUREMENT"

# Labels of the individual per-bin measurements.
FIRST_OCCURRENCE = "First occurrence (s)"
EVENT_COUNT = "Event count"
TOTAL_EVENT_DURATION = "Total event duration (s)"
MEAN_EVENT_DURATION = "Mean event duration (s)"
MEDIAN_EVENT_DURATION = "Median event duration (s)"
MEAN_EVENT_INTERVAL = "Mean event interval (s)"
MEDIAN_EVENT_INTERVAL = "Median event interval (s)"

# All measurement labels. The order matters: the calculator's constructor zips
# this list against its boolean measurement flags, position by position.
MEASUREMENT_NAMES = [
    FIRST_OCCURRENCE,
    EVENT_COUNT,
    TOTAL_EVENT_DURATION,
    MEAN_EVENT_DURATION,
    MEDIAN_EVENT_DURATION,
    MEAN_EVENT_INTERVAL,
    MEDIAN_EVENT_INTERVAL,
]

class TimeBinsClfCalculator(ConfigReader):
    """
    Computes aggregate classification results in user-defined time-bins.

    Results are stored in the ``project_folder/logs`` directory of the SimBA project.

    .. note::
       `Tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/Scenario2.md#part-4--analyze-machine-results>`__.

    :param Union[str, os.PathLike] config_path: Path to SimBA project config file in Configparser format.
    :param int bin_length: Integer representing the time bin size in seconds.
    :param List[str] classifiers: Names of classifiers to calculate aggregate statistics in time-bins for. EXAMPLE: ['Attack', 'Sniffing']
    :param Optional[Union[str, os.PathLike]] data_path: Optional path to directory containing CSV files or single CSV file. If None, uses machine results from project. Default: None.
    :param bool first_occurrence: If True, calculate first occurrence time for each classifier in each time bin. Default: False.
    :param bool event_count: If True, calculate event count for each classifier in each time bin. Default: False.
    :param bool total_event_duration: If True, calculate total event duration for each classifier in each time bin. Default: True.
    :param bool mean_event_duration: If True, calculate mean event duration for each classifier in each time bin. Default: False.
    :param bool median_event_duration: If True, calculate median event duration for each classifier in each time bin. Default: False.
    :param bool mean_interval_duration: If True, calculate mean interval duration between events for each classifier in each time bin. Default: False.
    :param bool median_interval_duration: If True, calculate median interval duration between events for each classifier in each time bin. Default: False.
    :param bool include_timestamp: If True, include START TIME and END TIME (in HH:MM:SS format) columns in output. Only applied when ``transpose`` is False. Default: False.
    :param bool transpose: If True, transpose results with MultiIndex columns (CLASSIFIER, TIME BIN #, MEASUREMENT) so one video per row. Default: False.
    :raises NoDataError: If ``data_path`` is None and the project holds no machine-results files, or if ``data_path`` is neither an existing file nor an existing directory.

    :example:
    >>> timebin_clf_analyzer = TimeBinsClfCalculator(config_path='MyConfigPath', bin_length=15, classifiers=['Attack', 'Sniffing'], event_count=True, total_event_duration=True)
    >>> timebin_clf_analyzer.run()
    >>> timebin_clf_analyzer.save()
    """

    def __init__(self,
                 config_path: Union[str, os.PathLike],
                 bin_length: int,
                 classifiers: List[str],
                 data_path: Optional[Union[str, os.PathLike]] = None,
                 first_occurrence: bool = False,
                 event_count: bool = False,
                 total_event_duration: bool = True,
                 mean_event_duration: bool = False,
                 median_event_duration: bool = False,
                 mean_interval_duration: bool = False,
                 median_interval_duration: bool = False,
                 include_timestamp: bool = False,
                 transpose: bool = False):

        super().__init__(config_path=config_path)
        check_file_exist_and_readable(file_path=config_path)
        check_int(name=f'{self.__class__.__name__} bin_length', value=bin_length, min_value=1)
        check_valid_lst(data=classifiers, source=f'{self.__class__.__name__} classifiers', valid_dtypes=(str,), valid_values=self.clf_names, min_len=1)
        if data_path is None:
            if len(self.machine_results_paths) == 0:
                raise NoDataError(msg=f'No data files found in {self.machine_results_dir} directory. Get classification data before analyzing classification time-bin data.', source=self.__class__.__name__)
            self.data_paths = deepcopy(self.machine_results_paths)
        elif os.path.isdir(data_path):
            self.data_paths = find_files_of_filetypes_in_directory(directory=data_path, extensions=('.csv',), raise_warning=False, raise_error=True, as_dict=False)
        elif os.path.isfile(data_path):
            self.data_paths = [data_path]
        else:
            # Without this branch self.data_paths would never be assigned and the
            # loop below would fail with an unrelated AttributeError.
            raise NoDataError(msg=f'{data_path} is not a valid file path or directory path.', source=self.__class__.__name__)
        for file_path in self.data_paths:
            check_file_exist_and_readable(file_path=file_path, raise_error=True)
        self.clfs, self.bin_length = classifiers, bin_length
        self.event_count, self.total_event_duration, self.first_occurrence = event_count, total_event_duration, first_occurrence
        self.mean_event_duration, self.median_event_duration, self.transpose = mean_event_duration, median_event_duration, transpose
        self.mean_interval_duration, self.median_interval_duration, self.include_timestamp = mean_interval_duration, median_interval_duration, include_timestamp
        check_valid_boolean(value=include_timestamp, source=f'{self.__class__.__name__} include_timestamp', raise_error=True)
        check_valid_boolean(value=transpose, source=f'{self.__class__.__name__} transpose', raise_error=True)
        # The flag order below must mirror the order of MEASUREMENT_NAMES.
        measurement_flags = [first_occurrence, event_count, total_event_duration, mean_event_duration, median_event_duration, mean_interval_duration, median_interval_duration]
        self.measurements = []
        for flag, measurement_name in zip(measurement_flags, MEASUREMENT_NAMES):
            check_valid_boolean(value=flag, source=f'{self.__class__.__name__} {measurement_name}', raise_error=True)
            if flag:
                self.measurements.append(measurement_name)

    def _reformat_results(self):
        """
        Flatten ``self.video_dict`` into the ``self.out_df`` results table.

        Builds one long-format frame per video and time-bin, concatenates them, keeps
        only the requested measurements and classifiers, and either pivots to one row
        per video (``transpose=True``) or indexes the long table by video name.
        """
        self.out_df_lst = []
        for video_name, video_info in self.video_dict.items():
            for bin_number, bin_data in video_info.items():
                start_time, end_time = bin_data[START_TIME], bin_data[END_TIME]
                data_df = pd.DataFrame.from_dict(bin_data).reset_index().rename(columns={"index": MEASUREMENT})
                data_df = pd.melt(data_df, id_vars=[MEASUREMENT]).rename(columns={"value": "VALUE", "variable": CLASSIFIER})
                if self.include_timestamp and not self.transpose:
                    data_df.insert(loc=0, column=START_TIME, value=start_time)
                    # The END TIME column must carry the bin end, not the bin start.
                    data_df.insert(loc=0, column=END_TIME, value=end_time)
                data_df.insert(loc=0, column=TIME_BIN_ID, value=bin_number)
                data_df.insert(loc=0, column=VIDEO, value=video_name)
                self.out_df_lst.append(data_df)
        self.out_df = pd.concat(self.out_df_lst, axis=0).sort_values(by=[VIDEO, TIME_BIN_ID])
        self.out_df = self.out_df[self.out_df[MEASUREMENT].isin(self.measurements)]
        # The melt also produces START TIME / END TIME pseudo-classifier rows; this
        # filter removes them. Copy so a helper column can be added safely below.
        self.out_df = self.out_df[self.out_df[CLASSIFIER].isin(self.clfs)].copy()
        if self.transpose:
            self.out_df["mi"] = list(zip(self.out_df[CLASSIFIER], self.out_df[TIME_BIN_ID].astype(int), self.out_df[MEASUREMENT]))
            tmp = self.out_df.pivot_table(index=VIDEO, columns="mi", values="VALUE", aggfunc="first")
            tmp.columns = pd.MultiIndex.from_tuples(tmp.columns, names=[CLASSIFIER, TIME_BIN_ID, MEASUREMENT])
            tmp = tmp.sort_index(axis=1)
            self.out_df = tmp.reset_index()
        else:
            self.out_df = self.out_df.set_index(VIDEO)

    def run(self):
        """
        Compute the per-bin statistics for every data file and build ``self.out_df``.

        Each file is split into consecutive bins of ``bin_length`` seconds (at least one
        frame per bin). Bouts are detected inside each bin, and the statistics are stored
        in ``self.video_dict[video_name][bin_number][classifier]``.
        """
        self.video_dict = {}
        check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths)
        for file_cnt, file_path in enumerate(self.data_paths):
            _, file_name, _ = get_fn_ext(file_path)
            self.video_dict[file_name] = {}
            stdout_information(msg=f'Analyzing classification in time-bins ({self.bin_length}s) for video {file_name} ({file_cnt+1}/{len(self.data_paths)}')
            data_df = read_df(file_path, self.file_type)
            check_that_column_exist(df=data_df, column_name=self.clfs, file_name=file_path)
            _, _, fps = self.read_video_info(video_name=file_name)
            bin_frame_length = max(1, int(self.bin_length * fps))
            for bin_cnt, start_frame in enumerate(range(0, data_df.shape[0], bin_frame_length)):
                bin_df = data_df.iloc[start_frame: start_frame + bin_frame_length]
                bin_times = find_time_stamp_from_frame_numbers(start_frame=start_frame, end_frame=min(start_frame + bin_frame_length, len(data_df)), fps=fps)
                # Only the requested classifiers are analysed: those are the columns
                # validated above and the only ones kept in the output.
                bouts_df = detect_bouts(data_df=bin_df, target_lst=list(self.clfs), fps=fps)
                bin_results = {}
                self.video_dict[file_name][bin_cnt] = bin_results
                for clf in self.clfs:
                    clf_results = defaultdict(list)
                    bin_results[clf] = clf_results
                    # Bouts of this classifier only, in chronological order.
                    bout_df = bouts_df.loc[bouts_df["Event"] == clf].sort_values(by="Start_time")
                    if len(bout_df) > 0:
                        clf_results[FIRST_OCCURRENCE] = round(bout_df["Start_time"].min(), 3)
                        clf_results[EVENT_COUNT] = len(bout_df)
                        clf_results[TOTAL_EVENT_DURATION] = round(bout_df["Bout_time"].sum(), 3)
                        clf_results[MEAN_EVENT_DURATION] = round(bout_df["Bout_time"].mean(), 3)
                        clf_results[MEDIAN_EVENT_DURATION] = round(bout_df["Bout_time"].median(), 3)
                    else:
                        clf_results[FIRST_OCCURRENCE] = None
                        clf_results[EVENT_COUNT] = 0
                        clf_results[TOTAL_EVENT_DURATION] = 0
                        clf_results[MEAN_EVENT_DURATION] = 0
                        clf_results[MEDIAN_EVENT_DURATION] = 0
                    if len(bout_df) > 1:
                        # Interval = start of the next bout of the SAME classifier minus
                        # the end of the current one; the last bout has no successor.
                        intervals = (bout_df["Start_time"].shift(-1) - bout_df["End Time"]).iloc[:-1]
                        clf_results[MEAN_EVENT_INTERVAL] = round(intervals.mean(), 3)
                        clf_results[MEDIAN_EVENT_INTERVAL] = round(intervals.median(), 3)
                    else:
                        clf_results[MEAN_EVENT_INTERVAL] = None
                        clf_results[MEDIAN_EVENT_INTERVAL] = None
                bin_results[START_TIME], bin_results[END_TIME] = bin_times[0], bin_times[1]
        self._reformat_results()

    def save(self):
        """
        Write ``self.out_df`` to ``logs/Time_bins_ML_results_<datetime>.csv`` in the project folder.

        Must be called after :meth:`run`. Stops the timer and prints the save location.
        """
        self.save_path = os.path.join(self.project_path, "logs", f"Time_bins_ML_results_{self.datetime}.csv")
        self.out_df.to_csv(self.save_path)
        self.timer.stop_timer()
        stdout_success(msg=f'Classification time-bins results saved at {self.save_path}', elapsed_time=self.timer.elapsed_time_str)
# # test = TimeBinsClfCalculator(config_path=r"E:\troubleshooting\mitra_emergence_hour\project_folder\project_config.ini", # classifiers=['CIRCLING'], # bin_length=600, # include_timestamp=True, # transpose=True) # # test.run() # test.save() # # # test = TimeBinsClfCalculator(config_path=r"D:\troubleshooting\maplight_ri\project_folder\project_config.ini", # classifiers=['attack'], # bin_length=60, # include_timestamp=True, # transpose=True) # # test.run() # test.save() # # test = TimeBinsClfCalculator(config_path=r'D:\troubleshooting\mitra\project_folder\project_config.ini', # classifiers=['lay-on-belly'], # bin_length=60) # # test.run() # test = TimeBinsClf(config_path='/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini', # bin_length=2, # measurements=['First occurrence (s)', 'Event count', 'Total event duration (s)', 'Mean event duration (s)'], # classifiers=['Attack', 'Sniffing']) # test.analyze_timebins_clf() # test = TimeBinsClf(config_path='/Users/simon/Desktop/troubleshooting/light_analyzer/project_folder/project_config.ini', # bin_length=2, # measurements=['First occurrence (s)', 'Event count', 'Total event duration (s)', 'Mean event duration (s)']) # test.analyze_timebins_clf()