"""Grid-search tuning of an LGBMClassifier on the Ketamine ICP dataset.

For every hyperparameter combination the script:
  1. scales the data (via the project's ``scaling_handler``),
  2. holds out a stratified 15% test split,
  3. runs stratified 5-fold CV on the remainder, balancing each training fold
     either with KMeansSMOTE (``is_balanced=True``) or a class weight
     (``is_balanced=False``),
  4. refits on the full train+val split with the SAME balancing strategy and
     evaluates once on the test split,
  5. appends one row of averaged-validation + test metrics to
     ``results_lgbm_tuning.csv``.
"""

import itertools
import os
import random

import pandas as pd
import tqdm
from imblearn.over_sampling import KMeansSMOTE
from lightgbm import LGBMClassifier
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    fbeta_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import StratifiedKFold, train_test_split

from utils import scaling_handler

RESULTS_FILENAME = "results_lgbm_tuning.csv"


def get_metrics(y_true, y_pred, prefix=""):
    """Return a dict of binary-classification metrics for (y_true, y_pred).

    Includes accuracy, macro F1/F2/recall/precision, the same four metrics
    per class (class 0 / class 1), and raw confusion-matrix counts.  All
    keys are prepended with ``prefix`` (e.g. ``"test_"``).

    Assumes a binary problem where both classes appear in ``y_true`` (so the
    per-class score arrays have length 2) and that ``y_true``/``y_pred`` are
    positionally aligned (Series vs. ndarray comparison below is elementwise).
    """
    metrics = {}
    metrics[f"{prefix}accuracy"] = accuracy_score(y_true, y_pred)
    metrics[f"{prefix}f1_macro"] = f1_score(y_true, y_pred, average="macro")
    metrics[f"{prefix}f2_macro"] = fbeta_score(y_true, y_pred, beta=2, average="macro")
    metrics[f"{prefix}recall_macro"] = recall_score(y_true, y_pred, average="macro")
    metrics[f"{prefix}precision_macro"] = precision_score(
        y_true, y_pred, average="macro"
    )

    # Per-class scores; zero_division=0 avoids warnings/NaN when a class is
    # never predicted.
    f1_scores = f1_score(y_true, y_pred, average=None, zero_division=0)
    f2_scores = fbeta_score(y_true, y_pred, beta=2, average=None, zero_division=0)
    recall_scores = recall_score(y_true, y_pred, average=None, zero_division=0)
    precision_scores = precision_score(y_true, y_pred, average=None, zero_division=0)
    metrics[f"{prefix}f1_class0"] = f1_scores[0]
    metrics[f"{prefix}f1_class1"] = f1_scores[1]
    metrics[f"{prefix}f2_class0"] = f2_scores[0]
    metrics[f"{prefix}f2_class1"] = f2_scores[1]
    metrics[f"{prefix}recall_class0"] = recall_scores[0]
    metrics[f"{prefix}recall_class1"] = recall_scores[1]
    metrics[f"{prefix}precision_class0"] = precision_scores[0]
    metrics[f"{prefix}precision_class1"] = precision_scores[1]

    # Raw confusion-matrix counts (positive class = 1).
    TP = sum((y_true == 1) & (y_pred == 1))
    TN = sum((y_true == 0) & (y_pred == 0))
    FP = sum((y_true == 0) & (y_pred == 1))
    FN = sum((y_true == 1) & (y_pred == 0))
    metrics[f"{prefix}TP"] = TP
    metrics[f"{prefix}TN"] = TN
    metrics[f"{prefix}FP"] = FP
    metrics[f"{prefix}FN"] = FN
    return metrics


try:
    data_frame = pd.read_csv("./data/Ketamine_icp_no_missing.csv")
except FileNotFoundError:
    print("Please ensure the data file exists at './data/Ketamine_icp_no_missing.csv'")
    # Exit non-zero so callers/CI can detect the failure (the original used
    # the site-injected exit() helper, which returned status 0).
    raise SystemExit(1)

random_state = 42
n_split_kfold = 5

