From bea202a6e8f6fab79985a84de8167f6fee55d6f0 Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Wed, 3 Sep 2025 18:24:13 -0300 Subject: [PATCH 1/4] =?UTF-8?q?=E2=9C=A8=20Add=20upsert=20from=20records?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcpde/bq.py | 122 +++++++++++++++++++++++++++++++++++- tests/unit/test_bq.py | 139 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 258 insertions(+), 3 deletions(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index 8f37b23..7322af0 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -55,13 +55,22 @@ def check_table(self, dataset: str, table: str) -> bool: True if the table exists, False otherwise. """ - table_ref = self._create_table_reference(dataset, table) try: - self.client.get_table(table=table_ref) + self.get_table(dataset, table) return True except NotFound: return False + def get_table(self, dataset: str, table: str) -> bigquery.Table: + """Get a table from bigquery. + + Args: + dataset: dataset name. + table: table name. + """ + table_ref = self._create_table_reference(dataset, table) + return self.client.get_table(table=table_ref) + def create_table( self, dataset: str, @@ -171,6 +180,28 @@ class BigQueryClientException(Exception): """Base exception for connection or command errors.""" +class BigQuerySchemaMismatchException(Exception): + """Exception for schema mismatch.""" + + def __init__( + self, + message: str, + source_schema: list[bigquery.SchemaField], + target_schema: list[bigquery.SchemaField], + ): + super().__init__(message) + self.message = message + self.source_schema = source_schema + self.target_schema = target_schema + + def __str__(self): + return ( + f"{self.message}\n" + f"Source schema: {self.source_schema}\n" + f"Target schema: {self.target_schema}" + ) + + def delete_table( dataset: str, table: str, @@ -343,6 +374,93 @@ def create_or_replace_table_as( logger.info("Command executed!") +def upsert_table_from_records( + dataset: str, + table: str, + records: ListJsonType, + key_field: str, + json_key: dict[str, str] | None = None, + insert_chunk_size: int | None = None, + client: BigQueryClient | None = None, +) -> None: + """Upsert records into a table. + + This function performs an upsert (update/insert) operation by: + 1. Creating a temporary table with the new records + 2. Deleting matching records from target table based on key_field + 3. Inserting the new records into target table + 4. Cleaning up temporary table + + Args: + dataset: dataset name. + table: table name. + records: records to be upserted. + key_field: field used to match records for update. + json_key: json key with gcp credentials. + insert_chunk_size: chunk size for batch inserts. + client: client to connect to BigQuery. + + Raises: + BigQuerySchemaMismatchException: if schema of new records doesn't match table. + BigQueryClientException: if schema cannot be inferred from records. + """ + client = client or BigQueryClient(json_key=json_key or {}) + + if not records: + logger.warning("No records to create a table from! (empty collection given)") + return + + data_schema_json = _create_schema_from_records(records=records or []) + data_schema_bq = [ + bigquery.SchemaField.from_api_repr(field) for field in data_schema_json + ] + + table_schema_bq = client.get_table(dataset, table).schema + + if table_schema_bq != data_schema_bq: + raise BigQuerySchemaMismatchException( + message="New data schema does not match table schema", + source_schema=table_schema_bq, + target_schema=data_schema_bq, + ) + + # set up tmp table + tmp_table = table + "_tmp" + delete_table(dataset=dataset, table=tmp_table, client=client) + create_table( + dataset=dataset, table=tmp_table, schema=data_schema_json, client=client + ) + insert( + dataset=dataset, + table=tmp_table, + records=records, + client=client, + chunk_size=insert_chunk_size, + ) + + # delete records from target table + command_sql = ( + f"delete from {dataset}.{table} " + f"where {key_field} in (select {key_field} from {dataset}.{tmp_table})" + ) + + logger.info(f"Running `{command_sql}`...") + client.run_command(command_sql=command_sql) + logger.info("Command executed!") + + # insert records into target table + insert( + dataset=dataset, + table=table, + records=records, + client=client, + chunk_size=insert_chunk_size, + ) + + # clean up temporary table + delete_table(dataset=dataset, table=tmp_table, client=client) + + def replace_table( dataset: str, table: str, diff --git a/tests/unit/test_bq.py b/tests/unit/test_bq.py index d4766da..b28ec64 100644 --- a/tests/unit/test_bq.py +++ b/tests/unit/test_bq.py @@ -1,8 +1,9 @@ -from unittest.mock import Mock +from unittest.mock import Mock, patch import pandas as pd import pytest from google.api_core.exceptions import NotFound +from google.cloud.bigquery import DatasetReference, SchemaField, TableReference from gcpde import bq @@ -87,6 +88,18 @@ def test_run_command(self, bq_client: bq.BigQueryClient): # assert bq_client.client.query.assert_called_once_with(command) + def test_get_table(self, bq_client: bq.BigQueryClient): + # arrange + dataset_ref = DatasetReference(bq_client.client.project, self.dataset) + expected_table_ref = TableReference(dataset_ref, self.table) + + # act + result = bq_client.get_table(dataset=self.dataset, table=self.table) + + # assert + bq_client.client.get_table.assert_called_once_with(table=expected_table_ref) + assert result is not None + def test_create_table_from_records(): # arrange @@ -283,3 +296,127 @@ def test_create_table_from_query(): command_sql="create or replace table dataset.table as select * from table", timeout=10, ) + + +@patch("gcpde.bq.delete_table") +@patch("gcpde.bq.create_table") +@patch("gcpde.bq.insert") +def test_upsert_table_from_records(mock_insert, mock_create_table, mock_delete_table): + # arrange + mock_client = Mock(spec_set=bq.BigQueryClient) + table_tmp = "table_tmp" + table = "table" + dataset = "dataset" + + table_mock = Mock(schema=[]) + mock_client.get_table.return_value = table_mock + + schema_json = [{"name": "id", "type": "INTEGER", "mode": "NULLABLE"}] + table_mock.schema = [SchemaField.from_api_repr(field) for field in schema_json] + + command_sql = ( + "delete from dataset.table where id in (select id from dataset.table_tmp)" + ) + + # act + bq.upsert_table_from_records( + dataset=dataset, + table=table, + records=[{"id": 1}], + key_field="id", + client=mock_client, + insert_chunk_size=None, + ) + + # assert + + mock_delete_table.assert_called_with( + dataset=dataset, table=table_tmp, client=mock_client + ) + + mock_create_table.assert_called_once_with( + dataset=dataset, + table=table_tmp, + schema=schema_json, + client=mock_client, + ) + mock_insert.assert_any_call( + dataset=dataset, + table=table_tmp, + records=[{"id": 1}], + client=mock_client, + chunk_size=None, + ) + + # assert that the delete was right set + mock_client.run_command.assert_called_with(command_sql=command_sql) + + mock_insert.assert_any_call( + dataset=dataset, + table=table, + records=[{"id": 1}], + client=mock_client, + chunk_size=None, + ) + + # Assert that delete_table was not called with the main table name + for call in mock_delete_table.call_args_list: + assert call.kwargs.get("table") != table + + # cleanup was done + assert mock_delete_table.call_count == 2 + + # make sure only one insert was done to each table, we checked each call + assert mock_insert.call_count == 2 + + +def test_upsert_table_from_records_no_records(): + # arrange + mock_client = Mock(spec_set=bq.BigQueryClient) + + # act + bq.upsert_table_from_records( + dataset="dataset", table="table", records=[], key_field="id", client=mock_client + ) + + # assert + mock_client.run_command.assert_not_called() + mock_client.delete_table.assert_not_called() + mock_client.create_table.assert_not_called() + + +def test_upsert_table_from_records_schema_mismatch(): + # arrange + mock_client = Mock(spec_set=bq.BigQueryClient) + + table_mock = Mock(schema=[]) + mock_client.get_table.return_value = table_mock + + schema_json = [{"name": "uuid", "type": "STRING", "mode": "NULLABLE"}] + table_mock.schema = [SchemaField.from_api_repr(field) for field in schema_json] + + # act + with pytest.raises(bq.BigQuerySchemaMismatchException): + bq.upsert_table_from_records( + dataset="dataset", + table="table", + records=[{"id": 1}], + key_field="id", + client=mock_client, + ) + + # assert + mock_client.run_command.assert_not_called() + mock_client.delete_table.assert_not_called() + mock_client.create_table.assert_not_called() + +def test_big_query_schema_mismatch_exception(): + # arrange + source_schema = [{"name": "id"}] + target_schema = [{"name": "id"}] + + # act + exception = bq.BigQuerySchemaMismatchException(message="message", source_schema=source_schema, target_schema=target_schema) + + # assert + assert str(exception) == "message\nSource schema: [{'name': 'id'}]\nTarget schema: [{'name': 'id'}]" \ No newline at end of file From f2d2f07f084d935069690cb2a717514f200a5eee Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Wed, 3 Sep 2025 18:27:34 -0300 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=92=84=20Fix=20style?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_bq.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_bq.py b/tests/unit/test_bq.py index b28ec64..391aff4 100644 --- a/tests/unit/test_bq.py +++ b/tests/unit/test_bq.py @@ -410,13 +410,19 @@ def test_upsert_table_from_records_schema_mismatch(): mock_client.delete_table.assert_not_called() mock_client.create_table.assert_not_called() + def test_big_query_schema_mismatch_exception(): # arrange source_schema = [{"name": "id"}] target_schema = [{"name": "id"}] # act - exception = bq.BigQuerySchemaMismatchException(message="message", source_schema=source_schema, target_schema=target_schema) + exception = bq.BigQuerySchemaMismatchException( + message="message", source_schema=source_schema, target_schema=target_schema + ) # assert - assert str(exception) == "message\nSource schema: [{'name': 'id'}]\nTarget schema: [{'name': 'id'}]" \ No newline at end of file + assert ( + str(exception) + == "message\nSource schema: [{'name': 'id'}]\nTarget schema: [{'name': 'id'}]" + ) From 9ed8e0c9f05b780879ec7ed073d3450a77a7b67d Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Wed, 3 Sep 2025 18:40:03 -0300 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=93=9D=20Added=20type=20hint=20to=20t?= =?UTF-8?q?he=20new=20exception=20dunder=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcpde/bq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index 7322af0..7894a0e 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -194,7 +194,7 @@ def __init__( self.source_schema = source_schema self.target_schema = target_schema - def __str__(self): + def __str__(self) -> str: return ( f"{self.message}\n" f"Source schema: {self.source_schema}\n" From 5e1fb4bbf96117d3731000768609abda3802f70e Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Wed, 3 Sep 2025 22:07:08 -0300 Subject: [PATCH 4/4] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor=20to=20use=20?= =?UTF-8?q?merge=20and=20improve=20readability?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcpde/bq.py | 76 +++++++++++++++++++-------------------- tests/unit/test_bq.py | 84 ++++++++++++++++++++----------------------- 2 files changed, 74 insertions(+), 86 deletions(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index 7894a0e..ad2693c 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -387,9 +387,8 @@ def upsert_table_from_records( This function performs an upsert (update/insert) operation by: 1. Creating a temporary table with the new records - 2. Deleting matching records from target table based on key_field - 3. Inserting the new records into target table - 4. Cleaning up temporary table + 2. using MERGE statement to update/insert records + 3. Cleaning up temporary table Args: dataset: dataset name. @@ -404,60 +403,57 @@ def upsert_table_from_records( BigQuerySchemaMismatchException: if schema of new records doesn't match table. BigQueryClientException: if schema cannot be inferred from records. """ - client = client or BigQueryClient(json_key=json_key or {}) - if not records: logger.warning("No records to create a table from! (empty collection given)") return - data_schema_json = _create_schema_from_records(records=records or []) - data_schema_bq = [ - bigquery.SchemaField.from_api_repr(field) for field in data_schema_json - ] + client = client or BigQueryClient(json_key=json_key or {}) + tmp_table = table + "_tmp" + create_table_from_records( + dataset=dataset, + table=tmp_table, + records=records, + overwrite=True, + json_key=json_key, + client=client, + chunk_size=insert_chunk_size, + ) + + tmp_table_schema_bq = client.get_table(dataset, tmp_table).schema table_schema_bq = client.get_table(dataset, table).schema - if table_schema_bq != data_schema_bq: + if table_schema_bq != tmp_table_schema_bq: + logger.info("Cleaning up temporary table...") + delete_table(dataset=dataset, table=tmp_table, client=client) + raise BigQuerySchemaMismatchException( message="New data schema does not match table schema", source_schema=table_schema_bq, - target_schema=data_schema_bq, + target_schema=tmp_table_schema_bq, ) - # set up tmp table - tmp_table = table + "_tmp" - delete_table(dataset=dataset, table=tmp_table, client=client) - create_table( - dataset=dataset, table=tmp_table, schema=data_schema_json, client=client + update_statement = ", ".join( + [f"{field.name} = source.{field.name}" for field in table_schema_bq] ) - insert( - dataset=dataset, - table=tmp_table, - records=records, - client=client, - chunk_size=insert_chunk_size, - ) - - # delete records from target table - command_sql = ( - f"delete from {dataset}.{table} " - f"where {key_field} in (select {key_field} from {dataset}.{tmp_table})" + table_fields = ", ".join([field.name for field in table_schema_bq]) + + merge_command_sql = ( + f"MERGE INTO {dataset}.{table} AS target " + f"USING {dataset}.{tmp_table} AS source " + f"ON source.{key_field} = target.{key_field} " + f"WHEN MATCHED THEN " + f"UPDATE SET {update_statement} " + f"WHEN NOT MATCHED THEN " + f"INSERT ({table_fields}) " + f"VALUES ({table_fields})" ) - logger.info(f"Running `{command_sql}`...") - client.run_command(command_sql=command_sql) + logger.info(f"Running `{merge_command_sql}`...") + client.run_command(command_sql=merge_command_sql) logger.info("Command executed!") - # insert records into target table - insert( - dataset=dataset, - table=table, - records=records, - client=client, - chunk_size=insert_chunk_size, - ) - - # clean up temporary table + logger.info("Cleaning up temporary table...") delete_table(dataset=dataset, table=tmp_table, client=client) diff --git a/tests/unit/test_bq.py b/tests/unit/test_bq.py index 391aff4..8fa1836 100644 --- a/tests/unit/test_bq.py +++ b/tests/unit/test_bq.py @@ -299,9 +299,8 @@ def test_create_table_from_query(): @patch("gcpde.bq.delete_table") -@patch("gcpde.bq.create_table") -@patch("gcpde.bq.insert") -def test_upsert_table_from_records(mock_insert, mock_create_table, mock_delete_table): +@patch("gcpde.bq.create_table_from_records") +def test_upsert_table_from_records(mock_create_table, mock_delete_table): # arrange mock_client = Mock(spec_set=bq.BigQueryClient) table_tmp = "table_tmp" @@ -311,66 +310,57 @@ def test_upsert_table_from_records(mock_insert, mock_create_table, mock_delete_t table_mock = Mock(schema=[]) mock_client.get_table.return_value = table_mock - schema_json = [{"name": "id", "type": "INTEGER", "mode": "NULLABLE"}] + schema_json = [ + {"name": "id", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "name", "type": "STRING", "mode": "NULLABLE"}, + ] table_mock.schema = [SchemaField.from_api_repr(field) for field in schema_json] command_sql = ( - "delete from dataset.table where id in (select id from dataset.table_tmp)" + "MERGE INTO dataset.table AS target " + "USING dataset.table_tmp AS source " + "ON source.id = target.id " + "WHEN MATCHED THEN " + "UPDATE SET id = source.id, name = source.name " + "WHEN NOT MATCHED THEN " + "INSERT (id, name) " + "VALUES (id, name)" ) # act bq.upsert_table_from_records( dataset=dataset, table=table, - records=[{"id": 1}], + records=[{"id": 1, "name": "test"}, {"id": 2, "name": "test2"}], key_field="id", client=mock_client, insert_chunk_size=None, ) # assert - - mock_delete_table.assert_called_with( - dataset=dataset, table=table_tmp, client=mock_client - ) - mock_create_table.assert_called_once_with( dataset=dataset, table=table_tmp, - schema=schema_json, - client=mock_client, - ) - mock_insert.assert_any_call( - dataset=dataset, - table=table_tmp, - records=[{"id": 1}], + records=[{"id": 1, "name": "test"}, {"id": 2, "name": "test2"}], + overwrite=True, + json_key=None, client=mock_client, chunk_size=None, ) - # assert that the delete was right set - mock_client.run_command.assert_called_with(command_sql=command_sql) - - mock_insert.assert_any_call( - dataset=dataset, - table=table, - records=[{"id": 1}], - client=mock_client, - chunk_size=None, + mock_delete_table.assert_called_once_with( + dataset=dataset, table=table_tmp, client=mock_client ) - # Assert that delete_table was not called with the main table name + mock_client.run_command.assert_called_with(command_sql=command_sql) + for call in mock_delete_table.call_args_list: assert call.kwargs.get("table") != table - # cleanup was done - assert mock_delete_table.call_count == 2 - # make sure only one insert was done to each table, we checked each call - assert mock_insert.call_count == 2 - - -def test_upsert_table_from_records_no_records(): +@patch("gcpde.bq.delete_table") +@patch("gcpde.bq.create_table_from_records") +def test_upsert_table_from_records_no_records(mock_create_table, mock_delete_table): # arrange mock_client = Mock(spec_set=bq.BigQueryClient) @@ -380,22 +370,27 @@ def test_upsert_table_from_records_no_records(): ) # assert - mock_client.run_command.assert_not_called() - mock_client.delete_table.assert_not_called() - mock_client.create_table.assert_not_called() + mock_create_table.assert_not_called() + mock_delete_table.assert_not_called() -def test_upsert_table_from_records_schema_mismatch(): +@patch("gcpde.bq.delete_table") +def test_upsert_table_from_records_schema_mismatch(mock_delete_table): # arrange mock_client = Mock(spec_set=bq.BigQueryClient) - table_mock = Mock(schema=[]) - mock_client.get_table.return_value = table_mock + table_mock = Mock() + temp_table_mock = Mock() + table_mock.schema = [{"name": "uuid", "type": "STRING", "mode": "NULLABLE"}] + temp_table_mock.schema = [{"name": "id", "type": "INTEGER", "mode": "NULLABLE"}] + mock_client.get_table = ( + lambda dataset, table: table_mock if table == "table" else temp_table_mock + ) schema_json = [{"name": "uuid", "type": "STRING", "mode": "NULLABLE"}] table_mock.schema = [SchemaField.from_api_repr(field) for field in schema_json] - # act + # act/assert with pytest.raises(bq.BigQuerySchemaMismatchException): bq.upsert_table_from_records( dataset="dataset", @@ -405,10 +400,7 @@ def test_upsert_table_from_records_schema_mismatch(): client=mock_client, ) - # assert - mock_client.run_command.assert_not_called() - mock_client.delete_table.assert_not_called() - mock_client.create_table.assert_not_called() + mock_delete_table.call_count == 2 def test_big_query_schema_mismatch_exception():