# Source code for simba.roi_tools.ROI_analyzer

__author__ = "Simon Nilsson"

import os
from typing import List, Optional, Union

import numpy as np
import pandas as pd

from simba.mixins.config_reader import ConfigReader
from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin
from simba.mixins.feature_extraction_supplement_mixin import \
    FeatureExtractionSupplemental
from simba.utils.checks import (
    check_all_file_names_are_represented_in_video_log,
    check_file_exist_and_readable, check_float, check_that_column_exist,
    check_valid_lst)
from simba.utils.data import detect_bouts, slice_roi_dict_for_video
from simba.utils.enums import Keys
from simba.utils.errors import (CountError, MissingColumnsError,
                                ROICoordinatesNotFoundError)
from simba.utils.printing import stdout_success
from simba.utils.read_write import get_fn_ext, read_data_paths, read_df
from simba.utils.warnings import NoDataFoundWarning


class ROIAnalyzer(ConfigReader, FeatureExtractionMixin):
    """
    Analyze movements, entries, exits, and time-spent-in user-defined ROIs. Results are stored in the
    'project_folder/logs' directory of the SimBA project.

    :param str config_path: Path to SimBA project config file in Configparser format.
    :param Optional[str] data_path: Path to folder or file holding the data used to calculate ROI aggregate statistics. If None, then defaults to the `project_folder/csv/outlier_corrected_movement_location` directory of the SimBA project. Default: None.
    :param Optional[bool] calculate_distances: If True, then calculate movements aggregate statistics (distances and velocities) inside ROIs. Results are saved in ``project_folder/logs/`` directory. Default: False.
    :param Optional[bool] detailed_bout_data: If True, saves a file with a row for every entry into each ROI for each animal in each video. Results are saved in ``project_folder/logs/`` directory. Default: False.
    :param Optional[float] threshold: Float between 0 and 1. Body-part locations detected below this confidence threshold are filtered. Default: 0.0.
    :param Optional[List[str]] body_parts: List of body-parts to perform ROI analysis on. Although the signature defaults to None, the constructor iterates over this value, so a list of unique body-part names has to be passed.

    .. note::
       `ROI tutorials <https://github.com/sgoldenlab/simba/blob/master/docs/ROI_tutorial_new.md>`__.

       NOTE(review): results are keyed by animal name, so if two entries in ``body_parts`` resolve to the same
       animal only the last one is retained - confirm that callers pass one body-part per animal.

    :example:
    >>> test = ROIAnalyzer(config_path = r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini", calculate_distances=True, detailed_bout_data=True, body_parts=['Nose_1', 'Nose_2'], threshold=0.0)
    >>> test.run()
    >>> test.save()
    """

    def __init__(self,
                 config_path: Union[str, os.PathLike],
                 data_path: Optional[Union[str, os.PathLike, List[str]]] = None,
                 detailed_bout_data: bool = False,
                 calculate_distances: bool = False,
                 threshold: float = 0.0,
                 body_parts: Optional[List[str]] = None):

        check_file_exist_and_readable(file_path=config_path)
        ConfigReader.__init__(self, config_path=config_path)
        if not os.path.isfile(self.roi_coordinates_path):
            raise ROICoordinatesNotFoundError(expected_file_path=self.roi_coordinates_path)
        self.read_roi_data()
        FeatureExtractionMixin.__init__(self)
        self.data_paths = read_data_paths(path=data_path, default=self.outlier_corrected_paths, default_name=self.outlier_corrected_dir, file_type=self.file_type)
        check_float(name="Body-part probability threshold", value=threshold, min_value=0.0, max_value=1.0)
        check_valid_lst(data=body_parts, source=f"{self.__class__.__name__} body-parts", valid_dtypes=(str,))
        if len(set(body_parts)) != len(body_parts):
            raise CountError(msg=f"All body-part entries have to be unique. Got {body_parts}", source=self.__class__.__name__)
        # bp_dict: animal name -> [x, y, p] column names; bp_lk: animal name -> body-part name.
        self.bp_dict, self.bp_lk = {}, {}
        for bp in body_parts:
            animal = self.find_animal_name_from_body_part_name(bp_name=bp, bp_dict=self.animal_bp_dict)
            self.bp_dict[animal] = [f"{bp}_x", f"{bp}_y", f"{bp}_p"]
            self.bp_lk[animal] = bp
        # Flat list of every pose column that has to be present in each data file.
        self.roi_headers = [header for headers in self.bp_dict.values() for header in headers]
        self.calculate_distances, self.threshold = calculate_distances, threshold
        self.detailed_bout_data = detailed_bout_data

    def _record_roi_bouts(self,
                          animal_df: pd.DataFrame,
                          roi_name: str,
                          p_col: str,
                          animal_name: str,
                          video_name: str) -> pd.DataFrame:
        """
        Threshold the in-ROI column ``roi_name`` of ``animal_df``, detect the bouts in it, and store the
        entry count and summed bout time for the animal / video / ROI combination.

        :param pd.DataFrame animal_df: Pose columns of one animal plus a column named ``roi_name`` flagging the frames inside the ROI. Modified in place.
        :param str roi_name: Name of the ROI, which is also the name of the flag column in ``animal_df``.
        :param str p_col: Name of the body-part probability column in ``animal_df``.
        :param str animal_name: Name of the animal, written to the ``ANIMAL`` field of the results.
        :param str video_name: Name of the video, written to the ``VIDEO`` field of the results.
        :return pd.DataFrame: The bouts as returned by ``detect_bouts`` with ``ANIMAL`` and ``VIDEO`` columns added.
        """
        # Frames where the body-part probability is below the threshold never count as inside the ROI.
        animal_df.loc[animal_df[p_col] < self.threshold, roi_name] = 0
        roi_bouts = detect_bouts(data_df=animal_df, target_lst=[roi_name], fps=self.fps)
        roi_bouts["ANIMAL"] = animal_name
        roi_bouts["VIDEO"] = video_name
        self.roi_bout_results.append(roi_bouts)
        self.entry_results.loc[len(self.entry_results)] = [video_name, animal_name, roi_name, len(roi_bouts)]
        self.time_results.loc[len(self.time_results)] = [video_name, animal_name, roi_name, roi_bouts["Bout_time"].sum()]
        return roi_bouts

    def _record_movements(self,
                          animal_df: pd.DataFrame,
                          bp_names: List[str],
                          animal_bout_results: dict,
                          animal_name: str,
                          video_name: str,
                          pix_per_mm: float) -> None:
        """
        Append the movement and average velocity inside every ROI for one animal in one video to ``self.movements_df``.

        :param pd.DataFrame animal_df: Pose columns (and ROI flag columns) of the animal.
        :param List[str] bp_names: The x, y and probability column names of the analyzed body-part, in that order.
        :param dict animal_bout_results: ROI name -> bouts dataframe for the animal.
        :param str animal_name: Name of the animal.
        :param str video_name: Name of the video.
        :param float pix_per_mm: Pixels per millimeter of the video as read from the video info.
        """
        for roi_name, roi_data in animal_bout_results.items():
            if len(roi_data) == 0:
                # The animal never entered the ROI.
                self.movements_df.loc[len(self.movements_df)] = [video_name, animal_name, roi_name, "Movement (cm)", 0]
                self.movements_df.loc[len(self.movements_df)] = [video_name, animal_name, roi_name, "Average velocity (cm/s)", "None"]
                continue
            distances, velocities = [], []
            for start_frame, end_frame in roi_data[["Start_frame", "End_frame"]].values:
                event_pose = animal_df.loc[np.arange(start_frame, end_frame + 1), bp_names]
                event_pose = event_pose[event_pose[bp_names[2]] > self.threshold][bp_names[:2]].values
                # At least two locations are needed to measure a distance.
                if event_pose.shape[0] > 1:
                    distance, velocity = FeatureExtractionSupplemental.distance_and_velocity(x=event_pose, fps=self.fps, pixels_per_mm=pix_per_mm, centimeters=True)
                    distances.append(distance)
                    velocities.append(velocity)
            # np.average of an empty list yields NaN together with a RuntimeWarning; write NaN without the warning.
            average_velocity = np.average(velocities) if velocities else np.nan
            self.movements_df.loc[len(self.movements_df)] = [video_name, animal_name, roi_name, "Movement (cm)", sum(distances)]
            self.movements_df.loc[len(self.movements_df)] = [video_name, animal_name, roi_name, "Average velocity (cm/s)", average_velocity]

    def run(self):
        """
        Compute ROI entry counts, time spent in ROIs and, if requested, movement statistics for every data file.

        Populates ``self.entry_results``, ``self.time_results``, ``self.movements_df`` and ``self.detailed_df``
        (which stays None when no bouts were computed). Videos without any user-defined ROI are skipped with a warning.

        :raises MissingColumnsError: If a data file holds a different number of columns than the project body-part headers.
        """
        check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths)
        self.movements_df = pd.DataFrame(columns=["VIDEO", "ANIMAL", "SHAPE", "MEASUREMENT", "VALUE"])
        self.entry_results = pd.DataFrame(columns=["VIDEO", "ANIMAL", "SHAPE", "ENTRY COUNT"])
        self.time_results = pd.DataFrame(columns=["VIDEO", "ANIMAL", "SHAPE", "TIME (S)"])
        self.roi_bout_results = []
        self.detailed_df = None
        for file_cnt, file_path in enumerate(self.data_paths):
            _, video_name, _ = get_fn_ext(file_path)
            print(f"Analysing ROI data for video {video_name}... (Video {file_cnt+1}/{len(self.data_paths)})")
            _, pix_per_mm, self.fps = self.read_video_info(video_name=video_name)
            self.sliced_roi_dict, video_shape_names = slice_roi_dict_for_video(data=self.roi_dict, video_name=video_name)
            if len(video_shape_names) == 0:
                NoDataFoundWarning(msg=f"Skipping video {video_name}: No user-defined ROI data found for this video...")
                continue
            self.data_df = read_df(file_path, self.file_type).reset_index(drop=True)
            if len(self.bp_headers) != len(self.data_df.columns):
                raise MissingColumnsError(msg=f"The data file {file_path} contains {len(self.data_df.columns)} body-part columns, but the project is made for {len(self.bp_headers)} body-part columns as suggested by the {self.body_parts_path} file", source=self.__class__.__name__)
            self.data_df.columns = self.bp_headers
            check_that_column_exist(df=self.data_df, column_name=self.roi_headers, file_name=file_path)
            for animal_name, bp_names in self.bp_dict.items():
                # The first two columns of animal_df stay the x and y coordinates; ROI columns are appended after them.
                animal_df = self.data_df[bp_names].reset_index(drop=True)
                p_col = bp_names[2]
                animal_bout_results = {}

                for _, row in self.sliced_roi_dict[Keys.ROI_RECTANGLES.value].iterrows():
                    roi_name = row["Name"]
                    roi_coords = np.array([[row["topLeftX"], row["topLeftY"]], [row["Bottom_right_X"], row["Bottom_right_Y"]]])
                    animal_df[roi_name] = FeatureExtractionMixin.framewise_inside_rectangle_roi(bp_location=animal_df.values[:, 0:2], roi_coords=roi_coords)
                    animal_bout_results[roi_name] = self._record_roi_bouts(animal_df=animal_df, roi_name=roi_name, p_col=p_col, animal_name=animal_name, video_name=video_name)

                for _, row in self.sliced_roi_dict[Keys.ROI_CIRCLES.value].iterrows():
                    roi_name = row["Name"]
                    distance_col = f"{roi_name}_distance"
                    circle_center = np.array([row["centerX"], row["centerY"]])
                    # px_per_mm=1 is passed so the distance can be compared against the radius as-is.
                    animal_df[distance_col] = FeatureExtractionMixin.framewise_euclidean_distance_roi(location_1=animal_df.values[:, 0:2], location_2=circle_center, px_per_mm=1)
                    animal_df[roi_name] = 0
                    animal_df.loc[animal_df[distance_col] <= row["radius"], roi_name] = 1
                    animal_bout_results[roi_name] = self._record_roi_bouts(animal_df=animal_df, roi_name=roi_name, p_col=p_col, animal_name=animal_name, video_name=video_name)

                for _, row in self.sliced_roi_dict[Keys.ROI_POLYGONS.value].iterrows():
                    roi_name = row["Name"]
                    roi_coords = np.array(list(zip(row["vertices"][:, 0], row["vertices"][:, 1])))
                    animal_df[roi_name] = FeatureExtractionMixin.framewise_inside_polygon_roi(bp_location=animal_df.values[:, 0:2], roi_coords=roi_coords)
                    animal_bout_results[roi_name] = self._record_roi_bouts(animal_df=animal_df, roi_name=roi_name, p_col=p_col, animal_name=animal_name, video_name=video_name)

                if self.calculate_distances:
                    self._record_movements(animal_df=animal_df, bp_names=bp_names, animal_bout_results=animal_bout_results, animal_name=animal_name, video_name=video_name, pix_per_mm=pix_per_mm)

        if len(self.roi_bout_results) > 0:
            # One row per ROI entry across all videos, animals and shapes.
            self.detailed_df = pd.concat(self.roi_bout_results, axis=0)
            self.detailed_df = self.detailed_df.rename(columns={"Event": "SHAPE NAME",
                                                                "Start_time": "START TIME",
                                                                "End Time": "END TIME",
                                                                "Start_frame": "START FRAME",
                                                                "End_frame": "END FRAME",
                                                                "Bout_time": "DURATION (S)"})
            self.detailed_df["BODY-PART"] = self.detailed_df["ANIMAL"].map(self.bp_lk)
            self.detailed_df = self.detailed_df[["VIDEO", "ANIMAL", "BODY-PART", "SHAPE NAME", "START TIME", "END TIME", "START FRAME", "END FRAME", "DURATION (S)"]].reset_index(drop=True)

    def save(self):
        """
        Write the results computed by :meth:`run` to date-time stamped CSV files in the project logs directory.

        The ROI entry and ROI time tables are always written. The detailed bout table is written when
        ``detailed_bout_data`` is True and bouts were computed, and the movement table when ``calculate_distances`` is True.
        """
        self.entry_results["BODY-PART"] = self.entry_results["ANIMAL"].map(self.bp_lk)
        self.time_results["BODY-PART"] = self.time_results["ANIMAL"].map(self.bp_lk)
        self.entry_results = self.entry_results[["VIDEO", "ANIMAL", "BODY-PART", "SHAPE", "ENTRY COUNT"]]
        self.time_results = self.time_results[["VIDEO", "ANIMAL", "BODY-PART", "SHAPE", "TIME (S)"]]
        self.entry_results.to_csv(os.path.join(self.logs_path, f"ROI_entry_data_{self.datetime}.csv"))
        self.time_results.to_csv(os.path.join(self.logs_path, f"ROI_time_data_{self.datetime}.csv"))
        if self.detailed_bout_data and self.detailed_df is not None:
            detailed_path = os.path.join(self.logs_path, f"Detailed_ROI_data_{self.datetime}.csv")
            self.detailed_df.to_csv(detailed_path)
            print(f"Detailed ROI data saved at {detailed_path}...")
        if self.calculate_distances:
            movement_path = os.path.join(self.logs_path, f"ROI_movement_data_{self.datetime}.csv")
            self.movements_df["BODY-PART"] = self.movements_df["ANIMAL"].map(self.bp_lk)
            self.movements_df = self.movements_df[["VIDEO", "ANIMAL", "BODY-PART", "SHAPE", "MEASUREMENT", "VALUE"]]
            self.movements_df.to_csv(movement_path)
            print(f"ROI aggregate movement data saved at {movement_path}...")
        stdout_success(msg=f"ROI time and ROI entry saved in the {self.logs_path} directory in CSV format.")
