from itertools import product
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import StratifiedKFold
from simba.model.regression.metrics import (mean_absolute_error,
mean_absolute_percentage_error,
mean_squared_error, r2_score,
root_mean_squared_error)
from simba.utils.checks import (check_float, check_instance, check_int,
check_str, check_valid_array,
check_valid_dataframe, check_valid_lst)
from simba.utils.enums import Formats
from simba.utils.errors import DataHeaderError
def fit_xgb(x: pd.DataFrame,
            y: np.ndarray,
            mdl: xgb.XGBRegressor) -> xgb.XGBRegressor:
    """
    Fit an XGBoost regressor to the given data.

    :param pd.DataFrame x: Input feature matrix where each row represents a sample and each column a feature. The data must have numeric types.
    :param np.ndarray y: Target values, must be a 1-dimensional array of numeric types with the same number of rows as `x`.
    :param xgb.XGBRegressor mdl: Defined xgb.XGBRegressor. E.g., can be defined with :func:`simba.model.regression.model.xgb_define`.
    :return: The value returned by ``mdl.fit(X=x, y=y)``, i.e. the trained XGBoost regressor model.
    :rtype: xgb.XGBRegressor

    :example:
    >>> x = pd.DataFrame(np.random.randint(0, 500, (100, 20)))
    >>> y = np.random.randint(1, 6, (100,))
    >>> mdl = fit_xgb(x=x, y=y, mdl=xgb_define())
    """
    check_valid_dataframe(df=x, source=f'{fit_xgb.__name__} x', valid_dtypes=Formats.NUMERIC_DTYPES.value)
    # y must be 1-D and have exactly one target per row of x.
    check_valid_array(data=y, source=f'{fit_xgb.__name__} y', accepted_ndims=(1,), accepted_axis_0_shape=[x.shape[0]], accepted_dtypes=Formats.NUMERIC_DTYPES.value)
    # Validate the model that was actually passed in (previously referenced an undefined name).
    check_instance(source=f'{fit_xgb.__name__} mdl', instance=mdl, accepted_types=(xgb.XGBRegressor,))
    return mdl.fit(X=x, y=y)
def evaluate_xgb(y_pred: np.ndarray,
                 y_true: np.ndarray,
                 metrics: List[str],
                 stratified: Optional[bool] = False) -> dict:
    """
    Evaluate regression predictions (e.g., from XGBoost) with the selected metrics.

    Optionally, the evaluation can be stratified by the unique values in the true target
    variable (`y_true`), where each metric is computed separately for each class/level.

    :param np.ndarray y_pred: Predicted values generated by the model. Must be 1-dimensional, numeric, and have the same length as `y_true`.
    :param np.ndarray y_true: True target values to compare the predictions against. Must be 1-dimensional and numeric.
    :param List[str] metrics: List of metrics to compute. Accepted values: 'MAPE', 'MSE', 'MAE', 'R2', 'RMSE'.
    :param Optional[bool] stratified: If True, computes each metric for each unique class/level in `y_true`. If False (default), computes each metric for the entire dataset.
    :return: If not stratified, ``{metric: value}``. If stratified, ``{metric: {level: value}}`` where ``level`` is each unique value in `y_true`.
    :rtype: dict

    :example:
    >>> y_true = np.random.randint(1, 6, (100,))
    >>> y_pred = y_true + np.random.normal(0, 0.5, (100,))
    >>> evaluate_xgb(y_pred=y_pred, y_true=y_true, metrics=['MAE', 'MAPE', 'RMSE', 'MSE'])
    >>> evaluate_xgb(y_pred=y_pred, y_true=y_true, metrics=['MAE'], stratified=True)
    """
    METRICS = {'MAPE': mean_absolute_percentage_error, 'MSE': mean_squared_error, 'MAE': mean_absolute_error, 'R2': r2_score, 'RMSE': root_mean_squared_error}
    check_valid_array(data=y_true, source=evaluate_xgb.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value)
    # Require an exact length match: a longer y_pred would otherwise pass validation
    # and be silently truncated by the per-level indexing in stratified mode.
    check_valid_array(data=y_pred, source=evaluate_xgb.__name__, accepted_ndims=(1,), accepted_axis_0_shape=[y_true.shape[0]], accepted_dtypes=Formats.NUMERIC_DTYPES.value)
    check_valid_lst(data=metrics, source=f'{evaluate_xgb.__name__} metrics', valid_values=list(METRICS.keys()))

    if not stratified:
        return {metric: METRICS[metric](y_true=y_true, y_pred=y_pred) for metric in metrics}

    # The levels and their sample masks do not depend on the metric, so compute them once.
    level_masks = [(level, y_true == level) for level in np.unique(y_true)]
    results = {}
    for metric in metrics:
        metric_fn = METRICS[metric]
        results[metric] = {level: metric_fn(y_true=y_true[mask], y_pred=y_pred[mask]) for level, mask in level_masks}
    return results
