Changes from all commits (24 commits)
6db22a6
Adding extra example for loading errors from database (#82)
oattia Feb 24, 2019
57a4187
fix bugs on ConstraintFeaturizer
zaqthss Apr 4, 2019
5a7b10b
Fix bug where NULL cells with one candidate value was skipped.
richardwu Apr 23, 2019
0fca412
Merge pull request #47 from HoloClean/dev
Ihabilyas Apr 27, 2019
3c70290
Merge pull request #85 from HoloClean/fix-null-random-domain
richardwu Apr 27, 2019
d4f5929
Merge pull request #83 from zaqthss/dev
richardwu Apr 27, 2019
53a4b3a
Packaging holoclean as a lib
trunghlt Jul 11, 2020
4b647ef
Merge branch 'master' into latest-aimnet
trunghlt Jul 11, 2020
c2a1cf3
Renamed setupy.py to setup.py
trunghlt Jul 11, 2020
044c474
Merge branch 'latest-aimnet' of https://github.com/trunghlt/holoclean…
trunghlt Jul 11, 2020
3553ca7
Fixed relative imports for holoclean.py
trunghlt Jul 13, 2020
ef58c8d
Fixed setup.py and package name for it to work properly
trunghlt Jul 13, 2020
66815de
Fixed empty constraint for 1 row and an operation
trunghlt Aug 3, 2020
a0d2ac6
Ignore vscode setting
Oct 2, 2020
36aa7b5
Use Blake to reformat
tinyHui Oct 3, 2020
ae2ed8d
Use raw value to keep track of the value before normalize and revert …
tinyHui Oct 3, 2020
b620ea8
Merge branch 'latest-aimnet' of github.com:trunghlt/holoclean into la…
tinyHui Oct 5, 2020
5f9d106
Replaced database auth with sqlachlemy_uri instead of username and pa…
trunghlt Oct 9, 2020
334d497
Fixed trivial conflict
trunghlt Oct 9, 2020
283e40e
Fix accidental copy
trunghlt Oct 10, 2020
5694bc4
Return False when detect_errors doesn't find any problematic cells
trunghlt Oct 14, 2020
3fd63ec
Fixed detect_errors when no error is found
trunghlt Oct 14, 2020
d1e1912
Fix the error caused by train_attrs missing
tinyHui Nov 14, 2020
2dac2dc
Merge pull request #11 from trunghlt/fix/tuple_embedding
trunghlt Nov 16, 2020
2 changes: 2 additions & 0 deletions .gitignore
@@ -14,6 +14,8 @@ _templates
*.pyc
.cache/

.vscode

# pip package metadata
*egg-info/

199 changes: 131 additions & 68 deletions dataset/dataset.py

Large diffs are not rendered by default.

29 changes: 12 additions & 17 deletions dataset/dbengine.py
@@ -17,16 +17,11 @@ class DBengine:
A wrapper class for postgresql engine.
Maintains connections and executes queries.
"""
def __init__(self, user, pwd, db, host='localhost', port=5432, pool_size=20, timeout=60000):
def __init__(self, sqlalchemy_uri, pool_size=20, timeout=60000):
self.timeout = timeout
self._pool = Pool(pool_size) if pool_size > 1 else None
url = 'postgresql+psycopg2://{}:{}@{}:{}/{}?client_encoding=utf8'
url = url.format(user, pwd, host, port, db)
self.conn = url
con = 'dbname={} user={} password={} host={} port={}'
con = con.format(db, user, pwd, host, port)
self.conn_args = con
self.engine = sql.create_engine(url, client_encoding='utf8', pool_size=pool_size)
self.conn = sqlalchemy_uri
self.engine = sql.create_engine(self.conn, client_encoding='utf8', pool_size=pool_size)

def execute_queries(self, queries):
"""
@@ -36,7 +31,7 @@ def execute_queries(self, queries):
"""
logging.debug('Preparing to execute %d queries.', len(queries))
tic = time.clock()
results = self._apply_func(partial(_execute_query, conn_args=self.conn_args), [(idx, q) for idx, q in enumerate(queries)])
results = self._apply_func(partial(_execute_query, conn_args=self.conn), [(idx, q) for idx, q in enumerate(queries)])
toc = time.clock()
logging.debug('Time to execute %d queries: %.2f secs', len(queries), toc-tic)
return results
@@ -50,7 +45,7 @@ def execute_queries_w_backup(self, queries):
logging.debug('Preparing to execute %d queries.', len(queries))
tic = time.clock()
results = self._apply_func(
partial(_execute_query_w_backup, conn_args=self.conn_args, timeout=self.timeout),
partial(_execute_query_w_backup, conn_args=self.conn, timeout=self.timeout),
[(idx, q) for idx, q in enumerate(queries)])
toc = time.clock()
logging.debug('Time to execute %d queries: %.2f secs', len(queries), toc-tic)
@@ -114,7 +109,8 @@ def _execute_query(args, conn_args):
query = args[1]
logging.debug("Starting to execute query %s with id %s", query, query_id)
tic = time.clock()
con = psycopg2.connect(conn_args)
engine = sql.create_engine(conn_args)
con = engine.raw_connection()
cur = con.cursor()
cur.execute(query)
res = cur.fetchall()
@@ -130,7 +126,8 @@ def _execute_query_w_backup(args, conn_args, timeout):
query_backup = args[1][1]
logging.debug("Starting to execute query %s with id %s", query, query_id)
tic = time.clock()
con = psycopg2.connect(conn_args)
engine = sql.create_engine(conn_args)
con = engine.raw_connection()
cur = con.cursor()
cur.execute("SET statement_timeout to %d;"%timeout)
try:
@@ -145,12 +142,10 @@ def _execute_query_w_backup(args, conn_args, timeout):
return []

logging.debug("Starting to execute backup query %s with id %s", query_backup, query_id)
con.close()
con = psycopg2.connect(conn_args)
cur = con.cursor()
cur.execute(query_backup)
res = cur.fetchall()
con.close()
toc = time.clock()
logging.debug('Time to execute query with id %d: %.2f secs', query_id, toc - tic)
toc = time.clock()
logging.debug('Time to execute query with id %d: %.2f secs', query_id, toc - tic)
con.close()
return res
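
The constructor now takes a single SQLAlchemy URI in place of separate user/password/host/port/db arguments, and the worker functions build their connections from that same URI. A minimal sketch of how a caller might construct the engine under the new signature (the URI value and the import path are illustrative assumptions, not part of this diff):

from dataset.dbengine import DBengine

# Hypothetical URI: credentials, host, port and database are carried in one
# SQLAlchemy string instead of separate constructor arguments.
sqlalchemy_uri = "postgresql+psycopg2://holo:holo@localhost:5432/holo?client_encoding=utf8"

engine = DBengine(sqlalchemy_uri, pool_size=20, timeout=60000)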
2 changes: 1 addition & 1 deletion dataset/quantization.py
@@ -2,7 +2,7 @@

import numpy as np
from sklearn.cluster import KMeans
from utils import NULL_REPR
from ..utils import NULL_REPR


def quantize_km(env, df_raw, num_attr_groups_bins):
103 changes: 87 additions & 16 deletions dataset/table.py
@@ -1,22 +1,34 @@
from enum import Enum
import logging

from tqdm import tqdm
import pandas as pd


class Source(Enum):
FILE = 1
DF = 2
DB = 3
SQL = 4
DF = 2
DB = 3
SQL = 4


class Table:
"""
A wrapper class for Dataset Tables.
"""
def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],
fpath=None, df=None, schema_name=None, table_query=None, db_engine=None):

def __init__(
self,
name,
src,
na_values=None,
exclude_attr_cols=["_tid_"],
fpath=None,
df=None,
schema_name=None,
table_query=None,
db_engine=None,
):
"""
:param name: (str) name to assign to dataset.
:param na_values: (str or list[str]) values to interpret as NULL.
@@ -41,17 +53,28 @@ def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],
# Copy the list to memoize
self.exclude_attr_cols = list(exclude_attr_cols)
self.df = pd.DataFrame()
self.df_raw = pd.DataFrame()  # raw data before normalization (strings not lower-cased or stripped)

if src == Source.FILE:
if fpath is None:
raise Exception("ERROR while loading table. File path for CSV file name expected. Please provide <fpath> param.")
raise Exception(
"ERROR while loading table. File path for CSV file name expected. Please provide <fpath> param."
)
# TODO(richardwu): use COPY FROM instead of loading this into memory
self.df = pd.read_csv(fpath, dtype=str, na_values=na_values, encoding='utf-8')
self.df = pd.read_csv(
fpath, dtype=str, na_values=na_values, encoding="utf-8"
)
self.df_raw = pd.read_csv(
fpath, dtype=str, na_values=na_values, encoding="utf-8"
)

# Normalize the dataframe: drop null columns, convert to lowercase strings, and strip whitespaces.
for attr in self.df.columns.values:
if self.df[attr].isnull().all():
logging.warning("Dropping the following null column from the dataset: '%s'", attr)
logging.warning(
"Dropping the following null column from the dataset: '%s'",
attr,
)
self.df.drop(labels=[attr], axis=1, inplace=True)
continue
if attr in exclude_attr_cols:
@@ -60,36 +83,84 @@ def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],
self.df[attr] = self.df[attr].str.strip().str.lower()
elif src == Source.DF:
if df is None:
raise Exception("ERROR while loading table. Dataframe expected. Please provide <df> param.")
raise Exception(
"ERROR while loading table. Dataframe expected. Please provide <df> param."
)
self.df = df
elif src == Source.DB:
if db_engine is None:
raise Exception("ERROR while loading table. DB connection expected. Please provide <db_engine>.")
raise Exception(
"ERROR while loading table. DB connection expected. Please provide <db_engine>."
)
self.df = pd.read_sql_table(name, db_engine.conn, schema=schema_name)
elif src == Source.SQL:
if table_query is None or db_engine is None:
raise Exception("ERROR while loading table. SQL Query and DB connection expected. Please provide <table_query> and <db_engine>.")
raise Exception(
"ERROR while loading table. SQL Query and DB connection expected. Please provide <table_query> and <db_engine>."
)
db_engine.create_db_table_from_query(self.name, table_query)
self.df = pd.read_sql_table(name, db_engine.conn)

def store_to_db(self, db_conn, if_exists='replace', index=False, index_label=None):
def _revert_normalized_value(self, df_raw: pd.DataFrame) -> pd.DataFrame:
if df_raw.empty:
return self.df

self._df_reverted = pd.DataFrame()

logging.info("Reverting normalized values")
r_size, _ = self.df.shape

for attr in tqdm(self.df.columns.values):
if attr in self.exclude_attr_cols:
self._df_reverted[attr] = self.df[attr]
else:
for indx in range(r_size):
raw_value = df_raw[attr].iloc[indx]
repair_value = self.df[attr].iloc[indx]
if (
str(raw_value).strip().lower()
== str(repair_value).strip().lower()
):
self._df_reverted.at[indx, attr] = raw_value
else:
self._df_reverted.at[indx, attr] = repair_value

return self._df_reverted

def store_to_db(
self,
db_conn,
df_raw: pd.DataFrame = pd.DataFrame(),
if_exists="replace",
index=False,
index_label=None,
):
# TODO: This version supports single session, single worker.
self.df.to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label)
self._revert_normalized_value(df_raw).to_sql(
self.name,
db_conn,
if_exists=if_exists,
index=index,
index_label=index_label,
)

def get_attributes(self):
"""
get_attributes returns the columns that are trainable/learnable attributes
(i.e. exclude meta-columns like _tid_).
"""
if self.df.empty:
raise Exception("Empty Dataframe associated with table {name}. Cannot return attributes.".format(
name=self.name))
raise Exception(
"Empty Dataframe associated with table {name}. Cannot return attributes.".format(
name=self.name
)
)
return list(col for col in self.df.columns if col not in self.exclude_attr_cols)

def create_df_index(self, attr_list):
self.df.set_index(attr_list, inplace=True)

def create_db_index(self, db_engine, attr_list):
index_name = '{name}_{idx}'.format(name=self.name, idx=self.index_count)
index_name = "{name}_{idx}".format(name=self.name, idx=self.index_count)
db_engine.create_db_index(index_name, self.name, attr_list)
self.index_count += 1
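
The new _revert_normalized_value step compares each repaired value against the corresponding raw value case-insensitively and restores the original, un-normalized value whenever the repair left it unchanged. A self-contained sketch of that rule in plain pandas (illustrative data; this is not the class method itself):

import pandas as pd

# Raw values as read from the CSV, before lower-casing and stripping.
df_raw = pd.DataFrame({"city": ["New York", "Bostn", "Chicago"]})
# Normalized values after repair; "bostn" was corrected to "boston".
df = pd.DataFrame({"city": ["new york", "boston", "chicago"]})

reverted = pd.DataFrame()
for attr in df.columns:
    for idx in range(len(df)):
        raw = df_raw[attr].iloc[idx]
        repaired = df[attr].iloc[idx]
        # Keep the original casing when the repair did not change the value.
        if str(raw).strip().lower() == str(repaired).strip().lower():
            reverted.at[idx, attr] = raw
        else:
            reverted.at[idx, attr] = repaired

print(reverted)  # city: "New York", "boston", "Chicago"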
15 changes: 14 additions & 1 deletion dcparser/constraint.py
@@ -10,6 +10,19 @@ def is_symmetric(operation):
return False


def get_flip_operation(operation):
if operation == '<=':
return '>='
elif operation == '>=':
return '<='
elif operation == '<':
return '>'
elif operation == '>':
return '<'
else:
return operation


def contains_operation(string):
"""
Method to check if a given string contains one of the operation signs.
@@ -46,7 +59,7 @@ def __init__(self, dc_string, schema):
# Find all tuple names used in DC
logging.debug('DONE pre-processing constraint: %s', dc_string)
for component in split:
if contains_operation(component):
if contains_operation(component) is not None:
break
else:
self.tuple_names.append(component)
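
The new get_flip_operation helper mirrors an inequality operator so a predicate can be evaluated with its operands swapped. A quick illustration (assuming the top-level dcparser package path used in this repo):

from dcparser.constraint import get_flip_operation

assert get_flip_operation('<=') == '>='
assert get_flip_operation('>') == '<'
# Operations with no ordering to flip (e.g. '=' or '<>') are returned unchanged.
assert get_flip_operation('=') == '='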
7 changes: 5 additions & 2 deletions detect/detect.py
@@ -3,7 +3,7 @@

import pandas as pd

from dataset import AuxTables
from ..dataset import AuxTables


class DetectEngine:
@@ -31,6 +31,9 @@ def detect_errors(self, detectors):
logging.debug("DONE with Error Detector: %s in %.2f secs", detector.name, toc-tic)
errors.append(error_df)

if len(errors) == 0 or all([df is None for df in errors]):
return False

# Get unique errors only that might have been detected from multiple detectors.
self.errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True)
if self.errors_df.shape[0]:
@@ -42,7 +45,7 @@ def detect_errors(self, detectors):
status = "DONE with error detection."
toc_total = time.clock()
detect_time = toc_total - tic_total
return status, detect_time
return True

def store_detected_errors(self, errors_df):
if errors_df.empty:
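
detect_errors now returns a plain boolean (False when no detector produced any problematic cells) instead of the previous (status, detect_time) pair. A minimal sketch of how calling code might use the new return value (the driver function and its parameters are hypothetical):

import logging

def run_detection(detect_engine, detectors):
    # detect_engine is assumed to be a DetectEngine instance and detectors a
    # list of Detector objects, as in the hunk above.
    found_errors = detect_engine.detect_errors(detectors)
    if not found_errors:
        logging.info("No erroneous cells detected; skipping domain generation and repair.")
    return found_errors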
14 changes: 7 additions & 7 deletions detect/errorloaderdetector.py
@@ -1,6 +1,6 @@
import pandas as pd

from dataset.table import Table, Source
from ..dataset.table import Table, Source
from .detector import Detector


@@ -10,12 +10,12 @@ class ErrorsLoaderDetector(Detector):
id_col: entity ID
attr_col: attribute in violation
in the format id_col, attr_col
Can load these errors from a csv file, a relational table, or a pandas
Can load these errors from a csv file, a relational table, or a pandas
dataframe with the same format.
"""
def __init__(self, fpath=None, df=None,
db_engine=None, table_name=None, schema_name=None,
id_col="_tid_", attr_col="attribute",
id_col="_tid_", attr_col="attribute",
name="ErrorLoaderDetector"):
"""
:param fpath: (str) Path to source csv file to load errors
@@ -28,7 +28,7 @@ def __init__(self, fpath=None, df=None,
:param name: (str) name of the detector

To load from csv file, :param fpath: must be specified.
To load from a relational table, :param db_engine:, and
To load from a relational table, :param db_engine:, and
:param table_name: must be specified, optionally specifying :param schema_name:.
"""
super(ErrorsLoaderDetector, self).__init__(name)
@@ -46,15 +46,15 @@ def __init__(self, fpath=None, df=None,
else:
raise Exception("ERROR while intializing ErrorsLoaderDetector. Please provide (<fpath>), (<db_engine> and <table_name>), OR <df>")

self.errors_table = Table(dataset_name, src,
self.errors_table = Table(dataset_name, src,
exclude_attr_cols=[attr_col],
fpath=fpath, df=df,
schema_name=schema_name, db_engine=db_engine)

expected_schema = [id_col, attr_col]
if list(self.errors_table.df.columns) != expected_schema:
raise Exception("ERROR while intializing ErrorsLoaderDetector: The loaded errors table does not match the expected schema of {}".format(expected_schema))

self.errors_table.df = self.errors_table.df.astype({
id_col: int,
attr_col: str
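
For reference, a minimal sketch of pointing ErrorsLoaderDetector at a CSV of pre-identified errors (the file name is hypothetical; the file must contain exactly the _tid_ and attribute columns checked above):

from detect.errorloaderdetector import ErrorsLoaderDetector

# errors.csv is assumed to hold two columns: _tid_ (int) and attribute (str).
error_loader = ErrorsLoaderDetector(fpath="errors.csv")

# Alternatively, load the same schema from a relational table via a DBengine:
# error_loader = ErrorsLoaderDetector(db_engine=engine, table_name="known_errors")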
2 changes: 1 addition & 1 deletion detect/nulldetector.py
@@ -1,7 +1,7 @@
import pandas as pd

from .detector import Detector
from utils import NULL_REPR
from ..utils import NULL_REPR


class NullDetector(Detector):
2 changes: 1 addition & 1 deletion domain/correlations.py
@@ -1,6 +1,6 @@
from pyitlib import discrete_random_variable as drv

from utils import NULL_REPR
from ..utils import NULL_REPR

def compute_norm_cond_entropy_corr(data_df, attrs_from, attrs_to):
"""
4 changes: 2 additions & 2 deletions domain/domain.py
@@ -8,10 +8,10 @@
import numpy as np
from tqdm import tqdm

from dataset import AuxTables, CellStatus
from ..dataset import AuxTables, CellStatus
from .estimators import *
from .correlations import compute_norm_cond_entropy_corr
from utils import NULL_REPR
from ..utils import NULL_REPR


class DomainEngine: