From fb14ae79ce8878210a00af4d2b1c3ce37cd2bc34 Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Fri, 5 Sep 2025 14:37:44 -0300 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=E2=99=BB=EF=B8=8F=20Update=20upse?= =?UTF-8?q?rt=20to=20create=20target=20if=20it=20doen't=20exist,=20depreca?= =?UTF-8?q?te=20json=20schema=20usage,=20use=20target=20table=20schema=20t?= =?UTF-8?q?o=20upsert.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcpde/bq.py | 75 +++++++++++++++++++++++++-------- tests/unit/test_bq.py | 98 +++++++++++++++++++++++++++++-------------- 2 files changed, 124 insertions(+), 49 deletions(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index ad2693c..0d7a33f 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -75,7 +75,7 @@ def create_table( self, dataset: str, table: str, - schema: list[dict[str, str]], + schema: list[bigquery.SchemaField], ) -> None: """Create a new table on bigquery. @@ -94,7 +94,7 @@ def create_table( table_ref = self._create_table_reference(dataset, table) table_obj = bigquery.Table( table_ref=table_ref, - schema=[bigquery.SchemaField.from_api_repr(field) for field in schema], + schema=schema, ) self.client.create_table(table_obj) @@ -228,7 +228,7 @@ def delete_table( return -def _create_schema_from_records(records: ListJsonType) -> list[dict[str, str]]: +def _create_schema_from_records(records: ListJsonType) -> list[bigquery.SchemaField]: generator = SchemaGenerator( input_format="dict", keep_nulls=True, @@ -241,9 +241,12 @@ def _create_schema_from_records(records: ListJsonType) -> list[dict[str, str]]: raise BigQueryClientException( f"Can't infer schema from records, error: {error_logs}" ) - output: list[dict[str, str]] = generator.flatten_schema(schema_map) + output_json: list[dict[str, str]] = generator.flatten_schema(schema_map) + output_api: list[bigquery.SchemaField] = [ + bigquery.SchemaField.from_api_repr(field) for field in output_json + ] logger.debug("Schema generator complete!") - return output + return output_api @tenacity.retry( @@ -258,7 +261,7 @@ def _create_schema_from_records(records: ListJsonType) -> list[dict[str, str]]: def create_table( dataset: str, table: str, - schema: list[dict[str, str]] | None = None, + schema: list[bigquery.SchemaField] | None = None, schema_from_records: ListJsonType | None = None, json_key: dict[str, str] | None = None, client: BigQueryClient | None = None, @@ -270,7 +273,7 @@ def create_table( Args: dataset: dataset name. table: table name. - schema: json dict schema for the table. + schema: json dict schema for the table or list of bigquery.SchemaField. https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file schema_from_records: infer schema from a records sample. json_key: json key with gcp credentials. @@ -382,6 +385,7 @@ def upsert_table_from_records( json_key: dict[str, str] | None = None, insert_chunk_size: int | None = None, client: BigQueryClient | None = None, + use_target_schema: bool = True, ) -> None: """Upsert records into a table. @@ -390,6 +394,9 @@ def upsert_table_from_records( 2. using MERGE statement to update/insert records 3. Cleaning up temporary table + > If the target table doesn't exist, it will be created using + inferred schema from records. + Args: dataset: dataset name. table: table name. @@ -398,17 +405,33 @@ def upsert_table_from_records( json_key: json key with gcp credentials. insert_chunk_size: chunk size for batch inserts. client: client to connect to BigQuery. + use_target_schema: whether to use the schema of the target table or + infer from records for the temporary table. Raises: BigQuerySchemaMismatchException: if schema of new records doesn't match table. BigQueryClientException: if schema cannot be inferred from records. """ + client = client or BigQueryClient(json_key=json_key or {}) + tmp_table = table + "_tmp" + if not records: logger.warning("No records to create a table from! (empty collection given)") return - client = client or BigQueryClient(json_key=json_key or {}) - tmp_table = table + "_tmp" + try: + table_schema_bq = client.get_table(dataset, table).schema + except NotFound: + create_table_from_records( + dataset=dataset, + table=table, + records=records, + overwrite=False, + json_key=json_key, + client=client, + chunk_size=insert_chunk_size, + ) + return create_table_from_records( dataset=dataset, @@ -418,10 +441,14 @@ def upsert_table_from_records( json_key=json_key, client=client, chunk_size=insert_chunk_size, + schema=table_schema_bq if use_target_schema else None, ) - tmp_table_schema_bq = client.get_table(dataset, tmp_table).schema - table_schema_bq = client.get_table(dataset, table).schema + tmp_table_schema_bq = ( + table_schema_bq + if use_target_schema + else client.get_table(dataset, tmp_table).schema + ) if table_schema_bq != tmp_table_schema_bq: logger.info("Cleaning up temporary table...") @@ -429,8 +456,8 @@ def upsert_table_from_records( raise BigQuerySchemaMismatchException( message="New data schema does not match table schema", - source_schema=table_schema_bq, - target_schema=tmp_table_schema_bq, + source_schema=tmp_table_schema_bq, + target_schema=table_schema_bq, ) update_statement = ", ".join( @@ -461,7 +488,7 @@ def replace_table( dataset: str, table: str, records: ListJsonType, - schema: list[dict[str, str]] | None = None, + schema: list[bigquery.SchemaField] | None = None, chunk_size: int | None = None, json_key: dict[str, str] | None = None, client: BigQueryClient | None = None, @@ -471,7 +498,13 @@ def replace_table( tmp_table = table + "_tmp" delete_table(dataset=dataset, table=tmp_table, client=client) - create_table(dataset=dataset, table=tmp_table, schema=schema, client=client) + create_table( + dataset=dataset, + table=tmp_table, + schema_from_records=records, + schema=schema, + client=client, + ) insert( dataset=dataset, table=tmp_table, @@ -496,6 +529,7 @@ def create_table_from_records( json_key: dict[str, str] | None = None, client: BigQueryClient | None = None, chunk_size: int | None = None, + schema: list[bigquery.SchemaField] | None = None, ) -> None: """Create or replace a table from a collection of records. @@ -507,12 +541,13 @@ def create_table_from_records( json_key: json key with gcp credentials. client: client to connect to gcp. chunk_size: chunk size number to send to GCP API. + schema: json dict schema for the table or list of bigquery.SchemaField. + if None, it will be inferred from the records. """ if not records: logger.warning("No records to create a table from! (empty collection given)") return - schema = _create_schema_from_records(records=records or []) client = client or BigQueryClient(json_key=json_key or {}) if overwrite: @@ -526,7 +561,13 @@ def create_table_from_records( ) return - create_table(dataset=dataset, table=table, schema=schema, client=client) + create_table( + dataset=dataset, + table=table, + schema_from_records=records, + schema=schema, + client=client, + ) insert( dataset=dataset, table=table, diff --git a/tests/unit/test_bq.py b/tests/unit/test_bq.py index 8fa1836..a42244f 100644 --- a/tests/unit/test_bq.py +++ b/tests/unit/test_bq.py @@ -112,33 +112,37 @@ def test_create_table_from_records(): {"json_col": {"col3": "abc"}}, ] target_schema = [ - { - "name": "id", - "type": "INTEGER", - "mode": "NULLABLE", - }, - { - "name": "json_col", - "type": "RECORD", - "mode": "NULLABLE", - "fields": [ - { - "name": "col1", - "type": "INTEGER", - "mode": "NULLABLE", - }, - { - "name": "col2", - "type": "BOOLEAN", - "mode": "NULLABLE", - }, - { - "name": "col3", - "type": "STRING", - "mode": "NULLABLE", - }, - ], - }, + SchemaField.from_api_repr( + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE", + } + ), + SchemaField.from_api_repr( + { + "name": "json_col", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "col1", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "name": "col2", + "type": "BOOLEAN", + "mode": "NULLABLE", + }, + { + "name": "col3", + "type": "STRING", + "mode": "NULLABLE", + }, + ], + } + ), ] # act @@ -171,11 +175,13 @@ def test_create_table_from_records_overwrite_false(): mock_client = Mock(spec_set=bq.BigQueryClient) input_records = [{"id": 1}] target_schema = [ - { - "name": "id", - "type": "INTEGER", - "mode": "NULLABLE", - } + SchemaField.from_api_repr( + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE", + } + ) ] # act @@ -346,6 +352,7 @@ def test_upsert_table_from_records(mock_create_table, mock_delete_table): json_key=None, client=mock_client, chunk_size=None, + schema=table_mock.schema, ) mock_delete_table.assert_called_once_with( @@ -398,10 +405,37 @@ def test_upsert_table_from_records_schema_mismatch(mock_delete_table): records=[{"id": 1}], key_field="id", client=mock_client, + use_target_schema=False, ) mock_delete_table.call_count == 2 +@patch("gcpde.bq.create_table_from_records") +def test_upsert_table_from_records_missing_target_table(mock_create_table): + # arrange + mock_client = Mock(spec_set=bq.BigQueryClient) + mock_client.get_table.side_effect = NotFound("") + + # act + bq.upsert_table_from_records( + dataset="dataset", + table="table", + records=[{"id": 1}], + key_field="id", + client=mock_client, + ) + + # assert + mock_create_table.assert_called_once_with( + dataset="dataset", + table="table", + records=[{"id": 1}], + overwrite=False, + json_key=None, + client=mock_client, + chunk_size=None, + ) + def test_big_query_schema_mismatch_exception(): # arrange From f09d85696823272f78d2ef586f94f8a275a8b3a8 Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Fri, 5 Sep 2025 16:07:43 -0300 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=93=9D=20update=20docstrings=20of=20s?= =?UTF-8?q?chema=20object?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcpde/bq.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index 0d7a33f..4c2a5db 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -84,7 +84,7 @@ def create_table( Args: dataset: dataset name. table: table name. - schema: json dict schema for the table. + schema: list of bigquery.SchemaField for the table. https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file Raises: @@ -273,7 +273,7 @@ def create_table( Args: dataset: dataset name. table: table name. - schema: json dict schema for the table or list of bigquery.SchemaField. + schema: list of bigquery.SchemaField for the table. https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file schema_from_records: infer schema from a records sample. json_key: json key with gcp credentials. @@ -541,7 +541,7 @@ def create_table_from_records( json_key: json key with gcp credentials. client: client to connect to gcp. chunk_size: chunk size number to send to GCP API. - schema: json dict schema for the table or list of bigquery.SchemaField. + schema: list of bigquery.SchemaField for the table. if None, it will be inferred from the records. """ From 4a6b59a2c658308fd5713342e713e1d77f52afb8 Mon Sep 17 00:00:00 2001 From: Victor Lucio Date: Fri, 5 Sep 2025 17:52:05 -0300 Subject: [PATCH 3/3] =?UTF-8?q?=E2=9C=A8=20Added=20BigQuerySchema=20and=20?= =?UTF-8?q?get=5Fschema=5Ffrom=5Fjson=20helper?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcpde/bq.py | 48 ++++++++++++++++++++++++++++--------------- gcpde/types.py | 4 ++++ tests/unit/test_bq.py | 21 +++++++++++++++++++ 3 files changed, 56 insertions(+), 17 deletions(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index 4c2a5db..b7c61cf 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -13,7 +13,7 @@ from google.oauth2.service_account import Credentials from loguru import logger -from gcpde.types import ListJsonType +from gcpde.types import BigQuerySchema, ListJsonType FIVE_MINUTES = 1 * 60 * 5 @@ -75,7 +75,7 @@ def create_table( self, dataset: str, table: str, - schema: list[bigquery.SchemaField], + schema: BigQuerySchema, ) -> None: """Create a new table on bigquery. @@ -84,8 +84,9 @@ def create_table( Args: dataset: dataset name. table: table name. - schema: list of bigquery.SchemaField for the table. - https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file + schema: BigQuerySchema for the table. Use + gcpde.bq.get_schema_from_json to create it from a list of + dictionaries. Raises: google.cloud.exceptions.Conflict: if the table already exists. @@ -186,8 +187,8 @@ class BigQuerySchemaMismatchException(Exception): def __init__( self, message: str, - source_schema: list[bigquery.SchemaField], - target_schema: list[bigquery.SchemaField], + source_schema: BigQuerySchema, + target_schema: BigQuerySchema, ): super().__init__(message) self.message = message @@ -202,6 +203,19 @@ def __str__(self) -> str: ) +def get_schema_from_json(schema: list[dict[str, str]]) -> BigQuerySchema: + """Get a schema from a list of dictionaries. + + Args: + schema: list of dictionaries representing the schema. + ref: https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file + + Returns: + BigQuerySchema + """ + return [bigquery.SchemaField.from_api_repr(field) for field in schema] + + def delete_table( dataset: str, table: str, @@ -228,7 +242,7 @@ def delete_table( return -def _create_schema_from_records(records: ListJsonType) -> list[bigquery.SchemaField]: +def _create_schema_from_records(records: ListJsonType) -> BigQuerySchema: generator = SchemaGenerator( input_format="dict", keep_nulls=True, @@ -242,9 +256,7 @@ def _create_schema_from_records(records: ListJsonType) -> list[bigquery.SchemaFi f"Can't infer schema from records, error: {error_logs}" ) output_json: list[dict[str, str]] = generator.flatten_schema(schema_map) - output_api: list[bigquery.SchemaField] = [ - bigquery.SchemaField.from_api_repr(field) for field in output_json - ] + output_api: BigQuerySchema = get_schema_from_json(output_json) logger.debug("Schema generator complete!") return output_api @@ -261,7 +273,7 @@ def _create_schema_from_records(records: ListJsonType) -> list[bigquery.SchemaFi def create_table( dataset: str, table: str, - schema: list[bigquery.SchemaField] | None = None, + schema: BigQuerySchema | None = None, schema_from_records: ListJsonType | None = None, json_key: dict[str, str] | None = None, client: BigQueryClient | None = None, @@ -273,8 +285,9 @@ def create_table( Args: dataset: dataset name. table: table name. - schema: list of bigquery.SchemaField for the table. - https://cloud.google.com/bigquery/docs/schemas#creating_a_json_schema_file + schema: BigQuerySchema for the table. Use + gcpde.bq.get_schema_from_json to create it from a list of + dictionaries. schema_from_records: infer schema from a records sample. json_key: json key with gcp credentials. client: client to connect to gcp. @@ -488,7 +501,7 @@ def replace_table( dataset: str, table: str, records: ListJsonType, - schema: list[bigquery.SchemaField] | None = None, + schema: BigQuerySchema | None = None, chunk_size: int | None = None, json_key: dict[str, str] | None = None, client: BigQueryClient | None = None, @@ -529,7 +542,7 @@ def create_table_from_records( json_key: dict[str, str] | None = None, client: BigQueryClient | None = None, chunk_size: int | None = None, - schema: list[bigquery.SchemaField] | None = None, + schema: BigQuerySchema | None = None, ) -> None: """Create or replace a table from a collection of records. @@ -541,8 +554,9 @@ def create_table_from_records( json_key: json key with gcp credentials. client: client to connect to gcp. chunk_size: chunk size number to send to GCP API. - schema: list of bigquery.SchemaField for the table. - if None, it will be inferred from the records. + schema: BigQuerySchema for the table. Use + gcpde.bq.get_schema_from_json to create it from a list of + dictionaries. """ if not records: diff --git a/gcpde/types.py b/gcpde/types.py index ea5a7a8..dbf7c33 100644 --- a/gcpde/types.py +++ b/gcpde/types.py @@ -2,4 +2,8 @@ from typing import Any +from google.cloud import bigquery + ListJsonType = list[dict[str, Any]] + +BigQuerySchema = list[bigquery.SchemaField] diff --git a/tests/unit/test_bq.py b/tests/unit/test_bq.py index a42244f..1f828a2 100644 --- a/tests/unit/test_bq.py +++ b/tests/unit/test_bq.py @@ -410,6 +410,7 @@ def test_upsert_table_from_records_schema_mismatch(mock_delete_table): mock_delete_table.call_count == 2 + @patch("gcpde.bq.create_table_from_records") def test_upsert_table_from_records_missing_target_table(mock_create_table): # arrange @@ -452,3 +453,23 @@ def test_big_query_schema_mismatch_exception(): str(exception) == "message\nSource schema: [{'name': 'id'}]\nTarget schema: [{'name': 'id'}]" ) + + +def test_get_schema_from_json(): + # arrange + schema_json = [ + {"name": "id", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "name", "type": "STRING", "mode": "REQUIRED"}, + ] + + # act + result = bq.get_schema_from_json(schema_json) + + # assert + assert len(result) == 2 + assert result[0].name == "id" + assert result[0].field_type == "INTEGER" + assert result[0].mode == "NULLABLE" + assert result[1].name == "name" + assert result[1].field_type == "STRING" + assert result[1].mode == "REQUIRED"