Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
49949c2
WQP TDS duplicate handling
jacob-a-brown May 19, 2025
c8b3dbb
Formatting changes
jacob-a-brown May 19, 2025
15b7c9d
mypy and _clean_records fix
jacob-a-brown May 19, 2025
bfab903
Merge branch 'dev/jab' of https://github.com/DataIntegrationGroup/Dat…
jacob-a-brown May 19, 2025
21dd9ac
log removing duplicate TDS data for WQP
jacob-a-brown May 19, 2025
ee55ee8
Merge branch 'pre-production' into dev/jab
jacob-a-brown May 19, 2025
d2df6f4
Formatting changes
jacob-a-brown May 19, 2025
d972ba8
Merge branch 'dev/jab' of https://github.com/DataIntegrationGroup/Dat…
jacob-a-brown May 19, 2025
572139e
use ActivityIdentifier to identify duplicate records
jacob-a-brown May 20, 2025
54a74b9
bump version to 0.9.6 for duplicate handling fix
jacob-a-brown May 20, 2025
ed5cc41
remove -rx flag to make parsing logs easier
jacob-a-brown May 20, 2025
3efe798
test tests
jacob-a-brown May 20, 2025
503cbde
more test tests
jacob-a-brown May 20, 2025
d28a6f6
Formatting changes
jacob-a-brown May 20, 2025
c82ab35
make output format case insensitive
jacob-a-brown May 20, 2025
74194a5
Merge branch 'dev/jab' of https://github.com/DataIntegrationGroup/Dat…
jacob-a-brown May 20, 2025
9203a5b
Formatting changes
jacob-a-brown May 20, 2025
781f577
test cli and sources in GitHub Actions
jacob-a-brown May 20, 2025
f31a927
Merge branch 'dev/jab' of https://github.com/DataIntegrationGroup/Dat…
jacob-a-brown May 20, 2025
99cf02b
complete all tests even if failure
jacob-a-brown May 20, 2025
93880d9
log date of duplicate records removed
jacob-a-brown May 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cicd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ jobs:

- name: Test with pytest
run: |
pytest -s -rx tests
pytest -s tests
49 changes: 46 additions & 3 deletions backend/connectors/wqp/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
DT_MEASURED,
EARLIEST,
LATEST,
TDS,
WATERLEVELS,
USGS_PCODE_30210,
USGS_PCODE_70300,
USGS_PCODE_70301,
USGS_PCODE_70303,
)
from backend.connectors.wqp.transformer import (
WQPSiteTransformer,
Expand Down Expand Up @@ -97,7 +103,7 @@ def get_records(self):
else:
# every record with pCode 30210 (depth in m) has a corresponding
# record with pCode 72019 (depth in ft) but not vice versa
params["pCode"] = "30210"
params["pCode"] = USGS_PCODE_30210

params.update(get_date_range(config))

Expand Down Expand Up @@ -130,7 +136,44 @@ def _extract_source_parameter_results(self, records):
return [ri["ResultMeasureValue"] for ri in records]

def _clean_records(self, records):
    """Drop records with an empty ResultMeasureValue and, for TDS only,
    collapse duplicate records that share an ActivityIdentifier.

    When several USGS pcodes report TDS for the same activity, the record
    kept is chosen by preference order 70300 > 70301 > 70303.

    Raises:
        ValueError: if an activity has multiple TDS records but none of
            the preferred pcodes (70300/70301/70303) is present.
    """
    # Records with a falsy measurement value carry no usable data.
    clean_records = [r for r in records if r["ResultMeasureValue"]]

    # Dedup only applies to TDS, and only when there is more than one
    # surviving record to compare.
    if self.config.parameter != TDS or len(clean_records) <= 1:
        return clean_records

    site_id = clean_records[0]["MonitoringLocationIdentifier"]
    return_records = []
    # Fix: scan the *cleaned* records here — scanning the raw input would
    # reintroduce records whose ResultMeasureValue was empty.
    for activity_identifier in {r["ActivityIdentifier"] for r in clean_records}:
        # All records for this activity, keyed by USGS parameter code.
        ai_records = {
            record["USGSPCode"]: record
            for record in clean_records
            if record["ActivityIdentifier"] == activity_identifier
        }
        if len(ai_records) > 1:
            # Keep the first record found in pcode preference order.
            for pcode in (USGS_PCODE_70300, USGS_PCODE_70301, USGS_PCODE_70303):
                if pcode in ai_records:
                    kept_record = ai_records[pcode]
                    break
            else:
                raise ValueError(
                    f"Multiple TDS records found for {site_id} with ActivityIdentifier {activity_identifier} but no 70300, 70301, or 70303 pcodes found."
                )
            record_date = kept_record["ActivityStartDate"]
            self.log(
                f"Removing duplicates for {site_id} on {record_date} with ActivityIdentifier {activity_identifier}. Keeping record with pcode {pcode}."
            )
        else:
            kept_record = next(iter(ai_records.values()))
        return_records.append(kept_record)
    return return_records

def _extract_source_parameter_units(self, records):
    """Return the reported measurement unit code for each record, in order."""
    return list(map(lambda record: record["ResultMeasure/MeasureUnitCode"], records))
Expand Down Expand Up @@ -160,7 +203,7 @@ def get_records(self, site_record):
}
params.update(get_date_range(self.config))