# test = ROIAnalyzer(config_path = r"C:\troubleshooting\ROI_movement_test\project_folder\project_config.ini",
#                    data_path=None,
#                    calculate_distances=True,
#                    detailed_bout_data=True,
#                    body_parts=['Head'],
#                    threshold=0.0)
# test.run()

# test = ROIAnalyzer(config_path = r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini",
#                    data_path=None,
#                    calculate_distances=True,
#                    detailed_bout_data=True,
#                    body_parts=['Nose_1', 'Nose_2'],
#                    threshold=0.0)
# test.run()
# test.save()
#
# test = ROIAnalyzer(ini_path = r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini",
#                    data_path = "outlier_corrected_movement_location",
#                    calculate_distances=True,
#                    settings={'threshold': 0.00, 'body_parts': {'Animal_1': 'Nose_1'}})
# test.run()
# test.save()

# test = ROIAnalyzer(ini_path = r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini",
#                    data_path = "outlier_corrected_movement_location",
#                    calculate_distances=True)
# test.run()

# test = ROIAnalyzer(ini_path = r"/Users/simon/Desktop/envs/simba_dev/tests/data/test_projects/zebrafish/project_folder/project_config.ini",
#                    data_path = "outlier_corrected_movement_location",
#                    calculate_distances=True)

