From 6e4c3babe7bc5c826f0f16d47073566151264f42 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Wed, 14 Jan 2026 15:06:59 -0700 Subject: [PATCH 1/2] feat: add NMA_MajorChemistry table and transfer logic for major chemistry data --- .env.example | 1 + ...a7b8c9d0e1f2_create_nma_major_chemistry.py | 104 +++++++ db/nma_legacy.py | 55 ++++ tests/test_major_chemistry_legacy.py | 284 ++++++++++++++++++ transfers/major_chemistry.py | 229 ++++++++++++++ transfers/metrics.py | 4 + transfers/transfer.py | 26 +- 7 files changed, 702 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py create mode 100644 tests/test_major_chemistry_legacy.py create mode 100644 transfers/major_chemistry.py diff --git a/.env.example b/.env.example index 2aba5949..efcedc03 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,7 @@ TRANSFER_ASSETS=False TRANSFER_SURFACE_WATER_DATA=True TRANSFER_HYDRAULICS_DATA=True TRANSFER_CHEMISTRY_SAMPLEINFO=True +TRANSFER_MAJOR_CHEMISTRY=True TRANSFER_RADIONUCLIDES=True TRANSFER_NGWMN_VIEWS=True TRANSFER_WATERLEVELS_PRESSURE_DAILY=True diff --git a/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py b/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py new file mode 100644 index 00000000..eef87a37 --- /dev/null +++ b/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py @@ -0,0 +1,104 @@ +"""Create legacy NMA_MajorChemistry table. + +Revision ID: a7b8c9d0e1f2 +Revises: f3b4c5d6e7f8 +Create Date: 2026-03-01 02:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "a7b8c9d0e1f2" +down_revision: Union[str, Sequence[str], None] = "f3b4c5d6e7f8" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy major chemistry table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_MajorChemistry"): + op.create_table( + "NMA_MajorChemistry", + sa.Column( + "SamplePtID", + postgresql.UUID(as_uuid=True), + sa.ForeignKey( + "NMA_Chemistry_SampleInfo.SamplePtID", ondelete="CASCADE" + ), + nullable=False, + ), + sa.Column("SamplePointID", sa.String(length=10), nullable=True), + sa.Column("Analyte", sa.String(length=50), nullable=True), + sa.Column("Symbol", sa.String(length=50), nullable=True), + sa.Column( + "SampleValue", sa.Float(), nullable=True, server_default=sa.text("0") + ), + sa.Column("Units", sa.String(length=50), nullable=True), + sa.Column("Uncertainty", sa.Float(), nullable=True), + sa.Column("AnalysisMethod", sa.String(length=255), nullable=True), + sa.Column("AnalysisDate", sa.DateTime(), nullable=True), + sa.Column("Notes", sa.String(length=255), nullable=True), + sa.Column( + "Volume", sa.Integer(), nullable=True, server_default=sa.text("0") + ), + sa.Column("VolumeUnit", sa.String(length=50), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + sa.Column("AnalysesAgency", sa.String(length=50), nullable=True), + sa.Column("WCLab_ID", sa.String(length=25), nullable=True), + ) + op.create_index( + "MajorChemistry$AnalysesAgency", + "NMA_MajorChemistry", + ["AnalysesAgency"], + ) + op.create_index( + "MajorChemistry$Analyte", + "NMA_MajorChemistry", + ["Analyte"], + ) + op.create_index( + "MajorChemistry$Chemistry SampleInfoMajorChemistry", + "NMA_MajorChemistry", + ["SamplePtID"], + ) + op.create_index( + "MajorChemistry$SamplePointID", + "NMA_MajorChemistry", + ["SamplePointID"], + ) + op.create_index( + "MajorChemistry$SamplePointIDAnalyte", + "NMA_MajorChemistry", + ["SamplePointID", "Analyte"], + ) + op.create_index( + "MajorChemistry$SamplePtID", + "NMA_MajorChemistry", + ["SamplePtID"], + ) + op.create_index( + "MajorChemistry$WCLab_ID", + "NMA_MajorChemistry", + ["WCLab_ID"], + ) + + +def downgrade() -> None: + """Drop the legacy major chemistry table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_MajorChemistry"): + op.drop_table("NMA_MajorChemistry") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 07ca6efa..d36ee0f2 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -278,6 +278,13 @@ class ChemistrySampleInfo(Base): passive_deletes=True, ) + major_chemistries: Mapped[List["NMAMajorChemistry"]] = relationship( + "NMAMajorChemistry", + back_populates="chemistry_sample_info", + cascade="all, delete-orphan", + passive_deletes=True, + ) + @validates("thing_id") def validate_thing_id(self, key, value): """Prevent orphan ChemistrySampleInfo - must have a parent Thing.""" @@ -455,4 +462,52 @@ def validate_sample_pt_id(self, key, value): return value +class NMAMajorChemistry(Base): + """ + Legacy MajorChemistry table from NM_Aquifer_Dev_DB. + """ + + __tablename__ = "NMA_MajorChemistry" + + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + sample_pt_id: Mapped[uuid.UUID] = mapped_column( + "SamplePtID", + UUID(as_uuid=True), + ForeignKey("NMA_Chemistry_SampleInfo.SamplePtID", ondelete="CASCADE"), + nullable=False, + ) + sample_point_id: Mapped[Optional[str]] = mapped_column("SamplePointID", String(10)) + analyte: Mapped[Optional[str]] = mapped_column("Analyte", String(50)) + symbol: Mapped[Optional[str]] = mapped_column("Symbol", String(50)) + sample_value: Mapped[Optional[float]] = mapped_column( + "SampleValue", Float, server_default=text("0") + ) + units: Mapped[Optional[str]] = mapped_column("Units", String(50)) + uncertainty: Mapped[Optional[float]] = mapped_column("Uncertainty", Float) + analysis_method: Mapped[Optional[str]] = mapped_column( + "AnalysisMethod", String(255) + ) + analysis_date: Mapped[Optional[datetime]] = mapped_column("AnalysisDate", DateTime) + notes: Mapped[Optional[str]] = mapped_column("Notes", String(255)) + volume: Mapped[Optional[int]] = mapped_column( + "Volume", Integer, server_default=text("0") + ) + volume_unit: Mapped[Optional[str]] = mapped_column("VolumeUnit", String(50)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + analyses_agency: Mapped[Optional[str]] = mapped_column("AnalysesAgency", String(50)) + wclab_id: Mapped[Optional[str]] = mapped_column("WCLab_ID", String(25)) + + chemistry_sample_info: Mapped["ChemistrySampleInfo"] = relationship( + "ChemistrySampleInfo", back_populates="major_chemistries" + ) + + @validates("sample_pt_id") + def validate_sample_pt_id(self, key, value): + if value is None: + raise ValueError("NMAMajorChemistry requires a SamplePtID") + return value + + # ============= EOF ============================================= diff --git a/tests/test_major_chemistry_legacy.py b/tests/test_major_chemistry_legacy.py new file mode 100644 index 00000000..c1299f1c --- /dev/null +++ b/tests/test_major_chemistry_legacy.py @@ -0,0 +1,284 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Unit tests for MajorChemistry legacy model. + +These tests verify the migration of columns from the legacy MajorChemistry table. +Migrated columns (excluding SSMA_TimeStamp): +- SamplePtID -> sample_pt_id +- SamplePointID -> sample_point_id +- Analyte -> analyte +- Symbol -> symbol +- SampleValue -> sample_value +- Units -> units +- Uncertainty -> uncertainty +- AnalysisMethod -> analysis_method +- AnalysisDate -> analysis_date +- Notes -> notes +- Volume -> volume +- VolumeUnit -> volume_unit +- OBJECTID -> object_id +- GlobalID -> global_id +- AnalysesAgency -> analyses_agency +- WCLab_ID -> wclab_id +""" + +from datetime import datetime +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import ChemistrySampleInfo, NMAMajorChemistry + + +def _next_sample_point_id() -> str: + return f"SP-{uuid4().hex[:7]}" + + +# ===================== CREATE tests ========================== +def test_create_major_chemistry_all_fields(water_well_thing): + """Test creating a major chemistry record with all fields.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + sample_point_id=sample_info.sample_point_id, + analyte="Ca", + symbol="<", + sample_value=12.3, + units="mg/L", + uncertainty=0.1, + analysis_method="ICP-MS", + analysis_date=datetime(2024, 6, 15, 0, 0, 0), + notes="Test notes", + volume=250, + volume_unit="mL", + analyses_agency="NMBGMR", + wclab_id="LAB-101", + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.sample_pt_id == sample_info.sample_pt_id + assert record.sample_point_id == sample_info.sample_point_id + assert record.analyte == "Ca" + assert record.sample_value == 12.3 + assert record.uncertainty == 0.1 + + session.delete(record) + session.delete(sample_info) + session.commit() + + +def test_create_major_chemistry_minimal(water_well_thing): + """Test creating a major chemistry record with minimal fields.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.sample_pt_id == sample_info.sample_pt_id + assert record.analyte is None + assert record.units is None + + session.delete(record) + session.delete(sample_info) + session.commit() + + +# ===================== READ tests ========================== +def test_read_major_chemistry_by_global_id(water_well_thing): + """Test reading a major chemistry record by GlobalID.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + + fetched = session.get(NMAMajorChemistry, record.global_id) + assert fetched is not None + assert fetched.global_id == record.global_id + + session.delete(record) + session.delete(sample_info) + session.commit() + + +def test_query_major_chemistry_by_sample_point_id(water_well_thing): + """Test querying major chemistry by sample_point_id.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record1 = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + sample_point_id=sample_info.sample_point_id, + ) + record2 = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + sample_point_id="OTHER-PT", + ) + session.add_all([record1, record2]) + session.commit() + + results = ( + session.query(NMAMajorChemistry) + .filter(NMAMajorChemistry.sample_point_id == sample_info.sample_point_id) + .all() + ) + assert len(results) >= 1 + assert all(r.sample_point_id == sample_info.sample_point_id for r in results) + + session.delete(record1) + session.delete(record2) + session.delete(sample_info) + session.commit() + + +# ===================== UPDATE tests ========================== +def test_update_major_chemistry(water_well_thing): + """Test updating a major chemistry record.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + + record.analyses_agency = "Updated Agency" + record.notes = "Updated notes" + session.commit() + session.refresh(record) + + assert record.analyses_agency == "Updated Agency" + assert record.notes == "Updated notes" + + session.delete(record) + session.delete(sample_info) + session.commit() + + +# ===================== DELETE tests ========================== +def test_delete_major_chemistry(water_well_thing): + """Test deleting a major chemistry record.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMAMajorChemistry( + global_id=uuid4(), + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + + session.delete(record) + session.commit() + + fetched = session.get(NMAMajorChemistry, record.global_id) + assert fetched is None + + session.delete(sample_info) + session.commit() + + +# ===================== Column existence tests ========================== +def test_major_chemistry_has_all_migrated_columns(): + """Test that the model has all expected columns.""" + expected_columns = [ + "global_id", + "sample_pt_id", + "sample_point_id", + "analyte", + "symbol", + "sample_value", + "units", + "uncertainty", + "analysis_method", + "analysis_date", + "notes", + "volume", + "volume_unit", + "object_id", + "analyses_agency", + "wclab_id", + ] + + for column in expected_columns: + assert hasattr( + NMAMajorChemistry, column + ), f"Expected column '{column}' not found in NMAMajorChemistry model" + + +def test_major_chemistry_table_name(): + """Test that the table name follows convention.""" + assert NMAMajorChemistry.__tablename__ == "NMA_MajorChemistry" + + +# ============= EOF ============================================= diff --git a/transfers/major_chemistry.py b/transfers/major_chemistry.py new file mode 100644 index 00000000..320132db --- /dev/null +++ b/transfers/major_chemistry.py @@ -0,0 +1,229 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import ChemistrySampleInfo, NMAMajorChemistry +from db.engine import session_ctx +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import read_csv + + +class MajorChemistryTransferer(Transferer): + """ + Transfer for the legacy MajorChemistry table. + """ + + source_table = "MajorChemistry" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + self._sample_pt_ids: set[UUID] = set() + self._build_sample_pt_id_cache() + + def _build_sample_pt_id_cache(self) -> None: + with session_ctx() as session: + sample_infos = session.query(ChemistrySampleInfo.sample_pt_id).all() + self._sample_pt_ids = {sample_pt_id for (sample_pt_id,) in sample_infos} + logger.info( + f"Built ChemistrySampleInfo cache with {len(self._sample_pt_ids)} entries" + ) + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + input_df = read_csv(self.source_table, parse_dates=["AnalysisDate"]) + cleaned_df = self._filter_to_valid_sample_infos(input_df) + return input_df, cleaned_df + + def _filter_to_valid_sample_infos(self, df: pd.DataFrame) -> pd.DataFrame: + valid_sample_pt_ids = self._sample_pt_ids + mask = df["SamplePtID"].apply( + lambda value: self._uuid_val(value) in valid_sample_pt_ids + ) + before_count = len(df) + filtered_df = df[mask].copy() + after_count = len(filtered_df) + + if before_count > after_count: + skipped = before_count - after_count + logger.warning( + f"Filtered out {skipped} MajorChemistry records without matching " + f"ChemistrySampleInfo ({after_count} valid, {skipped} orphan records prevented)" + ) + + return filtered_df + + def _transfer_hook(self, session: Session) -> None: + row_dicts = [] + skipped_global_id = 0 + for row in self.cleaned_df.to_dict("records"): + row_dict = self._row_dict(row) + if row_dict is None: + continue + if row_dict.get("GlobalID") is None: + skipped_global_id += 1 + logger.warning( + "Skipping MajorChemistry SamplePtID=%s - GlobalID missing or invalid", + row_dict.get("SamplePtID"), + ) + continue + row_dicts.append(row_dict) + + if skipped_global_id > 0: + logger.warning( + "Skipped %s MajorChemistry records without valid GlobalID", + skipped_global_id, + ) + + rows = self._dedupe_rows(row_dicts, key="GlobalID") + insert_stmt = insert(NMAMajorChemistry) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into MajorChemistry" + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "SamplePtID": excluded.SamplePtID, + "SamplePointID": excluded.SamplePointID, + "Analyte": excluded.Analyte, + "Symbol": excluded.Symbol, + "SampleValue": excluded.SampleValue, + "Units": excluded.Units, + "Uncertainty": excluded.Uncertainty, + "AnalysisMethod": excluded.AnalysisMethod, + "AnalysisDate": excluded.AnalysisDate, + "Notes": excluded.Notes, + "Volume": excluded.Volume, + "VolumeUnit": excluded.VolumeUnit, + "OBJECTID": excluded.OBJECTID, + "AnalysesAgency": excluded.AnalysesAgency, + "WCLab_ID": excluded.WCLab_ID, + }, + ) + session.execute(stmt) + session.commit() + session.expunge_all() + + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: + def val(key: str) -> Optional[Any]: + v = row.get(key) + if pd.isna(v): + return None + return v + + def float_val(key: str) -> Optional[float]: + v = val(key) + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + def int_val(key: str) -> Optional[int]: + v = val(key) + if v is None: + return None + try: + return int(v) + except (TypeError, ValueError): + return None + + analysis_date = val("AnalysisDate") + if hasattr(analysis_date, "to_pydatetime"): + analysis_date = analysis_date.to_pydatetime() + if isinstance(analysis_date, datetime): + analysis_date = analysis_date.replace(tzinfo=None) + + sample_pt_id = self._uuid_val(val("SamplePtID")) + if sample_pt_id is None: + self._capture_error( + val("SamplePtID"), + f"Invalid SamplePtID: {val('SamplePtID')}", + "SamplePtID", + ) + return None + + global_id = self._uuid_val(val("GlobalID")) + + return { + "SamplePtID": sample_pt_id, + "SamplePointID": val("SamplePointID"), + "Analyte": val("Analyte"), + "Symbol": val("Symbol"), + "SampleValue": float_val("SampleValue"), + "Units": val("Units"), + "Uncertainty": float_val("Uncertainty"), + "AnalysisMethod": val("AnalysisMethod"), + "AnalysisDate": analysis_date, + "Notes": val("Notes"), + "Volume": int_val("Volume"), + "VolumeUnit": val("VolumeUnit"), + "OBJECTID": val("OBJECTID"), + "GlobalID": global_id, + "AnalysesAgency": val("AnalysesAgency"), + "WCLab_ID": val("WCLab_ID"), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + gid = row.get(key) + if gid is None: + continue + deduped[gid] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = MajorChemistryTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + # Allow running via `python -m transfers.major_chemistry` + run() + +# ============= EOF ============================================= diff --git a/transfers/metrics.py b/transfers/metrics.py index 7f276b78..cf50644a 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -39,6 +39,7 @@ ChemistrySampleInfo, NMAHydraulicsData, NMARadionuclides, + NMAMajorChemistry, SurfaceWaterData, NMAWaterLevelsContinuousPressureDaily, ViewNGWMNWellConstruction, @@ -121,6 +122,9 @@ def chemistry_sampleinfo_metrics(self, *args, **kw) -> None: def radionuclides_metrics(self, *args, **kw) -> None: self._handle_metrics(NMARadionuclides, name="Radionuclides", *args, **kw) + def major_chemistry_metrics(self, *args, **kw) -> None: + self._handle_metrics(NMAMajorChemistry, name="MajorChemistry", *args, **kw) + def ngwmn_well_construction_metrics(self, *args, **kw) -> None: self._handle_metrics( ViewNGWMNWellConstruction, name="NGWMN WellConstruction", *args, **kw diff --git a/transfers/transfer.py b/transfers/transfer.py index 113d473f..0586e62b 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -48,13 +48,18 @@ from transfers.contact_transfer import ContactTransfer from transfers.sensor_transfer import SensorTransferer from transfers.waterlevels_transfer import WaterLevelTransferer -from transfers.well_transfer import WellTransferer, WellScreenTransferer +from transfers.well_transfer import ( + WellTransferer, + WellScreenTransferer, + cleanup_locations, +) from transfers.minor_trace_chemistry_transfer import MinorTraceChemistryTransferer from transfers.asset_transfer import AssetTransferer from transfers.chemistry_sampleinfo import ChemistrySampleInfoTransferer from transfers.hydraulicsdata import HydraulicsDataTransferer from transfers.radionuclides import RadionuclidesTransferer +from transfers.major_chemistry import MajorChemistryTransferer from transfers.ngwmn_views import ( NGWMNLithologyTransferer, NGWMNWaterLevelsTransferer, @@ -225,6 +230,7 @@ def transfer_all(metrics, limit=100): transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) + transfer_major_chemistry = get_bool_env("TRANSFER_MAJOR_CHEMISTRY", True) transfer_radionuclides = get_bool_env("TRANSFER_RADIONUCLIDES", True) transfer_ngwmn_views = get_bool_env("TRANSFER_NGWMN_VIEWS", True) transfer_pressure_daily = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE_DAILY", True) @@ -251,6 +257,7 @@ def transfer_all(metrics, limit=100): transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_major_chemistry, transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, @@ -274,6 +281,7 @@ def transfer_all(metrics, limit=100): transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_major_chemistry, transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, @@ -298,6 +306,7 @@ def _transfer_parallel( transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_major_chemistry, transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, @@ -426,6 +435,11 @@ def _transfer_parallel( ) if "WeatherData" in results_map and results_map["WeatherData"]: metrics.weather_data_metrics(*results_map["WeatherData"]) + if transfer_major_chemistry: + message("TRANSFERRING MAJOR CHEMISTRY") + results = _execute_transfer(MajorChemistryTransferer, flags=flags) + metrics.major_chemistry_metrics(*results) + if transfer_radionuclides: message("TRANSFERRING RADIONUCLIDES") results = _execute_transfer(RadionuclidesTransferer, flags=flags) @@ -501,6 +515,7 @@ def _transfer_sequential( transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_major_chemistry, transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, @@ -569,6 +584,11 @@ def _transfer_sequential( results = _execute_transfer(ChemistrySampleInfoTransferer, flags=flags) metrics.chemistry_sampleinfo_metrics(*results) + if transfer_major_chemistry: + message("TRANSFERRING MAJOR CHEMISTRY") + results = _execute_transfer(MajorChemistryTransferer, flags=flags) + metrics.major_chemistry_metrics(*results) + if transfer_radionuclides: message("TRANSFERRING RADIONUCLIDES") results = _execute_transfer(RadionuclidesTransferer, flags=flags) @@ -616,6 +636,10 @@ def _transfer_sequential( ) metrics.acoustic_metrics(*results) + message("CLEANING UP LOCATIONS") + with session_ctx() as session: + cleanup_locations(session) + def main(): message("START--------------------------------------") From d86a6b116b9793612cfabfef24745aa2e99b429f Mon Sep 17 00:00:00 2001 From: jakeross Date: Wed, 14 Jan 2026 20:42:04 -0700 Subject: [PATCH 2/2] feat: update down_revision for NMA major chemistry migration --- alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py b/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py index eef87a37..15519327 100644 --- a/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py +++ b/alembic/versions/a7b8c9d0e1f2_create_nma_major_chemistry.py @@ -7,14 +7,14 @@ from typing import Sequence, Union -from alembic import op import sqlalchemy as sa +from alembic import op from sqlalchemy import inspect from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision: str = "a7b8c9d0e1f2" -down_revision: Union[str, Sequence[str], None] = "f3b4c5d6e7f8" +down_revision: Union[str, Sequence[str], None] = "f1a2b3c4d5e6" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None