Hive Partitioning
Our initial file structure for the data on AWS S3 was a simple one. We stored data in the following manner:
├── Database 1
│   ├── Table 1
│   │   ├── Parquet File 1
│   │   ├── Parquet File 2
│   │   ├── .
│   │   └── .
│   ├── Table 2
│   │   ├── Parquet File 1
│   │   ├── Parquet File 2
│   │   ├── .
│   │   └── .
│   ├── .
│   └── .
├── Database 2
│   ├── Table 1
│   │   ├── Parquet File 1
│   │   ├── Parquet File 2
│   │   ├── .
│   │   └── .
│   ├── Table 2
│   │   ├── Parquet File 1
│   │   ├── Parquet File 2
│   │   ├── .
│   │   └── .
│   ├── .
│   └── .
├── .
└── .
But this file structure had no index of any kind, nor did it offer a way to read only the data within a specific time range. The backend team (whose core technology was DuckDB) therefore had to fetch the entire dataset (millions of rows) and then extract the required subset, which made the API very slow.
To improve the performance and response time of the APIs, we did some research and found a straightforward fix. The DuckDB documentation suggests storing the data in a Hive partitioning structure, in which files are grouped into key=value folders, as follows:
├── Table 1
│   ├── YEAR=2023
│   │   ├── MONTH=01
│   │   │   ├── Parquet File 1
│   │   │   └── Parquet File 2
│   │   ├── MONTH=02
│   │   ├── .
│   │   └── .
│   ├── .
│   └── .
To achieve the Hive partitioning file structure, we considered two approaches:
- Using AWS Glue - We had already performed the data migration from Google BigQuery to AWS S3, so we had some idea of how to use this approach to get the desired result.
- Using DuckDB - In this approach, we would fetch the entire dataset from AWS S3, reconstruct the tables, extract the data month by month, and store it in the new file structure.
The cloud team had a better understanding of the first approach, while the backend team had a better understanding of the second. Since the backend team already had a lot of work, we decided that the cloud team should go ahead with the first approach. Detailed descriptions of each method follow.
For the Hive partitioning process using AWS Glue, the cloud team wrote a script with two main parts, Script 1 and Script 2.
Script 1 is responsible for determining the date range within the dataset, which is essential for defining partition boundaries. Here is shorthand pseudocode illustrating the logic:
dataset = 'bluesky_social'
table_names = ["likes"]

for table_name in table_names:
    # SQL query to find the time range
    SqlQuery172 = "SELECT MIN(date) AS min_date, MAX(date) AS max_date FROM myDataSource GROUP BY EXTRACT(year FROM date) HAVING COUNT(*) >= 3"
    # Execute SQL query
    SQLQuery_node1703166571931 = ExecuteSQL(SqlQuery172)
    # Write result to Amazon S3
    WriteToS3(SQLQuery_node1703166571931, f"s3://arbiter.datasets/partitioned-data/{dataset}/{table_name}/test/time-range/")
    # Commit the job
    CommitJob()
In simpler terms, Script 1 performs the following steps:
- It queries the Google BigQuery dataset to determine the minimum and maximum dates available.
- The retrieved date range is written to Amazon S3 in a specified format.
- Once completed, the job is committed, finalizing the process.
With the date range determined, we proceed to Script 2 for the actual data partitioning. This script iterates through each month within the date range and creates partitions accordingly.
# Initialize dataset and table_name variables
dataset = 'bluesky_social'
table_name = "likes"

# Define minimum and maximum dates
min_date = '2014-01-01'
max_date = '2024-09-11'

# Calculate number of months
no_of_months = calculate_months(min_date, max_date)

# Iterate through each month
for i in range(0, no_of_months):
    # Calculate from_date and to_date for the current month
    from_date, to_date = calculate_month_range(min_date, i)
    # Execute SQL query to fetch data within the specified date range
    query_result = execute_sql_query(from_date, to_date)
    # Write the retrieved data to Amazon S3 with appropriate partitioning
    write_to_s3(query_result, dataset, table_name, from_date, to_date)
    # Commit the job
    commit_job()
In simpler terms, Script 2 performs the following steps:
- It iterates through each month within a specified date range.
- For each month, it calculates the start and end dates.
- It executes an SQL query to fetch data within that month's date range.
- The retrieved data is written to Amazon S3 in a partitioned format.
- Finally, the job is committed to finalize the partitioning process.
Note: Script 2 must be run for each table individually for successful partitioning.
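The helpers calculate_months and calculate_month_range are left abstract in the pseudocode above. One possible Python implementation, assuming to_date is an exclusive upper bound (the first day of the following month), is:

```python
from datetime import date

def calculate_months(min_date: str, max_date: str) -> int:
    """Number of calendar months between two ISO dates, inclusive."""
    start = date.fromisoformat(min_date)
    end = date.fromisoformat(max_date)
    return (end.year - start.year) * 12 + (end.month - start.month) + 1

def calculate_month_range(min_date: str, i: int) -> tuple:
    """(from_date, to_date) for the i-th month after min_date.

    to_date is the first day of the next month, i.e. an exclusive bound.
    """
    start = date.fromisoformat(min_date)
    month_index = start.month - 1 + i
    year = start.year + month_index // 12
    month = month_index % 12 + 1
    first = date(year, month, 1)
    last = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return first.isoformat(), last.isoformat()
```

With the dates from Script 2, calculate_months('2014-01-01', '2024-09-11') yields 129 monthly partitions.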
The process of performing Hive partitioning with DuckDB, an in-process SQL query engine, on data stored in an S3 bucket consists of two steps.
The initial step involves establishing a connection to DuckDB and configuring the necessary settings for accessing the S3 bucket.
import duckdb

# Establish connection to DuckDB
conn = duckdb.connect(read_only=False)

# Install and load the HTTPFS extension for DuckDB
conn.execute("SET home_directory='arbiter';")
conn.execute("INSTALL httpfs; LOAD httpfs;")

# Set S3 configuration for HTTPFS
conn.execute(f"SET s3_region='{aws_region}';")
conn.execute(f"SET s3_access_key_id='{aws_access_key_id}';")
conn.execute(f"SET s3_secret_access_key='{aws_secret_access_key}';")
The next step involves retrieving data from the specified S3 source folder, partitioning it based on date attributes, and saving the partitioned data locally.
import os
import tempfile

import boto3
import pyarrow.parquet as pq

s3 = boto3.client('s3')

# Create a temporary directory to store partitioned data locally
local_temp_dir = tempfile.mkdtemp()

# Retrieve list of objects from the S3 source folder
s3_objects = s3.list_objects(Bucket=s3_bucket_name, Prefix=source_folder)

# Iterate through each object
for obj in s3_objects.get('Contents', []):
    key = obj['Key']
    if key.endswith('.parquet'):
        # Download Parquet file to the temporary directory
        local_file_path = os.path.join(local_temp_dir, key.split('/')[-1])
        s3.download_file(s3_bucket_name, key, local_file_path)

        # Read Parquet file using PyArrow
        parquet_file = pq.ParquetFile(local_file_path)
        table = parquet_file.read().to_pandas()

        # Partition data based on the date attribute
        for date, group in table.groupby(table['date'].dt.date):
            date_str = date.strftime('%Y-%m-%d')
            year = date.strftime('%Y')
            month = date.strftime('%m')
            destination_path = os.path.join('.', destination_folder, "YEAR=" + year, "MONTH=" + month)

            # Write partitioned data to Parquet files locally
            if not os.path.exists(destination_path):
                os.makedirs(destination_path)
            destination_key = os.path.join(destination_path, f"{key.split('/')[-1].split('.')[0]}_{date_str}.parquet")
            group.to_parquet(destination_key)

# Close DuckDB connection
conn.close()
The steps involved are as follows:
- Retrieve Data from S3:
  - The script starts by retrieving a list of objects from the specified S3 source folder (source_folder). This is achieved using the list_objects method provided by the boto3 library, which enables interaction with AWS services.
- Iterate Through Objects:
  - For each object found in the S3 source folder, the script checks whether its key ends with the ".parquet" extension. This extension is standard for Parquet files, a columnar storage format suited to large-scale data processing.
- Download Parquet Files:
  - If the object is a Parquet file, the script downloads it to a temporary local directory (local_temp_dir). The download_file method provided by the boto3 library is used to copy objects from S3 to the local file system.
- Read Parquet File:
  - Once downloaded, the Parquet file is read using the pyarrow.parquet.ParquetFile class provided by the PyArrow library, which allows Parquet files to be read efficiently.
- Partition Data Based on Date Attribute:
  - After reading the Parquet file, the script partitions the data based on a date attribute (the date column). This is typically done to optimize querying and analysis over date ranges.
  - For each unique date value in the dataset, the script creates a separate partition.
  - Partitions are organized into folders based on the year and month of the date attribute. This hierarchical structure helps with efficient data retrieval and management.
- Write Partitioned Data to Parquet Files Locally:
  - For each partition, the script creates a destination path where the partitioned data will be saved.
  - If the destination path does not exist, it is created using os.makedirs().
  - The partitioned data is then written to Parquet files within the corresponding destination path using the to_parquet method provided by the pandas library.
- Cleanup and Close Connection:
  - Finally, after completing the partitioning process, the script cleans up the temporary resources and closes the DuckDB connection.
The Hive partitioning process using DuckDB enables cost-efficient organization and management of large datasets stored in AWS S3 buckets. By partitioning data on specific attributes, querying and analyzing the data become more streamlined and better optimized for performance.
Note: After partitioning the data locally, the next step is to upload the partitioned data back to the AWS S3 bucket; this step can suffer from slow transfer speeds due to limited network bandwidth.
In this analysis, we compared the query performance between accessing normal data and partitioned data for the tweets_check table in the TruthSocial dataset. By executing queries on both types of data and measuring the time taken to retrieve results, we aimed to assess the impact of Hive partitioning on query efficiency.
- Query on Normal Data
start = '2022-11-01'
end = '2022-11-30'
file_path_1 = r's3://arbiter.datasets/data/truthsocial_data/tweets_check/*.parquet'
query = f"""SELECT * FROM read_parquet('{file_path_1}') WHERE created_at BETWEEN '{start}' AND '{end}';"""
- Query on Partitioned Data
file_path = r's3://arbiter.datasets/partitioned-data/truthsocial_data/tweets_check/*/*/*.parquet'
query = f"""SELECT * FROM read_parquet('{file_path}', hive_partitioning=1) WHERE year=2022 AND month=11;"""
The query execution times were as follows:
- For normal data: 34.87 seconds
- For partitioned data: 3.46 seconds
The comparison reveals a significant improvement in query execution time when accessing partitioned data: Hive partitioning reduced it by approximately 90.08% 🎉. This highlights the efficiency gains achieved through Hive partitioning and makes it a valuable technique for optimizing query performance on large datasets.
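Timings like the ones above can be reproduced with a small harness; here run_query is a placeholder for a zero-argument callable that would execute one of the DuckDB queries and fetch its result:

```python
import time

def time_query(run_query):
    """Run a zero-argument query callable; return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = run_query()
    elapsed = time.perf_counter() - start
    return result, elapsed

# Example with a stand-in workload instead of a real DuckDB query
result, elapsed = time_query(lambda: sum(range(1_000_000)))
```

time.perf_counter is preferred over time.time here because it is monotonic and has higher resolution for measuring short intervals.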
