Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions hyperleaup/creator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import logging
from shutil import copyfile
from typing import List, Any
from typing import List, Any, Mapping
from hyperleaup.creation_mode import CreationMode
from pyspark.sql import DataFrame
from pyspark.sql.types import *
Expand Down Expand Up @@ -93,14 +93,15 @@ def get_table_def(df: DataFrame, schema_name: str, table_name: str) -> TableDefi
)


def insert_data_into_hyper_file(data: List[Any], name: str, table_def: TableDefinition):
def insert_data_into_hyper_file(data: List[Any], name: str, table_def: TableDefinition,
hyper_process_parameters: Mapping[str, str] = None):
"""Helper function that inserts data into a .hyper file."""
# first, create a temp directory on the driver node
tmp_dir = f"/tmp/hyperleaup/{name}/"
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
hyper_database_path = f"/tmp/hyperleaup/{name}/{name}.hyper"
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU, parameters=hyper_process_parameters) as hp:
with Connection(endpoint=hp.endpoint,
database=hyper_database_path,
create_mode=CreateMode.CREATE_AND_REPLACE) as connection:
Expand All @@ -113,10 +114,11 @@ def insert_data_into_hyper_file(data: List[Any], name: str, table_def: TableDefi
return hyper_database_path


def copy_data_into_hyper_file(csv_path: str, name: str, table_def: TableDefinition) -> str:
def copy_data_into_hyper_file(csv_path: str, name: str, table_def: TableDefinition,
hyper_process_parameters: Mapping[str, str] = None) -> str:
"""Helper function that copies data from a CSV file to a .hyper file."""
hyper_database_path = f"/tmp/hyperleaup/{name}/{name}.hyper"
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU, parameters=hyper_process_parameters) as hp:
with Connection(endpoint=hp.endpoint,
database=Path(hyper_database_path),
create_mode=CreateMode.CREATE_AND_REPLACE) as connection:
Expand All @@ -132,11 +134,12 @@ def copy_data_into_hyper_file(csv_path: str, name: str, table_def: TableDefiniti
return hyper_database_path


def copy_parquet_to_hyper_file(parquet_path: str, name: str, table_def: TableDefinition) -> str:
def copy_parquet_to_hyper_file(parquet_path: str, name: str, table_def: TableDefinition,
hyper_process_parameters: Mapping[str, str] = None) -> str:
"""Helper function that copies data from a Parquet file to a .hyper file."""
hyper_database_path = f"/tmp/hyperleaup/{name}/{name}.hyper"

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU, parameters=hyper_process_parameters) as hp:
with Connection(endpoint=hp.endpoint,
database=Path(hyper_database_path),
create_mode=CreateMode.CREATE_AND_REPLACE) as connection:
Expand Down Expand Up @@ -258,14 +261,16 @@ class Creator:
def __init__(self, df: DataFrame, name: str,
is_dbfs_enabled: bool = False,
creation_mode: str = CreationMode.COPY.value,
null_values_replacement = None):
null_values_replacement = None,
hyper_process_parameters: Mapping[str, str] = None):
if null_values_replacement is None:
null_values_replacement = {}
self.df = df
self.name = name
self.is_dbfs_enabled = is_dbfs_enabled
self.creation_mode = creation_mode
self.null_values_replacement = null_values_replacement
self.hyper_process_parameters = hyper_process_parameters

def create(self) -> str:
"""Creates a Tableau Hyper File given a SQL statement"""
Expand All @@ -285,7 +290,7 @@ def create(self) -> str:

# COPY data into a Tableau .hyper file
logging.info("Copying data into Hyper File...")
database_path = copy_data_into_hyper_file(csv_path, self.name, table_def)
database_path = copy_data_into_hyper_file(csv_path, self.name, table_def, self.hyper_process_parameters)

elif self.creation_mode.upper() == CreationMode.INSERT.value:

Expand All @@ -299,7 +304,7 @@ def create(self) -> str:

# Insert data into a Tableau .hyper file
logging.info("Inserting data into Hyper File...")
database_path = insert_data_into_hyper_file(data, self.name, table_def)
database_path = insert_data_into_hyper_file(data, self.name, table_def, self.hyper_process_parameters)

elif self.creation_mode.upper() == CreationMode.PARQUET.value:

Expand All @@ -317,7 +322,8 @@ def create(self) -> str:

# COPY data into a Tableau .hyper file
logging.info("Copying data into Hyper File...")
database_path = copy_parquet_to_hyper_file(parquet_path, self.name, table_def)
database_path = copy_parquet_to_hyper_file(parquet_path, self.name, table_def,
self.hyper_process_parameters)

else:
raise ValueError(f'Invalid "creation_mode" specified: {self.creation_mode}')
Expand Down
21 changes: 14 additions & 7 deletions hyperleaup/hyper_file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import logging
from shutil import copyfile
from typing import Mapping