# --- hyperparameter grid -------------------------------------------------
scaling_methods_list = [
    "standard_scaling",
    "robust_scaling",
    "minmax_scaling",
    "yeo_johnson",
]
boosting_type_list = ["gbdt", "dart"]
learning_rate_list = [0.03, 0.05, 0.1]
number_of_leaves_list = [100]
l2_regularization_lambda_list = [0.1, 0.5]
l1_regularization_alpha_list = [0.1, 0.5]
tree_subsample_tree_list = [0.8, 1.0]  # colsample_bytree
subsample_list = [0.8, 1.0]  # bagging fraction
is_balanced_list = [True, False]  # True -> KMeansSMOTE, False -> class_weight
kmeans_smote_k_neighbors_list = [10]
kmeans_smote_n_clusters_list = [5]

param_combinations = list(
    itertools.product(
        scaling_methods_list,
        boosting_type_list,
        learning_rate_list,
        number_of_leaves_list,
        l2_regularization_lambda_list,
        l1_regularization_alpha_list,
        tree_subsample_tree_list,
        subsample_list,
        is_balanced_list,
        kmeans_smote_k_neighbors_list,
        kmeans_smote_n_clusters_list,
    )
)

# Build the results-CSV header from a dummy metrics dict so the column set
# always matches whatever get_metrics() produces.
template_metrics = get_metrics(
    pd.Series([0, 1, 0, 1]),
    pd.Series([0, 1, 0, 1]),
)
template_cols = ["iteration", "model", "params"]
for k in template_metrics.keys():
    template_cols.append(f"avg_val_{k}")
    template_cols.append(f"test_{k}")

empty_df = pd.DataFrame(columns=template_cols)
if not os.path.exists(RESULTS_FILENAME):
    empty_df.to_csv(RESULTS_FILENAME, index=False)
    print(f"Initialized {RESULTS_FILENAME} with headers.")
else:
    print(f"File {RESULTS_FILENAME} already exists. Appending to it.")