def xgb_define(objective: str = 'reg:squarederror',
               n_estimators: int = 100,
               max_depth: int = 6,
               verbosity: int = 1,
               learning_rate: float = 0.3,
               eta: float = 0.3,
               gamma: float = 0.0,
               tree_method: str = 'auto') -> xgb.XGBRegressor:
    """
    Define (without fitting) an XGBoost regressor.

    :param str objective: The learning objective for the model. One of 'reg:squarederror', 'reg:squaredlogerror', 'reg:logistic', 'reg:pseudohubererror'. Default is 'reg:squarederror'.
    :param int n_estimators: Number of boosting rounds. Must be greater than or equal to 1. Default is 100.
    :param int max_depth: Maximum depth of a tree. Increasing this value makes the model more complex and more likely to overfit. Must be greater than or equal to 1. Default is 6.
    :param int verbosity: Verbosity of the training process (0-3). Default is 1.
    :param float learning_rate: Step size shrinkage used to prevent overfitting. Lower values make the model more robust but require more boosting rounds. Must be between 0.1 and 1.0. Default is 0.3.
    :param float eta: Learning rate alias. Must be between 0.0 and 1.0. Default is 0.3.
    :param float gamma: Minimum loss reduction required to make a further partition on a leaf node of the tree. Larger values prevent overfitting. Must be greater than or equal to 0.0. Default is 0.0.
    :param str tree_method: The tree construction algorithm used in XGBoost. One of 'auto', 'exact', 'approx', 'hist', 'gpu_hist'. Default is 'auto'.
    :return: An initialized XGBoost Regressor with the specified configuration.
    :rtype: xgb.XGBRegressor

    :example:
    >>> mdl = xgb_define(max_depth=3, n_estimators=50)
    """
    OBJECTIVES = ('reg:squarederror', 'reg:squaredlogerror', 'reg:logistic', 'reg:pseudohubererror')
    TREE_METHODS = ('auto', 'exact', 'approx', 'hist', 'gpu_hist')
    # Label validation errors with this function's name so they point at the real call site.
    source = xgb_define.__name__
    check_str(name=f'{source} objective', value=objective, options=OBJECTIVES)
    check_str(name=f'{source} tree_method', value=tree_method, options=TREE_METHODS)
    check_int(name=f'{source} n_estimators', value=n_estimators, min_value=1)
    check_int(name=f'{source} max_depth', value=max_depth, min_value=1)
    check_int(name=f'{source} verbosity', value=verbosity, min_value=0, max_value=3)
    check_float(name=f'{source} learning_rate', value=learning_rate, min_value=0.1, max_value=1.0)
    check_float(name=f'{source} eta', value=eta, min_value=0.0, max_value=1.0)
    check_float(name=f'{source} gamma', value=gamma, min_value=0.0)
    # NOTE(review): both `learning_rate` and its alias `eta` are forwarded; confirm which
    # one XGBoost honours when they differ.
    return xgb.XGBRegressor(objective=objective, max_depth=max_depth, n_estimators=n_estimators, verbosity=verbosity, learning_rate=learning_rate, eta=eta, gamma=gamma, tree_method=tree_method)
