From 9f54bd5f5d998c474ba32e75271920ddd78f8369 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 23:03:06 -0700 Subject: [PATCH 1/3] feat: enhance transfer configurations and logging for various data types --- .env.example | 17 ++++++++++ alembic/env.py | 23 ++++++++------ transfers/aquifer_system_transfer.py | 6 +++- transfers/chemistry_sampleinfo.py | 40 ++++++++++++++++++++++-- transfers/geologic_formation_transfer.py | 6 +++- transfers/logger.py | 5 +++ transfers/permissions_transfer.py | 6 ++++ transfers/stratigraphy_transfer.py | 5 ++- transfers/thing_transfer.py | 2 ++ transfers/transfer.py | 35 ++++++++++++++------- transfers/transferer.py | 12 +++++++ 11 files changed, 131 insertions(+), 26 deletions(-) diff --git a/.env.example b/.env.example index d93d9811..2aba5949 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,23 @@ DB_MAX_OVERFLOW=20 TRANSFER_PARALLEL=1 # Limit number of records per transfer type (for testing) # TRANSFER_LIMIT=1000 +TRANSFER_WELL_SCREENS=True +TRANSFER_SENSORS=True +TRANSFER_CONTACTS=True +TRANSFER_WATERLEVELS=True +TRANSFER_WATERLEVELS_PRESSURE=True +TRANSFER_WATERLEVELS_ACOUSTIC=True +TRANSFER_LINK_IDS=True +TRANSFER_GROUPS=True +TRANSFER_ASSETS=False +TRANSFER_SURFACE_WATER_DATA=True +TRANSFER_HYDRAULICS_DATA=True +TRANSFER_CHEMISTRY_SAMPLEINFO=True +TRANSFER_RADIONUCLIDES=True +TRANSFER_NGWMN_VIEWS=True +TRANSFER_WATERLEVELS_PRESSURE_DAILY=True +TRANSFER_WEATHER_DATA=True +TRANSFER_MINOR_TRACE_CHEMISTRY=True # asset storage GCS_BUCKET_NAME= diff --git a/alembic/env.py b/alembic/env.py index 045bcf3b..92522b47 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -1,4 +1,5 @@ import copy +import logging import os from logging.config import fileConfig @@ -11,11 +12,12 @@ # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config +alembic_logger = logging.getLogger("alembic.env") # Interpret the config file for Python logging. # This line sets up loggers basically. if config.config_file_name is not None: - fileConfig(config.config_file_name) + fileConfig(config.config_file_name, disable_existing_loggers=False) # add your model's MetaData object here # for 'autogenerate' support @@ -171,15 +173,18 @@ def getconn(): ) with context.begin_transaction(): context.run_migrations() - connection.execute( - text( - """ - GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read; - ALTER DEFAULT PRIVILEGES IN SCHEMA public - GRANT SELECT ON TABLES TO app_read; - """ - ) + + alembic_logger.info("Alembic migrations completed; applying app_read grants") + autocommit_conn.execute( + text("GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read") + ) + autocommit_conn.execute( + text( + "ALTER DEFAULT PRIVILEGES IN SCHEMA public " + "GRANT SELECT ON TABLES TO app_read" ) + ) + alembic_logger.info("Applied app_read grants") if context.is_offline_mode(): diff --git a/transfers/aquifer_system_transfer.py b/transfers/aquifer_system_transfer.py index 8d374b59..6d223b80 100644 --- a/transfers/aquifer_system_transfer.py +++ b/transfers/aquifer_system_transfer.py @@ -37,7 +37,10 @@ def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple: created_count = 0 skipped_count = 0 - logger.info(f"Starting transfer of {n} aquifer systems from LU_AquiferClass.") + logger.info( + "Starting transfer: AquiferSystems (%s rows) from LU_AquiferClass", + n, + ) # 4. 
Process each row for i, row in enumerate(cleaned_df.itertuples()): @@ -138,4 +141,5 @@ def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple: logger.critical(f"Error in final commit: {e}") session.rollback() + logger.info("Completed transfer: AquiferSystems") return input_df, cleaned_df, errors diff --git a/transfers/chemistry_sampleinfo.py b/transfers/chemistry_sampleinfo.py index f06f022b..bb9c2bc4 100644 --- a/transfers/chemistry_sampleinfo.py +++ b/transfers/chemistry_sampleinfo.py @@ -51,7 +51,24 @@ def _build_thing_id_cache(self): """Build cache of Thing.name -> thing.id to prevent orphan records.""" with session_ctx() as session: things = session.query(Thing.name, Thing.id).all() - self._thing_id_cache = {name: thing_id for name, thing_id in things} + normalized = {} + for name, thing_id in things: + normalized_name = self._normalize_for_thing_match(name) + if not normalized_name: + continue + if ( + normalized_name in normalized + and normalized[normalized_name] != thing_id + ): + logger.warning( + "Duplicate Thing match key '%s' for ids %s and %s", + normalized_name, + normalized[normalized_name], + thing_id, + ) + continue + normalized[normalized_name] = thing_id + self._thing_id_cache = normalized logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: @@ -72,7 +89,7 @@ def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame: valid_point_ids = set(self._thing_id_cache.keys()) # Normalize SamplePointID to handle suffixed sample counts (e.g. AB-0002A -> AB-0002). - normalized_ids = df["SamplePointID"].apply(self._normalize_sample_point_id) + normalized_ids = df["SamplePointID"].apply(self._normalize_for_thing_match) # Filter to rows where SamplePointID exists as a Thing.name before_count = len(df) @@ -104,6 +121,16 @@ def _normalize_sample_point_id(value: Any) -> Optional[str]: return match.group("base") return text + @classmethod + def _normalize_for_thing_match(cls, value: Any) -> Optional[str]: + """ + Normalize IDs for Thing matching (strip suffixes, trim, uppercase). 
+ """ + normalized = cls._normalize_sample_point_id(value) + if not normalized: + return None + return normalized.strip().upper() + def _filter_to_valid_sample_pt_ids(self, df: pd.DataFrame) -> pd.DataFrame: """Filter to rows with a valid SamplePtID UUID (required for idempotent upserts).""" @@ -259,7 +286,7 @@ def bool_val(key: str) -> Optional[bool]: # Look up Thing by SamplePointID to prevent orphan records sample_point_id = val("SamplePointID") - normalized_sample_point_id = self._normalize_sample_point_id(sample_point_id) + normalized_sample_point_id = self._normalize_for_thing_match(sample_point_id) thing_id = None if ( normalized_sample_point_id @@ -267,6 +294,12 @@ def bool_val(key: str) -> Optional[bool]: ): thing_id = self._thing_id_cache[normalized_sample_point_id] # If Thing not found, thing_id remains None and will be filtered out + if thing_id is None and sample_point_id is not None: + logger.info( + "ChemistrySampleInfo Thing lookup miss: SamplePointID=%s normalized=%s", + sample_point_id, + normalized_sample_point_id, + ) return { "SamplePtID": uuid_val("SamplePtID"), @@ -288,6 +321,7 @@ def bool_val(key: str) -> Optional[bool]: "SampleNotes": str_val("SampleNotes"), "LocationId": uuid_val("LocationId"), "OBJECTID": val("OBJECTID"), + "thing_id": thing_id, } def _dedupe_rows( diff --git a/transfers/geologic_formation_transfer.py b/transfers/geologic_formation_transfer.py index 4483aa3a..4b8250c7 100644 --- a/transfers/geologic_formation_transfer.py +++ b/transfers/geologic_formation_transfer.py @@ -35,7 +35,10 @@ def transfer_geologic_formations(session: Session, limit: int = None) -> tuple: created_count = 0 skipped_count = 0 - logger.info(f"Starting transfer of {n} geologic formations") + logger.info( + "Starting transfer: GeologicFormations (%s rows) from LU_Formations", + n, + ) # 4. 
Process each row for i, row in enumerate(cleaned_df.itertuples()): @@ -134,4 +137,5 @@ def transfer_geologic_formations(session: Session, limit: int = None) -> tuple: logger.critical(f"Error during final commit of geologic formations: {e}") session.rollback() + logger.info("Completed transfer: GeologicFormations") return input_df, cleaned_df, errors diff --git a/transfers/logger.py b/transfers/logger.py index 745582c7..a5fd6241 100644 --- a/transfers/logger.py +++ b/transfers/logger.py @@ -50,8 +50,13 @@ logging.StreamHandler(sys.stdout), logging.FileHandler(log_filename, mode="w", encoding="utf-8"), ], + force=True, ) logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +# ensure root logger propagates INFO level to handlers +logging.getLogger().setLevel(logging.INFO) # workaround to not redirect httpx logging logging.getLogger("httpx").setLevel(logging.WARNING) diff --git a/transfers/permissions_transfer.py b/transfers/permissions_transfer.py index 6bb0f712..364eacc9 100644 --- a/transfers/permissions_transfer.py +++ b/transfers/permissions_transfer.py @@ -52,6 +52,7 @@ def transfer_permissions(session: Session) -> None: wdf = read_csv("WellData", dtype={"OSEWelltagID": str}) wdf = replace_nans(wdf) + logger.info("Starting transfer: Permissions") transferred_wells = ( session.query(Thing, Contact) .select_from(Thing) @@ -61,6 +62,7 @@ def transfer_permissions(session: Session) -> None: .order_by(Thing.name) .all() ) + created_count = 0 visited = [] for chunk in chunk_by_size(transferred_wells, 100): objs = [] @@ -77,12 +79,16 @@ def transfer_permissions(session: Session) -> None: ) if permission: objs.append(permission) + created_count += 1 permission = _make_permission( wdf, well, contact.id, "MonitorOK", "Water Level Sample" ) if permission: objs.append(permission) + created_count += 1 session.bulk_save_objects(objs) session.commit() + + logger.info("Completed transfer: Permissions (%s records)", created_count) diff --git a/transfers/stratigraphy_transfer.py b/transfers/stratigraphy_transfer.py index f75f43ab..d822d70a 100644 --- a/transfers/stratigraphy_transfer.py +++ b/transfers/stratigraphy_transfer.py @@ -56,7 +56,9 @@ def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: n_wells = len(cleaned_df["PointID"].unique()) logger.info( - f"Starting transfer of {n_records} stratigraphy records for {n_wells} wells" + "Starting transfer: Stratigraphy (%s records, %s wells)", + n_records, + n_wells, ) # 3. 
Initialize tracking variables for logging @@ -294,4 +296,5 @@ def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: logger.critical(f"Error in final commit: {e}") session.rollback() + logger.info("Completed transfer: Stratigraphy") return input_df, cleaned_df, errors diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 5b3c49d5..754634b7 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -38,6 +38,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - n = len(ldf) start_time = time.time() + logger.info("Starting transfer: Things (%s) [%s rows]", site_type, n) cached_elevations = {} for i, row in enumerate(ldf.itertuples()): @@ -89,6 +90,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - continue session.commit() + logger.info("Completed transfer: Things (%s)", site_type) def transfer_springs(session, limit=None): diff --git a/transfers/transfer.py b/transfers/transfer.py index e326fb84..8de1fd5c 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -14,6 +14,7 @@ # limitations under the License. # =============================================================================== import os +import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed @@ -138,14 +139,27 @@ def _alembic_config() -> Config: def _drop_and_rebuild_db() -> None: + logger.info("Dropping schema public") with session_ctx() as session: session.execute(text("DROP SCHEMA public CASCADE")) session.execute(text("CREATE SCHEMA public")) session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) session.commit() - command.upgrade(_alembic_config(), "head") + logger.info("Running Alembic migrations") + try: + command.upgrade(_alembic_config(), "head") + except SystemExit as exc: + if exc.code not in (0, None): + raise + logger.info( + "Alembic upgrade returned SystemExit(%s); continuing transfer", exc.code + ) + logger.info("Alembic migrations complete") + logger.info("Initializing lexicon data") init_lexicon() + logger.info("Initializing parameter data") init_parameter() + logger.info("Schema rebuild complete") @timeit @@ -322,8 +336,6 @@ def _transfer_parallel( parallel_tasks_1.append( ("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags) ) - if transfer_radionuclides: - parallel_tasks_1.append(("Radionuclides", RadionuclidesTransferer, flags)) if transfer_ngwmn_views: parallel_tasks_1.append( ("NGWMNWellConstruction", NGWMNWellConstructionTransferer, flags) @@ -340,10 +352,6 @@ def _transfer_parallel( ) if transfer_weather_data: parallel_tasks_1.append(("WeatherData", WeatherDataTransferer, flags)) - if transfer_minor_trace_chemistry: - parallel_tasks_1.append( - ("MinorTraceChemistry", MinorTraceChemistryTransferer, flags) - ) # Track results for metrics results_map = {} @@ -404,8 +412,6 @@ def _transfer_parallel( metrics.hydraulics_data_metrics(*results_map["HydraulicsData"]) if "ChemistrySampleInfo" in results_map and results_map["ChemistrySampleInfo"]: metrics.chemistry_sampleinfo_metrics(*results_map["ChemistrySampleInfo"]) - if "Radionuclides" in results_map and results_map["Radionuclides"]: - metrics.radionuclides_metrics(*results_map["Radionuclides"]) if "NGWMNWellConstruction" in results_map and results_map["NGWMNWellConstruction"]: metrics.ngwmn_well_construction_metrics(*results_map["NGWMNWellConstruction"]) if "NGWMNWaterLevels" in results_map and results_map["NGWMNWaterLevels"]: @@ -421,8 +427,15 @@ def _transfer_parallel( ) if 
"WeatherData" in results_map and results_map["WeatherData"]: metrics.weather_data_metrics(*results_map["WeatherData"]) - if "MinorTraceChemistry" in results_map and results_map["MinorTraceChemistry"]: - metrics.minor_trace_chemistry_metrics(*results_map["MinorTraceChemistry"]) + if transfer_radionuclides: + message("TRANSFERRING RADIONUCLIDES") + results = _execute_transfer(RadionuclidesTransferer, flags=flags) + metrics.radionuclides_metrics(*results) + + if transfer_minor_trace_chemistry: + message("TRANSFERRING MINOR TRACE CHEMISTRY") + results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags) + metrics.minor_trace_chemistry_metrics(*results) # ========================================================================= # PHASE 3: Sensors (Sequential - required before continuous water levels) diff --git a/transfers/transferer.py b/transfers/transferer.py index be814e2b..dcfb3975 100644 --- a/transfers/transferer.py +++ b/transfers/transferer.py @@ -45,11 +45,23 @@ def __init__(self, flags: dict = None, pointids: list = None): self.manual_fixer = ManualFixer() self.pointids = pointids + def _df_len(self, df: pd.DataFrame | None) -> int: + return int(len(df)) if df is not None else 0 + def transfer(self) -> None: with session_ctx() as session: + name = self.source_table or self.__class__.__name__ + logger.info("Starting transfer: %s", name) self.input_df, self.cleaned_df = self._get_dfs() + logger.info( + "Loaded %s rows (%s cleaned) for %s", + self._df_len(self.input_df), + self._df_len(self.cleaned_df), + name, + ) self._transfer_hook(session) session.commit() + logger.info("Completed transfer: %s", name) def _capture_validation_error(self, pointid: str, err: ValidationError) -> None: self._capture_error( From 083f15961e28d19fb1822098da0bd053ae43501a Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Tue, 13 Jan 2026 23:05:05 -0700 Subject: [PATCH 2/3] Update transfers/transfer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/transfer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index 8de1fd5c..113d473f 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -14,7 +14,6 @@ # limitations under the License. 
# =============================================================================== import os -import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed From bbc32f26190d719f85b193b0086fa5524b094df2 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 23:08:54 -0700 Subject: [PATCH 3/3] feat: enhance ChemistrySampleInfo logging and role management in migrations --- alembic/env.py | 23 +++++++++++++++-------- transfers/chemistry_sampleinfo.py | 8 +++++++- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/alembic/env.py b/alembic/env.py index 92522b47..f0bd9e77 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -158,14 +158,17 @@ def getconn(): poolclass=pool.NullPool, ) - with connectable.connect() as connection: - autocommit_conn = connection.execution_options(isolation_level="AUTOCOMMIT") - role_exists = autocommit_conn.execute( + with connectable.connect() as role_connection: + autocommit_role = role_connection.execution_options( + isolation_level="AUTOCOMMIT" + ) + role_exists = autocommit_role.execute( text("SELECT 1 FROM pg_roles WHERE rolname = 'app_read'") ).first() if not role_exists: - autocommit_conn.execute(text("CREATE ROLE app_read")) + autocommit_role.execute(text("CREATE ROLE app_read")) + with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata, @@ -174,17 +177,21 @@ def getconn(): with context.begin_transaction(): context.run_migrations() - alembic_logger.info("Alembic migrations completed; applying app_read grants") - autocommit_conn.execute( + alembic_logger.info("Alembic migrations completed; applying app_read grants") + with connectable.connect() as grant_connection: + autocommit_grants = grant_connection.execution_options( + isolation_level="AUTOCOMMIT" + ) + autocommit_grants.execute( text("GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read") ) - autocommit_conn.execute( + autocommit_grants.execute( text( "ALTER DEFAULT PRIVILEGES IN SCHEMA public " "GRANT SELECT ON TABLES TO app_read" ) ) - alembic_logger.info("Applied app_read grants") + alembic_logger.info("Applied app_read grants") if context.is_offline_mode(): diff --git a/transfers/chemistry_sampleinfo.py b/transfers/chemistry_sampleinfo.py index bb9c2bc4..9020f553 100644 --- a/transfers/chemistry_sampleinfo.py +++ b/transfers/chemistry_sampleinfo.py @@ -165,6 +165,7 @@ def _transfer_hook(self, session: Session) -> None: row_dicts = [] skipped_orphan_count = 0 skipped_sample_pt_id_count = 0 + lookup_miss_count = 0 for row in self.cleaned_df.to_dict("records"): row_dict = self._row_dict(row) if row_dict.get("SamplePtID") is None: @@ -179,6 +180,7 @@ def _transfer_hook(self, session: Session) -> None: # Skip rows without valid thing_id (orphan prevention) if row_dict.get("thing_id") is None: skipped_orphan_count += 1 + lookup_miss_count += 1 logger.warning( f"Skipping ChemistrySampleInfo OBJECTID={row_dict.get('OBJECTID')} " f"SamplePointID={row_dict.get('SamplePointID')} - Thing not found" @@ -196,6 +198,10 @@ def _transfer_hook(self, session: Session) -> None: f"Skipped {skipped_orphan_count} ChemistrySampleInfo records without valid Thing " f"(orphan prevention)" ) + if lookup_miss_count > 0: + logger.warning( + "ChemistrySampleInfo Thing lookup misses: %s", lookup_miss_count + ) rows = self._dedupe_rows(row_dicts, key="OBJECTID") @@ -295,7 +301,7 @@ def bool_val(key: str) -> Optional[bool]: thing_id = self._thing_id_cache[normalized_sample_point_id] # If Thing not found, thing_id remains None 
and will be filtered out if thing_id is None and sample_point_id is not None: - logger.info( + logger.debug( "ChemistrySampleInfo Thing lookup miss: SamplePointID=%s normalized=%s", sample_point_id, normalized_sample_point_id,
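
Reviewer note on the orphan-prevention change in PATCH 1/3: the fix works because both sides of the lookup — the Thing.name cache keys and the incoming SamplePointID values — pass through the same `_normalize_for_thing_match` helper, so a cache miss can only mean the Thing genuinely does not exist. A minimal standalone sketch of the pattern follows; the suffix-stripping regex is an assumption, since the diff only shows `match.group("base")` and not the pattern itself, and the id type is left generic rather than matching the project's UUIDs.

import logging
import re
from typing import Any, Optional

logger = logging.getLogger(__name__)

# Assumed pattern: base ID plus one trailing letter (e.g. AB-0002A -> AB-0002).
# The real _normalize_sample_point_id regex is not shown in the diff.
_SUFFIX_RE = re.compile(r"^(?P<base>.+?-\d+)[A-Z]$")


def normalize_for_thing_match(value: Any) -> Optional[str]:
    """Strip sample-count suffixes, trim, and uppercase for cache-key matching."""
    if value is None:
        return None
    cleaned = str(value).strip().upper()
    if not cleaned:
        return None
    match = _SUFFIX_RE.match(cleaned)
    return match.group("base") if match else cleaned


def build_thing_id_cache(things: list[tuple[str, Any]]) -> dict[str, Any]:
    """Map normalized Thing.name -> id, keeping the first id on a collision."""
    cache: dict[str, Any] = {}
    for name, thing_id in things:
        key = normalize_for_thing_match(name)
        if not key:
            continue
        if key in cache and cache[key] != thing_id:
            # Two distinct Things collapse to the same key; keep the first
            # and warn, mirroring the duplicate-key warning in the patch.
            logger.warning(
                "Duplicate key %r for ids %s and %s", key, cache[key], thing_id
            )
            continue
        cache[key] = thing_id
    return cache

Because both sides share one normalization, PATCH 3/3 can safely downgrade the per-row lookup-miss log to debug and surface only the aggregate miss count at warning.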
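
A sketch of the connection handling that PATCH 3/3 moves alembic/env.py toward: the role bootstrap and the post-migration grants each get their own short-lived AUTOCOMMIT connection instead of sharing the migration connection, so neither DDL statement runs inside (or waits on) the open migration transaction. The engine URL below is a placeholder, not the project's configuration, and the migration step itself is elided.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost/db")  # placeholder

# 1. Role bootstrap on its own AUTOCOMMIT connection, before migrations run.
with engine.connect() as conn:
    auto = conn.execution_options(isolation_level="AUTOCOMMIT")
    exists = auto.execute(
        text("SELECT 1 FROM pg_roles WHERE rolname = 'app_read'")
    ).first()
    if exists is None:
        auto.execute(text("CREATE ROLE app_read"))

# 2. Migrations run here on a separate connection (elided).

# 3. Grants on a third AUTOCOMMIT connection, after migrations have
#    committed, so they cover every table the migrations created.
with engine.connect() as conn:
    auto = conn.execution_options(isolation_level="AUTOCOMMIT")
    auto.execute(text("GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read"))
    auto.execute(
        text(
            "ALTER DEFAULT PRIVILEGES IN SCHEMA public "
            "GRANT SELECT ON TABLES TO app_read"
        )
    )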