13 changes: 8 additions & 5 deletions hlink/linking/hh_matching/link_step_block_on_households.py
@@ -8,6 +8,7 @@
from pyspark.sql.functions import col

from hlink.linking.link_step import LinkStep
from hlink.linking.util import set_job_description


logger = logging.getLogger(__name__)
@@ -112,11 +113,13 @@ def _run(self):

logger.debug("Blocking on household serial ID and generating potential matches")
# Generate potential matches with those unmatched people who were in a household with a match, blocking only on household id
self.task.run_register_python(
"hh_blocked_matches",
lambda: stm.join(uma, hhid_a).join(umb, hhid_b).distinct(),
persist=True,
)
spark_context = self.task.spark.sparkContext
with set_job_description("create table hh_blocked_matches", spark_context):
self.task.run_register_python(
"hh_blocked_matches",
lambda: stm.join(uma, hhid_a).join(umb, hhid_b).distinct(),
persist=True,
)

hh_blocked_matches = self.task.spark.table("hh_blocked_matches")
logger.debug(f"hh_blocked_matches has {hh_blocked_matches.count()} records")
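
The set_job_description helper imported from hlink.linking.util is used the same way throughout this PR: each expensive table-creation call is wrapped in a with block so that the Spark jobs it launches show up with a readable label in the Spark UI. The helper's implementation is not part of this diff; the sketch below is only a plausible reconstruction, assuming it wraps SparkContext.setJobDescription and restores the previous description on exit (the argument order, description first and SparkContext second, matches the call sites above).

# Hypothetical sketch of hlink.linking.util.set_job_description -- the real
# implementation is not shown in this diff, so treat the details as assumptions.
from contextlib import contextmanager

from pyspark import SparkContext


@contextmanager
def set_job_description(description: str, spark_context: SparkContext):
    """Label all Spark jobs launched inside the block in the Spark UI."""
    # Remember whatever description was active before entering the block.
    previous = spark_context.getLocalProperty("spark.job.description")
    spark_context.setJobDescription(description)
    try:
        yield
    finally:
        # Restore the previous description (None clears it) so jobs started
        # after the block are not mislabeled.
        spark_context.setJobDescription(previous)
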
26 changes: 17 additions & 9 deletions hlink/linking/hh_matching/link_step_filter.py
@@ -7,6 +7,7 @@
import hlink.linking.core.comparison as comparison_core

from hlink.linking.link_step import LinkStep
from hlink.linking.util import set_job_description


class LinkStepFilter(LinkStep):
@@ -21,6 +22,7 @@ def __init__(self, task):
def _run(self):
# self.task.spark.sql("set spark.sql.shuffle.partitions=4000")
config = self.task.link_run.config
spark_context = self.task.spark.sparkContext

# establish empty table context dict to pass to SQL template
t_ctx = {}
@@ -44,19 +46,25 @@
if f["alias"] in comp_feature_names
]

self.task.run_register_sql(
"hh_potential_matches", t_ctx=t_ctx, persist=True
)
with set_job_description(
"create table hh_potential_matches", spark_context
):
self.task.run_register_sql(
"hh_potential_matches", t_ctx=t_ctx, persist=True
)

else:
self.task.run_register_python(
"hh_potential_matches",
lambda: self.task.spark.table("hh_blocked_matches"),
persist=True,
)
with set_job_description(
"create table hh_potential_matches", spark_context
):
self.task.run_register_python(
"hh_potential_matches",
lambda: self.task.spark.table("hh_blocked_matches"),
persist=True,
)

self.task.spark.sql("set spark.sql.shuffle.partitions=200")

print(
"Potential matches from households which meet hh_comparsions thresholds have been saved to table 'hh_potential_matches'."
"Potential matches from households which meet hh_comparisons thresholds have been saved to table 'hh_potential_matches'."
)
55 changes: 33 additions & 22 deletions hlink/linking/matching/link_step_explode.py
@@ -3,13 +3,17 @@
# in this project's top-level directory, and also on-line at:
# https://github.com/ipums/hlink

import logging
from typing import Any

from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import array, explode, col

import hlink.linking.core.comparison as comparison_core
from hlink.linking.link_step import LinkStep
from hlink.linking.util import set_job_description

logger = logging.getLogger(__name__)


class LinkStepExplode(LinkStep):
@@ -23,6 +27,7 @@ def __init__(self, task):

def _run(self):
config = self.task.link_run.config
spark_context = self.task.spark.sparkContext
# filter the universe of potential matches before exploding
t_ctx = {}
universe_conf = config.get("potential_matches_universe", [])
@@ -42,28 +47,33 @@
# self.spark.sql("set spark.sql.shuffle.partitions=4000")
blocking = config["blocking"]

self.task.run_register_python(
name="exploded_df_a",
func=lambda: self._explode(
df=self.task.spark.table("match_universe_df_a"),
comparisons=config["comparisons"],
comparison_features=config["comparison_features"],
blocking=blocking,
id_column=config["id_column"],
is_a=True,
),
)
self.task.run_register_python(
name="exploded_df_b",
func=lambda: self._explode(
df=self.task.spark.table("match_universe_df_b"),
comparisons=config["comparisons"],
comparison_features=config["comparison_features"],
blocking=blocking,
id_column=config["id_column"],
is_a=False,
),
)
logger.debug("Creating table exploded_df_a")
with set_job_description("create table exploded_df_a", spark_context):
self.task.run_register_python(
name="exploded_df_a",
func=lambda: self._explode(
df=self.task.spark.table("match_universe_df_a"),
comparisons=config["comparisons"],
comparison_features=config["comparison_features"],
blocking=blocking,
id_column=config["id_column"],
is_a=True,
),
)

logger.debug("Creating table exploded_df_b")
with set_job_description("create table exploded_df_b", spark_context):
self.task.run_register_python(
name="exploded_df_b",
func=lambda: self._explode(
df=self.task.spark.table("match_universe_df_b"),
comparisons=config["comparisons"],
comparison_features=config["comparison_features"],
blocking=blocking,
id_column=config["id_column"],
is_a=False,
),
)

def _explode(
self,
@@ -118,6 +128,7 @@ def _explode(

all_exploding_columns = [bc for bc in blocking if bc.get("explode", False)]

logger.debug(f"Exploding {len(all_exploding_columns)} column(s)")
for exploding_column in all_exploding_columns:
exploding_column_name = exploding_column["column_name"]
if exploding_column.get("expand_length", False):
10 changes: 8 additions & 2 deletions hlink/linking/matching/link_step_match.py
@@ -10,7 +10,7 @@
import hlink.linking.core.comparison_feature as comparison_feature_core
import hlink.linking.core.dist_table as dist_table_core
import hlink.linking.core.comparison as comparison_core
from hlink.linking.util import spark_shuffle_partitions_heuristic
from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic

from hlink.linking.link_step import LinkStep

@@ -149,7 +149,13 @@ def _run(self):

if config.get("streamline_potential_match_generation", False):
t_ctx["dataset_columns"] = [config["id_column"]]

spark_context = self.task.spark.sparkContext
logger.debug("Creating table potential_matches via potential_matches.sql")
try:
self.task.run_register_sql("potential_matches", t_ctx=t_ctx, persist=True)
with set_job_description("create table potential_matches", spark_context):
self.task.run_register_sql(
"potential_matches", t_ctx=t_ctx, persist=True
)
finally:
self.task.spark.sql("set spark.sql.shuffle.partitions=200")
43 changes: 33 additions & 10 deletions hlink/linking/matching/link_step_score.py
@@ -10,7 +10,7 @@
import hlink.linking.core.comparison_feature as comparison_feature_core
import hlink.linking.core.threshold as threshold_core
import hlink.linking.core.dist_table as dist_table_core
from hlink.linking.util import spark_shuffle_partitions_heuristic
from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic

from hlink.linking.link_step import LinkStep

@@ -35,6 +35,7 @@ def _run(self):
training_conf = str(self.task.training_conf)
table_prefix = self.task.table_prefix
config = self.task.link_run.config
spark_context = self.task.spark.sparkContext

if training_conf not in config or "chosen_model" not in config[training_conf]:
print(
@@ -52,7 +53,8 @@
id_a = config["id_column"] + "_a"
id_b = config["id_column"] + "_b"
chosen_model_params = config[training_conf]["chosen_model"].copy()
self._create_features(config)
with set_job_description("create comparison features", spark_context):
self._create_features(config)
pm = self.task.spark.table(f"{table_prefix}potential_matches_prepped")

ind_var_columns = config[training_conf]["independent_vars"]
@@ -80,11 +82,13 @@ def _run(self):
"Missing a temporary table from the training task. This table will not be persisted between sessions of hlink for technical reasons. Please run training before running this step."
)

self.task.run_register_python(
f"{table_prefix}potential_matches_pipeline",
lambda: pre_pipeline.transform(pm.select(*required_columns)),
persist=True,
)
logger.debug(f"Creating table {table_prefix}potential_matches_pipeline")
with set_job_description("prepare the data for the model", spark_context):
self.task.run_register_python(
f"{table_prefix}potential_matches_pipeline",
lambda: pre_pipeline.transform(pm.select(*required_columns)),
persist=True,
)
plm = self.task.link_run.trained_models[f"{table_prefix}trained_model"]
pp_required_cols = set(plm.stages[0].getInputCols() + [id_a, id_b])
pre_pipeline = self.task.spark.table(
@@ -97,17 +101,34 @@
config[training_conf], chosen_model_params, default=1.3
)
decision = config[training_conf].get("decision")
logger.debug("Predicting with thresholds")
predictions = threshold_core.predict_using_thresholds(
score_tmp,
alpha_threshold,
threshold_ratio,
config["id_column"],
decision,
)
predictions.write.mode("overwrite").saveAsTable(f"{table_prefix}predictions")

with set_job_description(
f"create table {table_prefix}predictions", spark_context
):
predictions.write.mode("overwrite").saveAsTable(
f"{table_prefix}predictions"
)
pmp = self.task.spark.table(f"{table_prefix}potential_matches_pipeline")
self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b)
self._save_predicted_matches(config, id_a, id_b)
logger.debug(f"Creating table {table_prefix}scored_potential_matches")
with set_job_description(
f"create table {table_prefix}scored_potential_matches", spark_context
):
self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b)
logger.debug(
f"Creating table {table_prefix}predicted_matches and removing records with duplicated id_b"
)
with set_job_description(
f"create table {table_prefix}predicted_matches", spark_context
):
self._save_predicted_matches(config, id_a, id_b)
self.task.spark.sql("set spark.sql.shuffle.partitions=200")

def _save_table_with_requested_columns(self, pm, pmp, predictions, id_a, id_b):
@@ -174,6 +195,7 @@ def _create_features(self, conf):
potential_matches = f"{table_prefix}potential_matches"
table_name = f"{table_prefix}potential_matches_prepped"
pm_columns = self.task.spark.table(potential_matches).columns
logger.debug("Getting comparison features")
(
comp_features,
advanced_comp_features,
@@ -200,6 +222,7 @@
dist_tables
)

logger.debug("Creating all of the comparison features")
comparison_feature_core.create_feature_tables(
self.task,
t_ctx_def,
59 changes: 34 additions & 25 deletions hlink/linking/preprocessing/link_step_prep_dataframes.py
@@ -9,7 +9,7 @@
import hlink.linking.core.column_mapping as column_mapping_core
import hlink.linking.core.substitutions as substitutions_core
import hlink.linking.core.transforms as transforms_core
from hlink.linking.util import spark_shuffle_partitions_heuristic
from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic

from hlink.linking.link_step import LinkStep

@@ -27,6 +27,7 @@ def __init__(self, task):

def _run(self):
config = self.task.link_run.config
spark_context = self.task.spark.sparkContext

dataset_size_a = self.task.spark.table("raw_df_a").count()
dataset_size_b = self.task.spark.table("raw_df_b").count()
@@ -40,30 +41,35 @@
substitution_columns = config.get("substitution_columns", [])
feature_selections = config.get("feature_selections", [])

self.task.run_register_python(
name="prepped_df_a",
func=lambda: self._prep_dataframe(
self.task.spark.table("raw_df_a"),
config["column_mappings"],
substitution_columns,
feature_selections,
True,
config["id_column"],
),
persist=True,
)
self.task.run_register_python(
name="prepped_df_b",
func=lambda: self._prep_dataframe(
self.task.spark.table("raw_df_b"),
config["column_mappings"],
substitution_columns,
feature_selections,
False,
config["id_column"],
),
persist=True,
)
logger.debug("Creating table prepped_df_a")
with set_job_description("create table prepped_df_a", spark_context):
self.task.run_register_python(
name="prepped_df_a",
func=lambda: self._prep_dataframe(
self.task.spark.table("raw_df_a"),
config["column_mappings"],
substitution_columns,
feature_selections,
True,
config["id_column"],
),
persist=True,
)

logger.debug("Creating table prepped_df_b")
with set_job_description("create table prepped_df_b", spark_context):
self.task.run_register_python(
name="prepped_df_b",
func=lambda: self._prep_dataframe(
self.task.spark.table("raw_df_b"),
config["column_mappings"],
substitution_columns,
feature_selections,
False,
config["id_column"],
),
persist=True,
)

self.task.spark.sql("set spark.sql.shuffle.partitions=200")

@@ -97,6 +103,7 @@ def _prep_dataframe(
column_selects = [col(id_column)]
custom_transforms = self.task.link_run.custom_column_mapping_transforms

logger.debug("Selecting column mappings")
for column_mapping in column_definitions:
df_selected, column_selects = column_mapping_core.select_column_mapping(
column_mapping,
@@ -108,10 +115,12 @@

df_selected = df_selected.select(column_selects)

logger.debug("Generating substitutions")
df_selected = substitutions_core.generate_substitutions(
spark, df_selected, substitution_columns
)

logger.debug("Generating transforms")
df_selected = transforms_core.generate_transforms(
spark, df_selected, feature_selections, self.task, is_a, id_column
)
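
Not part of this diff, but as a quick end-to-end check of the labeling: assuming set_job_description sets Spark's standard "spark.job.description" local property (which is what SparkContext.setJobDescription does under the hood), a small local session can confirm that jobs run inside the block pick up the label. The local session setup below is illustrative only.

# Hypothetical smoke test -- assumes set_job_description sets the standard
# "spark.job.description" local property on the driver.
from pyspark.sql import SparkSession

from hlink.linking.util import set_job_description

spark = SparkSession.builder.master("local[1]").getOrCreate()
sc = spark.sparkContext

with set_job_description("create table prepped_df_a", sc):
    assert sc.getLocalProperty("spark.job.description") == "create table prepped_df_a"
    spark.range(10).count()  # this job appears under that label in the Spark UI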