Source code for simba.bounding_box_tools.boundary_statistics

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import os
from collections import defaultdict
from copy import deepcopy

import pandas as pd
from joblib import Parallel, delayed
from shapely.geometry import Point

from simba.mixins.config_reader import ConfigReader
from simba.utils.enums import Formats
from simba.utils.errors import NoFilesFoundError
from simba.utils.printing import stdout_success
from simba.utils.read_write import read_df, write_df


[docs]class BoundaryStatisticsCalculator(ConfigReader): """ Compute boundary intersection statistics. :param str config_path: Path to SimBA project config file in Configparser format. :param bool roi_intersections: If True, calculates intersection of animal-anchored ROIs :param bool roi_keypoint_intersections: If True, calculates intersection of animal-anchored ROIs and pose-estimated animal key-points. :param str save_format: Output data format. OPTIONS: CSV, PARQUET, PICKLE. Notes ---------- `Bounding boxes tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/anchored_rois.md>`_. Examples ---------- >>> boundary_stats_calculator = BoundaryStatisticsCalculator(config_path='MyConfigFile',roi_intersections=True, roi_keypoint_intersections=True, save_format='CSV') >>> boundary_stats_calculator.save_results() """ def __init__( self, config_path: str, roi_intersections: bool, roi_keypoint_intersections: bool, save_format: str, ): ConfigReader.__init__(self, config_path=config_path) self.save_format, self.roi_intersections, self.roi_keypoint_intersections = ( save_format, roi_intersections, roi_keypoint_intersections, ) self.anchored_roi_path = os.path.join( self.project_path, "logs", "anchored_rois.pickle" ) self.save_folder = os.path.join(self.project_path, "csv", "anchored_roi_data") if not os.path.isfile(self.anchored_roi_path): raise NoFilesFoundError( msg=f"No anchored ROI data detected. Extract anchored ROIs before computing statistics. File expected at path {self.anchored_roi_path}" ) self.polygons = read_df( file_path=self.anchored_roi_path, file_type=Formats.PICKLE.value ) self.calculate_statistics() def _find_intersections(self, animal_roi: list, other_animals: dict): results = [] for first_animal, second_animal in zip(animal_roi, other_animals): results.append(first_animal.intersects(second_animal)) return results def _find_points_in_roi(self, animal_roi: list, second_animal_bps: list): results = [] for polygon_cnt, polygon in enumerate(animal_roi): frm_results = [] for k, v in second_animal_bps[polygon_cnt].items(): frm_results.append(Point(v).within(polygon)) results.append(frm_results) return results def _sort_keypoint_results(self, first_animal_name: str, second_animal_name: str): results = defaultdict(list) for frm in self.results: for body_part_cnt, body_part in enumerate(frm): body_part_name = self.animal_bp_dict[second_animal_name]["X_bps"][ body_part_cnt ][:-4] results[ "{}:{}:{}".format( first_animal_name, second_animal_name, body_part_name ) ].append(int(body_part)) return pd.DataFrame(results) def _sort_intersection_results(self): results = defaultdict(list) for animal_one_name, animal_one_data in self.intersecting_rois.items(): for animal_two_name, animal_two_data in animal_one_data.items(): results["{}:{}:ROI_ONLY".format(animal_one_name, animal_two_name)] = [ int(x) for x in animal_two_data ] return pd.DataFrame(results) def calculate_statistics(self): self.intersection_dfs = {} self.keypoint_dfs = {} for video_cnt, (video_name, video_data) in enumerate(self.polygons.items()): print("Calculating statistics for video {}...".format(video_name)) if self.roi_intersections: self.intersecting_rois = {} print( "Calculating intersecting anchored ROIs for video {}...".format( video_name ) ) for first_animal in self.animal_bp_dict.keys(): first_animal_anchored_rois = [ video_data[first_animal][i : i + 100] for i in range(0, len(video_data[first_animal]), 100) ] self.intersecting_rois[first_animal] = {} for second_animal in { k: v for k, v in video_data.items() if k != first_animal }.keys(): second_animal_anchored_rois = [ video_data[second_animal][i : i + 100] for i in range(0, len(video_data[second_animal]), 100) ] results = Parallel(n_jobs=5, verbose=2, backend="threading")( delayed(self._find_intersections)(i, j) for i, j in zip( first_animal_anchored_rois, second_animal_anchored_rois ) ) self.intersecting_rois[first_animal][second_animal] = [ i for s in results for i in s ] self.intersection_dfs[video_name] = self._sort_intersection_results() if self.roi_keypoint_intersections: self.data_df = read_df( os.path.join( self.outlier_corrected_dir, video_name + "." + self.file_type ), self.file_type, ).astype(int) keypoints_df_lst = [] print( "Calculate intersecting anchored ROIs and keypoints for video {}...".format( video_name ) ) for first_animal in self.animal_bp_dict.keys(): first_animal_anchored_rois = [ video_data[first_animal][i : i + 100] for i in range(0, len(video_data[first_animal]), 100) ] for second_animal in { k: v for k, v in self.animal_bp_dict.items() if k != first_animal }.keys(): second_animal_name = deepcopy(second_animal) second_animal_df_tuples = pd.DataFrame() for x_col, y_col in zip( self.animal_bp_dict[second_animal]["X_bps"], self.animal_bp_dict[second_animal]["Y_bps"], ): second_animal_df_tuples[x_col[:-4]] = list( zip(self.data_df[x_col], self.data_df[y_col]) ) second_animal = second_animal_df_tuples.to_dict( orient="records" ) second_animal = [ second_animal[i : i + 100] for i in range(0, len(second_animal), 100) ] results = Parallel(n_jobs=5, verbose=1, backend="threading")( delayed(self._find_points_in_roi)(i, j) for i, j in zip(first_animal_anchored_rois, second_animal) ) self.results = [i for s in results for i in s] keypoints_df_lst.append( self._sort_keypoint_results( first_animal_name=first_animal, second_animal_name=second_animal_name, ) ) self.keypoint_dfs[video_name] = pd.concat(keypoints_df_lst, axis=1) def save_results(self): if not os.path.exists(self.save_folder): os.makedirs(self.save_folder) out_df = None for video_cnt, (video_name, video_data) in enumerate(self.polygons.items()): save_path = os.path.join( self.save_folder, f"{video_name}.{self.save_format.lower()}" ) if (self.roi_intersections) and (self.roi_keypoint_intersections): out_df = pd.concat( [self.intersection_dfs[video_name], self.keypoint_dfs[video_name]], axis=1, ) elif self.roi_intersections: out_df = self.intersection_dfs[video_name] elif self.roi_keypoint_intersections: out_df = self.keypoint_dfs[video_name] write_df(df=out_df, file_type=self.save_format.lower(), save_path=save_path) print(f"Data for video {video_name} saved...") stdout_success( msg=f"Data for {str(len(self.polygons.keys()))} videos saved in {self.save_folder}" )
# boundary_stats_calculator = BoundaryStatisticsCalculator(config_path='/Users/simon/Desktop/envs/troubleshooting/sleap_5_animals/project_folder/project_config.ini', # roi_intersections=True, # roi_keypoint_intersections=True, # save_format='PICKLE') # boundary_stats_calculator.save_results()