Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions src/hyperleaup/creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Inserter, Connection, CreateMode
from pathlib import Path
from databricks.sdk.runtime import *
import time

def clean_dataframe(df: DataFrame, allow_nulls=False, convert_decimal_precision=False) -> DataFrame:
"""Replaces null or NaN values with '' and 0s"""
Expand Down Expand Up @@ -245,28 +246,48 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert
"""Writes and moves a single Parquet file written to a Databricks Filesystem to a temp directory on the driver node."""
tmp_dir = f"/tmp/hyperleaup/{name}/"

cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision)

cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision)
# write the DataFrame to DBFS as a single Parquet file
cleaned_df.coalesce(1).write \
.mode("overwrite").parquet(tmp_dir)

cleaned_df.coalesce(1).write.mode("overwrite").parquet(tmp_dir)
time.sleep(5)
dbfs_tmp_dir = "/dbfs" + tmp_dir
files = dbutils.fs.ls(tmp_dir)
print(files)
print(dbfs_tmp_dir)
if files is None:
logging.info(f"Parquet path '{tmp_dir}' not found on DBFS.")

dbfs_tmp_dir_list = "dbfs:" + tmp_dir
parquet_file = None
for root_dir, dirs, files in os.walk(dbfs_tmp_dir):
for file in files:
if file.endswith(".parquet"):
parquet_file = file
# List files in the DBFS directory
files_info = dbutils.fs.ls(dbfs_tmp_dir_list)

for file_info in files_info:
if file_info.name.endswith(".parquet"):
parquet_file = file_info.path
print(f"Found parquet file: {parquet_file}")
break # Stop after finding the first .parquet file

def extract_filename(dbfs_path):
    """Return the final path component (the file name) of *dbfs_path*.

    Works on any slash-separated path string, e.g.
    'dbfs:/tmp/hyperleaup/x/part-0000.parquet' -> 'part-0000.parquet'.
    """
    return os.path.basename(dbfs_path)

dbfs_parquet_file_path = parquet_file

parquet_file = extract_filename(parquet_file)
if parquet_file is None:
raise FileNotFoundError(f"Parquet file '{tmp_dir}' not found on DBFS.")

# Copy Parquet file from DBFS location to temp dir on driver node
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)

logging.info(f"Copying Parquet file from DBFS to local disk...")
src_path = dbfs_tmp_dir + parquet_file
dest_path = tmp_dir + parquet_file
logging.info(f"Source path is {src_path}")
logging.info(f"Dest path is {dest_path}")
copyfile(src_path, dest_path)

return dest_path
Expand Down
41 changes: 28 additions & 13 deletions src/hyperleaup/hyper_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
from hyperleaup.hyper_utils import HyperUtils
from hyperleaup.publisher import Publisher
from hyperleaup.spark_fixture import get_spark_session

import time

def get_spark_dataframe(sql) -> DataFrame:
    """Run *sql* on the active Spark session and return the resulting DataFrame."""
    spark = get_spark_session()
    return spark.sql(sql)


class HyperFile:

def __init__(self, name: str,
sql: str = None, df: DataFrame = None,
is_dbfs_enabled: bool = False,
creation_mode: str = CreationMode.PARQUET.value,
null_values_replacement: dict = None,
config: HyperFileConfig = HyperFileConfig()):
def __init__(
self, name: str,
sql: str = None, df: DataFrame = None,
is_dbfs_enabled: bool = False,
creation_mode: str = CreationMode.PARQUET.value,
null_values_replacement: dict = None,
config: HyperFileConfig = HyperFileConfig()
):
self.name = name
# Create a DataFrame from Spark SQL
if sql is not None and df is None:
Expand All @@ -39,12 +41,14 @@ def __init__(self, name: str,
if sql is None and df is None:
self.path = None
else:
self.path = Creator(self.df,
self.name,
self.is_dbfs_enabled,
self.creation_mode,
self.null_values_replacement,
self.config).create()
self.path = Creator(
self.df,
self.name,
self.is_dbfs_enabled,
self.creation_mode,
self.null_values_replacement,
self.config
).create()
self.luid = None

def print_rows(self):
Expand Down Expand Up @@ -108,6 +112,17 @@ def save(self, path: str) -> str:

logging.info(f'Saving Hyper File to new location: {dest_path}')

timeout = 60 # 1 minute
start_time = time.time()

while not os.path.exists(self.path):
if time.time() - start_time > timeout:
raise FileNotFoundError(f"File {self.path} not found within the timeout period.")
print(f"Waiting for the file {self.path} to be written...")
time.sleep(1) # wait for a second before checking again

print(f"File {self.path} exists, proceeding.")

return copyfile(self.path, dest_path)

@staticmethod
Expand Down