from pyspark.sql import DataFrame
from tableauhyperapi import HyperProcess, Telemetry, Connection, CreateMode, Inserter
Expand All @@ -22,7 +23,8 @@ def __init__(self, name: str,
sql: str = None, df: DataFrame = None,
is_dbfs_enabled: bool = False,
creation_mode: str = CreationMode.PARQUET.value,
null_values_replacement: dict = None):
null_values_replacement: dict = None,
hyper_process_parameters: Mapping[str, str] = None):
self.name = name
# Create a DataFrame from Spark SQL
if sql is not None and df is None:
Expand All @@ -33,6 +35,7 @@ def __init__(self, name: str,
self.creation_mode = creation_mode
self.is_dbfs_enabled = is_dbfs_enabled
self.null_values_replacement = null_values_replacement
self.hyper_process_parameters = hyper_process_parameters
# Do not create a Hyper File if loading an existing Hyper File
if sql is None and df is None:
self.path = None
Expand All @@ -41,12 +44,14 @@ def __init__(self, name: str,
self.name,
self.is_dbfs_enabled,
self.creation_mode,
self.null_values_replacement).create()
self.null_values_replacement,
self.hyper_process_parameters).create()
self.luid = None

def print_rows(self):
"""Prints the first 1,000 rows of a Hyper file"""
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU,
parameters=self.hyper_process_parameters) as hp:
with Connection(endpoint=hp.endpoint, database=self.path) as connection:
rows = connection.execute_list_query(f"SELECT * FROM {TableName('Extract', 'Extract')} LIMIT 1000")
print("Showing first 1,000 rows")
Expand All @@ -55,7 +60,8 @@ def print_rows(self):

def print_table_def(self, schema: str = "Extract", table: str = "Extract"):
"""Prints the table definition for a table in a Hyper file."""
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU,
parameters=self.hyper_process_parameters) as hp:
with Connection(endpoint=hp.endpoint, database=self.path) as connection:
table_name = TableName(schema, table)
table_definition = connection.catalog.get_table_definition(name=table_name)
Expand Down Expand Up @@ -108,7 +114,7 @@ def save(self, path: str) -> str:
return copyfile(self.path, dest_path)

@staticmethod
def load(path: str, is_dbfs_enabled: bool = False):
def load(path: str, is_dbfs_enabled: bool = False, hyper_process_parameters: Mapping[str, str] = None):
"""Loads a Hyper File from a source path to a temp dir"""
# Guard against invalid paths
if path.lower().startswith("s3"):
Expand Down Expand Up @@ -151,7 +157,7 @@ def load(path: str, is_dbfs_enabled: bool = False):
hyper_file_path = path

# Create a HyperFile object with existing Hyper File path
hf = HyperFile(name=name, is_dbfs_enabled=is_dbfs_enabled)
hf = HyperFile(name=name, is_dbfs_enabled=is_dbfs_enabled, hyper_process_parameters=hyper_process_parameters)
hf.path = hyper_file_path

return hf
Expand All @@ -174,7 +180,8 @@ def append(self, sql: str = None, df: DataFrame = None):
# Insert the new data into the Hyper File
hyper_database_path = self.path
logging.info(f'Inserting new data into Hyper database: {hyper_database_path}')
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU,
parameters=self.hyper_process_parameters) as hp:
with Connection(endpoint=hp.endpoint,
database=hyper_database_path,
create_mode=CreateMode.NONE) as connection:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
tableauserverclient==0.17.0
pyspark==3.1.3
requests==2.26.0
tableauhyperapi==0.0.13129
tableauhyperapi==0.0.16638
urllib3==1.26.6
29 changes: 28 additions & 1 deletion tests/test_hyper_file.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
from hyperleaup import HyperFile
from hyperleaup.spark_fixture import get_spark_session

from tests.test_utils import TestUtils


Expand Down Expand Up @@ -98,3 +97,31 @@ def test_append(self):
hf.append(df=df)
num_rows = TestUtils.get_row_count("Extract", "Extract", "/tmp/save/employees.hyper")
assert(num_rows == 6)

def test_hyper_process_parameters(self):
    """Check that HyperProcess parameters reach the Hyper daemon in every creation mode.

    Redirects the daemon's log output to a non-default directory via the
    ``log_dir`` process parameter, then confirms the log file shows up there
    after a HyperFile is created and saved in each supported mode.
    """
    save_path = "/tmp/process_parameters"

    custom_log_dir = "/tmp/logs"
    expected_log = custom_log_dir + "/hyperd.log"
    if not os.path.exists(custom_log_dir):
        os.mkdir(custom_log_dir)

    columns = ["id", "first_name", "last_name", "dob", "age", "is_temp"]
    rows = [
        (1001, "Jane", "Doe", "2000-05-01", 29, False),
        (1002, "John", "Doe", "1988-05-03", 29, False),
        (2201, "Elonzo", "Smith", "1990-05-03", 29, True)
    ]
    df = get_spark_session().createDataFrame(rows, columns)

    params = {"log_dir": custom_log_dir}

    for creation_mode in ["insert", "copy", "parquet"]:
        # Start each mode from a clean slate so a stale log file from a
        # previous iteration cannot satisfy the assertions below.
        if os.path.exists(expected_log):
            os.remove(expected_log)

        hyper_file = HyperFile(name="employees", df=df, is_dbfs_enabled=False,
                               creation_mode=creation_mode,
                               hyper_process_parameters=params)
        hyper_file.save(save_path)

        # The Hyper daemon must have written its log to the custom location
        assert(os.path.exists(expected_log))
        assert(os.path.isfile(expected_log))