def split_path(full_path):
    import os

    # Split a path into its directory and its file name without extension.
    directory = os.path.dirname(full_path)
    filename = os.path.splitext(os.path.basename(full_path))[0]
    return directory, filename

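# Usage sketch for split_path (the path below is a made-up example, not a
# file from this project):
#
#   directory, filename = split_path("./data/example.csv")
#   # directory == "./data", filename == "example"
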
def write_textfile(path, data_list):
    # Write each item of an iterable on its own line.
    with open(path, "w") as file:
        for data in data_list:
            file.write(f"{data}\n")

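# Usage sketch for write_textfile (hypothetical file name and contents):
#
#   write_textfile("./data/columns.txt", ["age", "heart_rate", "label"])
#   # ./data/columns.txt then contains the three names, one per line.
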
def missing_value_handler(data_path):
    import pandas
    from sklearn.impute import KNNImputer

    data_directory, data_filename = split_path(data_path)

    data_frame = pandas.read_csv(data_path)

    # Remove the id column, if present, so it is not imputed as a feature.
    if "id" in data_frame.columns:
        data_frame = data_frame.drop("id", axis="columns")

    columns = list(data_frame.columns)
    write_textfile(f"{data_directory}/columns.txt", columns)

    # Count missing values per column. Note that write_textfile iterates the
    # Series values, so only the counts (not the column names) are written.
    missing_value_counts = data_frame.isna().sum()
    write_textfile(f"{data_directory}/missing.txt", missing_value_counts)

    # Fill missing values with KNNImputer: each missing entry is replaced by
    # the mean of that feature over the 5 nearest complete rows.
    imputer = KNNImputer(n_neighbors=5)
    data_imputed = imputer.fit_transform(data_frame)
    data_frame_imputed = pandas.DataFrame(data_imputed, columns=columns)

    # Re-count missing values to confirm the imputation left none behind.
    missing_value_counts = data_frame_imputed.isna().sum()
    write_textfile(f"{data_directory}/no_missing.txt", missing_value_counts)

    # Save the imputed data next to the input, with a _no_missing suffix.
    data_frame_imputed.to_csv(
        f"{data_directory}/{data_filename}_no_missing.csv", index=False
    )

    return data_frame_imputed

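# Usage sketch for missing_value_handler (the input CSV path is a guess
# based on the output name used elsewhere in this project):
#
#   df = missing_value_handler("./data/Ketamine_icp.csv")
#   # Writes columns.txt, missing.txt, no_missing.txt and an imputed CSV
#   # into ./data, and returns the imputed DataFrame.
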
def scaling_handler(data_frame, method="robust_scaling"):
    import pandas
    from sklearn.preprocessing import (
        MaxAbsScaler,
        MinMaxScaler,
        PowerTransformer,
        QuantileTransformer,
        RobustScaler,
        StandardScaler,
    )

    # Separate features from the label so the label is never rescaled.
    labels = data_frame["label"]
    X = data_frame.drop("label", axis=1)

    # Choose the scaler/transformer.
    if method == "robust_scaling":
        scaler = RobustScaler()
    elif method == "standard_scaling":
        scaler = StandardScaler()
    elif method == "minmax_scaling":
        scaler = MinMaxScaler()
    elif method == "maxabs_scaling":
        scaler = MaxAbsScaler()
    elif method == "quantile_normal":
        scaler = QuantileTransformer(output_distribution="normal", random_state=42)
    elif method == "quantile_uniform":
        scaler = QuantileTransformer(output_distribution="uniform", random_state=42)
    elif method == "yeo_johnson":
        scaler = PowerTransformer(method="yeo-johnson")
    elif method == "box_cox":
        # Box-Cox requires strictly positive values, so shift every
        # non-positive column above zero by a tiny offset first.
        scaler = PowerTransformer(method="box-cox")
        X_pos = X.copy()
        min_per_column = X_pos.min()
        for col in X_pos.columns:
            if min_per_column[col] <= 0:
                X_pos[col] = X_pos[col] + abs(min_per_column[col]) + 1e-6
        X = X_pos
    else:
        raise ValueError(f"Unknown scaling method: {method}")

    # Fit and transform, then reattach the untouched label column.
    X_scaled = scaler.fit_transform(X)
    data_frame_scaled = pandas.DataFrame(X_scaled, columns=X.columns)
    data_frame_scaled["label"] = labels.values

    return data_frame_scaled

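# Usage sketch chaining imputation and scaling (hypothetical path and
# method choice; any of the method strings above would work here):
#
#   df = missing_value_handler("./data/Ketamine_icp.csv")
#   df_scaled = scaling_handler(df, method="quantile_normal")
#   # Every feature column is transformed; the label column is untouched.
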
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    fbeta_score,
    precision_score,
    recall_score,
)

def get_metrics(y_true, y_pred, prefix=""):
    metrics = {}
    metrics[f"{prefix}accuracy"] = accuracy_score(y_true, y_pred)
    metrics[f"{prefix}f1_macro"] = f1_score(y_true, y_pred, average="macro")
    metrics[f"{prefix}f2_macro"] = fbeta_score(y_true, y_pred, beta=2, average="macro")
    metrics[f"{prefix}recall_macro"] = recall_score(y_true, y_pred, average="macro")
    metrics[f"{prefix}precision_macro"] = precision_score(
        y_true, y_pred, average="macro", zero_division=0
    )

    # Per-class scores (zero_division=0 silences warnings for empty classes).
    f1_scores = f1_score(y_true, y_pred, average=None, zero_division=0)
    f2_scores = fbeta_score(y_true, y_pred, beta=2, average=None, zero_division=0)
    recall_scores = recall_score(y_true, y_pred, average=None, zero_division=0)
    precision_scores = precision_score(y_true, y_pred, average=None, zero_division=0)

    for i in range(len(f1_scores)):
        metrics[f"{prefix}f1_class{i}"] = f1_scores[i]
        metrics[f"{prefix}f2_class{i}"] = f2_scores[i]
        metrics[f"{prefix}recall_class{i}"] = recall_scores[i]
        metrics[f"{prefix}precision_class{i}"] = precision_scores[i]

    # Confusion-matrix components; this assumes binary labels encoded as 0/1
    # and array-like inputs that support element-wise comparison.
    TP = ((y_true == 1) & (y_pred == 1)).sum()
    TN = ((y_true == 0) & (y_pred == 0)).sum()
    FP = ((y_true == 0) & (y_pred == 1)).sum()
    FN = ((y_true == 1) & (y_pred == 0)).sum()

    metrics[f"{prefix}TP"] = TP
    metrics[f"{prefix}TN"] = TN
    metrics[f"{prefix}FP"] = FP
    metrics[f"{prefix}FN"] = FN

    return metrics
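
# Minimal runnable sketch of get_metrics on made-up binary predictions
# (illustration only, not project data):
if __name__ == "__main__":
    import numpy

    y_true = numpy.array([0, 0, 1, 1, 1, 0])
    y_pred = numpy.array([0, 1, 1, 1, 0, 0])
    for name, value in get_metrics(y_true, y_pred, prefix="demo_").items():
        print(f"{name}: {value}")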