Source code for simba.feature_extractors.feature_extractor_user_defined
__author__ = "Simon Nilsson; sronilsson@gmail.com"
import os
from itertools import product
from typing import Union
import numpy as np
import pandas as pd
from simba.mixins.config_reader import ConfigReader
from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin
from simba.utils.checks import check_float, check_str
from simba.utils.errors import MissingColumnsError, ParametersFileError
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import get_fn_ext, read_df, write_df
class UserDefinedFeatureExtractor(ConfigReader, FeatureExtractionMixin):
    """
    Generic featurizer of data within SimBA project using user-defined body-parts in the pose-estimation data.
    Results are stored in the `project_folder/csv/features_extracted` directory of the SimBA project.

    For every input file the following columns are appended to the pose-estimation data:

    * ``Euclidean_distance_<bp>_<other_bp>``: distance between each body-part of an animal and each
      body-part of every other animal, divided by the video's pixels-per-millimeter value.
    * ``Movement_<bp>``: distance between each body-part and its position in the preceding row,
      plus per-animal ``All_bp_movements_<animal>_{sum,mean,min,max}`` aggregates.
    * ``Mean_<col>_<window>`` / ``Sum_<col>_<window>``: rolling statistics of the distance columns
      and of the per-animal mean movement columns.
    * ``Sum_probabilities``, ``Mean_probabilities`` and ``Low_prob_detections_{0.1,0.5,0.75}``.

    .. note::
       `Feature extraction tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/tutorial.md#step-5-extract-features>`__.

    :param str config_path: path to SimBA project config file in Configparser format

    :example:
    >>> feature_extractor = UserDefinedFeatureExtractor(config_path='MyProjectConfig')
    >>> feature_extractor.run()
    """

    def __init__(self, config_path: Union[str, os.PathLike]):
        FeatureExtractionMixin.__init__(self, config_path=config_path)
        ConfigReader.__init__(self, config_path=config_path)
        print(f"Extracting features from {len(self.files_found)} file(s)...")

    @staticmethod
    def __body_part_name(x_col_name: str) -> str:
        """
        Return the body-part name for an x-coordinate column by dropping a trailing ``_x``.

        ``str.strip("_x")`` must not be used for this: it removes any run of ``_`` / ``x``
        characters from *both* ends, so e.g. ``"Thorax_x"`` would become ``"Thora"`` and
        distinct body-parts such as ``"ax_x"`` and ``"a_x"`` would collide.

        :param str x_col_name: column name, expected to end with ``_x``.
        :return: ``x_col_name`` without the ``_x`` suffix, or unchanged if the suffix is absent.
        """
        suffix = "_x"
        if x_col_name.endswith(suffix):
            return x_col_name[: -len(suffix)]
        return x_col_name

    def __euclid_dist_between_bps_of_other_animals(self) -> None:
        """
        Add one ``Euclidean_distance_<bp>_<other_bp>`` column to ``self.data_df`` for every pair of
        body-parts belonging to two different animals, and record the created column names in
        ``self.distance_col_names``.

        A pair is skipped when the column for the reversed pair already exists, so each
        unordered pair is computed once.
        """
        print("Calculating euclidean distances...")
        self.distance_col_names = []
        for animal_name, animal_data in self.animal_bp_dict.items():
            other_animals = {
                name: data
                for name, data in self.animal_bp_dict.items()
                if name != animal_name
            }
            for bp_x, bp_y in zip(animal_data["X_bps"], animal_data["Y_bps"]):
                bp_name = self.__body_part_name(bp_x)
                for other_data in other_animals.values():
                    for other_bp_x, other_bp_y in zip(
                        other_data["X_bps"], other_data["Y_bps"]
                    ):
                        other_bp_name = self.__body_part_name(other_bp_x)
                        col_name = f"Euclidean_distance_{bp_name}_{other_bp_name}"
                        reverse_col_name = (
                            f"Euclidean_distance_{other_bp_name}_{bp_name}"
                        )
                        if reverse_col_name in self.data_df.columns:
                            # The same distance was already stored under the mirrored name.
                            continue
                        delta_x = self.data_df[bp_x] - self.data_df[other_bp_x]
                        delta_y = self.data_df[bp_y] - self.data_df[other_bp_y]
                        self.data_df[col_name] = (
                            np.sqrt(delta_x**2 + delta_y**2) / self.px_per_mm
                        )
                        self.distance_col_names.append(col_name)

    def __movement_of_all_bps(self) -> None:
        """
        Add a ``Movement_<bp>`` column for every body-part (distance between the current row and
        the one-row-shifted coordinates in ``self.data_df_comb``, divided by ``self.px_per_mm``)
        and, per animal, the row-wise sum / mean / min / max over that animal's movement columns.

        Populates ``self.mean_movement_cols`` and ``self.sum_movement_cols`` with the names of the
        per-animal mean and sum columns.
        """
        print("Calculating movements of all body-parts...")
        self.mean_movement_cols, self.sum_movement_cols = [], []
        for animal_name, animal_data in self.animal_bp_dict.items():
            animal_cols = []
            for bp_x, bp_y in zip(animal_data["X_bps"], animal_data["Y_bps"]):
                shifted_bp_x, shifted_bp_y = bp_x + "_shifted", bp_y + "_shifted"
                col_name = "Movement_" + self.__body_part_name(bp_x)
                delta_x = self.data_df_comb[bp_x] - self.data_df_comb[shifted_bp_x]
                delta_y = self.data_df_comb[bp_y] - self.data_df_comb[shifted_bp_y]
                self.data_df[col_name] = (
                    np.sqrt(delta_x**2 + delta_y**2) / self.px_per_mm
                )
                animal_cols.append(col_name)

            prefix = "All_bp_movements_" + animal_name
            animal_movements = self.data_df[animal_cols]
            self.data_df[prefix + "_sum"] = animal_movements.sum(axis=1)
            self.data_df[prefix + "_mean"] = animal_movements.mean(axis=1)
            self.data_df[prefix + "_min"] = animal_movements.min(axis=1)
            self.data_df[prefix + "_max"] = animal_movements.max(axis=1)
            self.mean_movement_cols.append(prefix + "_mean")
            self.sum_movement_cols.append(prefix + "_sum")

    def __add_rolling_mean_and_sum(self, col_names) -> None:
        """
        For every combination of a value in ``self.roll_windows_values`` and a column in
        ``col_names``, add ``Mean_<col>_<value>`` and ``Sum_<col>_<value>`` rolling columns.

        :param col_names: names of existing ``self.data_df`` columns to aggregate.
        """
        # NOTE(review): the window length is int(value) rows, i.e. the raw entry of
        # ``roll_windows_values``. ``run`` previously computed ``int(fps / value)`` per entry
        # without ever using it, which suggests fps-scaled windows may have been intended.
        # Numeric behaviour is deliberately left unchanged here - confirm before altering,
        # since existing classifiers depend on these feature values.
        for window, col in product(self.roll_windows_values, col_names):
            rolling = self.data_df[col].rolling(int(window), min_periods=1)
            self.data_df[f"Mean_{col}_{window}"] = rolling.mean()
            self.data_df[f"Sum_{col}_{window}"] = rolling.sum()

    def __rolling_windows_bp_distances(self) -> None:
        """Add rolling mean / sum columns for every column in ``self.distance_col_names``."""
        print("Calculating rolling windows data: distances between body-parts...")
        self.__add_rolling_mean_and_sum(self.distance_col_names)

    def __rolling_windows_movement(self) -> None:
        """Add rolling mean / sum columns for every column in ``self.mean_movement_cols``."""
        print("Calculating rolling windows data: animal movements...")
        self.__add_rolling_mean_and_sum(self.mean_movement_cols)

    def __pose_probability_filters(self) -> None:
        """
        Add row-wise ``Sum_probabilities`` and ``Mean_probabilities`` over the ``self.p_cols``
        columns, plus three ``Low_prob_detections_*`` columns produced by
        ``self.count_values_in_range`` for the ranges [0, 0.1], [0, 0.5] and [0, 0.75].
        """
        p_df = self.data_df.filter(self.p_cols, axis=1)
        self.data_df["Sum_probabilities"] = p_df.sum(axis=1)
        self.data_df["Mean_probabilities"] = p_df.mean(axis=1)
        results = pd.DataFrame(
            self.count_values_in_range(
                data=p_df.values,
                ranges=np.array([[0.0, 0.1], [0.0, 0.5], [0.0, 0.75]]),
            ),
            columns=[
                "Low_prob_detections_0.1",
                "Low_prob_detections_0.5",
                "Low_prob_detections_0.75",
            ],
            # Share data_df's index so the column-wise concat below lines rows up one-to-one
            # instead of outer-joining (and adding NaN rows) when the index is not 0..n-1.
            index=self.data_df.index,
        )
        self.data_df = pd.concat([self.data_df, results], axis=1)

    def run(self) -> None:
        """
        Extract features for every file in ``self.files_found`` and write one output file per
        input to ``self.save_dir`` in the project's file format.

        :raises MissingColumnsError: if an input file does not have the same number of columns
            as ``self.col_headers``.
        """
        for file_cnt, file_path in enumerate(self.files_found):
            video_timer = SimbaTimer(start=True)
            _, file_name, _ = get_fn_ext(file_path)
            print(f"Extracting features for video {file_name} ({file_cnt + 1}/{len(self.files_found)})...")
            check_str("file name", file_name)
            # Only the pixels-per-millimeter value is needed; see the NOTE(review) in
            # __add_rolling_mean_and_sum regarding the unused fps value.
            _, self.px_per_mm, _ = self.read_video_info(video_name=file_name)
            self.data_df = read_df(file_path, self.file_type)
            if len(self.data_df.columns) != len(self.col_headers):
                raise MissingColumnsError(msg=f'There is a mismatch in the number headers in file {file_name} ({len(self.data_df.columns)}) and the number of columns expected in the SImBA project ({len(self.col_headers)}) as determined by the SimBA file at {self.body_parts_path}')
            self.data_df.columns = self.col_headers
            self.data_df = self.data_df.fillna(0).apply(pd.to_numeric)
            # Side-by-side frame of current and previous-row coordinates; the first row has no
            # predecessor, so its shifted values are filled with 0.
            self.data_df_shifted = self.data_df.shift(periods=1)
            self.data_df_shifted.columns = self.col_headers_shifted
            self.data_df_comb = (
                pd.concat([self.data_df, self.data_df_shifted], axis=1, join="inner")
                .fillna(0)
                .reset_index(drop=True)
            )
            self.__euclid_dist_between_bps_of_other_animals()
            self.__movement_of_all_bps()
            self.__rolling_windows_bp_distances()
            self.__rolling_windows_movement()
            self.__pose_probability_filters()
            save_path = os.path.join(self.save_dir, file_name + "." + self.file_type)
            self.data_df = self.data_df.reset_index(drop=True).fillna(0)
            write_df(df=self.data_df, file_type=self.file_type, save_path=save_path)
            video_timer.stop_timer()
            print(f"Feature extraction complete for video {file_name} (elapsed time: {video_timer.elapsed_time_str}s)")
        self.timer.stop_timer()
        stdout_success(f"Feature extraction complete for {str(len(self.files_found))} video(s). Results are saved inside the {self.features_dir} directory", elapsed_time=self.timer.elapsed_time_str,)
# test = UserDefinedFeatureExtractor(config_path=r"C:\troubleshooting\two_black_animals_14bp\project_folder\project_config.ini")
# test.run()