Source code for simba.third_party_label_appenders.observer_importer

import glob
import os
from copy import deepcopy

import numpy as np
import pandas as pd

from simba.mixins.config_reader import ConfigReader
from simba.utils.checks import check_if_filepath_list_is_empty
from simba.utils.errors import (AnnotationFileNotFoundError,
                                ColumnNotFoundError,
                                ThirdPartyAnnotationEventCountError,
                                ThirdPartyAnnotationOverlapError)
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import get_fn_ext, read_df, write_df
from simba.utils.warnings import (
    ThirdPartyAnnotationsClfMissingWarning,
    ThirdPartyAnnotationsOutsidePoseEstimationDataWarning)

# Column headers that must be present in every Noldus Observer annotation sheet.
VIDEO_NAME_FIELD = "Observation"  # rows are grouped per video on this column
BEHAVIOR_FIELD = "Behavior"  # compared against the project's classifier names
EVENT_TYPE_FIELD = "Event_Type"  # holds one of the event-type values below
TIME_FIELD = "Time_Relative_hmsf"  # parsed into a timedelta, then into a frame number

# Values of the EVENT_TYPE_FIELD column that the importer distinguishes.
POINT_EVENT = "Point"  # rows with this event type are dropped on import
START = "State start"  # mapped to "START" on import
STOP = "State stop"  # mapped to "STOP" on import

# Passed as `usecols` when reading each sheet and used for column validation.
EXPECTED_FIELDS = [
    TIME_FIELD,
    VIDEO_NAME_FIELD,
    BEHAVIOR_FIELD,
    EVENT_TYPE_FIELD,
]


