From 3463b78b2fe939063d873f3bad91efde184605de Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Tue, 20 Nov 2018 21:10:47 -0500 Subject: [PATCH 01/14] Created separate column for init values (1 or more) and current value (singular value, old 'init_value'). --- dataset/dataset.py | 21 +++- domain/domain.py | 108 +++++++++++------- evaluate/eval.py | 78 +++++++++---- examples/holoclean_repair_example.py | 9 +- repair/featurize/__init__.py | 14 +-- .../{initattfeat.py => currentattrfeat.py} | 22 ++-- .../featurize/{initfeat.py => currentfeat.py} | 16 ++- .../{initsimfeat.py => currentsimfeat.py} | 28 +++-- repair/featurize/featurize.py | 27 +++-- repair/featurize/occurfeat.py | 6 +- tests/test_holoclean_repair.py | 8 +- 11 files changed, 217 insertions(+), 120 deletions(-) rename repair/featurize/{initattfeat.py => currentattrfeat.py} (58%) rename repair/featurize/{initfeat.py => currentfeat.py} (59%) rename repair/featurize/{initsimfeat.py => currentsimfeat.py} (58%) diff --git a/dataset/dataset.py b/dataset/dataset.py index 7a52461bd..9d255d084 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -286,12 +286,21 @@ def get_domain_info(self): def get_inferred_values(self): tic = time.clock() - query = "SELECT t1._tid_, t1.attribute, domain[inferred_assignment + 1] as rv_value " \ - "FROM " \ - "(SELECT _tid_, attribute, " \ - "_vid_, init_value, string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') as domain " \ - "FROM %s) as t1, %s as t2 " \ - "WHERE t1._vid_ = t2._vid_"%(AuxTables.cell_domain.name, AuxTables.inf_values_idx.name) + query = """ + SELECT t1._tid_, + t1.attribute, + domain[inferred_assignment + 1] AS rv_value + FROM ( + SELECT _tid_, + attribute, + _vid_, + current_value, + string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') AS domain + FROM {cell_domain}) AS t1, + {inf_values_idx} AS t2 + WHERE t1._vid_ = t2._vid_ + """.format(cell_domain=AuxTables.cell_domain.name, + inf_values_idx=AuxTables.inf_values_idx.name) self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_']) self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute']) status = "DONE collecting the inferred values." 
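Note: the regexp_replace/string_to_array pipeline in the query above can be read as
the following Python sketch (illustrative only; decode_domain and domain_str are
not names from the codebase):

    import re

    def decode_domain(domain_str):
        # Strip the Postgres array braces/quotes, then split the remaining
        # string on the '|||' separator used to serialize domain values.
        cleaned = re.sub(r'[{"}]', '', domain_str)
        return cleaned.split('|||')

    # Postgres arrays are 1-indexed, so domain[inferred_assignment + 1] in the
    # SQL corresponds to decode_domain(...)[inferred_assignment] in Python.
    assert decode_domain('{Chicago|||"New York"}') == ['Chicago', 'New York']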
diff --git a/domain/domain.py b/domain/domain.py
index d45b8d97d..660cfb823 100644
--- a/domain/domain.py
+++ b/domain/domain.py
@@ -7,7 +7,6 @@
 from dataset import AuxTables
 
-
 class DomainEngine:
     def __init__(self, env, dataset, cor_strength = 0.1, sampling_prob=0.3, max_sample=5):
         """
@@ -74,18 +73,30 @@ def store_domains(self, domain):
         pos_values schema:
             _tid_: entity/tuple ID
             _cid_: cell ID
-            _vid_: random variable ID (all cells with more than 1 domain value)
-            _
-
+            _vid_: random variable ID (1-1 with _cid_)
+            attribute: name of attribute
+            rv_val: cell value
+            val_id: domain index of rv_val
         """
         if domain.empty:
             raise Exception("ERROR: Generated domain is empty.")
-        else:
-            self.ds.generate_aux_table(AuxTables.cell_domain, domain, store=True, index_attrs=['_vid_'])
-            self.ds.aux_table[AuxTables.cell_domain].create_db_index(self.ds.engine, ['_tid_'])
-            self.ds.aux_table[AuxTables.cell_domain].create_db_index(self.ds.engine, ['_cid_'])
-            query = "SELECT _vid_, _cid_, _tid_, attribute, a.rv_val, a.val_id from %s , unnest(string_to_array(regexp_replace(domain,\'[{\"\"}]\',\'\',\'gi\'),\'|||\')) WITH ORDINALITY a(rv_val,val_id)" % AuxTables.cell_domain.name
-            self.ds.generate_aux_table_sql(AuxTables.pos_values, query, index_attrs=['_tid_', 'attribute'])
+
+        self.ds.generate_aux_table(AuxTables.cell_domain, domain, store=True, index_attrs=['_vid_'])
+        self.ds.aux_table[AuxTables.cell_domain].create_db_index(self.ds.engine, ['_tid_'])
+        self.ds.aux_table[AuxTables.cell_domain].create_db_index(self.ds.engine, ['_cid_'])
+        query = """
+            SELECT
+                _vid_,
+                _cid_,
+                _tid_,
+                attribute,
+                a.rv_val,
+                a.val_id
+            FROM
+                {cell_domain},
+                unnest(string_to_array(regexp_replace(domain,\'[{{\"\"}}]\',\'\',\'gi\'),\'|||\')) WITH ORDINALITY a(rv_val,val_id)
+        """.format(cell_domain=AuxTables.cell_domain.name)
+        self.ds.generate_aux_table_sql(AuxTables.pos_values, query, index_attrs=['_tid_', 'attribute'])
 
     def setup_attributes(self):
         self.active_attributes = self.get_active_attributes()
@@ -177,10 +188,13 @@ def generate_domain(self):
             _cid_: cell ID (unique for every entity-attribute)
             _vid_: variable ID (1-1 correspondence with _cid_)
             attribute: attribute name
+            attribute_idx: index of attribute
             domain: ||| separated string of domain values
             domain_size: length of domain
-            init_value: initial value for this cell
-            init_value_idx: domain index of init_value
+            init_values: initial values for this cell
+            init_values_idx: domain indexes of init_values
+            current_value: current value (currently predicted value)
+            current_value_idx: domain index for current value
             fixed: 1 if a random sample was taken since no correlated attributes/top K values
         """
@@ -196,23 +210,28 @@ def generate_domain(self):
             tid = row['_tid_']
             app = []
             for attr in self.active_attributes:
-                init_value, dom = self.get_domain_cell(attr, row)
-                init_value_idx = dom.index(init_value)
-                if len(dom) > 1:
-                    cid = self.ds.get_cell_id(tid, attr)
-                    app.append({"_tid_": tid, "attribute": attr, "_cid_": cid, "_vid_":vid, "domain": "|||".join(dom), "domain_size": len(dom),
-                                "init_value": init_value, "init_index": init_value_idx, "fixed":0})
-                    vid += 1
-                else:
-                    add_domain = self.get_random_domain(attr,init_value)
+                init_values, current_value, dom = self.get_domain_cell(attr, row)
+                init_values_idx = list(map(dom.index, init_values))
+                current_value_idx = dom.index(current_value)
+                cid = self.ds.get_cell_id(tid, attr)
+                fixed = 0
+
+                # If domain could not be generated from correlated attributes,
+                # randomly choose values to add to our domain.
+                if len(dom) == 1:
+                    fixed = 1
+                    add_domain = self.get_random_domain(attr, init_values)
                     # Check if attribute has more than one unique values
                     if len(add_domain) > 0:
-                        dom.extend(self.get_random_domain(attr,init_value))
-                        cid = self.ds.get_cell_id(tid, attr)
-                        app.append({"_tid_": tid, "attribute": attr, "_cid_": cid, "_vid_": vid, "domain": "|||".join(dom),
-                                    "domain_size": len(dom),
-                                    "init_value": init_value, "init_index": init_value_idx, "fixed": 1})
-                        vid += 1
+                        dom.extend(add_domain)
+
+                app.append({"_tid_": tid, "_cid_": cid, "_vid_":vid,
+                            "attribute": attr, "attribute_idx": self.ds.attr_to_idx[attr],
+                            "domain": '|||'.join(dom), "domain_size": len(dom),
+                            "init_values": '|||'.join(init_values), "init_values_idx": '|||'.join(init_values_idx),
+                            "current_value": current_value, "current_value_idx": current_value_idx,
+                            "fixed": fixed})
+                vid+=1
             cells.extend(app)
         domain_df = pd.DataFrame(data=cells)
         logging.info('DONE generating domain')
@@ -220,8 +239,8 @@ def generate_domain(self):
 
     def get_domain_cell(self, attr, row):
         """
-        get_domain_cell returns a list of all domain values for the given
-        entity (row) and attribute.
+        get_domain_cell returns the list of init values, the current (best
+        predicted) value, and the list of domain values for the given cell.
 
         We define domain values as values in 'attr' that co-occur with values
         in attributes ('cond_attr') that are correlated with 'attr' at least in
 
         This would produce [B,C,E] as domain values.
 
-        :return: (initial value of entity-attribute, domain values for entity-attribute).
+        :return: (list of initial values, current value, list of domain values).
         """
 
         domain = set([])
 
         # Remove _nan_ if added due to correlated attributes
         domain.discard('_nan_')
+
         # Add initial value in domain
-        if pd.isnull(row[attr]):
-            domain.update(set(['_nan_']))
-            init_value = '_nan_'
-        else:
-            domain.update(set([row[attr]]))
-            init_value = row[attr]
-        return init_value, list(domain)
+        init_values = ['_nan_']
+        if not pd.isnull(row[attr]):
+            # Assume the raw dataset value is given as |||-separated initial values
+            init_values = row[attr].split('|||')
+        domain.update(set(init_values))
+
+        # Take the first initial value as the current value
+        current_value = init_values[0]
 
-    def get_random_domain(self, attr, cur_value):
+        return init_values, current_value, list(domain)
+
+    def get_random_domain(self, attr, init_values):
         """
         get_random_domain returns a random sample of at most size
-        'self.max_sample' of domain values for 'attr' that is NOT 'cur_value'.
+ 'self.max_sample' of domain values for :param attr: that is NOT any + of :param init_values: + + :param attr: (str) name of attribute to generate random domain for + :param init_values: (list[str]) list of initial values """ if random.random() > self.sampling_prob: return [] domain_pool = set(self.single_stats[attr].keys()) - domain_pool.discard(cur_value) + # Do not include initial values in random domain + domain_pool = domain_pool.difference(init_values) size = len(domain_pool) if size > 0: k = min(self.max_sample, size) diff --git a/evaluate/eval.py b/evaluate/eval.py index c2b358dfe..852e00d4e 100644 --- a/evaluate/eval.py +++ b/evaluate/eval.py @@ -92,26 +92,46 @@ def eval_report(self): return report, report_time, report_list def compute_total_repairs(self): - query = "SELECT count(*) FROM " \ - "(SELECT _vid_ " \ - "FROM %s as t1, %s as t2 " \ - "WHERE t1._tid_ = t2._tid_ " \ - "AND t1.attribute = t2.attribute " \ - "AND t1.init_value != t2.rv_value) AS t"\ - %(AuxTables.cell_domain.name, AuxTables.inf_values_dom.name) + query = """ + SELECT + count(*) + FROM + (SELECT + _vid_ + FROM + {cell_domain} AS t1, + {inf_values_dom} as t2 + WHERE + t1._tid_ = t2._tid_ + AND t1.attribute = t2.attribute + AND t1.current_value != t2.rv_value + ) + """.format(cell_domain=AuxTables.cell_domain.name, + inf_values_dom=AuxTables.inf_values_dom.name) res = self.ds.engine.execute_query(query) self.total_repairs = float(res[0][0]) def compute_total_repairs_grdt(self): - query = "SELECT count(*) FROM " \ - "(SELECT _vid_ " \ - "FROM %s as t1, %s as t2, %s as t3 " \ - "WHERE t1._tid_ = t2._tid_ " \ - "AND t1.attribute = t2.attribute " \ - "AND t1.init_value != t2.rv_value " \ - "AND t1._tid_ = t3._tid_ " \ - "AND t1.attribute = t3._attribute_) AS t"\ - %(AuxTables.cell_domain.name, AuxTables.inf_values_dom.name, self.clean_data.name) + query = """ + SELECT + count(*) + FROM + (SELECT + _vid_ + FROM + {cell_domain} AS t1, + {inf_values_dom} AS t2, + {clean_data} AS t3 + WHERE + t1._tid_ = t2._tid_ + AND t1.attribute = t2.attribute + AND t1.current_value != t2.rv_value + AND t1._tid_ = t3._tid_ + AND t1.attribute = t3._attribute_ + ) AS t + """.format(cell_domain=AuxTables.cell_domain.name, + inf_values_dom=AuxTables.inf_values_dom.name, + clean_data=self.clean_data.name) res = self.ds.engine.execute_query(query) self.total_repairs_grdt = float(res[0][0]) @@ -140,13 +160,25 @@ def compute_total_errors_grdt(self): self.total_errors = total_errors def compute_detected_errors(self): - query = "SELECT count(*) FROM " \ - "(SELECT _vid_ " \ - "FROM %s as t1, %s as t2, %s as t3 " \ - "WHERE t1._tid_ = t2._tid_ AND t1._cid_ = t3._cid_ " \ - "AND t1.attribute = t2._attribute_ " \ - "AND t1.init_value != t2._value_) AS t" \ - % (AuxTables.cell_domain.name, self.clean_data.name, AuxTables.dk_cells.name) + query = """ + SELECT + count(*) + FROM + (SELECT + _vid_ + FROM + {cell_domain} AS t1, + {clean_data} AS t2, + {dk_cells} AS t3 + WHERE + t1._tid_ = t2._tid_ + AND t1._cid_ = t3._cid_ + AND t1.attribute = t2._attribute_ + AND t1.current_value != t2._value_ + ) AS t + """.format(cell_domain=AuxTables.cell_domain.name, + clean_data=self.clean_data.name, + dk_cells=AuxTables.dk_cells.name) res = self.ds.engine.execute_query(query) self.detected_errors = float(res[0][0]) diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index 6aaddfe52..52df2bea4 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -1,7 +1,8 @@ import holoclean from 
detect import NullDetector, ViolationDetector -from repair.featurize import InitAttFeaturizer -from repair.featurize import InitSimFeaturizer +from repair.featurize import CurrentFeaturizer +from repair.featurize import CurrentAttrFeaturizer +from repair.featurize import CurrentSimFeaturizer from repair.featurize import FreqFeaturizer from repair.featurize import OccurFeaturizer from repair.featurize import ConstraintFeat @@ -32,8 +33,8 @@ # 4. Repair errors utilizing the defined features. hc.setup_domain() featurizers = [ - InitAttFeaturizer(learnable=False), - InitSimFeaturizer(), + CurrentAttrFeaturizer(learnable=False), + CurrentSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), diff --git a/repair/featurize/__init__.py b/repair/featurize/__init__.py index 23a9d438e..fab366780 100644 --- a/repair/featurize/__init__.py +++ b/repair/featurize/__init__.py @@ -1,13 +1,13 @@ from .featurize import FeaturizedDataset from .featurizer import Featurizer -from .initfeat import InitFeaturizer -from .initsimfeat import InitSimFeaturizer -from .freqfeat import FreqFeaturizer -from .occurfeat import OccurFeaturizer from .constraintfeat import ConstraintFeat +from .currentfeat import CurrentFeaturizer +from .currentattrfeat import CurrentAttrFeaturizer +from .currentsimfeat import CurrentSimFeaturizer +from .freqfeat import FreqFeaturizer from .langmodel import LangModelFeat -from .initattfeat import InitAttFeaturizer +from .occurfeat import OccurFeaturizer from .occurattrfeat import OccurAttrFeaturizer -__all__ = ['FeaturizedDataset', 'Featurizer', 'InitFeaturizer', 'InitSimFeaturizer', 'FreqFeaturizer', - 'OccurFeaturizer', 'ConstraintFeat', 'LangModelFeat', 'InitAttFeaturizer', 'OccurAttrFeaturizer'] +__all__ = ['FeaturizedDataset', 'Featurizer', 'CurrentFeaturizer', 'CurrentSimFeaturizer', 'FreqFeaturizer', + 'OccurFeaturizer', 'ConstraintFeat', 'LangModelFeat', 'CurrentAttrFeaturizer', 'OccurAttrFeaturizer'] diff --git a/repair/featurize/initattfeat.py b/repair/featurize/currentattrfeat.py similarity index 58% rename from repair/featurize/initattfeat.py rename to repair/featurize/currentattrfeat.py index 10d0369b6..b7d0eaac4 100644 --- a/repair/featurize/initattfeat.py +++ b/repair/featurize/currentattrfeat.py @@ -7,23 +7,27 @@ def gen_feat_tensor(input, classes, total_attrs): vid = int(input[0]) attr_idx = input[1] - init_idx = int(input[2]) + current_idx = int(input[2]) tensor = -1.0*torch.ones(1,classes,total_attrs) - tensor[0][init_idx][attr_idx] = 1.0 + tensor[0][current_idx][attr_idx] = 1.0 return tensor -class InitAttFeaturizer(Featurizer): +class CurrentAttrFeaturizer(Featurizer): def specific_setup(self): - self.name = 'InitAttFeaturizer' + self.name = 'CurrentAttrFeaturizer' self.attr_to_idx = self.ds.attr_to_idx self.total_attrs = len(self.ds.attr_to_idx) def create_tensor(self): - query = 'SELECT _vid_, attribute, init_index FROM %s ORDER BY _vid_'%AuxTables.cell_domain.name + query = """ + SELECT + _vid_, + attribute_idx, + current_value_idx + FROM {cell_domain} + ORDER BY _vid_ + """.format(cell_domain=AuxTables.cell_domain.name) results = self.ds.engine.execute_query(query) - map_input = [] - for res in results: - map_input.append((res[0], self.attr_to_idx[res[1]], res[2])) - tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), map_input) + tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), results) combined = torch.cat(tensors) return combined diff --git 
a/repair/featurize/initfeat.py b/repair/featurize/currentfeat.py similarity index 59% rename from repair/featurize/initfeat.py rename to repair/featurize/currentfeat.py index e21be4989..72d610356 100644 --- a/repair/featurize/initfeat.py +++ b/repair/featurize/currentfeat.py @@ -7,18 +7,24 @@ def gen_feat_tensor(input, classes): vid = int(input[0]) - init_idx = int(input[1]) + current_idx = int(input[1]) tensor = -1.0*torch.ones(1,classes,1) - tensor[0][init_idx][0] = 1.0 + tensor[0][current_idx][0] = 1.0 return tensor -class InitFeaturizer(Featurizer): +class CurrentFeaturizer(Featurizer): def specific_setup(self): - self.name = 'InitFeaturizer' + self.name = 'CurrentFeaturizer' def create_tensor(self): - query = 'SELECT _vid_, init_index FROM %s ORDER BY _vid_'%AuxTables.cell_domain.name + query = """ + SELECT + _vid_, + current_value_idx + FROM {cell_domain} + ORDER BY _vid_ + """.format(cell_domain=AuxTables.cell_domain.name) results = self.ds.engine.execute_query(query) tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes), results) combined = torch.cat(tensors) diff --git a/repair/featurize/initsimfeat.py b/repair/featurize/currentsimfeat.py similarity index 58% rename from repair/featurize/initsimfeat.py rename to repair/featurize/currentsimfeat.py index a29529621..59c2a06a1 100644 --- a/repair/featurize/initsimfeat.py +++ b/repair/featurize/currentsimfeat.py @@ -9,31 +9,37 @@ def gen_feat_tensor(input, classes, total_attrs): vid = int(input[0]) attr_idx = input[1] - init_value = input[2] + current_value = input[2] + domain = input[3].split('|||') # TODO: To add more similarity metrics increase the last dimension of tensor. tensor = torch.zeros(1, classes, total_attrs) - domain = input[2].split('|||') for idx, val in enumerate(domain): - if val == init_value: + if val == current_value: sim = -1.0 else: - sim = 2*Levenshtein.ratio(val, init_value) - 1 + sim = 2*Levenshtein.ratio(val, current_value) - 1 tensor[0][idx][attr_idx] = sim return tensor -class InitSimFeaturizer(Featurizer): +class CurrentSimFeaturizer(Featurizer): def specific_setup(self): - self.name = 'InitSimFeaturizer' + self.name = 'CurrentSimFeaturizer' self.attr_to_idx = self.ds.attr_to_idx self.total_attrs = len(self.ds.attr_to_idx) def create_tensor(self): - query = 'SELECT _vid_, attribute, init_value, domain FROM %s ORDER BY _vid_'%AuxTables.cell_domain.name + query = """ + SELECT + _vid_, + attribute_idx, + current_value, + domain + FROM {cell_domain} + ORDER BY _vid_ + """.format(cell_domain=AuxTables.cell_domain.name) results = self.ds.engine.execute_query(query) - map_input = [] - for res in results: - map_input.append((res[0],self.attr_to_idx[res[1]],res[2])) - tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), map_input) + # Map attribute to their attribute indexes + tensors = self.pool.map(partial(gen_feat_tensor, classes=self.classes, total_attrs=self.total_attrs), results) combined = torch.cat(tensors) return combined diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py index 94747a6b9..351f8dd17 100644 --- a/repair/featurize/featurize.py +++ b/repair/featurize/featurize.py @@ -36,16 +36,27 @@ def __init__(self, dataset, env, featurizers): def generate_weak_labels(self): """ generate_weak_labels returns a tensor where for each VID we have the - domain index of the initial value. + domain index of the current value. 
:return: Torch.Tensor of size (# of variables) X 1 where tensor[i][0] - contains the domain index of the initial value for the i-th + contains the domain index of the current value for the i-th variable/VID. """ logging.debug("Generating weak labels.") - query = 'SELECT _vid_, init_index FROM %s AS t1 LEFT JOIN %s AS t2 ' \ - 'ON t1._cid_ = t2._cid_ WHERE t2._cid_ is NULL OR t1.fixed = 1;' % ( - AuxTables.cell_domain.name, AuxTables.dk_cells.name) + query = """ + SELECT + _vid_, + current_index + FROM + {cell_domain} AS t1 + LEFT JOIN + {dk_cells} AS t2 + ON t1._cid_ = t2._cid_ + WHERE + t2._cid_ is NULL + OR t1.fixed = 1 + """.format(cell_domain=AuxTables.cell_domain.name, + dk_cells=AuxTables.dk_cells.name) res = self.ds.engine.execute_query(query) if len(res) == 0: raise Exception("No weak labels available. Reduce pruning threshold.") @@ -92,10 +103,10 @@ def get_training_data(self): get_training_data returns X_train, y_train, and mask_train where each row of each tensor is a variable/VID and y_train are weak labels for each variable i.e. they are - set as the initial values. + set as the current value. - This assumes that we have a larger proportion of correct initial values - and only a small amount of incorrect initial values which allow us + This assumes that we have a larger proportion of correct current values + and only a small amount of incorrect current values which allow us to train to convergence. """ train_idx = (self.weak_labels != -1).nonzero()[:,0] diff --git a/repair/featurize/occurfeat.py b/repair/featurize/occurfeat.py index 0233c00bc..d2e038acc 100644 --- a/repair/featurize/occurfeat.py +++ b/repair/featurize/occurfeat.py @@ -23,7 +23,7 @@ def setup_stats(self): """ Memoize single (frequency of attribute-value) and pairwise stats (frequency of attr1-value1-attr2-value2) - from Dataset. + for the current values from loaded dataset. self.single_stats is a dict { attribute -> { value -> count } }. self.pair_stats is a dict { attr1 -> { attr2 -> { val1 -> {val2 -> co-occur frequency } } } }. @@ -38,12 +38,12 @@ def setup_stats(self): def create_tensor(self): """ For each unique VID (cell) returns the co-occurrence probability between - each possible domain value for this VID and the initial/raw values for the + each possible domain value for this VID and the current value for the corresponding entity/tuple of this cell. :return: Torch.Tensor of shape (# of VIDs) X (max domain) X (# of attributes) where tensor[i][j][k] contains the co-occur probability between the j-th domain value - of the i-th random variable (VID) and the initial/raw value of the k-th + of the i-th random variable (VID) and the current value of the k-th attribute for the corresponding entity. """ # Iterate over tuples in domain diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 9f2259647..5bdef72de 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -2,9 +2,9 @@ import holoclean from detect import NullDetector, ViolationDetector -from repair.featurize import InitFeaturizer -from repair.featurize import InitAttFeaturizer -from repair.featurize import InitSimFeaturizer +from repair.featurize import CurrentFeaturizer +from repair.featurize import CurrentAttrFeaturizer +from repair.featurize import CurrentSimFeaturizer from repair.featurize import FreqFeaturizer from repair.featurize import OccurFeaturizer from repair.featurize import ConstraintFeat @@ -27,7 +27,7 @@ def test_hospital(self): # 4. Repair errors utilizing the defined features. 
hc.setup_domain() - featurizers = [InitAttFeaturizer(), InitSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), ConstraintFeat()] + featurizers = [CurrentAttrFeaturizer(), CurrentSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), ConstraintFeat()] hc.repair_errors(featurizers) # 5. Evaluate the correctness of the results. From 394d0bd3a0687733f6e93f9380b916e65c8666bc Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Tue, 20 Nov 2018 21:46:08 -0500 Subject: [PATCH 02/14] Fixed some typos for current columns. --- dataset/dataset.py | 2 +- domain/domain.py | 6 ++---- evaluate/eval.py | 2 +- repair/featurize/featurize.py | 3 ++- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index 9d255d084..e7475a70a 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -295,7 +295,7 @@ def get_inferred_values(self): attribute, _vid_, current_value, - string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') AS domain + string_to_array(regexp_replace(domain, \'[{{\"\"}}]\', \'\', \'gi\'), \'|||\') AS domain FROM {cell_domain}) AS t1, {inf_values_idx} AS t2 WHERE t1._vid_ = t2._vid_ diff --git a/domain/domain.py b/domain/domain.py index 660cfb823..11101c427 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -221,14 +221,12 @@ def generate_domain(self): if len(dom) == 1: fixed = 1 add_domain = self.get_random_domain(attr, init_values) - # Check if attribute has more than one unique values - if len(add_domain) > 0: - dom.extend(add_domain) + dom.extend(add_domain) app.append({"_tid_": tid, "_cid_": cid, "_vid_":vid, "attribute": attr, "attribute_idx": self.ds.attr_to_idx[attr], "domain": '|||'.join(dom), "domain_size": len(dom), - "init_values": '|||'.join(init_values), "init_values_idx": '|||'.join(init_values_idx), + "init_values": '|||'.join(init_values), "init_values_idx": '|||'.join(map(str,init_values_idx)), "current_value": current_value, "current_value_idx": current_value_idx, "fixed": fixed}) vid+=1 diff --git a/evaluate/eval.py b/evaluate/eval.py index 852e00d4e..f9daf7e92 100644 --- a/evaluate/eval.py +++ b/evaluate/eval.py @@ -105,7 +105,7 @@ def compute_total_repairs(self): t1._tid_ = t2._tid_ AND t1.attribute = t2.attribute AND t1.current_value != t2.rv_value - ) + ) AS t """.format(cell_domain=AuxTables.cell_domain.name, inf_values_dom=AuxTables.inf_values_dom.name) res = self.ds.engine.execute_query(query) diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py index 351f8dd17..98eedb168 100644 --- a/repair/featurize/featurize.py +++ b/repair/featurize/featurize.py @@ -6,6 +6,7 @@ FeatInfo = namedtuple('FeatInfo', ['name', 'size', 'learnable', 'init_weight']) + class FeaturizedDataset: def __init__(self, dataset, env, featurizers): self.ds = dataset @@ -46,7 +47,7 @@ def generate_weak_labels(self): query = """ SELECT _vid_, - current_index + current_value_idx FROM {cell_domain} AS t1 LEFT JOIN From 5d675019adeaaff800548d209dd6b8af2b17ff2b Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Wed, 21 Nov 2018 17:41:16 -0500 Subject: [PATCH 03/14] Addressed PR comments. 
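Note: the intended cell contract after this change, as a hypothetical Python
sketch (the raw value 'Chicago|||chicago' and the extra candidate 'Evanston'
are invented for illustration):

    raw_cell = 'Chicago|||chicago'              # |||-separated initial values
    init_values = raw_cell.split('|||')         # ['Chicago', 'chicago']
    current_value = init_values[0]              # first init value seeds 'current'
    domain = set(init_values) | {'Evanston'}    # init values + correlated candidates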
--- domain/domain.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/domain/domain.py b/domain/domain.py index 11101c427..8d0d144b9 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -99,7 +99,7 @@ def store_domains(self, domain): self.ds.generate_aux_table_sql(AuxTables.pos_values, query, index_attrs=['_tid_', 'attribute']) def setup_attributes(self): - self.active_attributes = self.get_active_attributes() + self.active_attributes = self.fetch_active_attributes() total, single_stats, pair_stats = self.ds.get_statistics() self.total = total self.single_stats = single_stats @@ -143,9 +143,9 @@ def _topk_pair_stats(self, pair_stats): out[attr1][attr2][val1] = top_cands return out - def get_active_attributes(self): + def fetch_active_attributes(self): """ - get_active_attributes returns the attributes to be modeled. + fetch_active_attributes fetches/refetches the attributes to be modeled. These attributes correspond only to attributes that contain at least one potentially erroneous cell. """ @@ -209,6 +209,12 @@ def generate_domain(self): for row in tqdm(list(records)): tid = row['_tid_'] app = [] + + # Iterate over each active attribute (attributes that have at + # least one dk cell) and generate for this cell: + # 1) the domain values + # 2) the initial values (taken from raw data) + # 3) the current value (best predicted value) for attr in self.active_attributes: init_values, current_value, dom = self.get_domain_cell(attr, row) init_values_idx = list(map(dom.index, init_values)) @@ -257,7 +263,7 @@ def get_domain_cell(self, attr, row): :return: (list of initial values, current value, list of domain values). """ - domain = set([]) + domain = set() correlated_attributes = self.get_corr_attributes(attr) # Iterate through all attributes correlated at least self.cor_strength ('cond_attr') # and take the top K co-occurrence values for 'attr' with the current @@ -291,6 +297,7 @@ def get_domain_cell(self, attr, row): domain.update(set(init_values)) # Take the first initial value as the current value + # TODO(richardwu): revisit how we should initialize 'current' current_value = init_values[0] return init_values, current_value, list(domain) From 7c7a59c5457017054dfc680bc78ad9ace533f690 Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Wed, 21 Nov 2018 17:42:29 -0500 Subject: [PATCH 04/14] Use list comprehension over map-list. --- domain/domain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/domain.py b/domain/domain.py index 8d0d144b9..17fbe9380 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -217,7 +217,7 @@ def generate_domain(self): # 3) the current value (best predicted value) for attr in self.active_attributes: init_values, current_value, dom = self.get_domain_cell(attr, row) - init_values_idx = list(map(dom.index, init_values)) + init_values_idx = [dom.index(val) for val in init_values] current_value_idx = dom.index(current_value) cid = self.ds.get_cell_id(tid, attr) fixed = 0 From 58574f189f6e859e71c15fe0db673c81480c435a Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Wed, 21 Nov 2018 20:34:14 -0500 Subject: [PATCH 05/14] Cleaned up some private functions and accesses to aux_tables. 
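Note: call sites move from raw dictionary access to the new accessor, which
raises a descriptive exception when the table has not been generated yet
(sketch of the usage change only):

    # before: KeyError if the auxiliary table was never generated
    t = self.ds.aux_table[AuxTables.cell_domain]
    # after: descriptive error via the accessor
    t = self.ds.get_aux_table(AuxTables.cell_domain)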
--- dataset/dataset.py | 56 ++++++++++++++++++++----------- detect/detect.py | 2 +- domain/domain.py | 4 +-- holoclean.py | 6 ++-- repair/featurize/occurattrfeat.py | 2 +- repair/featurize/occurfeat.py | 2 +- 6 files changed, 44 insertions(+), 28 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index e7475a70a..d4fb1607e 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -49,9 +49,7 @@ def __init__(self, name, env): self.raw_data = None self.repaired_data = None self.constraints = None - self.aux_table = {} - for tab in AuxTables: - self.aux_table[tab] = None + self.aux_tables = {} # start dbengine self.engine = DBengine( env['db_user'], @@ -143,6 +141,17 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None): def set_constraints(self, constraints): self.constraints = constraints + def get_aux_table(self, aux_table): + """ + get_aux_table returns the Table associated with :param aux_table:. + + :param aux_table: (AuxTables(Enum)) auxiliary table to check + """ + if aux_table not in self.aux_tables: + raise Exception("{} auxiliary table has not been generated".format(aux_table)) + return self.aux_tables[aux_table] + + def generate_aux_table(self, aux_table, df, store=False, index_attrs=False): """ generate_aux_table writes/overwrites the auxiliary table specified by @@ -160,13 +169,13 @@ def generate_aux_table(self, aux_table, df, store=False, index_attrs=False): also creates indexes on Postgres table. """ try: - self.aux_table[aux_table] = Table(aux_table.name, Source.DF, df=df) + self.aux_tables[aux_table] = Table(aux_table.name, Source.DF, df=df) if store: - self.aux_table[aux_table].store_to_db(self.engine.engine) + self.aux_tables[aux_table].store_to_db(self.engine.engine) if index_attrs: - self.aux_table[aux_table].create_df_index(index_attrs) + self.aux_tables[aux_table].create_df_index(index_attrs) if store and index_attrs: - self.aux_table[aux_table].create_db_index(self.engine, index_attrs) + self.aux_tables[aux_table].create_db_index(self.engine, index_attrs) except Exception: logging.error('generating aux_table %s', aux_table.name) raise @@ -177,10 +186,10 @@ def generate_aux_table_sql(self, aux_table, query, index_attrs=False): :param query: (str) SQL query whose result is used for generating the auxiliary table. """ try: - self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine) + self.aux_tables[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine) if index_attrs: - self.aux_table[aux_table].create_df_index(index_attrs) - self.aux_table[aux_table].create_db_index(self.engine, index_attrs) + self.aux_tables[aux_table].create_df_index(index_attrs) + self.aux_tables[aux_table].create_db_index(self.engine, index_attrs) except Exception: logging.error('generating aux_table %s', aux_table.name) raise @@ -225,12 +234,12 @@ def get_statistics(self): : frequency (# of entities) where attr1: val1 AND attr2: val2 """ if not self.stats_ready: - self.collect_stats() + self._collect_stats() stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats) self.stats_ready = True return stats - def collect_stats(self): + def _collect_stats(self): """ collect_stats memoizes: 1. 
self.single_attr_stats ({ attribute -> { value -> count } })
           the frequency (# of entities) of a given attribute-value
        2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } })
           the frequency (# of entities) of a given pair of attribute-values.
           Also known as co-occurrence count.
        """
        self.total_tuples = self.get_raw_data().shape[0]
        # Single attribute-value frequency
        for attr in self.get_attributes():
-            self.single_attr_stats[attr] = self.get_stats_single(attr)
+            self.single_attr_stats[attr] = self._get_stats_single(attr)
        # Co-occurrence frequency
        for cond_attr in self.get_attributes():
            self.pair_attr_stats[cond_attr] = {}
            for trg_attr in self.get_attributes():
                if trg_attr != cond_attr:
-                    self.pair_attr_stats[cond_attr][trg_attr] = self.get_stats_pair(cond_attr,trg_attr)
+                    self.pair_attr_stats[cond_attr][trg_attr] = self._get_stats_pair(cond_attr,trg_attr)
 
-    def get_stats_single(self, attr):
+    def _get_stats_single(self, attr):
         """
         Returns a dictionary where the keys are possible values for :param attr: and
         the values contain the frequency count of that value for this attribute.
         """
         # need to decode values into unicode strings since we do lookups via
         # unicode strings from Postgres
         return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict()
 
-    def get_stats_pair(self, first_attr, second_attr):
+    def _get_stats_pair(self, cond_attr, trg_attr):
         """
         Returns a dictionary {first_val -> {second_val -> count } } where:
             first_val: all possible values for cond_attr
             second_val: all values for trg_attr that appeared at least once with first_val
             count: frequency (# of entities) where cond_attr=first_val AND trg_attr=second_val
         """
@@ -284,7 +297,7 @@ def get_domain_info(self):
         classes = int(res[0][1])
         return total_vars, classes
 
-    def get_inferred_values(self):
+    def generate_inferred_values(self):
         tic = time.clock()
         query = """
             SELECT t1._tid_,
                    t1.attribute,
                    domain[inferred_assignment + 1] AS rv_value
             FROM (
                 SELECT _tid_,
                        attribute,
                        _vid_,
                        current_value,
                        string_to_array(regexp_replace(domain, \'[{{\"\"}}]\', \'\', \'gi\'), \'|||\') AS domain
                 FROM {cell_domain}) AS t1,
                 {inf_values_idx} AS t2
             WHERE t1._vid_ = t2._vid_
         """.format(cell_domain=AuxTables.cell_domain.name,
                    inf_values_idx=AuxTables.inf_values_idx.name)
         self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_'])
-        self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute'])
+        self.get_aux_table(AuxTables.inf_values_dom).create_db_index(self.engine, ['attribute'])
         status = "DONE collecting the inferred values."
         toc = time.clock()
         total_time = toc - tic
         return status, total_time
 
-    def get_repaired_dataset(self):
+    def generate_repaired_dataset(self):
         tic = time.clock()
         init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
-        t = self.aux_table[AuxTables.inf_values_dom]
-        repaired_vals = _dictify(t.df.reset_index())
+        t = self.aux_tables[AuxTables.inf_values_dom]
+        repaired_vals = _dictify(t.df.reset_index())
for tid in repaired_vals: for attr in repaired_vals[tid]: init_records[tid][attr] = repaired_vals[tid][attr] @@ -324,5 +342,3 @@ def get_repaired_dataset(self): toc = time.clock() total_time = toc - tic return status, total_time - - diff --git a/detect/detect.py b/detect/detect.py index 25a47bb5d..7ef85fd90 100644 --- a/detect/detect.py +++ b/detect/detect.py @@ -34,5 +34,5 @@ def store_detected_errors(self, errors_df): if errors_df.empty: raise Exception("ERROR: Detected errors dataframe is empty.") self.ds.generate_aux_table(AuxTables.dk_cells, errors_df, store=True) - self.ds.aux_table[AuxTables.dk_cells].create_db_index(self.ds.engine, ['_cid_']) + self.ds.get_aux_table(AuxTables.dk_cells).create_db_index(self.ds.engine, ['_cid_']) diff --git a/domain/domain.py b/domain/domain.py index 17fbe9380..c0f29cb46 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -82,8 +82,8 @@ def store_domains(self, domain): raise Exception("ERROR: Generated domain is empty.") self.ds.generate_aux_table(AuxTables.cell_domain, domain, store=True, index_attrs=['_vid_']) - self.ds.aux_table[AuxTables.cell_domain].create_db_index(self.ds.engine, ['_tid_']) - self.ds.aux_table[AuxTables.cell_domain].create_db_index(self.ds.engine, ['_cid_']) + self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_tid_']) + self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_cid_']) query = """ SELECT _vid_, diff --git a/holoclean.py b/holoclean.py index eb5760e4f..28d3f3239 100644 --- a/holoclean.py +++ b/holoclean.py @@ -251,16 +251,16 @@ def repair_errors(self, featurizers): status, infer_time = self.repair_engine.infer_repairs() logging.info(status) logging.debug('Time to infer correct cell values: %.2f secs'%infer_time) - status, time = self.ds.get_inferred_values() + status, time = self.ds.generate_inferred_values() logging.info(status) logging.debug('Time to collect inferred values: %.2f secs' % time) - status, time = self.ds.get_repaired_dataset() + status, time = self.ds.generate_repaired_dataset() logging.info(status) logging.debug('Time to store repaired dataset: %.2f secs' % time) if self.env['print_fw']: status, time = self.repair_engine.get_featurizer_weights() logging.info(status) - logging.debug('Time to store featurizer weights: %.2f secs' % time) + logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) return status def evaluate(self, fpath, tid_col, attr_col, val_col, na_values=None): diff --git a/repair/featurize/occurattrfeat.py b/repair/featurize/occurattrfeat.py index aaf92b946..47042e1a1 100644 --- a/repair/featurize/occurattrfeat.py +++ b/repair/featurize/occurattrfeat.py @@ -30,7 +30,7 @@ def create_tensor(self): # Iterate over tuples in domain tensors = [] # Set tuple_id index on raw_data - t = self.ds.aux_table[AuxTables.cell_domain] + t = self.ds.get_aux_table(AuxTables.cell_domain) sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']] records = sorted_domain.to_records() for row in tqdm(list(records)): diff --git a/repair/featurize/occurfeat.py b/repair/featurize/occurfeat.py index d2e038acc..2956c1030 100644 --- a/repair/featurize/occurfeat.py +++ b/repair/featurize/occurfeat.py @@ -49,7 +49,7 @@ def create_tensor(self): # Iterate over tuples in domain tensors = [] # Set tuple_id index on raw_data - t = self.ds.aux_table[AuxTables.cell_domain] + t = self.ds.get_aux_table(AuxTables.cell_domain) sorted_domain = 
t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']] records = sorted_domain.to_records() for row in tqdm(list(records)): From f34ba7cf1a64032d40c7369de22b85c85faf1fa3 Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Wed, 21 Nov 2018 21:40:13 -0500 Subject: [PATCH 06/14] Added method to update current values with inferred values and EM iterations for repair. --- dataset/dataset.py | 20 +++++++++ domain/domain.py | 2 +- evaluate/eval.py | 8 +++- holoclean.py | 76 ++++++++++++++++++++++++---------- tests/test_holoclean_repair.py | 9 ++-- 5 files changed, 85 insertions(+), 30 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index d4fb1607e..d2a74cbe0 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -342,3 +342,23 @@ def generate_repaired_dataset(self): toc = time.clock() total_time = toc - tic return status, total_time + + def update_current_values(self): + """ + update_current_values takes the inferred values from inf_values_dom + (auxiliary table) and updates them in the current_value column in + cell_domain (auxiliary table). + """ + df_inferred = self.get_aux_table(AuxTables.inf_values_dom).df + df_cell_domain = self.get_aux_table(AuxTables.cell_domain).df + + # Update current_value column with rv_values for inferred TIDs + df_updated = df_cell_domain.reset_index().merge(df_inferred, on=['_tid_', 'attribute'], how='left') + update_filter = df_updated['rv_value'].notnull() + df_updated.loc[update_filter, 'current_value'] = df_updated.loc[update_filter, 'rv_value'] + df_updated.drop('rv_value', axis=1, inplace=True) + + self.generate_aux_table(AuxTables.cell_domain, df_updated, store=True, index_attrs=['_vid_']) + # self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_tid_']) + # self.ds.get_aux_table(AuxTables.cell_domain).create_db_index(self.ds.engine, ['_cid_']) + logging.info('DONE updating current values with inferred values') diff --git a/domain/domain.py b/domain/domain.py index c0f29cb46..3b96371b3 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -66,7 +66,7 @@ def find_correlations(self): def store_domains(self, domain): """ - store_domains stores the 'domain' DataFrame as the 'cell_domain' + store_domains stores the :param domain: DataFrame as the 'cell_domain' auxiliary table as well as generates the 'pos_values' auxiliary table, a long-format of the domain values, in Postgres. diff --git a/evaluate/eval.py b/evaluate/eval.py index f9daf7e92..3003df777 100644 --- a/evaluate/eval.py +++ b/evaluate/eval.py @@ -92,6 +92,8 @@ def eval_report(self): return report, report_time, report_list def compute_total_repairs(self): + # TODO(richardwu): how do we define a "repair" if we have multiple + # init values? query = """ SELECT count(*) @@ -104,7 +106,7 @@ def compute_total_repairs(self): WHERE t1._tid_ = t2._tid_ AND t1.attribute = t2.attribute - AND t1.current_value != t2.rv_value + AND t1.init_values != t2.rv_value ) AS t """.format(cell_domain=AuxTables.cell_domain.name, inf_values_dom=AuxTables.inf_values_dom.name) @@ -112,6 +114,8 @@ def compute_total_repairs(self): self.total_repairs = float(res[0][0]) def compute_total_repairs_grdt(self): + # TODO(richardwu): how do we define a "repair" if we have multiple + # init values? 
query = """ SELECT count(*) @@ -125,7 +129,7 @@ def compute_total_repairs_grdt(self): WHERE t1._tid_ = t2._tid_ AND t1.attribute = t2.attribute - AND t1.current_value != t2.rv_value + AND t1.init_values != t2.rv_value AND t1._tid_ = t3._tid_ AND t1.attribute = t3._attribute_ ) AS t diff --git a/holoclean.py b/holoclean.py index 28d3f3239..07397d0cd 100644 --- a/holoclean.py +++ b/holoclean.py @@ -238,30 +238,60 @@ def setup_domain(self): logging.info(status) logging.debug('Time to setup the domain: %.2f secs'%domain_time) - def repair_errors(self, featurizers): - status, feat_time = self.repair_engine.setup_featurized_ds(featurizers) - logging.info(status) - logging.debug('Time to featurize data: %.2f secs'%feat_time) - status, setup_time = self.repair_engine.setup_repair_model() - logging.info(status) - logging.debug('Time to setup repair model: %.2f secs' % feat_time) - status, fit_time = self.repair_engine.fit_repair_model() - logging.info(status) - logging.debug('Time to fit repair model: %.2f secs'%fit_time) - status, infer_time = self.repair_engine.infer_repairs() - logging.info(status) - logging.debug('Time to infer correct cell values: %.2f secs'%infer_time) - status, time = self.ds.generate_inferred_values() - logging.info(status) - logging.debug('Time to collect inferred values: %.2f secs' % time) - status, time = self.ds.generate_repaired_dataset() - logging.info(status) - logging.debug('Time to store repaired dataset: %.2f secs' % time) - if self.env['print_fw']: - status, time = self.repair_engine.get_featurizer_weights() + def repair_errors(self, featurizers, em_iterations=1, em_iter_func=None): + """ + repair_errors attempts to repair detected error cells with the given + featurizers. + + :param featurizers: (list[Featurizer]) list of featurizers to use on the dataset. + :param em_iterations: (int) number of EM iterations to perform. + :param em_iter_func: (function w/ no parameters) function to invoke + at the end of every EM iteration. + + :returns: list of featurizer weights summary, one per EM iteration. + + """ + all_weights = [] + for em_iter in range(1, em_iterations+1): + logging.info('performing EM iteration: %d', em_iter) + # Setup featurizers + status, feat_time = self.repair_engine.setup_featurized_ds(featurizers) + logging.info(status) + logging.debug('Time to featurize data: %.2f secs'%feat_time) + # Initialize steps for repair process + status, setup_time = self.repair_engine.setup_repair_model() + logging.info(status) + logging.debug('Time to setup repair model: %.2f secs' % feat_time) + # Repair training using clean cells + status, fit_time = self.repair_engine.fit_repair_model() logging.info(status) - logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) - return status + logging.debug('Time to fit repair model: %.2f secs'%fit_time) + # Do actual inference on DK cells + status, infer_time = self.repair_engine.infer_repairs() + logging.info(status) + logging.debug('Time to infer correct cell values: %.2f secs'%infer_time) + # Convert probabilities to predictions (i.e. 
argmax) + status, time = self.ds.generate_inferred_values() + logging.info(status) + logging.debug('Time to collect inferred values: %.2f secs' % time) + # Convert long format of inferred values to wide format + status, time = self.ds.generate_repaired_dataset() + logging.info(status) + logging.debug('Time to store repaired dataset: %.2f secs' % time) + # Log featurizer weights + weights, time = self.repair_engine.get_featurizer_weights() + all_weights.append(weights) + if self.env['print_fw']: + logging.info(weights) + logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) + # Update current values with inferred values + self.ds.update_current_values() + + # Call em_iter_func if provided at the end of every EM iteration + if em_iter_func is not None: + em_iter_func() + logging.info('DONE EM iteration: %d', em_iter) + return all_weights def evaluate(self, fpath, tid_col, attr_col, val_col, na_values=None): """ diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 5bdef72de..390b30484 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -14,7 +14,7 @@ class TestHolocleanRepair(unittest.TestCase): def test_hospital(self): # 1. Setup a HoloClean session. - hc = holoclean.HoloClean(pruning_topk=0.1, epochs=30, weight_decay=0.01, threads=20, batch_size=1, verbose=True, timeout=3*60000).session + hc = holoclean.HoloClean(pruning_topk=0.1, epochs=10, weight_decay=0.01, threads=20, batch_size=1, verbose=True, timeout=3*60000).session # 2. Load training data and denial constraints. hc.load_data('hospital', '../testdata/hospital.csv') @@ -28,10 +28,11 @@ def test_hospital(self): # 4. Repair errors utilizing the defined features. hc.setup_domain() featurizers = [CurrentAttrFeaturizer(), CurrentSimFeaturizer(), FreqFeaturizer(), OccurFeaturizer(), LangModelFeat(), ConstraintFeat()] - hc.repair_errors(featurizers) - # 5. Evaluate the correctness of the results. - hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') + # 5. Repair and evaluate the correctness of the results. + eval_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') + hc.repair_errors(featurizers, em_iterations=2, em_iter_func=eval_func) + if __name__ == '__main__': unitttest.main() From d0fc0da27630fc35f83715357d5379be3e11639e Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Thu, 22 Nov 2018 12:39:23 -0500 Subject: [PATCH 07/14] Re-compute single and co-occur stats after every EM iteration. --- dataset/dataset.py | 61 +++++++++++++-- evaluate/eval.py | 137 +++++++++++++++++++++------------ holoclean.py | 2 + repair/featurize/occurfeat.py | 36 +++++---- tests/test_holoclean_repair.py | 2 +- 5 files changed, 166 insertions(+), 72 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index d2a74cbe0..0d4001165 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -141,13 +141,21 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None): def set_constraints(self, constraints): self.constraints = constraints + def aux_table_exists(self, aux_table): + """ + get_aux_table returns True if :param aux_table: has been generated. + + :param aux_table: (AuxTables(Enum)) auxiliary table to check + """ + return aux_table in self.aux_tables + def get_aux_table(self, aux_table): """ get_aux_table returns the Table associated with :param aux_table:. 
- :param aux_table: (AuxTables(Enum)) auxiliary table to check + :param aux_table: (AuxTables(Enum)) auxiliary table to retrieve """ - if aux_table not in self.aux_tables: + if not self.aux_table_exists(aux_table): raise Exception("{} auxiliary table has not been generated".format(aux_table)) return self.aux_tables[aux_table] @@ -234,14 +242,14 @@ def get_statistics(self): : frequency (# of entities) where attr1: val1 AND attr2: val2 """ if not self.stats_ready: - self._collect_stats() + self.collect_stats() stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats) self.stats_ready = True return stats - def _collect_stats(self): + def collect_stats(self): """ - collect_stats memoizes: + collect_stats calculates and memoizes: (based on current statistics) 1. self.single_attr_stats ({ attribute -> { value -> count } }) the frequency (# of entities) of a given attribute-value 2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } }) @@ -252,7 +260,7 @@ def _collect_stats(self): Also known as co-occurrence count. """ - self.total_tuples = self.get_raw_data().shape[0] + self.total_tuples = self.get_raw_data()['_tid_'].nunique() # Single attribute-value frequency for attr in self.get_attributes(): self.single_attr_stats[attr] = self._get_stats_single(attr) @@ -268,9 +276,27 @@ def _get_stats_single(self, attr): Returns a dictionary where the keys possible values for :param attr: and the values contain the frequency count of that value for this attribute. """ +<<<<<<< HEAD # need to decode values into unicode strings since we do lookups via # unicode strings from Postgres return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict() +======= + + # If cell_domain has not been initialized yet, retrieve statistics + # from raw data (this happens when the domain is just being setup) + if not self.aux_table_exists(AuxTables.cell_domain): + return self.get_raw_data()[[attr]].groupby([attr]).size() + + # Retrieve statistics on current value from cell_domain + + df_domain = self.get_aux_table(AuxTables.cell_domain).df + df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts() + # We do not store attributes with only NULL values in cell_domain: + # we require _nan_ in our single stats however + if df_count.empty: + return pd.Series(self.total_tuples, index=['_nan_']) + return df_count +>>>>>>> Re-compute single and co-occur stats after every EM iteration. 
<<<<<<< HEAD def get_stats_pair(self, first_attr, second_attr): @@ -283,8 +309,31 @@ def _get_stats_pair(self, cond_attr, trg_attr): : all values for second_attr that appeared at least once with : frequency (# of entities) where first_attr: AND second_attr: """ +<<<<<<< HEAD tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count") return _dictify(tmp_df) +======= + # If cell_domain has not been initialized yet, retrieve statistics + # from raw data (this happens when the domain is just being setup) + if not self.aux_table_exists(AuxTables.cell_domain): + return self.get_raw_data()[[cond_attr,trg_attr]].groupby([cond_attr,trg_attr]).size().reset_index(name="count") + + # Retrieve pairwise statistics on current value from cell_domain + + df_domain = self.get_aux_table(AuxTables.cell_domain).df + # Filter cell_domain for only the attributes we care about + df_domain = df_domain[df_domain['attribute'].isin([cond_attr, trg_attr])] + # Convert to wide form so we have our :param cond_attr: + # and :trg_attr: as columns along with the _tid_ column + df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value') + # We do not store cells for attributes consisting of only NULL values in cell_domain. + # We require this for pair stats though. + if cond_attr not in df_domain.columns: + df_domain[cond_attr] = '_nan_' + if trg_attr not in df_domain.columns: + df_domain[trg_attr] = '_nan_' + return df_domain.groupby([cond_attr, trg_attr]).size().reset_index(name="count") +>>>>>>> Re-compute single and co-occur stats after every EM iteration. def get_domain_info(self): """ diff --git a/evaluate/eval.py b/evaluate/eval.py index 3003df777..c142c0e68 100644 --- a/evaluate/eval.py +++ b/evaluate/eval.py @@ -7,11 +7,18 @@ from dataset import AuxTables from dataset.table import Table, Source -errors_template = Template('SELECT count(*) '\ - 'FROM $init_table as t1, $grdt_table as t2 '\ - 'WHERE t1._tid_ = t2._tid_ '\ - 'AND t2._attribute_ = \'$attr\' '\ - 'AND t1.\"$attr\" != t2._value_') +errors_template = Template(""" +SELECT + count(*) +FROM + $raw_table as t1 +INNER JOIN + $clean_table as t2 +ON + t1._tid_ = t2._tid_ + AND t2._attribute_ = '$attr' + AND t1."$attr" != t2._value_ +""") """ The 'errors' aliased subquery returns the (_tid_, _attribute_, _value_) @@ -23,15 +30,30 @@ We then count the number of cells that we repaired to the correct ground truth value. 
""" -correct_repairs_template = Template('SELECT COUNT(*) FROM'\ - '(SELECT t2._tid_, t2._attribute_, t2._value_ '\ - 'FROM $init_table as t1, $grdt_table as t2 '\ - 'WHERE t1._tid_ = t2._tid_ '\ - 'AND t2._attribute_ = \'$attr\' '\ - 'AND t1.\"$attr\" != t2._value_ ) as errors, $inf_dom as repairs '\ - 'WHERE errors._tid_ = repairs._tid_ '\ - 'AND errors._attribute_ = repairs.attribute '\ - 'AND errors._value_ = repairs.rv_value') +correct_repairs_template = Template(""" +SELECT + count(*) +FROM ( + SELECT + t2._tid_, + t2._attribute_, + t2._value_ + FROM + $raw_table AS t1 + INNER JOIN + $clean_table AS t2 + ON + t1._tid_ = t2._tid_ + AND t2._attribute_ = '$attr' + AND t1."$attr" != t2._value_ +) AS errors +INNER JOIN + $inf_dom AS repairs +ON + errors._tid_ = repairs._tid_ + AND errors._attribute_ = repairs.attribute + AND errors._value_ = repairs.rv_value +""") class EvalEngine: @@ -64,7 +86,7 @@ def load_data(self, name, fpath, tid_col, attr_col, val_col, na_values=None): def evaluate_repairs(self): self.compute_total_repairs() - self.compute_total_repairs_grdt() + self.compute_total_repairs_clean() self.compute_total_errors() self.compute_detected_errors() self.compute_correct_repairs() @@ -79,10 +101,10 @@ def eval_report(self): tic = time.clock() try: prec, rec, rep_recall, f1, rep_f1 = self.evaluate_repairs() - report = "Precision = %.2f, Recall = %.2f, Repairing Recall = %.2f, F1 = %.2f, Repairing F1 = %.2f, Detected Errors = %d, Total Errors = %d, Correct Repairs = %d, Total Repairs = %d, Total Repairs (Grdth present) = %d" % ( - prec, rec, rep_recall, f1, rep_f1, self.detected_errors, self.total_errors, self.correct_repairs, self.total_repairs, self.total_repairs_grdt) + report = "Precision = %.2f, Recall = %.2f, Repairing Recall = %.2f, F1 = %.2f, Repairing F1 = %.2f, Detected Errors = %d, Total Errors = %d, Correct Repairs = %d, Total Repairs = %d, Total Repairs (clean data) = %d" % ( + prec, rec, rep_recall, f1, rep_f1, self.detected_errors, self.total_errors, self.correct_repairs, self.total_repairs, self.total_repairs_clean) report_list = [prec, rec, rep_recall, f1, rep_f1, self.detected_errors, self.total_errors, - self.correct_repairs, self.total_repairs, self.total_repairs_grdt] + self.correct_repairs, self.total_repairs, self.total_repairs_clean] except Exception as e: logging.error("ERROR generating evaluation report %s" % e) raise @@ -92,58 +114,66 @@ def eval_report(self): return report, report_time, report_list def compute_total_repairs(self): + """ + compute_total_repairs memoizes into self.total_repairs + the number of cells where the initial value differs from the inferred + value (i.e. the number of repairs) for the entities in the TRAINING data. + """ # TODO(richardwu): how do we define a "repair" if we have multiple # init values? 
query = """ SELECT count(*) FROM - (SELECT - _vid_ - FROM - {cell_domain} AS t1, - {inf_values_dom} as t2 - WHERE - t1._tid_ = t2._tid_ - AND t1.attribute = t2.attribute - AND t1.init_values != t2.rv_value - ) AS t + {cell_domain} AS t1 + INNER JOIN + {inf_values_dom} as t2 + ON + t1._tid_ = t2._tid_ + AND t1.attribute = t2.attribute + WHERE + t1.init_values != t2.rv_value """.format(cell_domain=AuxTables.cell_domain.name, inf_values_dom=AuxTables.inf_values_dom.name) res = self.ds.engine.execute_query(query) self.total_repairs = float(res[0][0]) - def compute_total_repairs_grdt(self): + def compute_total_repairs_clean(self): + """ + compute_total_repairs_clean memoizes into self.total_repairs_clean + the number of cells where the initial value differs from the inferred + value (i.e. the number of repairs) for the entities in the TEST (clean) data. + """ # TODO(richardwu): how do we define a "repair" if we have multiple # init values? query = """ SELECT count(*) FROM - (SELECT - _vid_ - FROM - {cell_domain} AS t1, - {inf_values_dom} AS t2, - {clean_data} AS t3 - WHERE - t1._tid_ = t2._tid_ - AND t1.attribute = t2.attribute - AND t1.init_values != t2.rv_value - AND t1._tid_ = t3._tid_ - AND t1.attribute = t3._attribute_ - ) AS t + {cell_domain} AS t1 + INNER JOIN + {inf_values_dom} AS t2 + ON + t1._tid_ = t2._tid_ + AND t1.attribute = t2.attribute + INNER JOIN + {clean_data} AS t3 + ON + t1._tid_ = t3._tid_ + AND t1.attribute = t3._attribute_ + WHERE + t1.init_values != t2.rv_value """.format(cell_domain=AuxTables.cell_domain.name, inf_values_dom=AuxTables.inf_values_dom.name, clean_data=self.clean_data.name) res = self.ds.engine.execute_query(query) - self.total_repairs_grdt = float(res[0][0]) + self.total_repairs_clean = float(res[0][0]) def compute_total_errors(self): queries = [] total_errors = 0.0 for attr in self.ds.get_attributes(): - query = errors_template.substitute(init_table=self.ds.raw_data.name, grdt_table=self.clean_data.name, + query = errors_template.substitute(raw_table=self.ds.raw_data.name, clean_table=self.clean_data.name, attr=attr) queries.append(query) results = self.ds.engine.execute_queries(queries) @@ -151,11 +181,11 @@ def compute_total_errors(self): total_errors += float(res[0][0]) self.total_errors = total_errors - def compute_total_errors_grdt(self): + def compute_total_errors_clean(self): queries = [] total_errors = 0.0 for attr in self.ds.get_attributes(): - query = errors_template.substitute(init_table=self.ds.raw_data.name, grdt_table=self.clean_data.name, + query = errors_template.substitute(raw_table=self.ds.raw_data.name, clean_table=self.clean_data.name, attr=attr) queries.append(query) results = self.ds.engine.execute_queries(queries) @@ -164,6 +194,11 @@ def compute_total_errors_grdt(self): self.total_errors = total_errors def compute_detected_errors(self): + """ + compute_detected_errors + """ + # TODO(richardwu): how do we define a "repair" if we have multiple + # init values? 
query = """ SELECT count(*) @@ -178,7 +213,7 @@ def compute_detected_errors(self): t1._tid_ = t2._tid_ AND t1._cid_ = t3._cid_ AND t1.attribute = t2._attribute_ - AND t1.current_value != t2._value_ + AND t1.init_values != t2._value_ ) AS t """.format(cell_domain=AuxTables.cell_domain.name, clean_data=self.clean_data.name, @@ -190,8 +225,8 @@ def compute_correct_repairs(self): queries = [] correct_repairs = 0.0 for attr in self.ds.get_attributes(): - query = correct_repairs_template.substitute(init_table=self.ds.raw_data.name, grdt_table=self.clean_data.name, - attr=attr, inf_dom=AuxTables.inf_values_dom.name) + query = correct_repairs_template.substitute(raw_table=self.ds.raw_data.name, clean_table=self.clean_data.name, + attr=attr, inf_dom=AuxTables.inf_values_dom.name) queries.append(query) results = self.ds.engine.execute_queries(queries) for res in results: @@ -209,9 +244,9 @@ def compute_repairing_recall(self): return self.correct_repairs / self.detected_errors def compute_precision(self): - if self.total_repairs_grdt == 0: + if self.total_repairs_clean == 0: return 0 - return self.correct_repairs / self.total_repairs_grdt + return self.correct_repairs / self.total_repairs_clean def compute_f1(self): prec = self.compute_precision() diff --git a/holoclean.py b/holoclean.py index 07397d0cd..66f63a49d 100644 --- a/holoclean.py +++ b/holoclean.py @@ -286,6 +286,8 @@ def repair_errors(self, featurizers, em_iterations=1, em_iter_func=None): logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) # Update current values with inferred values self.ds.update_current_values() + # Re-compute statistics with new current values + self.ds.collect_stats() # Call em_iter_func if provided at the end of every EM iteration if em_iter_func is not None: diff --git a/repair/featurize/occurfeat.py b/repair/featurize/occurfeat.py index 2956c1030..d68de2f87 100644 --- a/repair/featurize/occurfeat.py +++ b/repair/featurize/occurfeat.py @@ -13,7 +13,7 @@ def specific_setup(self): raise Exception('Featurizer %s is not properly setup.'%self.name) self.all_attrs = self.ds.get_attributes() self.attrs_number = len(self.ds.attr_to_idx) - self.raw_data_dict = {} + self.current_values_dict = {} self.total = None self.single_stats = None self.pair_stats = None @@ -28,8 +28,14 @@ def setup_stats(self): self.single_stats is a dict { attribute -> { value -> count } }. self.pair_stats is a dict { attr1 -> { attr2 -> { val1 -> {val2 -> co-occur frequency } } } }. 
""" - # raw_data_dict is a Dictionary mapping TID -> { attribute -> value } - self.raw_data_dict = self.ds.raw_data.df.set_index('_tid_').to_dict('index') + # current_values_dict is a Dictionary mapping TID -> { attribute -> current value } + self.current_values_dict = {} + + for (tid, attr, cur_val) in self.ds.get_aux_table(AuxTables.cell_domain).df[['_tid_', 'attribute', 'current_value']].to_records(index=False): + self.current_values_dict[tid] = self.current_values_dict.get(tid, {}) + self.current_values_dict[tid][attr] = cur_val + + # frequency and co-occurrence frequencies total, single_stats, pair_stats = self.ds.get_statistics() self.total = float(total) self.single_stats = single_stats @@ -53,18 +59,18 @@ def create_tensor(self): sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']] records = sorted_domain.to_records() for row in tqdm(list(records)): - #Get tuple from raw_dataset + #Get current values for this TID tid = row['_tid_'] - tuple = self.raw_data_dict[tid] - feat_tensor = self.gen_feat_tensor(row, tuple) + current_tuple = self.current_values_dict[tid] + feat_tensor = self.gen_feat_tensor(row, current_tuple) tensors.append(feat_tensor) combined = torch.cat(tensors) return combined - def gen_feat_tensor(self, row, tuple): + def gen_feat_tensor(self, row, current_tuple): """ For a given cell, we calculate the co-occurence probability of all domain values - for row['attribute'] with the row's co-value in every co-attributes. + for row['attribute'] with the row's current value in every co-attributes. That is for a domain value 'd' and current co-attribute value 'c' we have P(d | c) P(d, c) / P(c) @@ -73,9 +79,9 @@ def gen_feat_tensor(self, row, tuple): is the frequency of 'c'. """ # tensor is a (1 X domain size X # of attributes) pytorch.Tensor - # tensor[0][domain_idx][coattr_idx] contains the co-occurrence probability - # between the co-attribute (coattr_idx) and the domain values - # a possible domain value for this entity (row/tuple) + # tensor[0][domain_idx][rv_idx] contains the co-occurrence probability + # between the current attribute (row['attribute']) and the domain values + # a possible domain value for this entity tensor = torch.zeros(1, self.classes, self.attrs_number) rv_attr = row['attribute'] # Domain value --> index mapping @@ -87,15 +93,17 @@ def gen_feat_tensor(self, row, tuple): # domain value for our current row['attribute']. for attr in self.all_attrs: # Skip pairwise with current attribute or NULL value - if attr == rv_attr or pd.isnull(tuple[attr]): + # 'attr' may not be in 'current_tuple' since we do not store + # attributes with all NULL values in our current values + if attr == rv_attr or pd.isnull(current_tuple.get(attr, None)): continue attr_idx = self.ds.attr_to_idx[attr] - val = tuple[attr] + val = current_tuple[attr] attr_freq = float(self.single_stats[attr][val]) # Get topK values if val not in self.pair_stats[attr][rv_attr]: - if not pd.isnull(tuple[rv_attr]): + if not pd.isnull(current_tuple[rv_attr]): raise Exception('Something is wrong with the pairwise statistics. <{val}> should be present in dictionary.'.format(val)) else: # dict of { val -> co-occur count } diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 390b30484..654c6b5a0 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -31,7 +31,7 @@ def test_hospital(self): # 5. Repair and evaluate the correctness of the results. 
eval_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') - hc.repair_errors(featurizers, em_iterations=2, em_iter_func=eval_func) + hc.repair_errors(featurizers, em_iterations=5, em_iter_func=eval_func) if __name__ == '__main__': From 39088f42eba57c864b33065fad33e95f0f7ea940 Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Fri, 23 Nov 2018 18:58:32 -0500 Subject: [PATCH 08/14] Add option to enable current stats updates. Updated code to allow multiple init values by specifying init values in raw data separated by |||. --- dataset/dataset.py | 144 +++++++++++++++------------ dataset/table.py | 2 +- detect/nulldetector.py | 3 +- detect/violationdetector.py | 34 +++++-- domain/domain.py | 115 +++++++++++++++++---- evaluate/eval.py | 10 +- examples/holoclean_repair_example.py | 8 +- examples/start_example.sh | 2 +- holoclean.py | 12 ++- repair/featurize/constraintfeat.py | 4 +- repair/featurize/featurize.py | 2 +- repair/featurize/featurizer.py | 3 +- repair/featurize/freqfeat.py | 10 +- repair/featurize/occurattrfeat.py | 4 +- repair/featurize/occurfeat.py | 62 ++++++------ tests/test_holoclean_repair.py | 3 +- 16 files changed, 272 insertions(+), 146 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index 0d4001165..2c67cc6fb 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -1,9 +1,8 @@ -import logging -import time from enum import Enum +import logging import os -import time import pandas as pd +import time from .dbengine import DBengine from .table import Table, Source @@ -117,7 +116,6 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None): df.fillna('_nan_', inplace=True) # Call to store to database - self.raw_data.store_to_db(self.engine.engine) status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath)) @@ -242,14 +240,14 @@ def get_statistics(self): : frequency (# of entities) where attr1: val1 AND attr2: val2 """ if not self.stats_ready: - self.collect_stats() + self.collect_init_stats() stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats) self.stats_ready = True return stats - def collect_stats(self): + def collect_init_stats(self): """ - collect_stats calculates and memoizes: (based on current statistics) + collect_init_stats calculates and memoizes: (based on RAW/INITIAL data) 1. self.single_attr_stats ({ attribute -> { value -> count } }) the frequency (# of entities) of a given attribute-value 2. 
self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } })
@@ -263,77 +261,102 @@ def collect_stats(self):
         self.total_tuples = self.get_raw_data()['_tid_'].nunique()
         # Single attribute-value frequency
         for attr in self.get_attributes():
-            self.single_attr_stats[attr] = self._get_stats_single(attr)
+            self.single_attr_stats[attr] = self._get_init_stats_single(attr)
         # Co-occurence frequency
-        for cond_attr in self.get_attributes():
-            self.pair_attr_stats[cond_attr] = {}
-            for trg_attr in self.get_attributes():
-                if trg_attr != cond_attr:
-                    self.pair_attr_stats[cond_attr][trg_attr] = self._get_stats_pair(cond_attr,trg_attr)
+        for first_attr in self.get_attributes():
+            self.pair_attr_stats[first_attr] = {}
+            for second_attr in self.get_attributes():
+                if second_attr != first_attr:
+                    self.pair_attr_stats[first_attr][second_attr] = self._get_init_stats_pair(first_attr,second_attr)

-    def _get_stats_single(self, attr):
+    def collect_current_stats(self):
         """
-        Returns a dictionary where the keys possible values for :param attr: and
-        the values contain the frequency count of that value for this attribute.
-        """
-<<<<<<< HEAD
-        # need to decode values into unicode strings since we do lookups via
-        # unicode strings from Postgres
-        return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict()
-=======
+        collect_current_stats calculates and memoizes frequency and co-occurrence
+        statistics based on the CURRENT values/data.

-        # If cell_domain has not been initialized yet, retrieve statistics
-        # from raw data (this happens when the domain is just being setup)
-        if not self.aux_table_exists(AuxTables.cell_domain):
-            return self.get_raw_data()[[attr]].groupby([attr]).size()
+        See collect_init_stats for which member variables are memoized/overwritten.
+        Does NOT overwrite self.total_tuples.
+        """
+        # Single attribute-value frequency
+        for attr in self.get_attributes():
+            self.single_attr_stats[attr] = self._get_current_stats_single(attr)
+        # Co-occurrence frequency
+        for first_attr in self.get_attributes():
+            self.pair_attr_stats[first_attr] = {}
+            for second_attr in self.get_attributes():
+                if second_attr != first_attr:
+                    self.pair_attr_stats[first_attr][second_attr] = self._get_current_stats_pair(first_attr,second_attr)

+    def _get_init_stats_single(self, attr):
+        """
+        _get_init_stats_single returns a dictionary where the keys are possible
+        values for :param attr: and the values contain the frequency count in
+        the RAW/INITIAL data of that value for this attribute.
+        """
+        # We need to iterate through this in a for loop instead of groupby & size
+        # since our values may be '|||' separated
+        freq_count = {}
+        for (vals,) in self.get_raw_data()[[attr]].itertuples(index=False):
+            for val in vals.split('|||'):
+                freq_count[val] = freq_count.get(val, 0) + 1
+        return freq_count
+
+    def _get_current_stats_single(self, attr):
+        """
+        _get_current_stats_single returns a dictionary where the keys are
+        possible values for :param attr: and the values contain the frequency
+        count in the CURRENT data of that value for this attribute.
+        """
         # Retrieve statistics on current value from cell_domain
-
         df_domain = self.get_aux_table(AuxTables.cell_domain).df
         df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts()
         # We do not store attributes with only NULL values in cell_domain:
         # we require _nan_ in our single stats however
         if df_count.empty:
-            return pd.Series(self.total_tuples, index=['_nan_'])
-        return df_count
->>>>>>> Re-compute single and co-occur stats after every EM iteration.
+            return {'_nan_': self.total_tuples}
+        return df_count.to_dict()

-<<<<<<< HEAD
-    def get_stats_pair(self, first_attr, second_attr):
-=======
-    def _get_stats_pair(self, cond_attr, trg_attr):
->>>>>>> Cleaned up some private functions and accesses to aux_tables.
+    def _get_init_stats_pair(self, first_attr, second_attr):
         """
-        Returns a dictionary {first_val -> {second_val -> count } } where:
+        _get_init_stats_pair returns a dictionary {first_val -> {second_val ->
+        count } } where (based on RAW/INITIAL dataset):
             <first_val>: all possible values for first_attr
             <second_val>: all values for second_attr that appeared at least once with <first_val>
             <count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
         """
-<<<<<<< HEAD
-        tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count")
-        return _dictify(tmp_df)
-=======
-        # If cell_domain has not been initialized yet, retrieve statistics
-        # from raw data (this happens when the domain is just being setup)
-        if not self.aux_table_exists(AuxTables.cell_domain):
-            return self.get_raw_data()[[cond_attr,trg_attr]].groupby([cond_attr,trg_attr]).size().reset_index(name="count")
+        # We need to iterate through this in a for loop instead of groupby & size
+        # since our values may be '|||' separated
+        cooccur_freq_count = {}
+        for vals1, vals2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False):
+            for val1 in vals1.split('|||'):
+                cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {})
+                for val2 in vals2.split('|||'):
+                    cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1
+        return cooccur_freq_count
+
+    def _get_current_stats_pair(self, first_attr, second_attr):
+        """
+        _get_current_stats_pair returns a dictionary {first_val -> {second_val ->
+        count } } where (based on CURRENT dataset):
+            <first_val>: all possible values for first_attr
+            <second_val>: all values for second_attr that appeared at least once with <first_val>
+            <count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
+        """
         # Retrieve pairwise statistics on current value from cell_domain
-
         df_domain = self.get_aux_table(AuxTables.cell_domain).df
         # Filter cell_domain for only the attributes we care about
-        df_domain = df_domain[df_domain['attribute'].isin([cond_attr, trg_attr])]
-        # Convert to wide form so we have our :param cond_attr:
-        # and :trg_attr: as columns along with the _tid_ column
+        df_domain = df_domain[df_domain['attribute'].isin([first_attr, second_attr])]
+        # Convert to wide form so we have our :param first_attr:
+        # and :second_attr: as columns along with the _tid_ column
         df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value')
         # We do not store cells for attributes consisting of only NULL values in cell_domain.
         # We require this for pair stats though.
- if cond_attr not in df_domain.columns: - df_domain[cond_attr] = '_nan_' - if trg_attr not in df_domain.columns: - df_domain[trg_attr] = '_nan_' - return df_domain.groupby([cond_attr, trg_attr]).size().reset_index(name="count") ->>>>>>> Re-compute single and co-occur stats after every EM iteration. + if first_attr not in df_domain.columns: + df_domain[first_attr] = '_nan_' + if second_attr not in df_domain.columns: + df_domain[second_attr] = '_nan_' + return _dictify(df_domain.groupby([first_attr, second_attr]).size().reset_index(name="count")) def get_domain_info(self): """ @@ -372,18 +395,13 @@ def generate_inferred_values(self): def generate_repaired_dataset(self): tic = time.clock() - init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False) -<<<<<<< HEAD - t = self.aux_table[AuxTables.inf_values_dom] - repaired_vals = _dictify(t.df.reset_index()) -======= + records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False) t = self.aux_tables[AuxTables.inf_values_dom] - repaired_vals = dictify(t.df.reset_index()) ->>>>>>> Cleaned up some private functions and accesses to aux_tables. + repaired_vals = _dictify(t.df.reset_index()) for tid in repaired_vals: for attr in repaired_vals[tid]: - init_records[tid][attr] = repaired_vals[tid][attr] - repaired_df = pd.DataFrame.from_records(init_records) + records[tid][attr] = repaired_vals[tid][attr] + repaired_df = pd.DataFrame.from_records(records) name = self.raw_data.name+'_repaired' self.repaired_data = Table(name, Source.DF, df=repaired_df) self.repaired_data.store_to_db(self.engine.engine) diff --git a/dataset/table.py b/dataset/table.py index 0a1a683e2..99ffab808 100644 --- a/dataset/table.py +++ b/dataset/table.py @@ -1,6 +1,6 @@ +from enum import Enum import os import pandas as pd -from enum import Enum class Source(Enum): FILE = 1 diff --git a/detect/nulldetector.py b/detect/nulldetector.py index 412196766..0d4108f11 100644 --- a/detect/nulldetector.py +++ b/detect/nulldetector.py @@ -22,7 +22,8 @@ def detect_noisy_cells(self): attributes = self.ds.get_attributes() errors = [] for attr in attributes: - tmp_df = self.df[self.df[attr].isnull()]['_tid_'].to_frame() + # self.df i.e. raw_data has all NULL values converted to '_nan_' + tmp_df = self.df[self.df[attr] == '_nan_']['_tid_'].to_frame() tmp_df.insert(1, "attribute", attr) errors.append(tmp_df) errors_df = pd.concat(errors, ignore_index=True) diff --git a/detect/violationdetector.py b/detect/violationdetector.py index 575472869..5d586e8d0 100644 --- a/detect/violationdetector.py +++ b/detect/violationdetector.py @@ -17,24 +17,33 @@ def setup(self, dataset, env): self.constraints = dataset.constraints def detect_noisy_cells(self): - # Convert Constraints to SQL queries + """ + detect_noisy_cells returns all cells that are involved in a DC violation. 
+
+        :return: pandas.DataFrame with two columns:
+            '_tid_': TID of tuple
+            'attribute': attribute corresponding to cell involved in DC violation
+        """
+
+        # Convert Constraints to SQL queries
+
         tbl = self.ds.raw_data.name
         queries = []
+        # attributes involved in a DC violation (indexed by corresponding query)
         attrs = []
-        for c_key in self.constraints:
-            c = self.constraints[c_key]
-            q = self.to_sql(tbl, c)
+        for constraint in self.constraints.values():
+            # SQL query that finds TIDs involved in a DC violation of this constraint
+            q = self.to_sql(tbl, constraint)
             queries.append(q)
-            attrs.append(c.components)
+            attrs.append(constraint.components)
         # Execute Queries over the DBEngine of Dataset
         results = self.ds.engine.execute_queries(queries)
         # Generate final output
         errors = []
-        for i in range(len(attrs)):
-            res = results[i]
-            attr_list = attrs[i]
-            tmp_df = self.gen_tid_attr_output(res, attr_list)
+        for attr_list, res in zip(attrs, results):
+            # DataFrame with TID and attribute pairs from DC violation queries
+            tmp_df = self._gen_tid_attr_output(res, attr_list)
             errors.append(tmp_df)
         errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True)
         return errors_df
@@ -79,7 +88,12 @@ def gen_mult_query(self, tbl, c):
         query = mult_template.substitute(table=tbl, cond1=cond1, c='', cond2=cond2)
         return query

-    def gen_tid_attr_output(self, res, attr_list):
+    def _gen_tid_attr_output(self, res, attr_list):
+        """
+        _gen_tid_attr_output creates a DataFrame containing the TIDs from
+        the DC violation query results in :param res: with the attributes
+        that were involved in the violation in :param attr_list:.
+        """
         errors = []
         for tuple in res:
             tid = int(tuple[0])
diff --git a/domain/domain.py b/domain/domain.py
index 3b96371b3..405847b28 100644
--- a/domain/domain.py
+++ b/domain/domain.py
@@ -31,7 +31,6 @@ def __init__(self, env, dataset, cor_strength = 0.1, sampling_prob=0.3, max_samp
         self.max_sample = max_sample
         self.single_stats = {}
         self.pair_stats = {}
-        self.all_attrs = {}

     def setup(self):
         """
@@ -54,7 +53,9 @@ def find_correlations(self):
         the pairwise correlations between attributes (values are treated as
         discrete categories).
         """
-        df = self.ds.get_raw_data()[self.ds.get_attributes()].copy()
+        # use expanded raw DataFrame to calculate correlations (since
+        # raw may contain '|||' separated values)
+        df = self._expand_raw_df()[self.ds.get_attributes()]
         # convert dataset to categories/factors
         for attr in df.columns:
             df[attr] = df[attr].astype('category').cat.codes
@@ -64,6 +65,43 @@ def find_correlations(self):
         m_corr = df.corr()
         self.correlations = m_corr

+    def _expand_raw_df(self):
+        """
+        _expand_raw_df returns an expanded version of the raw DataFrame
+        where every row containing multi-valued cells (values separated by
+        '|||') is expanded into multiple rows: the cross-product of the
+        multi-valued cells.
+ + For example if a row contains + + attr1 | attr2 + A|||B|||C D|||E + + this would be expanded into + + attr1 | attr2 + A D + A E + B D + B E + C D + C E + """ + # Cells may contain values separated by '|||': we need to + # expand this into multiple rows + raw_df = self.ds.get_raw_data() + + tic = time.clock() + expanded_rows = [] + for tup in raw_df.itertuples(): + expanded_tup = [val.split('|||') if hasattr(val, 'split') else (val,) for val in tup ] + expanded_rows.extend([new_tup for new_tup in itertools.product(*expanded_tup)]) + toc = time.clock() + logging.debug("Time to expand raw data: %.2f secs", toc-tic) + expanded_df = pd.DataFrame(expanded_rows, columns=raw_df.index.names + list(raw_df.columns)) + expanded_df.set_index(raw_df.index.names, inplace=True) + return expanded_df + def store_domains(self, domain): """ store_domains stores the :param domain: DataFrame as the 'cell_domain' @@ -75,7 +113,7 @@ def store_domains(self, domain): _cid_: cell ID _vid_: random variable ID (1-1 with _cid_) attribute: name of attribute - rv_val: cell value + rv_val: domain value val_id: domain index of rv_val """ if domain.empty: @@ -204,9 +242,8 @@ def generate_domain(self): # Iterate over dataset rows cells = [] vid = 0 - records = self.ds.get_raw_data().to_records() - self.all_attrs = list(records.dtype.names) - for row in tqdm(list(records)): + raw_records = self.ds.get_raw_data().to_records() + for row in tqdm(raw_records): tid = row['_tid_'] app = [] @@ -260,6 +297,9 @@ def get_domain_cell(self, attr, row): This would produce [B,C,E] as domain values. + :param attr: (str) name of attribute to generate domain info for + :param row: (pandas.record) Pandas record (tuple) of the current TID's row + :return: (list of initial values, current value, list of domain values). """ @@ -269,17 +309,17 @@ def get_domain_cell(self, attr, row): # and take the top K co-occurrence values for 'attr' with the current # row's 'cond_attr' value. for cond_attr in correlated_attributes: - if cond_attr == attr or cond_attr == 'index' or cond_attr == '_tid_': + if cond_attr == attr: continue - cond_val = row[cond_attr] - if not pd.isnull(cond_val): - if not self.pair_stats[cond_attr][attr]: - break + # row[cond_attr] should always be a string (since it comes from self.raw_data) + for cond_val in row[cond_attr].split('|||'): s = self.pair_stats[cond_attr][attr] try: candidates = s[cond_val] domain.update(candidates) except KeyError as missing_val: + # KeyError is possible since we do not store stats for + # attributes with only NULL values if not pd.isnull(row[attr]): # error since co-occurrence must be at least 1 (since # the current row counts as one co-occurrence). 
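
To make the cross-product expansion above concrete, here is a minimal standalone sketch (illustrative only: it uses a toy DataFrame and mirrors the logic of _expand_raw_df without the timing and index handling):

```python
import itertools
import pandas as pd

raw_df = pd.DataFrame({'attr1': ['A|||B|||C'], 'attr2': ['D|||E']})

expanded_rows = []
for tup in raw_df.itertuples(index=False):
    # Split each multi-valued cell, then take the cross-product of the cells.
    choices = [val.split('|||') if hasattr(val, 'split') else (val,) for val in tup]
    expanded_rows.extend(itertools.product(*choices))

expanded_df = pd.DataFrame(expanded_rows, columns=list(raw_df.columns))
print(expanded_df)  # 6 rows: (A,D), (A,E), (B,D), (B,E), (C,D), (C,E)
```
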
@@ -289,19 +329,52 @@ def get_domain_cell(self, attr, row):

         # Remove _nan_ if added due to correlated attributes
         domain.discard('_nan_')
-        # Add initial value in domain
-        init_values = ['_nan_']
-        if not pd.isnull(row[attr]):
-            # Assume value in raw dataset is given as ||| separate initial values
-            init_values = row[attr].split('|||')
-        domain.update(set(init_values))
-
-        # Take the first initial value as the current value
-        # TODO(richardwu): revisit how we should initialize 'current'
-        current_value = init_values[0]
+        init_values, current_value = self._init_and_current(attr, row)
+        domain.update(init_values)

         return init_values, current_value, list(domain)

+    def _init_and_current(self, attr, init_row):
+        """
+        _init_and_current returns the initial values for :param attr:
+        and the current value: the initial value that has the highest
+        cumulative co-occurrence probability with the other initial values in
+        this row.
+        """
+        # Assume value in raw dataset is given as '|||'-separated initial values
+        init_values = init_row[attr].split('|||')
+
+        # Only one initial value: current is the initial value
+        if len(init_values) == 1:
+            return init_values, init_values[0]
+
+        _, single_stats, pair_stats = self.ds.get_statistics()
+        attrs = self.ds.get_attributes()
+
+        # Determine current value by computing co-occurrence probability
+        best_val = None
+        best_score = None
+        for init_val in init_values:
+            # Compute total sum of co-occur probabilities with all other
+            # initial values in this row, that is we calculate the sum of
+            #
+            # P(other_init_val | init_val) = P(init_val, other_init_val) / P(init_val)
+            #
+            # We subtract one for pair_stats since we do not want to include
+            # the co-occurrence of our current row (consider the extreme case
+            # where an erroneous initial value only occurs once: its co-occurrence
+            # probability will always be 1 but it does not tell us this value
+            # co-occurs most frequently with our other initial values).
+            cur_score = sum(float(pair_stats[attr][other_attr][init_val][other_val] - 1) / single_stats[attr][init_val]
+                    for other_attr in attrs
+                    if attr != other_attr
+                    for other_val in init_row[other_attr].split('|||'))
+            # Keep the best initial value only
+            if best_score is None or cur_score > best_score:
+                best_val = init_val
+                best_score = cur_score
+        return init_values, best_val
+
     def get_random_domain(self, attr, init_values):
         """
         get_random_domain returns a random sample of at most size
diff --git a/evaluate/eval.py b/evaluate/eval.py
index c142c0e68..d59dd7c8c 100644
--- a/evaluate/eval.py
+++ b/evaluate/eval.py
@@ -119,8 +119,9 @@ def compute_total_repairs(self):
         the number of cells where the initial value differs from the inferred
         value (i.e. the number of repairs) for the entities in the TRAINING data.
         """
-        # TODO(richardwu): how do we define a "repair" if we have multiple
-        # init values?
+        # This query also works when init_values contains multiple ('|||'
+        # separated) values: converting multiple initial values into a single
+        # value still counts as a 'repair'.
         query = """
         SELECT
             count(*)
@@ -144,8 +145,9 @@ def compute_total_repairs_clean(self):
         the number of cells where the initial value differs from the inferred
         value (i.e. the number of repairs) for the entities in the TEST (clean) data.
         """
-        # TODO(richardwu): how do we define a "repair" if we have multiple
-        # init values?
+        # This query also works when init_values contains multiple ('|||'
+        # separated) values: converting multiple initial values into a single
+        # value still counts as a 'repair'.
query = """ SELECT count(*) diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index 52df2bea4..b59ecdc6f 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -14,7 +14,7 @@ pruning_topk=0.1, epochs=30, weight_decay=0.01, - threads=20, + threads=4, batch_size=1, verbose=True, timeout=3*60000, @@ -40,8 +40,8 @@ LangModelFeat(), ConstraintFeat() ] -hc.repair_errors(featurizers) - # 5. Evaluate the correctness of the results. -hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') +em_iter_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') +hc.repair_errors(featurizers, em_iterations=1, em_iter_func=em_iter_func) + diff --git a/examples/start_example.sh b/examples/start_example.sh index 5ee266ef4..c0cb2508e 100755 --- a/examples/start_example.sh +++ b/examples/start_example.sh @@ -3,6 +3,6 @@ source ../set_env.sh echo "Launching example" -python holoclean_repair_example.py +python holoclean_food.py diff --git a/holoclean.py b/holoclean.py index 66f63a49d..aadc355a9 100644 --- a/holoclean.py +++ b/holoclean.py @@ -112,7 +112,12 @@ {'default': False, 'dest': 'print_fw', 'action': 'store_true', - 'help': 'print the weights of featurizers'}) + 'help': 'print the weights of featurizers'}), + (tuple(['--currentstats']), + {'default': False, + 'dest': 'current_stats', + 'action': 'store_true', + 'help': 're-compute frequency and co-occur stats after every EM iteration'}), ] @@ -286,8 +291,9 @@ def repair_errors(self, featurizers, em_iterations=1, em_iter_func=None): logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) # Update current values with inferred values self.ds.update_current_values() - # Re-compute statistics with new current values - self.ds.collect_stats() + if self.env['current_stats']: + # Re-compute statistics with new current values + self.ds.collect_current_stats() # Call em_iter_func if provided at the end of every EM iteration if em_iter_func is not None: diff --git a/repair/featurize/constraintfeat.py b/repair/featurize/constraintfeat.py index c44ac8cf3..053e539d5 100644 --- a/repair/featurize/constraintfeat.py +++ b/repair/featurize/constraintfeat.py @@ -70,11 +70,11 @@ def execute_queries(self,queries): def relax_unary_predicate(self, predicate): """ - relax_binary_predicate returns the attribute, operation, and + relax_unary_predicate returns the attribute, operation, and tuple attribute reference. 
:return: (attr, op, const), for example: - ("StateAvg", "<>", 't1."StateAvg"') + ("StateAvg", "<>", "StateAvg"') """ attr = predicate.components[0][1] op = predicate.operation diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py index 98eedb168..69876b992 100644 --- a/repair/featurize/featurize.py +++ b/repair/featurize/featurize.py @@ -14,7 +14,7 @@ def __init__(self, dataset, env, featurizers): self.total_vars, self.classes = self.ds.get_domain_info() self.processes = self.env['threads'] for f in featurizers: - f.setup_featurizer(self.ds, self.total_vars, self.classes, self.processes) + f.setup_featurizer(self.env, self.ds, self.total_vars, self.classes, self.processes) tensors = [f.create_tensor() for f in featurizers] self.featurizer_info = [FeatInfo(featurizers[i].name, t.size()[2], featurizers[i].learnable, featurizers[i].init_weight) for i, t in enumerate(tensors)] tensor = torch.cat(tensors,2) diff --git a/repair/featurize/featurizer.py b/repair/featurize/featurizer.py index 70068dc83..9538cd985 100644 --- a/repair/featurize/featurizer.py +++ b/repair/featurize/featurizer.py @@ -11,7 +11,8 @@ def __init__(self, learnable=True, init_weight=1.0): self.learnable = learnable self.init_weight = init_weight - def setup_featurizer(self, dataset, total_vars, classes, processes=20): + def setup_featurizer(self, env, dataset, total_vars, classes, processes=20): + self.env = env self.ds = dataset self.total_vars = total_vars self.classes = classes diff --git a/repair/featurize/freqfeat.py b/repair/featurize/freqfeat.py index d1a9b6b8c..b29b0a221 100644 --- a/repair/featurize/freqfeat.py +++ b/repair/featurize/freqfeat.py @@ -19,7 +19,15 @@ def gen_feat_tensor(self, input, classes): attr_idx = self.ds.attr_to_idx[attribute] tensor = torch.zeros(1, classes, self.attrs_number) for idx, val in enumerate(domain): - prob = float(self.single_stats[attribute][val])/float(self.total) + freq = 0.0 + if self.env['current_stats']: + # In the case where we update statistics to current values after + # every EM iteration, the domain value may no longer appear amongst + # current values. 
+            freq = self.single_stats[attribute].get(val, 0)
+        else:
+            freq = self.single_stats[attribute][val]
+        prob = float(freq)/float(self.total)
         tensor[0][idx][attr_idx] = prob
     return tensor

diff --git a/repair/featurize/occurattrfeat.py b/repair/featurize/occurattrfeat.py
index 47042e1a1..8702029a2 100644
--- a/repair/featurize/occurattrfeat.py
+++ b/repair/featurize/occurattrfeat.py
@@ -32,8 +32,8 @@ def create_tensor(self):
         # Set tuple_id index on raw_data
         t = self.ds.get_aux_table(AuxTables.cell_domain)
         sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']]
-        records = sorted_domain.to_records()
-        for row in tqdm(list(records)):
+        records = sorted_domain.to_dict('records')
+        for row in tqdm(records):
             #Get tuple from raw_dataset
             tid = row['_tid_']
             tuple = self.raw_data_dict[tid]
diff --git a/repair/featurize/occurfeat.py b/repair/featurize/occurfeat.py
index d68de2f87..a195fbd6a 100644
--- a/repair/featurize/occurfeat.py
+++ b/repair/featurize/occurfeat.py
@@ -31,7 +31,7 @@ def setup_stats(self):
         # current_values_dict is a Dictionary mapping TID -> { attribute -> current value }
         self.current_values_dict = {}

-        for (tid, attr, cur_val) in self.ds.get_aux_table(AuxTables.cell_domain).df[['_tid_', 'attribute', 'current_value']].to_records(index=False):
+        for (tid, attr, cur_val) in self.ds.get_aux_table(AuxTables.cell_domain).df[['_tid_', 'attribute', 'current_value']].itertuples(index=False):
             self.current_values_dict[tid] = self.current_values_dict.get(tid, {})
             self.current_values_dict[tid][attr] = cur_val
@@ -56,10 +56,9 @@ def create_tensor(self):
         tensors = []
         # Set tuple_id index on raw_data
         t = self.ds.get_aux_table(AuxTables.cell_domain)
-        sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']]
+        sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','domain']]
         records = sorted_domain.to_records()
-        for row in tqdm(list(records)):
-            #Get current values for this TID
+        for row in tqdm(records):
             tid = row['_tid_']
             current_tuple = self.current_values_dict[tid]
             feat_tensor = self.gen_feat_tensor(row, current_tuple)
@@ -80,7 +79,7 @@ def gen_feat_tensor(self, row, current_tuple):
         """
         # tensor is a (1 X domain size X # of attributes) pytorch.Tensor
         # tensor[0][domain_idx][rv_idx] contains the co-occurrence probability
-        # between the current attribute (row['attribute']) and each possible
+        # between the current attribute (row.attribute) and each possible
         # domain value for this entity
         tensor = torch.zeros(1, self.classes, self.attrs_number)
         rv_attr = row['attribute']
@@ -90,33 +89,36 @@ def gen_feat_tensor(self, row, current_tuple):

         # Iterate through every attribute (and current value for that
         # attribute) and set the co-occurrence probability for every
-        # domain value for our current row['attribute'].
-        for attr in self.all_attrs:
-            # Skip pairwise with current attribute or NULL value
-            # 'attr' may not be in 'current_tuple' since we do not store
-            # attributes with all NULL values in our current values
-            if attr == rv_attr or pd.isnull(current_tuple.get(attr, None)):
+        # domain value for our current row.attribute.
+        for cur_attr in self.all_attrs:
+            # Skip pairwise with current attribute or NULL value.
+            # 'cur_attr' may not be in 'current_tuple' since we do not
+            # store attributes with all NULL values in cell_domain.
+ if cur_attr == rv_attr or pd.isnull(current_tuple.get(cur_attr, None)): + continue + + cur_val = current_tuple[cur_attr] + if cur_val not in self.pair_stats[cur_attr][rv_attr]: + # cur_val may not be in pairwise stats if cur_attr contains + # only NULL values + if not pd.isnull(current_tuple[cur_attr]): + # Actual error if not null + raise Exception('Something is wrong with the pairwise statistics. <{cur_val}> should be present in dictionary.'.format(cur_val=cur_val)) continue - attr_idx = self.ds.attr_to_idx[attr] - val = current_tuple[attr] - attr_freq = float(self.single_stats[attr][val]) # Get topK values - if val not in self.pair_stats[attr][rv_attr]: - if not pd.isnull(current_tuple[rv_attr]): - raise Exception('Something is wrong with the pairwise statistics. <{val}> should be present in dictionary.'.format(val)) + all_vals = self.pair_stats[cur_attr][rv_attr][cur_val] + if len(all_vals) <= len(rv_domain_idx): + candidates = list(all_vals.keys()) else: - # dict of { val -> co-occur count } - all_vals = self.pair_stats[attr][rv_attr][val] - if len(all_vals) <= len(rv_domain_idx): - candidates = list(all_vals.keys()) - else: - candidates = domain - - # iterate through all possible domain values of row['attribute'] - for rv_val in candidates: - cooccur_freq = float(all_vals.get(rv_val,0.0)) - prob = cooccur_freq/attr_freq - if rv_val in rv_domain_idx: - tensor[0][rv_domain_idx[rv_val]][attr_idx] = prob + candidates = domain + + # iterate through all possible domain values of row.attribute + for rv_val in candidates: + cooccur_freq = float(all_vals.get(rv_val,0.0)) + cur_attr_freq = float(self.single_stats[cur_attr][cur_val]) + prob = cooccur_freq/cur_attr_freq + if rv_val in rv_domain_idx: + cur_attr_idx = self.ds.attr_to_idx[cur_attr] + tensor[0][rv_domain_idx[rv_val]][cur_attr_idx] = prob return tensor diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 654c6b5a0..6163f6e7d 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -14,7 +14,8 @@ class TestHolocleanRepair(unittest.TestCase): def test_hospital(self): # 1. Setup a HoloClean session. - hc = holoclean.HoloClean(pruning_topk=0.1, epochs=10, weight_decay=0.01, threads=20, batch_size=1, verbose=True, timeout=3*60000).session + hc = holoclean.HoloClean(pruning_topk=0.1, epochs=10, weight_decay=0.01, + threads=4, batch_size=1, verbose=True, timeout=3*60000).session # 2. Load training data and denial constraints. hc.load_data('hospital', '../testdata/hospital.csv') From 77514eca2ac57bc785b81a68be6e0d41e310b0dc Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Fri, 23 Nov 2018 20:20:26 -0500 Subject: [PATCH 09/14] Fixed report/status in get_featurizer_weights. --- repair/repair.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repair/repair.py b/repair/repair.py index 0025e415f..bb74ad789 100644 --- a/repair/repair.py +++ b/repair/repair.py @@ -72,4 +72,4 @@ def get_featurizer_weights(self): report = self.repair_model.get_featurizer_weights(self.feat_dataset.featurizer_info, self.feat_dataset.debugging) toc = time.clock() report_time = toc - tic - return status, report_time + return report, report_time From 6a437c42c6cdde24396223d3c8c885e0ec917602 Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Fri, 23 Nov 2018 20:48:34 -0500 Subject: [PATCH 10/14] Use weight-adjusted frequency and co-occur for multiple init values. 
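
Rationale: with '|||'-separated cells a naive count lets a single entity
contribute several full counts for one attribute. We instead weight each value
by 1 / (# of values in its cell), so every cell still contributes a total mass
of 1 to the frequency statistics (and every pair of cells a mass of 1 to the
co-occurrence statistics). A small self-contained sketch of the single-attribute
case (the helper name is illustrative, not part of the codebase):

```python
from collections import defaultdict

def weighted_value_counts(cells):
    """Each cell contributes total mass 1, split evenly across its values."""
    counts = defaultdict(float)
    for cell in cells:
        vals = cell.split('|||')
        for val in vals:
            counts[val] += 1.0 / len(vals)
    return dict(counts)

# Two entities; the first is ambiguous between 'A' and 'B'.
print(weighted_value_counts(['A|||B', 'A']))   # {'A': 1.5, 'B': 0.5}
```
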
--- dataset/dataset.py | 21 ++++++++++++++------- examples/start_example.sh | 3 ++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index 2c67cc6fb..b5a09aaff 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -296,9 +296,12 @@ def _get_init_stats_single(self, attr): # We need to iterate through this in a for loop instead of groupby & size # since our values may be '|||' separated freq_count = {} - for (vals,) in self.get_raw_data()[[attr]].itertuples(index=False): - for val in vals.split('|||'): - freq_count[val] = freq_count.get(val, 0) + 1 + for (cell,) in self.get_raw_data()[[attr]].itertuples(index=False): + vals = cell.split('|||') + for val in vals: + # Correct for if there are multiple values: equal weight all + # values and their contribution to counts + freq_count[val] = freq_count.get(val, 0) + 1. / len(vals) return freq_count def _get_current_stats_single(self, attr): @@ -328,11 +331,15 @@ def _get_init_stats_pair(self, first_attr, second_attr): # We need to iterate through this in a for loop instead of groupby & size # since our values may be '|||' separated cooccur_freq_count = {} - for vals1, vals2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False): - for val1 in vals1.split('|||'): + for cell1, cell2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False): + vals1 = cell1.split('|||') + vals2 = cell2.split('|||') + for val1 in vals1: cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {}) - for val2 in vals2.split('|||'): - cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1 + for val2 in vals2: + # Correct for if there are multiple values: equal weight all + # co-pairs and their contribution to co-occur counts + cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1. / (len(vals1) * len(vals2)) return cooccur_freq_count def _get_current_stats_pair(self, first_attr, second_attr): diff --git a/examples/start_example.sh b/examples/start_example.sh index c0cb2508e..f26bcedd9 100755 --- a/examples/start_example.sh +++ b/examples/start_example.sh @@ -3,6 +3,7 @@ source ../set_env.sh echo "Launching example" -python holoclean_food.py +python holoclean_repair_example.py +# python holoclean_food.py From fd3ba08acb4480081da607f5a16640442b878e03 Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Fri, 23 Nov 2018 21:26:31 -0500 Subject: [PATCH 11/14] Reduce epoches for faster runs. --- examples/holoclean_repair_example.py | 4 ++-- tests/test_holoclean_repair.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index b59ecdc6f..c8715ad75 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -12,7 +12,7 @@ # 1. Setup a HoloClean session. hc = holoclean.HoloClean( pruning_topk=0.1, - epochs=30, + epochs=10, weight_decay=0.01, threads=4, batch_size=1, @@ -43,5 +43,5 @@ # 5. Evaluate the correctness of the results. em_iter_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') -hc.repair_errors(featurizers, em_iterations=1, em_iter_func=em_iter_func) +hc.repair_errors(featurizers, em_iterations=3, em_iter_func=em_iter_func) diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 6163f6e7d..472773e8a 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -32,7 +32,7 @@ def test_hospital(self): # 5. 
Repair and evaluate the correctness of the results.
         eval_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val')
-        hc.repair_errors(featurizers, em_iterations=5, em_iter_func=eval_func)
+        hc.repair_errors(featurizers, em_iterations=3, em_iter_func=eval_func)


 if __name__ == '__main__':

From 38b95b807231b1900d1066d5a419665d55da7f04 Mon Sep 17 00:00:00 2001
From: Richard Wu
Date: Fri, 23 Nov 2018 21:45:10 -0500
Subject: [PATCH 12/14] Fixed correction factor for current value selection
 from multiple initial values.

---
 domain/domain.py | 27 +++++++++++++++++----------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/domain/domain.py b/domain/domain.py
index 405847b28..7f9a2d1c2 100644
--- a/domain/domain.py
+++ b/domain/domain.py
@@ -359,16 +359,23 @@ def _init_and_current(self, attr, init_row):
             # initial values in this row, that is we calculate the sum of
             #
             # P(other_init_val | init_val) = P(init_val, other_init_val) / P(init_val)
-            #
-            # We subtract one for pair_stats since we do not want to include
-            # the co-occurrence of our current row (consider the extreme case
-            # where an erroneous initial value only occurs once: its co-occurrence
-            # probability will always be 1 but it does not tell us this value
-            # co-occurs most frequently with our other initial values).
-            cur_score = sum(float(pair_stats[attr][other_attr][init_val][other_val] - 1) / single_stats[attr][init_val]
-                    for other_attr in attrs
-                    if attr != other_attr
-                    for other_val in init_row[other_attr].split('|||'))
+            cur_score = 0
+            for other_attr in attrs:
+                if attr == other_attr:
+                    continue
+                other_vals = init_row[other_attr].split('|||')
+                for other_val in other_vals:
+                    # We subtract the co-occurrence weight for this current row
+                    # from pair_stats since we do not want to include the
+                    # co-occurrence of our current row.
+                    #
+                    # Consider the extreme case where an erroneous initial
+                    # value only occurs once: its co-occurrence probability
+                    # will always be 1 but it does not mean this value
+                    # co-occurs most frequently with our other initial values.
+                    cooccur_freq = pair_stats[attr][other_attr][init_val][other_val] - 1. / (len(other_vals) * len(init_values))
+
+                    cur_score += float(cooccur_freq) / single_stats[attr][init_val]
             # Keep the best initial value only

From 3542531a4fc49de20565d1db9ca2b29117345598 Mon Sep 17 00:00:00 2001
From: Richard Wu
Date: Sun, 25 Nov 2018 16:23:42 -0500
Subject: [PATCH 13/14] Added MultiInitDetector for marking cells with
 multiple initial values as DK.
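
Cells that arrive with more than one initial value ('|||'-separated) have no
single trusted observation, so we now flag them as don't-know (DK) cells and
let the repair model infer which value to keep. A rough sketch of the
detection rule over the raw data (illustrative pandas snippet, not the exact
detector code below):

```python
import pandas as pd

raw = pd.DataFrame({'_tid_': [0, 1], 'city': ['Berkeley|||Oakland', 'Waterloo']})
# Flag cells whose raw string contains the '|||' separator. Note that
# str.contains treats its pattern as a regex, so the separator must be escaped.
mask = raw['city'].str.contains(r'\|\|\|')
dk_cells = raw.loc[mask, ['_tid_']].assign(attribute='city')
print(dk_cells)   # one DK cell: (_tid_=0, attribute='city')
```
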
--- .gitignore | 1 +
 detect/__init__.py | 7 ++--
 detect/multi_init_detector.py | 33 +++++++++++++++++++
 detect/{nulldetector.py => null_detector.py} | 0
 ...ationdetector.py => violation_detector.py} | 0
 examples/holoclean_repair_example.py | 4 +--
 repair/featurize/featurize.py | 5 +--
 tests/test_holoclean_repair.py | 4 +--
 8 files changed, 45 insertions(+), 9 deletions(-)
 create mode 100644 detect/multi_init_detector.py
 rename detect/{nulldetector.py => null_detector.py} (100%)
 rename detect/{violationdetector.py => violation_detector.py} (100%)

diff --git a/.gitignore b/.gitignore
index 8e50c6b3d..5eef561fd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@ _templates
 .DS_store
 .venv
 *.swp
+*.swn
 *.pyc

 # pip package metadata
diff --git a/detect/__init__.py b/detect/__init__.py
index e8764e549..aed62b0f6 100644
--- a/detect/__init__.py
+++ b/detect/__init__.py
@@ -1,6 +1,7 @@
 from .detect import DetectEngine
 from .detector import Detector
-from .nulldetector import NullDetector
-from .violationdetector import ViolationDetector
+from .multi_init_detector import MultiInitDetector
+from .null_detector import NullDetector
+from .violation_detector import ViolationDetector

-__all__ = ['DetectEngine', 'Detector', 'NullDetector', 'ViolationDetector']
+__all__ = ['DetectEngine', 'Detector', 'MultiInitDetector', 'NullDetector', 'ViolationDetector']
diff --git a/detect/multi_init_detector.py b/detect/multi_init_detector.py
new file mode 100644
index 000000000..c94301cc7
--- /dev/null
+++ b/detect/multi_init_detector.py
@@ -0,0 +1,33 @@
+import pandas as pd
+from .detector import Detector
+
+class MultiInitDetector(Detector):
+    """
+    MultiInitDetector marks every cell that has multiple initial values as an error.
+    """
+    def __init__(self, name='MultiInitDetector'):
+        super(MultiInitDetector, self).__init__(name)
+
+    def setup(self, dataset, env):
+        self.ds = dataset
+        self.env = env
+        self.df = self.ds.get_raw_data()
+
+    def detect_noisy_cells(self):
+        """
+        detect_noisy_cells returns a pandas.DataFrame containing all cells with
+        multiple '|||' separated values.
+
+        :return: pandas.DataFrame with columns:
+            _tid_: entity ID
+            attribute: attribute with multiple initial values for this entity
+        """
+        attributes = self.ds.get_attributes()
+        errors = []
+        for attr in attributes:
+            tmp_df = self.df[self.df[attr].str.contains('|||')]['_tid_'].to_frame()
+            tmp_df.insert(1, "attribute", attr)
+            errors.append(tmp_df)
+        errors_df = pd.concat(errors, ignore_index=True)
+        return errors_df
+
diff --git a/detect/nulldetector.py b/detect/null_detector.py
similarity index 100%
rename from detect/nulldetector.py
rename to detect/null_detector.py
diff --git a/detect/violationdetector.py b/detect/violation_detector.py
similarity index 100%
rename from detect/violationdetector.py
rename to detect/violation_detector.py
diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py
index c8715ad75..cfbe0f27d 100644
--- a/examples/holoclean_repair_example.py
+++ b/examples/holoclean_repair_example.py
@@ -1,5 +1,5 @@
 import holoclean
-from detect import NullDetector, ViolationDetector
+from detect import MultiInitDetector, NullDetector, ViolationDetector
 from repair.featurize import CurrentFeaturizer
 from repair.featurize import CurrentAttrFeaturizer
 from repair.featurize import CurrentSimFeaturizer
@@ -27,7 +27,7 @@
 hc.ds.set_constraints(hc.get_dcs())

 # 3. Detect erroneous cells using these two detectors.
-detectors = [NullDetector(), ViolationDetector()] +detectors = [MultiInitDetector(), NullDetector(), ViolationDetector()] hc.detect_errors(detectors) # 4. Repair errors utilizing the defined features. diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py index 69876b992..307a7a8b6 100644 --- a/repair/featurize/featurize.py +++ b/repair/featurize/featurize.py @@ -27,7 +27,7 @@ def __init__(self, dataset, env, featurizers): self.debugging[feat] = {} self.debugging[feat]['size'] = debug.shape self.debugging[feat]['weights'] = debug - + self.tensor = tensor # TODO: remove after we validate it is not needed. self.in_features = self.tensor.shape[2] @@ -37,7 +37,8 @@ def __init__(self, dataset, env, featurizers): def generate_weak_labels(self): """ generate_weak_labels returns a tensor where for each VID we have the - domain index of the current value. + domain index of the current value (our initial current value is our + target "weak" labels). :return: Torch.Tensor of size (# of variables) X 1 where tensor[i][0] contains the domain index of the current value for the i-th diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 472773e8a..15933345c 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -1,7 +1,7 @@ import unittest import holoclean -from detect import NullDetector, ViolationDetector +from detect import MultiInitDetector, NullDetector, ViolationDetector from repair.featurize import CurrentFeaturizer from repair.featurize import CurrentAttrFeaturizer from repair.featurize import CurrentSimFeaturizer @@ -23,7 +23,7 @@ def test_hospital(self): hc.ds.set_constraints(hc.get_dcs()) # 3. Detect erroneous cells using these two detectors. - detectors = [NullDetector(), ViolationDetector()] + detectors = [MultiInitDetector(), NullDetector(), ViolationDetector()] hc.detect_errors(detectors) # 4. Repair errors utilizing the defined features. From 6413a300def96fe94392edead475c5669ce43c5e Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Sun, 25 Nov 2018 16:44:37 -0500 Subject: [PATCH 14/14] Fixed str matching for MultiInitDetector. --- detect/multi_init_detector.py | 2 +- repair/featurize/featurize.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/detect/multi_init_detector.py b/detect/multi_init_detector.py index c94301cc7..32bdb9839 100644 --- a/detect/multi_init_detector.py +++ b/detect/multi_init_detector.py @@ -25,7 +25,7 @@ def detect_noisy_cells(self): attributes = self.ds.get_attributes() errors = [] for attr in attributes: - tmp_df = self.df[self.df[attr].str.contains('|||')]['_tid_'].to_frame() + tmp_df = self.df[self.df[attr].str.contains(r'\|\|\|')]['_tid_'].to_frame() tmp_df.insert(1, "attribute", attr) errors.append(tmp_df) errors_df = pd.concat(errors, ignore_index=True) diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py index 307a7a8b6..00326ab79 100644 --- a/repair/featurize/featurize.py +++ b/repair/featurize/featurize.py @@ -45,6 +45,11 @@ def generate_weak_labels(self): variable/VID. """ logging.debug("Generating weak labels.") + + # We include "t1.fixed = 1" i.e. cells with randomly generated domains + # are included in our weak labels: we want to keep their values as is. + # The other domain values in the random domain are negative samples + # for training. query = """ SELECT _vid_,