# settings = {'body_parts': {'animal_1_bp': 'Ear_left_1', 'animal_2_bp': 'Ear_left_2', 'animal_3_bp': 'Ear_right_1',}, 'threshold': 0.4}
# test = ROIAnalyzer(ini_path = r"/Users/simon/Desktop/envs/troubleshooting/two_animals_16bp_032023/project_folder/project_config.ini",
#                    data_path = "outlier_corrected_movement_location",
#                    settings=settings,
#                    calculate_distances=True)
# test.run()
# test.save()

# settings = {'body_parts': {'Simon': 'Ear_left_1', 'JJ': 'Ear_left_2'}, 'threshold': 0.4}
# test = ROIAnalyzer(ini_path = r"/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini",
#                    data_path = "outlier_corrected_movement_location",
#                    settings=settings,
#                    calculate_distances=True)
# test.read_roi_dfs()
# test.analyze_ROIs()
# test.save_data()

# settings = {'body_parts': {'animal_1_bp': 'Ear_left_1', 'animal_2_bp': 'Ear_left_2'}, 'threshold': 0.4}
# test = ROIAnalyzer(ini_path = r"/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini",
#                    data_path = "outlier_corrected_movement_location",
#                    calculate_distances=True)
# test.run()
# test.analyze_ROIs()
# test.save_data()