diff --git a/documentation/changelog.rst b/documentation/changelog.rst index f3ce1dbcae..4e814a4b00 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -39,6 +39,7 @@ v0.30.0 | December 02, 2025 New features ------------- +* Improve ``flexmeasures add forecasts`` CLI command with clearer success and error messages when creating jobs or beliefs [see `PR #1822 `_]. * Major overhaul of ``flexmeasures add forecasts`` (use the ``--help`` option to learn more) [see `PR #1546 `_, `PR #1744 `_ and `PR #1834 `_] * The new ``commitments`` field in the ``flex-context`` can be used to specify previous commitments (e.g. market positions) that the scheduler needs to take into account [see `PR #1754 `_ and `PR #1793 `_] * Many more field descriptions in Swagger, including flex-model and flex-context fields [see `PR #1777 `_ and `PR #1841 `_] diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 15ade3ec27..bc7a39bcdc 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -63,7 +63,6 @@ from flexmeasures.utils.time_utils import duration_isoformat from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list - # Instantiate schemes outside of endpoint logic to minimize response time sensors_schema = SensorSchema(many=True) sensor_schema = SensorSchema() diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 58d689b85b..3f4f20814e 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1249,8 +1249,28 @@ def train_predict_pipeline( ) try: - forecaster.compute(parameters=parameters) - click.secho("Successfully computed forecasts.", **MsgStyle.SUCCESS) + pipeline_returns = forecaster.compute(parameters=parameters) + + # Empty result + if not pipeline_returns: + click.secho("No forecasts or jobs were created.", **MsgStyle.ERROR) + return + + # as_job case → list of job dicts like {"job-1": ""} + if parameters.get("as_job"): + n_jobs = len(pipeline_returns) + click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) + return + + # direct computation: list of dicts containing BeliefsDataFrames + total_beliefs = sum(len(item["data"]) for item in pipeline_returns) + unique_belief_times = { + ts for item in pipeline_returns for ts in item["data"].belief_time.unique() + } + click.secho( + f"Successfully created {total_beliefs} forecast beliefs across {len(unique_belief_times)} unique belief times.", + **MsgStyle.SUCCESS, + ) except Exception as e: click.echo(f"Error running Train-Predict Pipeline: {str(e)}") diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index b150092567..ce949c7524 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -131,7 +131,9 @@ def compute(self, parameters: dict | None = None, **kwargs) -> list[dict[str, An self._parameters = self._parameters_schema.load(self._parameters) results = self._compute(**self._parameters) - results = self._assign_sensors_and_source(results) + + if not self._parameters.get("as_job", False): + results = self._assign_sensors_and_source(results) return results def _assign_sensors_and_source( diff --git a/flexmeasures/data/models/forecasting/__init__.py b/flexmeasures/data/models/forecasting/__init__.py index c33a2abd1a..8b28a31aa5 100644 --- a/flexmeasures/data/models/forecasting/__init__.py +++ b/flexmeasures/data/models/forecasting/__init__.py @@ -86,11 +86,12 @@ def _compute(self, check_output_resolution=True, **kwargs) -> list[dict[str, Any results = self._compute_forecast(**kwargs) - for result in results: - # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor - assert not check_output_resolution or ( - result["sensor"].event_resolution == result["data"].event_resolution - ), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})." + if not kwargs.get("as_job", False): + for result in results: + # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor + assert not check_output_resolution or ( + result["sensor"].event_resolution == result["data"].event_resolution + ), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})." return results diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py index cd7103a5c3..de380f7746 100644 --- a/flexmeasures/data/models/forecasting/pipelines/predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/predict.py @@ -10,6 +10,7 @@ import pandas as pd from darts import TimeSeries from isodate import duration_isoformat +from timely_beliefs import BeliefsDataFrame from flexmeasures import Sensor, Source from flexmeasures.data import db @@ -267,7 +268,7 @@ def save_results_to_CSV(self, df_pred: pd.DataFrame): except Exception as e: raise CustomException(f"Error saving predictions: {e}", sys) from e - def run(self, delete_model: bool = False): + def run(self, delete_model: bool = False) -> BeliefsDataFrame: """ Execute the prediction pipeline. """ @@ -315,5 +316,7 @@ def run(self, delete_model: bool = False): os.remove(self.model_path) logging.info("Prediction pipeline completed successfully.") + + return bdf except Exception as e: raise CustomException(f"Error running pipeline: {e}", sys) from e diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 3ef33b21c6..d08888cf80 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -44,6 +44,7 @@ def __init__( for k, v in self._config.items(): setattr(self, k, v) self.delete_model = delete_model + self.return_values = [] # To store forecasts and jobs def run_cycle( self, @@ -121,7 +122,7 @@ def run_cycle( f"Prediction cycle from {predict_start} to {predict_end} started ..." ) predict_start_time = time.time() - predict_pipeline.run(delete_model=self.delete_model) + forecasts = predict_pipeline.run(delete_model=self.delete_model) predict_runtime = time.time() - predict_start_time logging.info( f"{p.ordinal(counter)} Prediction cycle completed in {predict_runtime:.2f} seconds. " @@ -133,14 +134,14 @@ def run_cycle( logging.info( f"{p.ordinal(counter)} Train-Predict cycle from {train_start} to {predict_end} completed in {total_runtime:.2f} seconds." ) - + self.return_values.append( + {"data": forecasts, "sensor": self._parameters["target"]} + ) return total_runtime def _compute_forecast(self, **kwargs) -> list[dict[str, Any]]: # Run the train-and-predict pipeline - self.run(**kwargs) - # todo: return results - return [] + return self.run(**kwargs) def run( self, @@ -209,8 +210,7 @@ def run( ) if as_job: - jobs = [] - for param in cycles_job_params: + for index, param in enumerate(cycles_job_params): job = Job.create( self.run_cycle, kwargs={**param, **job_kwargs}, @@ -222,7 +222,8 @@ def run( ), ) - jobs.append(job) + # Store the job ID for this cycle + self.return_values.append({f"job-{index}": job.id}) current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( @@ -231,7 +232,8 @@ def run( queue=queue, asset_or_sensor_type="sensor", ) - return jobs + + return self.return_values except Exception as e: raise CustomException( f"Error running Train-Predict Pipeline: {e}", sys diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 38572f750f..3292c915bb 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -3,7 +3,7 @@ import pytest import logging - +import pandas as pd from datetime import timedelta from marshmallow import ValidationError @@ -133,7 +133,7 @@ # ) ], ) -def test_train_predict_pipeline( +def test_train_predict_pipeline( # noqa: C901 app, setup_fresh_test_forecast_data, config, # config passed to the Forecaster @@ -172,7 +172,7 @@ def test_train_predict_pipeline( assert expected_error[1] in str(e_info) else: pipeline = TrainPredictPipeline(config=config) - pipeline.compute(parameters=params) + pipeline_returns = pipeline.compute(parameters=params) # Check pipeline properties for attr in ("model",): @@ -203,6 +203,37 @@ def test_train_predict_pipeline( source ), "string representation of the Forecaster (DataSource) should mention the used model" + assert ( + isinstance(pipeline_returns, list) and len(pipeline_returns) > 0 + ), "pipeline should return a non-empty list" + assert all( + isinstance(item, dict) for item in pipeline_returns + ), "each item should be a dict" + for index, pipeline_return in enumerate(pipeline_returns): + if not dg_params["as_job"]: + assert {"data", "sensor"}.issubset( + pipeline_return.keys() + ), "returned dict should have data and sensor keys" + assert ( + pipeline_return["sensor"].id == dg_params["sensor_to_save"].id + ), "returned sensor should match sensor that forecasts will be saved into" + pd.testing.assert_frame_equal( + forecasts.sort_index(), + pipeline_return["data"].sort_index(), + ) + else: + job_id = pipeline_return[f"job-{index}"] + + # Check the job exists in the queue or registries + job = app.queues["forecasting"].fetch_job(job_id) + assert job is not None, f"Job {job_id} should exist" + + # Check it's finished + finished_jobs = app.queues["forecasting"].finished_job_registry + assert ( + job_id in finished_jobs + ), f"Job {job_id} should be in the finished registry" + # Check DataGenerator configuration stored under DataSource attributes data_generator_config = source.attributes["data_generator"]["config"] assert data_generator_config["model"] == "CustomLGBM"