Source code for simba.third_party_label_appenders.transform.sleap_to_yolo

import json
import os
import random
from typing import Optional, Union

import cv2
import h5py
import numpy as np

from simba.third_party_label_appenders.transform.utils import \
    create_yolo_keypoint_yaml
from simba.utils.checks import (check_file_exist_and_readable, check_float,
                                check_if_dir_exists,
                                check_if_keys_exist_in_dict, check_str,
                                check_valid_boolean)
from simba.utils.errors import InvalidInputError, NoFilesFoundError
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import (create_directory,
                                    find_files_of_filetypes_in_directory,
                                    find_video_of_file, get_fn_ext,
                                    read_frm_of_video, read_img)
from simba.utils.yolo import keypoint_array_to_yolo_annotation_str


[docs]class SleapAnnotations2Yolo(): """ Convert SLEAP annotations to YOLO formatted training data. :param Union[str, os.PathLike] data_dir: Directory containing SLEAP annotations `.slp` files :param Union[str, os.PathLike] save_dir: Directory to save YOLO-formatted images, labels, and metadata. :param bool verbose: If True, print progress during processing. :param float train_size: Proportion of frames to assign to the training set (rest go to validation). :param bool greyscale: If True, convert images to grayscale. :param bool clahe: If True, apply CLAHE (Contrast Limited Adaptive Histogram Equalization). :param float padding: Relative padding to apply around the bounding box of keypoints (range 0.0 to 1.0). :param Optional[str] single_id: Optional custom ID to assign all annotations the same class (used in single-animal datasets). :example: >>> runner = SleapAnnotations2Yolo(sleap_dir=r'D:/cvat_annotations/frames/slp_to_yolo', save_dir=r'D:/cvat_annotations/frames/slp_to_yolo/yolo') >>> runner.run() """ def __init__(self, sleap_dir: Union[str, os.PathLike], save_dir: Union[str, os.PathLike], video_dir: Optional[Union[str, os.PathLike]] = None, padding: Optional[float] = None, train_size: float = 0.8, verbose: bool = True, greyscale: bool = False, clahe: bool = False, single_id: Optional[str] = None): check_if_dir_exists(in_dir=os.path.dirname(save_dir), source=f'{self.__class__.__name__} save_path') check_if_dir_exists(in_dir=os.path.dirname(sleap_dir), source=f'{self.__class__.__name__} sleap_dir') check_float(name=f'{self.__class__.__name__} train_size', value=train_size, max_value=0.99, min_value=0.1) check_valid_boolean(value=verbose, source=f'{self.__class__.__name__} verbose', raise_error=True) check_valid_boolean(value=greyscale, source=f'{self.__class__.__name__} greyscale', raise_error=True) check_valid_boolean(value=clahe, source=f'{self.__class__.__name__} clahe', raise_error=True) if single_id is not None: check_str(name=f'{self.__class__.__name__} single_id', value=single_id, raise_error=True) if video_dir is not None: check_if_dir_exists(in_dir=video_dir) self.slp_paths = find_files_of_filetypes_in_directory(directory=sleap_dir, extensions=['.slp'], as_dict=True, raise_error=True) self.padding, self.train_size, self.verbose, self.save_dir = padding, train_size, verbose, save_dir self.clahe, self.greyscale = clahe, greyscale self.img_dir, self.lbl_dir = os.path.join(save_dir, 'images'), os.path.join(save_dir, 'labels') self.img_train_dir, self.img_val_dir = os.path.join(self.img_dir, 'train'), os.path.join(self.img_dir, 'val') self.lbl_train_dir, self.lb_val_dir = os.path.join(self.lbl_dir, 'train'), os.path.join(self.lbl_dir, 'val') create_directory(paths=[self.img_train_dir, self.img_val_dir, self.lbl_train_dir, self.lb_val_dir], overwrite=False) self.map_path = os.path.join(save_dir, 'map.yaml') self.single_id, self.video_dir = single_id, video_dir def __h5_to_dict(self, name_or_group): if isinstance(name_or_group, h5py.Dataset): return name_or_group[()] elif isinstance(name_or_group, h5py.Group): return {k: self.__h5_to_dict(name_or_group[k]) for k in name_or_group} else: return None def run(self): timer = SimbaTimer(start=True) lbl_cnt, sleap_data = 0, {} for file_cnt, (file_name, file_path) in enumerate(self.slp_paths.items()): with h5py.File(file_path, "r") as f: data = self.__h5_to_dict(f) check_if_keys_exist_in_dict(data=data, key=['frames', 'instances', 'points', 'videos_json']) img_names = json.loads(data["videos_json"].item().decode("utf-8")) if not 'backend' in img_names.keys() and not 'filename' in img_names.keys(): raise InvalidInputError(msg='Could not find backend or filename keys in videos_json', source=self.__class__.__name__) #check_if_keys_exist_in_dict(data=img_names, key=['filename']) sleap_data[file_name] = data for file_cnt, (file_name, data) in enumerate(sleap_data.items()): lbl_cnt += len(data["frames"]) train_idx = random.sample(list(range(0, lbl_cnt)), int(lbl_cnt * self.train_size)) img_cnt, kpt_shapes, names = 0, set(), {} for file_cnt, (file_name, file_path) in enumerate(self.slp_paths.items()): with h5py.File(file_path, "r") as f: data = self.__h5_to_dict(f) frames = data["frames"] instances = data["instances"] points = data["points"] img_names, video_path = json.loads(data["videos_json"].item().decode("utf-8")), None if 'backend' in img_names.keys(): video_path = img_names['backend']['filename'] if not os.path.isfile(video_path): if self.video_dir is None: raise NoFilesFoundError(msg=f'Could not locate file {video_path}, pass a video directory', source=self.__class__.__name__) else: video_path = find_video_of_file(video_dir=self.video_dir, filename=get_fn_ext(filepath=video_path)[1], raise_error=True, recursive=True) for i, frame_data in enumerate(frames): if self.verbose: print(f'Converting SLEAP annotation to YOLO {img_cnt+1}/{lbl_cnt}...') frame_idx = frame_data[0] if video_path is None: frm_path = img_names['filename'][frame_idx] file_name = get_fn_ext(filepath=frm_path)[1] check_file_exist_and_readable(file_path=frm_path, raise_error=True) img = read_img(img_path=frm_path, greyscale=self.greyscale, clahe=self.clahe) else: img = read_frm_of_video(video_path=video_path, frame_index=frame_idx, greyscale=self.greyscale, clahe=self.clahe) img_w, img_h = img.shape[1], img.shape[0] frm_instances = [inst for inst in instances if int(inst[2]) == frame_idx] if img_cnt in train_idx: img_save_path, lbl_save_path = os.path.join(self.img_train_dir, f'{file_name}.png'), os.path.join(self.lbl_train_dir, f'{file_name}.txt') else: img_save_path, lbl_save_path = os.path.join(self.img_val_dir, f'{file_name}.png'), os.path.join(self.lb_val_dir, f'{file_name}.txt') img_lbl = '' for instance in frm_instances: if instance[1] not in names.keys(): names[int(instance[1])] = f'animal_{int(instance[1])}' instance_str = f'{instance[1]} ' if self.single_id is None else '0 ' pt_start = int(instance[7]) pt_end = int(instance[8]) pts = points[pt_start:pt_end] keypoints = np.array([[p[0], p[1]] if p[2] else [0.0, 0.0] for p in pts]).astype(np.int32) if keypoints.shape[0] == 0 or np.all(keypoints == 0): continue keypoints = np.nan_to_num(keypoints, nan=0.0) kpt_shapes.add(keypoints.shape[0]) visability_col = np.full((keypoints.shape[0], 1), fill_value=2).flatten() keypoints = np.insert(keypoints, 2, visability_col, axis=1) both_zero = (keypoints[:, 0] == 0) & (keypoints[:, 1] == 0) has_nan_or_inf = ~np.isfinite(keypoints[:, 0]) | ~np.isfinite(keypoints[:, 1]) mask = both_zero | has_nan_or_inf keypoints[mask, 2] = 0 keypoints[~np.isfinite(keypoints)] = 0 instance_str += keypoint_array_to_yolo_annotation_str(x=keypoints, img_w=img_w, img_h=img_h, padding=self.padding) img_lbl += instance_str with open(lbl_save_path, mode='wt', encoding='utf-8') as f: f.write(img_lbl) cv2.imwrite(img_save_path, img) img_cnt+=1 if len(list(kpt_shapes)) > 1: raise InvalidInputError(msg=f'Found more than 1 keypoint shapes: {kpt_shapes}', source=self.__class__.__name__) names = names if self.single_id is None else {0: self.single_id} create_yolo_keypoint_yaml(path=self.save_dir, train_path=self.img_train_dir, val_path=self.img_val_dir, names=names, save_path=self.map_path, kpt_shape=(list(kpt_shapes)[0], 3), flip_idx=tuple(list(range(0, list(kpt_shapes)[0])))) timer.stop_timer() if self.verbose: stdout_success(msg=f'YOLO annotations saved at {self.save_dir}...', source=self.__class__.__name__, elapsed_time=timer.elapsed_time_str)
# runner = SleapAnnotations2Yolo(sleap_dir=r'D:\slp_predictions', save_dir=r'D:\slp_predictions\yolo', single_id='mouse') # runner.run() # # runner = SleapAnnotations2Yolo( # sleap_dir=r'F:\todd_sleap', # save_dir=r'F:\todd_sleap\yolo_dataset', # video_dir=r'F:\todd_sleap', # if paths in SLP don't match # single_id='mouse', # one class; or None for multi-animal # train_size=0.8, # verbose=True, # ) # runner.run() #