Binary file added .DS_Store
Binary file not shown.
154 changes: 154 additions & 0 deletions lasso_selection_bin.py
@@ -0,0 +1,154 @@
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegressionCV
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

def lasso_var_sel_binary(df, outcome_col='mapped_outcome', random_state=42):
"""
Prepare data and select features using binary logistic regression with an
elastic-net penalty. Designed for binary outcomes only.

Returns:
- results_df: selected features with their absolute coefficients
- X_selected: unscaled predictor columns for the selected features
- y: encoded outcome vector
- selected_features: list of selected feature names
- coef_df: coefficient row from the fitted model
- label_encoder: fitted LabelEncoder for the outcome
- feature_importance: absolute coefficient per feature
- metrics: dict with confusion matrix, classification report, accuracy and CV scores
"""
if outcome_col not in df.columns:
raise ValueError(f"Outcome column '{outcome_col}' not found in DataFrame")

# Remove 'ID' column if it exists
if 'ID' in df.columns:
df = df.drop('ID', axis=1)

# Separate predictors and outcome
y = df[outcome_col].copy()
X = df.drop(columns=[outcome_col])

# Encode the binary outcome
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

# Verify that we have a binary outcome
n_classes = len(np.unique(y))
if n_classes != 2:
raise ValueError("This function is designed for binary classification only. More than two classes found.")

print("\nOutcome classes:", dict(zip(label_encoder.classes_, range(len(label_encoder.classes_)))))

# Encode categorical predictors
categorical_cols = X.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
le = LabelEncoder()
X[col] = le.fit_transform(X[col].astype(str))
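# Note: label-encoding imposes an arbitrary numeric order on nominal
# categories; one-hot encoding (e.g. pd.get_dummies) would avoid this,
# at the cost of more columns.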

print(f"\nInitial shape of X: {X.shape}")
if X.shape[1] > 0:
print("First actual predictor column:", X.columns[0])
else:
raise ValueError("No predictor columns left after dropping outcome (and ID if applicable).")

# Standardize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_scaled = pd.DataFrame(X_scaled, columns=X.columns)
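
# With all predictors standardized, the penalized coefficient magnitudes
# below are comparable across features, which is what justifies using
# |coefficient| as an importance score.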

# Fit binary logistic regression with an elastic-net penalty.
# For a binary problem the model fits a single coefficient vector,
# so coef_ has shape (1, n_features).
logistic = LogisticRegressionCV(
penalty='elasticnet',
l1_ratios=[0.8, 0.9],
solver='saga',
cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state),
random_state=random_state,
max_iter=5000,
class_weight='balanced',
Cs=np.logspace(-4, 8, 20),
tol=5e-4
)
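
# Note: with penalty='elasticnet', LogisticRegressionCV cross-validates the
# full Cs x l1_ratios grid (here 20 x 2 = 40 candidate (C, l1_ratio) pairs
# per fold), and scores_ for each class has shape (n_folds, n_Cs, n_l1_ratios).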

logistic.fit(X_scaled, y)

# logistic.coef_ will have shape (1, n_features) for binary classification
coef_df = pd.DataFrame(logistic.coef_, columns=X.columns)
# No indexing by classes since it's binary (one row of coefficients)

# Compute feature importance as absolute value of coefficients
# Since there's only one class row, mean across rows is just that row
feature_importance = np.abs(coef_df.iloc[0, :])

# Select features with non-zero importance
selected_features = feature_importance[feature_importance > 0].index.tolist()
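# (saga with an L1 component shrinks some coefficients to exactly zero,
# so the strict > 0 test is what performs the selection)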

# Predictions
y_pred = logistic.predict(X_scaled)

# Performance metrics
print("\nPerformance Metrics:")
print("-------------------")

# Find the best (C, l1_ratio) pair and the corresponding mean CV score.
# Both grid axes must be indexed, since scores_ is 3-D here (see note above).
best_c = logistic.C_[0]
best_l1_ratio = logistic.l1_ratio_[0]
c_index = int(np.argmin(np.abs(logistic.Cs_ - best_c)))
l1_index = int(np.argmin(np.abs(np.asarray(logistic.l1_ratios_) - best_l1_ratio)))
# Binary classification: scores_ holds a single entry (the positive class)
fold_scores = next(iter(logistic.scores_.values()))[:, c_index, l1_index]
best_cv_score = np.mean(fold_scores)

print(f"Best C value: {best_c}")
print("\nConfusion Matrix:")
conf_matrix = confusion_matrix(y, y_pred)
print(conf_matrix)
print("\nClassification Report:")

target_names = [str(c) for c in label_encoder.classes_]
print(classification_report(y, y_pred, target_names=target_names))
print(f"Best CV score: {best_cv_score}")

# l1_ratio_ holds the best ratio per class; for binary there is one entry
print(f"Best l1_ratio: {best_l1_ratio}")

print(f"\nSelected {len(selected_features)} features")

# Print feature importance for selected features
print("\nFeature importance for selected features:")
for feat in sorted(selected_features, key=lambda x: feature_importance[x], reverse=True):
print(f"{feat}: {feature_importance[feat]:.4f}")

X_selected = X[selected_features]
print(f"Final shape of selected features: {X_selected.shape}")

# Store metrics in a dictionary
metrics = {
'confusion_matrix': conf_matrix,
'classification_report': classification_report(y, y_pred, target_names=target_names, output_dict=True),
'accuracy': accuracy_score(y, y_pred),
'cv_scores': fold_scores
}

# Create a results_df for selected features
# (values are absolute coefficients: the magnitude of the single binary
# coefficient row, not a signed average)
results_df = pd.DataFrame({
'Feature': selected_features,
'Average_Coefficient': feature_importance[selected_features]
})
results_df = results_df.sort_values('Average_Coefficient', ascending=False)

results_df.to_csv('feature_coefficients_binary.csv', index=False)
print("\nSelected features and their coefficients:")
print(results_df.head())

return results_df, X_selected, y, selected_features, coef_df, label_encoder, feature_importance, metrics




if __name__ == "__main__":
# Load the dataset
# df = pd.read_csv("ISARIC_redbin.csv", header=0, index_col=0)
df = pd.read_csv("ppcg_cna_ar2rmv.csv", header=0, index_col=0)
# df = pd.read_csv("cat_out_data.csv", header=0, index_col=0)
# df = pd.read_csv("PPCG_badEx3_CNA_bin_rmv.csv", header=0, index_col=0)
print(df.columns)
print("Initial size:", df.shape)
patient_ids = df.index

results_df, X_selected, y, selected_features, coef_df, label_encoder, feature_importance, metrics = lasso_var_sel_binary(
# df, outcome_col='mapped_outcome'
df, outcome_col='outcome'
)
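
# A minimal smoke test on synthetic data (hypothetical, not part of this PR):
#
#   rng = np.random.default_rng(0)
#   toy = pd.DataFrame(rng.normal(size=(100, 5)), columns=list("abcde"))
#   toy["outcome"] = (toy["a"] + rng.normal(scale=0.5, size=100) > 0).map(
#       {True: "case", False: "control"})
#   lasso_var_sel_binary(toy, outcome_col="outcome")
#
# With signal only in column "a", the elastic-net fit should assign it the
# largest absolute coefficient.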
181 changes: 181 additions & 0 deletions prep_sel2.py
@@ -0,0 +1,181 @@
import pandas as pd
import numpy as np

def impute_miss_val(df, missing_threshold=0.7):
"""
Imputes missing values or drops columns based on the proportion of missing
values: columns with more than `missing_threshold` missing are dropped;
remaining numeric columns are imputed with the median and categorical
columns with the mode.

Returns:
- df: DataFrame with missing values imputed or columns dropped
"""
# Calculate the proportion of missing values in each column
missing_proportions = df.isnull().mean()

# Identify columns to drop
cols_to_drop = missing_proportions[missing_proportions > missing_threshold].index
df = df.drop(columns=cols_to_drop)

# Impute missing values in remaining columns
for col in df.columns:
if df[col].isnull().any():
if pd.api.types.is_numeric_dtype(df[col]):
# Numeric column: impute with median
median_value = df[col].median()
if pd.isnull(median_value):
# If median cannot be computed, drop the column
df = df.drop(columns=[col])
else:
df[col] = df[col].fillna(median_value)
else:
# Categorical column: impute with mode
mode_series = df[col].mode()
if not mode_series.empty:
mode_value = mode_series[0]
df[col] = df[col].fillna(mode_value)
else:
# If mode cannot be computed, drop the column
df = df.drop(columns=[col])
print("\nSummary after Imputation")
print("Size of remaining data:", df.shape)
return df
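
# Worked example (hypothetical): with missing_threshold=0.7,
#   pd.DataFrame({"a": [1.0, None, 3.0], "b": [None, None, None]})
# drops "b" (100% missing > 70%) and fills the gap in "a" with its
# median, 2.0.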

def rmv_low_var(df, mad_threshold=0.1, freq_threshold=0.05):
"""
Removes numeric variables whose Median Absolute Deviation (MAD), after
scaling by max absolute value, falls below `mad_threshold`. Binary columns
are excluded from the MAD calculation; instead, binary columns whose
minority class frequency is below `freq_threshold` are removed.

Returns:
- df: pandas DataFrame with low-MAD and low-frequency columns removed
"""
# Select numeric columns
numeric_cols = df.select_dtypes(include=[np.number]).columns

# Split numeric columns into binary and non-binary
non_binary_cols = []
binary_cols = []
for col in numeric_cols:
# Consider a column binary if it has 2 or fewer unique values,
# or if all non-missing values are 0 or 1
unique_values = df[col].nunique()
is_binary = (unique_values <= 2) or (set(df[col].dropna().unique()) <= {0, 1})
if is_binary:
binary_cols.append(col)
else:
non_binary_cols.append(col)

# Drop binary columns whose minority class is too rare
min_freq = freq_threshold
X_bin = df[binary_cols]
binary_counts = X_bin.apply(lambda s: s.value_counts(normalize=True))
keep_cols = [col for col in X_bin.columns
if binary_counts[col].dropna().min() >= min_freq
or binary_counts[col].count() < 2]
X_bin = X_bin[keep_cols]

# Scale each non-binary numeric column by its max absolute value
for col in non_binary_cols:
max_abs = df[col].abs().max()
if max_abs > 0:
df[col] = df[col] / max_abs

# Calculate MAD for each non-binary numeric column
mad_values = {}
for col in non_binary_cols:
mad = np.median(np.abs(df[col] - np.median(df[col])))
mad_values[col] = mad

# Create a Series from the MAD values
mad_series = pd.Series(mad_values)
#print(mad_series)

# Identify columns to keep:
# 1. Non-numeric columns
# 2. High-frequency binary numeric columns (X_bin)
# 3. Non-binary numeric columns with MAD above threshold
cols_to_keep = set(df.columns) - set(non_binary_cols) - set(binary_cols) # non-numeric columns
cols_to_keep.update(mad_series[mad_series >= mad_threshold].index) # add high-MAD columns

# Keep the identified columns in their original order, plus the
# high-frequency binary columns
kept_in_order = [c for c in df.columns if c in cols_to_keep]
df = pd.concat([df[kept_in_order], X_bin], axis=1)

# Print summary for debugging
print("\nMAD Analysis Summary:")
print(f"Total numeric columns: {len(numeric_cols)}")
print(f"Non-binary numeric columns: {len(non_binary_cols)}")
print(f"Binary columns excluded from MAD: {len(binary_cols)}")
print(f"High-frequency binary columns kept: {X_bin.shape[1]}")
print(f"Columns removed due to low MAD: {len(non_binary_cols) - len(mad_series[mad_series >= mad_threshold])}")


return df
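
# Worked example: for a column [1, 2, 3, 4, 100], scaling by max|x| gives
# [0.01, 0.02, 0.03, 0.04, 1.0]; the median is 0.03, the absolute deviations
# are [0.02, 0.01, 0, 0.01, 0.97], so MAD = 0.01 — right at the threshold
# used in the __main__ block below (the test is >=, so it would be kept).
# A single heavy outlier can thus mask otherwise low spread.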

def rmv_high_corr(df, correlation_threshold=0.5):
"""
Removes numeric columns that are highly correlated with an earlier column:
for every pair with |corr| above `correlation_threshold`, the later column
of the pair is dropped.

Returns:
- df: DataFrame with highly correlated columns removed
"""
# Step 1: Select numeric columns only
numeric_cols = df.select_dtypes(include=[np.number]).columns
df_numeric = df[numeric_cols] # DataFrame with only numeric columns

# Step 2: Calculate the correlation matrix
corr_matrix = df_numeric.corr().abs()

# Step 3: Identify highly correlated columns with a double loop
to_drop = set() # Use a set to avoid duplicates
num_cols = corr_matrix.shape[0]

for i in range(num_cols):
for j in range(i + 1, num_cols): # Only look at the upper triangle
if corr_matrix.iloc[i, j] > correlation_threshold:
# Arbitrarily drop the later column of the correlated pair
to_drop.add(corr_matrix.columns[j])

# Step 4: Drop highly correlated columns

print("\nCORR Summary")
print(f"Columns removed due to high correlation: {len(to_drop)}")
df = df.drop(columns=list(to_drop))

return df
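
# Note: the upper-triangle scan is greedy. E.g. if corr(a, b) and corr(b, c)
# both exceed the threshold while corr(a, c) does not, both b and c are
# dropped, even though removing b alone would break every violating pair.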


if __name__ == "__main__":
# Load the dataset
# df = pd.read_csv("PPCG_badEx3_cat.csv", header=0, index_col=0)
# df = pd.read_csv("PPCG_badEx3_CNA_bin.csv", header=0, index_col=0)
# df = pd.read_csv("ISAtest.csv", header=0, index_col=0)
df = pd.read_csv("ppcg_cna_ar.csv", header=0, index_col=0)
print("Initial size:", df.shape)
print("DF head beginning:", df.head)
patient_ids = df.index

# Step 1: Impute missing values or drop columns
missing_threshold = 0.5 # Adjust as needed
df = impute_miss_val(df, missing_threshold)
print("DF head after imputation:", df.head)

numeric_cols = df.select_dtypes(include=[np.number]).columns
print(f"Initial number of numeric columns: {len(numeric_cols)}")
print(f"list of numeric columns: {numeric_cols}")

# Step 2: Remove numerical variables with low MAD
mad_threshold = 0.01 # Adjust as needed
freq_threshold = 0.05 # Adjust as needed
df = rmv_low_var(df, mad_threshold, freq_threshold)
print("DF shape after MAD filtering:", df.shape)
# print("DF head after MAD filtering:", df.head)

# Step 3: Remove highly correlated numerical variables
correlation_threshold = 0.8 # Adjust as needed
df = rmv_high_corr(df, correlation_threshold)
print("DF shape after CORR filtering:", df.shape)
#print("DF head after CORR filtering:", df.head)

df.index = patient_ids
# Save the processed dataset
print("final size:", df.shape)
df.to_csv('ppcg_cna_ar2rmv.csv', index=True)
# df.to_csv('PPCG_badEx3_CNA_bin_rmv.csv', index=True)