diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py index c4b57ae..722fb6a 100644 --- a/src/hyperleaup/creator.py +++ b/src/hyperleaup/creator.py @@ -11,6 +11,7 @@ Inserter, Connection, CreateMode from pathlib import Path from databricks.sdk.runtime import * +import time def clean_dataframe(df: DataFrame, allow_nulls=False, convert_decimal_precision=False) -> DataFrame: """Replaces null or NaN values with '' and 0s""" @@ -245,19 +246,36 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert """Writes and moves a single Parquet file written to a Databricks Filesystem to a temp directory on the driver node.""" tmp_dir = f"/tmp/hyperleaup/{name}/" - cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision) - + cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision) # write the DataFrame to DBFS as a single Parquet file - cleaned_df.coalesce(1).write \ - .mode("overwrite").parquet(tmp_dir) - + cleaned_df.coalesce(1).write.mode("overwrite").parquet(tmp_dir) + time.sleep(5) dbfs_tmp_dir = "/dbfs" + tmp_dir + files = dbutils.fs.ls(tmp_dir) + print(files) + print(dbfs_tmp_dir) + if files is None: + logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.") + + dbfs_tmp_dir_list = "dbfs:" + tmp_dir parquet_file = None - for root_dir, dirs, files in os.walk(dbfs_tmp_dir): - for file in files: - if file.endswith(".parquet"): - parquet_file = file + # List files in the DBFS directory + files_info = dbutils.fs.ls(dbfs_tmp_dir_list) + + for file_info in files_info: + if file_info.name.endswith(".parquet"): + parquet_file = file_info.path + print(f"Found parquet file: {parquet_file}") + break # Stop after finding the first .parquet file + + def extract_filename(dbfs_path): + # Extract the filename using os.path.basename + filename = os.path.basename(dbfs_path) + return filename + + dbfs_parquet_file_path = parquet_file + parquet_file = extract_filename(parquet_file) if parquet_file is None: raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.") @@ -265,8 +283,11 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert if not os.path.exists(tmp_dir): os.makedirs(tmp_dir) + logging.info(f"Copying Parquet file from DBFS to local disk...") src_path = dbfs_tmp_dir + parquet_file dest_path = tmp_dir + parquet_file + logging.info(f"Source path is {src_path}") + logging.info(f"Dest path is {dest_path}") copyfile(src_path, dest_path) return dest_path diff --git a/src/hyperleaup/hyper_file.py b/src/hyperleaup/hyper_file.py index 4a9a474..b57096b 100644 --- a/src/hyperleaup/hyper_file.py +++ b/src/hyperleaup/hyper_file.py @@ -10,7 +10,7 @@ from hyperleaup.hyper_utils import HyperUtils from hyperleaup.publisher import Publisher from hyperleaup.spark_fixture import get_spark_session - +import time def get_spark_dataframe(sql) -> DataFrame: return get_spark_session().sql(sql) @@ -18,12 +18,14 @@ def get_spark_dataframe(sql) -> DataFrame: class HyperFile: - def __init__(self, name: str, - sql: str = None, df: DataFrame = None, - is_dbfs_enabled: bool = False, - creation_mode: str = CreationMode.PARQUET.value, - null_values_replacement: dict = None, - config: HyperFileConfig = HyperFileConfig()): + def __init__( + self, name: str, + sql: str = None, df: DataFrame = None, + is_dbfs_enabled: bool = False, + creation_mode: str = CreationMode.PARQUET.value, + null_values_replacement: dict = None, + config: HyperFileConfig = HyperFileConfig() + ): self.name = name # Create a DataFrame from Spark SQL if sql is not None and df is None: @@ -39,12 +41,14 @@ def __init__(self, name: str, if sql is None and df is None: self.path = None else: - self.path = Creator(self.df, - self.name, - self.is_dbfs_enabled, - self.creation_mode, - self.null_values_replacement, - self.config).create() + self.path = Creator( + self.df, + self.name, + self.is_dbfs_enabled, + self.creation_mode, + self.null_values_replacement, + self.config + ).create() self.luid = None def print_rows(self): @@ -108,6 +112,17 @@ def save(self, path: str) -> str: logging.info(f'Saving Hyper File to new location: {dest_path}') + timeout = 60 # 1 minute + start_time = time.time() + + while not os.path.exists(self.path): + if time.time() - start_time > timeout: + raise FileNotFoundError(f"File {self.path} not found within the timeout period.") + print(f"Waiting for the file {self.path} to be written...") + time.sleep(1) # wait for a second before checking again + + print(f"File {self.path} exists, proceeding.") + return copyfile(self.path, dest_path) @staticmethod