Source code for simba.data_processors.cuda.create_shap_log

__author__ = "Simon Nilsson; sronilsson@gmail.com"

import os
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import shap
from sklearn.ensemble import RandomForestClassifier

from simba.mixins.train_model_mixin import TrainModelMixin
from simba.utils.checks import (check_if_dir_exists, check_instance, check_int,
                                check_nvidea_gpu_available, check_str,
                                check_valid_array, check_valid_dataframe,
                                check_valid_lst)
from simba.utils.enums import Formats
from simba.utils.errors import FFMPEGCodecGPUError
from simba.utils.printing import SimbaTimer, stdout_success
from simba.utils.read_write import write_df
from simba.utils.warnings import NotEnoughDataWarning


[docs]def create_shap_log(rf_clf: Union[str, os.PathLike, RandomForestClassifier], x: Union[pd.DataFrame, np.ndarray], y: Union[pd.DataFrame, pd.Series, np.ndarray], cnt_present: int, cnt_absent: int, x_names: Optional[List[str]] = None, clf_name: Optional[str] = None, save_dir: Optional[Union[str, os.PathLike]] = None, verbose: Optional[bool] = True) -> Union[None, Tuple[pd.DataFrame, pd.DataFrame, int]]: """ Computes SHAP (SHapley Additive exPlanations) values using a GPU for a RandomForestClassifier, based on specified counts of positive and negative samples, and optionally saves the results. .. image:: _static/img/create_shap_log_cuda.png :alt: Create shap log cuda :width: 500 :align: center .. note:: (i) The SHAP library has to be built from git repo rather than pip: `pip install git+https://github.com/slundberg/shap.git`. (ii) The scikit model cannot be built using max_depth > 31. You can set this in the SimBA config under [create ensemble settings][rf_max_depth], or `rf_max_depth` in the config CSV's. :param Union[str, os.PathLike, RandomForestClassifier] rf_clf: Trained RandomForestClassifier model or path to the saved model. Can be a string, os.PathLike object, or an instance of RandomForestClassifier. :param Union[pd.DataFrame, np.ndarray] x: Input features used for SHAP value computation. Can be a pandas DataFrame or numpy ndarray. :param Union[pd.DataFrame, pd.Series, np.ndarray] y: Target labels corresponding to the input features. Can be a pandas DataFrame, pandas Series, or numpy ndarray with 0 and 1 values. :param int cnt_present: Number of positive samples (label=1) to include in the SHAP value computation. :param int cnt_absent: Number of negative samples (label=0) to include in the SHAP value computation. :param Optional[List[str]] x_names: Optional list of feature names corresponding to the columns in `x`. If `x` is a DataFrame, this is extracted automatically. :param Optional[str] clf_name: Optional name for the classifier, used in naming output files. If not provided, it is extracted from the `y` labels if possible. :param Optional[Union[str, os.PathLike]] save_dir: Optional directory path where the SHAP values and corresponding raw features are saved as CSV files. :param Optional[bool] verbose: Optional boolean flag indicating whether to print progress messages. Defaults to True. :return Union[None, Tuple[pd.DataFrame, pd.DataFrame, int]]: If `save_dir` is None, returns a tuple containing: - V: DataFrame with SHAP values, expected value, sum of SHAP values, prediction probability, and target labels. - R: DataFrame containing the raw feature values for the selected samples. - expected_value: The expected value from the SHAP explainer. If `save_dir` is provided, the function returns None and saves the output to CSV files in the specified directory. :example: >>> x = np.random.random((1000, 501)).astype(np.float32) >>> y = np.random.randint(0, 2, size=(len(x), 1)).astype(np.int32) >>> clf_names = [str(x) for x in range(501)] >>> results = create_shap_log(rf_clf=MODEL_PATH, x=x, y=y, cnt_present=int(i/2), cnt_absent=int(i/2), clf_name='TEST', x_names=clf_names, verbose=False) """ timer = SimbaTimer(start=True) if verbose: print('Computing SHAP values (GPU)...') if not check_nvidea_gpu_available(): raise FFMPEGCodecGPUError(msg="No GPU found (as evaluated by nvidea-smi returning None)", source=create_shap_log.__name__) check_instance(source=f'{create_shap_log.__name__} rf_clf', instance=rf_clf, accepted_types=(str, RandomForestClassifier)) if isinstance(rf_clf, (str, os.PathLike)): rf_clf = TrainModelMixin().read_pickle(file_path=rf_clf) check_instance(source=f'{create_shap_log.__name__} x', instance=x, accepted_types=(pd.DataFrame, np.ndarray)) if isinstance(x, np.ndarray): check_valid_lst(data=x_names, source=f'{create_shap_log.__name__} x_names', valid_dtypes=(str,), exact_len=x.shape[1]) check_valid_array(data=x, source=f'{create_shap_log.__name__} x', accepted_ndims=[2, ], accepted_dtypes=Formats.NUMERIC_DTYPES.value) else: check_valid_dataframe(df=x, source=f'{create_shap_log.__name__} x', valid_dtypes=Formats.NUMERIC_DTYPES.value) x_names = list(x.columns) x = x.values check_instance(source=f'{create_shap_log.__name__} y', instance=y, accepted_types=(pd.DataFrame, np.ndarray, pd.Series)) if isinstance(y, np.ndarray): check_str(name=f'{create_shap_log.__name__} clf_name', value=clf_name) y = y.flatten() elif isinstance(y, pd.Series): clf_name = y.name y = y.values.flatten() else: check_valid_dataframe(df=y, source=f'{create_shap_log.__name__} y', valid_dtypes=Formats.NUMERIC_DTYPES.value, max_axis_1=1) clf_name = list(y.columns)[0] y = y.values.flatten() save_shap_path, save_raw_path = None, None if save_dir is not None: check_if_dir_exists(in_dir=save_dir) save_shap_path = os.path.join(save_dir, f"SHAP_values_{clf_name}.csv") save_raw_path = os.path.join(save_dir, f"RAW_SHAP_feature_values_{clf_name}.csv") check_valid_array(data=y, source=f'{create_shap_log.__name__} y', accepted_values=[0, 1]) check_int(name=f'{create_shap_log.__name__} cnt_present', value=cnt_present, min_value=1) check_int(name=f'{create_shap_log.__name__} cnt_absent', value=cnt_absent, min_value=1) target_cnt = np.sum(y) absent_cnt = y.shape[0] - target_cnt if cnt_present > target_cnt: NotEnoughDataWarning( msg=f"Data contains {target_cnt} behavior-present annotations. This is less the number of frames you specified to calculate shap values for ({cnt_present}). SimBA will calculate shap scores for the {target_cnt} behavior-present frames available", source=create_shap_log.__name__) cnt_present = target_cnt if absent_cnt < cnt_absent: NotEnoughDataWarning( msg=f"Data contains {absent_cnt} behavior-absent annotations. This is less the number of frames you specified to calculate shap values for ({cnt_absent}). SimBA will calculate shap scores for the {absent_cnt} behavior-absent frames available", source=create_shap_log.__name__) cnt_absent = absent_cnt target_idx = np.argwhere(y == 1).flatten() absent_idx = np.argwhere(y == 0).flatten() target_idx = np.sort(np.random.choice(target_idx, cnt_present)) absent_idx = np.sort(np.random.choice(absent_idx, cnt_absent)) target_x = x[target_idx] absent_x = x[absent_idx] X = np.vstack([target_x, absent_x]).astype(np.float32) Y = np.hstack([np.ones(target_x.shape[0]), np.zeros(absent_x.shape[0])]).astype(np.int32) explainer = shap.explainers.GPUTree(model=rf_clf, data=None, model_output='raw', feature_names='tree_path_dependent') shap_values = explainer.shap_values(X, check_additivity=True) V = pd.DataFrame(shap_values[1], columns=x_names).astype(np.float32) sum = V.sum(axis=1) expected_value = explainer.expected_value[1] p = TrainModelMixin().clf_predict_proba(clf=rf_clf, x_df=X) V['EXPECTED_VALUE'] = expected_value.round(4) V['SUM'] = sum + V['EXPECTED_VALUE'] V['PREDICTION_PROBABILITY'] = p.round(4) V['SUM'] = V['SUM'].round(4) V[clf_name] = Y x_idx = np.hstack([target_idx, absent_idx]) R = pd.DataFrame(x[x_idx, :], columns=x_names) timer.stop_timer() if save_dir is None: if verbose: stdout_success(msg=f'Shap values compute complete (GPU) for {len(V)} observations.', elapsed_time=timer.elapsed_time_str) return (V, R, expected_value) else: write_df(df=V, file_type='csv', save_path=save_shap_path) write_df(df=R, file_type='csv', save_path=save_raw_path) if verbose: stdout_success(msg=f'Shap values compute complete (GPU) for {len(V)} observations, and saved in {save_dir}', elapsed_time=timer.elapsed_time_str)