Merge latest changes into master from dev (quantization and embedding model) #90

base: master
.gitignore:

```diff
@@ -8,7 +8,9 @@ _static
 _templates
 .DS_store
 .venv
+*.swo
 *.swp
+*.swn
 *.pyc
 .cache/
```
.travis.yml:

```diff
@@ -1,6 +1,5 @@
 language: python
 python:
-  - "2.7"
   - "3.6"

 addons:
```
dataset/__init__.py:

```diff
@@ -1,5 +1,7 @@
 from .dataset import Dataset
 from .dataset import AuxTables
 from .dataset import CellStatus
+from .dataset import Source
+from .dataset import Table

-__all__ = ['Dataset', 'AuxTables', 'CellStatus']
+__all__ = ['Dataset', 'AuxTables', 'CellStatus', 'Table', 'Source']
```

**Review comment** (on `from .dataset import Source`): What is a `Source`?
dataset/dataset.py:

```diff
@@ -3,6 +3,7 @@
 import os
 import time

+import numpy as np
 import pandas as pd

 from .dbengine import DBengine
@@ -25,12 +26,12 @@ class CellStatus(Enum):
     WEAK_LABEL = 1
     SINGLE_VALUE = 2


 class Dataset:
     """
     This class keeps all dataframes and tables for a HC session.
     """
     def __init__(self, name, env):
         self.env = env
         self.id = name
         self.raw_data = None
         self.repaired_data = None
@@ -58,9 +59,25 @@ def __init__(self, name, env):
         self.single_attr_stats = {}
         # Domain stats for attribute pairs
         self.pair_attr_stats = {}
+        # Active attributes (attributes with errors)
+        self._active_attributes = None
+        # Attributes to train on
+        self.train_attrs = env["train_attrs"]
+
+        # Embedding model for learned embedding vectors of domain values and
+        # tuple context
+        self._embedding_model = None
+
+        # Numerical attribute list, all strings
+        self.numerical_attrs = None
+        self.categorical_attrs = None
+
+        self.quantized_data = None
+        self.do_quantization = False

     # TODO(richardwu): load more than just CSV files
-    def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
+    def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None,
+                  exclude_attr_cols=None, numerical_attrs=None, store_to_db=True):
         """
         load_data takes a CSV file of the initial data, adds tuple IDs (_tid_)
         to each row to uniquely identify an 'entity', and generates unique
```
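For orientation, a call-shape sketch of the extended signature. The stub class, dataset name, and column names below are hypothetical; constructing a real `Dataset` requires a live Postgres-backed environment, so a stand-in is used:

```python
# Call-shape sketch only: a stub stands in for the real Dataset, whose
# constructor needs a live Postgres-backed environment.
class DatasetStub:
    def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None,
                  exclude_attr_cols=None, numerical_attrs=None, store_to_db=True):
        print(f"loading {fpath}: numerical={numerical_attrs}, store={store_to_db}")

ds = DatasetStub()
ds.load_data(name='hospital', fpath='data/hospital.csv',
             exclude_attr_cols=['comments'],  # '_tid_' is appended automatically
             numerical_attrs=['score'],       # stored as float, quantizable later
             store_to_db=True)                # False skips the DB copy and indexes
```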
```diff
@@ -78,16 +95,22 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None,
         :param src_col: (str) if not None, for fusion tasks
             specifies the column containing the source for each "mention" of an
             entity.
+        :param exclude_attr_cols:
+        :param numerical_attrs:
         """
         tic = time.clock()
         try:
             # Do not include TID and source column as trainable attributes
-            exclude_attr_cols = ['_tid_']
+            if exclude_attr_cols is None:
+                exclude_attr_cols = ['_tid_']
+            else:
+                exclude_attr_cols.append('_tid_')
             if src_col is not None:
                 exclude_attr_cols.append(src_col)

             # Load raw CSV file/data into a Postgres table 'name' (param).
-            self.raw_data = Table(name, Source.FILE, na_values=na_values, exclude_attr_cols=exclude_attr_cols, fpath=fpath)
+            self.raw_data = Table(name, Source.FILE, na_values=na_values,
+                                  exclude_attr_cols=exclude_attr_cols, fpath=fpath)

             df = self.raw_data.df
             # Add _tid_ column to dataset that uniquely identifies an entity.
```

**Review comment** (on `:param exclude_attr_cols:`): What are the types of these inputs? Format them appropriately.

**Review comment** (on `:param numerical_attrs:`): Same as above.
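A runnable miniature of the new default handling for `exclude_attr_cols`; the helper name is illustrative:

```python
def merge_exclude_cols(exclude_attr_cols=None, src_col=None):
    """Standalone version of the new default handling in load_data."""
    if exclude_attr_cols is None:
        exclude_attr_cols = ['_tid_']
    else:
        exclude_attr_cols.append('_tid_')
    if src_col is not None:
        exclude_attr_cols.append(src_col)
    return exclude_attr_cols

print(merge_exclude_cols())                      # ['_tid_']
print(merge_exclude_cols(['comments'], 'src'))   # ['comments', '_tid_', 'src']
```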
```diff
@@ -100,19 +123,40 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None,
                 # use entity IDs as _tid_'s directly
                 df.rename({entity_col: '_tid_'}, axis='columns', inplace=True)

+            self.numerical_attrs = numerical_attrs or []
+            all_attrs = self.raw_data.get_attributes()
+            self.categorical_attrs = [attr for attr in all_attrs if attr not in self.numerical_attrs]
+
+            if store_to_db:
+                # df is currently all str-typed: make a copy and then
+                # 1. replace the NULL values in categorical columns,
+                # 2. cast the numerical attributes to float,
+                # 3. store the correctly typed copy in the DB
+                #    (categorical -> str, numerical -> float).
+                df_correct_type = df.copy()
+                for attr in self.categorical_attrs:
+                    df_correct_type.loc[df_correct_type[attr].isnull(), attr] = NULL_REPR
+                for attr in self.numerical_attrs:
+                    df_correct_type[attr] = df_correct_type[attr].astype(float)
+
+                df_correct_type.to_sql(self.raw_data.name, self.engine.engine, if_exists='replace',
+                                       index=False, index_label=None)
+
+            # df itself stays all-str:
             # Use NULL_REPR to represent NULL values
             df.replace('', NULL_REPR, inplace=True)
             df.fillna(NULL_REPR, inplace=True)

-            logging.info("Loaded %d rows with %d cells", self.raw_data.df.shape[0], self.raw_data.df.shape[0] * self.raw_data.df.shape[1])
+            logging.info("Loaded %d rows with %d cells", self.raw_data.df.shape[0],
+                         self.raw_data.df.shape[0] * self.raw_data.df.shape[1])

-            # Call to store to database
-            self.raw_data.store_to_db(self.engine.engine)
             status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))

-            # Generate indexes on attribute columns for faster queries
-            for attr in self.raw_data.get_attributes():
-                # Generate index on attribute
-                self.raw_data.create_db_index(self.engine,[attr])
+            if store_to_db:
+                # Generate indexes on attribute columns for faster queries
+                for attr in self.raw_data.get_attributes():
+                    # Generate index on attribute
+                    self.raw_data.create_db_index(self.engine, [attr])

             # Create attr_to_idx dictionary (assign unique index for each attribute)
             # and attr_count (total # of attributes)
```
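A standalone sketch of the typed-copy step above, assuming `NULL_REPR` is the `'_nan_'` sentinel defined in HoloClean's utils; the toy frame and column names are illustrative:

```python
import pandas as pd

NULL_REPR = '_nan_'  # assumed sentinel; HoloClean defines this in utils

df = pd.DataFrame({'city': ['Waterloo', None], 'score': ['3.5', None]})
categorical_attrs, numerical_attrs = ['city'], ['score']

# Typed copy destined for the DB: str categoricals, float numericals.
typed = df.copy()
for attr in categorical_attrs:
    typed.loc[typed[attr].isnull(), attr] = NULL_REPR
for attr in numerical_attrs:
    typed[attr] = typed[attr].astype(float)   # None becomes NaN

# The in-memory frame stays all-str with the NULL sentinel everywhere.
df.replace('', NULL_REPR, inplace=True)
df.fillna(NULL_REPR, inplace=True)
print(typed.dtypes['score'], df['score'].tolist())  # float64 ['3.5', '_nan_']
```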
```diff
@@ -178,6 +222,15 @@ def get_raw_data(self):
             raise Exception('ERROR No dataset loaded')
         return self.raw_data.df

+    def get_quantized_data(self):
+        """
+        get_quantized_data returns a pandas.DataFrame containing the data after quantization.
+        :return: the data after quantization as a pandas.DataFrame
+        """
+        if self.quantized_data is None:
+            raise Exception('ERROR No dataset quantized')
+        return self.quantized_data.df
+
     def get_attributes(self):
         """
         get_attributes return the trainable/learnable attributes (i.e. exclude meta
```

**Review comment** (on `raise Exception('ERROR No dataset quantized')`): Fix the message; this is not proper English.
```diff
@@ -187,6 +240,29 @@ def get_attributes(self):
             raise Exception('ERROR No dataset loaded')
         return self.raw_data.get_attributes()

+    def get_active_attributes(self):
+        """
+        get_active_attributes returns the attributes to be modeled.
+
+        If infer_mode = 'dk', these attributes correspond only to attributes that contain at least
+        one potentially erroneous cell. Otherwise all attributes are returned.
+
+        If applicable, in the provided :param:`train_attrs` variable.
+        """
+        if self.train_attrs is None:
+            self.train_attrs = self.get_attributes()
+
+        if self.env['infer_mode'] == 'dk':
+            if self._active_attributes is None:
+                raise Exception('ERROR no active attributes loaded. Run error detection first.')
+            attrs = self._active_attributes
+        elif self.env['infer_mode'] == 'all':
+            attrs = self.get_attributes()
+        else:
+            raise Exception('infer mode must be one of {dk, all}')
+
+        return sorted([attr for attr in attrs if attr in self.train_attrs])
+
     def get_cell_id(self, tuple_id, attr_name):
         """
         get_cell_id returns cell ID: a unique ID for every cell.
```

**Review comment** (on "If applicable, in the provided :param:`train_attrs` variable."): What does this second sentence mean?
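To make the selection logic concrete, a standalone sketch of the same branching; the function name and toy attribute lists are illustrative only:

```python
# Standalone version of the selection logic in get_active_attributes.
def select_active_attrs(infer_mode, all_attrs, active_attrs, train_attrs):
    if train_attrs is None:
        train_attrs = all_attrs
    if infer_mode == 'dk':
        if active_attrs is None:
            raise Exception('ERROR no active attributes loaded. Run error detection first.')
        attrs = active_attrs
    elif infer_mode == 'all':
        attrs = all_attrs
    else:
        raise Exception('infer mode must be one of {dk, all}')
    return sorted(attr for attr in attrs if attr in train_attrs)

print(select_active_attrs('dk', ['city', 'state', 'zip'],
                          active_attrs=['zip', 'city'], train_attrs=None))
# -> ['city', 'zip']
```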
```diff
@@ -257,7 +333,7 @@ def get_stats_single(self, attr):
         """
         # need to decode values into unicode strings since we do lookups via
         # unicode strings from Postgres
-        data_df = self.get_raw_data()
+        data_df = self.get_quantized_data() if self.do_quantization else self.get_raw_data()
         return data_df[[attr]].loc[data_df[attr] != NULL_REPR].groupby([attr]).size().to_dict()

     def get_stats_pair(self, first_attr, second_attr):
@@ -268,7 +344,7 @@ def get_stats_pair(self, first_attr, second_attr):
             <count>: frequency (# of entities) where first_attr=<first_val> AND second_attr=<second_val>
         Filters out NULL values so no entries in the dictionary would have NULLs.
         """
-        data_df = self.get_raw_data()
+        data_df = self.get_quantized_data() if self.do_quantization else self.get_raw_data()
         tmp_df = data_df[[first_attr, second_attr]]\
             .loc[(data_df[first_attr] != NULL_REPR) & (data_df[second_attr] != NULL_REPR)]\
             .groupby([first_attr, second_attr])\
```
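To illustrate what the rerouted statistics compute, a runnable miniature of the same groupby-based frequency count; `NULL_REPR` is assumed to be the `'_nan_'` sentinel from utils, and the toy frame is illustrative:

```python
import pandas as pd

NULL_REPR = '_nan_'  # assumed sentinel from utils

data_df = pd.DataFrame({'city': ['Waterloo', 'Waterloo', NULL_REPR, 'Toronto']})
attr = 'city'

# Same expression get_stats_single evaluates, minus the quantization routing:
freqs = (data_df[[attr]]
         .loc[data_df[attr] != NULL_REPR]
         .groupby([attr])
         .size()
         .to_dict())
print(freqs)  # {'Toronto': 1, 'Waterloo': 2}
```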
```diff
@@ -318,3 +394,19 @@ def get_repaired_dataset(self):
         toc = time.clock()
         total_time = toc - tic
         return status, total_time
+
+    def load_embedding_model(self, model):
+        """
+        Memoize the TupleEmbedding model for retrieving learned embeddings
+        later (e.g. in EmbeddingFeaturizer).
+        """
+        self._embedding_model = model
+
+    def get_embedding_model(self):
+        """
+        Retrieve the memoized embedding model.
+        """
+        if self._embedding_model is None:
+            raise Exception("cannot retrieve embedding model: it was never trained and loaded!")
+        return self._embedding_model
```
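The two methods implement plain memoization. A self-contained sketch of the contract follows; the stand-in class is illustrative, since the real model is HoloClean's TupleEmbedding, trained elsewhere:

```python
# Minimal stand-in reproducing the memoization contract from the diff.
class DatasetLike:
    def __init__(self):
        self._embedding_model = None

    def load_embedding_model(self, model):
        self._embedding_model = model

    def get_embedding_model(self):
        if self._embedding_model is None:
            raise Exception("cannot retrieve embedding model: "
                            "it was never trained and loaded!")
        return self._embedding_model


ds = DatasetLike()
try:
    ds.get_embedding_model()            # raises: nothing memoized yet
except Exception as exc:
    print(exc)
ds.load_embedding_model(object())       # any trained model instance
assert ds.get_embedding_model() is not None
```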
New file (k-means quantization helper):

```diff
@@ -0,0 +1,51 @@
+import time
+
+import numpy as np
+from sklearn.cluster import KMeans
+
+from utils import NULL_REPR
+
+
+def quantize_km(env, df_raw, num_attr_groups_bins):
+    """
+    K-means clustering using sklearn:
+    https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
+    Currently does 1D clustering.
+
+    :param df_raw: pandas.DataFrame of the raw data
+    :param num_attr_groups_bins: list[tuple] where each tuple consists of
+        (# of bins, list[str]) where the list[str] is a group of attributes to be
+        treated as numerical.
+        Groups must be disjoint.
+
+    :return: (status message, elapsed seconds, pandas.DataFrame after quantization)
+    """
+    tic = time.time()
+    df_quantized = df_raw.copy()
+
+    # Assert groups are disjoint
+    num_attrs = [attr for _, group in num_attr_groups_bins for attr in group]
+    assert len(set(num_attrs)) == len(num_attrs)
+
+    for bins, attrs in num_attr_groups_bins:
+        fil_notnull = (df_quantized[attrs] != NULL_REPR).all(axis=1)
+
+        df_group = df_quantized.loc[fil_notnull, attrs].reset_index(drop=True)
+        # Matrix of possibly n-dimensional values
+        X_attrs = df_group.values.astype(np.float)
+
+        if bins >= np.unique(X_attrs, axis=0).shape[0]:
+            # No need to quantize since there are more bins than unique values.
+            continue
+
+        km = KMeans(n_clusters=bins)
+        km.fit(X_attrs)
+
+        label_pred = km.labels_
+        centroids = km.cluster_centers_
+
+        # Look up cluster centroids and replace the original values.
+        df_quantized.loc[fil_notnull, attrs] = np.array([centroids[label_pred[idx]]
+                                                         for idx in df_group.index]).astype(str)
+
+    status = "DONE with quantization"
+    toc = time.time()
+    return status, toc - tic, df_quantized
```

**Review comment** (on the function signature): The name is not informative; switch to `kmeans` (it is not long). Also, I would expect to specify "k" here. Do bins refer to clusters? I would rename `bins` to `clusters` to follow common convention.
**Review comment** (on the file): Are we sure that this does not break other dependencies?
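For illustration, a sketch of invoking the helper on a toy frame, with `quantize_km` as defined above. `NULL_REPR` is assumed to be the `'_nan_'` sentinel from utils, the column name is hypothetical, and `env` is passed as `None` since the body shown never reads it:

```python
import pandas as pd

NULL_REPR = '_nan_'  # assumed sentinel from utils; quantize_km as defined above

# All-str frame, as load_data keeps df: one numerical column with a NULL.
df_raw = pd.DataFrame({'temp': ['20.1', '20.3', '35.0', '34.8', NULL_REPR]})

# One group: quantize 'temp' alone into 2 k-means bins (clusters).
status, elapsed, df_quantized = quantize_km(
    env=None, df_raw=df_raw, num_attr_groups_bins=[(2, ['temp'])])

print(sorted(df_quantized.loc[df_quantized['temp'] != NULL_REPR, 'temp'].unique()))
# two centroid values rendered as strings, approximately '20.2' and '34.9'
```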