Merged
50 commits
da50f4b
Feat: add endpoint to trigger forecasting job for a sensor
BelhsanHmida Nov 16, 2025
b6969cf
chore: update tags for trigger_forecast to sensors
BelhsanHmida Nov 16, 2025
72a9d7c
style: run pre-commit.
BelhsanHmida Nov 16, 2025
9b8404b
chore: remove unused imports
BelhsanHmida Nov 16, 2025
2ea181a
Feat: add source parameter to forecaster instantiation in SensorAPI
BelhsanHmida Nov 19, 2025
b1f10cf
Feat: allow dynamic model selection for forecaster instantiation in S…
BelhsanHmida Nov 19, 2025
ac51a25
Feat: return bdf from prediction pipeline upon successful completion
BelhsanHmida Nov 19, 2025
dc309b1
Feat: return forecasts from run_cycle in TrainPredictPipeline
BelhsanHmida Nov 19, 2025
5b78fd0
Feat: return forecasts list from run_cycle in TrainPredictPipeline
BelhsanHmida Nov 19, 2025
23b4f4e
fix: correctly pass as_job param in forecasting parameters
BelhsanHmida Nov 19, 2025
41476e1
Feat: return forecasts in dict format with data and sensor fields
BelhsanHmida Nov 19, 2025
17707c3
Feat: initialize jobs list before while loop so we can store all jobs…
BelhsanHmida Nov 19, 2025
bfe774d
Feat: initialize return_values to store forecasts and jobs in TrainPr…
BelhsanHmida Nov 19, 2025
6d30a53
Feat: append forecasts and sensor data to return_values in run_cycle
BelhsanHmida Nov 19, 2025
280e8f7
Feat: remove unused forecasts_list and jobs variables in TrainPredict…
BelhsanHmida Nov 19, 2025
e3191c5
Feat: associate jobs with corresponding forecast entries in TrainPred…
BelhsanHmida Nov 19, 2025
cf29210
Feat: return combined forecasts and jobs from run method in TrainPred…
BelhsanHmida Nov 19, 2025
5fa4161
Feat: return results from _compute_forecast method in TrainPredictPip…
BelhsanHmida Nov 19, 2025
d797a3f
Feat: append job entries to return_values in TrainPredictPipeline
BelhsanHmida Nov 19, 2025
872db8e
Feat: store job IDs in return_values for corresponding forecast entri…
BelhsanHmida Nov 19, 2025
9f878f9
Fix: conditionally assign sensors and source in compute method. this …
BelhsanHmida Nov 19, 2025
c5cbb3f
Feat: return multiple job IDs from forecasting computation in SensorAPI
BelhsanHmida Nov 19, 2025
bfe24be
Feat: conditionally check event resolution in Forecaster when not run…
BelhsanHmida Nov 19, 2025
19b488b
style: run pre-commit
BelhsanHmida Nov 19, 2025
ac3f633
Merge branch 'main' into feat/complete-forecasting-pipeline-return
BelhsanHmida Nov 30, 2025
8df1714
revert: return default value for consumption_is_positive in sensor po…
BelhsanHmida Dec 1, 2025
908db72
Merge branch 'main' into feat/complete-forecasting-pipeline-return
BelhsanHmida Dec 1, 2025
938882e
revert: remove trigger forecasting endpoint that's to be handled in t…
BelhsanHmida Dec 3, 2025
7b976f9
feat: store result of compute method in pipeline_returns for further …
BelhsanHmida Dec 3, 2025
b171e75
feat: add assertion to ensure pipeline returns a list of dict
BelhsanHmida Dec 3, 2025
77d5f19
test: validate pipeline return in case of pipeline run as_job or not
BelhsanHmida Dec 3, 2025
2a8665e
style: run pre-commit
BelhsanHmida Dec 3, 2025
893a9cd
feat: enhance assertion message for pipeline return validation
BelhsanHmida Dec 4, 2025
dfceb2d
feat: improve data comparison in train predict pipeline test with pan…
BelhsanHmida Dec 4, 2025
9dc53d0
style: run pre-commit
BelhsanHmida Dec 4, 2025
a6d77a6
Merge branch 'main' into feat/complete-forecasting-pipeline-return
BelhsanHmida Dec 4, 2025
3fcaa44
style: ignore complexity warning for test_train_predict_pipeline func…
BelhsanHmida Dec 4, 2025
a540585
Update flexmeasures/data/models/forecasting/pipelines/train_predict.py
BelhsanHmida Dec 4, 2025
fa48ef2
Update flexmeasures/data/tests/test_train_predict_pipeline.py
BelhsanHmida Dec 5, 2025
e34f4ac
chore: annotate run() with BeliefsDataFrame return type
BelhsanHmida Dec 5, 2025
82ac0be
test: remove try/except around DataFrame comparison to show original …
BelhsanHmida Dec 5, 2025
3e1f5fa
test: enhance job existence and completion checks in train_predict_pi…
BelhsanHmida Dec 5, 2025
8da32b8
cli: enhance train_predict_pipeline to show detailed success messages
BelhsanHmida Dec 5, 2025
1b3e500
style: run pre-commit
BelhsanHmida Dec 5, 2025
c29c870
docs: update CLI changelog with TrainPredictPipeline return values fe…
BelhsanHmida Dec 5, 2025
1790ffc
Revert "docs: update CLI changelog with TrainPredictPipeline return v…
BelhsanHmida Dec 6, 2025
7e1671c
docs: update changelog to include improved success and error messages…
BelhsanHmida Dec 6, 2025
9ca8e59
docs: fix formatting of changelog entry
BelhsanHmida Dec 6, 2025
d420295
docs: add _ to github pr link
BelhsanHmida Dec 6, 2025
d3638be
Merge branch 'main' into feat/complete-forecasting-pipeline-return
BelhsanHmida Dec 6, 2025
1 change: 1 addition & 0 deletions documentation/changelog.rst
@@ -39,6 +39,7 @@ v0.30.0 | December 02, 2025

New features
-------------
* Improve ``flexmeasures add forecasts`` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 <https://github.com/FlexMeasures/flexmeasures/pull/1822>`_].
* Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more) [see `PR #1546 <https://github.com/FlexMeasures/flexmeasures/pull/1546>`_, `PR #1744 <https://github.com/FlexMeasures/flexmeasures/pull/1744>`_ and `PR #1834 <https://github.com/FlexMeasures/flexmeasures/pull/1834>`_]
* The new ``commitments`` field in the ``flex-context`` can be used to specify previous commitments (e.g. market positions) that the scheduler needs to take into account [see `PR #1754 <https://github.com/FlexMeasures/flexmeasures/pull/1754>`_ and `PR #1793 <https://github.com/FlexMeasures/flexmeasures/pull/1793>`_]
* Many more field descriptions in Swagger, including flex-model and flex-context fields [see `PR #1777 <https://www.github.com/FlexMeasures/flexmeasures/pull/1777>`_ and `PR #1841 <https://www.github.com/FlexMeasures/flexmeasures/pull/1841>`_]
1 change: 0 additions & 1 deletion flexmeasures/api/v3_0/sensors.py
@@ -63,7 +63,6 @@
from flexmeasures.utils.time_utils import duration_isoformat
from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list


# Instantiate schemes outside of endpoint logic to minimize response time
sensors_schema = SensorSchema(many=True)
sensor_schema = SensorSchema()
24 changes: 22 additions & 2 deletions flexmeasures/cli/data_add.py
@@ -1249,8 +1249,28 @@ def train_predict_pipeline(
)

try:
forecaster.compute(parameters=parameters)
click.secho("Successfully computed forecasts.", **MsgStyle.SUCCESS)
pipeline_returns = forecaster.compute(parameters=parameters)

# Empty result
if not pipeline_returns:
click.secho("No forecasts or jobs were created.", **MsgStyle.ERROR)
return

# as_job case → list of job dicts like {"job-1": "<uuid>"}
if parameters.get("as_job"):
n_jobs = len(pipeline_returns)
click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS)
return

# direct computation: list of dicts containing BeliefsDataFrames
total_beliefs = sum(len(item["data"]) for item in pipeline_returns)
unique_belief_times = {
ts for item in pipeline_returns for ts in item["data"].belief_time.unique()
}
click.secho(
f"Successfully created {total_beliefs} forecast beliefs across {len(unique_belief_times)} unique belief times.",
**MsgStyle.SUCCESS,
)

except Exception as e:
click.echo(f"Error running Train-Predict Pipeline: {str(e)}")
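The CLI branch above distinguishes two return shapes from `forecaster.compute()`: a list of job-ID dicts (when `as_job` is set) versus a list of dicts holding forecast data. A minimal sketch of that dispatch, with a hypothetical helper name and plain dicts standing in for BeliefsDataFrames (the real code reads `item["data"].belief_time`):

```python
def summarize_pipeline_returns(pipeline_returns: list[dict], as_job: bool) -> str:
    """Hypothetical helper mirroring the CLI branching in data_add.py."""
    # Empty result: nothing was computed or enqueued
    if not pipeline_returns:
        return "No forecasts or jobs were created."
    # as_job case: each item looks like {"job-0": "<uuid>"}
    if as_job:
        return f"Created {len(pipeline_returns)} forecasting job(s)."
    # Direct computation: each item holds forecast rows under "data"
    total_beliefs = sum(len(item["data"]) for item in pipeline_returns)
    unique_belief_times = {
        row["belief_time"] for item in pipeline_returns for row in item["data"]
    }
    return (
        f"Successfully created {total_beliefs} forecast beliefs "
        f"across {len(unique_belief_times)} unique belief times."
    )
```

This is a sketch of the control flow only; the actual CLI wraps each branch in `click.secho` with the appropriate message style.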
4 changes: 3 additions & 1 deletion flexmeasures/data/models/data_sources.py
@@ -131,7 +131,9 @@ def compute(self, parameters: dict | None = None, **kwargs) -> list[dict[str, An
self._parameters = self._parameters_schema.load(self._parameters)

results = self._compute(**self._parameters)
results = self._assign_sensors_and_source(results)

if not self._parameters.get("as_job", False):
results = self._assign_sensors_and_source(results)
return results

def _assign_sensors_and_source(
11 changes: 6 additions & 5 deletions flexmeasures/data/models/forecasting/__init__.py
@@ -86,11 +86,12 @@ def _compute(self, check_output_resolution=True, **kwargs) -> list[dict[str, Any

results = self._compute_forecast(**kwargs)

for result in results:
# checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor
assert not check_output_resolution or (
result["sensor"].event_resolution == result["data"].event_resolution
), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})."
if not kwargs.get("as_job", False):
for result in results:
# checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor
assert not check_output_resolution or (
result["sensor"].event_resolution == result["data"].event_resolution
), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})."

return results

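The guard above only runs the event-resolution assertion for in-process results, since job-ID dicts carry no data to check. A minimal sketch under simplified assumptions (dicts with `event_resolution` fields stand in for Sensor objects and BeliefsDataFrames):

```python
from datetime import timedelta


def validate_results(results, as_job: bool = False, check_output_resolution: bool = True):
    """Hypothetical sketch of the guard: when running as a job, results
    are job-ID dicts, so there is no BeliefsDataFrame to check."""
    if as_job:
        return results
    for result in results:
        # Output data resolution must match the output sensor's resolution
        assert not check_output_resolution or (
            result["sensor"]["event_resolution"] == result["data"]["event_resolution"]
        ), (
            f"The resolution of the results ({result['data']['event_resolution']}) "
            f"should match that of the output sensor ({result['sensor']['event_resolution']})."
        )
    return results
```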
5 changes: 4 additions & 1 deletion flexmeasures/data/models/forecasting/pipelines/predict.py
@@ -10,6 +10,7 @@
import pandas as pd
from darts import TimeSeries
from isodate import duration_isoformat
from timely_beliefs import BeliefsDataFrame

from flexmeasures import Sensor, Source
from flexmeasures.data import db
@@ -267,7 +268,7 @@ def save_results_to_CSV(self, df_pred: pd.DataFrame):
except Exception as e:
raise CustomException(f"Error saving predictions: {e}", sys) from e

def run(self, delete_model: bool = False):
def run(self, delete_model: bool = False) -> BeliefsDataFrame:
"""
Execute the prediction pipeline.
"""
@@ -315,5 +316,7 @@ def run(self, delete_model: bool = False) -> BeliefsDataFrame:
os.remove(self.model_path)

logging.info("Prediction pipeline completed successfully.")

return bdf
except Exception as e:
raise CustomException(f"Error running pipeline: {e}", sys) from e
20 changes: 11 additions & 9 deletions flexmeasures/data/models/forecasting/pipelines/train_predict.py
@@ -44,6 +44,7 @@ def __init__(
for k, v in self._config.items():
setattr(self, k, v)
self.delete_model = delete_model
self.return_values = [] # To store forecasts and jobs

def run_cycle(
self,
@@ -121,7 +122,7 @@ def run_cycle(
f"Prediction cycle from {predict_start} to {predict_end} started ..."
)
predict_start_time = time.time()
predict_pipeline.run(delete_model=self.delete_model)
forecasts = predict_pipeline.run(delete_model=self.delete_model)
predict_runtime = time.time() - predict_start_time
logging.info(
f"{p.ordinal(counter)} Prediction cycle completed in {predict_runtime:.2f} seconds. "
@@ -133,14 +134,14 @@
logging.info(
f"{p.ordinal(counter)} Train-Predict cycle from {train_start} to {predict_end} completed in {total_runtime:.2f} seconds."
)

self.return_values.append(
{"data": forecasts, "sensor": self._parameters["target"]}
)
return total_runtime

def _compute_forecast(self, **kwargs) -> list[dict[str, Any]]:
# Run the train-and-predict pipeline
self.run(**kwargs)
# todo: return results
return []
return self.run(**kwargs)

def run(
self,
@@ -209,8 +210,7 @@
)

if as_job:
jobs = []
for param in cycles_job_params:
for index, param in enumerate(cycles_job_params):
job = Job.create(
self.run_cycle,
kwargs={**param, **job_kwargs},
@@ -222,7 +222,8 @@
),
)

jobs.append(job)
# Store the job ID for this cycle
self.return_values.append({f"job-{index}": job.id})

current_app.queues[queue].enqueue_job(job)
current_app.job_cache.add(
@@ -231,7 +232,8 @@
queue=queue,
asset_or_sensor_type="sensor",
)
return jobs

return self.return_values
except Exception as e:
raise CustomException(
f"Error running Train-Predict Pipeline: {e}", sys
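The diff above accumulates both kinds of results in a single `return_values` list: `run_cycle` appends `{"data": ..., "sensor": ...}` entries for direct computation, while `run` appends `{f"job-{index}": job.id}` entries when enqueuing. A toy sketch of that accumulation (class name and fake IDs are hypothetical; plain lists stand in for forecasts and no real queue is used):

```python
class MiniPipeline:
    """Sketch of return_values accumulation in TrainPredictPipeline."""

    def __init__(self, target: str):
        self.target = target
        self.return_values = []  # To store forecasts and jobs

    def run_cycle(self, forecasts):
        # Direct computation: pair each cycle's forecasts with the target sensor
        self.return_values.append({"data": forecasts, "sensor": self.target})

    def run(self, cycles, as_job: bool = False):
        if as_job:
            # Store a job ID per cycle instead of computing in-process
            for index, _ in enumerate(cycles):
                self.return_values.append({f"job-{index}": f"uuid-{index}"})
        else:
            for cycle in cycles:
                self.run_cycle(cycle)
        return self.return_values
```

This keeps the caller's contract uniform: `run` always returns one list, and the key shape of each entry tells the caller (e.g. the CLI) whether it holds forecasts or a job reference.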
37 changes: 34 additions & 3 deletions flexmeasures/data/tests/test_train_predict_pipeline.py
@@ -3,7 +3,7 @@
import pytest

import logging

import pandas as pd
from datetime import timedelta

from marshmallow import ValidationError
@@ -133,7 +133,7 @@
# )
],
)
def test_train_predict_pipeline(
def test_train_predict_pipeline( # noqa: C901
app,
setup_fresh_test_forecast_data,
config, # config passed to the Forecaster
@@ -172,7 +172,7 @@ def test_train_predict_pipeline(
assert expected_error[1] in str(e_info)
else:
pipeline = TrainPredictPipeline(config=config)
pipeline.compute(parameters=params)
pipeline_returns = pipeline.compute(parameters=params)

# Check pipeline properties
for attr in ("model",):
@@ -203,6 +203,37 @@
source
), "string representation of the Forecaster (DataSource) should mention the used model"

assert (
isinstance(pipeline_returns, list) and len(pipeline_returns) > 0
), "pipeline should return a non-empty list"
assert all(
isinstance(item, dict) for item in pipeline_returns
), "each item should be a dict"
for index, pipeline_return in enumerate(pipeline_returns):
if not dg_params["as_job"]:
assert {"data", "sensor"}.issubset(
pipeline_return.keys()
), "returned dict should have data and sensor keys"
assert (
pipeline_return["sensor"].id == dg_params["sensor_to_save"].id
), "returned sensor should match sensor that forecasts will be saved into"
pd.testing.assert_frame_equal(
forecasts.sort_index(),
pipeline_return["data"].sort_index(),
)
else:
job_id = pipeline_return[f"job-{index}"]

# Check the job exists in the queue or registries
job = app.queues["forecasting"].fetch_job(job_id)
assert job is not None, f"Job {job_id} should exist"

# Check it's finished
finished_jobs = app.queues["forecasting"].finished_job_registry
assert (
job_id in finished_jobs
), f"Job {job_id} should be in the finished registry"

# Check DataGenerator configuration stored under DataSource attributes
data_generator_config = source.attributes["data_generator"]["config"]
assert data_generator_config["model"] == "CustomLGBM"