From 161393f0e5531220f9adb98c52edb0900edd5c3e Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 15:30:36 +0000 Subject: [PATCH 01/12] introduce a change --- src/hyperleaup/hyper_file.py | 41 ++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/src/hyperleaup/hyper_file.py b/src/hyperleaup/hyper_file.py index 4a9a474..b57096b 100644 --- a/src/hyperleaup/hyper_file.py +++ b/src/hyperleaup/hyper_file.py @@ -10,7 +10,7 @@ from hyperleaup.hyper_utils import HyperUtils from hyperleaup.publisher import Publisher from hyperleaup.spark_fixture import get_spark_session - +import time def get_spark_dataframe(sql) -> DataFrame: return get_spark_session().sql(sql) @@ -18,12 +18,14 @@ def get_spark_dataframe(sql) -> DataFrame: class HyperFile: - def __init__(self, name: str, - sql: str = None, df: DataFrame = None, - is_dbfs_enabled: bool = False, - creation_mode: str = CreationMode.PARQUET.value, - null_values_replacement: dict = None, - config: HyperFileConfig = HyperFileConfig()): + def __init__( + self, name: str, + sql: str = None, df: DataFrame = None, + is_dbfs_enabled: bool = False, + creation_mode: str = CreationMode.PARQUET.value, + null_values_replacement: dict = None, + config: HyperFileConfig = HyperFileConfig() + ): self.name = name # Create a DataFrame from Spark SQL if sql is not None and df is None: @@ -39,12 +41,14 @@ def __init__(self, name: str, if sql is None and df is None: self.path = None else: - self.path = Creator(self.df, - self.name, - self.is_dbfs_enabled, - self.creation_mode, - self.null_values_replacement, - self.config).create() + self.path = Creator( + self.df, + self.name, + self.is_dbfs_enabled, + self.creation_mode, + self.null_values_replacement, + self.config + ).create() self.luid = None def print_rows(self): @@ -108,6 +112,17 @@ def save(self, path: str) -> str: logging.info(f'Saving Hyper File to new location: {dest_path}') + timeout = 60 # 1 minute + start_time = time.time() + + while not os.path.exists(self.path): + if time.time() - start_time > timeout: + raise FileNotFoundError(f"File {self.path} not found within the timeout period.") + print(f"Waiting for the file {self.path} to be written...") + time.sleep(1) # wait for a second before checking again + + print(f"File {self.path} exists, proceeding.") + return copyfile(self.path, dest_path) @staticmethod From 435ab7037125edd733dee59625ccbfacd9dfdaea Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 15:45:01 +0000 Subject: [PATCH 02/12] update the package --- src/hyperleaup/creator.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index c4b57ae..00383cd 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -258,6 +258,12 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert if file.endswith(".parquet"): parquet_file = file + + dbfs_tmp_dir = "/dbfs" + tmp_dir + files = dbutils.fs.ls(tmp_dir) + if files is None: + logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.") + if parquet_file is None: raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.") From 57c52f7ca420566562e93ca3ccbfdbf3bb71eae2 Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 16:01:44 +0000 Subject: [PATCH 03/12] display everything --- src/hyperleaup/creator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 00383cd..85bec9d 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -261,6 +261,8 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert dbfs_tmp_dir = "/dbfs" + tmp_dir files = dbutils.fs.ls(tmp_dir) + print(files) + print(dbfs_tmp_dir) if files is None: logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.") From a5c3e822838015666eb6d87ed0b6cfa39b74f3df Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 16:07:21 +0000 Subject: [PATCH 04/12] introduce delay --- src/hyperleaup/creator.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 85bec9d..b3a0a67 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -11,6 +11,7 @@ Inserter, Connection, CreateMode from pathlib import Path from databricks.sdk.runtime import * +import time def clean_dataframe(df: DataFrame, allow_nulls=False, convert_decimal_precision=False) -> DataFrame: """Replaces null or NaN values with '' and 0s""" @@ -248,8 +249,14 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision) # write the DataFrame to DBFS as a single Parquet file - cleaned_df.coalesce(1).write \ - .mode("overwrite").parquet(tmp_dir) + cleaned_df.coalesce(1).write.mode("overwrite").parquet(tmp_dir) + time.sleep(5) + dbfs_tmp_dir = "/dbfs" + tmp_dir + files = dbutils.fs.ls(tmp_dir) + print(files) + print(dbfs_tmp_dir) + if files is None: + logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.") dbfs_tmp_dir = "/dbfs" + tmp_dir parquet_file = None @@ -259,12 +266,6 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert parquet_file = file - dbfs_tmp_dir = "/dbfs" + tmp_dir - files = dbutils.fs.ls(tmp_dir) - print(files) - print(dbfs_tmp_dir) - if files is None: - logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.") if parquet_file is None: raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.") From e071083d51a8dd96cbfe597838274e9965d21fe9 Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 16:20:13 +0000 Subject: [PATCH 05/12] update code --- src/hyperleaup/creator.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index b3a0a67..13baf55 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -260,12 +260,14 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert dbfs_tmp_dir = "/dbfs" + tmp_dir parquet_file = None - for root_dir, dirs, files in os.walk(dbfs_tmp_dir): - for file in files: - if file.endswith(".parquet"): - parquet_file = file - - + # List files in the DBFS directory + files_info = dbutils.fs.ls(dbfs_tmp_dir) + + for file_info in files_info: + if file_info.name.endswith(".parquet"): + parquet_file = file_info.path + print(f"Found parquet file: {parquet_file}") + break # Stop after finding the first .parquet file if parquet_file is None: raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.") From 58e2e99eb5d0af8149d0b66e7f077756c3839103 Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 16:31:31 +0000 Subject: [PATCH 06/12] apply fix --- src/hyperleaup/creator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 13baf55..251bb74 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -258,11 +258,11 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert if files is None: logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.") - dbfs_tmp_dir = "/dbfs" + tmp_dir + dbfs_tmp_dir_list = "dbfs:" + tmp_dir parquet_file = None # List files in the DBFS directory - files_info = dbutils.fs.ls(dbfs_tmp_dir) - + files_info = dbutils.fs.ls(dbfs_tmp_dir_list) + for file_info in files_info: if file_info.name.endswith(".parquet"): parquet_file = file_info.path From 7d71a36e12dd4bfec16afc393a104bcd25af0eaf Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 16:40:15 +0000 Subject: [PATCH 07/12] fix path --- src/hyperleaup/creator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 251bb74..b073b43 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -276,7 +276,7 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert if not os.path.exists(tmp_dir): os.makedirs(tmp_dir) - src_path = dbfs_tmp_dir + parquet_file + src_path = parquet_file dest_path = tmp_dir + parquet_file copyfile(src_path, dest_path) From e77840de16e8311b9c7cf95244bebb6f3cfd7d14 Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 16:57:36 +0000 Subject: [PATCH 08/12] extract logic --- src/hyperleaup/creator.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index b073b43..0628713 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -246,8 +246,7 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert """Writes and moves a single Parquet file written to a Databricks Filesystem to a temp directory on the driver node.""" tmp_dir = f"/tmp/hyperleaup/{name}/" - cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision) - + cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision) # write the DataFrame to DBFS as a single Parquet file cleaned_df.coalesce(1).write.mode("overwrite").parquet(tmp_dir) time.sleep(5) @@ -269,6 +268,12 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert print(f"Found parquet file: {parquet_file}") break # Stop after finding the first .parquet file + def extract_filename(dbfs_path): + # Extract the filename using os.path.basename + filename = os.path.basename(dbfs_path) + return filename + + parquet_file = extract_filename(parquet_file) if parquet_file is None: raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.") @@ -276,7 +281,7 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert if not os.path.exists(tmp_dir): os.makedirs(tmp_dir) - src_path = parquet_file + src_path = dbfs_tmp_dir + parquet_file dest_path = tmp_dir + parquet_file copyfile(src_path, dest_path) From 68c4cd31f2c784881bfc89af9426cb4e68ea98bf Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 17:18:50 +0000 Subject: [PATCH 09/12] add a potenial fix in loading --- src/hyperleaup/creator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 0628713..9c23686 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -273,6 +273,8 @@ def extract_filename(dbfs_path): filename = os.path.basename(dbfs_path) return filename + dbfs_parquet_file_path = parquet_file + parquet_file = extract_filename(parquet_file) if parquet_file is None: raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.") @@ -282,6 +284,7 @@ def extract_filename(dbfs_path): os.makedirs(tmp_dir) src_path = dbfs_tmp_dir + parquet_file + dbutils.fs.cp(dbfs_parquet_file_path, src_path) dest_path = tmp_dir + parquet_file copyfile(src_path, dest_path) From 04b5f536c92f6c0330b518b8811a76fd4fc4b97f Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 17:24:14 +0000 Subject: [PATCH 10/12] add logging statement --- src/hyperleaup/creator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 9c23686..762a261 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -285,6 +285,8 @@ def extract_filename(dbfs_path): src_path = dbfs_tmp_dir + parquet_file dbutils.fs.cp(dbfs_parquet_file_path, src_path) + logging.info(f"Parquet file copied to: {src_path}") + print(f"Parquet file copied to: {src_path}") dest_path = tmp_dir + parquet_file copyfile(src_path, dest_path) From a7df04fda41eb54d44442fdf9c49b09b7deb48eb Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Wed, 5 Mar 2025 17:32:23 +0000 Subject: [PATCH 11/12] last attempt for today --- src/hyperleaup/creator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index 762a261..a712541 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -285,10 +285,12 @@ def extract_filename(dbfs_path): src_path = dbfs_tmp_dir + parquet_file dbutils.fs.cp(dbfs_parquet_file_path, src_path) + dest_path = tmp_dir + parquet_file logging.info(f"Parquet file copied to: {src_path}") print(f"Parquet file copied to: {src_path}") - dest_path = tmp_dir + parquet_file - copyfile(src_path, dest_path) + dbutils.fs.cp(dbfs_parquet_file_path, dest_path) + logging.info(f"Parquet file copied to: {dest_path}") + # copyfile(src_path, dest_path) return dest_path From 010b51a1742327bc75abaf5f8d498cbb681c86c4 Mon Sep 17 00:00:00 2001 From: zdhruv3 Date: Thu, 6 Mar 2025 11:08:51 +0000 Subject: [PATCH 12/12] update the logging --- src/hyperleaup/creator.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index a712541..722fb6a 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -283,14 +283,12 @@ def extract_filename(dbfs_path): if not os.path.exists(tmp_dir): os.makedirs(tmp_dir) + logging.info(f"Copying Parquet file from DBFS to local disk...") src_path = dbfs_tmp_dir + parquet_file - dbutils.fs.cp(dbfs_parquet_file_path, src_path) dest_path = tmp_dir + parquet_file - logging.info(f"Parquet file copied to: {src_path}") - print(f"Parquet file copied to: {src_path}") - dbutils.fs.cp(dbfs_parquet_file_path, dest_path) - logging.info(f"Parquet file copied to: {dest_path}") - # copyfile(src_path, dest_path) + logging.info(f"Source path is {src_path}") + logging.info(f"Dest path is {dest_path}") + copyfile(src_path, dest_path) return dest_path