17 changes: 17 additions & 0 deletions .env.example
@@ -17,6 +17,23 @@ DB_MAX_OVERFLOW=20
TRANSFER_PARALLEL=1
# Limit number of records per transfer type (for testing)
# TRANSFER_LIMIT=1000
TRANSFER_WELL_SCREENS=True
TRANSFER_SENSORS=True
TRANSFER_CONTACTS=True
TRANSFER_WATERLEVELS=True
TRANSFER_WATERLEVELS_PRESSURE=True
TRANSFER_WATERLEVELS_ACOUSTIC=True
TRANSFER_LINK_IDS=True
TRANSFER_GROUPS=True
TRANSFER_ASSETS=False
TRANSFER_SURFACE_WATER_DATA=True
TRANSFER_HYDRAULICS_DATA=True
TRANSFER_CHEMISTRY_SAMPLEINFO=True
TRANSFER_RADIONUCLIDES=True
TRANSFER_NGWMN_VIEWS=True
TRANSFER_WATERLEVELS_PRESSURE_DAILY=True
TRANSFER_WEATHER_DATA=True
TRANSFER_MINOR_TRACE_CHEMISTRY=True

# asset storage
GCS_BUCKET_NAME=
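
The TRANSFER_* switches above are plain strings in the environment; a minimal sketch of how they might be read as booleans on the Python side (the helper name, accepted values, and default are assumptions for illustration, not taken from this repository):

import os


def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable such as TRANSFER_ASSETS as a boolean."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# Example: with TRANSFER_ASSETS=False in the environment this returns False,
# because "false" is not in the accepted truthy set.
transfer_assets = env_flag("TRANSFER_ASSETS", default=False)
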
38 changes: 25 additions & 13 deletions alembic/env.py
@@ -1,4 +1,5 @@
import copy
import logging
import os
from logging.config import fileConfig

@@ -11,11 +12,12 @@
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
alembic_logger = logging.getLogger("alembic.env")

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
fileConfig(config.config_file_name, disable_existing_loggers=False)

# add your model's MetaData object here
# for 'autogenerate' support
@@ -156,30 +158,40 @@ def getconn():
poolclass=pool.NullPool,
)

with connectable.connect() as connection:
autocommit_conn = connection.execution_options(isolation_level="AUTOCOMMIT")
role_exists = autocommit_conn.execute(
with connectable.connect() as role_connection:
autocommit_role = role_connection.execution_options(
isolation_level="AUTOCOMMIT"
)
role_exists = autocommit_role.execute(
text("SELECT 1 FROM pg_roles WHERE rolname = 'app_read'")
).first()
if not role_exists:
autocommit_conn.execute(text("CREATE ROLE app_read"))
autocommit_role.execute(text("CREATE ROLE app_read"))

with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object=include_object,
)
with context.begin_transaction():
context.run_migrations()
connection.execute(
text(
"""
GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO app_read;
"""
)

alembic_logger.info("Alembic migrations completed; applying app_read grants")
with connectable.connect() as grant_connection:
autocommit_grants = grant_connection.execution_options(
isolation_level="AUTOCOMMIT"
)
autocommit_grants.execute(
text("GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read")
)
autocommit_grants.execute(
text(
"ALTER DEFAULT PRIVILEGES IN SCHEMA public "
"GRANT SELECT ON TABLES TO app_read"
)
)
alembic_logger.info("Applied app_read grants")


if context.is_offline_mode():
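
As an illustration only (not part of this change), one way to sanity-check that the app_read grants took effect is to query information_schema from any SQLAlchemy connection; the connection URL below is a placeholder:

from sqlalchemy import create_engine, text

# Placeholder DSN; substitute the project's real database URL.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")

with engine.connect() as conn:
    rows = conn.execute(
        text(
            "SELECT table_name, privilege_type "
            "FROM information_schema.role_table_grants "
            "WHERE grantee = 'app_read'"
        )
    ).all()
    for table_name, privilege in rows:
        print(table_name, privilege)
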
6 changes: 5 additions & 1 deletion transfers/aquifer_system_transfer.py
@@ -37,7 +37,10 @@ def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple:
created_count = 0
skipped_count = 0

logger.info(f"Starting transfer of {n} aquifer systems from LU_AquiferClass.")
logger.info(
"Starting transfer: AquiferSystems (%s rows) from LU_AquiferClass",
n,
)

# 4. Process each row
for i, row in enumerate(cleaned_df.itertuples()):
@@ -138,4 +141,5 @@ def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple:
logger.critical(f"Error in final commit: {e}")
session.rollback()

logger.info("Completed transfer: AquiferSystems")
return input_df, cleaned_df, errors
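
The logging calls in this file switch from f-strings to %s placeholders so interpolation is deferred to the logging framework; a small generic example of the difference (names here are illustrative):

import logging

logger = logging.getLogger("transfers.example")

n = 42
# Eager: the message string is built even if INFO records are filtered out.
logger.info(f"Starting transfer of {n} aquifer systems")
# Lazy: formatting happens only if the record is actually emitted.
logger.info("Starting transfer: AquiferSystems (%s rows)", n)
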
46 changes: 43 additions & 3 deletions transfers/chemistry_sampleinfo.py
@@ -51,7 +51,24 @@ def _build_thing_id_cache(self):
"""Build cache of Thing.name -> thing.id to prevent orphan records."""
with session_ctx() as session:
things = session.query(Thing.name, Thing.id).all()
self._thing_id_cache = {name: thing_id for name, thing_id in things}
normalized = {}
for name, thing_id in things:
normalized_name = self._normalize_for_thing_match(name)
if not normalized_name:
continue
if (
normalized_name in normalized
and normalized[normalized_name] != thing_id
):
logger.warning(
"Duplicate Thing match key '%s' for ids %s and %s",
normalized_name,
normalized[normalized_name],
thing_id,
)
continue
normalized[normalized_name] = thing_id
self._thing_id_cache = normalized
logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries")

def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
@@ -72,7 +89,7 @@ def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame:
valid_point_ids = set(self._thing_id_cache.keys())

# Normalize SamplePointID to handle suffixed sample counts (e.g. AB-0002A -> AB-0002).
normalized_ids = df["SamplePointID"].apply(self._normalize_sample_point_id)
normalized_ids = df["SamplePointID"].apply(self._normalize_for_thing_match)

# Filter to rows where SamplePointID exists as a Thing.name
before_count = len(df)
@@ -104,6 +121,16 @@ def _normalize_sample_point_id(value: Any) -> Optional[str]:
return match.group("base")
return text

@classmethod
def _normalize_for_thing_match(cls, value: Any) -> Optional[str]:
"""
Normalize IDs for Thing matching (strip suffixes, trim, uppercase).
"""
normalized = cls._normalize_sample_point_id(value)
if not normalized:
return None
return normalized.strip().upper()

def _filter_to_valid_sample_pt_ids(self, df: pd.DataFrame) -> pd.DataFrame:
"""Filter to rows with a valid SamplePtID UUID (required for idempotent upserts)."""

@@ -138,6 +165,7 @@ def _transfer_hook(self, session: Session) -> None:
row_dicts = []
skipped_orphan_count = 0
skipped_sample_pt_id_count = 0
lookup_miss_count = 0
for row in self.cleaned_df.to_dict("records"):
row_dict = self._row_dict(row)
if row_dict.get("SamplePtID") is None:
@@ -152,6 +180,7 @@
# Skip rows without valid thing_id (orphan prevention)
if row_dict.get("thing_id") is None:
skipped_orphan_count += 1
lookup_miss_count += 1
logger.warning(
f"Skipping ChemistrySampleInfo OBJECTID={row_dict.get('OBJECTID')} "
f"SamplePointID={row_dict.get('SamplePointID')} - Thing not found"
@@ -169,6 +198,10 @@
f"Skipped {skipped_orphan_count} ChemistrySampleInfo records without valid Thing "
f"(orphan prevention)"
)
if lookup_miss_count > 0:
logger.warning(
"ChemistrySampleInfo Thing lookup misses: %s", lookup_miss_count
)

rows = self._dedupe_rows(row_dicts, key="OBJECTID")

@@ -259,14 +292,20 @@ def bool_val(key: str) -> Optional[bool]:

# Look up Thing by SamplePointID to prevent orphan records
sample_point_id = val("SamplePointID")
normalized_sample_point_id = self._normalize_sample_point_id(sample_point_id)
normalized_sample_point_id = self._normalize_for_thing_match(sample_point_id)
thing_id = None
if (
normalized_sample_point_id
and normalized_sample_point_id in self._thing_id_cache
):
thing_id = self._thing_id_cache[normalized_sample_point_id]
# If Thing not found, thing_id remains None and will be filtered out
if thing_id is None and sample_point_id is not None:
logger.debug(
"ChemistrySampleInfo Thing lookup miss: SamplePointID=%s normalized=%s",
sample_point_id,
normalized_sample_point_id,
)

return {
"SamplePtID": uuid_val("SamplePtID"),
@@ -288,6 +327,7 @@ def bool_val(key: str) -> Optional[bool]:
"SampleNotes": str_val("SampleNotes"),
"LocationId": uuid_val("LocationId"),
"OBJECTID": val("OBJECTID"),
"thing_id": thing_id,
}

def _dedupe_rows(
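
With this change the cache builder and the row lookup share _normalize_for_thing_match, so both sides of the Thing match use the same key. A standalone sketch of the idea follows; the suffix-stripping regex is an assumption here, since _normalize_sample_point_id's pattern is not shown in full in this diff:

import re
from typing import Any, Optional

# Assumed pattern: a trailing letter after a dash-number base, e.g. AB-0002A -> AB-0002.
_SUFFIX_RE = re.compile(r"^(?P<base>.*-\d+)[A-Za-z]$")


def normalize_for_thing_match(value: Any) -> Optional[str]:
    """Strip sample-count suffixes, trim whitespace, and uppercase for matching."""
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    match = _SUFFIX_RE.match(text)
    base = match.group("base") if match else text
    return base.strip().upper()


# Both the Thing.name cache keys and the SamplePointID values are normalized
# the same way, so "ab-0002a" and "AB-0002" resolve to the same key.
assert normalize_for_thing_match("ab-0002a") == normalize_for_thing_match("AB-0002")
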
6 changes: 5 additions & 1 deletion transfers/geologic_formation_transfer.py
@@ -35,7 +35,10 @@ def transfer_geologic_formations(session: Session, limit: int = None) -> tuple:
created_count = 0
skipped_count = 0

logger.info(f"Starting transfer of {n} geologic formations")
logger.info(
"Starting transfer: GeologicFormations (%s rows) from LU_Formations",
n,
)

# 4. Process each row
for i, row in enumerate(cleaned_df.itertuples()):
@@ -134,4 +137,5 @@ def transfer_geologic_formations(session: Session, limit: int = None) -> tuple:
logger.critical(f"Error during final commit of geologic formations: {e}")
session.rollback()

logger.info("Completed transfer: GeologicFormations")
return input_df, cleaned_df, errors
5 changes: 5 additions & 0 deletions transfers/logger.py
@@ -50,8 +50,13 @@
logging.StreamHandler(sys.stdout),
logging.FileHandler(log_filename, mode="w", encoding="utf-8"),
],
force=True,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# ensure the root logger level is INFO so INFO records reach the handlers
logging.getLogger().setLevel(logging.INFO)

# workaround to not redirect httpx logging
logging.getLogger("httpx").setLevel(logging.WARNING)
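
For context on the force=True addition: logging.basicConfig is a no-op once the root logger already has handlers (for example after Alembic's fileConfig runs), while force=True removes and replaces them (Python 3.8+). A minimal, self-contained illustration:

import logging
import sys

# First configuration attaches a handler to the root logger.
logging.basicConfig(level=logging.WARNING, stream=sys.stdout)

# Without force=True this second call would be ignored because handlers already exist.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    stream=sys.stdout,
    force=True,  # drop existing root handlers before reconfiguring
)

logging.getLogger("transfers.example").info("now visible at INFO")
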
6 changes: 6 additions & 0 deletions transfers/permissions_transfer.py
@@ -52,6 +52,7 @@ def transfer_permissions(session: Session) -> None:
wdf = read_csv("WellData", dtype={"OSEWelltagID": str})
wdf = replace_nans(wdf)

logger.info("Starting transfer: Permissions")
transferred_wells = (
session.query(Thing, Contact)
.select_from(Thing)
@@ -61,6 +62,7 @@
.order_by(Thing.name)
.all()
)
created_count = 0
visited = []
for chunk in chunk_by_size(transferred_wells, 100):
objs = []
@@ -77,12 +79,16 @@
)
if permission:
objs.append(permission)
created_count += 1

permission = _make_permission(
wdf, well, contact.id, "MonitorOK", "Water Level Sample"
)
if permission:
objs.append(permission)
created_count += 1

session.bulk_save_objects(objs)
session.commit()

logger.info("Completed transfer: Permissions (%s records)", created_count)
5 changes: 4 additions & 1 deletion transfers/stratigraphy_transfer.py
@@ -56,7 +56,9 @@ def transfer_stratigraphy(session: Session, limit: int = None) -> tuple:
n_wells = len(cleaned_df["PointID"].unique())

logger.info(
f"Starting transfer of {n_records} stratigraphy records for {n_wells} wells"
"Starting transfer: Stratigraphy (%s records, %s wells)",
n_records,
n_wells,
)

# 3. Initialize tracking variables for logging
@@ -294,4 +296,5 @@ def transfer_stratigraphy(session: Session, limit: int = None) -> tuple:
logger.critical(f"Error in final commit: {e}")
session.rollback()

logger.info("Completed transfer: Stratigraphy")
return input_df, cleaned_df, errors
2 changes: 2 additions & 0 deletions transfers/thing_transfer.py
@@ -38,6 +38,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -
n = len(ldf)
start_time = time.time()

logger.info("Starting transfer: Things (%s) [%s rows]", site_type, n)
cached_elevations = {}

for i, row in enumerate(ldf.itertuples()):
@@ -89,6 +90,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -
continue

session.commit()
logger.info("Completed transfer: Things (%s)", site_type)


def transfer_springs(session, limit=None):
34 changes: 23 additions & 11 deletions transfers/transfer.py
@@ -138,14 +138,27 @@


def _drop_and_rebuild_db() -> None:
logger.info("Dropping schema public")
with session_ctx() as session:
session.execute(text("DROP SCHEMA public CASCADE"))
session.execute(text("CREATE SCHEMA public"))
session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis"))
session.commit()
command.upgrade(_alembic_config(), "head")
logger.info("Running Alembic migrations")
try:
command.upgrade(_alembic_config(), "head")
except SystemExit as exc:
if exc.code not in (0, None):
raise
logger.info(
"Alembic upgrade returned SystemExit(%s); continuing transfer", exc.code
)
logger.info("Alembic migrations complete")
logger.info("Initializing lexicon data")
init_lexicon()
logger.info("Initializing parameter data")
init_parameter()
logger.info("Schema rebuild complete")


@timeit
@@ -322,8 +335,6 @@ def _transfer_parallel(
parallel_tasks_1.append(
("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags)
)
if transfer_radionuclides:
parallel_tasks_1.append(("Radionuclides", RadionuclidesTransferer, flags))
if transfer_ngwmn_views:
parallel_tasks_1.append(
("NGWMNWellConstruction", NGWMNWellConstructionTransferer, flags)
Expand All @@ -340,10 +351,6 @@ def _transfer_parallel(
)
if transfer_weather_data:
parallel_tasks_1.append(("WeatherData", WeatherDataTransferer, flags))
if transfer_minor_trace_chemistry:
parallel_tasks_1.append(
("MinorTraceChemistry", MinorTraceChemistryTransferer, flags)
)

# Track results for metrics
results_map = {}
@@ -404,8 +411,6 @@ def _transfer_parallel(
metrics.hydraulics_data_metrics(*results_map["HydraulicsData"])
if "ChemistrySampleInfo" in results_map and results_map["ChemistrySampleInfo"]:
metrics.chemistry_sampleinfo_metrics(*results_map["ChemistrySampleInfo"])
if "Radionuclides" in results_map and results_map["Radionuclides"]:
metrics.radionuclides_metrics(*results_map["Radionuclides"])
if "NGWMNWellConstruction" in results_map and results_map["NGWMNWellConstruction"]:
metrics.ngwmn_well_construction_metrics(*results_map["NGWMNWellConstruction"])
if "NGWMNWaterLevels" in results_map and results_map["NGWMNWaterLevels"]:
@@ -421,8 +426,15 @@
)
if "WeatherData" in results_map and results_map["WeatherData"]:
metrics.weather_data_metrics(*results_map["WeatherData"])
if "MinorTraceChemistry" in results_map and results_map["MinorTraceChemistry"]:
metrics.minor_trace_chemistry_metrics(*results_map["MinorTraceChemistry"])
if transfer_radionuclides:
message("TRANSFERRING RADIONUCLIDES")
results = _execute_transfer(RadionuclidesTransferer, flags=flags)
metrics.radionuclides_metrics(*results)

if transfer_minor_trace_chemistry:
message("TRANSFERRING MINOR TRACE CHEMISTRY")
results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags)
metrics.minor_trace_chemistry_metrics(*results)

# =========================================================================
# PHASE 3: Sensors (Sequential - required before continuous water levels)
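
The SystemExit guard added around command.upgrade tolerates a clean exit (code 0 or None) raised from inside the migration environment while still surfacing real failures. The same pattern in isolation, with a stand-in function instead of Alembic:

import logging
import sys

logger = logging.getLogger("transfers.example")


def run_migrations() -> None:
    # Stand-in for the migration runner; some code paths end with
    # sys.exit(0) even though the work itself succeeded.
    sys.exit(0)


try:
    run_migrations()
except SystemExit as exc:
    if exc.code not in (0, None):
        raise  # a real failure: re-raise so callers see it
    logger.info("Migration runner exited cleanly (SystemExit(%s)); continuing", exc.code)
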