From c9217628f801cdd94e3545e87026096ee110df23 Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 20 Nov 2025 20:47:31 +0000 Subject: [PATCH 1/5] [#227] Create a set_job_description() context manager Since SparkContext.setJobDescription() sets a thread-local variable, we'll need to make sure to reset the description once each task finishes. This context manager should help with that. --- hlink/linking/util.py | 11 +++++++++++ hlink/tests/linking_util_test.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/hlink/linking/util.py b/hlink/linking/util.py index 69f0ba7..4aef2c2 100644 --- a/hlink/linking/util.py +++ b/hlink/linking/util.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager from math import ceil @@ -16,3 +17,13 @@ def spark_shuffle_partitions_heuristic(dataset_size: int) -> int: clamped_below = max(MIN_PARTITIONS, partitions_approx) clamped = min(MAX_PARTITIONS, clamped_below) return clamped + + +@contextmanager +def set_job_description(desc: str | None, spark_context): + previous_desc = spark_context.getLocalProperty("spark.job.description") + spark_context.setJobDescription(desc) + try: + yield + finally: + spark_context.setJobDescription(previous_desc) diff --git a/hlink/tests/linking_util_test.py b/hlink/tests/linking_util_test.py index fa2ff25..873df36 100644 --- a/hlink/tests/linking_util_test.py +++ b/hlink/tests/linking_util_test.py @@ -1,6 +1,6 @@ import pytest -from hlink.linking.util import spark_shuffle_partitions_heuristic +from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic @pytest.mark.parametrize( @@ -10,3 +10,30 @@ def test_spark_shuffle_partitions_heuristic(dataset_size, expected_output): output = spark_shuffle_partitions_heuristic(dataset_size) assert output == expected_output + + +def test_set_job_description(spark): + with set_job_description("my description", spark.sparkContext): + desc = spark.sparkContext.getLocalProperty("spark.job.description") + assert desc == "my description" + + +def test_set_job_description_resets_on_error(spark): + spark.sparkContext.setJobDescription(None) + + try: + with set_job_description("my description", spark.sparkContext): + raise Exception() + except Exception: + ... 
+ + assert spark.sparkContext.getLocalProperty("spark.job.description") is None + + +def test_set_job_description_nested(spark): + with set_job_description("outer description", spark.sparkContext): + with set_job_description("inner description", spark.sparkContext): + desc = spark.sparkContext.getLocalProperty("spark.job.description") + assert desc == "inner description" + desc = spark.sparkContext.getLocalProperty("spark.job.description") + assert desc == "outer description" From e3311e1029b9779324f31135e660cd06f017f13f Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 20 Nov 2025 21:53:22 +0000 Subject: [PATCH 2/5] [#227] Remove some verbose logger.debug statements --- hlink/linking/training/link_step_save_model_metadata.py | 2 -- hlink/linking/transformers/rename_vector_attributes.py | 5 ----- 2 files changed, 7 deletions(-) diff --git a/hlink/linking/training/link_step_save_model_metadata.py b/hlink/linking/training/link_step_save_model_metadata.py index 1d680e3..23d1565 100644 --- a/hlink/linking/training/link_step_save_model_metadata.py +++ b/hlink/linking/training/link_step_save_model_metadata.py @@ -107,7 +107,6 @@ def _run(self): model_type = config[training_conf]["chosen_model"]["type"] - logger.debug(f"Expanded features with categories are {expanded_features}") logger.debug(f"The model type is '{model_type}'") print("Retrieving model feature importances or coefficients...") @@ -169,7 +168,6 @@ def _run(self): ), ] - logger.debug("Creating the DataFrame and saving it as a table") feature_names, categories = zip(*expanded_features) importance_schema, importance_data = zip(*importance_columns) features_df = self.task.spark.createDataFrame( diff --git a/hlink/linking/transformers/rename_vector_attributes.py b/hlink/linking/transformers/rename_vector_attributes.py index e407c4d..d7f1fbd 100644 --- a/hlink/linking/transformers/rename_vector_attributes.py +++ b/hlink/linking/transformers/rename_vector_attributes.py @@ -69,11 +69,6 @@ def _transform(self, dataset: DataFrame) -> DataFrame: replacement_str = self.getOrDefault("replaceWith") metadata = dataset.schema[input_col].metadata - logger.debug( - f"Renaming the attributes of vector column '{input_col}': " - f"replacing {to_replace} with '{replacement_str}'" - ) - if "attrs" in metadata["ml_attr"]: attributes_by_type = metadata["ml_attr"]["attrs"] From d54e927a5314ae39c5bf5170a0a54652cb510f3e Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 20 Nov 2025 22:23:49 +0000 Subject: [PATCH 3/5] [#227] Add more logging.debug statements in long-running tasks --- hlink/linking/matching/link_step_explode.py | 7 +++++++ hlink/linking/matching/link_step_match.py | 1 + hlink/linking/matching/link_step_score.py | 8 ++++++++ hlink/linking/preprocessing/link_step_prep_dataframes.py | 6 ++++++ hlink/linking/preprocessing/link_step_register_raw_dfs.py | 3 +++ hlink/scripts/main.py | 1 + 6 files changed, 26 insertions(+) diff --git a/hlink/linking/matching/link_step_explode.py b/hlink/linking/matching/link_step_explode.py index 4105a0b..f8f3a75 100644 --- a/hlink/linking/matching/link_step_explode.py +++ b/hlink/linking/matching/link_step_explode.py @@ -3,6 +3,7 @@ # in this project's top-level directory, and also on-line at: # https://github.com/ipums/hlink +import logging from typing import Any from pyspark.sql import Column, DataFrame @@ -11,6 +12,8 @@ import hlink.linking.core.comparison as comparison_core from hlink.linking.link_step import LinkStep +logger = logging.getLogger(__name__) + class LinkStepExplode(LinkStep): def __init__(self, task): 
@@ -42,6 +45,7 @@ def _run(self): # self.spark.sql("set spark.sql.shuffle.partitions=4000") blocking = config["blocking"] + logger.debug("Creating table exploded_df_a") self.task.run_register_python( name="exploded_df_a", func=lambda: self._explode( @@ -53,6 +57,8 @@ def _run(self): is_a=True, ), ) + + logger.debug("Creating table exploded_df_b") self.task.run_register_python( name="exploded_df_b", func=lambda: self._explode( @@ -118,6 +124,7 @@ def _explode( all_exploding_columns = [bc for bc in blocking if bc.get("explode", False)] + logger.debug(f"Exploding {len(all_exploding_columns)} column(s)") for exploding_column in all_exploding_columns: exploding_column_name = exploding_column["column_name"] if exploding_column.get("expand_length", False): diff --git a/hlink/linking/matching/link_step_match.py b/hlink/linking/matching/link_step_match.py index b62a1a8..35b496a 100644 --- a/hlink/linking/matching/link_step_match.py +++ b/hlink/linking/matching/link_step_match.py @@ -150,6 +150,7 @@ def _run(self): if config.get("streamline_potential_match_generation", False): t_ctx["dataset_columns"] = [config["id_column"]] try: + logger.debug("Creating table potential_matches via potential_matches.sql") self.task.run_register_sql("potential_matches", t_ctx=t_ctx, persist=True) finally: self.task.spark.sql("set spark.sql.shuffle.partitions=200") diff --git a/hlink/linking/matching/link_step_score.py b/hlink/linking/matching/link_step_score.py index 12b5da3..b2e12aa 100644 --- a/hlink/linking/matching/link_step_score.py +++ b/hlink/linking/matching/link_step_score.py @@ -80,6 +80,7 @@ def _run(self): "Missing a temporary table from the training task. This table will not be persisted between sessions of hlink for technical reasons. Please run training before running this step." 
) + logger.debug(f"Creating table {table_prefix}potential_matches_pipeline") self.task.run_register_python( f"{table_prefix}potential_matches_pipeline", lambda: pre_pipeline.transform(pm.select(*required_columns)), @@ -97,6 +98,7 @@ def _run(self): config[training_conf], chosen_model_params, default=1.3 ) decision = config[training_conf].get("decision") + logger.debug("Predicting with thresholds") predictions = threshold_core.predict_using_thresholds( score_tmp, alpha_threshold, @@ -106,7 +108,11 @@ def _run(self): ) predictions.write.mode("overwrite").saveAsTable(f"{table_prefix}predictions") pmp = self.task.spark.table(f"{table_prefix}potential_matches_pipeline") + logger.debug(f"Creating table {table_prefix}scored_potential_matches") self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b) + logger.debug( + f"Creating table {table_prefix}predicted_matches and removing records with duplicated id_b" + ) self._save_predicted_matches(config, id_a, id_b) self.task.spark.sql("set spark.sql.shuffle.partitions=200") @@ -174,6 +180,7 @@ def _create_features(self, conf): potential_matches = f"{table_prefix}potential_matches" table_name = f"{table_prefix}potential_matches_prepped" pm_columns = self.task.spark.table(potential_matches).columns + logger.debug("Getting comparison features") ( comp_features, advanced_comp_features, @@ -200,6 +207,7 @@ def _create_features(self, conf): dist_tables ) + logger.debug("Creating all of the comparison features") comparison_feature_core.create_feature_tables( self.task, t_ctx_def, diff --git a/hlink/linking/preprocessing/link_step_prep_dataframes.py b/hlink/linking/preprocessing/link_step_prep_dataframes.py index a8f9f20..da2e02e 100644 --- a/hlink/linking/preprocessing/link_step_prep_dataframes.py +++ b/hlink/linking/preprocessing/link_step_prep_dataframes.py @@ -40,6 +40,7 @@ def _run(self): substitution_columns = config.get("substitution_columns", []) feature_selections = config.get("feature_selections", []) + logger.debug("Creating table prepped_df_a") self.task.run_register_python( name="prepped_df_a", func=lambda: self._prep_dataframe( @@ -52,6 +53,8 @@ def _run(self): ), persist=True, ) + + logger.debug("Creating table prepped_df_b") self.task.run_register_python( name="prepped_df_b", func=lambda: self._prep_dataframe( @@ -97,6 +100,7 @@ def _prep_dataframe( column_selects = [col(id_column)] custom_transforms = self.task.link_run.custom_column_mapping_transforms + logger.debug("Selecting column mappings") for column_mapping in column_definitions: df_selected, column_selects = column_mapping_core.select_column_mapping( column_mapping, @@ -108,10 +112,12 @@ def _prep_dataframe( df_selected = df_selected.select(column_selects) + logger.debug("Generating substitutions") df_selected = substitutions_core.generate_substitutions( spark, df_selected, substitution_columns ) + logger.debug("Generating transforms") df_selected = transforms_core.generate_transforms( spark, df_selected, feature_selections, self.task, is_a, id_column ) diff --git a/hlink/linking/preprocessing/link_step_register_raw_dfs.py b/hlink/linking/preprocessing/link_step_register_raw_dfs.py index 2da1ece..86cadad 100644 --- a/hlink/linking/preprocessing/link_step_register_raw_dfs.py +++ b/hlink/linking/preprocessing/link_step_register_raw_dfs.py @@ -73,11 +73,14 @@ def _run(self): else: df_b = df_b_filtered + logger.debug("Creating table raw_df_a") self.task.run_register_python( name="raw_df_a", func=lambda: df_a, persist=True, ) + + logger.debug("Creating table raw_df_b") 
self.task.run_register_python( name="raw_df_b", func=lambda: df_b, diff --git a/hlink/scripts/main.py b/hlink/scripts/main.py index fb5c3dd..88ea3c3 100755 --- a/hlink/scripts/main.py +++ b/hlink/scripts/main.py @@ -223,6 +223,7 @@ def _setup_logging(conf_path, run_name): print(f"*** Hlink log: {log_file.absolute()}") logging.basicConfig(filename=log_file, level=logging.INFO, format=format_string) + logging.getLogger("hlink").setLevel(logging.DEBUG) logger.info(f"New session {session_id} by user {user}") logger.info(f"Configured with {conf_path}") From c59400a00cb0727b957ba22e70658f304a67c824 Mon Sep 17 00:00:00 2001 From: rileyh Date: Fri, 21 Nov 2025 17:02:45 +0000 Subject: [PATCH 4/5] [#227] Set Spark job descriptions for many of the important operations --- .../link_step_block_on_households.py | 13 +++-- hlink/linking/hh_matching/link_step_filter.py | 26 +++++---- hlink/linking/matching/link_step_explode.py | 48 +++++++++-------- hlink/linking/matching/link_step_match.py | 11 ++-- hlink/linking/matching/link_step_score.py | 35 ++++++++---- .../link_step_prep_dataframes.py | 53 ++++++++++--------- .../link_step_register_raw_dfs.py | 36 ++++++++----- .../link_step_create_comparison_features.py | 19 ++++--- .../linking/training/link_step_ingest_file.py | 22 ++++---- .../training/link_step_save_model_metadata.py | 6 ++- .../link_step_train_and_save_model.py | 17 ++++-- 11 files changed, 175 insertions(+), 111 deletions(-) diff --git a/hlink/linking/hh_matching/link_step_block_on_households.py b/hlink/linking/hh_matching/link_step_block_on_households.py index 6869bbb..738af50 100644 --- a/hlink/linking/hh_matching/link_step_block_on_households.py +++ b/hlink/linking/hh_matching/link_step_block_on_households.py @@ -8,6 +8,7 @@ from pyspark.sql.functions import col from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description logger = logging.getLogger(__name__) @@ -112,11 +113,13 @@ def _run(self): logger.debug("Blocking on household serial ID and generating potential matches") # Generate potential matches with those unmatched people who were in a household with a match, blocking only on household id - self.task.run_register_python( - "hh_blocked_matches", - lambda: stm.join(uma, hhid_a).join(umb, hhid_b).distinct(), - persist=True, - ) + spark_context = self.task.spark.sparkContext + with set_job_description("create table hh_blocked_matches", spark_context): + self.task.run_register_python( + "hh_blocked_matches", + lambda: stm.join(uma, hhid_a).join(umb, hhid_b).distinct(), + persist=True, + ) hh_blocked_matches = self.task.spark.table("hh_blocked_matches") logger.debug(f"hh_blocked_matches has {hh_blocked_matches.count()} records") diff --git a/hlink/linking/hh_matching/link_step_filter.py b/hlink/linking/hh_matching/link_step_filter.py index c34f06c..122427a 100644 --- a/hlink/linking/hh_matching/link_step_filter.py +++ b/hlink/linking/hh_matching/link_step_filter.py @@ -7,6 +7,7 @@ import hlink.linking.core.comparison as comparison_core from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description class LinkStepFilter(LinkStep): @@ -21,6 +22,7 @@ def __init__(self, task): def _run(self): # self.task.spark.sql("set spark.sql.shuffle.partitions=4000") config = self.task.link_run.config + spark_context = self.task.spark.sparkContext # establish empty table context dict to pass to SQL template t_ctx = {} @@ -44,19 +46,25 @@ def _run(self): if f["alias"] in comp_feature_names ] - self.task.run_register_sql( - 
"hh_potential_matches", t_ctx=t_ctx, persist=True - ) + with set_job_description( + "create table hh_potential_matches", spark_context + ): + self.task.run_register_sql( + "hh_potential_matches", t_ctx=t_ctx, persist=True + ) else: - self.task.run_register_python( - "hh_potential_matches", - lambda: self.task.spark.table("hh_blocked_matches"), - persist=True, - ) + with set_job_description( + "create table hh_potential_matches", spark_context + ): + self.task.run_register_python( + "hh_potential_matches", + lambda: self.task.spark.table("hh_blocked_matches"), + persist=True, + ) self.task.spark.sql("set spark.sql.shuffle.partitions=200") print( - "Potential matches from households which meet hh_comparsions thresholds have been saved to table 'hh_potential_matches'." + "Potential matches from households which meet hh_comparisons thresholds have been saved to table 'hh_potential_matches'." ) diff --git a/hlink/linking/matching/link_step_explode.py b/hlink/linking/matching/link_step_explode.py index f8f3a75..2013955 100644 --- a/hlink/linking/matching/link_step_explode.py +++ b/hlink/linking/matching/link_step_explode.py @@ -11,6 +11,7 @@ import hlink.linking.core.comparison as comparison_core from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description logger = logging.getLogger(__name__) @@ -26,6 +27,7 @@ def __init__(self, task): def _run(self): config = self.task.link_run.config + spark_context = self.task.spark.sparkContext # filter the universe of potential matches before exploding t_ctx = {} universe_conf = config.get("potential_matches_universe", []) @@ -46,30 +48,32 @@ def _run(self): blocking = config["blocking"] logger.debug("Creating table exploded_df_a") - self.task.run_register_python( - name="exploded_df_a", - func=lambda: self._explode( - df=self.task.spark.table("match_universe_df_a"), - comparisons=config["comparisons"], - comparison_features=config["comparison_features"], - blocking=blocking, - id_column=config["id_column"], - is_a=True, - ), - ) + with set_job_description("create table exploded_df_a", spark_context): + self.task.run_register_python( + name="exploded_df_a", + func=lambda: self._explode( + df=self.task.spark.table("match_universe_df_a"), + comparisons=config["comparisons"], + comparison_features=config["comparison_features"], + blocking=blocking, + id_column=config["id_column"], + is_a=True, + ), + ) logger.debug("Creating table exploded_df_b") - self.task.run_register_python( - name="exploded_df_b", - func=lambda: self._explode( - df=self.task.spark.table("match_universe_df_b"), - comparisons=config["comparisons"], - comparison_features=config["comparison_features"], - blocking=blocking, - id_column=config["id_column"], - is_a=False, - ), - ) + with set_job_description("create table exploded_df_b", spark_context): + self.task.run_register_python( + name="exploded_df_b", + func=lambda: self._explode( + df=self.task.spark.table("match_universe_df_b"), + comparisons=config["comparisons"], + comparison_features=config["comparison_features"], + blocking=blocking, + id_column=config["id_column"], + is_a=False, + ), + ) def _explode( self, diff --git a/hlink/linking/matching/link_step_match.py b/hlink/linking/matching/link_step_match.py index 35b496a..f8b87f2 100644 --- a/hlink/linking/matching/link_step_match.py +++ b/hlink/linking/matching/link_step_match.py @@ -10,7 +10,7 @@ import hlink.linking.core.comparison_feature as comparison_feature_core import hlink.linking.core.dist_table as dist_table_core import 
hlink.linking.core.comparison as comparison_core -from hlink.linking.util import spark_shuffle_partitions_heuristic +from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic from hlink.linking.link_step import LinkStep @@ -149,8 +149,13 @@ def _run(self): if config.get("streamline_potential_match_generation", False): t_ctx["dataset_columns"] = [config["id_column"]] + + spark_context = self.task.spark.sparkContext + logger.debug("Creating table potential_matches via potential_matches.sql") try: - logger.debug("Creating table potential_matches via potential_matches.sql") - self.task.run_register_sql("potential_matches", t_ctx=t_ctx, persist=True) + with set_job_description("create table potential_matches", spark_context): + self.task.run_register_sql( + "potential_matches", t_ctx=t_ctx, persist=True + ) finally: self.task.spark.sql("set spark.sql.shuffle.partitions=200") diff --git a/hlink/linking/matching/link_step_score.py b/hlink/linking/matching/link_step_score.py index b2e12aa..581121e 100644 --- a/hlink/linking/matching/link_step_score.py +++ b/hlink/linking/matching/link_step_score.py @@ -10,7 +10,7 @@ import hlink.linking.core.comparison_feature as comparison_feature_core import hlink.linking.core.threshold as threshold_core import hlink.linking.core.dist_table as dist_table_core -from hlink.linking.util import spark_shuffle_partitions_heuristic +from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic from hlink.linking.link_step import LinkStep @@ -35,6 +35,7 @@ def _run(self): training_conf = str(self.task.training_conf) table_prefix = self.task.table_prefix config = self.task.link_run.config + spark_context = self.task.spark.sparkContext if training_conf not in config or "chosen_model" not in config[training_conf]: print( @@ -52,7 +53,8 @@ def _run(self): id_a = config["id_column"] + "_a" id_b = config["id_column"] + "_b" chosen_model_params = config[training_conf]["chosen_model"].copy() - self._create_features(config) + with set_job_description("create comparison features", spark_context): + self._create_features(config) pm = self.task.spark.table(f"{table_prefix}potential_matches_prepped") ind_var_columns = config[training_conf]["independent_vars"] @@ -81,11 +83,12 @@ def _run(self): ) logger.debug(f"Creating table {table_prefix}potential_matches_pipeline") - self.task.run_register_python( - f"{table_prefix}potential_matches_pipeline", - lambda: pre_pipeline.transform(pm.select(*required_columns)), - persist=True, - ) + with set_job_description("prepare the data for the model", spark_context): + self.task.run_register_python( + f"{table_prefix}potential_matches_pipeline", + lambda: pre_pipeline.transform(pm.select(*required_columns)), + persist=True, + ) plm = self.task.link_run.trained_models[f"{table_prefix}trained_model"] pp_required_cols = set(plm.stages[0].getInputCols() + [id_a, id_b]) pre_pipeline = self.task.spark.table( @@ -106,14 +109,26 @@ def _run(self): config["id_column"], decision, ) - predictions.write.mode("overwrite").saveAsTable(f"{table_prefix}predictions") + + with set_job_description( + f"create table {table_prefix}predictions", spark_context + ): + predictions.write.mode("overwrite").saveAsTable( + f"{table_prefix}predictions" + ) pmp = self.task.spark.table(f"{table_prefix}potential_matches_pipeline") logger.debug(f"Creating table {table_prefix}scored_potential_matches") - self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b) + with set_job_description( + f"create table 
{table_prefix}scored_potential_matches", spark_context + ): + self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b) logger.debug( f"Creating table {table_prefix}predicted_matches and removing records with duplicated id_b" ) - self._save_predicted_matches(config, id_a, id_b) + with set_job_description( + f"create table {table_prefix}predicted_matches", spark_context + ): + self._save_predicted_matches(config, id_a, id_b) self.task.spark.sql("set spark.sql.shuffle.partitions=200") def _save_table_with_requested_columns(self, pm, pmp, predictions, id_a, id_b): diff --git a/hlink/linking/preprocessing/link_step_prep_dataframes.py b/hlink/linking/preprocessing/link_step_prep_dataframes.py index da2e02e..45a73c2 100644 --- a/hlink/linking/preprocessing/link_step_prep_dataframes.py +++ b/hlink/linking/preprocessing/link_step_prep_dataframes.py @@ -9,7 +9,7 @@ import hlink.linking.core.column_mapping as column_mapping_core import hlink.linking.core.substitutions as substitutions_core import hlink.linking.core.transforms as transforms_core -from hlink.linking.util import spark_shuffle_partitions_heuristic +from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic from hlink.linking.link_step import LinkStep @@ -27,6 +27,7 @@ def __init__(self, task): def _run(self): config = self.task.link_run.config + spark_context = self.task.spark.sparkContext dataset_size_a = self.task.spark.table("raw_df_a").count() dataset_size_b = self.task.spark.table("raw_df_b").count() @@ -41,32 +42,34 @@ def _run(self): feature_selections = config.get("feature_selections", []) logger.debug("Creating table prepped_df_a") - self.task.run_register_python( - name="prepped_df_a", - func=lambda: self._prep_dataframe( - self.task.spark.table("raw_df_a"), - config["column_mappings"], - substitution_columns, - feature_selections, - True, - config["id_column"], - ), - persist=True, - ) + with set_job_description("create table prepped_df_a", spark_context): + self.task.run_register_python( + name="prepped_df_a", + func=lambda: self._prep_dataframe( + self.task.spark.table("raw_df_a"), + config["column_mappings"], + substitution_columns, + feature_selections, + True, + config["id_column"], + ), + persist=True, + ) logger.debug("Creating table prepped_df_b") - self.task.run_register_python( - name="prepped_df_b", - func=lambda: self._prep_dataframe( - self.task.spark.table("raw_df_b"), - config["column_mappings"], - substitution_columns, - feature_selections, - False, - config["id_column"], - ), - persist=True, - ) + with set_job_description("create table prepped_df_b", spark_context): + self.task.run_register_python( + name="prepped_df_b", + func=lambda: self._prep_dataframe( + self.task.spark.table("raw_df_b"), + config["column_mappings"], + substitution_columns, + feature_selections, + False, + config["id_column"], + ), + persist=True, + ) self.task.spark.sql("set spark.sql.shuffle.partitions=200") diff --git a/hlink/linking/preprocessing/link_step_register_raw_dfs.py b/hlink/linking/preprocessing/link_step_register_raw_dfs.py index 86cadad..b833086 100644 --- a/hlink/linking/preprocessing/link_step_register_raw_dfs.py +++ b/hlink/linking/preprocessing/link_step_register_raw_dfs.py @@ -11,6 +11,7 @@ from hlink.errors import DataError from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description logger = logging.getLogger(__name__) @@ -48,14 +49,19 @@ def __init__(self, task): def _run(self): config = self.task.link_run.config + spark_context = 
self.task.spark.sparkContext path_a, file_type_a = handle_paths(config["datasource_a"], "a") path_b, file_type_b = handle_paths(config["datasource_b"], "b") - self._load_unpartitioned(file_type_a, "_a", path_a) - self._load_unpartitioned(file_type_b, "_b", path_b) + with set_job_description("load data source A", spark_context): + self._load_unpartitioned(file_type_a, "_a", path_a) + with set_job_description("load data source B", spark_context): + self._load_unpartitioned(file_type_b, "_b", path_b) - df_a_filtered = self._filter_dataframe(config, "a") - df_b_filtered = self._filter_dataframe(config, "b") + with set_job_description("filter data source A", spark_context): + df_a_filtered = self._filter_dataframe(config, "a") + with set_job_description("filter data source B", spark_context): + df_b_filtered = self._filter_dataframe(config, "b") if config["datasource_a"].get("convert_ints_to_longs", False): logger.debug( @@ -74,18 +80,20 @@ def _run(self): df_b = df_b_filtered logger.debug("Creating table raw_df_a") - self.task.run_register_python( - name="raw_df_a", - func=lambda: df_a, - persist=True, - ) + with set_job_description("create table raw_df_a", spark_context): + self.task.run_register_python( + name="raw_df_a", + func=lambda: df_a, + persist=True, + ) logger.debug("Creating table raw_df_b") - self.task.run_register_python( - name="raw_df_b", - func=lambda: df_b, - persist=True, - ) + with set_job_description("create table raw_df_b", spark_context): + self.task.run_register_python( + name="raw_df_b", + func=lambda: df_b, + persist=True, + ) self._check_for_all_spaces_unrestricted_file("raw_df_a") self._check_for_all_spaces_unrestricted_file("raw_df_b") diff --git a/hlink/linking/training/link_step_create_comparison_features.py b/hlink/linking/training/link_step_create_comparison_features.py index 0b9bdb7..7e73575 100644 --- a/hlink/linking/training/link_step_create_comparison_features.py +++ b/hlink/linking/training/link_step_create_comparison_features.py @@ -7,6 +7,7 @@ import hlink.linking.core.dist_table as dist_table_core from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description class LinkStepCreateComparisonFeatures(LinkStep): @@ -86,11 +87,13 @@ def _create_training_features(self): dist_tables ) - comparison_feature_core.create_feature_tables( - self.task, - t_ctx_def, - advanced_comp_features, - hh_comp_features, - config["id_column"], - table_name=f"{table_prefix}training_features", - ) + spark_context = self.task.spark.sparkContext + with set_job_description("create comparison features", spark_context): + comparison_feature_core.create_feature_tables( + self.task, + t_ctx_def, + advanced_comp_features, + hh_comp_features, + config["id_column"], + table_name=f"{table_prefix}training_features", + ) diff --git a/hlink/linking/training/link_step_ingest_file.py b/hlink/linking/training/link_step_ingest_file.py index d7ad6cf..36011dc 100644 --- a/hlink/linking/training/link_step_ingest_file.py +++ b/hlink/linking/training/link_step_ingest_file.py @@ -4,6 +4,7 @@ # https://github.com/ipums/hlink from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description class LinkStepIngestFile(LinkStep): @@ -16,12 +17,15 @@ def __init__(self, task): ) def _run(self): - self.task.run_register_python( - f"{self.task.table_prefix}training_data", - lambda: self.task.spark.read.csv( - self.task.link_run.config[f"{self.task.training_conf}"]["dataset"], - header=True, - inferSchema=True, - ), - persist=True, - ) + 
spark_context = self.task.spark.sparkContext + + with set_job_description("load training data", spark_context): + self.task.run_register_python( + f"{self.task.table_prefix}training_data", + lambda: self.task.spark.read.csv( + self.task.link_run.config[f"{self.task.training_conf}"]["dataset"], + header=True, + inferSchema=True, + ), + persist=True, + ) diff --git a/hlink/linking/training/link_step_save_model_metadata.py b/hlink/linking/training/link_step_save_model_metadata.py index 23d1565..f592411 100644 --- a/hlink/linking/training/link_step_save_model_metadata.py +++ b/hlink/linking/training/link_step_save_model_metadata.py @@ -14,6 +14,7 @@ ) from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description logger = logging.getLogger(__name__) @@ -183,6 +184,9 @@ def _run(self): feature_importances_table = ( f"{self.task.table_prefix}training_feature_importances" ) - features_df.write.mode("overwrite").saveAsTable(feature_importances_table) + + spark_context = self.task.spark.sparkContext + with set_job_description("save training feature importances", spark_context): + features_df.write.mode("overwrite").saveAsTable(feature_importances_table) print(f"{label} have been saved to the {feature_importances_table} table") diff --git a/hlink/linking/training/link_step_train_and_save_model.py b/hlink/linking/training/link_step_train_and_save_model.py index b86d20b..d85c7e7 100644 --- a/hlink/linking/training/link_step_train_and_save_model.py +++ b/hlink/linking/training/link_step_train_and_save_model.py @@ -10,6 +10,7 @@ import hlink.linking.core.threshold as threshold_core from hlink.linking.link_step import LinkStep +from hlink.linking.util import set_job_description class LinkStepTrainAndSaveModel(LinkStep): @@ -26,6 +27,7 @@ def _run(self): training_conf = str(self.task.training_conf) table_prefix = self.task.table_prefix config = self.task.link_run.config + spark_context = self.task.spark.sparkContext if not config[training_conf].get("score_with_model", False): raise ValueError( @@ -58,11 +60,15 @@ def _run(self): pre_pipeline = Pipeline(stages=pipeline_stages[:-1]).fit(tf) self.task.link_run.trained_models[f"{table_prefix}pre_pipeline"] = pre_pipeline - tf_prepped = pre_pipeline.transform(tf) - tf_prepped.write.mode("overwrite").saveAsTable( - f"{table_prefix}training_features_prepped" - ) + with set_job_description( + "prepare the training data for the model", spark_context + ): + tf_prepped = pre_pipeline.transform(tf) + + tf_prepped.write.mode("overwrite").saveAsTable( + f"{table_prefix}training_features_prepped" + ) classifier, post_transformer = classifier_core.choose_classifier( chosen_model_type, chosen_model_params, dep_var @@ -71,7 +77,8 @@ def _run(self): # Train and save pipeline pipeline = Pipeline(stages=[vector_assembler, classifier, post_transformer]) - model = pipeline.fit(tf_prepped) + with set_job_description("train the model", spark_context): + model = pipeline.fit(tf_prepped) # model_path = config["spark_tmp_dir"] + "/chosen_model" self.task.link_run.trained_models[f"{table_prefix}trained_model"] = model # model.write().overwrite().save(model_path) From 8fe811d7bc7d75bd312b54fabc66429c085cadc1 Mon Sep 17 00:00:00 2001 From: rileyh Date: Fri, 21 Nov 2025 18:22:05 +0000 Subject: [PATCH 5/5] [#227] Add docs for set_job_description and update the changelog --- hlink/linking/util.py | 6 ++++++ sphinx-docs/changelog.md | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/hlink/linking/util.py b/hlink/linking/util.py index 
4aef2c2..f239be7 100644 --- a/hlink/linking/util.py +++ b/hlink/linking/util.py @@ -21,6 +21,12 @@ def spark_shuffle_partitions_heuristic(dataset_size: int) -> int: @contextmanager def set_job_description(desc: str | None, spark_context): + """Set the Spark job description. + + This context manager sets the Spark job description to the given string, + then restores the job description to its previous value on exit. Passing + desc=None resets the job description to the Spark default. + """ previous_desc = spark_context.getLocalProperty("spark.job.description") spark_context.setJobDescription(desc) try: diff --git a/sphinx-docs/changelog.md b/sphinx-docs/changelog.md index d5526ab..427a9c7 100644 --- a/sphinx-docs/changelog.md +++ b/sphinx-docs/changelog.md @@ -3,6 +3,14 @@ The format of this changelog is based on [Keep A Changelog][keep-a-changelog]. Hlink adheres to semantic versioning as much as possible. +## Not Yet Released + +### Added + +* Started setting custom Spark job descriptions for some of hlink's long-running + jobs. These descriptions appear in the Spark UI and make it easier to understand + what hlink is doing. [PR #228][pr228] + ## v4.2.1 (2025-08-18) ### Fixed @@ -453,6 +461,7 @@ and false negative data in model exploration. [PR #1][pr1] [pr215]: https://github.com/ipums/hlink/pull/215 [pr219]: https://github.com/ipums/hlink/pull/219 [pr222]: https://github.com/ipums/hlink/pull/222 +[pr228]: https://github.com/ipums/hlink/pull/228 [household-matching-docs]: config.html#household-matching [household-training-docs]: config.html#household-training-and-model-exploration
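
Note (not part of the patches themselves): a minimal sketch of how the new set_job_description() context manager is intended to be used, for anyone reviewing the series without the rest of the codebase handy. The SparkSession setup and the range/count job below are illustrative assumptions rather than code from the patches; the import path, the argument order, and the "spark.job.description" local property all come from patch 1.

    from pyspark.sql import SparkSession

    from hlink.linking.util import set_job_description

    # Illustrative local session; in hlink the link task supplies the SparkContext.
    spark = SparkSession.builder.master("local[2]").getOrCreate()

    with set_job_description("count the demo dataframe", spark.sparkContext):
        # Jobs triggered inside the block are labeled with this description
        # in the Spark UI.
        spark.range(1000).count()

    # On exit the previous description is restored -- None here, since nothing
    # had set a description before entering the block.
    assert spark.sparkContext.getLocalProperty("spark.job.description") is None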