diff --git a/TrainingDriver.py b/TrainingDriver.py
index cc8a654..3540486 100755
--- a/TrainingDriver.py
+++ b/TrainingDriver.py
@@ -15,7 +15,7 @@
 
 from nnlo.mpi.manager import MPIManager, get_device
 from nnlo.train.algo import Algo
-from nnlo.train.data import H5Data
+from nnlo.train.data import H5Data, FrameData
 from nnlo.train.model import ModelFromJson, ModelTensorFlow, ModelPytorch
 from nnlo.util.utils import import_keras
 from nnlo.util.timeline import Timeline
@@ -57,6 +57,7 @@ def add_downpour_options(parser):
 
 
 def add_loader_options(parser):
+    parser.add_argument('--data-loader', help='Data loader to load the input files', default='h5py', dest='data_loader')
     parser.add_argument('--preload-data', help='Preload files as we read them', default=0, type=int, dest='data_preload')
    parser.add_argument('--cache-data', help='Cache the input files to a provided directory', default='', dest='caching_dir')
     parser.add_argument('--copy-command', help='Specific command line to copy the data into the cache. Expect a string with two {} first is the source (from input file list), second is the bare file name at destination. Like "cp {} {}"', default=None, dest='copy_command')
@@ -136,13 +137,27 @@ def add_train_options(parser):
     add_checkpoint_options(parser)
 
 def make_loader( args, features_name, labels_name, train_list):
-    data = H5Data( batch_size=args.batch,
-                   cache = args.caching_dir,
-                   copy_command = args.copy_command,
-                   preloading = args.data_preload,
-                   features_name=features_name,
-                   labels_name=labels_name,
-    )
+
+    if 'dataframe' in args.data_loader:
+
+        data = FrameData( batch_size=args.batch,
+                          feature_adaptor = features_name[1],
+                          cache = args.caching_dir,
+                          copy_command = args.copy_command,
+                          preloading = None, #args.data_preload,
+                          frame_name = features_name[0],
+                          labels_name = labels_name,
+        )
+    else:
+
+        data = H5Data( batch_size=args.batch,
+                       cache = args.caching_dir,
+                       copy_command = args.copy_command,
+                       preloading = args.data_preload,
+                       features_name=features_name,
+                       labels_name=labels_name,
+        )
+
     # We initialize the Data object with the training data list
     # so that we can use it to count the number of training examples
     data.set_full_file_names( train_list )
@@ -230,13 +245,16 @@ def make_features_labels(m_module, args):
     args = parser.parse_args()
 
     initialize_logger(filename=args.log_file, file_level=args.log_level, stream_level=args.log_level)
+
+
+
     a_backend = args.backend
     if 'torch' in args.model:
         a_backend = 'torch'
 
     m_module = __import__(args.model.replace('.py','').replace('/', '.'), fromlist=[None]) if '.py' in args.model else None
     (features_name, labels_name) = make_features_labels(m_module, args)
-    (train_list, val_list) = make_train_val_lists(m_module, args)
+    (train_list, val_list) = make_train_val_lists(m_module, args)
 
     comm = MPI.COMM_WORLD.Dup()
     if args.timeline: Timeline.enable()
@@ -287,9 +305,9 @@ def make_features_labels(m_module, args):
 
             model_builder = ModelTensorFlow( comm, source=args.model,
                                              weights=model_weights)
-
+
     data = make_loader(args, features_name, labels_name, train_list)
-
+
     # Some input arguments may be ignored depending on chosen algorithm
     algo = make_algo( args, use_tf, comm, validate_every=int(data.count_data()/args.batch ))
 
diff --git a/examples/example_nlp.py b/examples/example_nlp.py
new file mode 100644
index 0000000..b5e9b07
--- /dev/null
+++ b/examples/example_nlp.py
@@ -0,0 +1,245 @@
+import logging
+
+# Constants
+PATH_DATA = '/storage/group/gpu/bigdata/CMSOpPred/'
+N_CODES = 77
+N_SITES = 81
+N_COUNTS = 2
+N_WORDS = 30674
+MAX_WORDS = 400
+
+
+def make_count_model(**args):
+
+    from keras.layers import Input, Flatten, Dense, Dropout, Reshape, multiply
+    from keras.regularizers import l2
+    from keras.models import Model
+    if args: logging.debug("receiving arguments {}".format(args))
+
+    dense_layers = args.get('dense_layers', 3)
+    dense_units = args.get('dense_units', 50)
+    l2reg = args.get('l2reg', 0.001)
+    dropout = args.get('dropout', 0.001)
+
+
+    m_input = Input((N_CODES, N_SITES, N_COUNTS))
+
+    m = m_input
+
+    m = Flatten()(m)
+    for _ in range( dense_layers ):
+        m = Dense( units = dense_units, activation='relu',
+                   kernel_regularizer=l2(l2reg)) (m)
+        m = Dropout(dropout)(m)
+
+    m_output = Dense( units=1, activation='sigmoid')(m)
+
+    model = Model(inputs=m_input, outputs=m_output)
+    return model
+
+
+def make_nlp_model(**args):
+
+    from keras.layers import Embedding, Input, Dense, GRU, TimeDistributed, Dropout, Flatten, Reshape, Concatenate
+    from keras.regularizers import l2
+    from keras.models import Model
+    if args: logging.debug("receiving arguments {}".format(args))
+
+    # Hyper parameters
+    rnn_units = args.get('rnn_units', 10)
+    embedding_dim = args.get('embedding_dim', 20)
+    l2_reg = args.get('l2_reg', 0.)
+    rec_do = args.get('rec_do', 0.)
+    dense_layers = args.get('dense_layers', 3)
+    dense_units = args.get('dense_units', 50)
+    site_units = args.get('site_units', 100)
+    do = args.get('do', 0.)
+
+    # Constants
+    encode_sites = False
+
+    # Word encoder model
+    words_input = Input(shape = ( None, ), dtype='int32')
+    words_embedding = Embedding(N_WORDS, embedding_dim, mask_zero = True)(words_input)
+    words_gru = GRU(rnn_units, kernel_regularizer=l2(l2_reg), recurrent_dropout = rec_do)(words_embedding)
+    wordEncoder = Model(words_input, words_gru)
+
+    # Full model
+    sent_input = Input(shape = (N_CODES * N_SITES, None), dtype='int32')
+    count_input = Input(shape = (N_CODES, N_SITES, 2, ), dtype='float32')
+    sent_encoded = TimeDistributed(wordEncoder)(sent_input)
+    sent_encoded_reshaped = Reshape(( N_CODES , N_SITES, rnn_units))(sent_encoded)
+    concat_counts_sent = Concatenate(axis=3)([sent_encoded_reshaped, count_input])
+    if encode_sites:
+        codes_reshaped = Reshape(( N_CODES , N_SITES * (rnn_units + N_COUNTS)))(concat_counts_sent)
+        sites_encoded = TimeDistributed(Dense(site_units, activation = 'relu', kernel_regularizer=l2(l2_reg)))(codes_reshaped)
+        flat = Flatten()(sites_encoded)
+    else:
+        flat = Flatten()(concat_counts_sent)
+    dense = flat
+    for _ in range(dense_layers):
+        dense = Dense( dense_units, activation='relu', kernel_regularizer=l2(l2_reg) )(dense)
+        dense = Dropout(do)(dense)
+    preds = Dense(1, activation='sigmoid')(dense)
+    model = Model([sent_input, count_input], preds)
+
+    return model
+
+
+get_model = make_nlp_model
+
+
+
+
+import numpy as np
+import pickle
+with open('/storage/user/llayer/NNLO/index.pickle', 'rb') as handle:
+    sites_dict = pickle.load(handle)
+    codes_dict = pickle.load(handle)
+
+def to_dense(np_msg, np_counts, index, values):
+
+    errors, sites, counts, site_states, error_messages = values
+
+    # Loop over the codes and sites
+    for i_key in range(len(errors)):
+
+        error = errors[i_key]
+        site = sites[i_key]
+        count = counts[i_key]
+        site_state = site_states[i_key]
+
+        # Fill counts
+        if site_state == 'good':
+            site_state_encoded = 0
+        else:
+            site_state_encoded = 1
+        np_counts[index, codes_dict[error], sites_dict[site], site_state_encoded] += count
+
+        # Fill the error messages
+        error_message = error_messages[i_key]
+        # Only continue if there exists a message
+        if isinstance(error_message, (list,)):
+
+            # Cut/Pad the error message
+            error_message = np.array(error_message)
+            pad_size = np_msg.shape[3] - error_message.shape[0]
+            if pad_size < 0:
+                error_message = error_message[-np_msg.shape[3] : ]
+            else:
+                npad = (0, pad_size)
+                error_message = np.pad(error_message, pad_width=npad, mode='constant', constant_values=int(0))
+
+            #print( error_message )
+            np_msg[index, codes_dict[error], sites_dict[site]] = error_message
+
+
+def batch_generator( batch ):
+
+    batch_size = len(batch)
+    tokens_key = 'msg_encoded'
+
+    # Loop over the messages to find the longest one
+    padding_dim = 1
+    for messages in batch[tokens_key]:
+        for msg in messages:
+            if isinstance(msg, (list,)):
+                if len(msg) > padding_dim:
+                    padding_dim = len(msg)
+
+    # Limit to the maximum number of words
+    if padding_dim > MAX_WORDS:
+        padding_dim = MAX_WORDS
+
+    # Setup the numpy matrix
+    np_msg = np.zeros((batch_size, N_CODES, N_SITES, padding_dim), dtype=np.int32)
+    np_counts = np.zeros((batch_size, N_CODES, N_SITES, N_COUNTS), dtype=np.int32)
+
+    # Fill the matrix
+    [to_dense(np_msg, np_counts, counter, values) for counter, values in enumerate(zip(batch['error'],
+                                                                                       batch['site'],
+                                                                                       batch['count'],
+                                                                                       batch['site_state'],
+                                                                                       batch[tokens_key]))]
+
+    # Reshape the error site matrix for the messages
+    np_msg = np_msg.reshape((batch_size, N_CODES * N_SITES, padding_dim))
+
+    # Return the matrix
+    return [np_msg, np_counts]
+
+
+
+from skopt.space import Real, Integer, Categorical
+get_model.parameter_range = [
+    Real( low=1e-3, high=0.1, prior='log-uniform', name='do' ),
+    Real( low=1e-4, high=0.9, prior="log-uniform", name='l2_reg' ),
+    Integer( low=5, high=32, name='embedding_dim' ),
+    Integer( low=5, high=20, name='rnn_units' ),
+    #Integer( low=5, high = 20, name = 'site_units' ),
+    Integer( low=1, high=5, name='dense_layers' ),
+    Integer( low=10, high=100, name='dense_units' ),
+]
+
+
+
+
+
+def get_name():
+    return 'nlp'
+
+def get_train():
+
+    return [PATH_DATA + 'train_0.h5', PATH_DATA + 'train_1.h5', PATH_DATA + 'train_2.h5']
+
+def get_val():
+
+    return [PATH_DATA + 'test_0.h5', PATH_DATA + 'test_1.h5', PATH_DATA + 'test_2.h5']
+
+def get_features():
+    return ('frame', batch_generator)  ## example of a data adaptor
+
+def get_labels():
+    return 'label'
+
+
+if __name__ == "__main__":
+
+    model = get_model()
+    model.summary()
+
+    import pandas as pd
+    # Open a frame
+    path = PATH_DATA + 'train_0.h5'
+    frame = pd.read_hdf(path, 'frame')
+    print( frame.head() )
+
+    # Get a batch
+    start = 0
+    batch_size = 2
+    batch = frame.iloc[start: start+batch_size]
+    matrix = batch_generator( batch )
+    print( matrix[0].shape, matrix[1].shape )
+    matrix_msg = matrix[0].reshape((batch_size, N_CODES, N_SITES, matrix[0].shape[2]))
+
+    # Fast check that the matrix is filled correctly
+    def print_sample( batch, index ):
+
+        sample = batch.iloc[index]
+        errors = sample['error']
+        sites = sample['site']
+        message = sample['msg_encoded']
+        print( errors )
+        print( sites )
+        print( message )
+
+        for i_key in range(len(errors)):
+
+            print( 'Index error', errors[i_key], ':', codes_dict[errors[i_key]],
+                   'Index site', sites[i_key], ':', sites_dict[sites[i_key]] )
+            print( 'Inserted in matrix' )
+            print( matrix_msg[index, codes_dict[errors[i_key]], sites_dict[sites[i_key]]] )
+
+    print_sample( batch, 1 )
+
+
diff --git a/nnlo/train/data.py b/nnlo/train/data.py
index 2d18cfd..1f8498f 100644
--- a/nnlo/train/data.py
+++ b/nnlo/train/data.py
@@ -1,6 +1,7 @@
 ### Data class and associated helper methods
 
 import numpy as np
+import pandas as pd
 import h5py
 import os
 import time
@@ -191,6 +192,75 @@ def load_data(self, in_file):
             Not implemented in base class; derived classes should implement this function"""
         raise NotImplementedError
 
+
+
+class FrameData(Data):
+    """ Loads pandas frames stored in hdf5 files """
+    def __init__(self, batch_size,
+                 feature_adaptor,
+                 cache=None,
+                 copy_command=None,
+                 preloading=0,
+                 frame_name='frame',
+                 labels_name='label'):
+        """Initializes and stores the frame name, labels name and feature adaptor"""
+        super(FrameData, self).__init__(batch_size,cache,copy_command)
+        self.feature_adaptor = feature_adaptor
+        self.frame_name = frame_name
+        self.labels_name = labels_name
+        ## initialize the data-preloader
+        self.fpl = None
+
+    def load_data(self, in_file_name):
+        frame = pd.read_hdf(in_file_name, self.frame_name)
+        return frame
+
+    def count_data(self):
+        num_data = 0
+        for in_file_name in self.file_names:
+            frame = pd.read_hdf(in_file_name, self.frame_name)
+            num_data += len(frame)
+        return num_data
+
+    def concat_data(self, data1, data2):
+        return pd.concat([data1, data2])
+
+    def generate_data(self):
+        """
+        Override the parent generate_data and adapt it to pandas frames
+        """
+        leftovers = None
+        for cur_file_name in self.file_names:
+            cur_frame = self.load_data(cur_file_name)
+            # concatenate any leftover data from the previous file
+            if leftovers is not None:
+                cur_frame = self.concat_data( leftovers, cur_frame )
+                leftovers = None
+            num_in_file = len(cur_frame)
+            for cur_pos in range(0, num_in_file, self.batch_size):
+                next_pos = cur_pos + self.batch_size
+                if next_pos <= num_in_file:
+                    yield ( self.get_batch( cur_frame, cur_pos, next_pos ),
+                            cur_frame[self.labels_name].iloc[cur_pos : next_pos].values)
+                else:
+                    leftovers = cur_frame.iloc[cur_pos : num_in_file]
+
+    def get_batch(self, cur_frame, start_pos, end_pos ):
+        """
+        Convert a slice of the dataframe to numpy arrays
+        with the provided feature adaptor
+        """
+        #print( 'Gen batch' )
+        batch = cur_frame.iloc[start_pos : end_pos]
+        return self.feature_adaptor( batch )
+
+    def finalize(self):
+        if self.fpl:
+            self.fpl.stop()
+        Data.finalize(self)
+
+
+
 class H5Data(Data):
     """Loads data stored in hdf5 files
     Attributes:
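Usage sketch: a minimal illustration of how the new loader and the example's adaptor fit together, assuming the example module is importable as examples.example_nlp; the batch size and the standalone loop are illustrative. TrainingDriver.make_loader() performs the equivalent wiring when --data-loader contains 'dataframe', taking the frame name from get_features()[0] and the feature adaptor from get_features()[1].

    # Hypothetical standalone use of FrameData with the example's batch adaptor.
    from nnlo.train.data import FrameData
    from examples.example_nlp import batch_generator, get_train, get_labels

    data = FrameData(batch_size=2,
                     feature_adaptor=batch_generator,   # DataFrame slice -> [np_msg, np_counts]
                     frame_name='frame',
                     labels_name=get_labels())
    data.set_full_file_names(get_train())
    for x_batch, y_batch in data.generate_data():
        # x_batch is the adaptor output, y_batch holds the 'label' values for the slice
        break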