def xgb_grid_define(objective: Tuple[str] = ('reg:squarederror',),
                    n_estimators: Tuple[int] = (100,),
                    max_depth: Tuple[int] = (6,),
                    verbosity: Tuple[int] = (1,),
                    learning_rate: Tuple[float] = (0.3,),
                    eta: Tuple[float] = (0.3,),
                    gamma: Tuple[float] = (0.0,),
                    tree_method: Tuple[str] = ('auto',)) -> List[xgb.XGBRegressor]:
    """
    Define one XGBoost regressor per combination of the candidate hyperparameter values.

    Each argument is an iterable of candidate values for the same-named parameter of
    ``xgb_define``. The Cartesian product of all candidates is built, and every
    combination is passed to ``xgb_define``, which validates it and creates the model.

    :param Tuple[str] objective: Candidate learning objectives.
    :param Tuple[int] n_estimators: Candidate numbers of boosting rounds.
    :param Tuple[int] max_depth: Candidate maximum tree depths.
    :param Tuple[int] verbosity: Candidate verbosity levels.
    :param Tuple[float] learning_rate: Candidate learning rates.
    :param Tuple[float] eta: Candidate values for the learning rate alias.
    :param Tuple[float] gamma: Candidate minimum loss reductions.
    :param Tuple[str] tree_method: Candidate tree construction algorithms.
    :return: List of unfitted regressors, one per combination, in ``itertools.product`` order.
    :rtype: List[xgb.XGBRegressor]

    :example:
    >>> mdls = xgb_grid_define(max_depth=(6, 3), gamma=(0.0, 0.3))
    """
    # Insertion order fixes both the product order and the keyword each value maps to.
    search_space = {'objective': objective,
                    'n_estimators': n_estimators,
                    'max_depth': max_depth,
                    'verbosity': verbosity,
                    'learning_rate': learning_rate,
                    'eta': eta,
                    'gamma': gamma,
                    'tree_method': tree_method}
    param_names = tuple(search_space)
    # Materialize every combination before defining any model.
    combinations = list(product(*search_space.values()))
    return [xgb_define(**dict(zip(param_names, combination))) for combination in combinations]
def xgb_grid_fit(x: pd.DataFrame,
                 y: np.ndarray,
                 mdls: List[xgb.XGBRegressor]) -> List[xgb.XGBRegressor]:
    """
    Fit each XGBoost regressor in a list to the same data.

    :param pd.DataFrame x: Input feature matrix where each row represents a sample and each column a feature. The data must have numeric types.
    :param np.ndarray y: Target values, must be a 1-dimensional array of numeric types with the same number of rows as `x`.
    :param List[xgb.XGBRegressor] mdls: Defined regressors to fit. E.g., can be created with ``xgb_grid_define``.
    :return: The results of calling ``fit_xgb`` on each model, in the same order as `mdls`.
    :rtype: List[xgb.XGBRegressor]

    :example:
    >>> x = pd.DataFrame(np.random.randint(0, 500, (100, 20)))
    >>> y = np.random.randint(1, 6, (100,))
    >>> mdls = xgb_grid_fit(x=x, y=y, mdls=xgb_grid_define(max_depth=(6, 3)))
    """
    # Validate everything up front, labelled with this function's name, so bad input
    # fails before any model is fitted.
    source = xgb_grid_fit.__name__
    check_valid_dataframe(df=x, source=f'{source} x', valid_dtypes=Formats.NUMERIC_DTYPES.value)
    check_valid_array(data=y, source=f'{source} y', accepted_ndims=(1,), accepted_axis_0_shape=[x.shape[0]], accepted_dtypes=Formats.NUMERIC_DTYPES.value)
    check_valid_lst(data=mdls, source=source, valid_dtypes=(xgb.XGBRegressor,))
    return [fit_xgb(x=x, y=y, mdl=mdl) for mdl in mdls]
# grid_df = pd.DataFrame(grid, columns=['objective', 'n_estimators', 'max_depth', 'learning_rate', 'eta', 'gamma'])
# grid_df['verbosity'], grid_df['tree_method'] = verbosity, tree_method
#
# grid_df.apply(fit, axis=1, result_type='expand')
#xgb_grid_define(max_depth=(6, 3), gamma=(0, 0.3))
# x = pd.DataFrame(np.random.randint(0, 500, (100, 20)))
# y = np.random.randint(1, 6, (100,))
# mdl = fit_xgb(x=x, y=y)
# new_x = pd.DataFrame(np.random.randint(0, 500, (100, 20)))
# y_pred = transform_xgb(x=new_x, model=mdl)
# evaluate_xgb(y_pred=y_pred, y_true=y, metrics=['MAE', 'MAPE', 'RMSE', 'MSE'], stratified=True)
#
# x = pd.DataFrame(np.random.randint(0, 500, (100, 20)))
# y = np.random.randint(1, 6, (100,))
# #mdl = fit_xgb(x=x, y=y)
#
# kfold_fit_xgb(x=x, y=y)
#for fold_cnt, (train_index, test_index) in enumerate(k_fold.split(x_data, y_data)):