import itertools
import os
from copy import deepcopy
from typing import Optional, Union
from shapely.ops import linemerge
try:
from typing import Literal
except:
from typing_extensions import Literal
import numpy as np
import pandas as pd
from shapely.geometry import LineString
from simba.feature_extractors.perimeter_jit import get_hull_sizes
from simba.mixins.abstract_classes import AbstractFeatureExtraction
from simba.mixins.config_reader import ConfigReader
from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin
from simba.mixins.geometry_mixin import GeometryMixin
from simba.mixins.timeseries_features_mixin import TimeseriesFeatureMixin
from simba.utils.checks import (
check_all_file_names_are_represented_in_video_log, check_if_dir_exists,
check_that_column_exist, check_valid_dataframe)
from simba.utils.enums import Formats
from simba.utils.lookups import get_current_time
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import (find_core_cnt,
find_files_of_filetypes_in_directory,
get_fn_ext, read_df, write_df)
####################################################################
# Name stems of the key-points this featurizer looks up; the code appends
# "_x" / "_y" (and filters on "_p") to form the data column names.
# Arena-corner key-points:
TOP_LEFT_CORNER, TOP_RIGHT_CORNER = 'top_left', 'top_right'
BOTTOM_LEFT_CORNER, BOTTOM_RIGHT_CORNER = 'bottom_left', 'bottom_right'
# Animal key-points:
SNOUT, TAILBASE = 'snout', 'tailbase'
TAIL1, TAIL2, TAILTIP = 'tail1', 'tail2', 'tailtip'
LEFT_HIP, RIGHT_HIP = 'lefthip', 'righthip'
# Sliding-window sizes passed to the time-series statistics together with the
# video fps (presumably seconds); they also appear in the output column names
# with an "s" suffix.
WINDOW_SIZES = [0.5, 1.0, 2.0, 4.0]
####################################################################
# Dictionary keys for the per-frame arena boundary geometries.
LEFT = 'left'
RIGHT = 'right'
TOP = 'top'
BOTTOM = 'bottom'
RECTANGLE = 'rectangle'
class BoundaryRearingFeaturizer(ConfigReader,
                                AbstractFeatureExtraction):
    r"""
    Extract boundary- and posture-related features from pose-estimation data that
    contains four arena-corner key-points in addition to the animal key-points.

    For every data file, :meth:`run` appends the following to the pose data and
    saves the result:

    * row-wise min / max / mean / skew / std of the snout and tail-base
      distances to the arena boundary (the four corner-to-corner lines merged);
    * sliding-window summed frame-to-frame movement for each hull body-part;
    * hull perimeter, snout-to-tail-base distance and left-to-right hip distance;
    * per-frame counts of pose confidence values in [0.0, 0.2] and [0.2, 0.4];
    * sliding-window min / max / mean of the four items directly above.

    :param config_path: Path to the SimBA project config file.
    :param data_dir: Directory holding the input pose files. If None, the project's outlier-corrected directory is used.
    :param save_dir: Directory where the feature files are written. If None, the project's features directory is used.

    :example:
    >>> x = BoundaryRearingFeaturizer(config_path=r"C:\troubleshooting\open_field_rearing\project_folder\project_config.ini")
    >>> x.run()
    """

    def __init__(self,
                 config_path: Union[str, os.PathLike],
                 data_dir: Optional[Union[str, os.PathLike]] = None,
                 save_dir: Optional[Union[str, os.PathLike]] = None):

        ConfigReader.__init__(self, config_path=config_path, read_video_info=True, create_logger=False)
        if data_dir is None:
            self.data_dir = deepcopy(self.outlier_corrected_dir)
        else:
            check_if_dir_exists(in_dir=data_dir)
            self.data_dir = deepcopy(data_dir)
        if save_dir is None:
            self.save_dir = self.features_dir
        else:
            # Only a caller-supplied directory is validated (same policy as ``data_dir``);
            # the raw argument is None by default and must not be handed to the check.
            check_if_dir_exists(in_dir=save_dir)
            self.save_dir = save_dir
        self.data_paths = find_files_of_filetypes_in_directory(directory=self.data_dir, extensions=[f'.{self.file_type}'], raise_error=True)
        check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths)
        self.core_count = find_core_cnt()[1]
        # Column names of the key-points that are looked up explicitly in ``run``.
        self.top_left_x, self.top_left_y = f'{TOP_LEFT_CORNER}_x', f'{TOP_LEFT_CORNER}_y'
        self.top_right_x, self.top_right_y = f'{TOP_RIGHT_CORNER}_x', f'{TOP_RIGHT_CORNER}_y'
        self.bottom_left_x, self.bottom_left_y = f'{BOTTOM_LEFT_CORNER}_x', f'{BOTTOM_LEFT_CORNER}_y'
        self.bottom_right_x, self.bottom_right_y = f'{BOTTOM_RIGHT_CORNER}_x', f'{BOTTOM_RIGHT_CORNER}_y'
        self.tail_base_x, self.tail_base_y = f'{TAILBASE}_x', f'{TAILBASE}_y'
        self.snout_x, self.snout_y = f'{SNOUT}_x', f'{SNOUT}_y'
        self.lefthip_x, self.lefthip_y = f'{LEFT_HIP}_x', f'{LEFT_HIP}_y'
        self.righthip_x, self.righthip_y = f'{RIGHT_HIP}_x', f'{RIGHT_HIP}_y'
        self.corner_cols = [self.top_left_x, self.top_left_y, self.top_right_x, self.top_right_y, self.bottom_left_x, self.bottom_left_y, self.bottom_right_x, self.bottom_right_y]
        # Hull columns: every x/y column except the tail segments and the arena corners.
        non_hull_keys = (TAIL1, TAIL2, TAILTIP, TOP_LEFT_CORNER, TOP_RIGHT_CORNER, BOTTOM_LEFT_CORNER, BOTTOM_RIGHT_CORNER)
        self.hull_cols = [x for x in self.bp_headers if not any(key in x for key in non_hull_keys) and not x.endswith('_p')]
        self.p_cols = [x for x in self.bp_headers if x.endswith('_p')]

    @staticmethod
    def _window_col_names(prefix: str) -> list:
        """Return one column name per entry of WINDOW_SIZES, formatted as ``{prefix}_{window}s``."""
        return [f'{prefix}_{window}s' for window in WINDOW_SIZES]

    def run(self) -> None:
        """
        Compute the features for every file in ``self.data_paths`` and save one
        output file per input file in ``self.save_dir``.
        """
        print(f'Processing features for {len(self.data_paths)} files (using cores: {self.core_count})...')
        # Every column that is indexed by name below; validated up-front per file.
        required_cols = self.corner_cols + [self.tail_base_x, self.tail_base_y, self.snout_x, self.snout_y, self.lefthip_x, self.lefthip_y, self.righthip_x, self.righthip_y]
        for file_cnt, file_path in enumerate(self.data_paths):
            video_timer = SimbaTimer(start=True)
            _, video_name, _ = get_fn_ext(filepath=file_path)
            save_path = os.path.join(self.save_dir, f'{video_name}.{self.file_type}')
            print(f'Processing {video_name} ({file_cnt+1}/{len(self.data_paths)})... file start time: {get_current_time()}')
            _, pixels_per_mm, fps = self.read_video_info(video_name=video_name)
            df = read_df(file_path=file_path, file_type=self.file_type)
            check_valid_dataframe(df=df, source=f'{self.__class__.__name__} {file_path}', valid_dtypes=Formats.NUMERIC_DTYPES.value, required_fields=self.bp_col_names)
            check_that_column_exist(df=df, column_name=required_cols, file_name=file_path, raise_error=True)

            # One (n_frames, 2) coordinate array per arena corner.
            tl = df[[self.top_left_x, self.top_left_y]].values
            tr = df[[self.top_right_x, self.top_right_y]].values
            bl = df[[self.bottom_left_x, self.bottom_left_y]].values
            br = df[[self.bottom_right_x, self.bottom_right_y]].values
            # Stacking two corners on axis 1 yields one 2-point segment per frame.
            lines = {LEFT: [LineString(pts) for pts in np.stack([tl, bl], axis=1)],
                     RIGHT: [LineString(pts) for pts in np.stack([tr, br], axis=1)],
                     TOP: [LineString(pts) for pts in np.stack([tl, tr], axis=1)],
                     BOTTOM: [LineString(pts) for pts in np.stack([bl, br], axis=1)]}
            # Per frame, merge the four sides into a single boundary geometry.
            lines[RECTANGLE] = [linemerge(list(sides)) for sides in zip(lines[LEFT], lines[TOP], lines[RIGHT], lines[BOTTOM])]

            self.results, side_col_names = deepcopy(df), []
            wall_dists = pd.DataFrame()
            for bp, line in itertools.product([SNOUT, TAILBASE], [RECTANGLE]):
                dist_col = f'{bp}->{line}_mm'
                bp_data = GeometryMixin.bodyparts_to_points(data=df[[f'{bp}_x', f'{bp}_y']].values.astype(np.int32))
                wall_dists[dist_col] = GeometryMixin().multiframe_shape_distance(shapes_a=bp_data, shapes_b=lines[line], core_cnt=self.core_count, verbose=True, shape_names=f'{video_name}, {bp}->{line}', pixels_per_mm=pixels_per_mm)
                side_col_names.append(dist_col)
            side_dists = wall_dists[side_col_names]
            self.results["min_wall_distance_mm"] = side_dists.min(axis=1)
            self.results["max_wall_distance_mm"] = side_dists.max(axis=1)
            self.results["mean_wall_distance_mm"] = side_dists.mean(axis=1)
            # NOTE(review): only two distance columns feed this row-wise skew; pandas is
            # expected to return NaN for fewer than three values - confirm this is intended.
            self.results["skew_wall_distance_mm"] = side_dists.skew(axis=1)
            self.results["std_wall_distance_mm"] = side_dists.std(axis=1)

            print('Computing movement sliding windows...')
            movement_frames = []
            # ``hull_cols`` is walked two columns at a time, i.e. one (x, y) pair per body-part.
            for col_idx in range(0, len(self.hull_cols), 2):
                bp_data = self.results[self.hull_cols[col_idx:col_idx + 2]]
                shifted = FeatureExtractionMixin.create_shifted_df(df=bp_data, periods=1).values[:, -2:]
                frame_movement = FeatureExtractionMixin().keypoint_distances(a=bp_data.values, b=shifted, px_per_mm=pixels_per_mm, in_centimeters=False).astype(np.int32)
                bp_name = self.hull_cols[col_idx][:-2]  # strip the trailing "_x"
                sum_arr = TimeseriesFeatureMixin.sliding_window_stats(data=frame_movement, window_sizes=WINDOW_SIZES, statistics=['sum'], sample_rate=fps)
                movement_frames.append(pd.DataFrame(sum_arr[0], columns=self._window_col_names(f'{bp_name}_sum_movement')))
            # Single concat instead of re-copying the growing frame once per body-part.
            self.results = pd.concat([self.results, *movement_frames], axis=1)

            print('Computing hull sizes...')
            self.results['hull_perimeter_mm'] = get_hull_sizes(points=df[self.hull_cols].values.reshape(len(df), -1, 2), target='perimeter', pixels_per_mm=pixels_per_mm)
            self.results['nose_2_tail_distance_mm'] = FeatureExtractionMixin().keypoint_distances(a=self.results[[self.snout_x, self.snout_y]].values, b=self.results[[self.tail_base_x, self.tail_base_y]].values, px_per_mm=pixels_per_mm, in_centimeters=False).astype(np.int32)
            self.results['left_2_right_hip_distance_mm'] = FeatureExtractionMixin().keypoint_distances(a=self.results[[self.lefthip_x, self.lefthip_y]].values, b=self.results[[self.righthip_x, self.righthip_y]].values, px_per_mm=pixels_per_mm, in_centimeters=False).astype(np.int32)

            print('Computing pose confidence distributions...')
            p = FeatureExtractionMixin.count_values_in_range(data=self.results[self.p_cols].values, ranges=np.array([[0.0, 0.20], [0.20, 0.40]]))
            p = pd.DataFrame(data=p, columns=['low_conf_detections_0_2', 'low_conf_detections_2_4'])
            self.results = pd.concat([self.results, p], axis=1)

            print('Computing hull sizes sliding windows...')
            window_cols = ['hull_perimeter_mm', 'nose_2_tail_distance_mm', 'left_2_right_hip_distance_mm', 'low_conf_detections_0_2', 'low_conf_detections_2_4']
            window_frames = []
            for measure in window_cols:
                # Column order per measure is min, max, mean - same as the output layout before.
                for stat in ('min', 'max', 'mean'):
                    stat_arr = TimeseriesFeatureMixin.sliding_window_stats(data=self.results[measure].values.flatten().astype(np.float32), window_sizes=WINDOW_SIZES, statistics=[stat], sample_rate=fps)
                    window_frames.append(pd.DataFrame(stat_arr[0], columns=self._window_col_names(f'{measure}_{stat}')))
            self.results = pd.concat([self.results, *window_frames], axis=1)

            self.save(data=self.results, save_path=save_path)
            video_timer.stop_timer()
            stdout_success(msg=f'{video_name} complete!', elapsed_time=video_timer.elapsed_time_str)

        self.timer.stop_timer()
        # Report the directory the files were actually written to.
        stdout_success(msg=f'{len(self.data_paths)} data files saved in {self.save_dir}', elapsed_time=self.timer.elapsed_time_str, source=self.__class__.__name__)

    def save(self,
             data: pd.DataFrame,
             save_path: str) -> None:
        """
        Write ``data`` to ``save_path`` using the project's file type.

        :param data: Dataframe to write.
        :param save_path: Destination file path.
        """
        write_df(df=data, file_type=self.file_type, save_path=save_path)
# x = BoundaryRearingFeaturizer(config_path=r"C:\troubleshooting\mouse_open_field\project_folder\project_config.ini",
# data_dir=r'C:\troubleshooting\mouse_open_field\project_folder\csv\outlier_corrected_movement_location',
# save_dir=r'C:\troubleshooting\mouse_open_field\project_folder\csv\features_extracted')
# x.run()