import functools
import os
from typing import List, Optional, Tuple, Union
import cv2
import numpy as np
from simba.mixins.config_reader import ConfigReader
from simba.mixins.plotting_mixin import PlottingMixin
from simba.utils.checks import (
check_all_file_names_are_represented_in_video_log,
check_file_exist_and_readable, check_float, check_if_dir_exists,
check_if_valid_rgb_tuple, check_int, check_str, check_valid_boolean,
check_valid_lst, check_valid_tuple)
from simba.utils.data import (create_color_palette,
egocentrically_align_pose_numba, get_cpu_pool,
terminate_cpu_pool)
from simba.utils.errors import InvalidInputError, NoFilesFoundError
from simba.utils.printing import SimbaTimer, stdout_information, stdout_success
from simba.utils.read_write import (concatenate_videos_in_folder,
create_directory,
find_files_of_filetypes_in_directory,
get_fn_ext, read_df, read_video_info_csv)
def _pose_video_worker(frm_range: tuple,
data_arr: np.ndarray,
save_dir: str,
resolution: Tuple[int, int],
fps: float,
bg_bgr: Tuple[int, int, int],
colors: list,
draw_bp_idxs: list,
skeleton_idxs: list,
circle_size: int,
line_thickness: int,
p_arr: np.ndarray,
bp_threshold: float = 0.0,
verbose: bool = False):
batch_id, frame_rng = frm_range[0], frm_range[1]
start_frm, end_frm, total_frms = frame_rng[0], frame_rng[-1], len(data_arr)
video_save_path = os.path.join(save_dir, f'{batch_id}.mp4')
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(video_save_path, fourcc, fps, resolution)
for frm_idx in range(start_frm, end_frm + 1):
if verbose:
stdout_information(msg=f'Batch {batch_id}, frame {frm_idx}/{total_frms}...')
frame = np.full((resolution[1], resolution[0], 3), bg_bgr, dtype=np.uint8)
pts = data_arr[frm_idx].astype(np.int32)
probs = p_arr[frm_idx]
if skeleton_idxs is not None:
for idx1, idx2 in skeleton_idxs:
if probs[idx1] >= bp_threshold and probs[idx2] >= bp_threshold:
cv2.line(frame, tuple(pts[idx1]), tuple(pts[idx2]), (105, 105, 105), line_thickness)
for bp_idx in draw_bp_idxs:
if probs[bp_idx] >= bp_threshold:
cv2.circle(frame, tuple(pts[bp_idx]), circle_size, colors[bp_idx], -1)
writer.write(frame)
writer.release()
return batch_id
[docs]class SkeletonVideoCreator:
"""
Create pose-estimation videos rendered on a solid RGB background from SimBA CSV data.
Reads outlier-corrected pose CSV files (one row per frame), extracts body-part x/y columns,
and renders keypoints and optional skeleton segments on a blank canvasβno source video is
required. FPS for each output file is taken from ``video_info.csv`` for the matching video name.
Alignment modes (at most one applies; egocentric alignment takes precedence if both are set):
* **Egocentric** (``ego_anchor_1`` + ``ego_anchor_2``): rotates/translates the pose so the
segment from anchor 1 β anchor 2 matches ``ego_direction`` (see parameter).
* **Center anchor** (``anchor_bp`` only, no egocentric anchors): each frame, shifts all
keypoints so ``anchor_bp`` sits at the image center; no rotation.
Input CSVs must list body parts as ``<bp>_x`` / ``<bp>_y`` columns. Optional ``<bp>_p``
probability columns gate drawing; if any are missing, probabilities default to 1.0 for all
body-parts. Skeleton edges are drawn in a fixed gray; keypoint disks use ``palette``.
.. video:: _static/img/SkeletonVideoCreator.mp4
:width: 800
:autoplay:
:loop:
:muted:
:align: center
.. video:: _static/img/SkeletonVideoCreator_2.mp4
:width: 800
:autoplay:
:loop:
:muted:
:align: center
.. seealso::
:class:`~simba.plotting.pose_plotter_mp.PosePlotterMultiProcess` β overlay pose on the original recording instead of a blank background.
:func:`~simba.video_processors.video_processing.superimpose_overlay_video` β inset one video on another (for example, a skeleton clip over the raw recording).
:param Optional[Union[str, os.PathLike]] config_path: Path to SimBA project ``project_config.ini``.
If set, ``data_path``, ``save_dir``, and ``video_info_path`` default from the project unless
overridden. Required unless all three of those are provided explicitly.
:param Optional[Union[str, os.PathLike]] data_path: Path to one pose CSV or a directory of ``.csv``
files. If ``None`` and ``config_path`` is set, uses the project's outlier-corrected movement directory.
:param Optional[Union[str, os.PathLike]] save_dir: Directory for output ``<video_name>.mp4`` files.
If ``None`` and ``config_path`` is set, uses ``<project>/frames/output/pose_videos`` (created if needed).
:param Optional[Union[str, os.PathLike]] video_info_path: Path to ``logs/video_info.csv`` (fps and video names).
If ``None`` and ``config_path`` is set, uses the project's video info path.
:param Tuple[int, int] resolution: Output size ``(width, height)`` in pixels. Default ``(500, 500)``.
:param Tuple[int, int, int] bg_color: Background color as **RGB** ``(R, G, B)``, each 0β255.
Default ``(0, 0, 0)`` (black).
:param Optional[str] anchor_bp: Body-part name whose location is pinned to the frame center each
frame (case-insensitive match to CSV names). Ignored if egocentric anchors are set. Default None.
:param Optional[List[Tuple[str, str]]] skeleton: Pairs of body-part names ``(from, to)`` for line
segments. Omitted or skipped pairs involving ``omit_bps``. If None, only keypoints are drawn.
:param Optional[int] circle_size: Keypoint circle radius in pixels. If None, scaled from ``resolution``.
:param Optional[int] line_thickness: Skeleton line thickness in pixels. If None, scaled from ``resolution``.
:param Optional[str] ego_anchor_1: First anchor body-part for egocentric alignment (e.g. ``tail_base``).
Must be given together with ``ego_anchor_2``.
:param Optional[str] ego_anchor_2: Second anchor; together with ``ego_anchor_1`` defines the forward
axis before rotation.
:param int ego_direction: Desired compass heading in degrees for the vector from ``ego_anchor_1`` to
``ego_anchor_2`` after alignment: 0 = north/up, 90 = east/right, 180 = south/down, 270 = west/left.
Default 0.
:param Optional[List[str]] omit_bps: Body-part names to exclude from dots and skeleton (lowercased internally).
:param str palette: Matplotlib qualitative palette name for per-body-part keypoint colors. Default ``Set1``.
:param float bp_threshold: Minimum per-frame probability to draw a keypoint or use it in a skeleton edge.
Default ``0.0``.
:param int core_cnt: Worker processes for frame batches; ``-1`` uses all CPUs. Default ``-1``.
:param bool verbose: Print batch and file progress. Default True.
:raises InvalidInputError: If neither ``config_path`` nor the triple
(``data_path``, ``save_dir``, ``video_info_path``) is satisfactorily provided; or if only one
of ``ego_anchor_1`` / ``ego_anchor_2`` is set.
:raises NoFilesFoundError: If ``data_path`` is not a valid file or directory.
:example:
>>> creator = SkeletonVideoCreator(
... config_path=r'E:/project/project_config.ini',
... resolution=(500, 500),
... bg_color=(0, 0, 0),
... anchor_bp='tail_base',
... skeleton=[('nose', 'left_ear'), ('nose', 'right_ear'), ('left_ear', 'center'), ('right_ear', 'center'), ('center', 'left_side'), ('center', 'right_side'), ('center', 'tail_base'), ('tail_base', 'tail_mid'), ('tail_mid', 'tail_end')],
... ego_anchor_1='tail_base',
... ego_anchor_2='nose',
... )
>>> creator.run()
"""
def __init__(self,
config_path: Optional[Union[str, os.PathLike]] = None,
data_path: Optional[Union[str, os.PathLike]] = None,
save_dir: Optional[Union[str, os.PathLike]] = None,
video_info_path: Optional[Union[str, os.PathLike]] = None,
resolution: Tuple[int, int] = (500, 500),
bg_color: Tuple[int, int, int] = (0, 0, 0),
anchor_bp: Optional[str] = None,
skeleton: Optional[List[Tuple[str, str]]] = None,
circle_size: Optional[int] = None,
line_thickness: Optional[int] = None,
ego_anchor_1: Optional[str] = None,
ego_anchor_2: Optional[str] = None,
ego_direction: int = 0,
omit_bps: Optional[List[str]] = None,
palette: str = 'Set1',
bp_threshold: float = 0.0,
core_cnt: int = -1,
verbose: bool = True):
if config_path is None and (data_path is None or save_dir is None or video_info_path is None):
raise InvalidInputError(msg='Either config_path or all of data_path, save_dir, and video_info_path must be provided.', source=self.__class__.__name__)
if config_path is not None:
check_file_exist_and_readable(file_path=config_path)
config = ConfigReader(config_path=config_path, read_video_info=False)
if data_path is None:
data_path = config.outlier_corrected_movement_dir
if save_dir is None:
save_dir = os.path.join(config.frames_output_dir, 'pose_videos')
if not os.path.isdir(save_dir):
create_directory(paths=[save_dir])
if video_info_path is None:
video_info_path = config.video_info_path
check_if_dir_exists(in_dir=save_dir, source=f'{self.__class__.__name__} save_dir')
check_file_exist_and_readable(file_path=video_info_path)
check_valid_tuple(x=resolution, source=f'{self.__class__.__name__} resolution', accepted_lengths=(2,), valid_dtypes=(int,))
check_int(name=f'{self.__class__.__name__} resolution width', value=resolution[0], min_value=10)
check_int(name=f'{self.__class__.__name__} resolution height', value=resolution[1], min_value=10)
check_if_valid_rgb_tuple(data=bg_color)
check_int(name=f'{self.__class__.__name__} ego_direction', value=ego_direction, min_value=0, max_value=360)
check_int(name=f'{self.__class__.__name__} core_cnt', value=core_cnt, min_value=-1, unaccepted_vals=[0])
check_valid_boolean(value=verbose, source=f'{self.__class__.__name__} verbose')
if circle_size is not None:
check_int(name=f'{self.__class__.__name__} circle_size', value=circle_size, min_value=1)
if line_thickness is not None:
check_int(name=f'{self.__class__.__name__} line_thickness', value=line_thickness, min_value=1)
if anchor_bp is not None:
check_str(name=f'{self.__class__.__name__} anchor_bp', value=anchor_bp)
if ego_anchor_1 is not None or ego_anchor_2 is not None:
if ego_anchor_1 is None or ego_anchor_2 is None:
raise InvalidInputError(msg='Both ego_anchor_1 and ego_anchor_2 must be provided for egocentric alignment.', source=self.__class__.__name__)
check_str(name=f'{self.__class__.__name__} ego_anchor_1', value=ego_anchor_1)
check_str(name=f'{self.__class__.__name__} ego_anchor_2', value=ego_anchor_2)
if skeleton is not None:
check_valid_lst(data=skeleton, source=f'{self.__class__.__name__} skeleton', valid_dtypes=(tuple,), min_len=1)
if omit_bps is not None:
check_valid_lst(data=omit_bps, source=f'{self.__class__.__name__} omit_bps', valid_dtypes=(str,), min_len=1)
check_str(name=f'{self.__class__.__name__} palette', value=palette)
check_float(name=f'{self.__class__.__name__} bp_threshold', value=bp_threshold, min_value=0.0, max_value=1.0)
if circle_size is None:
circle_size = max(1, PlottingMixin().get_optimal_circle_size(frame_size=resolution, circle_frame_ratio=25))
if line_thickness is None:
line_thickness = max(1, PlottingMixin().get_optimal_circle_size(frame_size=resolution, circle_frame_ratio=100))
self.save_dir, self.resolution, self.core_cnt = save_dir, resolution, core_cnt
self.bg_color, self.anchor_bp, self.skeleton = bg_color, anchor_bp, skeleton
self.circle_size, self.line_thickness, self.verbose = circle_size, line_thickness, verbose
self.ego_anchor_1, self.ego_anchor_2, self.ego_direction = ego_anchor_1, ego_anchor_2, ego_direction
self.omit_bps = [x.lower() for x in omit_bps] if omit_bps is not None else None
self.palette, self.bp_threshold = palette, bp_threshold
self.video_info_df = read_video_info_csv(file_path=video_info_path)
if os.path.isfile(data_path):
check_file_exist_and_readable(file_path=data_path)
self.data_paths = [data_path]
elif os.path.isdir(data_path):
check_if_dir_exists(in_dir=data_path, source=f'{self.__class__.__name__} data_path')
self.data_paths = find_files_of_filetypes_in_directory(directory=data_path, extensions=['.csv'], raise_error=True)
else:
raise NoFilesFoundError(msg=f'{data_path} is not a valid file or directory path.', source=self.__class__.__name__)
check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths)
[docs] def run(self):
"""
Render skeleton videos for every CSV in ``data_paths``.
For each file, splits frames across a process pool, writes batch MP4s under
``<save_dir>/<video_name>/temp/``, concatenates them to ``<save_dir>/<video_name>.mp4``,
then prints a completion summary. Does not return a value.
"""
timer = SimbaTimer(start=True)
pool = get_cpu_pool(core_cnt=self.core_cnt, verbose=self.verbose, source=self.__class__.__name__)
core_cnt = pool._processes
for file_cnt, file_path in enumerate(self.data_paths):
_, video_name, _ = get_fn_ext(filepath=file_path)
fps = float(self.video_info_df.loc[self.video_info_df['Video'] == video_name, 'fps'].values[0])
df = read_df(file_path=file_path, file_type='csv')
df.columns = [x.lower() for x in df.columns]
bp_cols = [x for x in df.columns if not x.endswith('_p')]
body_parts = []
for c in bp_cols:
bp_name = c[:-2]
if bp_name not in body_parts:
body_parts.append(bp_name)
if self.verbose:
stdout_information(msg=f'Processing {file_cnt + 1}/{len(self.data_paths)} ({video_name}): {len(df)} frames, {len(body_parts)} body-parts, {fps} fps...')
data_arr = df[bp_cols].values.reshape(len(df), len(body_parts), 2).astype(np.float64)
p_cols = [f'{bp}_p' for bp in body_parts]
p_arr = df[p_cols].values.astype(np.float64) if all(c in df.columns for c in p_cols) else np.ones((len(df), len(body_parts)), dtype=np.float64)
if self.ego_anchor_1 is not None:
anchor_1_idx = body_parts.index(self.ego_anchor_1.lower())
anchor_2_idx = body_parts.index(self.ego_anchor_2.lower())
center = np.array([self.resolution[0] // 2, self.resolution[1] // 2], dtype=np.int64)
numba_direction = (270 + self.ego_direction) % 360
data_arr, _, _ = egocentrically_align_pose_numba(data=data_arr.astype(np.int32), anchor_1_idx=anchor_1_idx, anchor_2_idx=anchor_2_idx, direction=numba_direction, anchor_location=center)
data_arr = data_arr.astype(np.float64)
elif self.anchor_bp is not None:
anchor_idx = body_parts.index(self.anchor_bp.lower())
center_x, center_y = self.resolution[0] // 2, self.resolution[1] // 2
for frm_idx in range(len(data_arr)):
anchor_x, anchor_y = data_arr[frm_idx, anchor_idx, 0], data_arr[frm_idx, anchor_idx, 1]
data_arr[frm_idx, :, 0] += center_x - anchor_x
data_arr[frm_idx, :, 1] += center_y - anchor_y
draw_bp_idxs = list(range(len(body_parts)))
if self.omit_bps is not None:
draw_bp_idxs = [i for i, bp in enumerate(body_parts) if bp not in self.omit_bps]
colors = create_color_palette(pallete_name=self.palette, increments=len(body_parts), as_int=True)
skeleton_idxs = None
if self.skeleton is not None:
skeleton_idxs = []
for bp1, bp2 in self.skeleton:
bp1_l, bp2_l = bp1.lower(), bp2.lower()
if self.omit_bps is not None and (bp1_l in self.omit_bps or bp2_l in self.omit_bps):
continue
idx1, idx2 = body_parts.index(bp1_l), body_parts.index(bp2_l)
skeleton_idxs.append((idx1, idx2))
bg_bgr = (self.bg_color[2], self.bg_color[1], self.bg_color[0])
video_temp_dir = os.path.join(self.save_dir, video_name, 'temp')
save_path = os.path.join(self.save_dir, f'{video_name}.mp4')
create_directory(paths=video_temp_dir)
frm_batches = np.array_split(list(range(len(data_arr))), core_cnt)
frm_batches = [(i, j) for i, j in enumerate(frm_batches)]
constants = functools.partial(_pose_video_worker,
data_arr=data_arr,
save_dir=video_temp_dir,
resolution=self.resolution,
fps=fps,
bg_bgr=bg_bgr,
colors=colors,
draw_bp_idxs=draw_bp_idxs,
skeleton_idxs=skeleton_idxs,
circle_size=self.circle_size,
line_thickness=self.line_thickness,
p_arr=p_arr,
bp_threshold=self.bp_threshold,
verbose=self.verbose)
for cnt, result in enumerate(pool.imap(constants, frm_batches, chunksize=1)):
if self.verbose:
stdout_information(msg=f'{video_name}: batch {result + 1}/{core_cnt} complete...')
concatenate_videos_in_folder(in_folder=video_temp_dir, save_path=save_path, gpu=True)
if self.verbose:
stdout_information(msg=f'{video_name} saved at {save_path}')
terminate_cpu_pool(pool=pool, source=self.__class__.__name__)
timer.stop_timer()
stdout_success(msg=f'Pose videos for {len(self.data_paths)} files saved in {self.save_dir}', source=self.__class__.__name__, elapsed_time=timer.elapsed_time_str)
if __name__ == "__main__":
creator = SkeletonVideoCreator(
config_path=r'E:\troubleshooting\mitra_emergence_hour\project_folder\project_config.ini',
data_path=r"E:\troubleshooting\mitra_emergence_hour\project_folder\csv\outlier_corrected_movement_location\Box3_180mISOcontrol_Females.csv",
resolution=(375, 375),
bg_color=(255, 255, 255),
bp_threshold=0.1,
anchor_bp='center',
skeleton=[('nose', 'left_ear'), ('nose', 'right_ear'), ('left_ear', 'right_ear'), ('left_ear', 'center'), ('right_ear', 'center'), ('center', 'left_side'), ('left_ear', 'left_side'), ('center', 'right_side'), ('right_ear', 'right_side'), ('left_side', 'tail_base'), ('right_side', 'tail_base'), ('center', 'tail_base'), ('tail_base', 'tail_end')],
ego_anchor_1='center',
ego_anchor_2='nose',
core_cnt=8,
omit_bps=['tail_center',])
creator.run()