From 6db22a6d991e1eb89fa609dff581fd92ca660cc9 Mon Sep 17 00:00:00 2001 From: Omar Yousry Attia Date: Sat, 23 Feb 2019 23:28:28 -0500 Subject: [PATCH 01/16] Adding extra example for loading errors from database (#82) --- examples/holoclean_repair_example_db.py | 56 +++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 examples/holoclean_repair_example_db.py diff --git a/examples/holoclean_repair_example_db.py b/examples/holoclean_repair_example_db.py new file mode 100644 index 000000000..d9bd52006 --- /dev/null +++ b/examples/holoclean_repair_example_db.py @@ -0,0 +1,56 @@ +import sys +sys.path.append('../') +import holoclean +from detect import ErrorsLoaderDetector +from repair.featurize import * + + +# 1. Setup a HoloClean session. +hc = holoclean.HoloClean( + db_name='holo', + domain_thresh_1=0, + domain_thresh_2=0, + weak_label_thresh=0.99, + max_domain=10000, + cor_strength=0.6, + nb_cor_strength=0.8, + epochs=10, + weight_decay=0.01, + learning_rate=0.001, + threads=1, + batch_size=1, + verbose=True, + timeout=3*60000, + feature_norm=False, + weight_norm=False, + print_fw=True +).session + +# 2. Load training data and denial constraints. +hc.load_data('hospital', '../testdata/hospital.csv') +hc.load_dcs('../testdata/hospital_constraints.txt') +hc.ds.set_constraints(hc.get_dcs()) + +# 3. Detect erroneous cells. +error_loader = ErrorsLoaderDetector( + db_engine=hc.ds.engine, + schema_name='hospital', + table_name='dk_cells' +) +hc.detect_errors([error_loader]) + +# 4. Repair errors utilizing the defined features. +hc.setup_domain() +featurizers = [ + OccurAttrFeaturizer(), + FreqFeaturizer(), + ConstraintFeaturizer(), +] + +hc.repair_errors(featurizers) + +# 5. Evaluate the correctness of the results. +hc.evaluate(fpath='../testdata/hospital_clean.csv', + tid_col='tid', + attr_col='attribute', + val_col='correct_val') From 57a418707b5b610df59f86a0747006337de15a81 Mon Sep 17 00:00:00 2001 From: stoke Date: Thu, 4 Apr 2019 16:08:17 -0400 Subject: [PATCH 02/16] fix bugs on ConstraintFeaturizer When generating relaxed sql, the operation should be flipped to keep the sql right when non-symmetric operation is used in DC. --- dcparser/constraint.py | 13 +++++++++++++ repair/featurize/constraintfeat.py | 6 +++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dcparser/constraint.py b/dcparser/constraint.py index 728170b2f..899933f1a 100644 --- a/dcparser/constraint.py +++ b/dcparser/constraint.py @@ -10,6 +10,19 @@ def is_symmetric(operation): return False +def get_flip_operation(operation): + if operation == '<=': + return '>=' + elif operation == '>=': + return '<=' + elif operation == '<': + return '>' + elif operation == '>': + return '<' + else: + return operation + + def contains_operation(string): """ Method to check if a given string contains one of the operation signs. 
diff --git a/repair/featurize/constraintfeat.py b/repair/featurize/constraintfeat.py index 8dfdb37ff..8ef7f1022 100644 --- a/repair/featurize/constraintfeat.py +++ b/repair/featurize/constraintfeat.py @@ -6,7 +6,7 @@ from .featurizer import Featurizer from dataset import AuxTables -from dcparser.constraint import is_symmetric +from dcparser.constraint import is_symmetric, get_flip_operation # unary_template is used for constraints where the current predicate # used for detecting violations in pos_values have a reference to only @@ -111,6 +111,10 @@ def relax_binary_predicate(self, predicate, rel_idx): """ attr = predicate.components[rel_idx][1] op = predicate.operation + # the latter one should flip the operation, + # if t3.rv_val is always on the left side in query template + if rel_idx == 1: + op = get_flip_operation(op) const = '{}."{}"'.format( predicate.components[1-rel_idx][0], predicate.components[1-rel_idx][1]) From 5a7b10bff729350aaad9b3a02c445e297077e39d Mon Sep 17 00:00:00 2001 From: Richard Wu Date: Tue, 23 Apr 2019 19:25:08 -0400 Subject: [PATCH 03/16] Fix bug where NULL cells with one candidate value was skipped. --- domain/domain.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/domain/domain.py b/domain/domain.py index 55fbeee0f..8c1a0948c 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -254,7 +254,11 @@ def generate_domain(self): # Initial value is NULL and we cannot come up with # a domain; a random domain probably won't help us so # completely ignore this cell and continue. - if init_value == NULL_REPR: + # Note if len(dom) == 1, then we generated a single correct + # value (since NULL is not included in the domain). + # This would be a "SINGLE_VALUE" example and we'd still + # like to generate a random domain for it. 
+ if init_value == NULL_REPR and len(dom) == 0: continue # Not enough domain values, we need to get some random From 53a4b3a25fffa2143b924d6bbe8742a01574f062 Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Sat, 11 Jul 2020 18:04:08 +0100 Subject: [PATCH 04/16] Packaging holoclean as a lib --- dataset/dataset.py | 2 +- dataset/quantization.py | 2 +- detect/detect.py | 2 +- detect/errorloaderdetector.py | 14 ++++----- detect/nulldetector.py | 2 +- domain/correlations.py | 2 +- domain/domain.py | 4 +-- domain/estimators/logistic.py | 2 +- domain/estimators/naive_bayes.py | 2 +- domain/estimators/tuple_embedding.py | 4 +-- evaluate/eval.py | 6 ++-- examples/holoclean_repair_example.py | 9 ++++-- holoclean.py | 16 +++++----- repair/featurize/constraintfeat.py | 4 +-- repair/featurize/embeddingfeat.py | 4 +-- repair/featurize/featurized_dataset.py | 4 +-- repair/featurize/freqfeat.py | 2 +- repair/featurize/initattrfeat.py | 2 +- repair/featurize/initsimfeat.py | 2 +- repair/featurize/langmodelfeat.py | 2 +- repair/featurize/occurattrfeat.py | 4 +-- repair/repair.py | 2 +- setupy.py | 41 ++++++++++++++++++++++++++ 23 files changed, 89 insertions(+), 45 deletions(-) create mode 100644 setupy.py diff --git a/dataset/dataset.py b/dataset/dataset.py index 335b8efb2..b293a2f96 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -8,7 +8,7 @@ from .dbengine import DBengine from .table import Table, Source -from utils import dictify_df, NULL_REPR +from ..utils import dictify_df, NULL_REPR class AuxTables(Enum): diff --git a/dataset/quantization.py b/dataset/quantization.py index 653d6e078..7a7d02877 100644 --- a/dataset/quantization.py +++ b/dataset/quantization.py @@ -2,7 +2,7 @@ import numpy as np from sklearn.cluster import KMeans -from utils import NULL_REPR +from ..utils import NULL_REPR def quantize_km(env, df_raw, num_attr_groups_bins): diff --git a/detect/detect.py b/detect/detect.py index 3ba151c85..7ee6ed386 100644 --- a/detect/detect.py +++ b/detect/detect.py @@ -3,7 +3,7 @@ import pandas as pd -from dataset import AuxTables +from ..dataset import AuxTables class DetectEngine: diff --git a/detect/errorloaderdetector.py b/detect/errorloaderdetector.py index bafdc53f1..f580de37b 100644 --- a/detect/errorloaderdetector.py +++ b/detect/errorloaderdetector.py @@ -1,6 +1,6 @@ import pandas as pd -from dataset.table import Table, Source +from ..dataset.table import Table, Source from .detector import Detector @@ -10,12 +10,12 @@ class ErrorsLoaderDetector(Detector): id_col: entity ID attr_col: attribute in violation in the format id_col, attr_col - Can load these erros from a csv file, a relational table, or a pandas + Can load these erros from a csv file, a relational table, or a pandas dataframe with the same format. """ def __init__(self, fpath=None, df=None, db_engine=None, table_name=None, schema_name=None, - id_col="_tid_", attr_col="attribute", + id_col="_tid_", attr_col="attribute", name="ErrorLoaderDetector"): """ :param fpath: (str) Path to source csv file to load errors @@ -28,7 +28,7 @@ def __init__(self, fpath=None, df=None, :param name: (str) name of the detector To load from csv file, :param fpath: must be specified. - To load from a relational table, :param db_engine:, and + To load from a relational table, :param db_engine:, and :param table_name: must be specified, optionally specifying :param schema_name:. 
""" super(ErrorsLoaderDetector, self).__init__(name) @@ -46,15 +46,15 @@ def __init__(self, fpath=None, df=None, else: raise Exception("ERROR while intializing ErrorsLoaderDetector. Please provide (), ( and ), OR ") - self.errors_table = Table(dataset_name, src, + self.errors_table = Table(dataset_name, src, exclude_attr_cols=[attr_col], fpath=fpath, df=df, schema_name=schema_name, db_engine=db_engine) - + expected_schema = [id_col, attr_col] if list(self.errors_table.df.columns) != expected_schema: raise Exception("ERROR while intializing ErrorsLoaderDetector: The loaded errors table does not match the expected schema of {}".format(expected_schema)) - + self.errors_table.df = self.errors_table.df.astype({ id_col: int, attr_col: str diff --git a/detect/nulldetector.py b/detect/nulldetector.py index 342f83c03..5e3585a11 100644 --- a/detect/nulldetector.py +++ b/detect/nulldetector.py @@ -1,7 +1,7 @@ import pandas as pd from .detector import Detector -from utils import NULL_REPR +from ..utils import NULL_REPR class NullDetector(Detector): diff --git a/domain/correlations.py b/domain/correlations.py index b8929d83e..5a44e8a2e 100644 --- a/domain/correlations.py +++ b/domain/correlations.py @@ -1,6 +1,6 @@ from pyitlib import discrete_random_variable as drv -from utils import NULL_REPR +from ..utils import NULL_REPR def compute_norm_cond_entropy_corr(data_df, attrs_from, attrs_to): """ diff --git a/domain/domain.py b/domain/domain.py index 1b97871e4..e363f28b8 100644 --- a/domain/domain.py +++ b/domain/domain.py @@ -8,10 +8,10 @@ import numpy as np from tqdm import tqdm -from dataset import AuxTables, CellStatus +from ..dataset import AuxTables, CellStatus from .estimators import * from .correlations import compute_norm_cond_entropy_corr -from utils import NULL_REPR +from ..utils import NULL_REPR class DomainEngine: diff --git a/domain/estimators/logistic.py b/domain/estimators/logistic.py index 90c936a6f..963465ce9 100644 --- a/domain/estimators/logistic.py +++ b/domain/estimators/logistic.py @@ -8,7 +8,7 @@ from tqdm import tqdm from ..estimator import Estimator -from utils import NULL_REPR, NA_COOCCUR_FV +from ...utils import NULL_REPR, NA_COOCCUR_FV class Logistic(Estimator, torch.nn.Module): diff --git a/domain/estimators/naive_bayes.py b/domain/estimators/naive_bayes.py index 6b9b3faf7..46ce51a2b 100644 --- a/domain/estimators/naive_bayes.py +++ b/domain/estimators/naive_bayes.py @@ -4,7 +4,7 @@ from tqdm import tqdm from ..estimator import Estimator -from utils import NULL_REPR +from ...utils import NULL_REPR class NaiveBayes(Estimator): diff --git a/domain/estimators/tuple_embedding.py b/domain/estimators/tuple_embedding.py index 632f4c981..61951fd19 100644 --- a/domain/estimators/tuple_embedding.py +++ b/domain/estimators/tuple_embedding.py @@ -14,9 +14,9 @@ import torch.nn.functional as F from tqdm import tqdm -from dataset import AuxTables +from ...dataset import AuxTables from ..estimator import Estimator -from utils import NULL_REPR +from ...utils import NULL_REPR NONNUMERICS = "[^0-9+-.e]" diff --git a/evaluate/eval.py b/evaluate/eval.py index cb1315c9e..afe814489 100644 --- a/evaluate/eval.py +++ b/evaluate/eval.py @@ -6,9 +6,9 @@ import pandas as pd -from dataset import AuxTables -from dataset.table import Table, Source -from utils import NULL_REPR +from ..dataset import AuxTables +from ..dataset.table import Table, Source +from ..utils import NULL_REPR report_name_list = ['precision', 'recall', 'repair_recall', 'f1', 'repair_f1', 'detected_errors', 'total_errors', 
'correct_repairs', 'total_repairs', diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index 99d7abd44..13e9d3e6e 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -1,10 +1,13 @@ import holoclean -from detect import * -from repair.featurize import * +from holoclean.detect import * +from holoclean.repair.featurize import * # 1. Setup a HoloClean session. hc = holoclean.HoloClean( - db_name='holo', + db_name='superset', + db_pwd='superset', + db_host='localhost', + db_user='superset', domain_thresh_1=0.0, domain_thresh_2=0.0, weak_label_thresh=0.99, diff --git a/holoclean.py b/holoclean.py index c13b06a5a..df83649fd 100644 --- a/holoclean.py +++ b/holoclean.py @@ -7,14 +7,14 @@ import numpy as np import pandas as pd -from dataset import Dataset, Table, Source, AuxTables -from dcparser import Parser -from domain import DomainEngine -from detect import DetectEngine -from repair import RepairEngine -from evaluate import EvalEngine -from dataset.quantization import quantize_km -from utils import NULL_REPR +from .dataset import Dataset, Table, Source, AuxTables +from .dcparser import Parser +from .domain import DomainEngine +from .detect import DetectEngine +from .repair import RepairEngine +from .evaluate import EvalEngine +from .dataset.quantization import quantize_km +from .utils import NULL_REPR logging.basicConfig(format="%(asctime)s - [%(levelname)5s] - %(message)s", datefmt='%H:%M:%S') diff --git a/repair/featurize/constraintfeat.py b/repair/featurize/constraintfeat.py index 164a15f0a..bf3c9d564 100644 --- a/repair/featurize/constraintfeat.py +++ b/repair/featurize/constraintfeat.py @@ -5,8 +5,8 @@ import torch.nn.functional as F from .featurizer import Featurizer -from dataset import AuxTables -from dcparser.constraint import is_symmetric +from ...dataset import AuxTables +from ...dcparser.constraint import is_symmetric # unary_template is used for constraints where the current predicate # used for detecting violations in pos_values have a reference to only diff --git a/repair/featurize/embeddingfeat.py b/repair/featurize/embeddingfeat.py index 7d86f308f..f69a3cb5d 100644 --- a/repair/featurize/embeddingfeat.py +++ b/repair/featurize/embeddingfeat.py @@ -9,8 +9,8 @@ from torch.nn import functional as F from .featurizer import Featurizer -from dataset import AuxTables -from domain.estimators import TupleEmbedding +from ...dataset import AuxTables +from ...domain.estimators import TupleEmbedding class EmbeddingFeaturizer(Featurizer): diff --git a/repair/featurize/featurized_dataset.py b/repair/featurize/featurized_dataset.py index 8d0f48bce..edce65664 100644 --- a/repair/featurize/featurized_dataset.py +++ b/repair/featurize/featurized_dataset.py @@ -7,8 +7,8 @@ import torch import torch.nn.functional as F -from dataset import AuxTables, CellStatus -from utils import NULL_REPR +from ...dataset import AuxTables, CellStatus +from ...utils import NULL_REPR FeatInfo = namedtuple('FeatInfo', ['name', 'size', 'learnable', 'init_weight', 'feature_names']) diff --git a/repair/featurize/freqfeat.py b/repair/featurize/freqfeat.py index 1e9656560..9b8531e3e 100644 --- a/repair/featurize/freqfeat.py +++ b/repair/featurize/freqfeat.py @@ -1,6 +1,6 @@ import torch -from dataset import AuxTables +from ...dataset import AuxTables from .featurizer import Featurizer diff --git a/repair/featurize/initattrfeat.py b/repair/featurize/initattrfeat.py index 9ad87deaf..35fc9136b 100644 --- a/repair/featurize/initattrfeat.py +++ 
b/repair/featurize/initattrfeat.py @@ -2,7 +2,7 @@ import torch -from dataset import AuxTables +from ...dataset import AuxTables from .featurizer import Featurizer diff --git a/repair/featurize/initsimfeat.py b/repair/featurize/initsimfeat.py index 364e0f471..693d04f01 100644 --- a/repair/featurize/initsimfeat.py +++ b/repair/featurize/initsimfeat.py @@ -3,7 +3,7 @@ import torch import Levenshtein -from dataset import AuxTables +from ...dataset import AuxTables from .featurizer import Featurizer diff --git a/repair/featurize/langmodelfeat.py b/repair/featurize/langmodelfeat.py index 7332daf18..7344cc695 100644 --- a/repair/featurize/langmodelfeat.py +++ b/repair/featurize/langmodelfeat.py @@ -1,7 +1,7 @@ import torch from gensim.models import FastText -from dataset import AuxTables +from ...dataset import AuxTables from .featurizer import Featurizer diff --git a/repair/featurize/occurattrfeat.py b/repair/featurize/occurattrfeat.py index f266d1dde..175b0a0fd 100644 --- a/repair/featurize/occurattrfeat.py +++ b/repair/featurize/occurattrfeat.py @@ -2,8 +2,8 @@ from tqdm import tqdm from .featurizer import Featurizer -from dataset import AuxTables -from utils import NULL_REPR, NA_COOCCUR_FV +from ...dataset import AuxTables +from ...utils import NULL_REPR, NA_COOCCUR_FV class OccurAttrFeaturizer(Featurizer): diff --git a/repair/repair.py b/repair/repair.py index 785114fd0..77f01c6a5 100644 --- a/repair/repair.py +++ b/repair/repair.py @@ -5,7 +5,7 @@ from .featurize import FeaturizedDataset from .learn import RepairModel -from dataset import AuxTables +from ..dataset import AuxTables class RepairEngine: diff --git a/setupy.py b/setupy.py new file mode 100644 index 000000000..2b3619d9a --- /dev/null +++ b/setupy.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os +import sys +from setuptools import find_packages, setup + +if sys.version_info < (3, 6): + sys.exit("Sorry, Python < 3.6 is not supported") + +BASE_DIR = os.path.abspath(os.path.dirname(__file__)) + +version_string = "1.0" + +setup( + name="holoclean", + description=("A data imputation lib based on deep learning."), + version=version_string, + package_dir={"holoclean": "."}, + include_package_data=True, + python_requires="~=3.6", + author="Apache Software Foundation", + classifiers=[ + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + ], + tests_require=["flask-testing==0.7.1"], +) From c2a1cf34f7b230703e362cccccf6423aefa18966 Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Sat, 11 Jul 2020 19:08:33 +0100 Subject: [PATCH 05/16] Renamed setupy.py to setup.py --- setupy.py => setup.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename setupy.py => setup.py (100%) diff --git a/setupy.py b/setup.py similarity index 100% rename from setupy.py rename to setup.py From 3553ca7065760b9cea9c4db4166c5c56d3245f6c Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Mon, 13 Jul 2020 10:24:52 +0100 Subject: [PATCH 06/16] Fixed relative imports for holoclean.py --- holoclean.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/holoclean.py b/holoclean.py index df83649fd..e0a751f34 100644 --- a/holoclean.py +++ b/holoclean.py @@ -7,14 +7,14 @@ import numpy as np import pandas as pd -from .dataset import Dataset, Table, Source, AuxTables -from .dcparser import Parser -from .domain import DomainEngine -from .detect import DetectEngine -from .repair import RepairEngine -from .evaluate import EvalEngine -from .dataset.quantization import quantize_km -from .utils import NULL_REPR +from holoclean.dataset import Dataset, Table, Source, AuxTables +from holoclean.dcparser import Parser +from holoclean.domain import DomainEngine +from holoclean.detect import DetectEngine +from holoclean.repair import RepairEngine +from holoclean.evaluate import EvalEngine +from holoclean.dataset.quantization import quantize_km +from holoclean.utils import NULL_REPR logging.basicConfig(format="%(asctime)s - [%(levelname)5s] - %(message)s", datefmt='%H:%M:%S') From ef58c8de651f7bb4b7e17bf22a562ff67521f08b Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Mon, 13 Jul 2020 11:42:39 +0100 Subject: [PATCH 07/16] Fixed setup.py and package name for it to work properly --- holoclean.py | 16 ++++++++-------- setup.py | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/holoclean.py b/holoclean.py index e0a751f34..df83649fd 100644 --- a/holoclean.py +++ b/holoclean.py @@ -7,14 +7,14 @@ import numpy as np import pandas as pd -from holoclean.dataset import Dataset, Table, Source, AuxTables -from holoclean.dcparser import Parser -from holoclean.domain import DomainEngine -from holoclean.detect import DetectEngine -from holoclean.repair import RepairEngine -from holoclean.evaluate import EvalEngine -from holoclean.dataset.quantization import quantize_km -from holoclean.utils import NULL_REPR +from .dataset import Dataset, Table, Source, AuxTables +from .dcparser import Parser +from .domain import DomainEngine +from .detect import DetectEngine +from .repair import RepairEngine +from .evaluate import EvalEngine +from .dataset.quantization import quantize_km +from .utils import NULL_REPR logging.basicConfig(format="%(asctime)s - [%(levelname)5s] - %(message)s", datefmt='%H:%M:%S') diff --git a/setup.py b/setup.py index 2b3619d9a..d5ce8beb2 100644 --- 
a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ description=("A data imputation lib based on deep learning."), version=version_string, package_dir={"holoclean": "."}, + packages=["holoclean"] + ["holoclean." + package for package in find_packages(where=".")], include_package_data=True, python_requires="~=3.6", author="Apache Software Foundation", From 66815de9ca61ddac09045da41dbe6bda4ca56260 Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Mon, 3 Aug 2020 17:36:23 +0100 Subject: [PATCH 08/16] Fixed empty constraint for 1 row and an operation --- dcparser/constraint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dcparser/constraint.py b/dcparser/constraint.py index 899933f1a..9f70098d7 100644 --- a/dcparser/constraint.py +++ b/dcparser/constraint.py @@ -59,7 +59,7 @@ def __init__(self, dc_string, schema): # Find all tuple names used in DC logging.debug('DONE pre-processing constraint: %s', dc_string) for component in split: - if contains_operation(component): + if contains_operation(component) is not None: break else: self.tuple_names.append(component) From a0d2ac65aa687b862c1545aeba7f02187f0a7ba7 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 2 Oct 2020 15:02:55 +0000 Subject: [PATCH 09/16] Ignore vscode setting --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 6e29259e2..138979632 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,8 @@ _templates *.pyc .cache/ +.vscode + # pip package metadata *egg-info/ From 36aa7b50198851eedf0a165529719daa9b43305f Mon Sep 17 00:00:00 2001 From: Jason Chen Date: Sat, 3 Oct 2020 15:59:07 +0000 Subject: [PATCH 10/16] Use Blake to reformat --- dataset/dataset.py | 198 +++++++++++++++++++++++++++++---------------- dataset/table.py | 65 +++++++++++---- 2 files changed, 179 insertions(+), 84 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index 335b8efb2..d9beedbfb 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -12,24 +12,26 @@ class AuxTables(Enum): - c_cells = 1 - dk_cells = 2 - cell_domain = 3 - pos_values = 4 - cell_distr = 5 + c_cells = 1 + dk_cells = 2 + cell_domain = 3 + pos_values = 4 + cell_distr = 5 inf_values_idx = 6 inf_values_dom = 7 class CellStatus(Enum): - NOT_SET = 0 - WEAK_LABEL = 1 - SINGLE_VALUE = 2 + NOT_SET = 0 + WEAK_LABEL = 1 + SINGLE_VALUE = 2 + class Dataset: """ This class keeps all dataframes and tables for a HC session. 
""" + def __init__(self, name, env): self.env = env self.id = name @@ -41,12 +43,12 @@ def __init__(self, name, env): self.aux_table[tab] = None # start dbengine self.engine = DBengine( - env['db_user'], - env['db_pwd'], - env['db_name'], - env['db_host'], - pool_size=env['threads'], - timeout=env['timeout'] + env["db_user"], + env["db_pwd"], + env["db_name"], + env["db_host"], + pool_size=env["threads"], + timeout=env["timeout"], ) # members to convert (tuple_id, attribute) to cell_id self.attr_to_idx = {} @@ -76,8 +78,17 @@ def __init__(self, name, env): self.do_quantization = False # TODO(richardwu): load more than just CSV files - def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None, - exclude_attr_cols=None, numerical_attrs=None, store_to_db=True): + def load_data( + self, + name, + fpath, + na_values=None, + entity_col=None, + src_col=None, + exclude_attr_cols=None, + numerical_attrs=None, + store_to_db=True, + ): """ load_data takes a CSV file of the initial data, adds tuple IDs (_tid_) to each row to uniquely identify an 'entity', and generates unique @@ -102,15 +113,20 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None, try: # Do not include TID and source column as trainable attributes if exclude_attr_cols is None: - exclude_attr_cols = ['_tid_'] + exclude_attr_cols = ["_tid_"] else: - exclude_attr_cols.append('_tid_') + exclude_attr_cols.append("_tid_") if src_col is not None: exclude_attr_cols.append(src_col) # Load raw CSV file/data into a Postgres table 'name' (param). - self.raw_data = Table(name, Source.FILE, na_values=na_values, - exclude_attr_cols=exclude_attr_cols, fpath=fpath) + self.raw_data = Table( + name, + Source.FILE, + na_values=na_values, + exclude_attr_cols=exclude_attr_cols, + fpath=fpath, + ) df = self.raw_data.df # Add _tid_ column to dataset that uniquely identifies an entity. @@ -118,14 +134,16 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None, # Otherwise we use the entity values directly as _tid_'s. if entity_col is None: # auto-increment - df.insert(0, '_tid_', range(0,len(df))) + df.insert(0, "_tid_", range(0, len(df))) else: # use entity IDs as _tid_'s directly - df.rename({entity_col: '_tid_'}, axis='columns', inplace=True) + df.rename({entity_col: "_tid_"}, axis="columns", inplace=True) self.numerical_attrs = numerical_attrs or [] all_attrs = self.raw_data.get_attributes() - self.categorical_attrs = [attr for attr in all_attrs if attr not in self.numerical_attrs] + self.categorical_attrs = [ + attr for attr in all_attrs if attr not in self.numerical_attrs + ] if store_to_db: # Now df is all in str type, make a copy of df and then @@ -134,36 +152,48 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None, # 3. 
store the correct type into db (categorical->str, numerical->float) df_correct_type = df.copy() for attr in self.categorical_attrs: - df_correct_type.loc[df_correct_type[attr].isnull(), attr] = NULL_REPR + df_correct_type.loc[ + df_correct_type[attr].isnull(), attr + ] = NULL_REPR for attr in self.numerical_attrs: - df_correct_type[attr] = df_correct_type[attr].astype(float) + df_correct_type[attr] = df_correct_type[attr].astype(float) - df_correct_type.to_sql(self.raw_data.name, self.engine.engine, if_exists='replace', index=False, - index_label=None) + df_correct_type.to_sql( + self.raw_data.name, + self.engine.engine, + if_exists="replace", + index=False, + index_label=None, + ) # for df, which is all str # Use NULL_REPR to represent NULL values - df.replace('', NULL_REPR, inplace=True) + df.replace("", NULL_REPR, inplace=True) df.fillna(NULL_REPR, inplace=True) - logging.info("Loaded %d rows with %d cells", self.raw_data.df.shape[0], - self.raw_data.df.shape[0] * self.raw_data.df.shape[1]) + logging.info( + "Loaded %d rows with %d cells", + self.raw_data.df.shape[0], + self.raw_data.df.shape[0] * self.raw_data.df.shape[1], + ) # Call to store to database - status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath)) + status = "DONE Loading {fname}".format(fname=os.path.basename(fpath)) if store_to_db: # Generate indexes on attribute columns for faster queries for attr in self.raw_data.get_attributes(): # Generate index on attribute - self.raw_data.create_db_index(self.engine,[attr]) + self.raw_data.create_db_index(self.engine, [attr]) # Create attr_to_idx dictionary (assign unique index for each attribute) # and attr_count (total # of attributes) - self.attr_to_idx = {attr: idx for idx, attr in enumerate(self.raw_data.get_attributes())} + self.attr_to_idx = { + attr: idx for idx, attr in enumerate(self.raw_data.get_attributes()) + } self.attr_count = len(self.attr_to_idx) except Exception: - logging.error('loading data for table %s', name) + logging.error("loading data for table %s", name) raise toc = time.clock() load_time = toc - tic @@ -197,7 +227,7 @@ def generate_aux_table(self, aux_table, df, store=False, index_attrs=False): if store and index_attrs: self.aux_table[aux_table].create_db_index(self.engine, index_attrs) except Exception: - logging.error('generating aux_table %s', aux_table.name) + logging.error("generating aux_table %s", aux_table.name) raise def generate_aux_table_sql(self, aux_table, query, index_attrs=False): @@ -206,12 +236,14 @@ def generate_aux_table_sql(self, aux_table, query, index_attrs=False): :param query: (str) SQL query whose result is used for generating the auxiliary table. """ try: - self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine) + self.aux_table[aux_table] = Table( + aux_table.name, Source.SQL, table_query=query, db_engine=self.engine + ) if index_attrs: self.aux_table[aux_table].create_df_index(index_attrs) self.aux_table[aux_table].create_db_index(self.engine, index_attrs) except Exception: - logging.error('generating aux_table %s', aux_table.name) + logging.error("generating aux_table %s", aux_table.name) raise def get_raw_data(self): @@ -219,7 +251,7 @@ def get_raw_data(self): get_raw_data returns a pandas.DataFrame containing the raw data as it was initially loaded. 
""" if self.raw_data is None: - raise Exception('ERROR No dataset loaded') + raise Exception("ERROR No dataset loaded") return self.raw_data.df def get_quantized_data(self): @@ -228,7 +260,7 @@ def get_quantized_data(self): :return: the data after quantization in pandas.DataFrame """ if self.quantized_data is None: - raise Exception('ERROR No dataset quantized') + raise Exception("ERROR No dataset quantized") return self.quantized_data.df def get_attributes(self): @@ -237,7 +269,7 @@ def get_attributes(self): columns like _tid_). """ if self.raw_data is None: - raise Exception('ERROR No dataset loaded') + raise Exception("ERROR No dataset loaded") return self.raw_data.get_attributes() def get_active_attributes(self): @@ -252,14 +284,16 @@ def get_active_attributes(self): if self.train_attrs is None: self.train_attrs = self.get_attributes() - if self.env['infer_mode'] == 'dk': + if self.env["infer_mode"] == "dk": if self._active_attributes is None: - raise Exception('ERROR no active attributes loaded. Run error detection first.') + raise Exception( + "ERROR no active attributes loaded. Run error detection first." + ) attrs = self._active_attributes - elif self.env['infer_mode'] == 'all': + elif self.env["infer_mode"] == "all": attrs = self.get_attributes() else: - raise Exception('infer mode must be one of {dk, all}') + raise Exception("infer mode must be one of {dk, all}") return sorted([attr for attr in attrs if attr in self.train_attrs]) @@ -269,7 +303,7 @@ def get_cell_id(self, tuple_id, attr_name): Cell ID: _tid_ * (# of attributes) + attr_idx """ - vid = tuple_id*self.attr_count + self.attr_to_idx[attr_name] + vid = tuple_id * self.attr_count + self.attr_to_idx[attr_name] return vid def get_statistics(self): @@ -293,10 +327,12 @@ def get_statistics(self): Also, values that only co-occur with NULLs will NOT be in pair_attr_stats. """ if not self.stats_ready: - logging.debug('computing frequency and co-occurrence statistics from raw data...') + logging.debug( + "computing frequency and co-occurrence statistics from raw data..." + ) tic = time.clock() self.collect_stats() - logging.debug('DONE computing statistics in %.2fs', time.clock() - tic) + logging.debug("DONE computing statistics in %.2fs", time.clock() - tic) stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats) self.stats_ready = True @@ -324,7 +360,9 @@ def collect_stats(self): self.pair_attr_stats[cond_attr] = {} for trg_attr in self.get_attributes(): if trg_attr != cond_attr: - self.pair_attr_stats[cond_attr][trg_attr] = self.get_stats_pair(cond_attr, trg_attr) + self.pair_attr_stats[cond_attr][trg_attr] = self.get_stats_pair( + cond_attr, trg_attr + ) def get_stats_single(self, attr): """ @@ -333,8 +371,16 @@ def get_stats_single(self, attr): """ # need to decode values into unicode strings since we do lookups via # unicode strings from Postgres - data_df = self.get_quantized_data() if self.do_quantization else self.get_raw_data() - return data_df[[attr]].loc[data_df[attr] != NULL_REPR].groupby([attr]).size().to_dict() + data_df = ( + self.get_quantized_data() if self.do_quantization else self.get_raw_data() + ) + return ( + data_df[[attr]] + .loc[data_df[attr] != NULL_REPR] + .groupby([attr]) + .size() + .to_dict() + ) def get_stats_pair(self, first_attr, second_attr): """ @@ -344,19 +390,27 @@ def get_stats_pair(self, first_attr, second_attr): : frequency (# of entities) where first_attr= AND second_attr= Filters out NULL values so no entries in the dictionary would have NULLs. 
""" - data_df = self.get_quantized_data() if self.do_quantization else self.get_raw_data() - tmp_df = data_df[[first_attr, second_attr]]\ - .loc[(data_df[first_attr] != NULL_REPR) & (data_df[second_attr] != NULL_REPR)]\ - .groupby([first_attr, second_attr])\ - .size()\ + data_df = ( + self.get_quantized_data() if self.do_quantization else self.get_raw_data() + ) + tmp_df = ( + data_df[[first_attr, second_attr]] + .loc[ + (data_df[first_attr] != NULL_REPR) & (data_df[second_attr] != NULL_REPR) + ] + .groupby([first_attr, second_attr]) + .size() .reset_index(name="count") + ) return dictify_df(tmp_df) def get_domain_info(self): """ Returns (number of random variables, count of distinct values across all attributes). """ - query = 'SELECT count(_vid_), max(domain_size) FROM %s'%AuxTables.cell_domain.name + query = ( + "SELECT count(_vid_), max(domain_size) FROM %s" % AuxTables.cell_domain.name + ) res = self.engine.execute_query(query) total_vars = int(res[0][0]) classes = int(res[0][1]) @@ -365,14 +419,21 @@ def get_domain_info(self): def get_inferred_values(self): tic = time.clock() # index into domain with inferred_val_idx + 1 since SQL arrays begin at index 1. - query = "SELECT t1._tid_, t1.attribute, domain[inferred_val_idx + 1] as rv_value " \ - "FROM " \ - "(SELECT _tid_, attribute, " \ - "_vid_, init_value, string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') as domain " \ - "FROM %s) as t1, %s as t2 " \ - "WHERE t1._vid_ = t2._vid_"%(AuxTables.cell_domain.name, AuxTables.inf_values_idx.name) - self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_']) - self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute']) + query = ( + "SELECT t1._tid_, t1.attribute, domain[inferred_val_idx + 1] as rv_value " + "FROM " + "(SELECT _tid_, attribute, " + "_vid_, init_value, string_to_array(regexp_replace(domain, '[{\"\"}]', '', 'gi'), '|||') as domain " + "FROM %s) as t1, %s as t2 " + "WHERE t1._vid_ = t2._vid_" + % (AuxTables.cell_domain.name, AuxTables.inf_values_idx.name) + ) + self.generate_aux_table_sql( + AuxTables.inf_values_dom, query, index_attrs=["_tid_"] + ) + self.aux_table[AuxTables.inf_values_dom].create_db_index( + self.engine, ["attribute"] + ) status = "DONE collecting the inferred values." toc = time.clock() total_time = toc - tic @@ -380,14 +441,14 @@ def get_inferred_values(self): def get_repaired_dataset(self): tic = time.clock() - init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False) + init_records = self.raw_data.df.sort_values(["_tid_"]).to_records(index=False) t = self.aux_table[AuxTables.inf_values_dom] repaired_vals = dictify_df(t.df.reset_index()) for tid in repaired_vals: for attr in repaired_vals[tid]: init_records[tid][attr] = repaired_vals[tid][attr] repaired_df = pd.DataFrame.from_records(init_records) - name = self.raw_data.name+'_repaired' + name = self.raw_data.name + "_repaired" self.repaired_data = Table(name, Source.DF, df=repaired_df) self.repaired_data.store_to_db(self.engine.engine) status = "DONE generating repaired dataset" @@ -407,6 +468,7 @@ def get_embedding_model(self): Retrieve the memoized embedding model. """ if self._embedding_model is None: - raise Exception("cannot retrieve embedding model: it was never trained and loaded!") + raise Exception( + "cannot retrieve embedding model: it was never trained and loaded!" 
+ ) return self._embedding_model - diff --git a/dataset/table.py b/dataset/table.py index 4d9d9c786..14862ec81 100644 --- a/dataset/table.py +++ b/dataset/table.py @@ -6,17 +6,28 @@ class Source(Enum): FILE = 1 - DF = 2 - DB = 3 - SQL = 4 + DF = 2 + DB = 3 + SQL = 4 class Table: """ A wrapper class for Dataset Tables. """ - def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'], - fpath=None, df=None, schema_name=None, table_query=None, db_engine=None): + + def __init__( + self, + name, + src, + na_values=None, + exclude_attr_cols=["_tid_"], + fpath=None, + df=None, + schema_name=None, + table_query=None, + db_engine=None, + ): """ :param name: (str) name to assign to dataset. :param na_values: (str or list[str]) values to interpret as NULL. @@ -44,14 +55,21 @@ def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'], if src == Source.FILE: if fpath is None: - raise Exception("ERROR while loading table. File path for CSV file name expected. Please provide param.") + raise Exception( + "ERROR while loading table. File path for CSV file name expected. Please provide param." + ) # TODO(richardwu): use COPY FROM instead of loading this into memory - self.df = pd.read_csv(fpath, dtype=str, na_values=na_values, encoding='utf-8') + self.df = pd.read_csv( + fpath, dtype=str, na_values=na_values, encoding="utf-8" + ) # Normalize the dataframe: drop null columns, convert to lowercase strings, and strip whitespaces. for attr in self.df.columns.values: if self.df[attr].isnull().all(): - logging.warning("Dropping the following null column from the dataset: '%s'", attr) + logging.warning( + "Dropping the following null column from the dataset: '%s'", + attr, + ) self.df.drop(labels=[attr], axis=1, inplace=True) continue if attr in exclude_attr_cols: @@ -60,21 +78,33 @@ def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'], self.df[attr] = self.df[attr].str.strip().str.lower() elif src == Source.DF: if df is None: - raise Exception("ERROR while loading table. Dataframe expected. Please provide param.") + raise Exception( + "ERROR while loading table. Dataframe expected. Please provide param." + ) self.df = df elif src == Source.DB: if db_engine is None: - raise Exception("ERROR while loading table. DB connection expected. Please provide .") + raise Exception( + "ERROR while loading table. DB connection expected. Please provide ." + ) self.df = pd.read_sql_table(name, db_engine.conn, schema=schema_name) elif src == Source.SQL: if table_query is None or db_engine is None: - raise Exception("ERROR while loading table. SQL Query and DB connection expected. Please provide and .") + raise Exception( + "ERROR while loading table. SQL Query and DB connection expected. Please provide and ." + ) db_engine.create_db_table_from_query(self.name, table_query) self.df = pd.read_sql_table(name, db_engine.conn) - def store_to_db(self, db_conn, if_exists='replace', index=False, index_label=None): + def store_to_db(self, db_conn, if_exists="replace", index=False, index_label=None): # TODO: This version supports single session, single worker. - self.df.to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label) + self.df.to_sql( + self.name, + db_conn, + if_exists=if_exists, + index=index, + index_label=index_label, + ) def get_attributes(self): """ @@ -82,14 +112,17 @@ def get_attributes(self): (i.e. exclude meta-columns like _tid_). """ if self.df.empty: - raise Exception("Empty Dataframe associated with table {name}. 
Cannot return attributes.".format( - name=self.name)) + raise Exception( + "Empty Dataframe associated with table {name}. Cannot return attributes.".format( + name=self.name + ) + ) return list(col for col in self.df.columns if col not in self.exclude_attr_cols) def create_df_index(self, attr_list): self.df.set_index(attr_list, inplace=True) def create_db_index(self, db_engine, attr_list): - index_name = '{name}_{idx}'.format(name=self.name, idx=self.index_count) + index_name = "{name}_{idx}".format(name=self.name, idx=self.index_count) db_engine.create_db_index(index_name, self.name, attr_list) self.index_count += 1 From ae2ed8d20c9a7cd535cd6d8587d0c669013b628d Mon Sep 17 00:00:00 2001 From: Jason Chen Date: Sat, 3 Oct 2020 17:23:21 +0000 Subject: [PATCH 11/16] Use raw value to keep track of the value before normalize and revert to these values after repair --- dataset/dataset.py | 2 +- dataset/table.py | 42 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index d9beedbfb..8c2145334 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -450,7 +450,7 @@ def get_repaired_dataset(self): repaired_df = pd.DataFrame.from_records(init_records) name = self.raw_data.name + "_repaired" self.repaired_data = Table(name, Source.DF, df=repaired_df) - self.repaired_data.store_to_db(self.engine.engine) + self.repaired_data.store_to_db(self.engine.engine, self.raw_data.df_raw) status = "DONE generating repaired dataset" toc = time.clock() total_time = toc - tic diff --git a/dataset/table.py b/dataset/table.py index 14862ec81..6f8cab893 100644 --- a/dataset/table.py +++ b/dataset/table.py @@ -1,6 +1,7 @@ from enum import Enum import logging +from tqdm import tqdm import pandas as pd @@ -52,6 +53,7 @@ def __init__( # Copy the list to memoize self.exclude_attr_cols = list(exclude_attr_cols) self.df = pd.DataFrame() + self.df_raw = pd.DataFrame() # data before normalized - not lower all string if src == Source.FILE: if fpath is None: @@ -62,6 +64,9 @@ def __init__( self.df = pd.read_csv( fpath, dtype=str, na_values=na_values, encoding="utf-8" ) + self.df_raw = pd.read_csv( + fpath, dtype=str, na_values=na_values, encoding="utf-8" + ) # Normalize the dataframe: drop null columns, convert to lowercase strings, and strip whitespaces. for attr in self.df.columns.values: @@ -96,9 +101,42 @@ def __init__( db_engine.create_db_table_from_query(self.name, table_query) self.df = pd.read_sql_table(name, db_engine.conn) - def store_to_db(self, db_conn, if_exists="replace", index=False, index_label=None): + def _revert_normalized_value(self, df_raw: pd.DataFrame) -> pd.DataFrame: + if df_raw.empty: + return self.df + + self._df_reverted = pd.DataFrame() + + logging.info("Reverting normalized values") + r_size, _ = self.df.shape + + for attr in tqdm(self.df.columns.values): + if attr in self.exclude_attr_cols: + self._df_reverted[attr] = self.df[attr] + else: + for indx in range(r_size): + raw_value = df_raw[attr].iloc[indx] + repair_value = self.df[attr].iloc[indx] + if ( + str(raw_value).strip().lower() + == str(repair_value).strip().lower() + ): + self._df_reverted.at[indx, attr] = raw_value + else: + self._df_reverted.at[indx, attr] = repair_value + + return self._df_reverted + + def store_to_db( + self, + db_conn, + df_raw: pd.DataFrame = pd.DataFrame(), + if_exists="replace", + index=False, + index_label=None, + ): # TODO: This version supports single session, single worker. 
- self.df.to_sql( + self._revert_normalized_value(df_raw).to_sql( self.name, db_conn, if_exists=if_exists, From 5f9d1069ec36c0d8c8ddb19996273502a44995b3 Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Fri, 9 Oct 2020 13:51:20 +0100 Subject: [PATCH 12/16] Replaced database auth with sqlachlemy_uri instead of username and passowrd --- dataset/dataset.py | 5 +---- dataset/dbengine.py | 29 ++++++++++++---------------- examples/holoclean_repair_example.py | 5 +---- holoclean.py | 24 ----------------------- 4 files changed, 14 insertions(+), 49 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index b293a2f96..c32d731c4 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -41,10 +41,7 @@ def __init__(self, name, env): self.aux_table[tab] = None # start dbengine self.engine = DBengine( - env['db_user'], - env['db_pwd'], - env['db_name'], - env['db_host'], + env["sqlalchemy_uri"], pool_size=env['threads'], timeout=env['timeout'] ) diff --git a/dataset/dbengine.py b/dataset/dbengine.py index 635dc723f..e7c4c093b 100644 --- a/dataset/dbengine.py +++ b/dataset/dbengine.py @@ -17,16 +17,11 @@ class DBengine: A wrapper class for postgresql engine. Maintains connections and executes queries. """ - def __init__(self, user, pwd, db, host='localhost', port=5432, pool_size=20, timeout=60000): + def __init__(self, sqlalchemy_uri, pool_size=20, timeout=60000): self.timeout = timeout self._pool = Pool(pool_size) if pool_size > 1 else None - url = 'postgresql+psycopg2://{}:{}@{}:{}/{}?client_encoding=utf8' - url = url.format(user, pwd, host, port, db) - self.conn = url - con = 'dbname={} user={} password={} host={} port={}' - con = con.format(db, user, pwd, host, port) - self.conn_args = con - self.engine = sql.create_engine(url, client_encoding='utf8', pool_size=pool_size) + self.conn = sqlalchemy_uri + self.engine = sql.create_engine(self.conn, client_encoding='utf8', pool_size=pool_size) def execute_queries(self, queries): """ @@ -36,7 +31,7 @@ def execute_queries(self, queries): """ logging.debug('Preparing to execute %d queries.', len(queries)) tic = time.clock() - results = self._apply_func(partial(_execute_query, conn_args=self.conn_args), [(idx, q) for idx, q in enumerate(queries)]) + results = self._apply_func(partial(_execute_query, conn_args=self.conn), [(idx, q) for idx, q in enumerate(queries)]) toc = time.clock() logging.debug('Time to execute %d queries: %.2f secs', len(queries), toc-tic) return results @@ -50,7 +45,7 @@ def execute_queries_w_backup(self, queries): logging.debug('Preparing to execute %d queries.', len(queries)) tic = time.clock() results = self._apply_func( - partial(_execute_query_w_backup, conn_args=self.conn_args, timeout=self.timeout), + partial(_execute_query_w_backup, conn_args=self.conn, timeout=self.timeout), [(idx, q) for idx, q in enumerate(queries)]) toc = time.clock() logging.debug('Time to execute %d queries: %.2f secs', len(queries), toc-tic) @@ -114,7 +109,8 @@ def _execute_query(args, conn_args): query = args[1] logging.debug("Starting to execute query %s with id %s", query, query_id) tic = time.clock() - con = psycopg2.connect(conn_args) + engine = sql.create_engine(conn_args) + con = engine.raw_connection() cur = con.cursor() cur.execute(query) res = cur.fetchall() @@ -130,7 +126,8 @@ def _execute_query_w_backup(args, conn_args, timeout): query_backup = args[1][1] logging.debug("Starting to execute query %s with id %s", query, query_id) tic = time.clock() - con = psycopg2.connect(conn_args) + engine = sql.create_engine(conn_args) + con = 
engine.raw_connection() cur = con.cursor() cur.execute("SET statement_timeout to %d;"%timeout) try: @@ -145,12 +142,10 @@ def _execute_query_w_backup(args, conn_args, timeout): return [] logging.debug("Starting to execute backup query %s with id %s", query_backup, query_id) - con.close() - con = psycopg2.connect(conn_args) cur = con.cursor() cur.execute(query_backup) res = cur.fetchall() - con.close() - toc = time.clock() - logging.debug('Time to execute query with id %d: %.2f secs', query_id, toc - tic) + toc = time.clock() + logging.debug('Time to execute query with id %d: %.2f secs', query_id, toc - tic) + con.close() return res diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index 13e9d3e6e..7a02e4325 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -4,10 +4,7 @@ # 1. Setup a HoloClean session. hc = holoclean.HoloClean( - db_name='superset', - db_pwd='superset', - db_host='localhost', - db_user='superset', + sqlalchemy_uri="postgresql://superset:superset@localhost:5432/superset", domain_thresh_1=0.0, domain_thresh_2=0.0, weak_label_thresh=0.99, diff --git a/holoclean.py b/holoclean.py index df83649fd..ac70d8719 100644 --- a/holoclean.py +++ b/holoclean.py @@ -26,30 +26,6 @@ # Arguments for HoloClean arguments = [ - (('-u', '--db_user'), - {'metavar': 'DB_USER', - 'dest': 'db_user', - 'default': 'holocleanuser', - 'type': str, - 'help': 'User for DB used to persist state.'}), - (('-p', '--db-pwd', '--pass'), - {'metavar': 'DB_PWD', - 'dest': 'db_pwd', - 'default': 'abcd1234', - 'type': str, - 'help': 'Password for DB used to persist state.'}), - (('-h', '--db-host'), - {'metavar': 'DB_HOST', - 'dest': 'db_host', - 'default': 'localhost', - 'type': str, - 'help': 'Host for DB used to persist state.'}), - (('-d', '--db_name'), - {'metavar': 'DB_NAME', - 'dest': 'db_name', - 'default': 'holo', - 'type': str, - 'help': 'Name of DB used to persist state.'}), (('-t', '--threads'), {'metavar': 'THREADS', 'dest': 'threads', From 283e40e5e127dda6af3b3cd3fc338109cea2d41d Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Sat, 10 Oct 2020 17:08:58 +0100 Subject: [PATCH 13/16] Fix accidental copy --- dataset/dataset.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index aa06f08d0..5eca167f0 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -46,8 +46,6 @@ def __init__(self, name, env): env["sqlalchemy_uri"], pool_size=env['threads'], timeout=env['timeout'] - pool_size=env["threads"], - timeout=env["timeout"], ) # members to convert (tuple_id, attribute) to cell_id self.attr_to_idx = {} From 5694bc4c33425a869df438183fc1501982537378 Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Wed, 14 Oct 2020 15:41:25 +0100 Subject: [PATCH 14/16] Return False when detect_errors doesn't find any problematic cells --- detect/detect.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/detect/detect.py b/detect/detect.py index 7ee6ed386..7fc1c9bf7 100644 --- a/detect/detect.py +++ b/detect/detect.py @@ -31,6 +31,9 @@ def detect_errors(self, detectors): logging.debug("DONE with Error Detector: %s in %.2f secs", detector.name, toc-tic) errors.append(error_df) + if len(errors) == 0: + return False + # Get unique errors only that might have been detected from multiple detectors. 
self.errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True) if self.errors_df.shape[0]: @@ -42,7 +45,7 @@ def detect_errors(self, detectors): status = "DONE with error detection." toc_total = time.clock() detect_time = toc_total - tic_total - return status, detect_time + return True def store_detected_errors(self, errors_df): if errors_df.empty: From 3fd63ec1204443c2e74ff98d265c61b30cf55d32 Mon Sep 17 00:00:00 2001 From: Trung Huynh Date: Wed, 14 Oct 2020 21:31:06 +0100 Subject: [PATCH 15/16] Fixed detect_errors when no error is found --- detect/detect.py | 2 +- holoclean.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/detect/detect.py b/detect/detect.py index 7fc1c9bf7..3fa303f5f 100644 --- a/detect/detect.py +++ b/detect/detect.py @@ -31,7 +31,7 @@ def detect_errors(self, detectors): logging.debug("DONE with Error Detector: %s in %.2f secs", detector.name, toc-tic) errors.append(error_df) - if len(errors) == 0: + if len(errors) == 0 or all([df is None for df in errors]): return False # Get unique errors only that might have been detected from multiple detectors. diff --git a/holoclean.py b/holoclean.py index ac70d8719..07cd0815f 100644 --- a/holoclean.py +++ b/holoclean.py @@ -320,9 +320,7 @@ def get_dcs(self): return self.dc_parser.get_dcs() def detect_errors(self, detect_list): - status, detect_time = self.detect_engine.detect_errors(detect_list) - logging.info(status) - logging.debug('Time to detect errors: %.2f secs', detect_time) + return self.detect_engine.detect_errors(detect_list) def disable_quantize(self): self.do_quantization = False From d1e19121bc1dfc383fa661c8cd5a480d9ed5543c Mon Sep 17 00:00:00 2001 From: Jason Chen Date: Sat, 14 Nov 2020 19:28:27 +0000 Subject: [PATCH 16/16] Fix the error caused by train_attrs missing --- dataset/dataset.py | 4 ++++ domain/estimators/tuple_embedding.py | 2 +- holoclean.py | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dataset/dataset.py b/dataset/dataset.py index 5eca167f0..ee8f744ce 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -74,6 +74,10 @@ def __init__(self, name, env): self.quantized_data = None self.do_quantization = False + @property + def columns(self): + return [c for c in self.raw_data.df.columns if not c.startswith("_") and not c.endswith("_")] + # TODO(richardwu): load more than just CSV files def load_data( self, diff --git a/domain/estimators/tuple_embedding.py b/domain/estimators/tuple_embedding.py index 61951fd19..313756514 100644 --- a/domain/estimators/tuple_embedding.py +++ b/domain/estimators/tuple_embedding.py @@ -932,12 +932,12 @@ def load_domain_df(self, domain_df, load_into_ds=True): else: self.domain_df['is_clean'] = True self.domain_df.loc[self.domain_df['weak_label'] == NULL_REPR, 'is_clean'] = False + self.domain_df = self.domain_df[self.domain_df['attribute'].isin(self.env['train_attrs'])] self.domain_recs = self.domain_df.to_records() if load_into_ds: self._dataset.load_domain_df(domain_df) - import pdb; pdb.set_trace() def _get_combined_init_vec(self, init_cat_idxs, init_numvals, init_nummasks, attr_idxs): diff --git a/holoclean.py b/holoclean.py index 07cd0815f..261100d8b 100644 --- a/holoclean.py +++ b/holoclean.py @@ -303,6 +303,7 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None, src_col=src_col, exclude_attr_cols=exclude_attr_cols, numerical_attrs=numerical_attrs) + self.env["train_attrs"] = self.env["train_attrs"] or self.ds.columns logging.info(status) logging.debug('Time to 
load dataset: %.2f secs', load_time)
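
Usage sketch (not part of the patch series): assuming the patches above are applied and HoloClean is installed as a package via the new setup.py (patches 04-07), the Python sketch below strings together the post-patch API as it appears in the bundled examples — a single sqlalchemy_uri for the database connection (patch 12), the ErrorsLoaderDetector introduced in patch 01, and the boolean return value that detect_errors gains in patches 14-15. The connection URI, schema/table names, and file paths are placeholders taken from the example scripts, not values required by the patches.

import holoclean
from holoclean.detect import ErrorsLoaderDetector
from holoclean.repair.featurize import OccurAttrFeaturizer, FreqFeaturizer, ConstraintFeaturizer

# 1. Session configured through a single SQLAlchemy URI (patch 12); the other
#    keyword arguments mirror the values used in the bundled examples.
hc = holoclean.HoloClean(
    sqlalchemy_uri="postgresql://holo:holo@localhost:5432/holo",  # placeholder URI
    domain_thresh_1=0.0,
    domain_thresh_2=0.0,
    weak_label_thresh=0.99,
    max_domain=10000,
    epochs=10,
    threads=1,
    verbose=True,
).session

# 2. Load training data and denial constraints (paths are placeholders).
hc.load_data('hospital', 'testdata/hospital.csv')
hc.load_dcs('testdata/hospital_constraints.txt')
hc.ds.set_constraints(hc.get_dcs())

# 3. Detect erroneous cells; patch 01's loader reads pre-computed errors from a
#    relational table, and patches 14-15 make detect_errors return False when
#    no problematic cells are found.
error_loader = ErrorsLoaderDetector(
    db_engine=hc.ds.engine,
    schema_name='hospital',   # placeholder schema
    table_name='dk_cells',    # placeholder table of (_tid_, attribute) errors
)

if hc.detect_errors([error_loader]):
    # 4. Repair and evaluate only when problematic cells were detected.
    hc.setup_domain()
    hc.repair_errors([OccurAttrFeaturizer(), FreqFeaturizer(), ConstraintFeaturizer()])
    hc.evaluate(fpath='testdata/hospital_clean.csv',
                tid_col='tid',
                attr_col='attribute',
                val_col='correct_val')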