class NoldusObserverImporter(ConfigReader):
    """
    Append Noldus Observer human annotations onto featurized pose-estimation data.
    Results are saved within the project_folder/csv/targets_inserted directory of
    the SimBA project (as parquets or CSVs).

    :param str config_path: path to SimBA project config file in Configparser format
    :param str data_dir: path to folder holding Observer data files in XLSX or XLS format

    .. note::
       `Third-party import GitHub tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/third_party_annot.md>`__.
       `Expected input example 1 <https://github.com/sgoldenlab/simba/blob/master/misc/Observer_example_1.xlsx>`__.
       `Expected input example 2 <https://github.com/sgoldenlab/simba/blob/master/misc/Observer_example_2.xlsx>`__.

    Examples
    -----
    >>> _ = NoldusObserverImporter(config_path='MyConfigPath', data_dir='MyNoldusObserverDataDir').run()
    """

    def __init__(self, config_path: str, data_dir: str):
        super().__init__(config_path=config_path)
        self.observer_files_found = glob.glob(data_dir + "/*.xlsx") + glob.glob(
            data_dir + "/*.xls"
        )
        # Skip paths containing "~$" (presumably the temporary owner/lock files
        # a spreadsheet application creates next to an open workbook).
        self.observer_files_found = [
            x for x in self.observer_files_found if "~$" not in x
        ]
        check_if_filepath_list_is_empty(
            filepaths=self.observer_files_found,
            error_msg=f"SIMBA ERROR: The {data_dir} directory contains ZERO xlsx/xls files",
        )
        check_if_filepath_list_is_empty(
            filepaths=self.feature_file_paths,
            error_msg=f"SIMBA ERROR: The {self.features_dir} directory contains ZERO files",
        )
        self.__read_data()

    def __check_column_names(self, df: pd.DataFrame, file_path: str):
        """
        Raise if any of the expected Observer columns is absent from ``df``.

        :param pd.DataFrame df: annotation sheet to validate
        :param str file_path: path of the file ``df`` was read from (used in the error)
        :raises ColumnNotFoundError: naming the first missing column in EXPECTED_FIELDS order
        """
        present = set(df.columns)
        # Keep EXPECTED_FIELDS order so the reported column is deterministic.
        remain = [field for field in EXPECTED_FIELDS if field not in present]
        if remain:
            raise ColumnNotFoundError(file_name=file_path, column_name=remain[0])

    def check_timestamps(self, timestamps: list):
        """
        Pad ``H:M:S`` timestamp strings so the seconds field is nine characters long.

        A seconds field without a fractional part gets ``"."`` plus zeros appended;
        a seconds field that already holds a (short) fractional part is only
        zero-padded. Timestamps whose seconds field is already nine characters or
        longer are returned unchanged.

        :param list timestamps: timestamp strings with three colon-separated fields
        :return: list of corrected timestamp strings, in input order
        :raises ValueError: if a timestamp does not split into exactly three fields
        """
        corrected_ts = []
        for timestamp in timestamps:
            h, m, s = timestamp.split(":", 3)
            missing_fractions = 9 - len(s)
            if missing_fractions <= 0:
                # Nothing to pad; appending anything here would corrupt the value.
                corrected_ts.append(timestamp)
            elif "." in s:
                # Fraction already present: adding a second "." would make the
                # string unparsable, so only extend the existing fraction.
                corrected_ts.append(f'{h}:{m}:{s}{"0" * missing_fractions}')
            else:
                corrected_ts.append(f'{h}:{m}:{s}.{"0" * missing_fractions}')
        return corrected_ts

    def __read_data(self):
        """
        Read every Observer file and build ``self.annotation``.

        ``self.annotation`` maps each video name (values of the VIDEO_NAME_FIELD
        column) to a dataframe holding the behavior, the event type
        ("START"/"STOP") and the ``FRAME`` number of every non-point event,
        sorted by frame.

        :raises ColumnNotFoundError: if an expected column is missing from a file
        """
        print(
            f"Reading Noldus Observer annotation files ({str(len(self.observer_files_found))} files)..."
        )
        self.annotation = {}
        for file_path in self.observer_files_found:
            try:
                sheets = pd.read_excel(
                    file_path, sheet_name=None, usecols=EXPECTED_FIELDS
                )
                # Only the first sheet is used. ``next(iter(...))`` works for both
                # plain dicts and OrderedDicts, unlike ``popitem(last=False)``.
                df = next(iter(sheets.values()))
            except KeyError:
                raise ColumnNotFoundError(
                    file_name=file_path, column_name=", ".join(EXPECTED_FIELDS)
                )
            # Validate once per file, before any expected column is accessed.
            self.__check_column_names(df=df, file_path=file_path)
            for video_name in df[VIDEO_NAME_FIELD].unique():
                video_df = df[df[VIDEO_NAME_FIELD] == video_name].reset_index(drop=True)
                # Copy so the column assignments below act on an independent frame.
                video_df = video_df[video_df[EVENT_TYPE_FIELD] != POINT_EVENT].copy()
                video_df[TIME_FIELD] = self.check_timestamps(
                    timestamps=list(video_df[TIME_FIELD].astype(str))
                )
                video_df[TIME_FIELD] = pd.to_timedelta(video_df[TIME_FIELD])
                video_df[EVENT_TYPE_FIELD] = video_df[EVENT_TYPE_FIELD].replace(
                    {START: "START", STOP: "STOP"}
                )
                _, _, fps = self.read_video_info(video_name=video_name)
                # Convert elapsed seconds to a frame number, rounding down.
                video_df["FRAME"] = np.floor(
                    video_df[TIME_FIELD].dt.total_seconds() * fps
                )
                video_df = video_df.drop([TIME_FIELD, VIDEO_NAME_FIELD], axis=1)
                if video_name in self.annotation:
                    # Same video annotated in several files: stack the events.
                    self.annotation[video_name] = pd.concat(
                        [self.annotation[video_name], video_df], axis=0
                    ).reset_index(drop=True)
                else:
                    self.annotation[video_name] = video_df
        self.annotation = {
            name: events.sort_values(by="FRAME").reset_index(drop=True)
            for name, events in self.annotation.items()
        }
        print(
            f"Annotations for {str(len(self.annotation))} video names found in Noldus Observer files..."
        )

    def run(self):
        """
        Append one 0/1 column per classifier to every feature file and save it.

        For each feature file, the START/STOP events of each classifier are
        paired in frame order and every frame inside a START-STOP interval
        (inclusive) is set to 1. Frames outside the pose-estimation data are
        reported through a warning and ignored.

        :raises AnnotationFileNotFoundError: if a feature file has no annotations
        :raises ThirdPartyAnnotationEventCountError: if START and STOP counts differ
        :raises ThirdPartyAnnotationOverlapError: if a paired START lies after its STOP
        """
        for file_path in self.feature_file_paths:
            video_timer = SimbaTimer()
            video_timer.start_timer()
            _, file_name, _ = get_fn_ext(filepath=file_path)
            if file_name not in self.annotation:
                raise AnnotationFileNotFoundError(video_name=file_name)
            data_df = read_df(file_path=file_path, file_type=self.file_type)
            output_df = deepcopy(data_df)
            video_annot = self.annotation[file_name]
            valid_idx = set(data_df.index)
            for clf_name in self.clf_names:
                clf_df = video_annot.loc[
                    video_annot[BEHAVIOR_FIELD] == clf_name,
                    [EVENT_TYPE_FIELD, "FRAME"],
                ].reset_index(drop=True)
                start_frames = clf_df.loc[
                    clf_df[EVENT_TYPE_FIELD] == "START", "FRAME"
                ].reset_index(drop=True)
                stop_frames = clf_df.loc[
                    clf_df[EVENT_TYPE_FIELD] == "STOP", "FRAME"
                ].reset_index(drop=True)
                start_events, stop_events = len(start_frames), len(stop_frames)
                if start_events != stop_events:
                    raise ThirdPartyAnnotationEventCountError(
                        video_name=file_name,
                        clf_name=clf_name,
                        start_event_cnt=start_events,
                        stop_event_cnt=stop_events,
                    )
                if start_events == 0:
                    ThirdPartyAnnotationsClfMissingWarning(
                        video_name=file_name, clf_name=clf_name
                    )
                    output_df[clf_name] = 0
                    continue
                # Pair the n-th START with the n-th STOP (both are frame-sorted).
                clf_df = pd.DataFrame({"START": start_frames, "STOP": stop_frames})
                if (clf_df["START"] > clf_df["STOP"]).any():
                    raise ThirdPartyAnnotationOverlapError(
                        video_name=file_name, clf_name=clf_name
                    )
                annot_idx = [
                    frm
                    for bout_start, bout_stop in zip(clf_df["START"], clf_df["STOP"])
                    for frm in range(int(bout_start), int(bout_stop) + 1)
                ]
                # Sorted so that the first entry is the earliest offending frame.
                idx_diff = sorted(set(annot_idx) - valid_idx)
                if idx_diff:
                    ThirdPartyAnnotationsOutsidePoseEstimationDataWarning(
                        video_name=file_name,
                        clf_name=clf_name,
                        frm_cnt=data_df.index[-1],
                        first_error_frm=idx_diff[0],
                        ambiguous_cnt=len(idx_diff),
                    )
                    annot_idx = [x for x in annot_idx if x in valid_idx]
                output_df[clf_name] = 0
                output_df.loc[annot_idx, clf_name] = 1
            self.__save(
                df=output_df,
                path=os.path.join(
                    self.targets_folder, file_name + "." + self.file_type
                ),
            )
            video_timer.stop_timer()
            print(
                f"Imported Noldus Observer annotations for video {file_name} (elapsed time {video_timer.elapsed_time_str}s)..."
            )
        self.timer.stop_timer()
        stdout_success(
            msg="Imported annotations saved in project/folder/csv/targets_inserted directory",
            elapsed_time=self.timer.elapsed_time_str,
        )

    def __save(self, df: pd.DataFrame, path: str):
        """
        Write ``df`` to ``path`` in the project's configured file type.

        :param pd.DataFrame df: dataframe to write
        :param str path: destination file path
        """
        write_df(df=df, file_type=self.file_type, save_path=path)
# test = NoldusObserverImporter(config_path='/Users/simon/Desktop/envs/troubleshooting/Gosia/project_folder/project_config.ini', # data_dir='/Users/simon/Desktop/envs/troubleshooting/Gosia/source/behaviours/Exp_38') # test.run() # for k, v in test.annotation.items(): # print(v[BEHAVIOR_FIELD].unique()) # # # # # test.run()