Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,23 @@ This can be used to retrieve the schedule, using:
The client will re-try until the schedule is available or the ``MAX_POLLING_STEPS`` of ``10`` is reached.


Forecasting
===========

Trigger a forecast for a sensor and wait for the result:

.. code-block:: python

forecast = await client.trigger_and_get_forecast(
sensor_id=<sensor_id>, # int
duration="PT24H", # ISO duration – how far ahead to forecast
)
# Returns e.g. {"values": [1.2, 1.5, ...], "start": "...", "duration": "PT24H", "unit": "kW"}

The client polls until the forecasting job is complete. For more advanced options
(training window, regressors, forecast frequency, etc.) see :doc:`forecasting`.


Development
==============

Expand Down
147 changes: 147 additions & 0 deletions docs/forecasting.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
.. _forecasting:

Forecasting
===========

The FlexMeasures Client supports the forecasting API endpoints introduced in
FlexMeasures v0.31.0:

- ``POST /sensors/<id>/forecasts/trigger`` — queue a forecasting job
- ``GET /sensors/<id>/forecasts/<uuid>`` — poll for results

These are exposed through three client methods:

- :meth:`trigger_forecast` — trigger and return the job UUID
- :meth:`get_forecast` — poll until results are ready
- :meth:`trigger_and_get_forecast` — convenience wrapper for both

.. note::

These endpoints require a FlexMeasures server of version **0.31.0** or above.


Basic example
-------------

Forecast the next 24 hours for a sensor, using server-side defaults for the
training window:

.. code-block:: python

import asyncio
from flexmeasures_client import FlexMeasuresClient

async def main():
client = FlexMeasuresClient(
host="localhost:5000",
ssl=False,
email="user@example.com",
password="password",
)

forecast = await client.trigger_and_get_forecast(
sensor_id=1,
duration="PT24H",
)
print(forecast)
# e.g. {"values": [1.2, 1.5, 1.8, ...], "start": "...", "duration": "PT24H", "unit": "kW"}

await client.close()

asyncio.run(main())


Specifying a forecast window
-----------------------------

Use ``start`` and ``end`` (or ``start`` and ``duration``) to define the exact
period to forecast:

.. code-block:: python

forecast = await client.trigger_and_get_forecast(
sensor_id=1,
start="2025-01-15T00:00:00+01:00",
end="2025-01-17T00:00:00+01:00",
)


Controlling the training window
---------------------------------

Pass training parameters inside a nested structure via the ``train_start``,
``train_period``, and ``retrain_frequency`` keyword arguments:

.. code-block:: python

forecast = await client.trigger_and_get_forecast(
sensor_id=1,
start="2025-01-15T00:00:00+01:00",
duration="PT48H",
# Training configuration
train_start="2025-01-01T00:00:00+01:00", # historical data start
train_period="P14D", # use 14 days of history
retrain_frequency="PT24H", # retrain every 24 h
)


Using regressors
----------------

You can improve forecast accuracy by supplying regressor sensor IDs:

.. code-block:: python

forecast = await client.trigger_and_get_forecast(
sensor_id=1,
duration="PT24H",
# Sensors whose *forecasts* matter (e.g. weather forecasts)
future_regressors=[10, 11],
# Sensors whose *measurements* matter (e.g. price history)
past_regressors=[20],
)


Step-by-step usage
-------------------

Trigger and retrieve separately to handle the job UUID yourself:

.. code-block:: python

# Step 1 – enqueue the forecasting job
forecast_id = await client.trigger_forecast(
sensor_id=1,
start="2025-01-15T00:00:00+01:00",
end="2025-01-17T00:00:00+01:00",
)
print(f"Job queued: {forecast_id}")

# Step 2 – poll until the job finishes
forecast = await client.get_forecast(
sensor_id=1,
forecast_id=forecast_id,
)
print(forecast)


Polling behaviour
-----------------

