From edf589bcd45abef52e7cb9df7941d68ffe07a6f0 Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 12:09:17 +0100 Subject: [PATCH 1/8] Nlp project template --- .gitignore | 4 +- Dockerfile | 22 +-- config_NLP/config.cfg | 21 +++ config_NLP/local_config.cfg | 23 +++ project_NLP/NLPDataModule.py | 174 ++++++++++++++++++ project_NLP/NLPDataset.py | 75 ++++++++ project_NLP/NLPNetwork.py | 262 +++++++++++++++++++++++++++ project_NLP/__init__.py | 0 project_NLP/utils/Wrapper.py | 49 +++++ project_NLP/utils/tools.py | 36 ++++ scripts_NLP/__init__.py | 0 scripts_NLP/train_NLP.py | 192 ++++++++++++++++++++ tests/data_NLP/synthetic_example.csv | 16 ++ 13 files changed, 862 insertions(+), 12 deletions(-) create mode 100644 config_NLP/config.cfg create mode 100644 config_NLP/local_config.cfg create mode 100644 project_NLP/NLPDataModule.py create mode 100644 project_NLP/NLPDataset.py create mode 100644 project_NLP/NLPNetwork.py create mode 100644 project_NLP/__init__.py create mode 100644 project_NLP/utils/Wrapper.py create mode 100644 project_NLP/utils/tools.py create mode 100644 scripts_NLP/__init__.py create mode 100644 scripts_NLP/train_NLP.py create mode 100644 tests/data_NLP/synthetic_example.csv diff --git a/.gitignore b/.gitignore index cad1a26..0a3a1b3 100644 --- a/.gitignore +++ b/.gitignore @@ -122,9 +122,11 @@ mlruns/ #data *.xlsx -*.csv *.ipynb *.txt +!tests/ +!tests/data_NLP/ +*.csv #git hooks .files_exceptions diff --git a/Dockerfile b/Dockerfile index 8887b7e..0485cd2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,16 +1,16 @@ -FROM python:3.10 - -WORKDIR /project - -ENV DEBIAN_FRONTEND=noninteractive - +FROM python:3.9 RUN apt-get update && \ - apt-get install -y --no-install-recommends build-essential git rsync software-properties-common ffmpeg libsm6 libxext6 && \ - rm -rf /var/lib/apt/lists/* + apt-get install -y build-essential git rsync software-properties-common --allow-unauthenticated + +WORKDIR /project +ENV VIRTUAL_ENV=/opt/venv +RUN python3 -m venv $VIRTUAL_ENV +ENV PATH="$VIRTUAL_ENV/bin:$PATH" ENV PYTHONPATH="/mlflow/projects/code/:$PYTHONPATH" -COPY . . +COPY --chown=root . . -RUN python -m pip install --upgrade pip && \ - python -m pip install --no-cache-dir -r requirements.txt +# install requirements +RUN python -m pip install --upgrade pip && python -m pip install wheel +RUN python -m pip install --ignore-install ruamel-yaml -r requirements.txt \ No newline at end of file diff --git a/config_NLP/config.cfg b/config_NLP/config.cfg new file mode 100644 index 0000000..12647b3 --- /dev/null +++ b/config_NLP/config.cfg @@ -0,0 +1,21 @@ +[server] +MLFLOW_S3_ENDPOINT_URL = http://10.36.191.201:8002 +MLFLOW_TRACKING_URI = http://10.36.191.201:85 +LOCAL_MLFLOW_S3_ENDPOINT_URL = http://0.0.0.0:8002 +LOCAL_REMOTE_SERVER_URI = http://0.0.0.0:85 +ARTIFACT_PATH = s3://mlflow + +[project] +NAME = PROJECT_NLP + +[data] +DATA_PATH = tests/data_NLP/synthetic_example.csv + +[training] +MODEL_NAME = bert-base-uncased +LEARNING_RATE = 1e-5 +BATCH_SIZE = 32 +N_EPOCHS = 8 +KFOLD = True +N_FOLDS = 5 +RANDOM_STATE = 42 \ No newline at end of file diff --git a/config_NLP/local_config.cfg b/config_NLP/local_config.cfg new file mode 100644 index 0000000..81096aa --- /dev/null +++ b/config_NLP/local_config.cfg @@ -0,0 +1,23 @@ +[server] +MLFLOW_S3_ENDPOINT_URL = http://localhost:8002 +MLFLOW_TRACKING_URI = http://localhost:85 +ARTIFACT_PATH = s3://mlflow + +[project] +NAME = PROJECT_NLP + +[system] +USE_GPU = 0 + +[data] +DATA_PATH = tests/data_NLP/synthetic_example.csv + +[training] +MODEL_NAME = bert-base-uncased +LEARNING_RATE = 1e-5 +BATCH_SIZE = 32 +N_EPOCHS = 8 +N_FOLDS = 5 +RANDOM_STATE = 42 +KFOLD = False +SAVE_MODEL = False \ No newline at end of file diff --git a/project_NLP/NLPDataModule.py b/project_NLP/NLPDataModule.py new file mode 100644 index 0000000..07c6145 --- /dev/null +++ b/project_NLP/NLPDataModule.py @@ -0,0 +1,174 @@ +import pandas as pd +from torch.utils.data import DataLoader +import pytorch_lightning as pl +from project_NLP.NLPDataset import NLPDataset +import numpy as np +import pickle +from sklearn.model_selection import train_test_split +from sklearn.utils import resample +import mlflow + + +class NLPDataModule(pl.LightningDataModule): + def __init__( + self, + data_path, + tokenizer, + batch_size=8, + max_token_len=256, + num_workers=0, + fold_indices=None, + random_state=1, + ): + """Initialize the DataModule. Set batch size, data path, tokenizer, + maximum token length, label columns, sample status and number of workers. + + Args: + data_path (str): Path to the data file. + tokenizer: Tokenizer to be used. + batch_size (int, optional): Size of the data batches. Default is 8. + max_token_len (int, optional): Maximum length of tokens. Default is 256. + num_workers (int, optional): Number of workers. Default is 0. + sample (bool, optional): If True, use WeightedRandomSampler. Default is False. + """ + + super().__init__() + self.batch_size = batch_size + self.data_path = data_path + self.tokenizer = tokenizer + self.max_token_len = max_token_len + self.num_workers = num_workers + self.fold_indices = fold_indices + self.random_state = random_state + self.df = None + self.train_df = None + self.val_df = None + self.label_columns = None + self.train_dataset = None + self.val_dataset = None + self.read_csv_data() + + def setup(self, stage=None): + """ + Set up the data module. Parse the data and create train and validation datasets. + """ + self.parse_df_data() + if self.fold_indices: + train_indices, val_indices = self.fold_indices + self.train_df = self.df.iloc[train_indices] + self.val_df = self.df.iloc[val_indices] + else: + self.default_train_val_split() + + self.train_dataset = NLPDataset( + self.tokenizer, + self.train_df, + self.label_columns, + self.max_token_len, + ) + + self.val_dataset = NLPDataset( + self.tokenizer, + self.val_df, + self.label_columns, + self.max_token_len, + ) + + def train_dataloader(self): + """ + Create and return a data loader for the training data. + + Returns: + DataLoader: Data loader for the training data. + """ + + return DataLoader( + self.train_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + shuffle=True, + ) + + def val_dataloader(self): + """ + Create and return a data loader for the validation data. + + Returns: + DataLoader: Data loader for the validation data. + """ + + return DataLoader( + self.val_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + shuffle=False, + ) + + def read_csv_data(self): + self.df = pd.read_csv(self.data_path) + self.df = self.df.drop_duplicates() + + def parse_df_data(self): + """ + Parse the data. Create label dictionary and add 'label' and 'data_type' columns. + Split the data into train and validation dataframes. Store the label columns in a pickle file. + """ + + self.label_columns = self.df.ClassLabel.unique() + self.label_dict = {} + for index, possible_label in enumerate(self.label_columns): + self.label_dict[possible_label] = index + self.df[possible_label] = 0 + self.df.loc[self.df["ClassLabel"] == possible_label, [possible_label]] = 1 + + self.df["label"] = self.df.ClassLabel.replace(self.label_dict) + + self.length_label_dict = len(self.label_dict) + self.num_labels = self.length_label_dict + self.num_classes = len(list(set(self.df.label))) + + with open("label_columns.data", "wb") as filehandle: + pickle.dump(self.label_columns, filehandle) + + class_counts = self.df["ClassLabel"].value_counts() + for class_label, count in class_counts.items(): + mlflow.log_param(f"class_{class_label}_count", count) + + def default_train_val_split(self): + train_df, val_df = train_test_split( + self.df, + test_size=0.2, + random_state=self.random_state, + stratify=self.df["label"], + ) + self.train_df = train_df + self.val_df = val_df + + def steps_per_epoch(self): + """ + Calculate and return the number of steps per epoch based on the batch size. + + Returns: + int: Number of steps per epoch. + """ + if self.train_df is None or len(self.train_df) == 0: + self.parse_df_data() + self.default_train_val_split() + return len(self.train_df) // self.batch_size + + def dataset_stats(self, dataset) -> dict: + """ + Calculate and return a dictionary of dataset statistics including label distribution and number of samples. + + Args: + dataset (pandas.DataFrame): The dataset to analyze. + + Returns: + dict: Dictionary of dataset statistics. + """ + stats = {} + stats["n_samples"] = len(dataset) + label_counts = dataset["ClassLabel"].value_counts() + label_counts_dict = label_counts.to_dict() + stats["label_counts"] = label_counts_dict + return stats diff --git a/project_NLP/NLPDataset.py b/project_NLP/NLPDataset.py new file mode 100644 index 0000000..c899477 --- /dev/null +++ b/project_NLP/NLPDataset.py @@ -0,0 +1,75 @@ +import pandas as pd +import torch +from torch.utils.data import Dataset +from typing import Optional +from transformers import BertTokenizerFast as BertTokenizer + +import configparser + + +class NLPDataset(Dataset): + def __init__( + self, + tokenizer=None, + data: Optional[pd.DataFrame] = None, + label_columns: list = None, + max_token_len: int = 256, + ): + if tokenizer == None: + tokenizer = BertTokenizer.from_pretrained("bert-base-uncased") + else: + self.tokenizer = tokenizer + self.data = data + self.max_token_len = max_token_len + self.label_columns = label_columns + + def __len__(self): + return len(self.data) + + def __getitem__(self, index: int): + """ + Given an index, return a dictionary containing 'diag_final', 'input_ids', 'attention_mask' and 'labels' + after encoding a row of the data. + + Args: + index (int): Index of the data row. + + Returns: + dict: A dictionary containing 'diag_final', 'input_ids', 'attention_mask' and 'labels' + for the data row at the given index. + """ + + data_row = self.data.iloc[index] + diag_final = [data_row["TextString"]] + labels = data_row[self.label_columns] # nth index is nth class + encoding = self.encoder(diag_final) + return dict( + diag_final=diag_final, + input_ids=encoding["input_ids"].flatten(), + attention_mask=encoding["attention_mask"].flatten(), + labels=torch.FloatTensor(labels), + ) + + def encoder(self, diag_final): + """ + Encode a given list of text strings using the tokenizer set during initialization. + Args: + diag_final (list): List of text strings to be encoded. + + Returns: + dict: A dictionary containing the following keys: + - 'input_ids': Tensor of token ids obtained from the text strings. + - 'attention_mask': Tensor where positions with original tokens are represented by 1 and positions with + padding are represented by 0. + """ + + return self.tokenizer.batch_encode_plus( + diag_final, + add_special_tokens=True, + max_length=self.max_token_len, + return_token_type_ids=False, + padding="max_length", + truncation=True, + return_attention_mask=True, + return_tensors="pt", + ) diff --git a/project_NLP/NLPNetwork.py b/project_NLP/NLPNetwork.py new file mode 100644 index 0000000..e05ff07 --- /dev/null +++ b/project_NLP/NLPNetwork.py @@ -0,0 +1,262 @@ +import pytorch_lightning as pl +import torch +import torch.nn as nn +from torchmetrics.functional import auroc +from transformers import BertModel, AdamW, get_linear_schedule_with_warmup +from transformers import BertTokenizerFast as BertTokenizer +import mlflow +from torchmetrics.functional import precision, recall, f1_score +import matplotlib.pyplot as plt +import numpy as np +from sklearn import metrics + + +class NLPNetwork(pl.LightningModule): + + def __init__( + self, + n_classes: int, + n_training_steps=None, + n_warmup_steps=None, + learning_rate=None, + label_columns: list = None, + ): + super(NLPNetwork, self).__init__() + self.bert = BertModel.from_pretrained("bert-base-uncased", return_dict=True) + self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased") + self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes) + self.n_training_steps = n_training_steps + self.n_classes = n_classes + self.n_warmup_steps = n_warmup_steps + self.criterion = nn.BCELoss() + self.learning_rate = learning_rate + self.label_columns = label_columns + self.training_step_outputs = [] + self.training_step_labels = [] + self.training_step_loss = [] + self.val_step_outputs = [] + self.val_step_labels = [] + self.val_step_loss = [] + self.val_step_id = [] + + def forward(self, input_ids, attention_mask, labels=None): + """ + The forward pass for the model. + + Args: + input_ids: Input features from the tokenizer. + attention_mask: Attention mask values. Identifies which tokens should be attended to by the model. + labels: Actual labels. Only provided during training. + + Returns: + loss: Loss calculated using the Binary Cross Entropy Loss function. + output: Output from the classifier. + """ + + output = self.bert(input_ids, attention_mask=attention_mask) + output = self.classifier(output.pooler_output) + output = torch.sigmoid(output) + loss = 0 + if labels is not None: + loss = self.criterion(output, labels) + return loss, output + + def training_step(self, batch, batch_idx): + """ + Defines a single step during training. It calculates loss, predictions and accuracy. + + Args: + batch: Data batch that is loaded from the DataLoader. + batch_idx: Index of the batch. + + Returns: + A dictionary containing loss, predictions, labels and accuracy for the step. + """ + + input_ids = batch["input_ids"] + attention_mask = batch["attention_mask"] + labels = batch["labels"] + loss, outputs = self(input_ids, attention_mask, labels) + accuracy = self.accuracy(outputs, labels) + self.training_step_outputs.append(outputs.detach().cpu()) + self.training_step_labels.append(labels.detach().cpu()) + self.training_step_loss.append(loss.detach().cpu()) + return { + "loss": loss, + "predictions": outputs, + "labels": labels, + "accuracy": accuracy, + } + + def validation_step(self, batch, batch_idx): + """ + Defines a single step during validation. Similar to the training step, it calculates loss, predictions, and accuracy. + + Args: + batch: Data batch that is loaded from the DataLoader. + batch_idx: Index of the batch. + + Returns: + A dictionary containing loss, predictions, labels and accuracy for the step. + """ + + input_ids = batch["input_ids"] + attention_mask = batch["attention_mask"] + labels = batch["labels"] + loss, outputs = self(input_ids, attention_mask, labels) + accuracy = self.accuracy(outputs, labels) + + self.val_step_id.append(batch["input_ids"].detach().cpu()) + self.val_step_outputs.append(outputs.detach().cpu()) + self.val_step_labels.append(labels.detach().cpu()) + self.val_step_loss.append(loss.detach().cpu()) + return { + "loss": loss, + "predictions": outputs, + "labels": labels, + "accuracy": accuracy, + } + + def accuracy(self, logits, labels): + """ + Compute accuracy. + + Args: + logits: Model's predictions. + labels: True labels. + + Returns: + Tensor value of the accuracy of the model. + """ + _, x = torch.max(logits.data, 1) + _, y = torch.max(labels.data, 1) + correct = (x == y).sum().item() + accuracy = correct / len(labels) + return torch.tensor(accuracy) + + def on_train_epoch_end(self): + """ + Operations to perform at the end of each training epoch. + """ + + avg_loss = torch.stack(self.training_step_loss).mean() + avg_acc = torch.stack( + [ + self.accuracy(outputs, labels) + for outputs, labels in zip( + self.training_step_outputs, self.training_step_labels + ) + ] + ).mean() + self.log("avg_train_loss", avg_loss) + self.log("avg_train_accuracy", avg_acc) + + self.training_step_outputs.clear() + self.training_step_labels.clear() + self.training_step_loss.clear() + + def on_validation_epoch_end(self): + """ + Operations to perform at the end of each validation epoch. Logs any relevant metrics/misclassified sentences to mlflow. + """ + + labels = [] + predictions = [] + input_ids = [] + for i in range(len(self.val_step_labels)): + if i == len(self.val_step_labels) - 1: + labels.append( + self.val_step_labels[i][: self.val_step_outputs[i].size(0)].int() + ) + predictions.append(self.val_step_outputs[i]) + input_ids.append(self.val_step_id[i]) + else: + labels.append(self.val_step_labels[i].int()) + predictions.append(self.val_step_outputs[i]) + input_ids.append(self.val_step_id[i]) + + labels = torch.cat(labels, dim=0) + predictions = torch.cat(predictions, dim=0) + + for i, name in enumerate(self.label_columns): + # Logging F1 score for each class + class_roc_auc = f1_score(predictions[:, i], labels[:, i], task="binary") + self.log(f"{name}_roc_auc/Validation", float(class_roc_auc)) + + # Logging Precision for each class + class_precision = precision(predictions[:, i], labels[:, i], task="binary") + self.log(f"{name}_precision/Validation", float(class_precision)) + + # Logging Recall for each class + class_recall = recall(predictions[:, i], labels[:, i], task="binary") + self.log(f"{name}_recall/Validation", float(class_recall)) + + # Log misclassified sentences + if self.trainer.current_epoch == self.trainer.max_epochs - 1: + misclassified_sentences = [] + for idx, (output, label) in enumerate(zip(predictions, labels)): + predicted_label = torch.argmax(output) + true_label = torch.argmax(label) + if (predicted_label != true_label) and (idx < len(input_ids[0])): + print(input_ids[0].shape) + print(predictions.shape) + print(labels.shape) + print("*" * 100) + + sentence = self.tokenizer.decode(input_ids[0][idx]) + sentence_info = { + "sentence": sentence, + "predicted_label": predicted_label.item(), + "true_label": true_label.item(), + } + misclassified_sentences.append(sentence_info) + + file_path = "misclassified_sentences.txt" + with open(file_path, "w", encoding="utf-8") as file: + for idx, sentence_info in enumerate(misclassified_sentences): + file.write( + f"Misclassified Sentence {idx+1}: {sentence_info['sentence']}\n" + ) + file.write( + f"Predicted Label {idx+1}: {sentence_info['predicted_label']}\n" + ) + file.write( + f"True Label {idx+1}: {sentence_info['true_label']}\n\n" + ) + + # Log the misclassified sentences file as an artifact in MLflow + mlflow.log_artifact(file_path, artifact_path="misclassified_sentences") + + avg_loss = torch.stack(self.val_step_loss).mean() + self.log("avg_val_loss", avg_loss) + avg_acc = torch.stack( + [ + self.accuracy(outputs, labels) + for outputs, labels in zip(self.val_step_outputs, self.val_step_labels) + ] + ).mean() + self.log("avg_val_accuracy", avg_acc) + self.val_step_outputs.clear() + self.val_step_labels.clear() + self.val_step_loss.clear() + self.val_step_id.clear() + + def configure_optimizers(self): + """ + Configures the optimizer and learning rate scheduler for the training. + + Returns: + A dictionary containing optimizer and lr_scheduler. + """ + + optimizer = AdamW(self.parameters(), lr=self.learning_rate) + + scheduler = get_linear_schedule_with_warmup( + optimizer, + num_warmup_steps=self.n_warmup_steps, + num_training_steps=self.n_training_steps, + ) + + return dict( + optimizer=optimizer, lr_scheduler=dict(scheduler=scheduler, interval="step") + ) diff --git a/project_NLP/__init__.py b/project_NLP/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/project_NLP/utils/Wrapper.py b/project_NLP/utils/Wrapper.py new file mode 100644 index 0000000..72ae769 --- /dev/null +++ b/project_NLP/utils/Wrapper.py @@ -0,0 +1,49 @@ +""" +mlflow PythonModel wrapper class for themodels. +This class is a custom wrapper that uses mlflow's PythonModel class for serving the models. +""" + +import mlflow +import pandas as pd +from torch import topk +from project_NLP.NLPDataset import NLPDataset +import logging +import pickle + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class Wrapper(mlflow.pyfunc.PythonModel): + def __init__(self, model, tokenizer): + self.model = model + with open("label_columns.data", "rb") as filehandle: + # read the data as binary data stream + label_columns = pickle.load(filehandle) + self.dataset = NLPDataset(tokenizer=tokenizer, label_columns=label_columns) + + def predict(self, context, model_input): + logger.info(f"Running prediction service: {model_input}") + encoding = self.dataset.encoder(model_input.TextString.tolist()) + + logger.info(f"Running inference") + _, test_prediction = self.model( + encoding["input_ids"], encoding["attention_mask"] + ) + res = topk(test_prediction, 1).indices.tolist() + + logger.info(f"inference results: {test_prediction} -- res {res}") + confidences = pd.DataFrame( + test_prediction.tolist(), columns=self.dataset.label_columns + ) + prediction = pd.DataFrame( + {"Prediction": [self.dataset.label_columns[x[0]] for x in res]} + ) + + results = pd.concat([prediction, confidences], axis=1) + logger.info(f"Inference complete, results: {results}") + return results + + +# https://www.alexanderjunge.net/blog/mlflow-sagemaker-deploy/ +# https://docs.databricks.com/_static/notebooks/mlflow/mlflow-end-to-end-example.html diff --git a/project_NLP/utils/tools.py b/project_NLP/utils/tools.py new file mode 100644 index 0000000..100e7c1 --- /dev/null +++ b/project_NLP/utils/tools.py @@ -0,0 +1,36 @@ +import mlflow +from project_NLP.utils.Wrapper import Wrapper +import pandas as pd + + +def wrap_and_log(model, tokenizer): + """ + This function wraps and logs the trained model using MLflow. + + 1. freezes the model to ensure the model weights aren't updated anymore. + 2. wraps the model to simplify the model's API for easier inference. + 3. A test DataFrame is prepared to infer the model's signature which defines the schema of the model's inputs and outputs. + 4. the model artifact along with its metadata (including the model signature, any associated code, + and pip requirements) is logged to the current MLflow run. + + Args: + model (nn.Module): The trained model to be logged. + tokenizer (Tokenizer): The tokenizer used during the model training. + Returns: + None + """ + + model.eval() + model.freeze() + wrappedModel = Wrapper(model, tokenizer) + test_df = pd.DataFrame(["Test string 1", "Test string 2"], columns=["TextString"]) + signature = mlflow.models.signature.infer_signature( + test_df, wrappedModel.predict(None, test_df) + ) + mlflow.pyfunc.log_model( + "project_nlp", + python_model=wrappedModel, + signature=signature, + code_path=["project_NLP/", "deployment/", "application/", "config/"], + pip_requirements="requirements.txt", + ) diff --git a/scripts_NLP/__init__.py b/scripts_NLP/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts_NLP/train_NLP.py b/scripts_NLP/train_NLP.py new file mode 100644 index 0000000..82975d7 --- /dev/null +++ b/scripts_NLP/train_NLP.py @@ -0,0 +1,192 @@ +import os +import logging +import sys +import configparser +import torch +import mlflow +import pytorch_lightning as pl +from torch.cuda import is_available as cuda_available +from transformers import BertTokenizerFast as BertTokenizer +from pytorch_lightning.callbacks import ModelCheckpoint +from sklearn.model_selection import KFold +import datetime + +from project_NLP.NLPDataModule import NLPDataModule +from project_NLP.NLPNetwork import NLPNetwork +from project_NLP.utils.tools import wrap_and_log + +mlflow.set_tracking_uri("http://localhost:85") + + +def train( + data_path, + n_epochs=1, + batch_size=6, + model_name="bert-base-uncased", + learning_rate=1e-5, + use_kfold=False, + n_folds=0, + random_state=42, + save_model=False, +): + """ + This function is used to train any of the EndominerAi models (which model depends on the config). + The function uses PyTorch Lightning's built-in ModelCheckpoint callback to save the best model (based on validation + accuracy). + The best trained model is then wrapped and logged to MLFlow for deployment. + + Args: + data_path (str): The path to the data. + n_epochs (int, optional): The number of epochs to train for. Defaults to 1. + batch_size (int, optional): The batch size for training. Defaults to 6. + + Returns: + None + """ + tokenizer = BertTokenizer.from_pretrained(model_name) + data_module = NLPDataModule( + data_path=data_path, + tokenizer=tokenizer, + batch_size=batch_size, + max_token_len=512, + random_state=random_state, + ) + + data_module.setup() + current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + mlflow.end_run() + + with mlflow.start_run(run_name="Parent_Run_{current_time}"): + if use_kfold and n_folds > 1: + + kfold = KFold(n_splits=n_folds, shuffle=True, random_state=42) + for fold, (train_idx, val_idx) in enumerate(kfold.split(data_module.df)): + logging.info(f"Training fold {fold+1}/{n_folds}") + + run_name = f"{'KFold_' + str(fold + 1) if fold >= 0 else 'Train'}_{current_time}" + + mlflow.pytorch.autolog(log_models=False) + with mlflow.start_run(run_name=run_name, nested=True): + + # Initialize DataModule for current fold + data_module = NLPDataModule( + data_path=data_path, + tokenizer=tokenizer, + batch_size=batch_size, + max_token_len=512, + fold_indices=(train_idx, val_idx), + random_state=random_state, + ) + data_module.setup() + total_training_steps = data_module.steps_per_epoch() * n_epochs + warmup_steps = total_training_steps // 5 + + model = NLPNetwork( + n_classes=data_module.num_classes, + n_warmup_steps=warmup_steps, + n_training_steps=total_training_steps, + learning_rate=learning_rate, + label_columns=data_module.label_columns, + ) + + mlflow_logging_and_checkpoint( + model=model, + data_module=data_module, + fold=-1, + n_epochs=n_epochs, + tokenizer=tokenizer, + run_name=run_name, + save=save_model, + ) + + else: + mlflow.pytorch.autolog(log_models=False) + with mlflow.start_run(run_name="child_Run_{current_time}", nested=True): + + total_training_steps = data_module.steps_per_epoch() * n_epochs + warmup_steps = total_training_steps // 5 + + model = NLPNetwork( + n_classes=data_module.length_label_dict, + n_warmup_steps=warmup_steps, + n_training_steps=total_training_steps, + learning_rate=learning_rate, + label_columns=data_module.label_columns, + ) + + run = mlflow.active_run() + run_name = run.info.run_id + # Setup MLflow and checkpointing for standard training + mlflow_logging_and_checkpoint( + model=model, + data_module=data_module, + fold=-1, + n_epochs=n_epochs, + tokenizer=tokenizer, + run_name=run_name, + save=save_model, + ) + + +def mlflow_logging_and_checkpoint( + model, data_module, fold, n_epochs, tokenizer, run_name, save +): + + checkpoint_callback = ModelCheckpoint( + monitor="avg_val_accuracy", + dirpath="./checkpoints/", + filename=f"{run_name}-best_model", + save_top_k=0, + mode="max", + ) + + trainer = pl.Trainer( + callbacks=[checkpoint_callback], + max_epochs=n_epochs, + logger=True, + accelerator="auto", + devices=1, + log_every_n_steps=10, + ) + + trainer.fit(model, data_module) + + # Log the best model for each fold or standard training run + if save: + print(f"Logging best model for {run_name}") + checkpoint = torch.load(checkpoint_callback.best_model_path) + model.load_state_dict(checkpoint["state_dict"]) + wrap_and_log(model, tokenizer) + + +if __name__ == "__main__": + if len(sys.argv) > 0: + config_path = sys.argv[1] + else: + config_path = "config_NLP/local_config.cfg" + + config = configparser.ConfigParser() + config.read(config_path) + + data_path = config["data"]["DATA_PATH"] + learning_rate = float(config["training"]["LEARNING_RATE"]) + batch_size = int(config["training"]["BATCH_SIZE"]) + model_name = config["training"]["MODEL_NAME"] + n_epochs = int(config["training"]["N_EPOCHS"]) + use_kfold = config.getboolean("training", "KFOLD") + n_folds = int(config["training"]["N_FOLDS"]) if use_kfold else 0 + save = config["training"]["SAVE_MODEL"] + + random_state = int(config["training"]["RANDOM_STATE"]) + + train( + data_path, + n_epochs, + batch_size, + model_name, + learning_rate, + use_kfold, + n_folds, + random_state=random_state, + save_model=save, + ) diff --git a/tests/data_NLP/synthetic_example.csv b/tests/data_NLP/synthetic_example.csv new file mode 100644 index 0000000..39b9848 --- /dev/null +++ b/tests/data_NLP/synthetic_example.csv @@ -0,0 +1,16 @@ +ClassLabel,TextString +A,The patient shows mild symptoms of fatigue and lethargy. +A,No significant abnormalities were found during the examination. +A,A slight increase in iron levels was observed. +A,The patient reports occasional stomach discomfort. +A,Blood tests indicate borderline anemia. +B,There are no signs of infection or inflammation in the bloodwork. +B,An abnormal growth was detected in the gastrointestinal tract. +B,Patient complains of recurring chest pains. +B,The biopsy results showed mild inflammation in the tissues. +B,The colonoscopy revealed several polyps in the lower intestine. +C,The patient exhibits severe symptoms of jaundice and abdominal pain. +C,Advanced cirrhosis was confirmed during the imaging tests. +C,Liver function tests indicate significant deterioration. +C,There is evidence of ascites in the abdominal cavity. +C,The patient has a history of alcohol-related liver disease. From 380085c0f1f1cee285211511b3792bf9a1062b2e Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 14:45:39 +0100 Subject: [PATCH 2/8] update requirements --- requirements.txt | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/requirements.txt b/requirements.txt index d24b654..ba5ec0b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,30 @@ -csc-mlops -torch -torchmetrics -torchvision -pytorch_lightning -ray -timm -pytest -pytest-cov -flake8 +# Base requirements +pytorch-lightning==2.2.1 +torch==2.0.1 +csc-mlops +mlflow==2.6.0 +torchvision==0.15.2 +boto3 +seaborn==0.13.2 + +scikit-learn +matplotlib==3.7.2 +tqdm==4.67.0 +dask>=2023.9.1 +pandas>=2.1.1 +transformers==4.34.0 +tune==0.1.2 +torchmetrics==1.0.2 + +# Preprocessing requirements +rapidfuzz==3.3.0 +PySocks==1.7.1 + +# Automated tests +pytest>=7.4.2 +pytest-cov==4.1.0 +Faker==19.10.0 +fugue==0.8.9 +fugue-sql-antlr==0.1.10 +hypothesis==6.83.1 +numpy==1.26.0 \ No newline at end of file From 8cced0686bc99a07e340296dd784b2b306d0a9df Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 15:11:34 +0100 Subject: [PATCH 3/8] update requ --- requirements.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index ba5ec0b..45cd6fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ seaborn==0.13.2 scikit-learn matplotlib==3.7.2 -tqdm==4.67.0 +tqdm dask>=2023.9.1 pandas>=2.1.1 transformers==4.34.0 @@ -24,7 +24,7 @@ PySocks==1.7.1 pytest>=7.4.2 pytest-cov==4.1.0 Faker==19.10.0 -fugue==0.8.9 -fugue-sql-antlr==0.1.10 +fugue +fugue-sql-antlr hypothesis==6.83.1 numpy==1.26.0 \ No newline at end of file From 4d0809b54e7391195d545164788c77ff579010a5 Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 15:15:13 +0100 Subject: [PATCH 4/8] update --- config_NLP/config.cfg | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/config_NLP/config.cfg b/config_NLP/config.cfg index 12647b3..e900b3a 100644 --- a/config_NLP/config.cfg +++ b/config_NLP/config.cfg @@ -18,4 +18,5 @@ BATCH_SIZE = 32 N_EPOCHS = 8 KFOLD = True N_FOLDS = 5 -RANDOM_STATE = 42 \ No newline at end of file +RANDOM_STATE = 42 +SAVE_MODEL = True \ No newline at end of file From cfa07346d88cd1dbb1607943ad242c4512f31ed2 Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 15:47:29 +0100 Subject: [PATCH 5/8] remove error --- scripts_NLP/train_NLP.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scripts_NLP/train_NLP.py b/scripts_NLP/train_NLP.py index 82975d7..9401804 100644 --- a/scripts_NLP/train_NLP.py +++ b/scripts_NLP/train_NLP.py @@ -15,8 +15,6 @@ from project_NLP.NLPNetwork import NLPNetwork from project_NLP.utils.tools import wrap_and_log -mlflow.set_tracking_uri("http://localhost:85") - def train( data_path, @@ -56,6 +54,7 @@ def train( current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") mlflow.end_run() + # KFold, data preparation with mlflow.start_run(run_name="Parent_Run_{current_time}"): if use_kfold and n_folds > 1: From 0f4a443675795a3546d92672a0ec3586ab257377 Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 15:51:52 +0100 Subject: [PATCH 6/8] update model saving --- scripts_NLP/train_NLP.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts_NLP/train_NLP.py b/scripts_NLP/train_NLP.py index 9401804..4e15363 100644 --- a/scripts_NLP/train_NLP.py +++ b/scripts_NLP/train_NLP.py @@ -135,7 +135,7 @@ def mlflow_logging_and_checkpoint( monitor="avg_val_accuracy", dirpath="./checkpoints/", filename=f"{run_name}-best_model", - save_top_k=0, + save_top_k=1, mode="max", ) From f25f9411dbd393c6e5e4d726c740134d8ff00b06 Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 15:53:40 +0100 Subject: [PATCH 7/8] update tools --- project_NLP/utils/tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project_NLP/utils/tools.py b/project_NLP/utils/tools.py index 100e7c1..ff0873f 100644 --- a/project_NLP/utils/tools.py +++ b/project_NLP/utils/tools.py @@ -31,6 +31,6 @@ def wrap_and_log(model, tokenizer): "project_nlp", python_model=wrappedModel, signature=signature, - code_path=["project_NLP/", "deployment/", "application/", "config/"], + code_path=["project_NLP/", "config/"], pip_requirements="requirements.txt", ) From af1b8564de6db043a6f8525360e337e243506991 Mon Sep 17 00:00:00 2001 From: AgatheZ Date: Thu, 17 Oct 2024 15:58:22 +0100 Subject: [PATCH 8/8] remove prints --- config_NLP/config.cfg | 2 +- project_NLP/NLPNetwork.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/config_NLP/config.cfg b/config_NLP/config.cfg index e900b3a..a2e7944 100644 --- a/config_NLP/config.cfg +++ b/config_NLP/config.cfg @@ -19,4 +19,4 @@ N_EPOCHS = 8 KFOLD = True N_FOLDS = 5 RANDOM_STATE = 42 -SAVE_MODEL = True \ No newline at end of file +SAVE_MODEL = False \ No newline at end of file diff --git a/project_NLP/NLPNetwork.py b/project_NLP/NLPNetwork.py index e05ff07..e1fe256 100644 --- a/project_NLP/NLPNetwork.py +++ b/project_NLP/NLPNetwork.py @@ -198,11 +198,6 @@ def on_validation_epoch_end(self): predicted_label = torch.argmax(output) true_label = torch.argmax(label) if (predicted_label != true_label) and (idx < len(input_ids[0])): - print(input_ids[0].shape) - print(predictions.shape) - print(labels.shape) - print("*" * 100) - sentence = self.tokenizer.decode(input_ids[0][idx]) sentence_info = { "sentence": sentence,