diff --git a/.gitignore b/.gitignore
index 8e50c6b3d..5eef561fd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@ _templates
 .DS_store
 .venv
 *.swp
+*.swn
 *.pyc

 # pip package metadata
diff --git a/dataset/dataset.py b/dataset/dataset.py
index 7a52461bd..b5a09aaff 100644
--- a/dataset/dataset.py
+++ b/dataset/dataset.py
@@ -1,9 +1,8 @@
-import logging
-import time
 from enum import Enum
+import logging
 import os
-import time
 import pandas as pd
+import time

 from .dbengine import DBengine
 from .table import Table, Source
@@ -49,9 +48,7 @@ def __init__(self, name, env):
         self.raw_data = None
         self.repaired_data = None
         self.constraints = None
-        self.aux_table = {}
-        for tab in AuxTables:
-            self.aux_table[tab] = None
+        self.aux_tables = {}
         # start dbengine
         self.engine = DBengine(
             env['db_user'],
@@ -119,7 +116,6 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
             df.fillna('_nan_', inplace=True)

             # Call to store to database
-
             self.raw_data.store_to_db(self.engine.engine)
             status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))
@@ -143,6 +139,25 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
     def set_constraints(self, constraints):
         self.constraints = constraints

+    def aux_table_exists(self, aux_table):
+        """
+        aux_table_exists returns True if :param aux_table: has been generated.
+
+        :param aux_table: (AuxTables(Enum)) auxiliary table to check
+        """
+        return aux_table in self.aux_tables
+
+    def get_aux_table(self, aux_table):
+        """
+        get_aux_table returns the Table associated with :param aux_table:.
+
+        :param aux_table: (AuxTables(Enum)) auxiliary table to retrieve
+        """
+        if not self.aux_table_exists(aux_table):
+            raise Exception("{} auxiliary table has not been generated".format(aux_table))
+        return self.aux_tables[aux_table]
+
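The accessor pair above replaces direct indexing into a dict pre-seeded with None entries. A minimal standalone sketch of the same guarded-registry pattern (class and values here are illustrative, not the module's API):

    from enum import Enum

    class AuxTables(Enum):
        cell_domain = 1
        dk_cells = 2

    class Registry:
        def __init__(self):
            self.aux_tables = {}  # populated lazily; no pre-seeded None entries

        def aux_table_exists(self, aux_table):
            return aux_table in self.aux_tables

        def get_aux_table(self, aux_table):
            if not self.aux_table_exists(aux_table):
                raise Exception("{} auxiliary table has not been generated".format(aux_table))
            return self.aux_tables[aux_table]

    reg = Registry()
    reg.aux_tables[AuxTables.dk_cells] = 'dk_cells table'
    assert reg.get_aux_table(AuxTables.dk_cells) == 'dk_cells table'
    assert not reg.aux_table_exists(AuxTables.cell_domain)  # get_aux_table would raise here

Callers now get an explicit failure instead of a silent None when a table has not been generated yet.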
""" try: - self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine) + self.aux_tables[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine) if index_attrs: - self.aux_table[aux_table].create_df_index(index_attrs) - self.aux_table[aux_table].create_db_index(self.engine, index_attrs) + self.aux_tables[aux_table].create_df_index(index_attrs) + self.aux_tables[aux_table].create_db_index(self.engine, index_attrs) except Exception: logging.error('generating aux_table %s', aux_table.name) raise @@ -225,14 +240,14 @@ def get_statistics(self): : frequency (# of entities) where attr1: val1 AND attr2: val2 """ if not self.stats_ready: - self.collect_stats() + self.collect_init_stats() stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats) self.stats_ready = True return stats - def collect_stats(self): + def collect_init_stats(self): """ - collect_stats memoizes: + collect_init_stats calculates and memoizes: (based on RAW/INITIAL data) 1. self.single_attr_stats ({ attribute -> { value -> count } }) the frequency (# of entities) of a given attribute-value 2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } }) @@ -243,35 +258,112 @@ def collect_stats(self): Also known as co-occurrence count. """ - self.total_tuples = self.get_raw_data().shape[0] + self.total_tuples = self.get_raw_data()['_tid_'].nunique() + # Single attribute-value frequency + for attr in self.get_attributes(): + self.single_attr_stats[attr] = self._get_init_stats_single(attr) + # Co-occurence frequency + for first_attr in self.get_attributes(): + self.pair_attr_stats[first_attr] = {} + for second_attr in self.get_attributes(): + if second_attr != first_attr: + self.pair_attr_stats[first_attr][second_attr] = self._get_init_stats_pair(first_attr,second_attr) + + def collect_current_stats(self, attr): + """ + collect_current_stats calculates and memoizes frequency and co-occurence + statistics based on the CURRENT values/data. + + See collect_init_stats for which member variables are memoized/overwritten. + Does NOT overwrite self.total_tuples. + """ # Single attribute-value frequency for attr in self.get_attributes(): - self.single_attr_stats[attr] = self.get_stats_single(attr) + self.single_attr_stats[attr] = self._get_current_stats_single(attr) # Co-occurence frequency - for cond_attr in self.get_attributes(): - self.pair_attr_stats[cond_attr] = {} - for trg_attr in self.get_attributes(): - if trg_attr != cond_attr: - self.pair_attr_stats[cond_attr][trg_attr] = self.get_stats_pair(cond_attr,trg_attr) + for first_attr in self.get_attributes(): + self.pair_attr_stats[first_attr] = {} + for second_attr in self.get_attributes(): + if second_attr != first_attr: + self.pair_attr_stats[first_attr][second_attr] = self._get_current_stats_pair(first_attr,second_attr) - def get_stats_single(self, attr): + def _get_init_stats_single(self, attr): + """ + _get_init_stats_single returns a dictionary where the keys possible + values for :param attr: and the values contain the frequency count in + the RAW/INITIAL data of that value for this attribute. 
+ """ + # We need to iterate through this in a for loop instead of groupby & size + # since our values may be '|||' separated + freq_count = {} + for (cell,) in self.get_raw_data()[[attr]].itertuples(index=False): + vals = cell.split('|||') + for val in vals: + # Correct for if there are multiple values: equal weight all + # values and their contribution to counts + freq_count[val] = freq_count.get(val, 0) + 1. / len(vals) + return freq_count + + def _get_current_stats_single(self, attr): """ - Returns a dictionary where the keys possible values for :param attr: and - the values contain the frequency count of that value for this attribute. + _get_current_stats_single a dictionary where the keys possible values + for :param attr: and the values contain the frequency count in the + CURRENT data of that value for this attribute. + """ + # Retrieve statistics on current value from cell_domain + df_domain = self.get_aux_table(AuxTables.cell_domain).df + df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts() + # We do not store attributes with only NULL values in cell_domain: + # we require _nan_ in our single stats however + if df_count.empty: + return {'_nan_': self.total_tuples} + return df_count.to_dict() + + def _get_init_stats_pair(self, first_attr, second_attr): + """ + _get_init_stats_pair returns a dictionary {first_val -> {second_val -> + count } } where (based on RAW/INITIAL dataset): + : all possible values for first_attr + : all values for second_attr that appeared at least once with + : frequency (# of entities) where first_attr: AND second_attr: """ - # need to decode values into unicode strings since we do lookups via - # unicode strings from Postgres - return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict() - def get_stats_pair(self, first_attr, second_attr): + # We need to iterate through this in a for loop instead of groupby & size + # since our values may be '|||' separated + cooccur_freq_count = {} + for cell1, cell2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False): + vals1 = cell1.split('|||') + vals2 = cell2.split('|||') + for val1 in vals1: + cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {}) + for val2 in vals2: + # Correct for if there are multiple values: equal weight all + # co-pairs and their contribution to co-occur counts + cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1. 
+    def _get_current_stats_pair(self, first_attr, second_attr):
         """
-        Returns a dictionary {first_val -> {second_val -> count } } where:
+        _get_current_stats_pair returns a dictionary {first_val -> {second_val ->
+        count } } where (based on CURRENT dataset):
             first_val: all possible values for first_attr
             second_val: all values for second_attr that appeared at least once with first_val
             count: frequency (# of entities) where first_attr: first_val AND second_attr: second_val
         """
-        tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count")
-        return _dictify(tmp_df)
+        # Retrieve pairwise statistics on current value from cell_domain
+        df_domain = self.get_aux_table(AuxTables.cell_domain).df
+        # Filter cell_domain for only the attributes we care about
+        df_domain = df_domain[df_domain['attribute'].isin([first_attr, second_attr])]
+        # Convert to wide form so we have our :param first_attr:
+        # and :param second_attr: as columns along with the _tid_ column
+        df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value')
+        # We do not store cells for attributes consisting of only NULL values in cell_domain.
+        # We require this for pair stats though.
+        if first_attr not in df_domain.columns:
+            df_domain[first_attr] = '_nan_'
+        if second_attr not in df_domain.columns:
+            df_domain[second_attr] = '_nan_'
+        return _dictify(df_domain.groupby([first_attr, second_attr]).size().reset_index(name="count"))
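The long-to-wide pivot above, shown in isolation on hypothetical cell_domain rows: each (_tid_, attribute, current_value) triple becomes one cell of a wide frame keyed by _tid_, which can then be grouped like the raw data:

    import pandas as pd

    df_domain = pd.DataFrame({
        '_tid_':         [0, 0, 1, 1],
        'attribute':     ['city', 'state', 'city', 'state'],
        'current_value': ['Chicago', 'IL', 'Boston', 'MA'],
    })
    wide = df_domain.pivot(index='_tid_', columns='attribute', values='current_value')
    counts = wide.groupby(['city', 'state']).size().reset_index(name='count')
    print(counts)  # one row per (city, state) pair with its frequency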

     def get_domain_info(self):
         """
@@ -284,30 +376,39 @@ def get_domain_info(self):
         classes = int(res[0][1])
         return total_vars, classes

-    def get_inferred_values(self):
+    def generate_inferred_values(self):
         tic = time.clock()
-        query = "SELECT t1._tid_, t1.attribute, domain[inferred_assignment + 1] as rv_value " \
-                "FROM " \
-                "(SELECT _tid_, attribute, " \
-                "_vid_, init_value, string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') as domain " \
-                "FROM %s) as t1, %s as t2 " \
-                "WHERE t1._vid_ = t2._vid_"%(AuxTables.cell_domain.name, AuxTables.inf_values_idx.name)
+        query = """
+        SELECT t1._tid_,
+               t1.attribute,
+               domain[inferred_assignment + 1] AS rv_value
+        FROM (
+            SELECT _tid_,
+                   attribute,
+                   _vid_,
+                   current_value,
+                   string_to_array(regexp_replace(domain, \'[{{\"\"}}]\', \'\', \'gi\'), \'|||\') AS domain
+            FROM {cell_domain}) AS t1,
+            {inf_values_idx} AS t2
+        WHERE t1._vid_ = t2._vid_
+        """.format(cell_domain=AuxTables.cell_domain.name,
+                   inf_values_idx=AuxTables.inf_values_idx.name)
         self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_'])
-        self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute'])
+        self.get_aux_table(AuxTables.inf_values_dom).create_db_index(self.engine, ['attribute'])
         status = "DONE collecting the inferred values."
         toc = time.clock()
         total_time = toc - tic
         return status, total_time

-    def get_repaired_dataset(self):
+    def generate_repaired_dataset(self):
         tic = time.clock()
-        init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
-        t = self.aux_table[AuxTables.inf_values_dom]
+        records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
+        t = self.aux_tables[AuxTables.inf_values_dom]
         repaired_vals = _dictify(t.df.reset_index())
         for tid in repaired_vals:
             for attr in repaired_vals[tid]:
-                init_records[tid][attr] = repaired_vals[tid][attr]
-        repaired_df = pd.DataFrame.from_records(init_records)
+                records[tid][attr] = repaired_vals[tid][attr]
+        repaired_df = pd.DataFrame.from_records(records)
         name = self.raw_data.name+'_repaired'
         self.repaired_data = Table(name, Source.DF, df=repaired_df)
         self.repaired_data.store_to_db(self.engine.engine)
@@ -316,4 +417,22 @@ def generate_repaired_dataset(self):
         total_time = toc - tic
         return status, total_time

-
+    def update_current_values(self):
+        """
+        update_current_values takes the inferred values from inf_values_dom
+        (auxiliary table) and updates them in the current_value column in
+        cell_domain (auxiliary table).
+        """
+        df_inferred = self.get_aux_table(AuxTables.inf_values_dom).df
+        df_cell_domain = self.get_aux_table(AuxTables.cell_domain).df
+
+        # Update current_value column with rv_values for inferred TIDs
+        df_updated = df_cell_domain.reset_index().merge(df_inferred, on=['_tid_', 'attribute'], how='left')
+        update_filter = df_updated['rv_value'].notnull()
+        df_updated.loc[update_filter, 'current_value'] = df_updated.loc[update_filter, 'rv_value']
+        df_updated.drop('rv_value', axis=1, inplace=True)
+
+        self.generate_aux_table(AuxTables.cell_domain, df_updated, store=True, index_attrs=['_vid_'])
+        # self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_tid_'])
+        # self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_cid_'])
+        logging.info('DONE updating current values with inferred values')
diff --git a/dataset/table.py b/dataset/table.py
index 0a1a683e2..99ffab808 100644
--- a/dataset/table.py
+++ b/dataset/table.py
@@ -1,6 +1,6 @@
+from enum import Enum
 import os
 import pandas as pd
-from enum import Enum

 class Source(Enum):
     FILE = 1
diff --git a/detect/__init__.py b/detect/__init__.py
index e8764e549..aed62b0f6 100644
--- a/detect/__init__.py
+++ b/detect/__init__.py
@@ -1,6 +1,7 @@
 from .detect import DetectEngine
 from .detector import Detector
-from .nulldetector import NullDetector
-from .violationdetector import ViolationDetector
+from .multi_init_detector import MultiInitDetector
+from .null_detector import NullDetector
+from .violation_detector import ViolationDetector

-__all__ = ['DetectEngine', 'Detector', 'NullDetector', 'ViolationDetector']
+__all__ = ['DetectEngine', 'Detector', 'MultiInitDetector', 'NullDetector', 'ViolationDetector']
diff --git a/detect/detect.py b/detect/detect.py
index 25a47bb5d..7ef85fd90 100644
--- a/detect/detect.py
+++ b/detect/detect.py
@@ -34,5 +34,5 @@ def store_detected_errors(self, errors_df):
         if errors_df.empty:
             raise Exception("ERROR: Detected errors dataframe is empty.")
         self.ds.generate_aux_table(AuxTables.dk_cells, errors_df, store=True)
-        self.ds.aux_table[AuxTables.dk_cells].create_db_index(self.ds.engine, ['_cid_'])
+        self.ds.get_aux_table(AuxTables.dk_cells).create_db_index(self.ds.engine, ['_cid_'])

diff --git a/detect/multi_init_detector.py b/detect/multi_init_detector.py
new file mode 100644
index 000000000..32bdb9839
--- /dev/null
+++ b/detect/multi_init_detector.py
@@ -0,0 +1,33 @@
+import pandas as pd
+
+from .detector import Detector
+
+
+class MultiInitDetector(Detector):
+    """
+    MultiInitDetector detects every cell with multiple initial values as errors.
+    """
+    def __init__(self, name='MultiInitDetector'):
+        super(MultiInitDetector, self).__init__(name)
+
+    def setup(self, dataset, env):
+        self.ds = dataset
+        self.env = env
+        self.df = self.ds.get_raw_data()
+
+    def detect_noisy_cells(self):
+        """
+        detect_noisy_cells returns a pandas.DataFrame containing all cells with
+        multiple '|||' separated values.
+
+        :return: pandas.DataFrame with columns:
+            _tid_: entity ID
+            attribute: attribute with multiple initial values for this entity
+        """
+        attributes = self.ds.get_attributes()
+        errors = []
+        for attr in attributes:
+            tmp_df = self.df[self.df[attr].str.contains(r'\|\|\|')]['_tid_'].to_frame()
+            tmp_df.insert(1, "attribute", attr)
+            errors.append(tmp_df)
+        errors_df = pd.concat(errors, ignore_index=True)
+        return errors_df
+
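The detector's filter hinges on escaping '|' (regex alternation) inside str.contains; a quick standalone check on a toy frame:

    import pandas as pd

    df = pd.DataFrame({'_tid_': [0, 1, 2],
                       'city': ['Chicago', 'Boston|||Chicago', '_nan_']})
    noisy = df[df['city'].str.contains(r'\|\|\|')]['_tid_'].to_frame()
    noisy.insert(1, 'attribute', 'city')
    print(noisy)  # only _tid_ 1 is flagged as a multi-valued cell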
diff --git a/detect/nulldetector.py b/detect/null_detector.py
similarity index 85%
rename from detect/nulldetector.py
rename to detect/null_detector.py
index 412196766..0d4108f11 100644
--- a/detect/nulldetector.py
+++ b/detect/null_detector.py
@@ -22,7 +22,8 @@ def detect_noisy_cells(self):
         attributes = self.ds.get_attributes()
         errors = []
         for attr in attributes:
-            tmp_df = self.df[self.df[attr].isnull()]['_tid_'].to_frame()
+            # self.df i.e. raw_data has all NULL values converted to '_nan_'
+            tmp_df = self.df[self.df[attr] == '_nan_']['_tid_'].to_frame()
             tmp_df.insert(1, "attribute", attr)
             errors.append(tmp_df)
         errors_df = pd.concat(errors, ignore_index=True)
diff --git a/detect/violationdetector.py b/detect/violation_detector.py
similarity index 71%
rename from detect/violationdetector.py
rename to detect/violation_detector.py
index 575472869..5d586e8d0 100644
--- a/detect/violationdetector.py
+++ b/detect/violation_detector.py
@@ -17,24 +17,33 @@ def setup(self, dataset, env):
         self.constraints = dataset.constraints

     def detect_noisy_cells(self):
-        # Convert Constraints to SQL queries
+        """
+        detect_noisy_cells returns all cells that are involved in a DC violation.
+
+        :return: pandas.DataFrame with two columns:
+            '_tid_': TID of tuple
+            'attribute': attribute corresponding to cell involved in DC violation
+        """
+
+        # Convert Constraints to SQL queries
         tbl = self.ds.raw_data.name
         queries = []
+        # attributes involved in a DC violation (indexed by corresponding query)
         attrs = []
-        for c_key in self.constraints:
-            c = self.constraints[c_key]
-            q = self.to_sql(tbl, c)
+        for constraint in self.constraints.values():
+            # SQL query for TIDs involved in a DC violation for this constraint
+            q = self.to_sql(tbl, constraint)
             queries.append(q)
-            attrs.append(c.components)
+            attrs.append(constraint.components)
         # Execute Queries over the DBEngine of Dataset
         results = self.ds.engine.execute_queries(queries)
         # Generate final output
         errors = []
-        for i in range(len(attrs)):
-            res = results[i]
-            attr_list = attrs[i]
-            tmp_df = self.gen_tid_attr_output(res, attr_list)
+        for attr_list, res in zip(attrs, results):
+            # DataFrame with TID and attribute pairs from DC violation queries
+            tmp_df = self._gen_tid_attr_output(res, attr_list)
             errors.append(tmp_df)
         errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True)
         return errors_df
@@ -79,7 +88,12 @@ def gen_mult_query(self, tbl, c):
         query = mult_template.substitute(table=tbl, cond1=cond1, c='', cond2=cond2)
         return query

-    def gen_tid_attr_output(self, res, attr_list):
+    def _gen_tid_attr_output(self, res, attr_list):
+        """
+        _gen_tid_attr_output creates a DataFrame containing the TIDs from
+        the DC violation query results in :param res: with the attributes
+        that were involved in the violation in :param attr_list:.
+        """
         errors = []
         for tuple in res:
             tid = int(tuple[0])
diff --git a/domain/domain.py b/domain/domain.py
index d45b8d97d..7f9a2d1c2 100644
--- a/domain/domain.py
+++ b/domain/domain.py
@@ -7,7 +7,6 @@
 from dataset import AuxTables

-
 class DomainEngine:
     def __init__(self, env, dataset, cor_strength = 0.1, sampling_prob=0.3, max_sample=5):
         """
@@ -32,7 +31,6 @@ def __init__(self, env, dataset, cor_strength = 0.1, sampling_prob=0.3, max_samp
         self.max_sample = max_sample
         self.single_stats = {}
         self.pair_stats = {}
-        self.all_attrs = {}

     def setup(self):
         """
@@ -55,7 +53,9 @@ def find_correlations(self):
         the pairwise correlations between attributes (values are treated as
         discrete categories).
         """
-        df = self.ds.get_raw_data()[self.ds.get_attributes()].copy()
+        # use expanded raw DataFrame to calculate correlations (since
+        # raw may contain '|||' separated values)
+        df = self._expand_raw_df()[self.ds.get_attributes()]
         # convert dataset to categories/factors
         for attr in df.columns:
             df[attr] = df[attr].astype('category').cat.codes
@@ -65,30 +65,79 @@ def find_correlations(self):
         m_corr = df.corr()
         self.correlations = m_corr

+    def _expand_raw_df(self):
+        """
+        _expand_raw_df returns an expanded version of the raw DataFrame
+        where every row with cells with multiple values (separated by '|||')
+        is expanded into multiple rows that form the cross-product of the
+        multi-valued cells.
+
+        For example if a row contains
+
+            attr1     | attr2
+            A|||B|||C | D|||E
+
+        this would be expanded into
+
+            attr1 | attr2
+            A     | D
+            A     | E
+            B     | D
+            B     | E
+            C     | D
+            C     | E
+        """
+        # Cells may contain values separated by '|||': we need to
+        # expand this into multiple rows
+        raw_df = self.ds.get_raw_data()
+
+        tic = time.clock()
+        expanded_rows = []
+        for tup in raw_df.itertuples():
+            expanded_tup = [val.split('|||') if hasattr(val, 'split') else (val,) for val in tup]
+            expanded_rows.extend([new_tup for new_tup in itertools.product(*expanded_tup)])
+        toc = time.clock()
+        logging.debug("Time to expand raw data: %.2f secs", toc-tic)
+        expanded_df = pd.DataFrame(expanded_rows, columns=raw_df.index.names + list(raw_df.columns))
+        expanded_df.set_index(raw_df.index.names, inplace=True)
+        return expanded_df
+
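A minimal sketch of the cross-product expansion _expand_raw_df performs on a single toy row (the real method also carries the DataFrame index through):

    import itertools

    row = ('A|||B|||C', 'D|||E')
    expanded = list(itertools.product(*[val.split('|||') for val in row]))
    # [('A', 'D'), ('A', 'E'), ('B', 'D'), ('B', 'E'), ('C', 'D'), ('C', 'E')]
    assert len(expanded) == 6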
""" @@ -177,10 +226,13 @@ def generate_domain(self): _cid_: cell ID (unique for every entity-attribute) _vid_: variable ID (1-1 correspondence with _cid_) attribute: attribute name + attribute_idx: index of attribute domain: ||| seperated string of domain values domain_size: length of domain - init_value: initial value for this cell - init_value_idx: domain index of init_value + init_values: initial values for this cell + init_values_idx: domain indexes of init_values + current_value: current value (current predicted) + current_value_idx: domain index for current value fixed: 1 if a random sample was taken since no correlated attributes/top K values """ @@ -190,29 +242,37 @@ def generate_domain(self): # Iterate over dataset rows cells = [] vid = 0 - records = self.ds.get_raw_data().to_records() - self.all_attrs = list(records.dtype.names) - for row in tqdm(list(records)): + raw_records = self.ds.get_raw_data().to_records() + for row in tqdm(raw_records): tid = row['_tid_'] app = [] + + # Iterate over each active attribute (attributes that have at + # least one dk cell) and generate for this cell: + # 1) the domain values + # 2) the initial values (taken from raw data) + # 3) the current value (best predicted value) for attr in self.active_attributes: - init_value, dom = self.get_domain_cell(attr, row) - init_value_idx = dom.index(init_value) - if len(dom) > 1: - cid = self.ds.get_cell_id(tid, attr) - app.append({"_tid_": tid, "attribute": attr, "_cid_": cid, "_vid_":vid, "domain": "|||".join(dom), "domain_size": len(dom), - "init_value": init_value, "init_index": init_value_idx, "fixed":0}) - vid += 1 - else: - add_domain = self.get_random_domain(attr,init_value) - # Check if attribute has more than one unique values - if len(add_domain) > 0: - dom.extend(self.get_random_domain(attr,init_value)) - cid = self.ds.get_cell_id(tid, attr) - app.append({"_tid_": tid, "attribute": attr, "_cid_": cid, "_vid_": vid, "domain": "|||".join(dom), - "domain_size": len(dom), - "init_value": init_value, "init_index": init_value_idx, "fixed": 1}) - vid += 1 + init_values, current_value, dom = self.get_domain_cell(attr, row) + init_values_idx = [dom.index(val) for val in init_values] + current_value_idx = dom.index(current_value) + cid = self.ds.get_cell_id(tid, attr) + fixed = 0 + + # If domain could not be generated from correlated attributes, + # randomly choose values to add to our domain. + if len(dom) == 1: + fixed = 1 + add_domain = self.get_random_domain(attr, init_values) + dom.extend(add_domain) + + app.append({"_tid_": tid, "_cid_": cid, "_vid_":vid, + "attribute": attr, "attribute_idx": self.ds.attr_to_idx[attr], + "domain": '|||'.join(dom), "domain_size": len(dom), + "init_values": '|||'.join(init_values), "init_values_idx": '|||'.join(map(str,init_values_idx)), + "current_value": current_value, "current_value_idx": current_value_idx, + "fixed": fixed}) + vid+=1 cells.extend(app) domain_df = pd.DataFrame(data=cells) logging.info('DONE generating domain') @@ -220,8 +280,8 @@ def generate_domain(self): def get_domain_cell(self, attr, row): """ - get_domain_cell returns a list of all domain values for the given - entity (row) and attribute. + get_domain_cell returns list of init values, current (best predicted) + value, and list of domain values for the given cell. 
@@ -220,8 +280,8 @@ def generate_domain(self):

     def get_domain_cell(self, attr, row):
         """
-        get_domain_cell returns a list of all domain values for the given
-        entity (row) and attribute.
+        get_domain_cell returns the list of initial values, the current (best
+        predicted) value, and the list of domain values for the given cell.

         We define domain values as values in 'attr' that co-occur with values
         in attributes ('cond_attr') that are correlated with 'attr' at least in
@@ -237,26 +297,29 @@ def get_domain_cell(self, attr, row):

         This would produce [B,C,E] as domain values.

-        :return: (initial value of entity-attribute, domain values for entity-attribute).
+        :param attr: (str) name of attribute to generate domain info for
+        :param row: (pandas.record) Pandas record (tuple) of the current TID's row
+
+        :return: (list of initial values, current value, list of domain values).
         """
-        domain = set([])
+        domain = set()
         correlated_attributes = self.get_corr_attributes(attr)
         # Iterate through all attributes correlated at least self.cor_strength ('cond_attr')
         # and take the top K co-occurrence values for 'attr' with the current
         # row's 'cond_attr' value.
         for cond_attr in correlated_attributes:
-            if cond_attr == attr or cond_attr == 'index' or cond_attr == '_tid_':
+            if cond_attr == attr:
                 continue
-            cond_val = row[cond_attr]
-            if not pd.isnull(cond_val):
-                if not self.pair_stats[cond_attr][attr]:
-                    break
+            # row[cond_attr] should always be a string (since it comes from self.raw_data)
+            for cond_val in row[cond_attr].split('|||'):
                 s = self.pair_stats[cond_attr][attr]
                 try:
                     candidates = s[cond_val]
                     domain.update(candidates)
                 except KeyError as missing_val:
+                    # KeyError is possible since we do not store stats for
+                    # attributes with only NULL values
                     if not pd.isnull(row[attr]):
                         # error since co-occurrence must be at least 1 (since
                         # the current row counts as one co-occurrence).
@@ -265,25 +328,75 @@ def get_domain_cell(self, attr, row):
         # Remove _nan_ if added due to correlated attributes
         domain.discard('_nan_')

-        # Add initial value in domain
-        if pd.isnull(row[attr]):
-            domain.update(set(['_nan_']))
-            init_value = '_nan_'
-        else:
-            domain.update(set([row[attr]]))
-            init_value = row[attr]
-        return init_value, list(domain)

-    def get_random_domain(self, attr, cur_value):
+        init_values, current_value = self._init_and_current(attr, row)
+        domain.update(init_values)
+
+        return init_values, current_value, list(domain)
+
+    def _init_and_current(self, attr, init_row):
+        """
+        _init_and_current returns the initial values for :param attr:
+        and the current value: the initial value that has the highest
+        cumulative co-occurrence probability with the other initial values in
+        this row.
+        """
+        # Assume value in raw dataset is given as '|||' separated initial values
+        init_values = init_row[attr].split('|||')
+
+        # Only one initial value: current is the initial value
+        if len(init_values) == 1:
+            return init_values, init_values[0]
+
+        _, single_stats, pair_stats = self.ds.get_statistics()
+        attrs = self.ds.get_attributes()
+
+        # Determine current value by computing co-occurrence probability
+        best_val = None
+        best_score = None
+        for init_val in init_values:
+            # Compute total sum of co-occur probabilities with all other
+            # initial values in this row, that is we calculate the sum of
+            #
+            #   P(other_init_val | init_val) = P(init_val, other_init_val) / P(init_val)
+            cur_score = 0
+            for other_attr in attrs:
+                if attr == other_attr:
+                    continue
+                other_vals = init_row[other_attr].split('|||')
+                for other_val in other_vals:
+                    # We subtract the co-occurrence weight for this current row
+                    # from pair_stats since we do not want to include the
+                    # co-occurrence of our current row.
+                    #
+                    # Consider the extreme case where an erroneous initial
+                    # value only occurs once: its co-occurrence probability
+                    # will always be 1 but it does not mean this value
+                    # co-occurs most frequently with our other initial values.
+                    cooccur_freq = pair_stats[attr][other_attr][init_val][other_val] - 1. / (len(other_vals) * len(init_values))
+
+                    cur_score += float(cooccur_freq) / single_stats[attr][init_val]
+            # Keep the best initial value only
+            if best_score is None or cur_score > best_score:
+                best_val = init_val
+                best_score = cur_score
+        return init_values, best_val
+
+    def get_random_domain(self, attr, init_values):
         """
         get_random_domain returns a random sample of at most size
-        'self.max_sample' of domain values for 'attr' that is NOT 'cur_value'.
+        'self.max_sample' of domain values for :param attr: that is NOT any
+        of :param init_values:

+        :param attr: (str) name of attribute to generate random domain for
+        :param init_values: (list[str]) list of initial values
         """
         if random.random() > self.sampling_prob:
             return []
         domain_pool = set(self.single_stats[attr].keys())
-        domain_pool.discard(cur_value)
+        # Do not include initial values in random domain
+        domain_pool = domain_pool.difference(init_values)
         size = len(domain_pool)
         if size > 0:
             k = min(self.max_sample, size)
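A worked toy run of the selection logic in _init_and_current, collapsed to a single co-attribute: after discounting the row's own contribution, the candidate with the highest relative co-occurrence wins. All numbers (and the flattened stats shape) are invented for illustration:

    init_values = ['Chicago', 'Chicagoo']                # 'Chicagoo' is a typo candidate
    single_stats = {'Chicago': 10.0, 'Chicagoo': 1.0}    # frequency of each candidate
    pair_stats = {'Chicago': {'IL': 9.0}, 'Chicagoo': {'IL': 1.0}}

    best_val, best_score = None, None
    for init_val in init_values:
        # discount this row's own co-occurrence weight, as the method does
        cooccur_freq = pair_stats[init_val]['IL'] - 1. / (1 * len(init_values))
        cur_score = cooccur_freq / single_stats[init_val]
        if best_score is None or cur_score > best_score:
            best_val, best_score = init_val, cur_score

    assert best_val == 'Chicago'  # 8.5/10 = 0.85 beats 0.5/1 = 0.5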
""" -correct_repairs_template = Template('SELECT COUNT(*) FROM'\ - '(SELECT t2._tid_, t2._attribute_, t2._value_ '\ - 'FROM $init_table as t1, $grdt_table as t2 '\ - 'WHERE t1._tid_ = t2._tid_ '\ - 'AND t2._attribute_ = \'$attr\' '\ - 'AND t1.\"$attr\" != t2._value_ ) as errors, $inf_dom as repairs '\ - 'WHERE errors._tid_ = repairs._tid_ '\ - 'AND errors._attribute_ = repairs.attribute '\ - 'AND errors._value_ = repairs.rv_value') +correct_repairs_template = Template(""" +SELECT + count(*) +FROM ( + SELECT + t2._tid_, + t2._attribute_, + t2._value_ + FROM + $raw_table AS t1 + INNER JOIN + $clean_table AS t2 + ON + t1._tid_ = t2._tid_ + AND t2._attribute_ = '$attr' + AND t1."$attr" != t2._value_ +) AS errors +INNER JOIN + $inf_dom AS repairs +ON + errors._tid_ = repairs._tid_ + AND errors._attribute_ = repairs.attribute + AND errors._value_ = repairs.rv_value +""") class EvalEngine: @@ -64,7 +86,7 @@ def load_data(self, name, fpath, tid_col, attr_col, val_col, na_values=None): def evaluate_repairs(self): self.compute_total_repairs() - self.compute_total_repairs_grdt() + self.compute_total_repairs_clean() self.compute_total_errors() self.compute_detected_errors() self.compute_correct_repairs() @@ -79,10 +101,10 @@ def eval_report(self): tic = time.clock() try: prec, rec, rep_recall, f1, rep_f1 = self.evaluate_repairs() - report = "Precision = %.2f, Recall = %.2f, Repairing Recall = %.2f, F1 = %.2f, Repairing F1 = %.2f, Detected Errors = %d, Total Errors = %d, Correct Repairs = %d, Total Repairs = %d, Total Repairs (Grdth present) = %d" % ( - prec, rec, rep_recall, f1, rep_f1, self.detected_errors, self.total_errors, self.correct_repairs, self.total_repairs, self.total_repairs_grdt) + report = "Precision = %.2f, Recall = %.2f, Repairing Recall = %.2f, F1 = %.2f, Repairing F1 = %.2f, Detected Errors = %d, Total Errors = %d, Correct Repairs = %d, Total Repairs = %d, Total Repairs (clean data) = %d" % ( + prec, rec, rep_recall, f1, rep_f1, self.detected_errors, self.total_errors, self.correct_repairs, self.total_repairs, self.total_repairs_clean) report_list = [prec, rec, rep_recall, f1, rep_f1, self.detected_errors, self.total_errors, - self.correct_repairs, self.total_repairs, self.total_repairs_grdt] + self.correct_repairs, self.total_repairs, self.total_repairs_clean] except Exception as e: logging.error("ERROR generating evaluation report %s" % e) raise @@ -92,34 +114,68 @@ def eval_report(self): return report, report_time, report_list def compute_total_repairs(self): - query = "SELECT count(*) FROM " \ - "(SELECT _vid_ " \ - "FROM %s as t1, %s as t2 " \ - "WHERE t1._tid_ = t2._tid_ " \ - "AND t1.attribute = t2.attribute " \ - "AND t1.init_value != t2.rv_value) AS t"\ - %(AuxTables.cell_domain.name, AuxTables.inf_values_dom.name) + """ + compute_total_repairs memoizes into self.total_repairs + the number of cells where the initial value differs from the inferred + value (i.e. the number of repairs) for the entities in the TRAINING data. + """ + # This query works with init_values with multiple values ('|||' separated) + # since it is still considered a 'repair' if we convert multiple + # initial values into one value. 
+ query = """ + SELECT + count(*) + FROM + {cell_domain} AS t1 + INNER JOIN + {inf_values_dom} as t2 + ON + t1._tid_ = t2._tid_ + AND t1.attribute = t2.attribute + WHERE + t1.init_values != t2.rv_value + """.format(cell_domain=AuxTables.cell_domain.name, + inf_values_dom=AuxTables.inf_values_dom.name) res = self.ds.engine.execute_query(query) self.total_repairs = float(res[0][0]) - def compute_total_repairs_grdt(self): - query = "SELECT count(*) FROM " \ - "(SELECT _vid_ " \ - "FROM %s as t1, %s as t2, %s as t3 " \ - "WHERE t1._tid_ = t2._tid_ " \ - "AND t1.attribute = t2.attribute " \ - "AND t1.init_value != t2.rv_value " \ - "AND t1._tid_ = t3._tid_ " \ - "AND t1.attribute = t3._attribute_) AS t"\ - %(AuxTables.cell_domain.name, AuxTables.inf_values_dom.name, self.clean_data.name) + def compute_total_repairs_clean(self): + """ + compute_total_repairs_clean memoizes into self.total_repairs_clean + the number of cells where the initial value differs from the inferred + value (i.e. the number of repairs) for the entities in the TEST (clean) data. + """ + # This query works with init_values with multiple values ('|||' separated) + # since it is still considered a 'repair' if we convert multiple + # initial values into one value. + query = """ + SELECT + count(*) + FROM + {cell_domain} AS t1 + INNER JOIN + {inf_values_dom} AS t2 + ON + t1._tid_ = t2._tid_ + AND t1.attribute = t2.attribute + INNER JOIN + {clean_data} AS t3 + ON + t1._tid_ = t3._tid_ + AND t1.attribute = t3._attribute_ + WHERE + t1.init_values != t2.rv_value + """.format(cell_domain=AuxTables.cell_domain.name, + inf_values_dom=AuxTables.inf_values_dom.name, + clean_data=self.clean_data.name) res = self.ds.engine.execute_query(query) - self.total_repairs_grdt = float(res[0][0]) + self.total_repairs_clean = float(res[0][0]) def compute_total_errors(self): queries = [] total_errors = 0.0 for attr in self.ds.get_attributes(): - query = errors_template.substitute(init_table=self.ds.raw_data.name, grdt_table=self.clean_data.name, + query = errors_template.substitute(raw_table=self.ds.raw_data.name, clean_table=self.clean_data.name, attr=attr) queries.append(query) results = self.ds.engine.execute_queries(queries) @@ -127,11 +183,11 @@ def compute_total_errors(self): total_errors += float(res[0][0]) self.total_errors = total_errors - def compute_total_errors_grdt(self): + def compute_total_errors_clean(self): queries = [] total_errors = 0.0 for attr in self.ds.get_attributes(): - query = errors_template.substitute(init_table=self.ds.raw_data.name, grdt_table=self.clean_data.name, + query = errors_template.substitute(raw_table=self.ds.raw_data.name, clean_table=self.clean_data.name, attr=attr) queries.append(query) results = self.ds.engine.execute_queries(queries) @@ -140,13 +196,30 @@ def compute_total_errors_grdt(self): self.total_errors = total_errors def compute_detected_errors(self): - query = "SELECT count(*) FROM " \ - "(SELECT _vid_ " \ - "FROM %s as t1, %s as t2, %s as t3 " \ - "WHERE t1._tid_ = t2._tid_ AND t1._cid_ = t3._cid_ " \ - "AND t1.attribute = t2._attribute_ " \ - "AND t1.init_value != t2._value_) AS t" \ - % (AuxTables.cell_domain.name, self.clean_data.name, AuxTables.dk_cells.name) + """ + compute_detected_errors + """ + # TODO(richardwu): how do we define a "repair" if we have multiple + # init values? 
+ query = """ + SELECT + count(*) + FROM + (SELECT + _vid_ + FROM + {cell_domain} AS t1, + {clean_data} AS t2, + {dk_cells} AS t3 + WHERE + t1._tid_ = t2._tid_ + AND t1._cid_ = t3._cid_ + AND t1.attribute = t2._attribute_ + AND t1.init_values != t2._value_ + ) AS t + """.format(cell_domain=AuxTables.cell_domain.name, + clean_data=self.clean_data.name, + dk_cells=AuxTables.dk_cells.name) res = self.ds.engine.execute_query(query) self.detected_errors = float(res[0][0]) @@ -154,8 +227,8 @@ def compute_correct_repairs(self): queries = [] correct_repairs = 0.0 for attr in self.ds.get_attributes(): - query = correct_repairs_template.substitute(init_table=self.ds.raw_data.name, grdt_table=self.clean_data.name, - attr=attr, inf_dom=AuxTables.inf_values_dom.name) + query = correct_repairs_template.substitute(raw_table=self.ds.raw_data.name, clean_table=self.clean_data.name, + attr=attr, inf_dom=AuxTables.inf_values_dom.name) queries.append(query) results = self.ds.engine.execute_queries(queries) for res in results: @@ -173,9 +246,9 @@ def compute_repairing_recall(self): return self.correct_repairs / self.detected_errors def compute_precision(self): - if self.total_repairs_grdt == 0: + if self.total_repairs_clean == 0: return 0 - return self.correct_repairs / self.total_repairs_grdt + return self.correct_repairs / self.total_repairs_clean def compute_f1(self): prec = self.compute_precision() diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index 6aaddfe52..cfbe0f27d 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -1,7 +1,8 @@ import holoclean -from detect import NullDetector, ViolationDetector -from repair.featurize import InitAttFeaturizer -from repair.featurize import InitSimFeaturizer +from detect import MultiInitDetector, NullDetector, ViolationDetector +from repair.featurize import CurrentFeaturizer +from repair.featurize import CurrentAttrFeaturizer +from repair.featurize import CurrentSimFeaturizer from repair.featurize import FreqFeaturizer from repair.featurize import OccurFeaturizer from repair.featurize import ConstraintFeat @@ -11,9 +12,9 @@ # 1. Setup a HoloClean session. hc = holoclean.HoloClean( pruning_topk=0.1, - epochs=30, + epochs=10, weight_decay=0.01, - threads=20, + threads=4, batch_size=1, verbose=True, timeout=3*60000, @@ -26,21 +27,21 @@ hc.ds.set_constraints(hc.get_dcs()) # 3. Detect erroneous cells using these two detectors. -detectors = [NullDetector(), ViolationDetector()] +detectors = [MultiInitDetector(), NullDetector(), ViolationDetector()] hc.detect_errors(detectors) # 4. Repair errors utilizing the defined features. hc.setup_domain() featurizers = [ - InitAttFeaturizer(learnable=False), - InitSimFeaturizer(), + CurrentAttrFeaturizer(learnable=False), + CurrentSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), ConstraintFeat() ] -hc.repair_errors(featurizers) - # 5. Evaluate the correctness of the results. 
diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py
index 6aaddfe52..cfbe0f27d 100644
--- a/examples/holoclean_repair_example.py
+++ b/examples/holoclean_repair_example.py
@@ -1,7 +1,8 @@
 import holoclean
-from detect import NullDetector, ViolationDetector
-from repair.featurize import InitAttFeaturizer
-from repair.featurize import InitSimFeaturizer
+from detect import MultiInitDetector, NullDetector, ViolationDetector
+from repair.featurize import CurrentFeaturizer
+from repair.featurize import CurrentAttrFeaturizer
+from repair.featurize import CurrentSimFeaturizer
 from repair.featurize import FreqFeaturizer
 from repair.featurize import OccurFeaturizer
 from repair.featurize import ConstraintFeat
@@ -11,9 +12,9 @@
 # 1. Setup a HoloClean session.
 hc = holoclean.HoloClean(
     pruning_topk=0.1,
-    epochs=30,
+    epochs=10,
     weight_decay=0.01,
-    threads=20,
+    threads=4,
     batch_size=1,
     verbose=True,
     timeout=3*60000,
@@ -26,21 +27,21 @@
 hc.ds.set_constraints(hc.get_dcs())

 # 3. Detect erroneous cells using these detectors.
-detectors = [NullDetector(), ViolationDetector()]
+detectors = [MultiInitDetector(), NullDetector(), ViolationDetector()]
 hc.detect_errors(detectors)

 # 4. Repair errors utilizing the defined features.
 hc.setup_domain()
 featurizers = [
-    InitAttFeaturizer(learnable=False),
-    InitSimFeaturizer(),
+    CurrentAttrFeaturizer(learnable=False),
+    CurrentSimFeaturizer(),
     FreqFeaturizer(),
     OccurFeaturizer(),
     LangModelFeat(),
     ConstraintFeat()
 ]
-hc.repair_errors(featurizers)
-
 # 5. Evaluate the correctness of the results.
-hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val')
+em_iter_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val')
+hc.repair_errors(featurizers, em_iterations=3, em_iter_func=em_iter_func)
+
diff --git a/examples/start_example.sh b/examples/start_example.sh
index 5ee266ef4..f26bcedd9 100755
--- a/examples/start_example.sh
+++ b/examples/start_example.sh
@@ -4,5 +4,6 @@ source ../set_env.sh

 echo "Launching example"
 python holoclean_repair_example.py
+# python holoclean_food.py

diff --git a/holoclean.py b/holoclean.py
index eb5760e4f..aadc355a9 100644
--- a/holoclean.py
+++ b/holoclean.py
@@ -112,7 +112,12 @@
     {'default': False,
      'dest': 'print_fw',
      'action': 'store_true',
-     'help': 'print the weights of featurizers'})
+     'help': 'print the weights of featurizers'}),
+    (tuple(['--currentstats']),
+     {'default': False,
+      'dest': 'current_stats',
+      'action': 'store_true',
+      'help': 're-compute frequency and co-occur stats after every EM iteration'}),
 ]

@@ -238,30 +243,63 @@ def setup_domain(self):
         logging.info(status)
         logging.debug('Time to setup the domain: %.2f secs'%domain_time)

-    def repair_errors(self, featurizers):
-        status, feat_time = self.repair_engine.setup_featurized_ds(featurizers)
-        logging.info(status)
-        logging.debug('Time to featurize data: %.2f secs'%feat_time)
-        status, setup_time = self.repair_engine.setup_repair_model()
-        logging.info(status)
-        logging.debug('Time to setup repair model: %.2f secs' % feat_time)
-        status, fit_time = self.repair_engine.fit_repair_model()
-        logging.info(status)
-        logging.debug('Time to fit repair model: %.2f secs'%fit_time)
-        status, infer_time = self.repair_engine.infer_repairs()
-        logging.info(status)
-        logging.debug('Time to infer correct cell values: %.2f secs'%infer_time)
-        status, time = self.ds.get_inferred_values()
-        logging.info(status)
-        logging.debug('Time to collect inferred values: %.2f secs' % time)
-        status, time = self.ds.get_repaired_dataset()
-        logging.info(status)
-        logging.debug('Time to store repaired dataset: %.2f secs' % time)
-        if self.env['print_fw']:
-            status, time = self.repair_engine.get_featurizer_weights()
+    def repair_errors(self, featurizers, em_iterations=1, em_iter_func=None):
+        """
+        repair_errors attempts to repair detected error cells with the given
+        featurizers.
+
+        :param featurizers: (list[Featurizer]) list of featurizers to use on the dataset.
+        :param em_iterations: (int) number of EM iterations to perform.
+        :param em_iter_func: (function w/ no parameters) function to invoke
+            at the end of every EM iteration.
+
+        :returns: list of featurizer weights summaries, one per EM iteration.
+ + """ + all_weights = [] + for em_iter in range(1, em_iterations+1): + logging.info('performing EM iteration: %d', em_iter) + # Setup featurizers + status, feat_time = self.repair_engine.setup_featurized_ds(featurizers) + logging.info(status) + logging.debug('Time to featurize data: %.2f secs'%feat_time) + # Initialize steps for repair process + status, setup_time = self.repair_engine.setup_repair_model() + logging.info(status) + logging.debug('Time to setup repair model: %.2f secs' % feat_time) + # Repair training using clean cells + status, fit_time = self.repair_engine.fit_repair_model() logging.info(status) - logging.debug('Time to store featurizer weights: %.2f secs' % time) - return status + logging.debug('Time to fit repair model: %.2f secs'%fit_time) + # Do actual inference on DK cells + status, infer_time = self.repair_engine.infer_repairs() + logging.info(status) + logging.debug('Time to infer correct cell values: %.2f secs'%infer_time) + # Convert probabilities to predictions (i.e. argmax) + status, time = self.ds.generate_inferred_values() + logging.info(status) + logging.debug('Time to collect inferred values: %.2f secs' % time) + # Convert long format of inferred values to wide format + status, time = self.ds.generate_repaired_dataset() + logging.info(status) + logging.debug('Time to store repaired dataset: %.2f secs' % time) + # Log featurizer weights + weights, time = self.repair_engine.get_featurizer_weights() + all_weights.append(weights) + if self.env['print_fw']: + logging.info(weights) + logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) + # Update current values with inferred values + self.ds.update_current_values() + if self.env['current_stats']: + # Re-compute statistics with new current values + self.ds.collect_current_stats() + + # Call em_iter_func if provided at the end of every EM iteration + if em_iter_func is not None: + em_iter_func() + logging.info('DONE EM iteration: %d', em_iter) + return all_weights def evaluate(self, fpath, tid_col, attr_col, val_col, na_values=None): """ diff --git a/repair/featurize/__init__.py b/repair/featurize/__init__.py index 23a9d438e..fab366780 100644 --- a/repair/featurize/__init__.py +++ b/repair/featurize/__init__.py @@ -1,13 +1,13 @@ from .featurize import FeaturizedDataset from .featurizer import Featurizer -from .initfeat import InitFeaturizer -from .initsimfeat import InitSimFeaturizer -from .freqfeat import FreqFeaturizer -from .occurfeat import OccurFeaturizer from .constraintfeat import ConstraintFeat +from .currentfeat import CurrentFeaturizer +from .currentattrfeat import CurrentAttrFeaturizer +from .currentsimfeat import CurrentSimFeaturizer +from .freqfeat import FreqFeaturizer from .langmodel import LangModelFeat -from .initattfeat import InitAttFeaturizer +from .occurfeat import OccurFeaturizer from .occurattrfeat import OccurAttrFeaturizer -__all__ = ['FeaturizedDataset', 'Featurizer', 'InitFeaturizer', 'InitSimFeaturizer', 'FreqFeaturizer', - 'OccurFeaturizer', 'ConstraintFeat', 'LangModelFeat', 'InitAttFeaturizer', 'OccurAttrFeaturizer'] +__all__ = ['FeaturizedDataset', 'Featurizer', 'CurrentFeaturizer', 'CurrentSimFeaturizer', 'FreqFeaturizer', + 'OccurFeaturizer', 'ConstraintFeat', 'LangModelFeat', 'CurrentAttrFeaturizer', 'OccurAttrFeaturizer'] diff --git a/repair/featurize/constraintfeat.py b/repair/featurize/constraintfeat.py index c44ac8cf3..053e539d5 100644 --- a/repair/featurize/constraintfeat.py +++ b/repair/featurize/constraintfeat.py @@ -70,11 +70,11 @@ def 
     def relax_unary_predicate(self, predicate):
         """
-        relax_binary_predicate returns the attribute, operation, and
+        relax_unary_predicate returns the attribute, operation, and
         tuple attribute reference.

         :return: (attr, op, const), for example:
-            ("StateAvg", "<>", 't1."StateAvg"')
+            ("StateAvg", "<>", '"StateAvg"')
         """
         attr = predicate.components[0][1]
         op = predicate.operation
diff --git a/repair/featurize/initattfeat.py b/repair/featurize/currentattrfeat.py
similarity index 58%
rename from repair/featurize/initattfeat.py
rename to repair/featurize/currentattrfeat.py
index 10d0369b6..b7d0eaac4 100644
--- a/repair/featurize/initattfeat.py
+++ b/repair/featurize/currentattrfeat.py
@@ -7,23 +7,27 @@
 def gen_feat_tensor(input, classes, total_attrs):
     vid = int(input[0])
     attr_idx = input[1]
-    init_idx = int(input[2])
+    current_idx = int(input[2])
     tensor = -1.0*torch.ones(1,classes,total_attrs)
-    tensor[0][init_idx][attr_idx] = 1.0
+    tensor[0][current_idx][attr_idx] = 1.0
     return tensor

-class InitAttFeaturizer(Featurizer):
+class CurrentAttrFeaturizer(Featurizer):
     def specific_setup(self):
-        self.name = 'InitAttFeaturizer'
+        self.name = 'CurrentAttrFeaturizer'
         self.attr_to_idx = self.ds.attr_to_idx
         self.total_attrs = len(self.ds.attr_to_idx)

     def create_tensor(self):
-        query = 'SELECT _vid_, attribute, init_index FROM %s ORDER BY _vid_'%AuxTables.cell_domain.name
+        query = """
+        SELECT
+            _vid_,
+            attribute_idx,
+            current_value_idx
+        FROM {cell_domain}
+        ORDER BY _vid_
+        """.format(cell_domain=AuxTables.cell_domain.name)
         results = self.ds.engine.execute_query(query)
-        map_input = []
-        for res in results:
-            map_input.append((res[0], self.attr_to_idx[res[1]], res[2]))
-        tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), map_input)
+        tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), results)
         combined = torch.cat(tensors)
         return combined
diff --git a/repair/featurize/initfeat.py b/repair/featurize/currentfeat.py
similarity index 59%
rename from repair/featurize/initfeat.py
rename to repair/featurize/currentfeat.py
index e21be4989..72d610356 100644
--- a/repair/featurize/initfeat.py
+++ b/repair/featurize/currentfeat.py
@@ -7,18 +7,24 @@
 def gen_feat_tensor(input, classes):
     vid = int(input[0])
-    init_idx = int(input[1])
+    current_idx = int(input[1])
     tensor = -1.0*torch.ones(1,classes,1)
-    tensor[0][init_idx][0] = 1.0
+    tensor[0][current_idx][0] = 1.0
     return tensor

-class InitFeaturizer(Featurizer):
+class CurrentFeaturizer(Featurizer):
     def specific_setup(self):
-        self.name = 'InitFeaturizer'
+        self.name = 'CurrentFeaturizer'

     def create_tensor(self):
-        query = 'SELECT _vid_, init_index FROM %s ORDER BY _vid_'%AuxTables.cell_domain.name
+        query = """
+        SELECT
+            _vid_,
+            current_value_idx
+        FROM {cell_domain}
+        ORDER BY _vid_
+        """.format(cell_domain=AuxTables.cell_domain.name)
         results = self.ds.engine.execute_query(query)
         tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes), results)
         combined = torch.cat(tensors)
diff --git a/repair/featurize/initsimfeat.py b/repair/featurize/currentsimfeat.py
similarity index 58%
rename from repair/featurize/initsimfeat.py
rename to repair/featurize/currentsimfeat.py
index a29529621..59c2a06a1 100644
--- a/repair/featurize/initsimfeat.py
+++ b/repair/featurize/currentsimfeat.py
@@ -9,31 +9,37 @@
 def gen_feat_tensor(input, classes, total_attrs):
     vid = int(input[0])
     attr_idx = input[1]
-    init_value = input[2]
+    current_value = input[2]
+    domain = input[3].split('|||')
     # TODO: To add more similarity metrics increase the last dimension of tensor.
     tensor = torch.zeros(1, classes, total_attrs)
-    domain = input[2].split('|||')
     for idx, val in enumerate(domain):
-        if val == init_value:
+        if val == current_value:
             sim = -1.0
         else:
-            sim = 2*Levenshtein.ratio(val, init_value) - 1
+            sim = 2*Levenshtein.ratio(val, current_value) - 1
         tensor[0][idx][attr_idx] = sim
     return tensor

-class InitSimFeaturizer(Featurizer):
+class CurrentSimFeaturizer(Featurizer):
     def specific_setup(self):
-        self.name = 'InitSimFeaturizer'
+        self.name = 'CurrentSimFeaturizer'
         self.attr_to_idx = self.ds.attr_to_idx
         self.total_attrs = len(self.ds.attr_to_idx)

     def create_tensor(self):
-        query = 'SELECT _vid_, attribute, init_value, domain FROM %s ORDER BY _vid_'%AuxTables.cell_domain.name
+        query = """
+        SELECT
+            _vid_,
+            attribute_idx,
+            current_value,
+            domain
+        FROM {cell_domain}
+        ORDER BY _vid_
+        """.format(cell_domain=AuxTables.cell_domain.name)
         results = self.ds.engine.execute_query(query)
-        map_input = []
-        for res in results:
-            map_input.append((res[0],self.attr_to_idx[res[1]],res[2]))
-        tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), map_input)
+        # attribute indexes (attribute_idx) now come directly from the query
+        tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), results)
         combined = torch.cat(tensors)
         return combined
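The similarity feature maps Levenshtein.ratio (in [0, 1]) into [-1, 1] and pins the current value itself to -1; for instance (requires the python-Levenshtein package):

    import Levenshtein

    current_value = 'Chicago'
    for val in ['Chicago', 'Chicagoo', 'Boston']:
        sim = -1.0 if val == current_value else 2 * Levenshtein.ratio(val, current_value) - 1
        print(val, round(sim, 3))
    # the exact match is forced to -1.0; near-duplicates score close to +1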
diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py
index 94747a6b9..00326ab79 100644
--- a/repair/featurize/featurize.py
+++ b/repair/featurize/featurize.py
@@ -6,6 +6,7 @@
 FeatInfo = namedtuple('FeatInfo', ['name', 'size', 'learnable', 'init_weight'])

+
 class FeaturizedDataset:
     def __init__(self, dataset, env, featurizers):
         self.ds = dataset
@@ -13,7 +14,7 @@ def __init__(self, dataset, env, featurizers):
         self.total_vars, self.classes = self.ds.get_domain_info()
         self.processes = self.env['threads']
         for f in featurizers:
-            f.setup_featurizer(self.ds, self.total_vars, self.classes, self.processes)
+            f.setup_featurizer(self.env, self.ds, self.total_vars, self.classes, self.processes)
         tensors = [f.create_tensor() for f in featurizers]
         self.featurizer_info = [FeatInfo(featurizers[i].name, t.size()[2], featurizers[i].learnable, featurizers[i].init_weight) for i, t in enumerate(tensors)]
         tensor = torch.cat(tensors,2)
@@ -26,7 +27,7 @@ def __init__(self, dataset, env, featurizers):
                 self.debugging[feat] = {}
                 self.debugging[feat]['size'] = debug.shape
                 self.debugging[feat]['weights'] = debug
-
+
         self.tensor = tensor
         # TODO: remove after we validate it is not needed.
         self.in_features = self.tensor.shape[2]
@@ -36,16 +37,33 @@ def __init__(self, dataset, env, featurizers):
     def generate_weak_labels(self):
         """
         generate_weak_labels returns a tensor where for each VID we have the
-        domain index of the initial value.
+        domain index of the current value (our initial current value is our
+        target "weak" label).

         :return: Torch.Tensor of size (# of variables) X 1 where tensor[i][0]
-            contains the domain index of the initial value for the i-th
+            contains the domain index of the current value for the i-th
             variable/VID.
         """
         logging.debug("Generating weak labels.")
-        query = 'SELECT _vid_, init_index FROM %s AS t1 LEFT JOIN %s AS t2 ' \
-                'ON t1._cid_ = t2._cid_ WHERE t2._cid_ is NULL OR t1.fixed = 1;' % (
-                AuxTables.cell_domain.name, AuxTables.dk_cells.name)

+        # We include "t1.fixed = 1" i.e. cells with randomly generated domains
+        # are included in our weak labels: we want to keep their values as is.
+        # The other domain values in the random domain are negative samples
+        # for training.
+        query = """
+        SELECT
+            _vid_,
+            current_value_idx
+        FROM
+            {cell_domain} AS t1
+        LEFT JOIN
+            {dk_cells} AS t2
+        ON t1._cid_ = t2._cid_
+        WHERE
+            t2._cid_ is NULL
+            OR t1.fixed = 1
+        """.format(cell_domain=AuxTables.cell_domain.name,
+                   dk_cells=AuxTables.dk_cells.name)
         res = self.ds.engine.execute_query(query)
         if len(res) == 0:
             raise Exception("No weak labels available. Reduce pruning threshold.")
@@ -92,10 +110,10 @@ def get_training_data(self):
         get_training_data returns X_train, y_train, and mask_train
         where each row of each tensor is a variable/VID and
         y_train are weak labels for each variable i.e. they are
-        set as the initial values.
+        set as the current value.

-        This assumes that we have a larger proportion of correct initial values
-        and only a small amount of incorrect initial values which allow us
+        This assumes that we have a larger proportion of correct current values
+        and only a small amount of incorrect current values which allows us
         to train to convergence.
         """
         train_idx = (self.weak_labels != -1).nonzero()[:,0]
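What the LEFT JOIN above selects, reproduced on toy frames: a cell receives a weak label when it was never flagged as dk, or when its domain was randomly backfilled (fixed = 1):

    import pandas as pd

    cell_domain = pd.DataFrame({'_cid_': [1, 2, 3],
                                'current_value_idx': [0, 2, 1],
                                'fixed': [0, 0, 1]})
    dk_cells = pd.DataFrame({'_cid_': [2, 3]})

    joined = cell_domain.merge(dk_cells, on='_cid_', how='left', indicator=True)
    weak = joined[(joined['_merge'] == 'left_only') | (joined['fixed'] == 1)]
    print(weak[['_cid_', 'current_value_idx']])  # cells 1 (clean) and 3 (fixed)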
+                freq = self.single_stats[attribute].get(val, 0)
+            else:
+                freq = self.single_stats[attribute][val]
+            prob = float(freq)/float(self.total)
             tensor[0][idx][attr_idx] = prob
         return tensor
diff --git a/repair/featurize/occurattrfeat.py b/repair/featurize/occurattrfeat.py
index aaf92b946..8702029a2 100644
--- a/repair/featurize/occurattrfeat.py
+++ b/repair/featurize/occurattrfeat.py
@@ -30,10 +30,11 @@ def create_tensor(self):
         # Iterate over tuples in domain
         tensors = []
         # Set tuple_id index on raw_data
-        t = self.ds.aux_table[AuxTables.cell_domain]
+        t = self.ds.get_aux_table(AuxTables.cell_domain)
         sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']]
-        records = sorted_domain.to_records()
-        for row in tqdm(list(records)):
+        # dict-style records keep the row['_tid_'] lookups below working
+        records = sorted_domain.to_dict('records')
+        for row in tqdm(records):
             #Get tuple from raw_dataset
             tid = row['_tid_']
             tuple = self.raw_data_dict[tid]
diff --git a/repair/featurize/occurfeat.py b/repair/featurize/occurfeat.py
index 0233c00bc..a195fbd6a 100644
--- a/repair/featurize/occurfeat.py
+++ b/repair/featurize/occurfeat.py
@@ -13,7 +13,7 @@ def specific_setup(self):
             raise Exception('Featurizer %s is not properly setup.'%self.name)
         self.all_attrs = self.ds.get_attributes()
         self.attrs_number = len(self.ds.attr_to_idx)
-        self.raw_data_dict = {}
+        self.current_values_dict = {}
         self.total = None
         self.single_stats = None
         self.pair_stats = None
@@ -23,13 +23,17 @@ def setup_stats(self):
         """
         Memoize single (frequency of attribute-value)
         and pairwise stats (frequency of attr1-value1-attr2-value2)
-        from Dataset.
+        for the current values from the loaded dataset.
 
         self.single_stats is a dict { attribute -> { value -> count } }.
         self.pair_stats is a dict { attr1 -> { attr2 -> { val1 -> {val2 -> co-occur frequency } } } }.
         """
-        # raw_data_dict is a Dictionary mapping TID -> { attribute -> value }
-        self.raw_data_dict = self.ds.raw_data.df.set_index('_tid_').to_dict('index')
+        # current_values_dict is a dictionary mapping TID -> { attribute -> current value }
+        self.current_values_dict = {}
+        for (tid, attr, cur_val) in self.ds.get_aux_table(AuxTables.cell_domain).df[['_tid_', 'attribute', 'current_value']].itertuples(index=False):
+            self.current_values_dict.setdefault(tid, {})[attr] = cur_val
+
+        # Frequency and co-occurrence statistics
         total, single_stats, pair_stats = self.ds.get_statistics()
         self.total = float(total)
         self.single_stats = single_stats
@@ -38,33 +44,34 @@ def create_tensor(self):
         """
         For each unique VID (cell) returns the co-occurrence probability between
-        each possible domain value for this VID and the initial/raw values for the
+        each possible domain value for this VID and the current values for the
         corresponding entity/tuple of this cell.
 
         :return: Torch.Tensor of shape (# of VIDs) X (max domain) X (# of attributes)
             where tensor[i][j][k] contains the co-occur probability between the j-th domain value
-            of the i-th random variable (VID) and the initial/raw value of the k-th
+            of the i-th random variable (VID) and the current value of the k-th
             attribute for the corresponding entity.
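+            Columns for attributes whose current value is NULL (or equal to
+            the cell's own attribute) are left at zero.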
""" # Iterate over tuples in domain tensors = [] # Set tuple_id index on raw_data - t = self.ds.aux_table[AuxTables.cell_domain] - sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']] + t = self.ds.get_aux_table(AuxTables.cell_domain) + sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','domain']] records = sorted_domain.to_records() - for row in tqdm(list(records)): - #Get tuple from raw_dataset + for row in tqdm(records): tid = row['_tid_'] - tuple = self.raw_data_dict[tid] - feat_tensor = self.gen_feat_tensor(row, tuple) + current_tuple = self.current_values_dict[tid] + feat_tensor = self.gen_feat_tensor(row, current_tuple) tensors.append(feat_tensor) combined = torch.cat(tensors) return combined - def gen_feat_tensor(self, row, tuple): + def gen_feat_tensor(self, row, current_tuple): """ For a given cell, we calculate the co-occurence probability of all domain values - for row['attribute'] with the row's co-value in every co-attributes. + for row['attribute'] with the row's current value in every co-attributes. That is for a domain value 'd' and current co-attribute value 'c' we have P(d | c) P(d, c) / P(c) @@ -73,9 +78,9 @@ def gen_feat_tensor(self, row, tuple): is the frequency of 'c'. """ # tensor is a (1 X domain size X # of attributes) pytorch.Tensor - # tensor[0][domain_idx][coattr_idx] contains the co-occurrence probability - # between the co-attribute (coattr_idx) and the domain values - # a possible domain value for this entity (row/tuple) + # tensor[0][domain_idx][rv_idx] contains the co-occurrence probability + # between the current attribute (row.attribute) and the domain values + # a possible domain value for this entity tensor = torch.zeros(1, self.classes, self.attrs_number) rv_attr = row['attribute'] # Domain value --> index mapping @@ -84,31 +89,36 @@ def gen_feat_tensor(self, row, tuple): # Iterate through every attribute (and current value for that # attribute) and set the co-occurrence probability for every - # domain value for our current row['attribute']. - for attr in self.all_attrs: - # Skip pairwise with current attribute or NULL value - if attr == rv_attr or pd.isnull(tuple[attr]): + # domain value for our current row.attribute. + for cur_attr in self.all_attrs: + # Skip pairwise with current attribute or NULL value. + # 'attr' may not be in 'current_tuple' since we do not + # store attributes with all NULL values in cell_domain. + if cur_attr == rv_attr or pd.isnull(current_tuple.get(cur_attr, None)): + continue + + cur_val = current_tuple[cur_attr] + if cur_val not in self.pair_stats[cur_attr][rv_attr]: + # cur_val may not be in pairwise stats if cur_attr contains + # only NULL values + if not pd.isnull(current_tuple[cur_attr]): + # Actual error if not null + raise Exception('Something is wrong with the pairwise statistics. <{cur_val}> should be present in dictionary.'.format(cur_val=cur_val)) continue - attr_idx = self.ds.attr_to_idx[attr] - val = tuple[attr] - attr_freq = float(self.single_stats[attr][val]) # Get topK values - if val not in self.pair_stats[attr][rv_attr]: - if not pd.isnull(tuple[rv_attr]): - raise Exception('Something is wrong with the pairwise statistics. 
+            all_vals = self.pair_stats[cur_attr][rv_attr][cur_val]
+            if len(all_vals) <= len(rv_domain_idx):
+                candidates = list(all_vals.keys())
             else:
-                # dict of { val -> co-occur count }
-                all_vals = self.pair_stats[attr][rv_attr][val]
-                if len(all_vals) <= len(rv_domain_idx):
-                    candidates = list(all_vals.keys())
-                else:
-                    candidates = domain
-
-                # iterate through all possible domain values of row['attribute']
-                for rv_val in candidates:
-                    cooccur_freq = float(all_vals.get(rv_val,0.0))
-                    prob = cooccur_freq/attr_freq
-                    if rv_val in rv_domain_idx:
-                        tensor[0][rv_domain_idx[rv_val]][attr_idx] = prob
+                candidates = domain
+
+            # iterate through all possible domain values of row['attribute']
+            cur_attr_freq = float(self.single_stats[cur_attr][cur_val])
+            cur_attr_idx = self.ds.attr_to_idx[cur_attr]
+            for rv_val in candidates:
+                cooccur_freq = float(all_vals.get(rv_val,0.0))
+                prob = cooccur_freq/cur_attr_freq
+                if rv_val in rv_domain_idx:
+                    tensor[0][rv_domain_idx[rv_val]][cur_attr_idx] = prob
         return tensor
diff --git a/repair/repair.py b/repair/repair.py
index 0025e415f..bb74ad789 100644
--- a/repair/repair.py
+++ b/repair/repair.py
@@ -72,4 +72,4 @@ def get_featurizer_weights(self):
         report = self.repair_model.get_featurizer_weights(self.feat_dataset.featurizer_info, self.feat_dataset.debugging)
         toc = time.clock()
         report_time = toc - tic
-        return status, report_time
+        return report, report_time
diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py
index 9f2259647..15933345c 100644
--- a/tests/test_holoclean_repair.py
+++ b/tests/test_holoclean_repair.py
@@ -1,10 +1,10 @@
 import unittest
 
 import holoclean
-from detect import NullDetector, ViolationDetector
-from repair.featurize import InitFeaturizer
-from repair.featurize import InitAttFeaturizer
-from repair.featurize import InitSimFeaturizer
+from detect import MultiInitDetector, NullDetector, ViolationDetector
+from repair.featurize import CurrentFeaturizer
+from repair.featurize import CurrentAttrFeaturizer
+from repair.featurize import CurrentSimFeaturizer
 from repair.featurize import FreqFeaturizer
 from repair.featurize import OccurFeaturizer
 from repair.featurize import ConstraintFeat
@@ -14,7 +14,8 @@ class TestHolocleanRepair(unittest.TestCase):
     def test_hospital(self):
         # 1. Setup a HoloClean session.
-        hc = holoclean.HoloClean(pruning_topk=0.1, epochs=30, weight_decay=0.01, threads=20, batch_size=1, verbose=True, timeout=3*60000).session
+        hc = holoclean.HoloClean(pruning_topk=0.1, epochs=10, weight_decay=0.01,
+                                 threads=4, batch_size=1, verbose=True, timeout=3*60000).session
 
         # 2. Load training data and denial constraints.
         hc.load_data('hospital', '../testdata/hospital.csv')
@@ -22,16 +23,19 @@ def test_hospital(self):
         hc.ds.set_constraints(hc.get_dcs())
 
         # 3. Detect erroneous cells using these two detectors.
-        detectors = [NullDetector(), ViolationDetector()]
+        detectors = [MultiInitDetector(), NullDetector(), ViolationDetector()]
         hc.detect_errors(detectors)
 
         # 4. Repair errors utilizing the defined features.
         hc.setup_domain()
-        featurizers = [InitAttFeaturizer(), InitSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), ConstraintFeat()]
-        hc.repair_errors(featurizers)
+        featurizers = [CurrentAttrFeaturizer(), CurrentSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), ConstraintFeat()]
+
+        # 5. Repair and evaluate the correctness of the results.
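+        # Passing the evaluation as em_iter_func runs it inside the EM loop
+        # (once per iteration) instead of once after repair_errors returns.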
+        eval_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val')
+        hc.repair_errors(featurizers, em_iterations=3, em_iter_func=eval_func)
 
-        # 5. Evaluate the correctness of the results.
-        hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val')
 
 if __name__ == '__main__':
-    unitttest.main()
+    unittest.main()