From 9cb1d516d6d2b7b99ea92686a27d908b0c9500ae Mon Sep 17 00:00:00 2001 From: Yuvraaj Date: Mon, 8 Dec 2025 10:33:38 +0530 Subject: [PATCH 1/6] feat : init --- graph_weather/data/bufr_process.py | 15 ++++++ .../data/schema_for_bufr/__init__.py | 0 graph_weather/data/schema_for_bufr/adpupa.py | 46 +++++++++++++++++++ graph_weather/data/schema_for_bufr/base.py | 8 ++++ 4 files changed, 69 insertions(+) create mode 100644 graph_weather/data/bufr_process.py create mode 100644 graph_weather/data/schema_for_bufr/__init__.py create mode 100644 graph_weather/data/schema_for_bufr/adpupa.py create mode 100644 graph_weather/data/schema_for_bufr/base.py diff --git a/graph_weather/data/bufr_process.py b/graph_weather/data/bufr_process.py new file mode 100644 index 00000000..62fc1901 --- /dev/null +++ b/graph_weather/data/bufr_process.py @@ -0,0 +1,15 @@ +""" +A processor aka dataloader for bufr files +> Drafted #177 PR and focusing around ADPUPA as of now +""" + +class BUFR_Process: + """ + Low level bufr file decoder + """ + def __init__(self, schema ): + """ + schema : Majorly focusing around ADPUPA as of now + """ + + diff --git a/graph_weather/data/schema_for_bufr/__init__.py b/graph_weather/data/schema_for_bufr/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/graph_weather/data/schema_for_bufr/adpupa.py b/graph_weather/data/schema_for_bufr/adpupa.py new file mode 100644 index 00000000..2f89b910 --- /dev/null +++ b/graph_weather/data/schema_for_bufr/adpupa.py @@ -0,0 +1,46 @@ +from dataclasses import dataclass +from typing import Optional +from .base import GeoPoint + +@dataclass +class adpupa_level: + pressure_hPa: Optional[float] + geopotential_height_m: Optional[float] + temperature_K: Optional[float] + dewpoint_K: Optional[float] + wind_direction_deg: Optional[float] + wind_speed_ms: Optional[float] + significance: Optional[int] + + qc_pressure: Optional[int] = None + qc_height: Optional[int] = None + qc_temperature: Optional[int] = None + qc_dewpoint: Optional[int] = None + qc_wind_dir: Optional[int] = None + qc_wind_speed: Optional[int] = None + + +@dataclass +class adpupa_obs: + # metadata + station_id: str + datetime: str # or datetime.datetime + location: GeoPoint + + # radiosonde metadata + report_type: Optional[int] = None + data_subcategory: Optional[int] = None + instrument_type: Optional[int] = None + balloon_type: Optional[int] = None + wind_method: Optional[int] = None + + # levels + mandatory_levels: List[adpupa_level] = None + significant_temperature_levels: List[adpupa_level] = None + significant_wind_levels: List[adpupa_level] = None + tropopause_levels: List[adpupa_level] = None + max_wind_levels: List[adpupa_level] = None + + # provenance + file_source: Optional[str] = None + bufr_message_index: Optional[int] = None \ No newline at end of file diff --git a/graph_weather/data/schema_for_bufr/base.py b/graph_weather/data/schema_for_bufr/base.py new file mode 100644 index 00000000..7e056ed2 --- /dev/null +++ b/graph_weather/data/schema_for_bufr/base.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass +from typing import Optional + +@dataclass +class GeoPoint: + lat : float + lon : float + elevation_m : Optional[float] = None \ No newline at end of file From ed3cb04e57676c0e88ab2e6d5e8bfbdfff9eee5f Mon Sep 17 00:00:00 2001 From: Yuvraaj Date: Mon, 8 Dec 2025 10:53:04 +0530 Subject: [PATCH 2/6] feat : internal decoder init for adpupa --- graph_weather/data/bufr_process.py | 103 ++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 2 deletions(-) diff --git a/graph_weather/data/bufr_process.py b/graph_weather/data/bufr_process.py index 62fc1901..ca04766f 100644 --- a/graph_weather/data/bufr_process.py +++ b/graph_weather/data/bufr_process.py @@ -1,15 +1,114 @@ """ A processor aka dataloader for bufr files -> Drafted #177 PR and focusing around ADPUPA as of now +> Drafted #177 PR - focusing around ADPUPA as of now """ +from dataclasses import dataclass +from typing import Any, Optional, List +from eccodes import ( + codes_handle_new_from_file, + codes_release, + codes_set, + codes_get, + codes_get_array, +) +from .schema_for_bufr.adpupa import adpupa_level, adpupa_obs +from .schema_for_bufr.base import GeoPoint class BUFR_Process: """ Low level bufr file decoder """ - def __init__(self, schema ): + def __init__(self, schema): """ schema : Majorly focusing around ADPUPA as of now """ + supported = {"adpupa"} + if schema not in supported: + raise ValueError( + f"Unsupported schema '{schema}'. Supported : {supported}" + ) + self.schema = schema + + def decode_file(self, path: str) -> List[adpupa_obs]: + """ + Decode an entire BUFR file into ADPUPA dataclasses. + Returns a list (because a file may contain multiple soundings). + """ + observations = [] + + with open(path, "rb") as fh: + while True: + h = codes_handle_new_from_file(fh, "BUFR") + if not h: + break + + try: + codes_set(h, "unpack", 1) + except Exception: + codes_release(h) + continue + + obs = self._decode_adpupa(h, file_path=path) + if obs is not None: + observations.append(obs) + + codes_release(h) + + return observations + + def _decode_adpupa(self,h,file_path:str)->Optional[adpupa_obs]: + """ + Decode one BUFR message for adpupa + returns adpupa obs or none + """ + station_id = self._safe_str(h, "stationIdentifier") + + year = self._safe(h, "year") + month = self._safe(h, "month") + day = self._safe(h, "day") + hour = self._safe(h, "hour") + minute = self._safe(h, "minute") + + if not all([year, month, day, hour, minute]): + return None + + lat = self._safe(h, "latitude") + lon = self._safe(h, "longitude") + elev = self._safe(h, "heightOfStation") + + location = GeoPoint(lat=lat, lon=lon, elevation_m=elev) + + report_type = self._safe(h, "reportType") + subtype = self._safe(h, "dataSubCategory") + inst_type = self._safe(h, "instrumentType") + balloon_type = self._safe(h, "balloonOrSolarRadiation") + wind_method = self._safe(h, "methodOfWindMeasurement") + + # Decode levels + mandatory = self._decode_level_sequence(h, "mandatory") + sig_temp = self._decode_level_sequence(h, "significantTemperature") + sig_wind = self._decode_level_sequence(h, "significantWind") + trop = self._decode_level_sequence(h, "tropopause") + maxwind = self._decode_level_sequence(h, "maximumWind") + datetime_str = f"{year:04d}-{month:02d}-{day:02d}T{hour:02d}:{minute:02d}:00Z" + obs = adpupa_obs( + station_id=station_id, + datetime=datetime_str, + location=location, + report_type=report_type, + data_subcategory=subtype, + instrument_type=inst_type, + balloon_type=balloon_type, + wind_method=wind_method, + mandatory_levels=mandatory, + significant_temperature_levels=sig_temp, + significant_wind_levels=sig_wind, + tropopause_levels=trop, + max_wind_levels=maxwind, + file_source=file_path, + bufr_message_index=self._safe(h, "bufrHeaderCentre"), + ) + + return obs From e3d1a4095d25a6fb995d2dcf299c20400af65e80 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Dec 2025 06:25:18 +0000 Subject: [PATCH 3/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- graph_weather/data/bufr_process.py | 32 +++++++++----------- graph_weather/data/schema_for_bufr/adpupa.py | 12 +++++--- graph_weather/data/schema_for_bufr/base.py | 11 ++++--- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/graph_weather/data/bufr_process.py b/graph_weather/data/bufr_process.py index ca04766f..1f18f53d 100644 --- a/graph_weather/data/bufr_process.py +++ b/graph_weather/data/bufr_process.py @@ -2,33 +2,33 @@ A processor aka dataloader for bufr files > Drafted #177 PR - focusing around ADPUPA as of now """ -from dataclasses import dataclass -from typing import Any, Optional, List + +from typing import List, Optional + from eccodes import ( codes_handle_new_from_file, codes_release, codes_set, - codes_get, - codes_get_array, ) -from .schema_for_bufr.adpupa import adpupa_level, adpupa_obs + +from .schema_for_bufr.adpupa import adpupa_obs from .schema_for_bufr.base import GeoPoint + class BUFR_Process: """ - Low level bufr file decoder + Low level bufr file decoder """ + def __init__(self, schema): """ - schema : Majorly focusing around ADPUPA as of now + Schema : Majorly focusing around ADPUPA as of now """ supported = {"adpupa"} if schema not in supported: - raise ValueError( - f"Unsupported schema '{schema}'. Supported : {supported}" - ) + raise ValueError(f"Unsupported schema '{schema}'. Supported : {supported}") self.schema = schema - + def decode_file(self, path: str) -> List[adpupa_obs]: """ Decode an entire BUFR file into ADPUPA dataclasses. @@ -56,13 +56,13 @@ def decode_file(self, path: str) -> List[adpupa_obs]: return observations - def _decode_adpupa(self,h,file_path:str)->Optional[adpupa_obs]: + def _decode_adpupa(self, h, file_path: str) -> Optional[adpupa_obs]: """ Decode one BUFR message for adpupa returns adpupa obs or none """ station_id = self._safe_str(h, "stationIdentifier") - + year = self._safe(h, "year") month = self._safe(h, "month") day = self._safe(h, "day") @@ -70,9 +70,8 @@ def _decode_adpupa(self,h,file_path:str)->Optional[adpupa_obs]: minute = self._safe(h, "minute") if not all([year, month, day, hour, minute]): - return None + return None - lat = self._safe(h, "latitude") lon = self._safe(h, "longitude") elev = self._safe(h, "heightOfStation") @@ -85,7 +84,7 @@ def _decode_adpupa(self,h,file_path:str)->Optional[adpupa_obs]: balloon_type = self._safe(h, "balloonOrSolarRadiation") wind_method = self._safe(h, "methodOfWindMeasurement") - # Decode levels + # Decode levels mandatory = self._decode_level_sequence(h, "mandatory") sig_temp = self._decode_level_sequence(h, "significantTemperature") sig_wind = self._decode_level_sequence(h, "significantWind") @@ -111,4 +110,3 @@ def _decode_adpupa(self,h,file_path:str)->Optional[adpupa_obs]: ) return obs - diff --git a/graph_weather/data/schema_for_bufr/adpupa.py b/graph_weather/data/schema_for_bufr/adpupa.py index 2f89b910..ea53adca 100644 --- a/graph_weather/data/schema_for_bufr/adpupa.py +++ b/graph_weather/data/schema_for_bufr/adpupa.py @@ -1,8 +1,10 @@ -from dataclasses import dataclass -from typing import Optional +from dataclasses import dataclass +from typing import Optional + from .base import GeoPoint -@dataclass + +@dataclass class adpupa_level: pressure_hPa: Optional[float] geopotential_height_m: Optional[float] @@ -20,7 +22,7 @@ class adpupa_level: qc_wind_speed: Optional[int] = None -@dataclass +@dataclass class adpupa_obs: # metadata station_id: str @@ -43,4 +45,4 @@ class adpupa_obs: # provenance file_source: Optional[str] = None - bufr_message_index: Optional[int] = None \ No newline at end of file + bufr_message_index: Optional[int] = None diff --git a/graph_weather/data/schema_for_bufr/base.py b/graph_weather/data/schema_for_bufr/base.py index 7e056ed2..4ab8aaba 100644 --- a/graph_weather/data/schema_for_bufr/base.py +++ b/graph_weather/data/schema_for_bufr/base.py @@ -1,8 +1,9 @@ -from dataclasses import dataclass -from typing import Optional +from dataclasses import dataclass +from typing import Optional + @dataclass class GeoPoint: - lat : float - lon : float - elevation_m : Optional[float] = None \ No newline at end of file + lat: float + lon: float + elevation_m: Optional[float] = None From 0e22c5662f69ee4f11381d33324d40688d4e4dfa Mon Sep 17 00:00:00 2001 From: Yuvraaj Date: Mon, 8 Dec 2025 15:15:14 +0530 Subject: [PATCH 4/6] feat: test init --- tests/test_bufr.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 tests/test_bufr.py diff --git a/tests/test_bufr.py b/tests/test_bufr.py new file mode 100644 index 00000000..e4d778ce --- /dev/null +++ b/tests/test_bufr.py @@ -0,0 +1,55 @@ +import pytest +import pandas as pd +from pathlib import Path + +from graph_weather.data.bufr_process import BUFR_Process +from graph_weather.data.schema_for_bufr.adpupa import adpupa_obs +from graph_weather.data.nnja_ai import load_nnja_dataset + +# @pytest.mark.skip(reason="Requires local historical BUFR + NNJA-AI parquet files.") +def test_adpupa_bufr_vs_parquet(): + """ + Integration test: + - Downloads known historical ADPUPA BUFR + NNJA-AI Parquet + - Decodes BUFR dataclasses + - Loads Parquet pandas + - Compares field-by-field + + """ + + bufr_path = Path("gdas.t00z.1bamua.tm00.bufr") + + nnja_df = load_nnja_dataset( + dataset_name="ADPUPA", + time="2025-12-08T00:00" + ).to_dataframe() + + assert bufr_path.exists(), f"Missing test BUFR: {bufr_path}" + + processor = BUFR_Process("adpupa") + decoded = processor.decode_file(str(bufr_path)) + + assert len(decoded) > 0, "No ADPUPA soundings decoded from BUFR." + obs: adpupa_obs = decoded[0] + # Compare station info + expected_station = nnja_df["station_id"].iloc[0] + assert obs.station_id == expected_station + + expected_datetime = nnja_df["datetime"].iloc[0] + assert obs.datetime == expected_datetime + + assert abs(obs.location.lat - nnja_df["lat"].iloc[0]) < 1e-6 + assert abs(obs.location.lon - nnja_df["lon"].iloc[0]) < 1e-6 + + # Compare pressure levels + + expected_levels = nnja_df.sort_values("level_index") + actual_pressures = [lvl.pressure_hPa for lvl in obs.mandatory_levels] + expected_pressures = expected_levels["pressure_hPa"].tolist() + + assert len(actual_pressures) == len(expected_pressures) + + for a, e in zip(actual_pressures, expected_pressures): + if e is None and a is None: + continue + assert abs(a - e) < 1e-3 From 10843820251d12aa0569f43559d20888b5c82a8d Mon Sep 17 00:00:00 2001 From: Yuvraaj Date: Mon, 8 Dec 2025 15:16:30 +0530 Subject: [PATCH 5/6] chore : fixes --- graph_weather/data/bufr_process.py | 101 ++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/graph_weather/data/bufr_process.py b/graph_weather/data/bufr_process.py index ca04766f..8a62bd27 100644 --- a/graph_weather/data/bufr_process.py +++ b/graph_weather/data/bufr_process.py @@ -111,4 +111,103 @@ def _decode_adpupa(self,h,file_path:str)->Optional[adpupa_obs]: ) return obs - + + def _decode_level_sequence(self, h, kind): + """ + Extracts one of the ADPUPA sequences. + """ + if kind == "mandatory": + key_map = { + "pressure": ["airPressure", "pressure"], + "height": ["height"], + "temperature": ["airTemperature"], + "dewpoint": ["dewpointTemperature"], + "wind_dir": ["windDirection"], + "wind_speed": ["windSpeed"], + "significance": ["significance"], + } + elif kind == "significantTemperature": + key_map = { + "pressure": ["airPressure", "pressure"], + "height": ["height"], # often absent + "temperature": ["airTemperature"], + "dewpoint": ["dewpointTemperature"], + "wind_dir": [], + "wind_speed": [], + "significance": ["significance"], + } + elif kind == "significantWind": + key_map = { + "pressure": ["airPressure", "pressure"], + "height": [], + "temperature": [], + "dewpoint": [], + "wind_dir": ["windDirection"], + "wind_speed": ["windSpeed"], + "significance": ["significance"], + } + else: + # tropopause / maxwind have mixed keys + key_map = { + "pressure": ["airPressure", "pressure"], + "height": ["height"], + "temperature": ["airTemperature"], + "dewpoint": ["dewpointTemperature"], + "wind_dir": ["windDirection"], + "wind_speed": ["windSpeed"], + "significance": ["significance"], + } + + # Extract arrays + arrays = { + k: self._first_present_array(h, arr_keys) + for k, arr_keys in key_map.items() + } + + # Determine length + length = 0 + for arr in arrays.values(): + if isinstance(arr, list) and len(arr) > length: + length = len(arr) + + levels = [] + for i in range(length): + level = adpupa_level( + pressure_hPa=self._index_or_none(arrays["pressure"], i), + geopotential_height_m=self._index_or_none(arrays["height"], i), + temperature_K=self._index_or_none(arrays["temperature"], i), + dewpoint_K=self._index_or_none(arrays["dewpoint"], i), + wind_direction_deg=self._index_or_none(arrays["wind_dir"], i), + wind_speed_ms=self._index_or_none(arrays["wind_speed"], i), + significance=self._index_or_none(arrays["significance"], i), + ) + levels.append(level) + + return levels + def _safe(self, h, key: str) -> Any: + try: + return codes_get(h, key) + except Exception: + return None + + def _safe_str(self, h, key: str) -> Optional[str]: + val = self._safe(h, key) + return str(val) if val is not None else None + + def _safe_array(self, h, key: str) -> Optional[List[Any]]: + try: + return list(codes_get_array(h, key)) + except Exception: + return None + + def _first_present_array(self, h, keys: List[str]) -> Optional[List[Any]]: + for k in keys: + arr = self._safe_array(h, k) + if arr is not None: + return arr + return None + + def _index_or_none(self, arr: Optional[List[Any]], i: int) -> Any: + if arr is None or i >= len(arr): + return None + return arr[i] From c20d68a03fe5a9569acb38fc746cb38f0b712da8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Dec 2025 09:47:15 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- graph_weather/data/bufr_process.py | 6 ++---- tests/test_bufr.py | 12 +++++------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/graph_weather/data/bufr_process.py b/graph_weather/data/bufr_process.py index 4c386fe4..323ec079 100644 --- a/graph_weather/data/bufr_process.py +++ b/graph_weather/data/bufr_process.py @@ -158,10 +158,7 @@ def _decode_level_sequence(self, h, kind): } # Extract arrays - arrays = { - k: self._first_present_array(h, arr_keys) - for k, arr_keys in key_map.items() - } + arrays = {k: self._first_present_array(h, arr_keys) for k, arr_keys in key_map.items()} # Determine length length = 0 @@ -183,6 +180,7 @@ def _decode_level_sequence(self, h, kind): levels.append(level) return levels + def _safe(self, h, key: str) -> Any: try: return codes_get(h, key) diff --git a/tests/test_bufr.py b/tests/test_bufr.py index e4d778ce..15abd300 100644 --- a/tests/test_bufr.py +++ b/tests/test_bufr.py @@ -4,7 +4,8 @@ from graph_weather.data.bufr_process import BUFR_Process from graph_weather.data.schema_for_bufr.adpupa import adpupa_obs -from graph_weather.data.nnja_ai import load_nnja_dataset +from graph_weather.data.nnja_ai import load_nnja_dataset + # @pytest.mark.skip(reason="Requires local historical BUFR + NNJA-AI parquet files.") def test_adpupa_bufr_vs_parquet(): @@ -19,10 +20,7 @@ def test_adpupa_bufr_vs_parquet(): bufr_path = Path("gdas.t00z.1bamua.tm00.bufr") - nnja_df = load_nnja_dataset( - dataset_name="ADPUPA", - time="2025-12-08T00:00" - ).to_dataframe() + nnja_df = load_nnja_dataset(dataset_name="ADPUPA", time="2025-12-08T00:00").to_dataframe() assert bufr_path.exists(), f"Missing test BUFR: {bufr_path}" @@ -40,9 +38,9 @@ def test_adpupa_bufr_vs_parquet(): assert abs(obs.location.lat - nnja_df["lat"].iloc[0]) < 1e-6 assert abs(obs.location.lon - nnja_df["lon"].iloc[0]) < 1e-6 - + # Compare pressure levels - + expected_levels = nnja_df.sort_values("level_index") actual_pressures = [lvl.pressure_hPa for lvl in obs.mandatory_levels] expected_pressures = expected_levels["pressure_hPa"].tolist()