Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
3fa0540
init
brettedw Mar 2, 2026
8064bcc
diif & histo title
brettedw Mar 2, 2026
5c7ff5a
use threshold for wind
brettedw Mar 2, 2026
759692e
wind speed interpolator
brettedw Mar 2, 2026
38872d3
no sonar?
brettedw Mar 2, 2026
297919a
Revert "no sonar?"
brettedw Mar 3, 2026
03c51d2
plotter
brettedw Mar 3, 2026
c7e6847
docstring
brettedw Mar 3, 2026
510de21
tests
brettedw Mar 3, 2026
6317d53
test comment
brettedw Mar 3, 2026
0929811
fwi refactor for wind speed + ISI/BUI/FWI
brettedw Mar 3, 2026
39367df
COG for interpolation
brettedw Mar 3, 2026
9e71fd7
comments
brettedw Mar 3, 2026
8b5b0f3
roundtrip test
brettedw Mar 4, 2026
ac2c4ef
add wind uv tests
brettedw Mar 4, 2026
b9f5584
test u/v fail
brettedw Mar 5, 2026
d30d575
BaseInterpolator
brettedw Mar 5, 2026
b24a612
sfms logging
brettedw Mar 5, 2026
60e7faf
validity lat/long alignment
brettedw Mar 5, 2026
b1b46c7
FWIDatasets
brettedw Mar 5, 2026
ab28745
no cast
brettedw Mar 5, 2026
9651b4b
Revert "BaseInterpolator"
brettedw Mar 6, 2026
e0bf06e
simplify
brettedw Mar 6, 2026
40d16ff
object type
brettedw Mar 6, 2026
84e492c
map using WPSDataset.ds_path
brettedw Mar 6, 2026
e632c34
tests
brettedw Mar 6, 2026
41d2458
test update
brettedw Mar 6, 2026
db12699
refactor
brettedw Mar 9, 2026
1a45dcb
grid explainer
brettedw Mar 9, 2026
18bdac3
test edits
brettedw Mar 9, 2026
c31ce02
import
brettedw Mar 9, 2026
742f6d2
docstring
brettedw Mar 9, 2026
e8b2ff3
geojson test remove
brettedw Mar 9, 2026
84b55e7
field & source
brettedw Mar 9, 2026
328b715
COGs with publish
brettedw Mar 10, 2026
da69f45
add tests
brettedw Mar 10, 2026
bb0a8dd
docstring
brettedw Mar 10, 2026
c0c7e13
_VALID_SFMS_ATTRIBUTES
brettedw Mar 10, 2026
4013029
use rasters_match
brettedw Mar 10, 2026
e71c30d
jdk
brettedw Mar 10, 2026
8b438c4
Merge branch 'main' into task/sfms-daily-refactor
brettedw Mar 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,27 @@ jobs:
with:
fetch-depth: 0
- uses: ./.github/actions/python-setup
with:
install-jdk: "true"
- name: Unit Test (wps_weather)
working-directory: ./backend
run: |
uv run pytest packages/wps-weather/src/wps_weather/tests -x -o log_cli=true --disable-warnings -vvv

test-wps-sfms:
name: Python - WPS SFMS Test
runs-on: ubuntu-24.04
container:
image: ghcr.io/bcgov/wps/wps-api-base:01-29-2026
options: --user 0
steps:
- uses: actions/checkout@v6
with:
fetch-depth: 0
- uses: ./.github/actions/python-setup
- name: Unit Test (wps_sfms)
working-directory: ./backend
run: |
uv run pytest packages/wps-sfms/src/wps_sfms/tests -x -o log_cli=true --disable-warnings -vvv

test-wps-wf1:
name: Python - WF1 Test with coverage
runs-on: ubuntu-24.04
Expand Down
232 changes: 163 additions & 69 deletions backend/packages/wps-api/src/app/jobs/sfms_daily_actuals.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
"""
Job for running SFMS daily actuals: temperature interpolation followed by
precipitation interpolation for the current date.
Job for running SFMS daily actuals weather interpolation and FWI processing.

Usage:
python -m app.jobs.sfms_daily_actuals "YYYY-MM-DD"
python -m app.jobs.sfms_daily_actuals # Uses current date
"""

