"""LightGBM classifier wrapper: stratified k-fold fitting, held-out evaluation,
and grid-search hyperparameter tuning."""

from itertools import product

import lightgbm as lgb
import pandas
from imblearn.over_sampling import KMeansSMOTE
from sklearn.model_selection import StratifiedKFold, train_test_split
from tqdm import tqdm

from models.model_utils import average_fold_results, get_metrics, scaling_handler


class LIGHT_GBM:
    def __init__(
        self,
        data_frame,
        params=None,
        n_split_kfold=5,
        test_size=0.15,
        seed=42,
        output_file_tuning="lightgbm_tuning_results.csv",
    ):
        self.data_frame = data_frame
        # Keep an unscaled copy so repeated preprocess() calls (e.g. during
        # tune()) always start from the original features instead of
        # re-scaling already-scaled data.
        self._raw_data_frame = data_frame.copy()
        # Avoid the mutable-default-argument pitfall (was `params={}`).
        self.params = params if params is not None else {}
        self.n_split_kfold = n_split_kfold
        self.test_size = test_size
        self.seed = seed  # was hard-coded to 42, silently ignoring the argument
        self.x_test = None
        self.y_test = None
        self.scaling_method = None
        self.sampling_method = None
        self.class_weights = {0: 1.0, 1: 1.0}
        self.model = None
        self.learning_rate = self.params.get("learning_rate", 0.1)
        self.num_leaves = self.params.get("num_leaves", 100)
        self.boosting_type = self.params.get("boosting_type", "gbdt")
        self.l1_reg = self.params.get("l1_reg", 0.1)
        self.l2_reg = self.params.get("l2_reg", 0.1)
        self.subsample = self.params.get("subsample", 1.0)
        self.tree_subsample = self.params.get("tree_subsample", 1.0)
        self.k_neighbors = self.params.get("k_neighbors", 10)
        self.kmeans_estimator = self.params.get("kmeans_estimator", 5)
        self.tuning_results = None
        self.output_file_tuning = output_file_tuning

    def preprocess(self):
        """Apply the scaling method configured in params, if any."""
        self.scaling_method = self.params.get("scaling_method", None)
        if self.scaling_method:
            self.data_frame = scaling_handler(self.data_frame, self.scaling_method)

    def fit(self):
        """Hold out a stratified test split, then run stratified k-fold CV on
        the remainder. Returns the fold-averaged validation metrics."""
        y = self.data_frame["label"]
        X = self.data_frame.drop(columns=["label"])
        x_train_val, self.x_test, y_train_val, self.y_test = train_test_split(
            X, y, test_size=self.test_size, stratify=y, random_state=self.seed
        )
        skf = StratifiedKFold(
            n_splits=self.n_split_kfold, shuffle=True, random_state=self.seed
        )
        fold_results = []
        for train_index, val_index in tqdm(
            skf.split(x_train_val, y_train_val),
            total=self.n_split_kfold,
            desc=" >> LightGBM Fitting: ",
        ):
            x_train_fold, x_val = (
                x_train_val.iloc[train_index],
                x_train_val.iloc[val_index],
            )
            y_train_fold, y_val = (
                y_train_val.iloc[train_index],
                y_train_val.iloc[val_index],
            )

            # Class imbalance is handled per fold, on training data only, so
            # no synthetic samples leak into the validation split.
            self.sampling_method = self.params.get("sampling_method", None)
            if self.sampling_method == "KMeansSMOTE":
                smote = KMeansSMOTE(
                    sampling_strategy="minority",
                    k_neighbors=self.k_neighbors,
                    kmeans_estimator=self.kmeans_estimator,
                    cluster_balance_threshold=0.001,
                    random_state=self.seed,
                    n_jobs=-1,
                )
                x_train_fold, y_train_fold = smote.fit_resample(
                    x_train_fold, y_train_fold
                )
                y_train_fold = y_train_fold.astype(int)
            elif self.sampling_method == "class_weight":
                # Weight the positive class by the negative:positive ratio.
                self.class_1_weight = int(
                    (y_train_fold.shape[0] - y_train_fold.sum()) / y_train_fold.sum()
                )
                self.class_weights = {0: 1, 1: self.class_1_weight}

            self.model = lgb.LGBMClassifier(
                boosting_type=self.boosting_type,
                learning_rate=self.learning_rate,
                num_leaves=self.num_leaves,
                reg_alpha=self.l1_reg,
                reg_lambda=self.l2_reg,
                subsample=self.subsample,
                subsample_freq=1,
                colsample_bytree=self.tree_subsample,
                class_weight=self.class_weights
                if self.sampling_method == "class_weight"
                else None,
                n_estimators=100,
                random_state=self.seed,
                verbose=-1,
            )
            self.model.fit(x_train_fold, y_train_fold)
            y_pred_val = self.model.predict(x_val)
            val_metrics = get_metrics(y_val, y_pred_val)
            fold_results.append(val_metrics)
        return average_fold_results(fold_results)

    def eval(self, x_test=None, y_test=None):
        """Score on the held-out test split (or a caller-provided one).
        Note: self.model is the model trained on the last CV fold."""
        if x_test is not None and y_test is not None:
            self.x_test = x_test
            self.y_test = y_test
        self.y_pred_test = self.model.predict(self.x_test)
        test_metrics = get_metrics(self.y_test, self.y_pred_test)
        return test_metrics

    def tune(self):
        """Exhaustive grid search over the lists below; each combination is
        cross-validated via fit() and the results are written to CSV."""
        scaling_methods = [
            "standard_scaling",
            "robust_scaling",
            "minmax_scaling",
            "yeo_johnson",
        ]
        sampling_methods = [
            "KMeansSMOTE",
            "class_weight",
        ]
        boosting_type_list = ["gbdt", "dart"]
        learning_rate_list = [0.03, 0.05, 0.1]
        number_of_leaves_list = [100]
        l2_regularization_lambda_list = [0.1]
        l1_regularization_alpha_list = [0.1]
        tree_subsample_tree_list = [0.8, 1.0]
        subsample_list = [0.8, 1.0]
        kmeans_smote_k_neighbors_list = [10]
        kmeans_smote_n_clusters_list = [5]
        tuning_results = []
        # Note: the SMOTE hyperparameters are crossed with every sampling
        # method, so "class_weight" runs repeat if those lists grow beyond
        # one value each.
        param_product = list(
            product(
                scaling_methods,
                sampling_methods,
                boosting_type_list,
                learning_rate_list,
                number_of_leaves_list,
                l2_regularization_lambda_list,
                l1_regularization_alpha_list,
                tree_subsample_tree_list,
                subsample_list,
                kmeans_smote_k_neighbors_list,
                kmeans_smote_n_clusters_list,
            )
        )
        for (
            scaling_method,
            sampling_method,
            boosting_type,
            learning_rate,
            num_leaves,
            l2_reg,
            l1_reg,
            tree_subsample,
            subsample,
            k_neighbors,
            kmeans_estimator,
        ) in tqdm(param_product, total=len(param_product), desc=" > LightGBM Tuning: "):
            self.scaling_method = scaling_method
            self.sampling_method = sampling_method
            self.boosting_type = boosting_type
            self.learning_rate = learning_rate
            self.num_leaves = num_leaves
            self.l2_reg = l2_reg
            self.l1_reg = l1_reg
            self.tree_subsample = tree_subsample
            self.subsample = subsample
            self.k_neighbors = k_neighbors
            self.kmeans_estimator = kmeans_estimator
            # preprocess() reads scaling/sampling from self.params, so keep
            # params in sync with the combination being evaluated.
            self.params["scaling_method"] = scaling_method
            self.params["sampling_method"] = sampling_method
            print(
                " >> Fitting Params: ",
                scaling_method,
                sampling_method,
                boosting_type,
                learning_rate,
                num_leaves,
                l2_reg,
                l1_reg,
                tree_subsample,
                subsample,
                k_neighbors,
                kmeans_estimator,
            )
            # Reset to the raw features so this combination's scaler is not
            # applied on top of the previous iteration's scaling.
            self.data_frame = self._raw_data_frame.copy()
            self.preprocess()
            fold_result = self.fit()
            tuning_results.append(
                {
                    "model": "lightgbm",
                    "scaling_method": scaling_method,
                    "sampling_method": sampling_method,
                    "boosting_type": boosting_type,
                    "learning_rate": learning_rate,
                    "num_leaves": num_leaves,
                    "l2_reg": l2_reg,
                    "l1_reg": l1_reg,
                    "tree_subsample": tree_subsample,
                    "subsample": subsample,
                    "k_neighbors": k_neighbors,
                    "kmeans_estimator": kmeans_estimator,
                    "metrics": fold_result,
                }
            )
        self.tuning_results = tuning_results
        # Flatten the per-combination metrics dict into columns before saving.
        df_tuning = pandas.DataFrame(tuning_results)
        metrics_df = df_tuning["metrics"].apply(pandas.Series)
        df_tuning = pandas.concat(
            [df_tuning.drop(columns=["metrics"]), metrics_df], axis=1
        )
        df_tuning.to_csv(self.output_file_tuning, index=False)
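

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; not part of the original module). A minimal
# end-to-end run of the class above, assuming a CSV with numeric feature
# columns and a binary "label" column. "features.csv" and the chosen params
# are hypothetical placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    df = pandas.read_csv("features.csv")  # hypothetical input file

    gbm = LIGHT_GBM(
        df,
        params={
            "scaling_method": "standard_scaling",
            "sampling_method": "class_weight",
            "learning_rate": 0.05,
        },
    )
    gbm.preprocess()             # apply the configured scaler
    cv_metrics = gbm.fit()       # fold-averaged stratified k-fold metrics
    test_metrics = gbm.eval()    # metrics on the held-out test split
    print("CV:", cv_metrics)
    print("Test:", test_metrics)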