diff --git a/README.rst b/README.rst index 0be4e10c..e6cdfa30 100644 --- a/README.rst +++ b/README.rst @@ -244,6 +244,23 @@ This can be used to retrieve the schedule, using: The client will re-try until the schedule is available or the ``MAX_POLLING_STEPS`` of ``10`` is reached. +Forecasting +=========== + +Trigger a forecast for a sensor and wait for the result: + +.. code-block:: python + + forecast = await client.trigger_and_get_forecast( + sensor_id=, # int + duration="PT24H", # ISO duration – how far ahead to forecast + ) + # Returns e.g. {"values": [1.2, 1.5, ...], "start": "...", "duration": "PT24H", "unit": "kW"} + +The client polls until the forecasting job is complete. For more advanced options +(training window, regressors, forecast frequency, etc.) see :doc:`forecasting`. + + Development ============== diff --git a/docs/forecasting.rst b/docs/forecasting.rst new file mode 100644 index 00000000..5bc3dacd --- /dev/null +++ b/docs/forecasting.rst @@ -0,0 +1,147 @@ +.. _forecasting: + +Forecasting +=========== + +The FlexMeasures Client supports the forecasting API endpoints introduced in +FlexMeasures v0.31.0: + +- ``POST /sensors//forecasts/trigger`` — queue a forecasting job +- ``GET /sensors//forecasts/`` — poll for results + +These are exposed through three client methods: + +- :meth:`trigger_forecast` — trigger and return the job UUID +- :meth:`get_forecast` — poll until results are ready +- :meth:`trigger_and_get_forecast` — convenience wrapper for both + +.. note:: + + These endpoints require a FlexMeasures server of version **0.31.0** or above. + + +Basic example +------------- + +Forecast the next 24 hours for a sensor, using server-side defaults for the +training window: + +.. code-block:: python + + import asyncio + from flexmeasures_client import FlexMeasuresClient + + async def main(): + client = FlexMeasuresClient( + host="localhost:5000", + ssl=False, + email="user@example.com", + password="password", + ) + + forecast = await client.trigger_and_get_forecast( + sensor_id=1, + duration="PT24H", + ) + print(forecast) + # e.g. {"values": [1.2, 1.5, 1.8, ...], "start": "...", "duration": "PT24H", "unit": "kW"} + + await client.close() + + asyncio.run(main()) + + +Specifying a forecast window +----------------------------- + +Use ``start`` and ``end`` (or ``start`` and ``duration``) to define the exact +period to forecast: + +.. code-block:: python + + forecast = await client.trigger_and_get_forecast( + sensor_id=1, + start="2025-01-15T00:00:00+01:00", + end="2025-01-17T00:00:00+01:00", + ) + + +Controlling the training window +--------------------------------- + +Pass training parameters inside a nested structure via the ``train_start``, +``train_period``, and ``retrain_frequency`` keyword arguments: + +.. code-block:: python + + forecast = await client.trigger_and_get_forecast( + sensor_id=1, + start="2025-01-15T00:00:00+01:00", + duration="PT48H", + # Training configuration + train_start="2025-01-01T00:00:00+01:00", # historical data start + train_period="P14D", # use 14 days of history + retrain_frequency="PT24H", # retrain every 24 h + ) + + +Using regressors +---------------- + +You can improve forecast accuracy by supplying regressor sensor IDs: + +.. code-block:: python + + forecast = await client.trigger_and_get_forecast( + sensor_id=1, + duration="PT24H", + # Sensors whose *forecasts* matter (e.g. weather forecasts) + future_regressors=[10, 11], + # Sensors whose *measurements* matter (e.g. price history) + past_regressors=[20], + ) + + +Step-by-step usage +------------------- + +Trigger and retrieve separately to handle the job UUID yourself: + +.. code-block:: python + + # Step 1 – enqueue the forecasting job + forecast_id = await client.trigger_forecast( + sensor_id=1, + start="2025-01-15T00:00:00+01:00", + end="2025-01-17T00:00:00+01:00", + ) + print(f"Job queued: {forecast_id}") + + # Step 2 – poll until the job finishes + forecast = await client.get_forecast( + sensor_id=1, + forecast_id=forecast_id, + ) + print(forecast) + + +Polling behaviour +----------------- + +``get_forecast`` polls the server with a ``GET`` request and returns when the +server responds with HTTP 200. The polling respects the same client-level +settings as scheduling: + +- ``polling_interval`` (default 10 s) — time between retries +- ``polling_timeout`` (default 200 s) — maximum total wait time +- ``max_polling_steps`` (default 10) — maximum number of poll attempts + +Override them at client construction time: + +.. code-block:: python + + client = FlexMeasuresClient( + ..., + polling_interval=5.0, # check every 5 seconds + polling_timeout=300.0, # wait up to 5 minutes + ) diff --git a/docs/index.rst b/docs/index.rst index 980910dd..05174429 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -30,6 +30,7 @@ Contents :maxdepth: 2 Overview + Forecasting Contributions & Help License Authors diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index c28ca854..adb0282b 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -1265,6 +1265,210 @@ async def trigger_schedule( self.logger.info(f"Schedule triggered successfully. Schedule ID: {schedule_id}") return schedule_id + async def trigger_forecast( + self, + sensor_id: int, + start: str | datetime | None = None, + end: str | datetime | None = None, + duration: str | timedelta | None = None, + train_start: str | datetime | None = None, + train_period: str | timedelta | None = None, + max_training_period: str | timedelta | None = None, + retrain_frequency: str | timedelta | None = None, + future_regressors: list[int] | None = None, + past_regressors: list[int] | None = None, + regressors: list[int] | None = None, + max_forecast_horizon: str | timedelta | None = None, + forecast_frequency: str | timedelta | None = None, + probabilistic: bool | None = None, + ) -> str: + """Trigger a forecasting job for the given sensor. + + :param sensor_id: ID of the sensor to forecast. + :param start: Start date for predictions (ISO 8601 datetime string or datetime). + :param end: End date of the forecast period (ISO 8601 datetime string or datetime). + :param duration: Duration of the forecast period (ISO 8601 duration or timedelta). + Provide at most two of start, end, and duration. + :param train_start: Start date of historical training data. + :param train_period: Duration of the training period (min 2 days). + :param max_training_period: Maximum duration of the training period. Defaults to 1 year. + :param retrain_frequency: How often to retrain (ISO 8601 duration or timedelta). + :param future_regressors: List of sensor IDs used only as future regressors. + :param past_regressors: List of sensor IDs used only as past regressors. + :param regressors: List of sensor IDs used as both past and future regressors. + :param max_forecast_horizon: Maximum forecast horizon (ISO 8601 duration or timedelta). + :param forecast_frequency: How often to recompute forecasts (ISO 8601 duration or timedelta). + :param probabilistic: If True, enable probabilistic forecasting. + + :returns: Forecast job UUID (string). + + This function raises a ValueError when an unhandled status code is returned. + """ + json_payload: dict[str, Any] = {} + + if start is not None: + json_payload["start"] = pd.Timestamp(start).isoformat() + if end is not None: + json_payload["end"] = pd.Timestamp(end).isoformat() + if duration is not None: + json_payload["duration"] = pd.Timedelta(duration).isoformat() + if max_forecast_horizon is not None: + json_payload["max-forecast-horizon"] = pd.Timedelta( + max_forecast_horizon + ).isoformat() + if forecast_frequency is not None: + json_payload["forecast-frequency"] = pd.Timedelta( + forecast_frequency + ).isoformat() + if probabilistic is not None: + json_payload["probabilistic"] = probabilistic + + # Build config sub-dict for training and regressor parameters + config: dict[str, Any] = {} + if train_start is not None: + config["train-start"] = pd.Timestamp(train_start).isoformat() + if train_period is not None: + config["train-period"] = pd.Timedelta(train_period).isoformat() + if max_training_period is not None: + config["max-training-period"] = pd.Timedelta( + max_training_period + ).isoformat() + if retrain_frequency is not None: + config["retrain-frequency"] = pd.Timedelta(retrain_frequency).isoformat() + if future_regressors is not None: + config["future-regressors"] = future_regressors + if past_regressors is not None: + config["past-regressors"] = past_regressors + if regressors is not None: + config["regressors"] = regressors + if config: + json_payload["config"] = config + + response, status = await self.request( + uri=f"sensors/{sensor_id}/forecasts/trigger", + json_payload=json_payload, + method="POST", + minimum_server_version="0.31.0", + ) + check_for_status(status, 200) + + if not isinstance(response, dict): + raise ContentTypeError( + f"Expected a dictionary, but got {type(response)}", + ) + + if not isinstance(response.get("forecast"), str): + raise ContentTypeError( + f"Expected a forecast ID, but got {type(response.get('forecast'))}", + ) + forecast_id = response["forecast"] + self.logger.info(f"Forecast triggered successfully. Forecast ID: {forecast_id}") + return forecast_id + + async def get_forecast( + self, + sensor_id: int, + forecast_id: str, + ) -> dict: + """Get forecast with given ID. + + Polls the server until the forecasting job is complete and returns the result. + + :returns: forecast as dictionary, for example: + { + 'values': [1.2, 1.5, 1.4, 0.8], + 'start': '2025-10-15T00:00:00+01:00', + 'duration': 'PT4H', + 'unit': 'kW' + } + + This function raises a ValueError when an unhandled status code is returned. + """ + polling_step = 0 + try: + async with async_timeout.timeout(self.polling_timeout): + while polling_step < self.max_polling_steps: + forecast, status = await self.request( + uri=f"sensors/{sensor_id}/forecasts/{forecast_id}", + method="GET", + minimum_server_version="0.31.0", + ) + if status == 200: + if not isinstance(forecast, dict): + raise ContentTypeError( + f"Expected a forecast dictionary, but got {type(forecast)}", + ) + return forecast + elif status == 202: + job_status = ( + forecast.get("status", "unknown") + if isinstance(forecast, dict) + else "unknown" + ) + message = f"Forecast job status: {job_status}. Polling step: {polling_step}. Retrying in {self.polling_interval} seconds..." + self.logger.debug(message) + polling_step += 1 + await asyncio.sleep(self.polling_interval) + else: + check_for_status(status, 200) + except asyncio.TimeoutError as exception: + raise ConnectionError( + "Client polling timeout while waiting for forecast job to complete." + ) from exception + raise ConnectionError( + "Max polling steps reached while waiting for forecast job to complete." + ) + + async def trigger_and_get_forecast( + self, + sensor_id: int, + start: str | datetime | None = None, + end: str | datetime | None = None, + duration: str | timedelta | None = None, + train_start: str | datetime | None = None, + train_period: str | timedelta | None = None, + max_training_period: str | timedelta | None = None, + retrain_frequency: str | timedelta | None = None, + future_regressors: list[int] | None = None, + past_regressors: list[int] | None = None, + regressors: list[int] | None = None, + max_forecast_horizon: str | timedelta | None = None, + forecast_frequency: str | timedelta | None = None, + probabilistic: bool | None = None, + ) -> dict: + """Trigger a forecasting job and then fetch the result. + + :returns: forecast as dictionary, for example: + { + 'values': [1.2, 1.5, 1.4, 0.8], + 'start': '2025-10-15T00:00:00+01:00', + 'duration': 'PT4H', + 'unit': 'kW' + } + + This function raises a ValueError when an unhandled status code is returned. + """ + forecast_id = await self.trigger_forecast( + sensor_id=sensor_id, + start=start, + end=end, + duration=duration, + train_start=train_start, + train_period=train_period, + max_training_period=max_training_period, + retrain_frequency=retrain_frequency, + future_regressors=future_regressors, + past_regressors=past_regressors, + regressors=regressors, + max_forecast_horizon=max_forecast_horizon, + forecast_frequency=forecast_frequency, + probabilistic=probabilistic, + ) + return await self.get_forecast( + sensor_id=sensor_id, + forecast_id=forecast_id, + ) + @staticmethod def create_storage_flex_model( soc_unit: str, diff --git a/tests/test_client.py b/tests/test_client.py index adcea5e8..8fdf292f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1016,3 +1016,192 @@ async def test_trigger_schedule_with_custom_scheduler() -> None: ) await flexmeasures_client.close() + + +@pytest.mark.asyncio +async def test_trigger_forecast() -> None: + """Test triggering a forecast with basic parameters.""" + with aioresponses() as m: + flexmeasures_client = FlexMeasuresClient( + email="test@test.test", password="test" + ) + flexmeasures_client.access_token = "test-token" + + sensor_id = 1 + m.post( + f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/trigger", + status=200, + payload={"forecast": "test-forecast-uuid"}, + ) + + forecast_id = await flexmeasures_client.trigger_forecast( + sensor_id=sensor_id, + start="2025-01-05T00:00:00+00:00", + end="2025-01-07T00:00:00+00:00", + ) + + assert forecast_id == "test-forecast-uuid" + + m.assert_called_once_with( + f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/trigger", + method="POST", + headers={"Content-Type": "application/json", "Authorization": "test-token"}, + json={ + "start": "2025-01-05T00:00:00+00:00", + "end": "2025-01-07T00:00:00+00:00", + }, + params=None, + ssl=False, + allow_redirects=False, + ) + + await flexmeasures_client.close() + + +@pytest.mark.asyncio +async def test_trigger_forecast_with_config() -> None: + """Test triggering a forecast with training config parameters.""" + with aioresponses() as m: + flexmeasures_client = FlexMeasuresClient( + email="test@test.test", password="test" + ) + flexmeasures_client.access_token = "test-token" + + sensor_id = 1 + m.post( + f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/trigger", + status=200, + payload={"forecast": "test-forecast-uuid"}, + ) + + forecast_id = await flexmeasures_client.trigger_forecast( + sensor_id=sensor_id, + start="2025-01-05T00:00:00+00:00", + end="2025-01-07T00:00:00+00:00", + train_start="2025-01-01T00:00:00+00:00", + retrain_frequency="PT24H", + future_regressors=[2, 3], + ) + + assert forecast_id == "test-forecast-uuid" + + m.assert_called_once_with( + f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/trigger", + method="POST", + headers={"Content-Type": "application/json", "Authorization": "test-token"}, + json={ + "start": "2025-01-05T00:00:00+00:00", + "end": "2025-01-07T00:00:00+00:00", + "config": { + "train-start": "2025-01-01T00:00:00+00:00", + "retrain-frequency": "P1DT0H0M0S", + "future-regressors": [2, 3], + }, + }, + params=None, + ssl=False, + allow_redirects=False, + ) + + await flexmeasures_client.close() + + +@pytest.mark.asyncio +async def test_get_forecast_polling() -> None: + """Test getting a forecast with polling (202 -> 202 -> 200).""" + sensor_id = 1 + forecast_id = "test-uuid" + url = f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/{forecast_id}" + + with aioresponses() as m: + # First call returns 202 (QUEUED) + m.get( + url=url, + status=202, + payload={"status": "QUEUED"}, + ) + # Second call returns 202 (STARTED) + m.get( + url=url, + status=202, + payload={"status": "STARTED"}, + ) + # Third call returns 200 (completed) + m.get( + url=url, + status=200, + payload={ + "values": [1.2, 1.5], + "start": "2025-01-05T00:00:00+00:00", + "duration": "PT2H", + "unit": "kW", + }, + ) + + flexmeasures_client = FlexMeasuresClient( + email="test@test.test", + password="test", + request_timeout=2, + polling_interval=0.2, + access_token="skip-auth", + ) + + forecast = await flexmeasures_client.get_forecast( + sensor_id=sensor_id, forecast_id=forecast_id + ) + + assert forecast["values"] == [1.2, 1.5] + assert forecast["start"] == "2025-01-05T00:00:00+00:00" + assert forecast["duration"] == "PT2H" + assert forecast["unit"] == "kW" + + await flexmeasures_client.close() + + +@pytest.mark.asyncio +async def test_trigger_and_get_forecast() -> None: + """Test triggering and getting a forecast in one call.""" + with aioresponses() as m: + flexmeasures_client = FlexMeasuresClient( + email="test@test.test", + password="test", + request_timeout=2, + polling_interval=0.2, + ) + flexmeasures_client.access_token = "test-token" + + sensor_id = 1 + forecast_uuid = "test-forecast-uuid" + + # Mock the trigger request + m.post( + f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/trigger", + status=200, + payload={"forecast": forecast_uuid}, + ) + + # Mock the get forecast request (with immediate 200 response) + m.get( + f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/{forecast_uuid}", + status=200, + payload={ + "values": [1.2, 1.5, 1.8], + "start": "2025-01-05T00:00:00+00:00", + "duration": "PT3H", + "unit": "kW", + }, + ) + + forecast = await flexmeasures_client.trigger_and_get_forecast( + sensor_id=sensor_id, + start="2025-01-05T00:00:00+00:00", + end="2025-01-07T00:00:00+00:00", + ) + + assert "values" in forecast + assert forecast["values"] == [1.2, 1.5, 1.8] + assert forecast["start"] == "2025-01-05T00:00:00+00:00" + assert forecast["duration"] == "PT3H" + assert forecast["unit"] == "kW" + + await flexmeasures_client.close()