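"""Grid-search tuning of an LGBMClassifier on the ketamine ICP dataset
(./data/Ketamine_icp_no_missing.csv).

For every combination of scaling method, LightGBM hyperparameters, and
imbalance-handling strategy (KMeansSMOTE oversampling vs. class weighting),
this script runs stratified k-fold cross-validation on a train+val split,
retrains on the full split, evaluates on a held-out test set, and appends one
row of metrics to RESULTS_FILENAME.
"""
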
import itertools
import os

import pandas as pd
import tqdm
from imblearn.over_sampling import KMeansSMOTE
from lightgbm import LGBMClassifier
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    fbeta_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import StratifiedKFold, train_test_split
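
# NOTE: scaling_handler is project-local code (utils.py) and is not shown
# here. It is assumed to take the raw DataFrame plus one of the method names
# in scaling_methods_list below and return a DataFrame with the feature
# columns transformed accordingly.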
from utils import scaling_handler

RESULTS_FILENAME = "results_lgbm_tuning.csv"


def get_metrics(y_true, y_pred, prefix=""):
    """Return a flat dict of accuracy, macro-averaged and per-class
    F1/F2/recall/precision scores, and confusion-matrix counts, with every
    key prefixed by `prefix`."""
    metrics = {}

    metrics[f"{prefix}accuracy"] = accuracy_score(y_true, y_pred)
    metrics[f"{prefix}f1_macro"] = f1_score(y_true, y_pred, average="macro")
    metrics[f"{prefix}f2_macro"] = fbeta_score(y_true, y_pred, beta=2, average="macro")
    metrics[f"{prefix}recall_macro"] = recall_score(y_true, y_pred, average="macro")
    metrics[f"{prefix}precision_macro"] = precision_score(
        y_true, y_pred, average="macro", zero_division=0
    )

    f1_scores = f1_score(y_true, y_pred, average=None, zero_division=0)
    f2_scores = fbeta_score(y_true, y_pred, beta=2, average=None, zero_division=0)
    recall_scores = recall_score(y_true, y_pred, average=None, zero_division=0)
    precision_scores = precision_score(y_true, y_pred, average=None, zero_division=0)

    metrics[f"{prefix}f1_class0"] = f1_scores[0]
    metrics[f"{prefix}f1_class1"] = f1_scores[1]
    metrics[f"{prefix}f2_class0"] = f2_scores[0]
    metrics[f"{prefix}f2_class1"] = f2_scores[1]
    metrics[f"{prefix}recall_class0"] = recall_scores[0]
    metrics[f"{prefix}recall_class1"] = recall_scores[1]
    metrics[f"{prefix}precision_class0"] = precision_scores[0]
    metrics[f"{prefix}precision_class1"] = precision_scores[1]

    # Confusion-matrix counts, treating label 1 as the positive class.
    TP = sum((y_true == 1) & (y_pred == 1))
    TN = sum((y_true == 0) & (y_pred == 0))
    FP = sum((y_true == 0) & (y_pred == 1))
    FN = sum((y_true == 1) & (y_pred == 0))

    metrics[f"{prefix}TP"] = TP
    metrics[f"{prefix}TN"] = TN
    metrics[f"{prefix}FP"] = FP
    metrics[f"{prefix}FN"] = FN

    return metrics
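

# Example: get_metrics(pd.Series([0, 1, 1, 0]), pd.Series([0, 1, 0, 0]), prefix="val_")
# returns a flat dict such as {"val_accuracy": 0.75, ..., "val_TP": 1, ...}.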
try:
    data_frame = pd.read_csv("./data/Ketamine_icp_no_missing.csv")
except FileNotFoundError:
    raise SystemExit(
        "Please ensure the data file exists at './data/Ketamine_icp_no_missing.csv'"
    )

random_state = 42
n_split_kfold = 5

scaling_methods_list = [
    "standard_scaling",
    "robust_scaling",
    "minmax_scaling",
    "yeo_johnson",
]

# LightGBM hyperparameter grid.
boosting_type_list = ["gbdt", "dart"]
learning_rate_list = [0.03, 0.05, 0.1]
number_of_leaves_list = [100]
l2_regularization_lambda_list = [0.1, 0.5]
l1_regularization_alpha_list = [0.1, 0.5]
colsample_bytree_list = [0.8, 1.0]
subsample_list = [0.8, 1.0]
is_balanced_list = [True, False]

# KMeansSMOTE settings (used only when is_balanced is True).
kmeans_smote_k_neighbors_list = [10]
kmeans_smote_n_clusters_list = [5]
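
# The grid above yields 4 * 2 * 3 * 1 * 2 * 2 * 2 * 2 * 2 * 1 * 1 = 768 combinations.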
param_combinations = list(
    itertools.product(
        scaling_methods_list,
        boosting_type_list,
        learning_rate_list,
        number_of_leaves_list,
        l2_regularization_lambda_list,
        l1_regularization_alpha_list,
        colsample_bytree_list,
        subsample_list,
        is_balanced_list,
        kmeans_smote_k_neighbors_list,
        kmeans_smote_n_clusters_list,
    )
)

# Run get_metrics once on dummy data to discover the full set of metric keys.
template_metrics = get_metrics(
    pd.Series([0, 1, 0, 1]),
    pd.Series([0, 1, 0, 1]),
)
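
# Build the CSV header: bookkeeping columns followed by avg_val_<metric> and
# test_<metric> variants of every metric key.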
template_cols = ["iteration", "model", "params"]
for k in template_metrics.keys():
    template_cols.append(f"avg_val_{k}")
    template_cols.append(f"test_{k}")
empty_df = pd.DataFrame(columns=template_cols)

if not os.path.exists(RESULTS_FILENAME):
    empty_df.to_csv(RESULTS_FILENAME, index=False)
    print(f"Initialized {RESULTS_FILENAME} with headers.")
else:
    print(f"File {RESULTS_FILENAME} already exists. Appending to it.")
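
# To inspect results afterwards, one option (a sketch, not part of this run):
#   df = pd.read_csv(RESULTS_FILENAME)
#   df.sort_values("test_f2_macro", ascending=False).head()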

iteration = 0
for (
    scaling_method,
    boosting_type,
    learning_rate,
    num_leaves,
    reg_lambda,
    reg_alpha,
    colsample_bytree,
    subsample,
    is_balanced,
    k_neighbors,
    kmeans_estimator,
) in tqdm.tqdm(param_combinations):
    skf = StratifiedKFold(
        n_splits=n_split_kfold, shuffle=True, random_state=random_state
    )

    # Scale the full dataset with the selected method, then hold out a
    # stratified 15% test set.
    data_frame_scaled = scaling_handler(data_frame, scaling_method)
    y = data_frame_scaled["label"]
    X = data_frame_scaled.drop(columns=["label"])

    x_train_val, x_test, y_train_val, y_test = train_test_split(
        X, y, test_size=0.15, stratify=y, random_state=random_state
    )

    fold_results = []
    lgbm_classifier_params = None

    sampling_method = "none"

    for fold_idx, (train_index, val_index) in enumerate(
        skf.split(x_train_val, y_train_val)
    ):
        x_train_fold, x_val = (
            x_train_val.iloc[train_index],
            x_train_val.iloc[val_index],
        )
        y_train_fold, y_val = (
            y_train_val.iloc[train_index],
            y_train_val.iloc[val_index],
        )

        x_train = x_train_fold
        y_train = y_train_fold
        lgbm_classifier_params = None

        lgbm_base_params = {
            "boosting_type": boosting_type,
            "objective": "binary",
            "learning_rate": learning_rate,
            "n_jobs": -1,
            "num_leaves": num_leaves,
            "reg_lambda": reg_lambda,
            "reg_alpha": reg_alpha,
            "colsample_bytree": colsample_bytree,
            "subsample": subsample,
        }
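
        # Two imbalance strategies are compared: KMeansSMOTE oversampling of
        # the minority class, or LightGBM's built-in class_weight with an
        # inverse negative:positive ratio.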
        if is_balanced:
            sampling_method = "KMeansSMOTE"

            smote_params = {
                "sampling_strategy": "minority",
                "k_neighbors": k_neighbors,
                "kmeans_estimator": kmeans_estimator,
                "cluster_balance_threshold": 0.001,
                "random_state": random_state,
                "n_jobs": -1,
            }

            # KMeansSMOTE can fail (e.g. when no cluster satisfies the balance
            # threshold), so a failing fold is skipped rather than aborting.
            try:
                smote = KMeansSMOTE(**smote_params)
                x_train, y_train = smote.fit_resample(x_train_fold, y_train_fold)
                lgbm_classifier_params = lgbm_base_params.copy()
            except (RuntimeError, ValueError) as e:
                print(
                    f"KMeansSMOTE failed with {type(e).__name__} in fold "
                    f"{fold_idx} of iteration {iteration}: {e}. Skipping fold."
                )
                continue
        else:
            sampling_method = "class_weight"

            # Weight the positive class by the floor of the negative:positive ratio.
            class_1_weight = int(
                (y_train_fold.shape[0] - y_train_fold.sum()) / y_train_fold.sum()
            )

            lgbm_classifier_params = lgbm_base_params.copy()
            lgbm_classifier_params["class_weight"] = {0: 1, 1: class_1_weight}
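
            # Worked example (hypothetical counts): with 900 negatives and
            # 100 positives in the fold, class_1_weight = int(900 / 100) = 9,
            # i.e. class_weight={0: 1, 1: 9}.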

        if lgbm_classifier_params:
            model = LGBMClassifier(
                **lgbm_classifier_params, random_state=random_state, verbose=-1
            )
            model.fit(x_train, y_train)
            y_pred_val = model.predict(x_val)

            val_metrics = get_metrics(y_val, y_pred_val)
            fold_results.append(val_metrics)

    avg_val_metrics = {}
    if fold_results:
        val_df = pd.DataFrame(fold_results)
        avg_val_metrics = val_df.mean().to_dict()

    test_metrics = {}
    if lgbm_classifier_params:
        x_train_final = x_train_val
        y_train_final = y_train_val

        if is_balanced and sampling_method == "KMeansSMOTE":
            try:
                smote = KMeansSMOTE(**smote_params)
                x_train_final, y_train_final = smote.fit_resample(
                    x_train_val, y_train_val
                )
            except (RuntimeError, ValueError) as e:
                print(
                    f"Final KMeansSMOTE failed for iteration {iteration}: {e}. "
                    "Skipping test evaluation."
                )
                lgbm_classifier_params = None

    if lgbm_classifier_params:
        final_lgbm_params = lgbm_base_params.copy()
        if not is_balanced:
            # Mirror the CV handling: recompute the class weight on the full
            # train+val split so the final model is also class-weighted.
            class_1_weight = int(
                (y_train_val.shape[0] - y_train_val.sum()) / y_train_val.sum()
            )
            final_lgbm_params["class_weight"] = {0: 1, 1: class_1_weight}

        test_model = LGBMClassifier(
            **final_lgbm_params, random_state=random_state, verbose=-1
        )

        test_model.fit(x_train_final, y_train_final)
        y_pred_test = test_model.predict(x_test)

        test_metrics = get_metrics(y_test, y_pred_test, prefix="test_")

    if lgbm_classifier_params:
        params_str = str(lgbm_base_params).replace("}", "")
        if is_balanced:
            params_str += (
                f", 'smote_k_neighbors': {k_neighbors}"
                f", 'smote_n_clusters': {kmeans_estimator}"
            )

        final_result_dict = {
            "iteration": iteration,
            "model": "LGBMClassifier",
            "params": params_str
            + f", 'sampling_method': '{sampling_method}'"
            + f", 'scaling_method': '{scaling_method}'}}",
        }

        for k in template_metrics.keys():
            final_result_dict[f"avg_val_{k}"] = avg_val_metrics.get(k, float("nan"))
            final_result_dict[f"test_{k}"] = test_metrics.get(
                f"test_{k}", float("nan")
            )

        result_row_df = pd.DataFrame([final_result_dict])
        result_row_df = result_row_df.reindex(columns=template_cols, fill_value=None)
        result_row_df.to_csv(RESULTS_FILENAME, mode="a", header=False, index=False)

    iteration += 1

print(f"Finished: check {RESULTS_FILENAME}")