if config.parameter.lower() != "waterlevels":
if config.parameter.lower() != WATERLEVELS:
params["characteristicName"] = get_analyte_search_param(
config.parameter, WQP_ANALYTE_MAPPING
)
Expand Down
5 changes: 5 additions & 0 deletions backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
SOURCE_PARAMETER_UNITS = "source_parameter_units"
CONVERSION_FACTOR = "conversion_factor"

USGS_PCODE_30210 = "30210"
USGS_PCODE_70300 = "70300"
USGS_PCODE_70301 = "70301"
USGS_PCODE_70303 = "70303"

ANALYTE_OPTIONS = sorted(
[
ARSENIC,
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ def cli():
)
]

OUTPUT_FORMATS = sorted([value for value in OutputFormat])
OUTPUT_FORMATS = sorted([of for of in OutputFormat])
OUTPUT_FORMAT_OPTIONS = [
click.option(
"--output-format",
type=click.Choice(OUTPUT_FORMATS),
type=click.Choice(OUTPUT_FORMATS, case_sensitive=False),
default="csv",
help=f"Output file format for sites: {OUTPUT_FORMATS}. Default is csv",
)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

setup(
name="nmuwd",
version="0.9.5",
version="0.9.6",
author="Jake Ross",
description="New Mexico Water Data Integration Engine",
long_description=long_description,
Expand Down
140 changes: 74 additions & 66 deletions tests/test_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,74 +104,82 @@ def _test_weave(
result = self.runner.invoke(weave, arguments, standalone_mode=False)

# Assert
assert result.exit_code == 0

"""
For the config, check that

0. (set output dir to clean up tests results even in event of failure)
1. The parameter is set correctly
2. The agencies are set correctly
3. The output types are set correctly
4. The site limit is set correctly
5. The dry is set correctly
6. The start date is set correctly
7. The end date is set correctly
8. The geographic filter is set correctly
9. The site output type is set correctly
"""
config = result.return_value

# 0
self.output_dir = Path(config.output_path)

# 1
assert getattr(config, "parameter") == parameter

# 2
agency_with_underscore = self.agency.replace("-", "_")
if self.agency_reports_parameter[parameter]:
assert getattr(config, f"use_source_{agency_with_underscore}") is True
else:
assert getattr(config, f"use_source_{agency_with_underscore}") is False

for no_agency in no_agencies:
no_agency_with_underscore = no_agency.replace("--no-", "").replace("-", "_")
assert getattr(config, f"use_source_{no_agency_with_underscore}") is False

# 3
output_types = ["summary", "timeseries_unified", "timeseries_separated"]
for ot in output_types:
if ot == output_type:
assert getattr(config, f"output_{ot}") is True
try:
assert result.exit_code == 0

"""
For the config, check that

0. (set output dir to clean up tests results even in event of failure)
1. The parameter is set correctly
2. The agencies are set correctly
3. The output types are set correctly
4. The site limit is set correctly
5. The dry is set correctly
6. The start date is set correctly
7. The end date is set correctly
8. The geographic filter is set correctly
9. The site output type is set correctly
"""
config = result.return_value

# 0
self.output_dir = Path(config.output_path)

# 1
assert getattr(config, "parameter") == parameter

# 2
agency_with_underscore = self.agency.replace("-", "_")
if self.agency_reports_parameter[parameter]:
assert getattr(config, f"use_source_{agency_with_underscore}") is True
else:
assert getattr(config, f"output_{ot}") is False

# 4
assert getattr(config, "site_limit") == 4

# 5
assert getattr(config, "dry") is True

# 6
assert getattr(config, "start_date") == start_date

# 7
assert getattr(config, "end_date") == end_date

# 8
if geographic_filter_name and geographic_filter_value:
for _geographic_filter_name in ["bbox", "county", "wkt"]:
if _geographic_filter_name == geographic_filter_name:
assert (
getattr(config, _geographic_filter_name)
== geographic_filter_value
)
assert getattr(config, f"use_source_{agency_with_underscore}") is False

for no_agency in no_agencies:
no_agency_with_underscore = no_agency.replace("--no-", "").replace(
"-", "_"
)
assert (
getattr(config, f"use_source_{no_agency_with_underscore}") is False
)

# 3
output_types = ["summary", "timeseries_unified", "timeseries_separated"]
for ot in output_types:
if ot == output_type:
assert getattr(config, f"output_{ot}") is True
else:
assert getattr(config, _geographic_filter_name) == ""

# 9
assert getattr(config, "output_format") == output_format
assert getattr(config, f"output_{ot}") is False

# 4
assert getattr(config, "site_limit") == 4

# 5
assert getattr(config, "dry") is True

# 6
assert getattr(config, "start_date") == start_date

# 7
assert getattr(config, "end_date") == end_date

# 8
if geographic_filter_name and geographic_filter_value:
for _geographic_filter_name in ["bbox", "county", "wkt"]:
if _geographic_filter_name == geographic_filter_name:
assert (
getattr(config, _geographic_filter_name)
== geographic_filter_value
)
else:
assert getattr(config, _geographic_filter_name) == ""

# 9
assert getattr(config, "output_format") == output_format
except Exception as e:
print(result)
assert False

def test_weave_summary(self):
self._test_weave(parameter=WATERLEVELS, output_type="summary")
Expand Down
Loading