import asyncio
from dataclasses import dataclass
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Awaitable, Callable

from aiohttp import ClientSession

from wps_sfms.interpolation.source import (
StationDCSource,
StationDewPointSource,
StationDMCSource,
StationFFMCSource,
StationPrecipitationSource,
StationTemperatureSource,
from wps_sfms.interpolation.field import (
build_dc_field,
build_dewpoint_field,
build_dmc_field,
build_ffmc_field,
build_precipitation_field,
build_temperature_field,
build_wind_speed_field,
build_wind_vector_field,
)
from wps_sfms.processors.fwi import DCCalculator, DMCCalculator, FFMCCalculator, FWIProcessor
from wps_sfms.processors.idw import Interpolator
from wps_sfms.processors.idw import Interpolator, RasterProcessor
from wps_sfms.processors.relative_humidity import RHInterpolator
from wps_sfms.processors.temperature import TemperatureInterpolator
from wps_sfms.processors.wind import WindDirectionInterpolator, WindSpeedInterpolator
from wps_shared.db.crud.fuel_layer import get_fuel_type_raster_by_year
from wps_shared.db.crud.sfms_run import save_sfms_run, track_sfms_run
from wps_shared.db.database import get_async_read_session_scope, get_async_write_session_scope
Expand All @@ -43,6 +47,20 @@
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class RasterInterpolationJob:
job_name: SFMSRunLogJobName
output_key: str
log_label: str
processor: RasterProcessor


@dataclass(frozen=True)
class FWICalculationJob:
job_name: SFMSRunLogJobName
calculator: FFMCCalculator | DMCCalculator | DCCalculator


def is_fwi_interpolation_day(dt: datetime) -> bool:
"""Return True if FWI indices should be re-interpolated from station observations.

Expand All @@ -53,6 +71,37 @@ def is_fwi_interpolation_day(dt: datetime) -> bool:
return dt.weekday() == 0 and dt.month in (4, 5)


async def _run_tracked_job(
job_name: SFMSRunLogJobName,
sfms_run_id: int,
session,
action: Callable[[], Awaitable[object]],
):
@track_sfms_run(job_name, sfms_run_id, session)
async def _wrapped():
return await action()

return await _wrapped()


async def _process_raster_job(
*,
job_name: SFMSRunLogJobName,
sfms_run_id: int,
session,
processor: RasterProcessor,
s3_client: S3Client,
fuel_raster_path: str,
output_key: str,
log_label: str,
) -> None:
async def _run() -> None:
s3_key = await processor.process(s3_client, fuel_raster_path, output_key)
logger.info("%s: %s", log_label, s3_key)

await _run_tracked_job(job_name, sfms_run_id, session, _run)


async def run_weather_interpolation(
datetime_to_process: datetime,
raster_addresser: SFMSNGRasterAddresser,
Expand All @@ -62,50 +111,69 @@ async def run_weather_interpolation(
sfms_run_id: int,
session,
) -> None:
"""Interpolate temperature, RH, and precipitation from station observations."""
"""Interpolate weather rasters from station observations."""
mask_path = raster_addresser.get_mask_key()
dem_path = raster_addresser.get_dem_key()
temp_key = raster_addresser.get_actual_weather_key(
temp_output_key = raster_addresser.get_actual_weather_key(
datetime_to_process, SFMSInterpolatedWeatherParameter.TEMP
)
temp_processor = TemperatureInterpolator(mask_path, dem_path)
rh_processor = RHInterpolator(mask_path, dem_path, raster_addresser.gdal_path(temp_key))
precip_processor = Interpolator(mask_path)

@track_sfms_run(SFMSRunLogJobName.TEMPERATURE_INTERPOLATION, sfms_run_id, session)
async def run_temperature_interpolation() -> None:
temp_s3_key = await temp_processor.process(
s3_client, fuel_raster_path, StationTemperatureSource(sfms_actuals), temp_key
)
logger.info("Temperature interpolation raster: %s", temp_s3_key)

@track_sfms_run(SFMSRunLogJobName.RH_INTERPOLATION, sfms_run_id, session)
async def run_rh_interpolation() -> None:
rh_s3_key = await rh_processor.process(
s3_client,
fuel_raster_path,
StationDewPointSource(sfms_actuals),
raster_addresser.get_actual_weather_key(
temp_raster_path = raster_addresser.gdal_path(temp_output_key)
jobs = [
RasterInterpolationJob(
job_name=SFMSRunLogJobName.TEMPERATURE_INTERPOLATION,
output_key=temp_output_key,
log_label="Temperature interpolation raster",
processor=TemperatureInterpolator(
mask_path, dem_path, build_temperature_field(sfms_actuals)
),
),
RasterInterpolationJob(
job_name=SFMSRunLogJobName.RH_INTERPOLATION,
output_key=raster_addresser.get_actual_weather_key(
datetime_to_process, SFMSInterpolatedWeatherParameter.RH
),
)
logger.info("RH interpolation raster: %s", rh_s3_key)

@track_sfms_run(SFMSRunLogJobName.PRECIPITATION_INTERPOLATION, sfms_run_id, session)
async def run_precipitation_interpolation() -> None:
precip_s3_key = await precip_processor.process(
s3_client,
fuel_raster_path,
StationPrecipitationSource(sfms_actuals),
raster_addresser.get_actual_weather_key(
log_label="RH interpolation raster",
processor=RHInterpolator(
mask_path, dem_path, temp_raster_path, build_dewpoint_field(sfms_actuals)
),
),
RasterInterpolationJob(
job_name=SFMSRunLogJobName.WIND_SPEED_INTERPOLATION,
output_key=raster_addresser.get_actual_weather_key(
datetime_to_process, SFMSInterpolatedWeatherParameter.WIND_SPEED
),
log_label="Wind speed interpolation raster",
processor=WindSpeedInterpolator(mask_path, build_wind_speed_field(sfms_actuals)),
),
RasterInterpolationJob(
job_name=SFMSRunLogJobName.WIND_DIRECTION_INTERPOLATION,
output_key=raster_addresser.get_actual_weather_key(
datetime_to_process, SFMSInterpolatedWeatherParameter.WIND_DIRECTION
),
log_label="Wind direction interpolation raster",
processor=WindDirectionInterpolator(mask_path, build_wind_vector_field(sfms_actuals)),
),
RasterInterpolationJob(
job_name=SFMSRunLogJobName.PRECIPITATION_INTERPOLATION,
output_key=raster_addresser.get_actual_weather_key(
datetime_to_process, SFMSInterpolatedWeatherParameter.PRECIP
),
)
logger.info("Precip interpolation raster: %s", precip_s3_key)
log_label="Precip interpolation raster",
processor=Interpolator(mask_path, build_precipitation_field(sfms_actuals)),
),
]

await run_temperature_interpolation()
await run_rh_interpolation()
await run_precipitation_interpolation()
for job in jobs:
await _process_raster_job(
job_name=job.job_name,
sfms_run_id=sfms_run_id,
session=session,
processor=job.processor,
s3_client=s3_client,
fuel_raster_path=fuel_raster_path,
output_key=job.output_key,
log_label=job.log_label,
)


async def run_fwi_interpolation(
Expand All @@ -123,23 +191,41 @@ async def run_fwi_interpolation(
)

mask_path = raster_addresser.get_mask_key()
fwi_sources = [
(SFMSRunLogJobName.FFMC_INTERPOLATION, StationFFMCSource(sfms_actuals), FWIParameter.FFMC),
(SFMSRunLogJobName.DMC_INTERPOLATION, StationDMCSource(sfms_actuals), FWIParameter.DMC),
(SFMSRunLogJobName.DC_INTERPOLATION, StationDCSource(sfms_actuals), FWIParameter.DC),
ffmc_output_key = raster_addresser.get_actual_index_key(datetime_to_process, FWIParameter.FFMC)
dmc_output_key = raster_addresser.get_actual_index_key(datetime_to_process, FWIParameter.DMC)
dc_output_key = raster_addresser.get_actual_index_key(datetime_to_process, FWIParameter.DC)
jobs = [
RasterInterpolationJob(
job_name=SFMSRunLogJobName.FFMC_INTERPOLATION,
output_key=ffmc_output_key,
log_label=f"{SFMSRunLogJobName.FFMC_INTERPOLATION.value} raster",
processor=Interpolator(mask_path, build_ffmc_field(sfms_actuals)),
),
RasterInterpolationJob(
job_name=SFMSRunLogJobName.DMC_INTERPOLATION,
output_key=dmc_output_key,
log_label=f"{SFMSRunLogJobName.DMC_INTERPOLATION.value} raster",
processor=Interpolator(mask_path, build_dmc_field(sfms_actuals)),
),
RasterInterpolationJob(
job_name=SFMSRunLogJobName.DC_INTERPOLATION,
output_key=dc_output_key,
log_label=f"{SFMSRunLogJobName.DC_INTERPOLATION.value} raster",
processor=Interpolator(mask_path, build_dc_field(sfms_actuals)),
),
]

processor = Interpolator(mask_path)

for job_name, source, fwi_param in fwi_sources:

@track_sfms_run(job_name, sfms_run_id, session)
async def _run(_source=source, _job_name=job_name, _fwi_param=fwi_param) -> None:
output_key = raster_addresser.get_actual_index_key(datetime_to_process, _fwi_param)
s3_key = await processor.process(s3_client, fuel_raster_path, _source, output_key)
logger.info("%s interpolation raster: %s", _job_name.value, s3_key)

await _run()
for job in jobs:
await _process_raster_job(
job_name=job.job_name,
sfms_run_id=sfms_run_id,
session=session,
processor=job.processor,
s3_client=s3_client,
fuel_raster_path=fuel_raster_path,
output_key=job.output_key,
log_label=job.log_label,
)


async def run_fwi_calculations(
Expand All @@ -158,28 +244,36 @@ async def run_fwi_calculations(
fwi_processor = FWIProcessor(datetime_to_process)
month = datetime_to_process.month

fwi_calculations = [
(SFMSRunLogJobName.FFMC_CALCULATION, FFMCCalculator()),
(SFMSRunLogJobName.DMC_CALCULATION, DMCCalculator(month)),
(SFMSRunLogJobName.DC_CALCULATION, DCCalculator(month)),
jobs = [
FWICalculationJob(
job_name=SFMSRunLogJobName.FFMC_CALCULATION,
calculator=FFMCCalculator(),
),
FWICalculationJob(
job_name=SFMSRunLogJobName.DMC_CALCULATION,
calculator=DMCCalculator(month),
),
FWICalculationJob(
job_name=SFMSRunLogJobName.DC_CALCULATION,
calculator=DCCalculator(month),
),
]

for job_name, calculator in fwi_calculations:
for job in jobs:

@track_sfms_run(job_name, sfms_run_id, session)
async def _run(_calculator=calculator) -> None:
async def _run(_calculator=job.calculator) -> None:
_fwi_inputs = raster_addresser.get_actual_fwi_inputs(
datetime_to_process, _calculator.fwi_param
)
await fwi_processor.calculate_index(
s3_client, multi_wps_dataset_context, _calculator, _fwi_inputs
)

await _run()
await _run_tracked_job(job.job_name, sfms_run_id, session, _run)


async def run_sfms_daily_actuals(target_date: datetime) -> None:
"""Run temperature then precipitation interpolation for the given date."""
"""Run SFMS daily weather interpolation and FWI updates for the given date."""
logger.info("Starting SFMS daily actuals for %s", target_date.date())

raster_addresser = SFMSNGRasterAddresser()
Expand Down
2 changes: 2 additions & 0 deletions backend/packages/wps-api/src/app/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def create_mock_sfms_actuals():
relative_humidity=50.0,
precipitation=2.5,
wind_speed=10.0,
wind_direction=180.0,
ffmc=85.0,
dmc=30.0,
dc=200.0,
Expand All @@ -46,6 +47,7 @@ def create_mock_sfms_actuals():
relative_humidity=60.0,
precipitation=5.0,
wind_speed=8.0,
wind_direction=200.0,
ffmc=80.0,
dmc=25.0,
dc=180.0,
Expand Down
Loading
Loading