``get_forecast`` polls the server with a ``GET`` request and returns when the
server responds with HTTP 200. The polling respects the same client-level
settings as scheduling:

- ``polling_interval`` (default 10 s) — time between retries
- ``polling_timeout`` (default 200 s) — maximum total wait time
- ``max_polling_steps`` (default 10) — maximum number of poll attempts

Override them at client construction time:

.. code-block:: python

client = FlexMeasuresClient(
...,
polling_interval=5.0, # check every 5 seconds
polling_timeout=300.0, # wait up to 5 minutes
)
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Contents
:maxdepth: 2

Overview <readme>
Forecasting <forecasting>
Contributions & Help <contributing>
License <license>
Authors <authors>
Expand Down
204 changes: 204 additions & 0 deletions src/flexmeasures_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,210 @@ async def trigger_schedule(
self.logger.info(f"Schedule triggered successfully. Schedule ID: {schedule_id}")
return schedule_id

async def trigger_forecast(
self,
sensor_id: int,
start: str | datetime | None = None,
end: str | datetime | None = None,
duration: str | timedelta | None = None,
train_start: str | datetime | None = None,
train_period: str | timedelta | None = None,
max_training_period: str | timedelta | None = None,
retrain_frequency: str | timedelta | None = None,
future_regressors: list[int] | None = None,
past_regressors: list[int] | None = None,
regressors: list[int] | None = None,
max_forecast_horizon: str | timedelta | None = None,
forecast_frequency: str | timedelta | None = None,
probabilistic: bool | None = None,
) -> str:
"""Trigger a forecasting job for the given sensor.

:param sensor_id: ID of the sensor to forecast.
:param start: Start date for predictions (ISO 8601 datetime string or datetime).
:param end: End date of the forecast period (ISO 8601 datetime string or datetime).
:param duration: Duration of the forecast period (ISO 8601 duration or timedelta).
Provide at most two of start, end, and duration.
:param train_start: Start date of historical training data.
:param train_period: Duration of the training period (min 2 days).
:param max_training_period: Maximum duration of the training period. Defaults to 1 year.
:param retrain_frequency: How often to retrain (ISO 8601 duration or timedelta).
:param future_regressors: List of sensor IDs used only as future regressors.
:param past_regressors: List of sensor IDs used only as past regressors.
:param regressors: List of sensor IDs used as both past and future regressors.
:param max_forecast_horizon: Maximum forecast horizon (ISO 8601 duration or timedelta).
:param forecast_frequency: How often to recompute forecasts (ISO 8601 duration or timedelta).
:param probabilistic: If True, enable probabilistic forecasting.

:returns: Forecast job UUID (string).

This function raises a ValueError when an unhandled status code is returned.
"""
json_payload: dict[str, Any] = {}

if start is not None:
json_payload["start"] = pd.Timestamp(start).isoformat()
if end is not None:
json_payload["end"] = pd.Timestamp(end).isoformat()
if duration is not None:
json_payload["duration"] = pd.Timedelta(duration).isoformat()
if max_forecast_horizon is not None:
json_payload["max-forecast-horizon"] = pd.Timedelta(
max_forecast_horizon
).isoformat()
if forecast_frequency is not None:
json_payload["forecast-frequency"] = pd.Timedelta(
forecast_frequency
).isoformat()
if probabilistic is not None:
json_payload["probabilistic"] = probabilistic

# Build config sub-dict for training and regressor parameters
config: dict[str, Any] = {}
if train_start is not None:
config["train-start"] = pd.Timestamp(train_start).isoformat()
if train_period is not None:
config["train-period"] = pd.Timedelta(train_period).isoformat()
if max_training_period is not None:
config["max-training-period"] = pd.Timedelta(
max_training_period
).isoformat()
if retrain_frequency is not None:
config["retrain-frequency"] = pd.Timedelta(retrain_frequency).isoformat()
if future_regressors is not None:
config["future-regressors"] = future_regressors
if past_regressors is not None:
config["past-regressors"] = past_regressors
if regressors is not None:
config["regressors"] = regressors
if config:
json_payload["config"] = config

response, status = await self.request(
uri=f"sensors/{sensor_id}/forecasts/trigger",
json_payload=json_payload,
method="POST",
minimum_server_version="0.31.0",
)
check_for_status(status, 200)

if not isinstance(response, dict):
raise ContentTypeError(
f"Expected a dictionary, but got {type(response)}",
)

if not isinstance(response.get("forecast"), str):
raise ContentTypeError(
f"Expected a forecast ID, but got {type(response.get('forecast'))}",
)
forecast_id = response["forecast"]
self.logger.info(f"Forecast triggered successfully. Forecast ID: {forecast_id}")
return forecast_id

async def get_forecast(
self,
sensor_id: int,
forecast_id: str,
) -> dict:
"""Get forecast with given ID.

Polls the server until the forecasting job is complete and returns the result.

:returns: forecast as dictionary, for example:
{
'values': [1.2, 1.5, 1.4, 0.8],
'start': '2025-10-15T00:00:00+01:00',
'duration': 'PT4H',
'unit': 'kW'
}

This function raises a ValueError when an unhandled status code is returned.
"""
polling_step = 0
try:
async with async_timeout.timeout(self.polling_timeout):
while polling_step < self.max_polling_steps:
forecast, status = await self.request(
uri=f"sensors/{sensor_id}/forecasts/{forecast_id}",
method="GET",
minimum_server_version="0.31.0",
)
if status == 200:
if not isinstance(forecast, dict):
raise ContentTypeError(
f"Expected a forecast dictionary, but got {type(forecast)}",
)
return forecast
elif status == 202:
job_status = (
forecast.get("status", "unknown")
if isinstance(forecast, dict)
else "unknown"
)
message = f"Forecast job status: {job_status}. Polling step: {polling_step}. Retrying in {self.polling_interval} seconds..."
self.logger.debug(message)
polling_step += 1
await asyncio.sleep(self.polling_interval)
else:
check_for_status(status, 200)
except asyncio.TimeoutError as exception:
raise ConnectionError(
"Client polling timeout while waiting for forecast job to complete."
) from exception
raise ConnectionError(
"Max polling steps reached while waiting for forecast job to complete."
)

async def trigger_and_get_forecast(
self,
sensor_id: int,
start: str | datetime | None = None,
end: str | datetime | None = None,
duration: str | timedelta | None = None,
train_start: str | datetime | None = None,
train_period: str | timedelta | None = None,
max_training_period: str | timedelta | None = None,
retrain_frequency: str | timedelta | None = None,
future_regressors: list[int] | None = None,
past_regressors: list[int] | None = None,
regressors: list[int] | None = None,
max_forecast_horizon: str | timedelta | None = None,
forecast_frequency: str | timedelta | None = None,
probabilistic: bool | None = None,
) -> dict:
"""Trigger a forecasting job and then fetch the result.

:returns: forecast as dictionary, for example:
{
'values': [1.2, 1.5, 1.4, 0.8],
'start': '2025-10-15T00:00:00+01:00',
'duration': 'PT4H',
'unit': 'kW'
}

This function raises a ValueError when an unhandled status code is returned.
"""
forecast_id = await self.trigger_forecast(
sensor_id=sensor_id,
start=start,
end=end,
duration=duration,
train_start=train_start,
train_period=train_period,
max_training_period=max_training_period,
retrain_frequency=retrain_frequency,
future_regressors=future_regressors,
past_regressors=past_regressors,
regressors=regressors,
max_forecast_horizon=max_forecast_horizon,
forecast_frequency=forecast_frequency,
probabilistic=probabilistic,
)
return await self.get_forecast(
sensor_id=sensor_id,
forecast_id=forecast_id,
)

@staticmethod
def create_storage_flex_model(
soc_unit: str,
Expand Down
Loading
Loading