iteration = 0
for (
    scaling_method,
    boosting_type,
    learning_rate,
    num_leaves,
    reg_lambda,
    reg_alpha,
    colsample_bytree,
    subsample,
    is_balanced,
    k_neighbors,
    kmeans_estimator,
) in tqdm.tqdm(param_combinations):
    skf = StratifiedKFold(
        n_splits=n_split_kfold, shuffle=True, random_state=random_state
    )
    data_frame_scaled = scaling_handler(data_frame, scaling_method)
    y = data_frame_scaled["label"]
    X = data_frame_scaled.drop(columns=["label"])

    # Stratified hold-out test split; fixed random_state keeps the split
    # identical across all grid iterations.
    x_train_val, x_test, y_train_val, y_test = train_test_split(
        X, y, test_size=0.15, stratify=y, random_state=random_state
    )

    # Loop-invariant parameter dicts, built once per grid combination.
    lgbm_base_params = {
        "boosting_type": boosting_type,
        "objective": "binary",
        "learning_rate": learning_rate,
        "n_jobs": -1,
        "num_leaves": num_leaves,
        "reg_lambda": reg_lambda,
        "reg_alpha": reg_alpha,
        "colsample_bytree": colsample_bytree,
        "subsample": subsample,
        # BUG FIX: LightGBM ignores `subsample` unless bagging is enabled
        # via subsample_freq > 0, so the subsample grid axis was a silent
        # no-op.  freq=1 with subsample=1.0 is still effectively disabled.
        "subsample_freq": 1,
    }
    smote_params = {
        "sampling_strategy": "minority",
        "k_neighbors": k_neighbors,
        "kmeans_estimator": kmeans_estimator,
        "cluster_balance_threshold": 0.001,
        "random_state": random_state,
        "n_jobs": -1,
    }

    fold_results = []
    # Remains None until at least one fold trains successfully.  (Previously
    # this was reset to None at the top of EVERY fold, so a KMeansSMOTE
    # failure in the last fold discarded the whole iteration's results.)
    lgbm_classifier_params = None
    sampling_method = "none"

    for fold_idx, (train_index, val_index) in enumerate(
        skf.split(x_train_val, y_train_val)
    ):
        x_train_fold = x_train_val.iloc[train_index]
        x_val = x_train_val.iloc[val_index]
        y_train_fold = y_train_val.iloc[train_index]
        y_val = y_train_val.iloc[val_index]

        x_train, y_train = x_train_fold, y_train_fold
        fold_params = None

        if is_balanced:
            # Oversample the minority class inside the training fold only
            # (never the validation fold, to avoid leakage).
            sampling_method = "KMeansSMOTE"
            try:
                smote = KMeansSMOTE(**smote_params)
                x_train, y_train = smote.fit_resample(x_train_fold, y_train_fold)
                fold_params = lgbm_base_params.copy()
            except RuntimeError as e:
                # KMeansSMOTE can fail to find usable clusters on some folds.
                print(
                    f"KMeansSMOTE failed with RuntimeError in fold {fold_idx} of iteration {iteration}: {e}. Skipping fold."
                )
                continue
            except ValueError as e:
                print(
                    f"KMeansSMOTE failed with ValueError in fold {fold_idx} of iteration {iteration}: {e}. Skipping fold."
                )
                continue
        else:
            # Reweight instead of resample: weight class 1 by the (truncated)
            # negative/positive ratio of this training fold.
            sampling_method = "class_weight"
            class_1_weight = int(
                (y_train_fold.shape[0] - y_train_fold.sum()) / y_train_fold.sum()
            )
            fold_params = lgbm_base_params.copy()
            fold_params["class_weight"] = {0: 1, 1: class_1_weight}

        lgbm_classifier_params = fold_params
        model = LGBMClassifier(
            **fold_params, random_state=random_state, verbose=-1
        )
        model.fit(x_train, y_train)
        y_pred_val = model.predict(x_val)
        fold_results.append(get_metrics(y_val, y_pred_val))

    avg_val_metrics = {}
    if fold_results:
        avg_val_metrics = pd.DataFrame(fold_results).mean().to_dict()

    test_metrics = {}
    if lgbm_classifier_params:
        x_train_final = x_train_val
        y_train_final = y_train_val
        if is_balanced:
            # Apply the same resampling to the full train+val split before
            # the final refit.
            try:
                smote = KMeansSMOTE(**smote_params)
                x_train_final, y_train_final = smote.fit_resample(
                    x_train_val, y_train_val
                )
            except (RuntimeError, ValueError) as e:
                print(
                    f"Final KMeansSMOTE failed for iteration {iteration}: {e}. Skipping test evaluation."
                )
                lgbm_classifier_params = None

    if lgbm_classifier_params:
        final_lgbm_params = lgbm_base_params.copy()
        if not is_balanced:
            # BUG FIX: the final model previously dropped class_weight, so
            # test metrics were computed with different balancing than the
            # CV folds.  Recompute the weight on the full train+val split.
            class_1_weight = int(
                (y_train_val.shape[0] - y_train_val.sum()) / y_train_val.sum()
            )
            final_lgbm_params["class_weight"] = {0: 1, 1: class_1_weight}
        test_model = LGBMClassifier(
            **final_lgbm_params, random_state=random_state, verbose=-1
        )
        test_model.fit(x_train_final, y_train_final)
        y_pred_test = test_model.predict(x_test)
        test_metrics = get_metrics(y_test, y_pred_test, prefix="test_")

    if lgbm_classifier_params:
        # Serialize the hyperparameters into one pseudo-dict string column.
        params_str = str(lgbm_base_params).replace("}", "")
        if is_balanced:
            params_str += f", 'smote_k_neighbors': {k_neighbors}, 'smote_n_clusters': {kmeans_estimator}"
        final_result_dict = {
            "iteration": iteration,
            "model": "LGBMClassifier",
            "params": params_str
            + f", 'sampling_method': '{sampling_method}', 'scaling_method': '{scaling_method}'}}",
        }
        for k in template_metrics.keys():
            final_result_dict[f"avg_val_{k}"] = avg_val_metrics.get(k, float("nan"))
            final_result_dict[f"test_{k}"] = test_metrics.get(
                f"test_{k}", float("nan")
            )

        # Append one row, reindexed so the column order matches the header.
        result_row_df = pd.DataFrame([final_result_dict])
        result_row_df = result_row_df.reindex(columns=template_cols, fill_value=None)
        result_row_df.to_csv(RESULTS_FILENAME, mode="a", header=False, index=False)

    iteration += 1

print(f"Finished: check {RESULTS_FILENAME}")