__author__ = "Simon Nilsson; sronilsson@gmail.com"
import itertools
import os
from typing import List, Optional, Union
import numpy as np
import pandas as pd
from simba.mixins.config_reader import ConfigReader
from simba.mixins.plotting_mixin import PlottingMixin
from simba.utils.checks import check_if_filepath_list_is_empty
from simba.utils.data import detect_bouts
from simba.utils.enums import TagNames
from simba.utils.errors import CountError
from simba.utils.printing import log_event, stdout_success
from simba.utils.read_write import get_fn_ext, read_df
[docs]class FSTTCCalculator(ConfigReader, PlottingMixin):
"""
Compute forward spike-time tiling coefficients between pairs of
classified behaviors.
:param str config_path: path to SimBA project config file in Configparser format.
:param: Optional[bool] join_bouts_within_delta: If several bouts onsets (of the same classifier) occurs within a single time-delta, then join the bouts into a single bout.
:param: Optional[bool] time_delta_at_onset: If True, time delta is initatiated at bout onset. If False, then initated at bout offset and includes bout duration. Default: False.
:param int time_window: FSTTC hyperparameter; Integer representing the time window in seconds.
:param List[str] behavior_lst: Behaviors to calculate FSTTC between. FSTTC will be computed for all combinations of behaviors.
:param bool create_graphs: If True, created violin plots (as below) representing each FSTTC. Default: False.
.. image:: _static/img/fsttc_violin.png
:alt: Fsttc violin
:width: 500
:align: center
.. note::
`Tutorial <https://github.com/sgoldenlab/simba/blob/master/docs/FSTTC.md>`__.
Examples
-----
>>> fsttc_calculator = FSTTCCalculator(config_path='MyConfigPath', time_window=2, behavior_lst=['Attack', 'Sniffing'], create_graphs=True)
>>> fsttc_calculator.run()
References
----------
.. [1] Lee, W., et al. (2019). Temporal microstructure of dyadic social behavior during relationship formation in mice.
`PLOS One, 14(12), e0220596 <https://doi.org/10.1371/journal.pone.0220596>`_.
.. [2] Cutts, C. S., & Eglen, S. J. (2014). Detecting pairwise correlations in spike trains: an objective comparison of methods and application to the study of retinal waves.
`Journal of Neuroscience, 34(43), 14288–14303 <https://doi.org/10.1523/JNEUROSCI.2767-14.2014>`_.
"""
def __init__(
self,
config_path: Union[str, os.PathLike],
time_window: int,
behavior_lst: List[str],
time_delta_at_onset: Optional[bool] = False,
join_bouts_within_delta: Optional[bool] = False,
create_graphs: Optional[bool] = False,
):
ConfigReader.__init__(self, config_path=config_path)
PlottingMixin.__init__(self)
log_event(
logger_name=str(self.__class__.__name__),
log_type=TagNames.CLASS_INIT.value,
msg=self.create_log_msg_from_init_args(locals=locals()),
)
self.time_delta = int(time_window)
self.behavior_lst = behavior_lst
if len(self.behavior_lst) < 2:
raise CountError(
msg="FSTCC requires at least two behaviors",
source=self.__class__.__name__,
)
self.graph_status, self.join_bouts_within_delta, self.time_delta_at_onset = (
create_graphs,
join_bouts_within_delta,
time_delta_at_onset,
)
check_if_filepath_list_is_empty(
filepaths=self.machine_results_paths,
error_msg=f"Cannot calculate FSTTC, no data found in {self.machine_results_paths} directory",
)
self.clf_permutations = list(itertools.permutations(self.behavior_lst, 2))
print(f"Processing FSTTC for {str(len(self.machine_results_paths))} file(s)...")
def __join_bouts(self):
results = []
if self.time_delta_at_onset:
self.bouts_df["DELTA_END"] = pd.to_datetime(
self.bouts_df["Start_frame"] + self.frames_in_window
)
else:
self.bouts_df["DELTA_END"] = pd.to_datetime(
self.bouts_df["End_frame"] + self.frames_in_window
)
self.bouts_df["Start_frame"] = pd.to_datetime(self.bouts_df["Start_frame"])
for clf in self.bouts_df["Event"].unique():
clf_df = self.bouts_df[self.bouts_df["Event"] == clf].reset_index(drop=True)
grouped_clf = (
(
clf_df.sort_values(by=["Start_frame", "End_frame", "DELTA_END"])
.assign(
max_End=lambda d: d["DELTA_END"].cummax(),
group=lambda d: d["Start_frame"]
.ge(d["max_End"].shift())
.cumsum(),
)
.groupby("group")
.agg({"Start_frame": "min", "DELTA_END": "max"})
.assign(Duration=lambda g: g["DELTA_END"] - g["Start_frame"])
)
.drop("Duration", axis=1)
.reset_index(drop=True)
)
grouped_clf["Start_frame"] = grouped_clf["Start_frame"].astype(np.int64)
grouped_clf["End_frame"] = grouped_clf["DELTA_END"].astype(np.int64)
grouped_clf["Event"] = clf
results.append(grouped_clf)
return (
pd.concat(results, axis=0)
.sort_values(by=["Event", "Start_frame"])
.drop("DELTA_END", axis=1)
)
[docs] def find_sequences(self):
"""
Method to create list of dataframes holding information on the sequences of behaviors including
inter-temporal distances.
Returns
-------
Attribute: list
vide_df_sequence_lst
"""
self.video_sequences = {}
out_columns = [
"Video",
"First behaviour",
"First behaviour start frame",
"First behavior end frame",
"Second behaviour",
"Second behaviour start frame",
"Difference: first behavior start to second behavior start",
"Time 2nd behaviour start to time window end",
]
for file_cnt, file_path in enumerate(self.machine_results_paths):
_, self.video_name, _ = get_fn_ext(file_path)
self.video_sequences[self.video_name] = {}
print(
f"Analyzing behavioral sequences: {self.video_name} (Video {file_cnt + 1}/{len(self.machine_results_paths)})..."
)
_, _, self.fps = self.read_video_info(video_name=self.video_name)
self.video_sequences[self.video_name]["fps"] = self.fps
self.frames_in_window = int((self.fps / 1000) * self.time_delta)
self.data_df = read_df(file_path, self.file_type) # [self.behavior_lst]
self.video_sequences[self.video_name]["session_length_frames"] = len(
self.data_df
)
self.bouts_df = detect_bouts(
data_df=self.data_df, target_lst=self.behavior_lst, fps=self.fps
)
self.bouts_df["Start_frame"] = (
self.bouts_df["Start_time"] * self.fps
).astype(int) - 1
self.bouts_df = self.bouts_df[["Event", "Start_frame", "End_frame"]]
if self.join_bouts_within_delta:
self.bouts_df = self.__join_bouts()
for first_clf, second_clf in self.clf_permutations:
self.vide_df_sequence_lst = []
sequence_name = "FSTTC {} {}".format(first_clf, second_clf)
first_clf_df = (
self.bouts_df[self.bouts_df["Event"] == first_clf]
.sort_values(by=["Start_frame"])
.reset_index(drop=True)
)
second_clf_df = (
self.bouts_df[self.bouts_df["Event"] == second_clf]
.sort_values(by=["Start_frame"])
.reset_index(drop=True)
)
for index, row in first_clf_df.iterrows():
if self.time_delta_at_onset:
frame_crtrn_min, frame_crtrn_max = (
row["Start_frame"] + 1,
row["Start_frame"] + self.frames_in_window,
)
else:
if not self.join_bouts_within_delta:
frame_crtrn_min, frame_crtrn_max = (
row["Start_frame"] + 1,
row["End_frame"] + self.frames_in_window,
)
else:
frame_crtrn_min, frame_crtrn_max = (
row["Start_frame"] + 1,
row["End_frame"],
)
second_clf_df_crtrn = second_clf_df.loc[
(second_clf_df["Start_frame"] >= frame_crtrn_min)
& (second_clf_df["Start_frame"] <= frame_crtrn_max)
]
if len(second_clf_df_crtrn) > 0:
second_clf_df_crtrn = second_clf_df_crtrn.head(1)
frames_between_behaviors = (
second_clf_df_crtrn["Start_frame"] - row["Start_frame"]
).values
second_clf_df_crtrn["Frames_between_behaviors"] = (
second_clf_df_crtrn["Start_frame"] - row["Start_frame"]
)
if frames_between_behaviors <= 0:
frames_between_behaviors = 1
second_clf_df_crtrn["Frames_between_behaviors"] = (
frames_between_behaviors
)
if self.time_delta_at_onset:
second_clf_df_crtrn[
"Frames_between_second_behavior_start_to_time_window_end"
] = (
row["Start_frame"] + self.frames_in_window
) - second_clf_df_crtrn[
"Start_frame"
].values[
0
]
else:
second_clf_df_crtrn[
"Frames_between_second_behavior_start_to_time_window_end"
] = (
row["End_frame"] + self.frames_in_window
) - second_clf_df_crtrn[
"Start_frame"
].values[
0
]
self.vide_df_sequence_lst.append(
pd.DataFrame(
[
[
self.video_name,
first_clf,
row["Start_frame"],
row["End_frame"],
second_clf,
second_clf_df_crtrn["Start_frame"].values[0],
second_clf_df_crtrn[
"Frames_between_behaviors"
].values[0],
second_clf_df_crtrn[
"Frames_between_second_behavior_start_to_time_window_end"
].values[0],
]
],
columns=out_columns,
)
)
else:
self.vide_df_sequence_lst.append(
pd.DataFrame(
[
[
self.video_name,
first_clf,
int(row["Start_frame"]),
int(row["End_frame"]),
"None",
"None",
"None",
"None",
]
],
columns=out_columns,
)
)
if len(self.vide_df_sequence_lst) > 0:
video_sequences = (
pd.concat(self.vide_df_sequence_lst, axis=0)
.drop_duplicates(
subset=[
"Video",
"First behaviour",
"First behaviour start frame",
"First behavior end frame",
"Second behaviour",
],
keep="first",
)
.reset_index(drop=True)
)
if self.time_delta_at_onset:
video_sequences["Total_window_frames"] = self.frames_in_window
else:
video_sequences["Total_window_frames"] = (
video_sequences["First behavior end frame"]
- video_sequences["First behaviour start frame"]
) + self.frames_in_window
self.video_sequences[self.video_name][
sequence_name
] = video_sequences
else:
self.video_sequences[self.video_name][sequence_name] = None
[docs] def run(self):
"""
Method to calculate forward spike-time tiling coefficients (FSTTC) using the data computed in :meth:
:meth:`~simba.FSTTCPerformer.find_sequences`.
Returns
-------
Attribute: dict
results_dict
"""
self.find_sequences()
self.results_dict = {}
for video_name, video_data in self.video_sequences.items():
self.results_dict[video_name] = {}
fps, session_frames = video_data["fps"], video_data["session_length_frames"]
for first_clf, second_clf in self.clf_permutations:
if first_clf not in self.results_dict[video_name].keys():
self.results_dict[video_name][first_clf] = {}
self.results_dict[video_name][first_clf][second_clf] = {}
sequence_data = video_data[f"FSTTC {first_clf} {second_clf}"]
if sequence_data is None:
self.results_dict[video_name][first_clf][second_clf] = "No events"
else:
len_clf_1 = len(
sequence_data[sequence_data["First behaviour"] == first_clf]
)
len_clf_1_2 = len(
sequence_data[
(sequence_data["First behaviour"] == first_clf)
& (sequence_data["Second behaviour"] == second_clf)
]
)
if (len_clf_1 > 0) & (len_clf_1_2 == 0):
self.results_dict[video_name][first_clf][second_clf] = 0.0
else:
clf_1_2_df = sequence_data[
(sequence_data["First behaviour"] == first_clf)
& (sequence_data["Second behaviour"] == second_clf)
]
P = len_clf_1_2 / len_clf_1
Ta = sum(clf_1_2_df["Total_window_frames"]) / session_frames
Tb = (
sum(
clf_1_2_df[
"Time 2nd behaviour start to time window end"
]
)
/ session_frames
)
self.results_dict[video_name][first_clf][second_clf] = 0.5 * (
(P - Tb) / (1 - (P * Tb)) + ((P - Ta) / (1 - (P * Ta)))
)
# print(self.results_dict[video_name][first_clf][second_clf])
self.save()
if self.graph_status:
self.__plot_FSTTC()
def __plot_FSTTC(self) -> None:
"""
Private method to visualize forward spike-time tiling coefficients (FSTTC) as png violin plots. Results are stored on
disk within the `project_folder/logs` directory.
"""
self.out_df["BEHAVIOR COMBINATION"] = self.out_df["FIRST BEHAVIOR"].str.cat(
self.out_df["SECOND BEHAVIOR"], sep="-"
)
data_df = self.out_df[self.out_df["FSTTC"] != "No events"].reset_index(
drop=True
)
data_df["FSTTC"] = pd.to_numeric(data_df["FSTTC"])
self.violin_plot(
data=data_df,
x="BEHAVIOR COMBINATION",
y="FSTTC",
save_path=os.path.join(self.logs_path, f"FSTTC_{self.datetime}.png"),
)
[docs] def save(self):
"""
Method to save forward spike-time tiling coefficients (FSTTC) to disk within the `project_folder/logs` directory.
Returns
-------
None
"""
self.out_df = pd.DataFrame(
columns=["VIDEO", "FIRST BEHAVIOR", "SECOND BEHAVIOR", "FSTTC"]
)
for video_name, video_data in self.results_dict.items():
for first_behavior, first_behavior_data in video_data.items():
for second_behavior, fsttc in first_behavior_data.items():
self.out_df.loc[len(self.out_df)] = [
video_name,
first_behavior,
second_behavior,
fsttc,
]
self.file_save_path = os.path.join(
self.logs_path, "FSTTC_{}.csv".format(str(self.datetime))
)
self.out_df.to_csv(self.file_save_path)
self.timer.stop_timer()
stdout_success(
msg=f"FSTTC data saved at {self.file_save_path}",
elapsed_time=self.timer.elapsed_time_str,
source=self.__class__.__name__,
)
#
#
# test = FSTTCCalculator(config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini',
# time_window=10000,
# behavior_lst=['Attack', 'Sniffing'],
# create_graphs=True)
# test.run()
#
# test = FSTTCCalculator(config_path='/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini',
# time_window=60000,
# join_bouts_within_delta=True,
# time_delta_at_onset=True,
# behavior_lst=['licking_grooming', 'active_nursing', 'nest_attendance'], #'passive_nursing', 'nest_attendance'
# create_graphs=True)
# test.run()
# test = FSTTCCalculator(config_path='/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini',
# time_window=10000,
# behavior_lst=['Attack', 'Sniffing'],
# create_graphs=False)
# test.run()
#
# test = FSTTCCalculator(config_path='/Users/simon/Desktop/envs/troubleshooting/naresh/project_folder/project_config.ini',
# time_window=2000,
# behavior_lst=['Erratic Turning', 'Bottom', 'Normal Swimming', 'Freezing', 'Wall Bumping'],
# create_graphs=True)
# test.run()