From da50f4b904fed00338521a3772cbb6bc95fee527 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sun, 16 Nov 2025 23:02:10 +0100 Subject: [PATCH 01/46] Feat: add endpoint to trigger forecasting job for a sensor Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 91 ++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 7499c7da69..6bb5234872 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -62,7 +62,10 @@ ) from flexmeasures.utils.time_utils import duration_isoformat from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list - +from flexmeasures.data.schemas.forecasting.pipeline import ( + ForecasterParametersSchema, + TrainPredictPipelineConfigSchema, +) # Instantiate schemes outside of endpoint logic to minimize response time sensors_schema = SensorSchema(many=True) @@ -940,8 +943,8 @@ def get_schedule( # noqa: C901 ) sign = 1 - if sensor.measures_power and sensor.get_attribute( - "consumption_is_positive", True + if sensor.measures_power and not sensor.get_attribute( + "consumption_is_positive", False ): sign = -1 @@ -1395,3 +1398,85 @@ def get_status(self, id, sensor): status_data = serialize_sensor_status_data(sensor=sensor) return {"sensors_data": status_data}, 200 + + @route("//forecasts/trigger", methods=["POST"]) + @use_kwargs({"sensor": SensorIdField()}, location="path") + @permission_required_for_context("read", ctx_arg_name="sensor") + def trigger_forecast(self, sensor: Sensor): + """ + .. :quickref: Forecasts; Trigger forecasting job for one sensor + --- + post: + summary: Trigger forecasting job for one sensor + description: | + Launch a forecasting pipeline for the given sensor asynchronously. + This reuses the same validation as the CLI command `flexmeasures add forecasts`. 
+ + Example: + ``` + { + "sensor": 2092, + "end_date": "2025-10-15T00:00:00+01:00", + "train_period": "P7D", + "retrain_frequency": "PT24H", + "max_forecast_horizon": "PT24H", + "model_save_dir": "flexmeasures/data/models/forecasting/artifacts/models", + "probabilistic": false + } + ``` + responses: + 200: + description: Forecasting job queued + content: + application/json: + example: + status: "PROCESSED" + forecast_job: "b3d26a8a-7a43-4a9f-93e1-fc2a869ea97b" + message: "Forecasting job has been queued." + tags: + - Forecasts + """ + + from flask import request + from marshmallow import ValidationError + from flexmeasures.data.models.forecasting import Forecaster + from flexmeasures.cli.utils import get_data_generator + from flexmeasures.api.common.responses import invalid_flex_config, request_processed + + try: + # Load and validate JSON payload + parameters = request.get_json() + + # Ensure the forecast is run as a job on a forecasting queue + parameters['as_job'] = True + + # Instantiate the forecaster + forecaster = get_data_generator( + source=None, + model="TrainPredictPipeline", + config={}, + save_config=True, + data_generator_type=Forecaster, + ) + + # Queue forecasting job + result = forecaster.compute(parameters=parameters) + + # Extract job ID (UUID) + job_id = getattr(result, "id", None) + + # Commit DB transaction + db.session.commit() + + # Prepare response + response = dict( + forecast_job=job_id, + ) + d, s = request_processed() + return dict(**response, **d), s + + except ValidationError as e: + return invalid_flex_config(e.messages) + except Exception as e: + current_app.logger.exception("Forecast job failed to enqueue.") + return invalid_flex_config(str(e)) From b6969cfa8db645e1ce45c3974fa0317b02be16b6 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sun, 16 Nov 2025 23:03:57 +0100 Subject: [PATCH 02/46] chore: update tags for trigger_forecast to sensors Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 6bb5234872..d12734c290 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1434,7 +1434,7 @@ def trigger_forecast(self, sensor: Sensor): forecast_job: "b3d26a8a-7a43-4a9f-93e1-fc2a869ea97b" message: "Forecasting job has been queued." tags: - - Forecasts + - Sensors """ from flask import request From 72a9d7c82ad17f3a00c455800e046e3e48cd4c09 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sun, 16 Nov 2025 23:19:56 +0100 Subject: [PATCH 03/46] style: run pre-commit. Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index d12734c290..2676c2cd72 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1441,14 +1441,17 @@ def trigger_forecast(self, sensor: Sensor): from marshmallow import ValidationError from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.cli.utils import get_data_generator - from flexmeasures.api.common.responses import invalid_flex_config, request_processed + from flexmeasures.api.common.responses import ( + invalid_flex_config, + request_processed, + ) try: # Load and validate JSON payload parameters = request.get_json() # Ensure the forecast is run as a job on a forecasting queue - parameters['as_job'] = True + parameters["as_job"] = True # Instantiate the forecaster forecaster = get_data_generator( From 9b8404befff53ed6c2d3327fc5959c05af405bc1 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sun, 16 Nov 2025 23:28:59 +0100 Subject: [PATCH 04/46] chore: remove unused imports Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py 
b/flexmeasures/api/v3_0/sensors.py index 2676c2cd72..98cce65330 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -62,10 +62,6 @@ ) from flexmeasures.utils.time_utils import duration_isoformat from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list -from flexmeasures.data.schemas.forecasting.pipeline import ( - ForecasterParametersSchema, - TrainPredictPipelineConfigSchema, -) # Instantiate schemes outside of endpoint logic to minimize response time sensors_schema = SensorSchema(many=True) From 2ea181a883e97c91ef188e2fc992767ee0a52391 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 02:24:10 +0100 Subject: [PATCH 05/46] Feat: add source parameter to forecaster instantiation in SensorAPI Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 98cce65330..e676c5f7fd 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1449,9 +1449,12 @@ def trigger_forecast(self, sensor: Sensor): # Ensure the forecast is run as a job on a forecasting queue parameters["as_job"] = True + # Set up forecaster source + source = parameters.pop("source", None) + # Instantiate the forecaster forecaster = get_data_generator( - source=None, + source=source, model="TrainPredictPipeline", config={}, save_config=True, From b1f10cf7e44a34ba1993157b7fbeebfe4373d285 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 02:24:55 +0100 Subject: [PATCH 06/46] Feat: allow dynamic model selection for forecaster instantiation in SensorAPI if model passed in as field in api call Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 
e676c5f7fd..6f0ea8226d 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1452,10 +1452,13 @@ def trigger_forecast(self, sensor: Sensor): # Set up forecaster source source = parameters.pop("source", None) + # Set forecaster model + model = parameters.pop("model", "TrainPredictPipeline") + # Instantiate the forecaster forecaster = get_data_generator( source=source, - model="TrainPredictPipeline", + model=model, config={}, save_config=True, data_generator_type=Forecaster, From ac51a259adf117210364519131d41dc3c9a4eea3 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 02:35:44 +0100 Subject: [PATCH 07/46] Feat: return bdf from prediction pipeline upon successful completion Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/predict.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py index cd7103a5c3..92f3ff3b76 100644 --- a/flexmeasures/data/models/forecasting/pipelines/predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/predict.py @@ -315,5 +315,7 @@ def run(self, delete_model: bool = False): os.remove(self.model_path) logging.info("Prediction pipeline completed successfully.") + + return bdf except Exception as e: raise CustomException(f"Error running pipeline: {e}", sys) from e From dc309b157cec169c34fd083c16413b0d94f5bf8e Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 02:36:37 +0100 Subject: [PATCH 08/46] Feat: return forecasts from run_cycle in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index cf11b7039b..a0344551c5 100644 --- 
a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -121,7 +121,7 @@ def run_cycle( f"Prediction cycle from {predict_start} to {predict_end} started ..." ) predict_start_time = time.time() - predict_pipeline.run(delete_model=self.delete_model) + forecasts = predict_pipeline.run(delete_model=self.delete_model) predict_runtime = time.time() - predict_start_time logging.info( f"{p.ordinal(counter)} Prediction cycle completed in {predict_runtime:.2f} seconds. " @@ -134,7 +134,7 @@ def run_cycle( f"{p.ordinal(counter)} Train-Predict cycle from {train_start} to {predict_end} completed in {total_runtime:.2f} seconds." ) - return total_runtime + return total_runtime, forecasts def _compute_forecast(self, **kwargs) -> list[dict[str, Any]]: # Run the train-and-predict pipeline @@ -170,6 +170,7 @@ def run( cumulative_cycles_runtime = 0 # To track the cumulative runtime of TrainPredictPipeline cycles when not running as a job. 
cycles_job_params = [] + forecasts_list = [] while predict_end <= self._parameters["end_date"]: counter += 1 @@ -183,7 +184,8 @@ def run( } if not as_job: - cycle_runtime = self.run_cycle(**train_predict_params) + cycle_runtime, forecasts = self.run_cycle(**train_predict_params) + forecasts_list.append(forecasts) cumulative_cycles_runtime += cycle_runtime else: train_predict_params["target_sensor_id"] = self._parameters[ From 5b78fd051f9b4f9c3381d60795ebb2222d30be2f Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 02:37:35 +0100 Subject: [PATCH 09/46] Feat: return forecasts list from run_cycle in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index a0344551c5..15a8b2faea 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -234,6 +234,7 @@ def run( asset_or_sensor_type="sensor", ) return jobs + return forecasts_list except Exception as e: raise CustomException( f"Error running Train-Predict Pipeline: {e}", sys From 23b4f4e1dd2c1e2b685a99cbc7340d15e4fa607a Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:18:31 +0100 Subject: [PATCH 10/46] fix: correctly pass as_job param in forecasting parameters Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 78adf02f53..7e395767a6 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1170,7 +1170,6 @@ def add_holidays( ) @with_appcontext def train_predict_pipeline( - as_job, forecaster_class: str, source: DataSource | None = None, config_file: 
TextIOBase | None = None, @@ -1249,7 +1248,7 @@ def train_predict_pipeline( ) try: - forecaster.compute(as_job=as_job, parameters=parameters) + forecaster.compute(parameters=parameters) except Exception as e: click.echo(f"Error running Train-Predict Pipeline: {str(e)}") From 41476e1632b9be59699092886d0c2e3ada3c838b Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:25:13 +0100 Subject: [PATCH 11/46] Feat: return forecasts in dict forma with data and sensor fields Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 15a8b2faea..78e3ccd91f 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -185,7 +185,7 @@ def run( if not as_job: cycle_runtime, forecasts = self.run_cycle(**train_predict_params) - forecasts_list.append(forecasts) + forecasts_list.append({"data": forecasts, "sensor": self._parameters["target"]}) cumulative_cycles_runtime += cycle_runtime else: train_predict_params["target_sensor_id"] = self._parameters[ From 17707c397f718de62e773b37622a120ccbf195da Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:25:52 +0100 Subject: [PATCH 12/46] Feat: initialize jobs list before while loop so we can store all jobs created Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 78e3ccd91f..f61b9b78ff 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ 
b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -171,6 +171,7 @@ def run( cumulative_cycles_runtime = 0 # To track the cumulative runtime of TrainPredictPipeline cycles when not running as a job. cycles_job_params = [] forecasts_list = [] + jobs = [] while predict_end <= self._parameters["end_date"]: counter += 1 @@ -211,7 +212,6 @@ def run( ) if as_job: - jobs = [] for param in cycles_job_params: job = Job.create( self.run_cycle, From bfe774d3af85f497b46813ae4c134d09a48b73a6 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:43:56 +0100 Subject: [PATCH 13/46] Feat: initialize return_values to store forecasts and jobs in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index f61b9b78ff..654ea3e749 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -44,6 +44,7 @@ def __init__( for k, v in self._config.items(): setattr(self, k, v) self.delete_model = delete_model + self.return_values = [] # To store forecasts and jobs def run_cycle( self, From 6d30a532755ebc7b7844d402a4a7d0c2c74d0165 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:44:26 +0100 Subject: [PATCH 14/46] Feat: append forecasts and sensor data to return_values in run_cycle Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 654ea3e749..2db8624dcf 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ 
b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -134,8 +134,8 @@ def run_cycle( logging.info( f"{p.ordinal(counter)} Train-Predict cycle from {train_start} to {predict_end} completed in {total_runtime:.2f} seconds." ) - - return total_runtime, forecasts + self.return_values.append({"data": forecasts, "sensor": self._parameters["target"]}) + return total_runtime def _compute_forecast(self, **kwargs) -> list[dict[str, Any]]: # Run the train-and-predict pipeline From 280e8f727f3de628678165f5057fc21106568e9b Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:44:52 +0100 Subject: [PATCH 15/46] Feat: remove unused forecasts_list and jobs variables in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 2db8624dcf..dac5c73005 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -171,8 +171,6 @@ def run( cumulative_cycles_runtime = 0 # To track the cumulative runtime of TrainPredictPipeline cycles when not running as a job. 
cycles_job_params = [] - forecasts_list = [] - jobs = [] while predict_end <= self._parameters["end_date"]: counter += 1 @@ -186,8 +184,7 @@ def run( } if not as_job: - cycle_runtime, forecasts = self.run_cycle(**train_predict_params) - forecasts_list.append({"data": forecasts, "sensor": self._parameters["target"]}) + cycle_runtime = self.run_cycle(**train_predict_params) cumulative_cycles_runtime += cycle_runtime else: train_predict_params["target_sensor_id"] = self._parameters[ From e3191c5a995856977bbec729eb0d78c8fbd49766 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:45:12 +0100 Subject: [PATCH 16/46] Feat: associate jobs with corresponding forecast entries in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index dac5c73005..f7707c4e0d 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -210,7 +210,7 @@ def run( ) if as_job: - for param in cycles_job_params: + for index, param in enumerate(cycles_job_params): job = Job.create( self.run_cycle, kwargs={**param, **job_kwargs}, @@ -222,7 +222,9 @@ def run( ), ) - jobs.append(job) + # Attach job to the corresponding forecast entry + if self.return_values: + self.return_values[index]['job'] = job current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( From cf2921096e4c68318a872d3e6b67a853224ac269 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:45:30 +0100 Subject: [PATCH 17/46] Feat: return combined forecasts and jobs from run method in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index f7707c4e0d..847d6629c3 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -233,8 +233,8 @@ def run( queue=queue, asset_or_sensor_type="sensor", ) - return jobs - return forecasts_list + + return self.return_values except Exception as e: raise CustomException( f"Error running Train-Predict Pipeline: {e}", sys From 5fa41615d1817fb0ee6dad5e0464f4e8dc324f99 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 03:45:44 +0100 Subject: [PATCH 18/46] Feat: return results from _compute_forecast method in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 847d6629c3..81d6c5d228 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -139,9 +139,7 @@ def run_cycle( def _compute_forecast(self, **kwargs) -> list[dict[str, Any]]: # Run the train-and-predict pipeline - self.run(**kwargs) - # todo: return results - return [] + return self.run(**kwargs) def run( self, From d797a3fc75461ff81654deca8cfbf4bba0edf71d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 04:03:22 +0100 Subject: [PATCH 19/46] Feat: append job entries to return_values in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py 
b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 81d6c5d228..eca0b154e4 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -223,6 +223,8 @@ def run( # Attach job to the corresponding forecast entry if self.return_values: self.return_values[index]['job'] = job + else: + self.return_values.append({f'job-{index}': job}) current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( From 872db8eb049a7ad83500888c80cf93af20a49ea9 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 04:25:56 +0100 Subject: [PATCH 20/46] Feat: store job IDs in return_values for corresponding forecast entries in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index eca0b154e4..ac40f0e117 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -222,9 +222,9 @@ def run( # Attach job to the corresponding forecast entry if self.return_values: - self.return_values[index]['job'] = job + self.return_values[index]['job'] = job.id else: - self.return_values.append({f'job-{index}': job}) + self.return_values.append({f'job-{index}': job.id}) current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( From 9f878f9bc51800d888748da4bea9f7f4fde5db0d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 23:00:05 +0100 Subject: [PATCH 21/46] Fix: conditionally assign sensors and source in compute method. this is to fix error when running forecaster data generator it returns only job id's when ran as_job as forecasts haven't been generated yet. 
Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/data_sources.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index f6d356f42c..a4968b6f8d 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -131,7 +131,9 @@ def compute(self, parameters: dict | None = None, **kwargs) -> list[dict[str, An self._parameters = self._parameters_schema.load(self._parameters) results = self._compute(**self._parameters) - results = self._assign_sensors_and_source(results) + + if not self._parameters.get("as_job", False): + results = self._assign_sensors_and_source(results) return results def _assign_sensors_and_source( From c5cbb3fd2d2f0e5bb235414e7fa4e66aa93a2c35 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 23:02:11 +0100 Subject: [PATCH 22/46] Feat: return multiple job IDs from forecasting computation in SensorAPI Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 6f0ea8226d..f10f9a37ea 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1465,17 +1465,19 @@ def trigger_forecast(self, sensor: Sensor): ) # Queue forecasting job - result = forecaster.compute(parameters=parameters) + results = forecaster.compute(parameters=parameters) # Extract job ID (UUID) - job_id = getattr(result, "id", None) + job_ids = [] + for result in results: + job_ids.extend(result.values()) # Commit DB transaction db.session.commit() # Prepare response response = dict( - forecast_job=job_id, + forecast_jobs=job_ids, ) d, s = request_processed() return dict(**response, **d), s From bfe24be79c186aa9f8246cdebd5ba9f99b3fe44c Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 23:07:43 
+0100 Subject: [PATCH 23/46] Feat: conditionally check event resolution in Forecaster when not running as job Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/forecasting/__init__.py b/flexmeasures/data/models/forecasting/__init__.py index b8a2d4da97..da5e5ee808 100644 --- a/flexmeasures/data/models/forecasting/__init__.py +++ b/flexmeasures/data/models/forecasting/__init__.py @@ -86,11 +86,12 @@ def _compute(self, check_output_resolution=True, **kwargs) -> list[dict[str, Any results = self._compute_forecast(**kwargs) - for result in results: - # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor - assert not check_output_resolution or ( - result["sensor"].event_resolution == result["data"].event_resolution - ), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})." + if not kwargs.get("as_job", False): + for result in results: + # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor + assert not check_output_resolution or ( + result["sensor"].event_resolution == result["data"].event_resolution + ), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})." 
return results From 19b488be744a26ed5baf4d840344de583ad3c9ed Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 19 Nov 2025 23:10:46 +0100 Subject: [PATCH 24/46] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index ac40f0e117..25c7ccb5e0 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -134,7 +134,9 @@ def run_cycle( logging.info( f"{p.ordinal(counter)} Train-Predict cycle from {train_start} to {predict_end} completed in {total_runtime:.2f} seconds." ) - self.return_values.append({"data": forecasts, "sensor": self._parameters["target"]}) + self.return_values.append( + {"data": forecasts, "sensor": self._parameters["target"]} + ) return total_runtime def _compute_forecast(self, **kwargs) -> list[dict[str, Any]]: @@ -222,9 +224,9 @@ def run( # Attach job to the corresponding forecast entry if self.return_values: - self.return_values[index]['job'] = job.id + self.return_values[index]["job"] = job.id else: - self.return_values.append({f'job-{index}': job.id}) + self.return_values.append({f"job-{index}": job.id}) current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( From 8df1714543bc121b38290affb4bce613f3d088aa Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 1 Dec 2025 17:13:35 +0100 Subject: [PATCH 25/46] revert: return default value for consumption_is_positive in sensor power measurement to True Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index ec82129f3c..a01ce864ed 100644 --- 
a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1021,8 +1021,8 @@ def get_schedule( # noqa: C901 ) sign = 1 - if sensor.measures_power and not sensor.get_attribute( - "consumption_is_positive", False + if sensor.measures_power and sensor.get_attribute( + "consumption_is_positive", True ): sign = -1 From 938882ee269dc6046e332e6263a8316c29319b2e Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 3 Dec 2025 19:55:22 +0100 Subject: [PATCH 26/46] revert: remove trigger forecasting endpoint that's to be handled in the trigger forecasting endpoint pr Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 93 -------------------------------- 1 file changed, 93 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index a01ce864ed..bc7a39bcdc 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1508,96 +1508,3 @@ def get_status(self, id, sensor): status_data = serialize_sensor_status_data(sensor=sensor) return {"sensors_data": status_data}, 200 - - @route("//forecasts/trigger", methods=["POST"]) - @use_kwargs({"sensor": SensorIdField()}, location="path") - @permission_required_for_context("read", ctx_arg_name="sensor") - def trigger_forecast(self, sensor: Sensor): - """ - .. :quickref: Forecasts; Trigger forecasting job for one sensor - --- - post: - summary: Trigger forecasting job for one sensor - description: | - Launch a forecasting pipeline for the given sensor asynchronously. - This reuses the same validation as the CLI command `flexmeasures add forecasts`. 
- - Example: - ``` - { - "sensor": 2092, - "end_date": "2025-10-15T00:00:00+01:00", - "train_period": "P7D", - "retrain_frequency": "PT24H", - "max_forecast_horizon": "PT24H", - "model_save_dir": "flexmeasures/data/models/forecasting/artifacts/models", - "probabilistic": false - } - ``` - responses: - 200: - description: Forecasting job queued - content: - application/json: - example: - status: "PROCESSED" - forecast_job: "b3d26a8a-7a43-4a9f-93e1-fc2a869ea97b" - message: "Forecasting job has been queued." - tags: - - Sensors - """ - - from flask import request - from marshmallow import ValidationError - from flexmeasures.data.models.forecasting import Forecaster - from flexmeasures.cli.utils import get_data_generator - from flexmeasures.api.common.responses import ( - invalid_flex_config, - request_processed, - ) - - try: - # Load and validate JSON payload - parameters = request.get_json() - - # Ensure the forecast is run as a job on a forecasting queue - parameters["as_job"] = True - - # Set up forecaster source - source = parameters.pop("source", None) - - # Set forecaster model - model = parameters.pop("model", "TrainPredictPipeline") - - # Instantiate the forecaster - forecaster = get_data_generator( - source=source, - model=model, - config={}, - save_config=True, - data_generator_type=Forecaster, - ) - - # Queue forecasting job - results = forecaster.compute(parameters=parameters) - - # Extract job ID (UUID) - job_ids = [] - for result in results: - job_ids.extend(result.values()) - - # Commit DB transaction - db.session.commit() - - # Prepare response - response = dict( - forecast_jobs=job_ids, - ) - d, s = request_processed() - return dict(**response, **d), s - - except ValidationError as e: - return invalid_flex_config(e.messages) - except Exception as e: - current_app.logger.exception("Forecast job failed to enqueue.") - return invalid_flex_config(str(e)) From 7b976f93a244f18c600a323f60dba11b75b219f9 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida 
Date: Wed, 3 Dec 2025 20:35:35 +0100 Subject: [PATCH 27/46] feat: store result of compute method in pipeline_returns for further validation Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 38572f750f..314ccaebb0 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -172,7 +172,7 @@ def test_train_predict_pipeline( assert expected_error[1] in str(e_info) else: pipeline = TrainPredictPipeline(config=config) - pipeline.compute(parameters=params) + pipeline_returns = pipeline.compute(parameters=params) # Check pipeline properties for attr in ("model",): From b171e75dff9e5fc8bfdd36c24f3c29da2dc574f5 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 3 Dec 2025 20:36:00 +0100 Subject: [PATCH 28/46] feat: add assertion to ensure pipeline returns a list of dict Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 314ccaebb0..9ea92f381d 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -203,6 +203,7 @@ def test_train_predict_pipeline( source ), "string representation of the Forecaster (DataSource) should mention the used model" + assert isinstance(pipeline_returns, list), "pipeline should return a list" # Check DataGenerator configuration stored under DataSource attributes data_generator_config = source.attributes["data_generator"]["config"] assert data_generator_config["model"] == "CustomLGBM" From 77d5f194a7e6a39f6e94e9643da06d41d6577b40 Mon Sep 17 00:00:00 2001 From: 
Mohamed Belhsan Hmida Date: Wed, 3 Dec 2025 20:37:27 +0100 Subject: [PATCH 29/46] test: validate pipeline return in case of pipeline run as_job or not Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 9ea92f381d..b7dc71cd65 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -204,6 +204,16 @@ def test_train_predict_pipeline( ), "string representation of the Forecaster (DataSource) should mention the used model" assert isinstance(pipeline_returns, list), "pipeline should return a list" + assert all(isinstance(item, dict) for item in pipeline_returns), "each item should be a dict" + for index, pipeline_return in enumerate(pipeline_returns): + if not dg_params["as_job"]: + assert {"data", "sensor"}.issubset(pipeline_return.keys()) + assert pipeline_return["sensor"].id == dg_params["sensor_to_save"].id, "returned sensor should match sensor that forecasts will be saved into" + # assert forecasts == pipeline_return["data"], "returned data should match stored forecasts" + else: + assert f"job-{index}" in pipeline_return + assert isinstance(pipeline_return[f"job-{index}"], str) + # Check DataGenerator configuration stored under DataSource attributes data_generator_config = source.attributes["data_generator"]["config"] assert data_generator_config["model"] == "CustomLGBM" From 2a8665e259f1ca6481604e41d36d39dcb54f67ab Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 3 Dec 2025 20:38:35 +0100 Subject: [PATCH 30/46] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py 
b/flexmeasures/data/tests/test_train_predict_pipeline.py index b7dc71cd65..9bca08166c 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -204,11 +204,15 @@ def test_train_predict_pipeline( ), "string representation of the Forecaster (DataSource) should mention the used model" assert isinstance(pipeline_returns, list), "pipeline should return a list" - assert all(isinstance(item, dict) for item in pipeline_returns), "each item should be a dict" + assert all( + isinstance(item, dict) for item in pipeline_returns + ), "each item should be a dict" for index, pipeline_return in enumerate(pipeline_returns): if not dg_params["as_job"]: assert {"data", "sensor"}.issubset(pipeline_return.keys()) - assert pipeline_return["sensor"].id == dg_params["sensor_to_save"].id, "returned sensor should match sensor that forecasts will be saved into" + assert ( + pipeline_return["sensor"].id == dg_params["sensor_to_save"].id + ), "returned sensor should match sensor that forecasts will be saved into" # assert forecasts == pipeline_return["data"], "returned data should match stored forecasts" else: assert f"job-{index}" in pipeline_return From 893a9cd8c7c985394ac49d9317f7347698914e39 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Thu, 4 Dec 2025 01:51:32 +0100 Subject: [PATCH 31/46] feat: enhance assertion message for pipeline return validation Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 9bca08166c..9a6f64e15e 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -209,7 +209,7 @@ def test_train_predict_pipeline( ), "each item should be a dict" for index, pipeline_return in 
enumerate(pipeline_returns): if not dg_params["as_job"]: - assert {"data", "sensor"}.issubset(pipeline_return.keys()) + assert {"data", "sensor"}.issubset(pipeline_return.keys()), "returned dict should have data and sensor keys" assert ( pipeline_return["sensor"].id == dg_params["sensor_to_save"].id ), "returned sensor should match sensor that forecasts will be saved into" From dfceb2dc7bf2b417162a80b2c69bc2a6e62a5855 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Thu, 4 Dec 2025 01:52:12 +0100 Subject: [PATCH 32/46] feat: improve data comparison in train predict pipeline test with pandas. tests saved forecasts and returned forecasts Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 9a6f64e15e..ff013baef8 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -3,7 +3,7 @@ import pytest import logging - +import pandas as pd from datetime import timedelta from marshmallow import ValidationError @@ -213,7 +213,13 @@ def test_train_predict_pipeline( assert ( pipeline_return["sensor"].id == dg_params["sensor_to_save"].id ), "returned sensor should match sensor that forecasts will be saved into" - # assert forecasts == pipeline_return["data"], "returned data should match stored forecasts" + try: + pd.testing.assert_frame_equal( + forecasts.sort_index(), + pipeline_return["data"].sort_index(), + ) + except AssertionError as e: + raise AssertionError(f"returned data should match stored forecasts: {e}") else: assert f"job-{index}" in pipeline_return assert isinstance(pipeline_return[f"job-{index}"], str) From 9dc53d03653d9d23bafc04ac923f092d5d542251 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Thu, 4 Dec 2025 01:53:37 +0100 Subject:
[PATCH 33/46] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index ff013baef8..9ceaf5b84a 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -209,7 +209,9 @@ def test_train_predict_pipeline( ), "each item should be a dict" for index, pipeline_return in enumerate(pipeline_returns): if not dg_params["as_job"]: - assert {"data", "sensor"}.issubset(pipeline_return.keys()), "returned dict should have data and sensor keys" + assert {"data", "sensor"}.issubset( + pipeline_return.keys() + ), "returned dict should have data and sensor keys" assert ( pipeline_return["sensor"].id == dg_params["sensor_to_save"].id ), "returned sensor should match sensor that forecasts will be saved into" @@ -219,7 +221,9 @@ def test_train_predict_pipeline( pipeline_return["data"].sort_index(), ) except AssertionError as e: - raise AssertionError(f"returned data should match stored forecasts: {e}") + raise AssertionError( + f"returned data should match stored forecasts: {e}" + ) else: assert f"job-{index}" in pipeline_return assert isinstance(pipeline_return[f"job-{index}"], str) From 3fcaa445280f79e5d0c1f2a355949fb8a6265f1a Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Thu, 4 Dec 2025 02:01:14 +0100 Subject: [PATCH 34/46] style: ignore complexity warning for test_train_predict_pipeline function. 
to do will be simplified Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_train_predict_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 9ceaf5b84a..d5528aa9f0 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -133,7 +133,7 @@ # ) ], ) -def test_train_predict_pipeline( +def test_train_predict_pipeline( # noqa: C901 app, setup_fresh_test_forecast_data, config, # config passed to the Forecaster From a540585186f82e7a5af101ea69c09815b5b0d779 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida <149331360+BelhsanHmida@users.noreply.github.com> Date: Thu, 4 Dec 2025 21:30:38 +0100 Subject: [PATCH 35/46] Update flexmeasures/data/models/forecasting/pipelines/train_predict.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Mohamed Belhsan Hmida <149331360+BelhsanHmida@users.noreply.github.com> --- .../data/models/forecasting/pipelines/train_predict.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 3c68573188..d08888cf80 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -222,11 +222,8 @@ def run( ), ) - # Attach job to the corresponding forecast entry - if self.return_values: - self.return_values[index]["job"] = job.id - else: - self.return_values.append({f"job-{index}": job.id}) + # Store the job ID for this cycle + self.return_values.append({f"job-{index}": job.id}) current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( From fa48ef2d7738e6476d208129bf70352bb0a26538 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida 
<149331360+BelhsanHmida@users.noreply.github.com> Date: Fri, 5 Dec 2025 16:18:53 +0100 Subject: [PATCH 36/46] Update flexmeasures/data/tests/test_train_predict_pipeline.py Co-authored-by: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Signed-off-by: Mohamed Belhsan Hmida <149331360+BelhsanHmida@users.noreply.github.com> --- flexmeasures/data/tests/test_train_predict_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index d5528aa9f0..36c9168175 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -203,7 +203,7 @@ def test_train_predict_pipeline( # noqa: C901 source ), "string representation of the Forecaster (DataSource) should mention the used model" - assert isinstance(pipeline_returns, list), "pipeline should return a list" + assert isinstance(pipeline_returns, list) and len(pipeline_returns) > 0, "pipeline should return a non-empty list" assert all( isinstance(item, dict) for item in pipeline_returns ), "each item should be a dict" From e34f4aca2138161def12bea06b560d4ae8eace70 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Fri, 5 Dec 2025 16:23:43 +0100 Subject: [PATCH 37/46] chore: annotate run() with BeliefsDataFrame return type Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/predict.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py index 92f3ff3b76..de380f7746 100644 --- a/flexmeasures/data/models/forecasting/pipelines/predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/predict.py @@ -10,6 +10,7 @@ import pandas as pd from darts import TimeSeries from isodate import duration_isoformat +from timely_beliefs import BeliefsDataFrame from 
flexmeasures import Sensor, Source from flexmeasures.data import db @@ -267,7 +268,7 @@ def save_results_to_CSV(self, df_pred: pd.DataFrame): except Exception as e: raise CustomException(f"Error saving predictions: {e}", sys) from e - def run(self, delete_model: bool = False): + def run(self, delete_model: bool = False) -> BeliefsDataFrame: """ Execute the prediction pipeline. """ From 82ac0be72f41bcddb147abd76cc4c0f3816047cd Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Fri, 5 Dec 2025 16:30:58 +0100 Subject: [PATCH 38/46] test: remove try/except around DataFrame comparison to show original error Signed-off-by: Mohamed Belhsan Hmida --- .../data/tests/test_train_predict_pipeline.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 36c9168175..76c202b469 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -215,15 +215,10 @@ def test_train_predict_pipeline( # noqa: C901 assert ( pipeline_return["sensor"].id == dg_params["sensor_to_save"].id ), "returned sensor should match sensor that forecasts will be saved into" - try: - pd.testing.assert_frame_equal( - forecasts.sort_index(), - pipeline_return["data"].sort_index(), - ) - except AssertionError as e: - raise AssertionError( - f"returned data should match stored forecasts: {e}" - ) + pd.testing.assert_frame_equal( + forecasts.sort_index(), + pipeline_return["data"].sort_index(), + ) else: assert f"job-{index}" in pipeline_return assert isinstance(pipeline_return[f"job-{index}"], str) From 3e1f5faccb97bd94bd9c389e91121b23257f878d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Fri, 5 Dec 2025 17:37:30 +0100 Subject: [PATCH 39/46] test: enhance job existence and completion checks in train_predict_pipeline test Signed-off-by: Mohamed Belhsan Hmida --- 
.../data/tests/test_train_predict_pipeline.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 76c202b469..64a0ac4e0b 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -220,8 +220,15 @@ def test_train_predict_pipeline( # noqa: C901 pipeline_return["data"].sort_index(), ) else: - assert f"job-{index}" in pipeline_return - assert isinstance(pipeline_return[f"job-{index}"], str) + job_id = pipeline_return[f"job-{index}"] + + # Check the job exists in the queue or registries + job = app.queues["forecasting"].fetch_job(job_id) + assert job is not None, f"Job {job_id} should exist" + + # Check it's finished + finished_jobs = app.queues["forecasting"].finished_job_registry + assert job_id in finished_jobs, f"Job {job_id} should be in the finished registry" # Check DataGenerator configuration stored under DataSource attributes data_generator_config = source.attributes["data_generator"]["config"] From 8da32b878614cabc06d760f380a29ef87af1fb58 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Fri, 5 Dec 2025 17:52:47 +0100 Subject: [PATCH 40/46] cli: enhance train_predict_pipeline to show detailed success messages - Reports number of forecast jobs created when --as-job is used - Reports number of forecast beliefs and unique belief times for direct computation - Displays error message if no forecasts or jobs were created Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 58d689b85b..69929e1106 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1249,8 +1249,30 @@ def train_predict_pipeline( ) try: - forecaster.compute(parameters=parameters)
- click.secho("Successfully computed forecasts.", **MsgStyle.SUCCESS) + pipeline_returns = forecaster.compute(parameters=parameters) + + # Empty result + if not pipeline_returns: + click.secho("No forecasts or jobs were created.", **MsgStyle.ERROR) + return + + # as_job case → list of job dicts like {"job-1": ""} + if parameters.get("as_job"): + n_jobs = len(pipeline_returns) + click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) + return + + # direct computation: list of dicts containing BeliefsDataFrames + total_beliefs = sum(len(item["data"]) for item in pipeline_returns) + unique_belief_times = { + ts + for item in pipeline_returns + for ts in item["data"].belief_time.unique() + } + click.secho( + f"Successfully created {total_beliefs} forecast beliefs across {len(unique_belief_times)} unique belief times.", + **MsgStyle.SUCCESS, + ) except Exception as e: click.echo(f"Error running Train-Predict Pipeline: {str(e)}") From 1b3e500bcc78f9a90f0fc7c6a3a103c49dd7ac3d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Fri, 5 Dec 2025 17:57:30 +0100 Subject: [PATCH 41/46] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 4 +--- flexmeasures/data/tests/test_train_predict_pipeline.py | 8 ++++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 69929e1106..3f4f20814e 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1265,9 +1265,7 @@ def train_predict_pipeline( # direct computation: list of dicts containing BeliefsDataFrames total_beliefs = sum(len(item["data"]) for item in pipeline_returns) unique_belief_times = { - ts - for item in pipeline_returns - for ts in item["data"].belief_time.unique() + ts for item in pipeline_returns for ts in item["data"].belief_time.unique() } click.secho( f"Successfully created {total_beliefs} forecast beliefs across {len(unique_belief_times)} unique belief 
times.", diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 64a0ac4e0b..3292c915bb 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -203,7 +203,9 @@ def test_train_predict_pipeline( # noqa: C901 source ), "string representation of the Forecaster (DataSource) should mention the used model" - assert isinstance(pipeline_returns, list) and len(pipeline_returns) > 0, "pipeline should return a non-empty list" + assert ( + isinstance(pipeline_returns, list) and len(pipeline_returns) > 0 + ), "pipeline should return a non-empty list" assert all( isinstance(item, dict) for item in pipeline_returns ), "each item should be a dict" @@ -228,7 +230,9 @@ def test_train_predict_pipeline( # noqa: C901 # Check it's finished finished_jobs = app.queues["forecasting"].finished_job_registry - assert job_id in finished_jobs, f"Job {job_id} should be in the finished registry" + assert ( + job_id in finished_jobs + ), f"Job {job_id} should be in the finished registry" # Check DataGenerator configuration stored under DataSource attributes data_generator_config = source.attributes["data_generator"]["config"] From c29c870563c4bb5138a621f00b019bd8aa46b3cf Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Fri, 5 Dec 2025 18:00:38 +0100 Subject: [PATCH 42/46] docs: update CLI changelog with TrainPredictPipeline return values feature and cli response update Signed-off-by: Mohamed Belhsan Hmida --- documentation/cli/change_log.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst index 912c87e399..6a32916238 100644 --- a/documentation/cli/change_log.rst +++ b/documentation/cli/change_log.rst @@ -6,6 +6,13 @@ FlexMeasures CLI Changelog since v0.30.0 | December 02, 2025 ================================= +* Enhance `flexmeasures add forecasts` CLI command to 
display meaningful success messages: + - Shows number of forecast jobs created when `--as-job` is used. + - Shows number of forecast beliefs and unique belief times created when forecasts are computed directly. + - Displays an error message if no forecasts or jobs are produced. +* `TrainPredictPipeline.compute()` now returns the actual results instead of an empty list: + - Direct computation (`--as-job=False`) returns dictionaries containing `data` (BeliefsDataFrame) and `sensor` (Sensor object). + - Queued computation (`--as-job=True`) returns a list of job dictionaries with job IDs. * Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more). * Fix the interpretation of ``--start-offset`` and ``--end-offset`` options in ``flexmeasures add report``, which was not using the correct time at which the command was run; instead, that time was off by and offset corresponding to the offset between the ``FLEXMEASURES_TIMEZONE`` config setting and :abbr:`UTC (Coordinated Universal Time)`). From 1790ffcedfd747bf54adda60f35453d5dac3fa8e Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sat, 6 Dec 2025 02:14:03 +0100 Subject: [PATCH 43/46] Revert "docs: update CLI changelog with TrainPredictPipeline return values feature and cli response update" This reverts commit c29c870563c4bb5138a621f00b019bd8aa46b3cf. --- documentation/cli/change_log.rst | 7 ------- 1 file changed, 7 deletions(-) diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst index 6a32916238..912c87e399 100644 --- a/documentation/cli/change_log.rst +++ b/documentation/cli/change_log.rst @@ -6,13 +6,6 @@ FlexMeasures CLI Changelog since v0.30.0 | December 02, 2025 ================================= -* Enhance `flexmeasures add forecasts` CLI command to display meaningful success messages: - - Shows number of forecast jobs created when `--as-job` is used. 
- - Shows number of forecast beliefs and unique belief times created when forecasts are computed directly. - - Displays an error message if no forecasts or jobs are produced. -* `TrainPredictPipeline.compute()` now returns the actual results instead of an empty list: - - Direct computation (`--as-job=False`) returns dictionaries containing `data` (BeliefsDataFrame) and `sensor` (Sensor object). - - Queued computation (`--as-job=True`) returns a list of job dictionaries with job IDs. * Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more). * Fix the interpretation of ``--start-offset`` and ``--end-offset`` options in ``flexmeasures add report``, which was not using the correct time at which the command was run; instead, that time was off by and offset corresponding to the offset between the ``FLEXMEASURES_TIMEZONE`` config setting and :abbr:`UTC (Coordinated Universal Time)`). From 7e1671c4d76c76ea748a11e871ec5cce32bd5b6f Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sat, 6 Dec 2025 02:17:36 +0100 Subject: [PATCH 44/46] docs: update changelog to include improved success and error messages for `flexmeasures add forecasts` CLI command Signed-off-by: Mohamed Belhsan Hmida --- documentation/changelog.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 173b5e7a86..a0b1329401 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -38,6 +38,7 @@ v0.30.0 | December 02, 2025 New features ------------- +* Improve `flexmeasures add forecasts` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 `]. * Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more) [see `PR #1546 `_, `PR #1744 `_ and `PR #1834 `_] * The new ``commitments`` field in the ``flex-context`` can be used to specify previous commitments (e.g. 
market positions) that the scheduler needs to take into account [see `PR #1754 `_ and `PR #1793 `_] * Many more field descriptions in Swagger, including flex-model and flex-context fields [see `PR #1777 `_ and `PR #1841 `_] From 9ca8e5915ac7911d7f6ee190881ef784197de176 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sat, 6 Dec 2025 17:38:32 +0100 Subject: [PATCH 45/46] docs: fix formatting of changelog entry Signed-off-by: Mohamed Belhsan Hmida --- documentation/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index a0b1329401..10b64c0f59 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -38,7 +38,7 @@ v0.30.0 | December 02, 2025 New features ------------- -* Improve `flexmeasures add forecasts` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 `]. +* Improve ``flexmeasures add forecasts`` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 `]. * Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more) [see `PR #1546 `_, `PR #1744 `_ and `PR #1834 `_] * The new ``commitments`` field in the ``flex-context`` can be used to specify previous commitments (e.g. 
market positions) that the scheduler needs to take into account [see `PR #1754 `_ and `PR #1793 `_] * Many more field descriptions in Swagger, including flex-model and flex-context fields [see `PR #1777 `_ and `PR #1841 `_] From d420295b875e7a66f2e4398292050ece2a517032 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Sat, 6 Dec 2025 17:40:19 +0100 Subject: [PATCH 46/46] docs: add _ to github pr link Signed-off-by: Mohamed Belhsan Hmida --- documentation/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 10b64c0f59..c90faf215b 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -38,7 +38,7 @@ v0.30.0 | December 02, 2025 New features ------------- -* Improve ``flexmeasures add forecasts`` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 `]. +* Improve ``flexmeasures add forecasts`` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 `_]. * Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more) [see `PR #1546 `_, `PR #1744 `_ and `PR #1834 `_] * The new ``commitments`` field in the ``flex-context`` can be used to specify previous commitments (e.g. market positions) that the scheduler needs to take into account [see `PR #1754 `_ and `PR #1793 `_] * Many more field descriptions in Swagger, including flex-model and flex-context fields [see `PR #1777 `_ and `PR #1841 `_]