diff --git a/documentation/tut/forecasting_scheduling.rst b/documentation/tut/forecasting_scheduling.rst index 4d6e043ca9..1cc24d4cc6 100644 --- a/documentation/tut/forecasting_scheduling.rst +++ b/documentation/tut/forecasting_scheduling.rst @@ -104,9 +104,8 @@ There are two ways to queue a forecasting job: .. code-block:: json { - "start_date": "2025-01-01T00:00:00+00:00", - "start_predict_date": "2025-01-04T00:00:00+00:00", - "end_date": "2025-01-04T04:00:00+00:00" + "start": "2025-01-04T00:00:00+00:00", + "duration": "PT4H" } Example response: diff --git a/flexmeasures/api/common/schemas/utils.py b/flexmeasures/api/common/schemas/utils.py index 4f46ef9f81..536e062600 100644 --- a/flexmeasures/api/common/schemas/utils.py +++ b/flexmeasures/api/common/schemas/utils.py @@ -4,7 +4,10 @@ from marshmallow import Schema, fields from flexmeasures.utils.doc_utils import rst_to_openapi -from flexmeasures.data.schemas.forecasting.pipeline import ForecastingTriggerSchema +from flexmeasures.data.schemas.forecasting.pipeline import ( + ForecastingTriggerSchema, + TrainPredictPipelineConfigSchema, +) from flexmeasures.data.schemas.sensors import ( SensorReferenceSchema, VariableQuantityField, @@ -12,7 +15,7 @@ ) -def make_openapi_compatible(schema_cls: Type[Schema]) -> Type[Schema]: +def make_openapi_compatible(schema_cls: Type[Schema]) -> Type[Schema]: # noqa: C901 """ Create an OpenAPI-compatible version of a Marshmallow schema. @@ -28,11 +31,24 @@ def make_openapi_compatible(schema_cls: Type[Schema]) -> Type[Schema]: new_fields = {} for name, field in schema_cls._declared_fields.items(): - if schema_cls == ForecastingTriggerSchema: + if schema_cls in (ForecastingTriggerSchema, TrainPredictPipelineConfigSchema): if "cli" in field.metadata and field.metadata["cli"].get( "cli-exclusive", False ): continue + if isinstance(field, fields.Nested): + nested_schema_cls = type(field.schema) + if nested_schema_cls is TrainPredictPipelineConfigSchema: + field_copy = fields.Nested( + make_openapi_compatible(nested_schema_cls), + metadata=field.metadata, + data_key=field.data_key, + many=field.many, + required=field.required, + allow_none=field.allow_none, + ) + new_fields[name] = field_copy + continue # Copy metadata, but sanitize description for OpenAPI metadata = dict(getattr(field, "metadata", {})) diff --git a/flexmeasures/api/v3_0/__init__.py b/flexmeasures/api/v3_0/__init__.py index dd4c5a487a..c85e777ba4 100644 --- a/flexmeasures/api/v3_0/__init__.py +++ b/flexmeasures/api/v3_0/__init__.py @@ -140,7 +140,7 @@ def create_openapi_specs(app: Flask): # Explicitly register OpenAPI-compatible schemas schemas = [ ("FlexContextOpenAPISchema", flex_context_schema_openAPI), - ("forecaster_parameters_schema_openAPI", forecasting_trigger_schema_openAPI), + ("forecasting_trigger_schema_openAPI", forecasting_trigger_schema_openAPI), ("UserAPIQuerySchema", UserAPIQuerySchema), ("AssetAPIQuerySchema", AssetAPIQuerySchema), ("AssetSchema", AssetSchema), diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index a418beb1cf..15f446f4e7 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -82,7 +82,17 @@ partial_sensor_schema = SensorSchema(partial=True, exclude=["generic_asset_id"]) # Create ForecasterParametersSchema OpenAPI compatible schema -forecasting_trigger_schema_openAPI = make_openapi_compatible(ForecastingTriggerSchema) +EXCLUDED_FORECASTING_FIELDS = [ + # todo: hide these in the config schema instead + # "train_period", + # "max_training_period", + "sensor_to_save", +] +forecasting_trigger_schema_openAPI = make_openapi_compatible(ForecastingTriggerSchema)( + # partial=True, + exclude=EXCLUDED_FORECASTING_FIELDS + + ["sensor"], +) class SensorKwargsSchema(Schema): @@ -1524,7 +1534,10 @@ def get_status(self, id, sensor): @route("//forecasts/trigger", methods=["POST"]) @use_args( - ForecastingTriggerSchema(), + ForecastingTriggerSchema( + # partial=True, + exclude=EXCLUDED_FORECASTING_FIELDS, + ), location="combined_sensor_data_description", as_kwargs=True, ) @@ -1558,11 +1571,10 @@ def trigger_forecast(self, id: int, **params): required: true content: application/json: - schema: forecaster_parameters_schema_openAPI + schema: forecasting_trigger_schema_openAPI example: - start-date: "2026-01-01T00:00:00+01:00" - start-predict-date: "2026-01-15T00:00:00+01:00" - end-date: "2026-01-17T00:00:00+01:00" + start: "2026-01-15T00:00:00+01:00" + duration: "P2D" responses: 200: description: PROCESSED @@ -1609,7 +1621,7 @@ def trigger_forecast(self, id: int, **params): forecaster = get_data_generator( source=None, model=model, - config={}, + config=parameters.pop("config", {}), save_config=True, data_generator_type=Forecaster, ) @@ -1780,8 +1792,8 @@ def get_forecast(self, id: int, uuid: str, sensor: Sensor, job_id: str): data_source = get_data_source_for_job(job, type="forecasting") forecasts = sensor.search_beliefs( - event_starts_after=job.meta.get("start_predict_date"), - event_ends_before=job.meta.get("end_date"), + event_starts_after=job.meta.get("start"), + event_ends_before=job.meta.get("end"), source=data_source, most_recent_beliefs_only=True, use_latest_version_per_event=True, diff --git a/flexmeasures/api/v3_0/tests/test_forecasting_api.py b/flexmeasures/api/v3_0/tests/test_forecasting_api.py index d3e4918b36..cc7a9afb92 100644 --- a/flexmeasures/api/v3_0/tests/test_forecasting_api.py +++ b/flexmeasures/api/v3_0/tests/test_forecasting_api.py @@ -2,9 +2,7 @@ import isodate import pytest from flask import url_for -from flexmeasures.data.services.scheduling import ( - get_data_source_for_job, -) + from rq.job import Job from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.api.tests.utils import get_auth_token @@ -35,18 +33,21 @@ def test_trigger_and_fetch_forecasts( # Trigger job payload = { - "start-date": "2025-01-01T00:00:00+00:00", - "start-predict-date": "2025-01-05T00:00:00+00:00", - "end-date": "2025-01-05T02:00:00+00:00", + "start": "2025-01-05T00:00:00+00:00", + "end": "2025-01-05T02:00:00+00:00", "max-forecast-horizon": "PT1H", - "retrain-frequency": "PT1H", + "forecast-frequency": "PT1H", + "config": { + "train-start": "2025-01-01T00:00:00+00:00", + "retrain-frequency": "PT1H", + }, } trigger_url = url_for("SensorAPI:trigger_forecast", id=sensor_0.id) trigger_res = client.post( trigger_url, json=payload, headers={"Authorization": token} ) - assert trigger_res.status_code == 200 + assert trigger_res.status_code == 200, trigger_res.json trigger_json = trigger_res.get_json() wrap_up_job = app.queues["forecasting"].fetch_job(trigger_json["forecast"]) @@ -77,7 +78,7 @@ def test_trigger_and_fetch_forecasts( payload["sensor"] = sensor_1.id # Run pipeline manually to compute expected forecasts - pipeline = TrainPredictPipeline() + pipeline = TrainPredictPipeline(config=payload.pop("config", {})) pipeline.compute(parameters=payload) # Fetch forecasts for each job @@ -85,7 +86,7 @@ def test_trigger_and_fetch_forecasts( fetch_url = url_for("SensorAPI:get_forecast", id=sensor_0.id, uuid=job_id) res = client.get(fetch_url, headers={"Authorization": token}) - assert res.status_code == 200 + assert res.status_code == 200, res.json data = res.get_json() @@ -103,14 +104,11 @@ def test_trigger_and_fetch_forecasts( assert isinstance(api_forecasts, list) assert len(api_forecasts) > 0 - # Identify which data source wrote these beliefs - data_source = get_data_source_for_job(job, type="forecasting") - # Load only the latest belief per event_start forecasts_df = sensor_1.search_beliefs( - event_starts_after=job.meta.get("start_predict_date"), - event_ends_before=job.meta.get("end_date") + sensor_1.event_resolution, - source=data_source, + event_starts_after=job.meta.get("start"), + event_ends_before=job.meta.get("end"), + source_types=["forecaster"], most_recent_beliefs_only=True, use_latest_version_per_event=True, ).reset_index() diff --git a/flexmeasures/data/models/forecasting/__init__.py b/flexmeasures/data/models/forecasting/__init__.py index 556b2c6004..5aa7683d5f 100644 --- a/flexmeasures/data/models/forecasting/__init__.py +++ b/flexmeasures/data/models/forecasting/__init__.py @@ -141,7 +141,8 @@ def _clean_parameters(self, parameters: dict) -> dict: "output-path", "sensor-to-save", "as-job", - "n_cycles", # Computed internally, still uses snake_case + "m_viewpoints", # Computed internally, still uses snake_case + "sensor", ] for field in fields_to_remove: diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 8973b0e265..7da3a98ffd 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -34,7 +34,7 @@ def __init__( config: dict | None = None, delete_model: bool = False, save_config: bool = True, - save_parameters: bool = True, + save_parameters: bool = False, ): super().__init__( config=config, save_config=save_config, save_parameters=save_parameters @@ -72,11 +72,13 @@ def run_cycle( train_pipeline = TrainPipeline( future_regressors=self._config["future_regressors"], past_regressors=self._config["past_regressors"], - target_sensor=self._parameters["target"], + target_sensor=self._parameters["sensor"], model_save_dir=self._parameters["model_save_dir"], - n_steps_to_predict=self._parameters["train_period_in_hours"] * multiplier, + n_steps_to_predict=(predict_start - train_start) + // timedelta(hours=1) + * multiplier, max_forecast_horizon=self._parameters["max_forecast_horizon"] - // self._parameters["target"].event_resolution, + // self._parameters["sensor"].event_resolution, event_starts_after=train_start, event_ends_before=train_end, probabilistic=self._parameters["probabilistic"], @@ -95,24 +97,24 @@ def run_cycle( predict_pipeline = PredictPipeline( future_regressors=self._config["future_regressors"], past_regressors=self._config["past_regressors"], - target_sensor=self._parameters["target"], + target_sensor=self._parameters["sensor"], model_path=os.path.join( self._parameters["model_save_dir"], - f"sensor_{self._parameters['target'].id}-cycle_{counter}-lgbm.pkl", + f"sensor_{self._parameters['sensor'].id}-cycle_{counter}-lgbm.pkl", ), output_path=( os.path.join( self._parameters["output_path"], - f"sensor_{self._parameters['target'].id}-cycle_{counter}.csv", + f"sensor_{self._parameters['sensor'].id}-cycle_{counter}.csv", ) if self._parameters["output_path"] else None ), n_steps_to_predict=self._parameters["predict_period_in_hours"] * multiplier, max_forecast_horizon=self._parameters["max_forecast_horizon"] - // self._parameters["target"].event_resolution, + // self._parameters["sensor"].event_resolution, forecast_frequency=self._parameters["forecast_frequency"] - // self._parameters["target"].event_resolution, + // self._parameters["sensor"].event_resolution, probabilistic=self._parameters["probabilistic"], event_starts_after=train_start, # use beliefs about events before the start of the predict period event_ends_before=predict_end, # ignore any beliefs about events beyond the end of the predict period @@ -140,7 +142,7 @@ def run_cycle( f"{p.ordinal(counter)} Train-Predict cycle from {train_start} to {predict_end} completed in {total_runtime:.2f} seconds." ) self.return_values.append( - {"data": forecasts, "sensor": self._parameters["target"]} + {"data": forecasts, "sensor": self._parameters["sensor"]} ) return total_runtime @@ -148,6 +150,46 @@ def _compute_forecast(self, as_job: bool = False, **kwargs) -> list[dict[str, An # Run the train-and-predict pipeline return self.run(as_job=as_job, **kwargs) + def _derive_training_period(self) -> tuple[datetime, datetime]: + """Derive the effective training period for model fitting. + + The training period ends at ``predict_start`` and starts at the + most restrictive (latest) of the following: + + - The configured ``start_date`` (if any) + - ``predict_start - train_period_in_hours`` (if configured) + - ``predict_start - max_training_period`` (always enforced) + + Additionally, the resulting training window is guaranteed to span + at least two days. + + :return: A tuple ``(train_start, train_end)`` defining the training window. + """ + train_end = self._parameters["predict_start"] + + configured_start: datetime | None = self._config.get("train_start") + period_hours: int | None = self._config.get("train_period_in_hours") + + candidates: list[datetime] = [] + + if configured_start is not None: + candidates.append(configured_start) + + if period_hours is not None: + candidates.append(train_end - timedelta(hours=period_hours)) + + # Always enforce maximum training period + candidates.append(train_end - self._config["max_training_period"]) + + train_start = max(candidates) + + # Enforce minimum training period of 2 days + min_training_period = timedelta(days=2) + if train_end - train_start < min_training_period: + train_start = train_end - min_training_period + + return train_start, train_end + def run( self, as_job: bool = False, @@ -157,33 +199,44 @@ def run( logging.info( f"Starting Train-Predict Pipeline to predict for {self._parameters['predict_period_in_hours']} hours." ) + # How much to move forward to the next cycle one prediction period later + cycle_frequency = max( + self._config["retrain_frequency"], + self._parameters["forecast_frequency"], + ) predict_start = self._parameters["predict_start"] - predict_end = predict_start + timedelta( - hours=self._parameters["predict_period_in_hours"] - ) - train_start = predict_start - timedelta( - hours=self._parameters["train_period_in_hours"] - ) - train_end = predict_start - counter = 0 + predict_end = predict_start + cycle_frequency + + # Determine training window (start, end) + train_start, train_end = self._derive_training_period() - sensor_resolution = self._parameters["target"].event_resolution + sensor_resolution = self._parameters["sensor"].event_resolution multiplier = int( timedelta(hours=1) / sensor_resolution - ) # multiplier used to adapt n_steps_to_predict to hours from sensor resolution, e.g. 15 min sensor resolution will have 7*24*4 = 168 predicitons to predict a week + ) # multiplier used to adapt n_steps_to_predict to hours from sensor resolution, e.g. 15 min sensor resolution will have 7*24*4 = 168 predictions to predict a week + + # Compute number of training cycles (at least 1) + n_cycles = max( + timedelta(hours=self._parameters["predict_period_in_hours"]) + // max( + self._config["retrain_frequency"], + self._parameters["forecast_frequency"], + ), + 1, + ) cumulative_cycles_runtime = 0 # To track the cumulative runtime of TrainPredictPipeline cycles when not running as a job. cycles_job_params = [] - while predict_end <= self._parameters["end_date"]: - counter += 1 + for counter in range(n_cycles): + predict_end = min(predict_end, self._parameters["end_date"]) train_predict_params = { "train_start": train_start, "train_end": train_end, "predict_start": predict_start, "predict_end": predict_end, - "counter": counter, + "counter": counter + 1, "multiplier": multiplier, } @@ -191,36 +244,29 @@ def run( cycle_runtime = self.run_cycle(**train_predict_params) cumulative_cycles_runtime += cycle_runtime else: - train_predict_params["target_sensor_id"] = self._parameters["target"].id + train_predict_params["target_sensor_id"] = self._parameters["sensor"].id cycles_job_params.append(train_predict_params) - # Move forward to the next cycle one prediction period later - cycle_frequency = timedelta( - hours=self._parameters["predict_period_in_hours"] - ) train_end += cycle_frequency predict_start += cycle_frequency predict_end += cycle_frequency - if counter == 0: - logging.info( - f"Train-Predict Pipeline Not Run: start-predict-date + predict-period is {predict_end}, which exceeds end-date {self._parameters['end_date']}. " - f"Try decreasing the predict-period." - ) - elif not as_job: + if not as_job: logging.info( f"Train-Predict Pipeline completed successfully in {cumulative_cycles_runtime:.2f} seconds." ) if as_job: cycle_job_ids = [] + + # job metadata for tracking + job_metadata = { + "data_source_info": {"id": self.data_source.id}, + "start": self._parameters["predict_start"], + "end": self._parameters["end_date"], + "sensor_id": self._parameters["sensor_to_save"].id, + } for cycle_params in cycles_job_params: - # job metadata for tracking - job_metadata = { - "data_source_info": {"id": self.data_source.id}, - "start_predict_date": self._parameters["predict_start"], - "end_date": self._parameters["end_date"], - "sensor_id": self._parameters["sensor_to_save"].id, - } + job = Job.create( self.run_cycle, # Some cycle job params override job kwargs @@ -245,7 +291,7 @@ def run( current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( - self._parameters["target"].id, + self._parameters["sensor"].id, job_id=job.id, queue=queue, asset_or_sensor_type="sensor", @@ -270,6 +316,6 @@ def run( return wrap_up_job.id else: # Return the single cycle job ID if only one job is queued - return cycle_job_ids[0] + return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id return self.return_values diff --git a/flexmeasures/data/schemas/forecasting/pipeline.py b/flexmeasures/data/schemas/forecasting/pipeline.py index 1b66a4173a..c60178c933 100644 --- a/flexmeasures/data/schemas/forecasting/pipeline.py +++ b/flexmeasures/data/schemas/forecasting/pipeline.py @@ -4,7 +4,6 @@ import os from datetime import timedelta -from flask import current_app from isodate.duration import Duration from marshmallow import ( @@ -17,7 +16,11 @@ ) from flexmeasures.data.schemas import SensorIdField -from flexmeasures.data.schemas.times import AwareDateTimeOrDateField, DurationField +from flexmeasures.data.schemas.times import ( + AwareDateTimeOrDateField, + DurationField, + PlanningDurationField, +) from flexmeasures.data.models.forecasting.utils import floor_to_resolution from flexmeasures.utils.time_utils import server_now @@ -94,6 +97,84 @@ class TrainPredictPipelineConfigSchema(Schema): }, }, ) + train_start = AwareDateTimeOrDateField( + data_key="train-start", + required=False, + allow_none=True, + metadata={ + "description": "Timestamp marking the start of training data. Defaults to train_period before start if not set.", + "example": "2025-01-01T00:00:00+01:00", + "cli": { + "cli-exclusive": True, + "option": "--train-start", + "aliases": ["--start-date", "--train-start"], + }, + }, + ) + train_period = DurationField( + data_key="train-period", + load_default=timedelta(days=30), + allow_none=True, + metadata={ + "description": "Duration of the initial training period (ISO 8601 format, min 2 days). If not set, derived from train_start and start if not set or defaults to P30D (30 days).", + "example": "P7D", + "cli": { + "cli-exclusive": True, + "option": "--train-period", + }, + }, + ) + max_training_period = DurationField( + data_key="max-training-period", + load_default=timedelta(days=365), + allow_none=True, + metadata={ + "description": "Maximum duration of the training period. Defaults to 1 year (P1Y).", + "example": "P1Y", + "cli": { + "cli-exclusive": True, + "option": "--max-training-period", + }, + }, + ) + retrain_frequency = DurationField( + data_key="retrain-frequency", + load_default=PlanningDurationField.load_default, + allow_none=True, + metadata={ + "description": "Frequency of retraining/prediction cycle (ISO 8601 duration). Defaults to prediction window length if not set.", + "example": "PT24H", + "cli": { + "cli-exclusive": True, + "option": "--retrain-frequency", + }, + }, + ) + + @validates_schema + def validate_parameters(self, data: dict, **kwargs): # noqa: C901 + if data["retrain_frequency"] < timedelta(hours=1): + raise ValidationError( + "retrain-frequency must be at least 1 hour", + field_name="retrain_frequency", + ) + + train_period = data.get("train_period") + max_training_period = data.get("max_training_period") + + if train_period is not None and train_period < timedelta(days=2): + raise ValidationError( + "train-period must be at least 2 days (48 hours)", + field_name="train_period", + ) + + if isinstance(max_training_period, Duration): + # DurationField only returns Duration when years/months are present + raise ValidationError( + "max-training-period must be specified using days or smaller units " + "(e.g. P365D, PT48H). Years and months are not supported.", + field_name="max_training_period", + ) @post_load def resolve_config(self, data: dict, **kwargs) -> dict: # noqa: C901 @@ -110,6 +191,16 @@ def resolve_config(self, data: dict, **kwargs) -> dict: # noqa: C901 data["future_regressors"] = future_regressors data["past_regressors"] = past_regressors + + train_period_in_hours = data["train_period"] // timedelta(hours=1) + max_training_period = data["max_training_period"] + if train_period_in_hours > max_training_period // timedelta(hours=1): + train_period_in_hours = max_training_period // timedelta(hours=1) + logging.warning( + f"train-period is greater than max-training-period ({max_training_period}), setting train-period to max-training-period", + ) + + data["train_period_in_hours"] = train_period_in_hours return data @@ -155,68 +246,42 @@ class ForecasterParametersSchema(Schema): }, }, ) - start_date = AwareDateTimeOrDateField( - data_key="start-date", - required=False, - allow_none=True, - metadata={ - "description": "Timestamp marking the start of training data. Defaults to train_period before start_predict_date if not set.", - "example": "2025-01-01T00:00:00+01:00", - "cli": { - "option": "--start-date", - "aliases": ["--train-start"], + duration = PlanningDurationField( + load_default=PlanningDurationField.load_default, + metadata=dict( + description="The duration for which to create the forecast, in ISO 8601 duration format. Defaults to the planning horizon.", + example="PT24H", + cli={ + "option": "--duration", + "aliases": ["--predict-period"], }, - }, + ), ) - end_date = AwareDateTimeOrDateField( - data_key="end-date", + end = AwareDateTimeOrDateField( + data_key="end", required=False, allow_none=True, inclusive=True, metadata={ - "description": "End date for running the pipeline.", + "description": "End of the last event forecasted. Use either this field or the duration field.", "example": "2025-10-15T00:00:00+01:00", "cli": { - "option": "--end-date", - "aliases": ["--to-date"], - }, - }, - ) - train_period = DurationField( - data_key="train-period", - required=False, - allow_none=True, - metadata={ - "description": "Duration of the initial training period (ISO 8601 format, min 2 days). If not set, derived from start_date and start_predict_date or defaults to P30D (30 days).", - "example": "P7D", - "cli": { - "option": "--train-period", + "cli-exclusive": True, + "option": "--end", + "aliases": ["--end-date", "--to-date"], }, }, ) - start_predict_date = AwareDateTimeOrDateField( - data_key="start-predict-date", + start = AwareDateTimeOrDateField( + data_key="start", required=False, allow_none=True, metadata={ "description": "Start date for predictions. Defaults to now, floored to the sensor resolution, so that the first forecast is about the ongoing event.", "example": "2025-01-08T00:00:00+01:00", "cli": { - "option": "--start-predict-date", - "aliases": ["--from-date"], - }, - }, - ) - retrain_frequency = DurationField( - data_key="retrain-frequency", - required=False, - allow_none=True, - metadata={ - "description": "Frequency of retraining/prediction cycle (ISO 8601 duration). Defaults to prediction window length if not set.", - "example": "PT24H", - "cli": { - "cli-exclusive": True, - "option": "--retrain-frequency", + "option": "--start", + "aliases": ["--start-predict-date", "--from-date"], }, }, ) @@ -228,8 +293,8 @@ class ForecasterParametersSchema(Schema): "description": "Maximum forecast horizon. Defaults to covering the whole prediction period (which itself defaults to 48 hours).", "example": "PT48H", "cli": { + "cli-exclusive": True, "option": "--max-forecast-horizon", - "extra_help": "For example, if you have multiple viewpoints (by having set a `retrain-frequency`), then it is equal to the retrain-frequency by default.", }, }, ) @@ -238,7 +303,7 @@ class ForecasterParametersSchema(Schema): required=False, allow_none=True, metadata={ - "description": "How often to recompute forecasts. Defaults to retrain frequency.", + "description": "How often to recompute forecasts. This setting can be used to get forecasts from multiple viewpoints, which is especially useful for running simulations. Defaults to the max-forecast-horizon.", "example": "PT1H", "cli": { "option": "--forecast-frequency", @@ -269,64 +334,48 @@ class ForecasterParametersSchema(Schema): }, }, ) - max_training_period = DurationField( - data_key="max-training-period", - required=False, - allow_none=True, - metadata={ - "description": "Maximum duration of the training period. Defaults to 1 year (P1Y).", - "example": "P1Y", - "cli": { - "option": "--max-training-period", - }, - }, - ) @pre_load - def drop_none_values(self, data, **kwargs): - return {k: v for k, v in data.items() if v is not None} + def sanitize_input(self, data, **kwargs): + + # Check predict period + if len({"start", "end", "duration"} & data.keys()) > 2: + raise ValidationError( + "Provide 'duration' with either 'start' or 'end', but not with both.", + field_name="duration", + ) + + # Drop None values + data = {k: v for k, v in data.items() if v is not None} + + return data @validates_schema def validate_parameters(self, data: dict, **kwargs): # noqa: C901 - start_date = data.get("start_date") - end_date = data.get("end_date") - predict_start = data.get("start_predict_date", None) - train_period = data.get("train_period") - retrain_frequency = data.get("retrain_frequency") + end_date = data.get("end") + predict_start = data.get("start", None) max_forecast_horizon = data.get("max_forecast_horizon") forecast_frequency = data.get("forecast_frequency") sensor = data.get("sensor") - max_training_period = data.get("max_training_period") - if start_date is not None and end_date is not None and start_date >= end_date: - raise ValidationError( - "start-date must be before end-date", field_name="start_date" - ) + # todo: consider moving this to the run method in train_predict.py + # if train_start is not None and end is not None and train_start >= end_date: + # raise ValidationError( + # "train_start must be before end", field_name="train-start" + # ) if predict_start: - if start_date is not None and predict_start < start_date: - raise ValidationError( - "start-predict-date cannot be before start-date", - field_name="start_predict_date", - ) + # if train_start is not None and predict_start < train_start: + # raise ValidationError( + # "start cannot be before start", + # field_name="start", + # ) if end_date is not None and predict_start >= end_date: raise ValidationError( - "start-predict-date must be before end-date", - field_name="start_predict_date", + "start must be before end", + field_name="start", ) - if train_period is not None and train_period < timedelta(days=2): - raise ValidationError( - "train-period must be at least 2 days (48 hours)", - field_name="train_period", - ) - - if retrain_frequency is not None and retrain_frequency <= timedelta(0): - raise ValidationError( - "retrain-frequency must be greater than 0", - field_name="retrain_frequency", - ) - if max_forecast_horizon is not None: if max_forecast_horizon % sensor.event_resolution != timedelta(0): raise ValidationError( @@ -339,23 +388,21 @@ def validate_parameters(self, data: dict, **kwargs): # noqa: C901 f"forecast-frequency must be a multiple of the sensor resolution ({sensor.event_resolution})" ) - if retrain_frequency is not None and forecast_frequency is not None: - if retrain_frequency % forecast_frequency != timedelta(0): - raise ValidationError( - "retrain-frequency must be a multiple of forecast-frequency", - field_name="retrain_frequency", - ) + @post_load(pass_original=True) + def resolve_config( # noqa: C901 + self, data: dict, original_data: dict | None = None, **kwargs + ) -> dict: + """Resolve timing parameters, using sensible defaults and choices. - if isinstance(max_training_period, Duration): - # DurationField only returns Duration when years/months are present - raise ValidationError( - "max-training-period must be specified using days or smaller units " - "(e.g. P365D, PT48H). Years and months are not supported.", - field_name="max_training_period", - ) + Defaults: + 1. predict-period defaults to minimum of (FM planning horizon and max-forecast-horizon) only if there is a single default viewpoint. + 2. max-forecast-horizon defaults to the predict-period + 3. forecast-frequency defaults to minimum of (FM planning horizon, predict-period, max-forecast-horizon) - @post_load - def resolve_config(self, data: dict, **kwargs) -> dict: # noqa: C901 + Choices: + 1. If max-forecast-horizon < predict-period, we raise a ValidationError due to incomplete coverage + 2. retraining-frequency becomes the maximum of (FM planning horizon and forecast-frequency, this is capped by the predict-period. + """ target_sensor = data["sensor"] @@ -364,89 +411,56 @@ def resolve_config(self, data: dict, **kwargs) -> dict: # noqa: C901 now = server_now() floored_now = floor_to_resolution(now, resolution) - predict_start = data.get("start_predict_date") or floored_now - save_belief_time = ( - now if data.get("start_predict_date") is None else predict_start - ) + if data.get("start") is None: + if original_data.get("duration") and data.get("end") is not None: + predict_start = data["end"] - data["duration"] + else: + predict_start = floored_now + else: + predict_start = data["start"] - if ( - data.get("start_predict_date") is None - and data.get("train_period") - and data.get("start_date") - ): + save_belief_time = now if data.get("start") is None else predict_start - predict_start = data["start_date"] + data["train_period"] - save_belief_time = None + if data.get("end") is None: + data["end"] = predict_start + data["duration"] - if data.get("train_period") is None and data.get("start_date") is None: - train_period_in_hours = 30 * 24 # Set default train_period value to 30 days + predict_period = ( + data["end"] - predict_start if data.get("end") else data["duration"] + ) + forecast_frequency = data.get("forecast_frequency") - elif data.get("train_period") is None and data.get("start_date"): - train_period_in_hours = int( - (predict_start - data["start_date"]).total_seconds() / 3600 - ) - else: - train_period_in_hours = data["train_period"] // timedelta(hours=1) + max_forecast_horizon = data.get("max_forecast_horizon") - if train_period_in_hours < 48: + # Check for inconsistent parameters explicitly set + if ( + "max-forecast-horizon" in original_data + and "duration" in original_data + and max_forecast_horizon < predict_period + ): raise ValidationError( - "train-period must be at least 2 days (48 hours)", - field_name="train_period", - ) - max_training_period = data.get("max_training_period") or timedelta(days=365) - if train_period_in_hours > max_training_period // timedelta(hours=1): - train_period_in_hours = max_training_period // timedelta(hours=1) - logging.warning( - f"train-period is greater than max-training-period ({max_training_period}), setting train-period to max-training-period", + "This combination of parameters will not yield forecasts for the entire prediction window.", + field_name="max_forecast_horizon", ) - if data.get("retrain_frequency") is None and data.get("end_date") is not None: - retrain_frequency_in_hours = int( - (data["end_date"] - predict_start).total_seconds() / 3600 + if max_forecast_horizon is None: + max_forecast_horizon = predict_period + elif max_forecast_horizon > predict_period: + raise ValidationError( + "max-forecast-horizon must be less than or equal to predict-period", + field_name="max_forecast_horizon", ) - elif ( - data.get("retrain_frequency") is None - and data.get("end_date") is None - and data.get("max_forecast_horizon") is not None - ): - retrain_frequency_in_hours = data.get("max_forecast_horizon") // timedelta( - hours=1 + elif max_forecast_horizon < predict_period and forecast_frequency is None: + # Update the default predict-period if the user explicitly set a smaller max-forecast-horizon, + # unless they also set a forecast-frequency explicitly + predict_period = max_forecast_horizon + + if forecast_frequency is None: + forecast_frequency = min( + max_forecast_horizon, + predict_period, ) - elif ( - data.get("retrain_frequency") is None - and data.get("end_date") is None - and data.get("max_forecast_horizon") is None - ): - retrain_frequency_in_hours = current_app.config.get( - "FLEXMEASURES_PLANNING_HORIZON" - ) // timedelta( - hours=1 - ) # Set default retrain_frequency to planning horizon - else: - retrain_frequency_in_hours = data["retrain_frequency"] // timedelta(hours=1) - if retrain_frequency_in_hours < 1: - raise ValidationError("retrain-frequency must be at least 1 hour") - if data.get("end_date") is None: - data["end_date"] = predict_start + timedelta( - hours=retrain_frequency_in_hours - ) - - if data.get("start_date") is None: - start_date = predict_start - timedelta(hours=train_period_in_hours) - else: - start_date = data["start_date"] - - max_forecast_horizon = data.get("max_forecast_horizon") - forecast_frequency = data.get("forecast_frequency") - - if max_forecast_horizon is None and forecast_frequency is None: - max_forecast_horizon = timedelta(hours=retrain_frequency_in_hours) - forecast_frequency = timedelta(hours=retrain_frequency_in_hours) - elif max_forecast_horizon is None: - max_forecast_horizon = forecast_frequency - elif forecast_frequency is None: - forecast_frequency = max_forecast_horizon + predict_period_in_hours = int(predict_period.total_seconds() / 3600) if data.get("sensor_to_save") is None: sensor_to_save = target_sensor @@ -462,28 +476,30 @@ def resolve_config(self, data: dict, **kwargs) -> dict: # noqa: C901 # Read default from schema model_save_dir = self.fields["model_save_dir"].load_default + m_viewpoints = max(predict_period // forecast_frequency, 1) + return dict( - target=target_sensor, + sensor=target_sensor, model_save_dir=model_save_dir, output_path=output_path, - start_date=start_date, - end_date=data["end_date"], - train_period_in_hours=train_period_in_hours, - max_training_period=max_training_period, + end_date=data["end"], predict_start=predict_start, - predict_period_in_hours=retrain_frequency_in_hours, + predict_period_in_hours=predict_period_in_hours, max_forecast_horizon=max_forecast_horizon, forecast_frequency=forecast_frequency, - probabilistic=data["probabilistic"], + probabilistic=data.get("probabilistic"), sensor_to_save=sensor_to_save, save_belief_time=save_belief_time, - n_cycles=int( - (data["end_date"] - predict_start) - // timedelta(hours=retrain_frequency_in_hours) - ), + m_viewpoints=m_viewpoints, ) class ForecastingTriggerSchema(ForecasterParametersSchema): - config = fields.Nested(TrainPredictPipelineConfigSchema(), required=False) + config = fields.Nested( + TrainPredictPipelineConfigSchema(), + required=False, + metadata={ + "description": "Changing any of these will result in a new data source ID." + }, + ) diff --git a/flexmeasures/data/schemas/tests/test_forecasting.py b/flexmeasures/data/schemas/tests/test_forecasting.py index 74f277b55b..ed14afa2f8 100644 --- a/flexmeasures/data/schemas/tests/test_forecasting.py +++ b/flexmeasures/data/schemas/tests/test_forecasting.py @@ -1,5 +1,6 @@ import pytest +from marshmallow import ValidationError import pandas as pd from flexmeasures.data.schemas.forecasting.pipeline import ForecasterParametersSchema @@ -11,83 +12,65 @@ [ # Case 0: no timing parameters are given # - # User expects to get forecasts for the default FM planning horizon from a single viewpoint. + # User expects to get forecasts for the default FM planning horizon from a single viewpoint (server now, floored to the hour). # Specifically, we expect: # - predict-period = FM planning horizon # - max-forecast-horizon = FM planning horizon # - forecast-frequency = FM planning horizon # - (config) retraining-frequency = FM planning horizon # - 1 cycle, 1 belief time + # - training-period = 30 days + ( + {}, + { + "predict-start": pd.Timestamp( + "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + ).floor("1h"), + # default training period 30 days before predict start + # "start-date": pd.Timestamp( + # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + # ).floor("1h") + # - pd.Timedelta(days=30), + # default prediction period 48 hours after predict start + "end-date": pd.Timestamp( + "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + ).floor("1h") + + pd.Timedelta(hours=48), + # these are set by the schema defaults + "predict-period-in-hours": 48, + "max-forecast-horizon": pd.Timedelta(days=2), + # "train-period-in-hours": 24 * 30, + # "retrain_frequency": 2 * 24, + # "max-training-period": pd.Timedelta(days=365), + "forecast-frequency": pd.Timedelta(days=2), + # the one belief time corresponds to server now + "save-belief-time": pd.Timestamp( + "2025-01-15T12:23:58.387422+01", + tz="Europe/Amsterdam", + ), + "m_viewpoints": 1, + }, + ), # Case 1: predict-period = 12 hours # # User expects to get forecasts for the next 12 hours from a single viewpoint. # Specifically, we expect: - # - max-forecast-horizon = predict-period - # - forecast-frequency = predict-period - # - (config) retraining-frequency = FM planning horizon - # - 1 cycle, 1 belief time - # - # Case 2: max-forecast-horizon = 12 hours - # - # User expects to get forecasts for the next 12 hours from a single viewpoint (same as case 1). - # Specifically, we expect: - # - predict-period = 12 hours - # - forecast-frequency = max-forecast-horizon - # - retraining-period = FM planning horizon - # - 1 cycle, 1 belief time - # - # Case 3: forecast-frequency = 12 hours - # - # User expects to get forecasts for the default FM planning horizon from a new viewpoint every 12 hours. - # Specifically, we expect: - # - predict-period = FM planning horizon - # - max-forecast-horizon = predict-period (actual horizons are 48, 36, 24 and 12) - # - retraining-period = FM planning horizon - # - 1 cycle, 4 belief times - # - # Case 4: (config) retraining-period = 12 hours - # - # User expects to get forecasts for the default FM planning horizon from a new viewpoint every 12 hours (retraining at every viewpoint). - # Specifically, we expect: - # - predict-period = FM planning horizon - # - max-forecast-horizon = predict-period (actual horizons are 48, 36, 24 and 12) - # - forecast-frequency = retraining-period (capped by retraining-period, param changes based on config) - # - 4 cycles, 4 belief times - # Case 5: predict-period = 10 days and max-forecast-horizon = 12 hours - # - # User expects to get forecasts for the next 10 days from a new viewpoint every 12 hours. - # - forecast-frequency = max-forecast-horizon - # - retraining-frequency = FM planning horizon - # - 5 cycles, 20 belief times - # Case 6: predict-period = 12 hours and max-forecast-horizon = 10 days - # - # User expects that FM complains: the max-forecast-horizon should be lower than the predict-period - # - forecast-frequency = predict-period - # - retraining-frequency = FM planning horizon + # - max-forecast-horizon = predict-period = 12 hours + # - forecast-frequency = predict-period = 12 hours + # - (config) retraining-frequency = FM planning horizon, but capped by predict-period, so 12 hours # - 1 cycle, 1 belief time - # Timing parameter defaults - # - predict-period defaults to minimum of (FM planning horizon and max-forecast-horizon) - # - max-forecast-horizon defaults to the predict-period - # - forecast-frequency defaults to minimum of (FM planning horizon, predict-period, max-forecast-horizon and retraining-frequency) - # - retraining-frequency defaults to FM planning horizon - # Timing parameter constraints - # - max-forecast-horizon <= predict-period - # Case 1 user expectation: - # - Get forecasts for next 12 hours from a single viewpoint - # - max-forecast-horizon = 12 hours - # - forecast-frequency = 12 hours - # - 1 cycle + # - training-period = 30 days ( - {"retrain-frequency": "PT12H"}, + {"duration": "PT12H"}, { "predict_start": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" ).floor("1h"), - "start_date": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h") - - pd.Timedelta(days=30), - "train_period_in_hours": 720, + # "start_date": pd.Timestamp( + # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + # ).floor("1h") + # - pd.Timedelta(days=30), + # "train_period_in_hours": 24 * 30, "predict_period_in_hours": 12, "max_forecast_horizon": pd.Timedelta(hours=12), "forecast_frequency": pd.Timedelta(hours=12), @@ -95,149 +78,100 @@ "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" ) + pd.Timedelta(hours=12), - "max_training_period": pd.Timedelta(days=365), + # "retrain_frequency": 2 * 24, + # "max_training_period": pd.Timedelta(days=365), "save_belief_time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" ), - "n_cycles": 1, + "m_viewpoints": 1, }, ), - # Case 2 user expectation: - # - Same behavior as case 1 - # - predict-period = 12 hours - # - forecast-frequency = 12 hours - # - 1 cycle + # Case 2: max-forecast-horizon = 12 hours # here we have issue that predict period is defaulted to 48 hours, but max-forecast-horizon is set to 12 hours, which should be less than or equal to predict-period + # + # User expects to get forecasts for the next 12 hours from a single viewpoint (same as case 1). + # Specifically, we expect: + # - predict-period = 12 hours + # - forecast-frequency = max-forecast-horizon = 12 hours + # - retraining-period = FM planning horizon + # - 1 cycle, 1 belief time + # These expectations are encoded in default 1 of ForecasterParametersSchema.resolve_config ( {"max-forecast-horizon": "PT12H"}, { "predict_start": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" ).floor("1h"), - "start_date": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h") - - pd.Timedelta(days=30), - "train_period_in_hours": 720, - "predict_period_in_hours": 12, - "max_forecast_horizon": pd.Timedelta(hours=12), - "forecast_frequency": pd.Timedelta(hours=12), + # "start_date": pd.Timestamp( + # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + # ).floor("1h") + # - pd.Timedelta(days=30), "end_date": pd.Timestamp( "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" ) - + pd.Timedelta(hours=12), - "max_training_period": pd.Timedelta(days=365), + + pd.Timedelta(hours=48), + # "train_period_in_hours": 30 * 24, + "predict_period_in_hours": 12, + "max_forecast_horizon": pd.Timedelta(hours=12), + "forecast_frequency": pd.Timedelta(hours=12), + # "retrain_frequency": 2 * 24, + # "max_training_period": pd.Timedelta(days=365), "save_belief_time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" ), - "n_cycles": 1, + "m_viewpoints": 1, }, ), - ### - # Case 3 user expectation: - # - Keep default planning horizon prediction window - # - New forecast viewpoint every 12 hours - # - max-forecast-horizon remains at planning horizon (48 hours) - # - 1 cycle, 4 belief times - # this fails - # ( - # {"forecast-frequency": "PT12H"}, - # { - # "predict_start": pd.Timestamp( - # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - # ).floor("1h"), - # "start_date": pd.Timestamp( - # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - # ).floor("1h") - # - pd.Timedelta(days=30), - # "train_period_in_hours": 720, - # "predict_period_in_hours": 48, - # "max_forecast_horizon": pd.Timedelta(hours=12), - # "forecast_frequency": pd.Timedelta(hours=12), - # "end_date": pd.Timestamp( - # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" - # ) - # + pd.Timedelta(hours=48), - # "max_training_period": pd.Timedelta(days=365), - # "save_belief_time": pd.Timestamp( - # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - # ), - # "n_cycles": 1, - # }, - # ), - ### - # Case 4 user expectation: - # - Default planning horizon predictions, retraining every 12 hours - # - forecast-frequency follows retraining period (12 hours) - # - 4 cycles, 4 belief times + # Case 3: forecast-frequency = 12 hours + # todo: add to description that this should really be used in combination with the predict-start field + # + # User expects to get forecasts for the default FM planning horizon from a new viewpoint every 12 hours. + # Specifically, we expect: + # - predict-period = FM planning horizon + # - max-forecast-horizon = predict-period (actual horizons are 48, 36, 24 and 12) + # - retraining-period = FM planning horizon + # - 1 cycle, 4 belief times ( { - "retrain-frequency": "PT12H", - "end-date": "2025-01-17T12:00:00+01:00", + "start": "2025-01-15T12:00:00+01:00", + "forecast-frequency": "PT12H", }, { "predict_start": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h"), - "start_date": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h") - - pd.Timedelta(days=30), - "train_period_in_hours": 720, - "predict_period_in_hours": 12, - "max_forecast_horizon": pd.Timedelta(hours=12), + "2025-01-15T12:00:00.000+01", tz="Europe/Amsterdam" + ), + # "start_date": pd.Timestamp( + # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + # ).floor("1h") + # - pd.Timedelta(days=30), + # "train_period_in_hours": 30 * 24, + "predict_period_in_hours": 48, + "max_forecast_horizon": pd.Timedelta(hours=48), "forecast_frequency": pd.Timedelta(hours=12), "end_date": pd.Timestamp( - "2025-01-17T12:00:00+01", tz="Europe/Amsterdam" - ), - "max_training_period": pd.Timedelta(days=365), + "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" + ) + + pd.Timedelta(hours=48), + # "max_training_period": pd.Timedelta(days=365), + # "retrain-frequency": 2 * 24, + # this is the first belief time of the four belief times "save_belief_time": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" + "2025-01-15T12:00:00.00+01", tz="Europe/Amsterdam" ), - "n_cycles": 4, + "m_viewpoints": 4, }, ), - ### - # Case 5 user expectation: - # - Predict-period = 10 days - # - max-forecast-horizon = 12 hours - # - forecast-frequency = 12 hours - # - 5 cycles, 20 belief times - # this fails - # ( - # { - # "retrain-frequency": "P10D", - # "max-forecast-horizon": "PT12H", - # }, - # { - # "predict_start": pd.Timestamp( - # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - # ).floor("1h"), - # "start_date": pd.Timestamp( - # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - # ).floor("1h") - # - pd.Timedelta(days=30), - # "train_period_in_hours": 720, - # "predict_period_in_hours": 240, - # "max_forecast_horizon": pd.Timedelta(hours=12), - # "forecast_frequency": pd.Timedelta(hours=12), - # "end_date": pd.Timestamp( - # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" - # ) - # + pd.Timedelta(days=10), - # "max_training_period": pd.Timedelta(days=365), - # "save_belief_time": pd.Timestamp( - # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - # ), - # "n_cycles": 5, - # }, - # ), - # Case 6 user expectation: - # - FM should complain: max-forecast-horizon must be <= predict-period - # this fails + # Case 4: (config) retraining-period = 12 hours + # + # User expects to get forecasts for the default FM planning horizon from a new viewpoint every 12 hours (retraining at every viewpoint). + # Specifically, we expect: + # - predict-period = FM planning horizon + # - max-forecast-horizon = predict-period (actual horizons are 48, 36, 24 and 12) + # - forecast-frequency = predict-period (NOT capped by retraining-period, no param changes based on config) + # - 1 cycle, 1 belief time # ( # { # "retrain-frequency": "PT12H", - # "max-forecast-horizon": "P10D", + # "end-date": "2025-01-17T12:00:00+01:00", # }, # { # "predict_start": pd.Timestamp( @@ -247,105 +181,118 @@ # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" # ).floor("1h") # - pd.Timedelta(days=30), - # "train_period_in_hours": 720, - # "predict_period_in_hours": 12, - # "max_forecast_horizon": pd.Timedelta(days=10), - # "forecast_frequency": pd.Timedelta(days=10), + # "train_period_in_hours": 30 * 24, + # "predict_period_in_hours": 48, + # "max_forecast_horizon": pd.Timedelta(hours=48), + # "forecast_frequency": pd.Timedelta(hours=48), # "end_date": pd.Timestamp( - # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" - # ) - # + pd.Timedelta(hours=12), + # "2025-01-17T12:00:00+01", tz="Europe/Amsterdam" + # ), + # "retrain-frequency": 12, # "max_training_period": pd.Timedelta(days=365), # "save_belief_time": pd.Timestamp( # "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" # ), - # "n_cycles": 1, + # "m_viewpoints": 1, # }, # ), - ### - # We expect training period of 30 days before predict start and prediction period of 48 hours after predict start, with predict start at server now (floored to hour). - # 1 cycle expected (1 belief time for forecast) given the forecast frequency equal defaulted to prediction period of 48 hours. + # Case 5: predict-period = 10 days and max-forecast-horizon = 12 hours + # + # User expects to get a ValidationError for having set parameters that won't give complete coverage of the predict-period. ( - {}, { - "predict-start": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h"), - # default training period 30 days before predict start - "start-date": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h") - - pd.Timedelta(days=30), - # default prediction period 48 hours after predict start - "end-date": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam" - ).floor("1h") - + pd.Timedelta(hours=48), - # these are set by the schema defaults - "predict-period-in-hours": 48, - "max-forecast-horizon": pd.Timedelta(days=2), - "train-period-in-hours": 720, - "max-training-period": pd.Timedelta(days=365), - "forecast-frequency": pd.Timedelta(days=2), - # server now - "save-belief-time": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", - tz="Europe/Amsterdam", - ), - "n_cycles": 1, + "duration": "P10D", + "max-forecast-horizon": "PT12H", }, + ValidationError( + { + "max_forecast_horizon": [ + "This combination of parameters will not yield forecasts for the entire prediction window." + ] + } + ), ), - # Test defaults when only an end date is given - # We expect training period of 30 days before predict start and prediction period of 5 days after predict start, with predict start at server now (floored to hour). - # 1 cycle expected (1 belief time for forecast) given the forecast frequency equal defaulted to prediction period of 5 days. + # Case 6: predict-period = 12 hours and max-forecast-horizon = 10 days + # + # User expects that FM complains: the max-forecast-horizon should be lower than the predict-period + # - forecast-frequency = predict-period + # - retraining-frequency = FM planning horizon + # - 1 cycle, 1 belief time ( - {"end-date": "2025-01-20T12:00:00+01:00"}, + { + "duration": "PT12H", + "max-forecast-horizon": "P10D", + }, + ValidationError( + { + "max_forecast_horizon": [ + "max-forecast-horizon must be less than or equal to predict-period" + ] + } + ), + ), + # Case 7: end-date = almost 5 days after now + # + # User expects to get forecasts for the next 5 days (from server now floored to 1 hour) with a default 30-day training period + # - predict-period = 5 days + # - forecast-frequency = predict-period + # - retraining-frequency = FM planning horizon + # - 1 cycle, 1 belief time + # - training-period = 30 days + ( + {"end": "2025-01-20T12:00:00+01:00"}, { "predict-start": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ).floor("1h"), - "start-date": pd.Timestamp( - "2025-01-15T12:23:58.387422+01", - tz="Europe/Amsterdam", - ).floor("1h") - - pd.Timedelta( - days=30 - ), # default training period 30 days before predict start + # "start-date": pd.Timestamp( + # "2025-01-15T12:23:58.387422+01", + # tz="Europe/Amsterdam", + # ).floor("1h") + # - pd.Timedelta( + # days=30 + # ), # default training period 30 days before predict start "end-date": pd.Timestamp( "2025-01-20T12:00:00+01", tz="Europe/Amsterdam", ), - "train-period-in-hours": 720, # from start date to predict start + # "train-period-in-hours": 30 * 24, # from start date to predict start "predict-period-in-hours": 120, # from predict start to end date "forecast-frequency": pd.Timedelta( days=5 - ), # duration between predict start and end date + ), # default forecast frequency "max-forecast-horizon": pd.Timedelta( days=5 ), # duration between predict start and end date # default values - "max-training-period": pd.Timedelta(days=365), + # "retrain_frequency": 2 * 24, + # "max-training-period": pd.Timedelta(days=365), # server now "save-belief-time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ), - "n_cycles": 1, + "m_viewpoints": 1, }, ), - # Test when both start and end dates are given - # We expect training period of 26.5 days (636 hours) from the given start date and predict start, prediction period of 108 hours duration from predict start to end date, with predict_start at server now (floored to hour). - # 1 cycle expected (1 belief_time for forecast) given the forecast frequency equal defaulted to prediction period + # Case 8: end-date = almost 4.5 days after now, train-start is 26.5 days before now + # + # User expects to get forecasts for the next 4.5 days (from server now floored to 1 hour) with a custom 636-hour training period + # - predict-period = 108 hours + # - forecast-frequency = predict-period + # - retraining-frequency = FM planning horizon + # - 1 cycle, 1 belief time + # - training-period = 636 hours ( { - "start-date": "2024-12-20T00:00:00+01:00", - "end-date": "2025-01-20T00:00:00+01:00", + # "train-start": "2024-12-20T00:00:00+01:00", + "end": "2025-01-20T00:00:00+01:00", }, { - "start-date": pd.Timestamp( - "2024-12-20T00:00:00+01", tz="Europe/Amsterdam" - ), + # "start-date": pd.Timestamp( + # "2024-12-20T00:00:00+01", tz="Europe/Amsterdam" + # ), "end-date": pd.Timestamp( "2025-01-20T00:00:00+01", tz="Europe/Amsterdam" ), @@ -354,29 +301,34 @@ tz="Europe/Amsterdam", ).floor("1h"), "predict-period-in-hours": 108, # hours from predict start to end date - "train-period-in-hours": 636, # hours between start date and predict start - "max-forecast-horizon": pd.Timedelta(days=4) - + pd.Timedelta(hours=12), # duration between predict start and end date - "forecast-frequency": pd.Timedelta(days=4) - + pd.Timedelta(hours=12), # duration between predict start and end date - # default values - "max-training-period": pd.Timedelta(days=365), + # "train-period-in-hours": 636, # hours between start date and predict start + "max-forecast-horizon": pd.Timedelta(hours=108), + "forecast-frequency": pd.Timedelta(hours=108), + # "retrain_frequency": 2 * 24, + # "max-training-period": pd.Timedelta(days=365), # server now "save-belief-time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ), - "n_cycles": 1, + "m_viewpoints": 1, }, ), - # Test when only end date is given with a training period - # We expect the start date to be computed with respect to now. (training period before now (floored)). - # We expect training period of 30 days before predict start and prediction period of 48 hours after predict start, with predict start at server now (floored to hour). - # 1 cycle expected (1 belief_time for forecast) given the forecast frequency equal defaulted to prediction period + # Case 9: end-date is given with train-period = 3 days + # + # User expects the start date to be computed from the inferred predict-start and train-period. + # Specifically, we expect: + # - predict-start = server now floored to sensor resolution + # - train-period = 3 days (72 hours) + # - predict-period = 5 days (from predict-start to end-date) + # - max-forecast-horizon = predict-period = 5 days + # - forecast-frequency = predict-period = 5 days + # - retrain-frequency = FM planning horizon + # - 1 cycle, 1 belief time ( { - "end-date": "2025-01-20T12:00:00+01:00", - "train-period": "P3D", + "end": "2025-01-20T12:00:00+01:00", + # "train-period": "P3D", }, { "end-date": pd.Timestamp( @@ -386,77 +338,92 @@ "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ).floor("1h"), - "start-date": pd.Timestamp( - "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" - ) - - pd.Timedelta(days=3), - "train-period-in-hours": 72, # from start date to predict start + # "start-date": pd.Timestamp( + # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" + # ) + # - pd.Timedelta(days=3), + # "train-period-in-hours": 72, # from start date to predict start "predict-period-in-hours": 120, # from predict start to end date "max-forecast-horizon": pd.Timedelta( days=5 ), # duration between predict start and end date - "forecast-frequency": pd.Timedelta( - days=5 - ), # duration between predict start and end date + "forecast-frequency": pd.Timedelta(days=5), # default values - "max-training-period": pd.Timedelta(days=365), + # "retrain_frequency": 2 * 24, + # "max-training-period": pd.Timedelta(days=365), # server now "save-belief-time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ), - "n_cycles": 1, - }, - ), - # Test when only start date is given with a training period - # We expect the predict start to be computed with respect to the start date (training period after start date). - # We set training period of 3 days, we expect a prediction period to default 48 hours after predict start, with predict start at server now (floored to hour). - # 1 cycle expected (1 belief_time for forecast) given the forecast frequency equal defaulted to prediction period - ( - { - "start-date": "2024-12-25T00:00:00+01:00", - "train-period": "P3D", - }, - { - "start-date": pd.Timestamp( - "2024-12-25T00:00:00+01", tz="Europe/Amsterdam" - ), - "predict-start": pd.Timestamp( - "2024-12-25T00:00:00+01", tz="Europe/Amsterdam" - ) - + pd.Timedelta(days=3), - "end-date": pd.Timestamp( - "2024-12-28T00:00:00+01", tz="Europe/Amsterdam" - ) - + pd.Timedelta(days=2), - "train-period-in-hours": 72, - "max-forecast-horizon": pd.Timedelta( - days=2 - ), # duration between predict start and end date - "forecast-frequency": pd.Timedelta( - days=2 - ), # duration between predict start and end date - # default values - "predict-period-in-hours": 48, - "max-training-period": pd.Timedelta(days=365), - # the belief time of the forecasts will be calculated from start-predict-date and max-forecast-horizon and forecast-frequency - "save-belief-time": None, - "n_cycles": 1, + "m_viewpoints": 1, }, ), - # Test when only start date is given with a retrain frequency (prediction period) - # We expect the predict start to be computed with respect to the start date (training period after start date). - # We set training period of 3 days, we expect a prediction period to default 48 hours after predict start, with predict start at server now (floored to hour). - # 1 cycle expected (1 belief_time for forecast) given the forecast frequency equal defaulted to prediction period + # Case 10: train-start is given with train-period = 3 days + # + # User expects predict-start to be derived from train-start + train-period. + # Specifically, we expect: + # - predict-start = train-start + 3 days + # - predict-period = FM planning horizon (48 hours) + # - end-date = predict-start + 48 hours + # - max-forecast-horizon = predict-period = 48 hours + # - forecast-frequency = predict-period = 48 hours + # - retrain-frequency = FM planning horizon + # - 1 cycle, 1 belief time + # ( + # { + # # "train-start": "2024-12-25T00:00:00+01:00", + # # "train-period": "P3D", + # }, + # { + # # "train-start": pd.Timestamp( + # # "2024-12-25T00:00:00+01", tz="Europe/Amsterdam" + # # ), + # "predict-start": pd.Timestamp( + # "2024-12-25T00:00:00+01", tz="Europe/Amsterdam" + # ) + # + pd.Timedelta(days=3), + # "end-date": pd.Timestamp( + # "2024-12-28T00:00:00+01", tz="Europe/Amsterdam" + # ) + # + pd.Timedelta(days=2), + # # "train-period-in-hours": 72, + # "max-forecast-horizon": pd.Timedelta( + # days=2 + # ), # duration between predict start and end date + # "forecast-frequency": pd.Timedelta( + # days=2 + # ), # duration between predict start and end date + # # default values + # "predict-period-in-hours": 48, + # # "retrain_frequency": 2 * 24, + # # "max-training-period": pd.Timedelta(days=365), + # # the belief time of the forecasts will be calculated from start and max-forecast-horizon and forecast-frequency + # "save-belief-time": None, + # "m_viewpoints": 1, + # }, + # ), + # Case 11: train-start is given with predict-period duration = 3 days + # + # User expects predict-start to remain based on server now (no train-period given). + # Specifically, we expect: + # - predict-start = server now floored to sensor resolution + # - predict-period = 3 days + # - end-date = predict-start + 3 days + # - train-period derived from train-start to predict-start + # - max-forecast-horizon = predict-period = 3 days + # - forecast-frequency = predict-period = 3 days + # - retrain-frequency = FM planning horizon + # - 1 cycle, 1 belief time ( { - "start-date": "2024-12-25T00:00:00+01:00", - "retrain-frequency": "P3D", + # "train-start": "2024-12-25T00:00:00+01:00", + "duration": "P3D", }, { - "start-date": pd.Timestamp( - "2024-12-25T00:00:00+01", tz="Europe/Amsterdam" - ), + # "start-date": pd.Timestamp( + # "2024-12-25T00:00:00+01", tz="Europe/Amsterdam" + # ), "predict-start": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", @@ -466,89 +433,148 @@ ) + pd.Timedelta(days=3), "predict-period-in-hours": 72, - "train-period-in-hours": 516, # from start-date to predict-start + # "train-period-in-hours": 516, # from train-start to predict-start "max-forecast-horizon": pd.Timedelta( days=3 ), # duration between predict-start and end-date - "forecast-frequency": pd.Timedelta( - days=3 - ), # duration between predict-start and end-date + "forecast-frequency": pd.Timedelta(days=3), # default values - "max-training-period": pd.Timedelta(days=365), + # "retrain_frequency": 2 * 24, + # "max-training-period": pd.Timedelta(days=365), # server now "save-belief-time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ), - "n_cycles": 1, - }, - ), - # Test when only start date is given with both training period 20 days and retrain frequency 3 days - # We expect the predict start to be computed with respect to the start date (training period after start date). - # 1 cycle expected (1 belief_time for forecast) given the forecast frequency equal defaulted to prediction period - ( - { - "start-date": "2024-12-01T00:00:00+01:00", - "train-period": "P20D", - "retrain-frequency": "P3D", - }, - { - "start-date": pd.Timestamp( - "2024-12-01T00:00:00+01", tz="Europe/Amsterdam" - ), - "predict-start": pd.Timestamp( - "2024-12-01T00:00:00+01", tz="Europe/Amsterdam" - ) - + pd.Timedelta(days=20), - "end-date": pd.Timestamp( - "2024-12-01T00:00:00+01", tz="Europe/Amsterdam" - ) - + pd.Timedelta(days=23), - "train-period-in-hours": 480, - "predict-period-in-hours": 72, - "max-forecast-horizon": pd.Timedelta(days=3), # predict period duration - "forecast-frequency": pd.Timedelta(days=3), # predict period duration - # default values - "max-training-period": pd.Timedelta(days=365), - # the belief time of the forecasts will be calculated from start-predict-date and max-forecast-horizon and forecast-frequency - "save-belief-time": None, + "m_viewpoints": 1, }, ), - # Test when only end date is given with a prediction period: we expect the train start and predict start to both be computed with respect to the end date. - # we expect training period of 30 days before predict_start and prediction period of 3 days after predict_start, with predict_start at server now (floored to hour). - # we expect 2 cycles from the retrain frequency and predict period given the end date + # Case 12: train-start is given with train-period = 20 days and duration = 3 days + # + # User expects both predict-start and end-date to be derived from train-start. + # Specifically, we expect: + # - predict-start = train-start + 20 days + # - predict-period = 3 days + # - end-date = train-start + 23 days + # - max-forecast-horizon = predict-period = 3 days + # - forecast-frequency = predict-period = 3 days + # - retrain-frequency = FM planning horizon + # - 1 cycle, 1 belief time + # ( + # { + # # "train-start": "2024-12-01T00:00:00+01:00", + # # "train-period": "P20D", + # "duration": "P3D", + # }, + # { + # # "start-date": pd.Timestamp( + # # "2024-12-01T00:00:00+01", tz="Europe/Amsterdam" + # # ), + # "predict-start": pd.Timestamp( + # "2024-12-01T00:00:00+01", tz="Europe/Amsterdam" + # ) + # + pd.Timedelta(days=20), + # "end-date": pd.Timestamp( + # "2024-12-01T00:00:00+01", tz="Europe/Amsterdam" + # ) + # + pd.Timedelta(days=23), + # # "train-period-in-hours": 480, + # "predict-period-in-hours": 72, + # # defaults to prediction period (duration) + # "max-forecast-horizon": pd.Timedelta(days=3), + # "forecast-frequency": pd.Timedelta(days=3), + # # default values + # # "retrain_frequency": 2 * 24, + # # "max-training-period": pd.Timedelta(days=365), + # # the belief time of the forecasts will be calculated from start and max-forecast-horizon and forecast-frequency + # "save-belief-time": None, + # }, + # ), + # Case 13: only end is given with retrain-frequency = 3 days + # + # User expects train start and predict start to be derived from end-date and defaults. + # Specifically, we expect: + # - predict-start = end-date - default duration (FM planning horizon) + # - train-period = default 30 days + # - train-start = predict-start - 30 days + # - predict-period = 6 days + # - max-forecast-horizon = predict-period = 6 days + # - forecast-frequency = predict-period = 6 days + # - retrain-frequency = 3 days (explicit) + # - 1 cycle, 1 belief time + # ( + # { + # "end-date": "2025-01-21T12:00:00+01:00", + # "retrain-frequency": "P3D", # only comes into play if forecast-frequency is lower than retrain-frequency, which here it is not + # }, + # { + # "end-date": pd.Timestamp( + # "2025-01-21T12:00:00+01", tz="Europe/Amsterdam" + # ), + # "predict-start": pd.Timestamp( + # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" + # ), + # "start-date": pd.Timestamp( + # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" + # ) + # - pd.Timedelta(days=30), + # "predict-period-in-hours": 144, # from predict start to end date + # "train-period-in-hours": 30 * 24, + # "max-forecast-horizon": pd.Timedelta( + # days=6 + # ), # duration between predict start and end date + # "forecast-frequency": pd.Timedelta(hours=144), + # # default values + # "max-training-period": pd.Timedelta(days=365), + # "retrain-frequency": 3 * 24, + # # server now + # "save-belief-time": pd.Timestamp( + # "2025-01-15T12:23:58.387422+01", + # tz="Europe/Amsterdam", + # ), + # "m_viewpoints": 1, # we expect 1 cycle from the forecast-frequency defaulting to the predict-period + # }, + # ), + # Case 14: forecast-frequency = 5 days, predict-period = 10 days + # + # User expects to get forecasts for 10 days from two unique viewpoints 5 days apart. + # Specifically, we expect: + # - predict-period = 10 days + # - max-forecast-horizon = predict-period (actual horizons are 10 days and 5 days) + # - forecast-frequency = 5 days + # - retrain-frequency = FM planning horizon + # - 2 cycles, 2 belief times ( { - "end-date": "2025-01-21T12:00:00+01:00", - "retrain-frequency": "P3D", + "duration": "P10D", + "forecast-frequency": "P5D", }, { "end-date": pd.Timestamp( - "2025-01-21T12:00:00+01", tz="Europe/Amsterdam" + "2025-01-25T12:00:00+01", tz="Europe/Amsterdam" ), "predict-start": pd.Timestamp( "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" ), - "start-date": pd.Timestamp( - "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" - ) - - pd.Timedelta(days=30), - "predict-period-in-hours": 72, - "train-period-in-hours": 720, + # "start-date": pd.Timestamp( + # "2025-01-15T12:00:00+01", tz="Europe/Amsterdam" + # ) + # - pd.Timedelta(days=30), + "predict-period-in-hours": 240, # from predict start to end date + # "train-period-in-hours": 30 * 24, "max-forecast-horizon": pd.Timedelta( - days=3 - ), # duration between predict start and end date (retrain frequency) - "forecast-frequency": pd.Timedelta( - days=3 - ), # duration between predict start and end date (retrain frequency) + days=10 + ), # duration between predict start and end date + "forecast-frequency": pd.Timedelta(hours=120), # default values - "max-training-period": pd.Timedelta(days=365), + # "max-training-period": pd.Timedelta(days=365), + # "retrain-frequency": 2 * 24, # server now "save-belief-time": pd.Timestamp( "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam", ), - "n_cycles": 2, # we expect 2 cycles from the retrain frequency and predict period given the end date + "m_viewpoints": 2, # we expect 2 cycles from the retrain frequency and predict period given the end date }, ), ], @@ -560,14 +586,24 @@ def test_timing_parameters_of_forecaster_parameters_schema( pd.Timestamp("2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam") ) + if isinstance(expected_timing_output, ValidationError): + with pytest.raises(ValidationError) as exc: + ForecasterParametersSchema().load( + { + "sensor": 1, + **timing_input, + } + ) + assert exc.value.messages == expected_timing_output.messages + return data = ForecasterParametersSchema().load( { "sensor": 1, **timing_input, } ) - + # breakpoint() for k, v in expected_timing_output.items(): # Convert kebab-case key to snake_case to match data dictionary keys returned by schema snake_key = kebab_to_snake(k) - assert data[snake_key] == v + assert data[snake_key] == v, f"{k} did not match expectations." diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py similarity index 80% rename from flexmeasures/data/tests/test_train_predict_pipeline.py rename to flexmeasures/data/tests/test_forecasting_pipeline.py index 629092e947..348eff7f9a 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -20,38 +20,39 @@ ( { # "model": "CustomLGBM", + "train-start": "2025-01-01T00:00+02:00", + "train-period": "P2D", + "retrain-frequency": "P0D", # 0 days is expected to fail }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "end-date": "2025-01-03T00:00+02:00", - "train-period": "P2D", + "end": "2025-01-03T00:00+02:00", "sensor-to-save": None, - "start-predict-date": "2025-01-02T00:00+02:00", - "retrain-frequency": "P0D", # 0 days is expected to fail + "start": "2025-01-02T00:00+02:00", "max-forecast-horizon": "PT1H", "forecast-frequency": "PT1H", "probabilistic": False, }, False, - (ValidationError, "retrain-frequency must be greater than 0"), + (ValidationError, "retrain-frequency must be at least 1 hour"), ), ( { # "model": "CustomLGBM", "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "start-predict-date": "2025-01-08T00:00+02:00", # start-predict-date coincides with end of available data in sensor - "end-date": "2025-01-09T00:00+02:00", + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", "sensor-to-save": None, "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT24H", # 1 cycle and 1 viewpoint "probabilistic": False, }, True, @@ -61,17 +62,18 @@ { # "model": "CustomLGBM", "future-regressors": ["irradiance-sensor"], + # "train-start": "2025-01-01T00:00+02:00", # without a start date, max-training-period takes over + "max-training-period": "P7D", }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - # "start-date": "2025-01-01T00:00+02:00", # without a start date, max-training-period takes over - "max-training-period": "P7D", - "start-predict-date": "2025-01-08T00:00+02:00", # start-predict-date coincides with end of available data in sensor - "end-date": "2025-01-09T00:00+02:00", + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", "sensor-to-save": None, "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT24H", # 1 cycle and 1 viewpoint "probabilistic": False, }, False, @@ -82,16 +84,17 @@ # "model": "CustomLGBM", "past-regressors": ["irradiance-sensor"], "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", }, { # Test: duplicate sensor names in past and future regressors "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "start-predict-date": "2025-01-04T00:00+02:00", - "end-date": "2025-01-09T00:00+02:00", + "start": "2025-01-08T00:00+02:00", + "end": "2025-01-09T00:00+02:00", "sensor-to-save": None, "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT24H", "probabilistic": False, }, False, @@ -101,42 +104,24 @@ { # "model": "CustomLGBM", "future-regressors": ["irradiance-sensor"], + "retrain-frequency": "P1D", + "train-start": "2025-01-01T00:00+02:00", + "train-period": "P2D", }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "end-date": "2025-01-03T00:00+02:00", - "train-period": "P2D", + "end": "2025-01-03T00:00+02:00", "sensor-to-save": None, - "start-predict-date": "2025-01-02T00:00+02:00", - "retrain-frequency": "P1D", + "start": "2025-01-02T00:00+02:00", "max-forecast-horizon": "PT1H", - "forecast-frequency": "PT1H", + "forecast-frequency": "PT24H", "probabilistic": False, }, False, None, ), - # ( - # {}, - # { - # "sensor": "solar-sensor", - # "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", - # "output-path": None, - # "start-date": "2025-07-01T00:00+02:00", - # "end-date": "2025-07-12T00:00+02:00", - # "sensor-to-save": 1, - # "start-predict-date": "2025-07-11T17:26+02:00", - # "retrain-frequency": "PT24H", - # "max-forecast-horizon": 24, - # "forecast-frequency": 1, - # "probabilistic": False, - # }, - # False, - # (ValidationError, "Try increasing the --end-date."), - # ) ], ) def test_train_predict_pipeline( # noqa: C901 @@ -193,18 +178,18 @@ def test_train_predict_pipeline( # noqa: C901 forecasts = sensor.search_beliefs(source_types=["forecaster"]) dg_params = pipeline._parameters # parameters stored in the data generator - n_cycles = (dg_params["end_date"] - dg_params["predict_start"]) / ( + m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( dg_params["forecast_frequency"] ) # 1 hour of forecasts is saved over 4 15-minute resolution events - n_events_per_horizon = timedelta(hours=1) / dg_params["target"].event_resolution + n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) assert ( - len(forecasts) == n_cycles * n_hourly_horizons * n_events_per_horizon - ), f"we expect 4 forecasts per horizon for each cycle within the prediction window, and {n_cycles} cycles with each {n_hourly_horizons} hourly horizons" + len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon + ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( - forecasts.lineage.number_of_belief_times == n_cycles - ), f"we expect 1 belief time per cycle, and {n_cycles} cycles" + forecasts.lineage.number_of_belief_times == m_viewpoints + ), f"we expect {m_viewpoints} viewpoints" source = forecasts.lineage.sources[0] assert "TrainPredictPipeline" in str( source @@ -278,11 +263,10 @@ def test_train_predict_pipeline( # noqa: C901 assert ( "regressors" not in data_generator_config ), "(past and future) regressors should be stored under 'past_regressors' and 'future_regressors' instead" + assert "max-training-period" in data_generator_config # Check DataGenerator parameters stored under DataSource attributes is empty - data_generator_params = source.attributes["data_generator"]["parameters"] - # todo: replace this with `assert data_generator_params == {}` after moving max-training-period to config - assert "max-training-period" in data_generator_params + assert "parameters" not in source.attributes["data_generator"] # Test that missing data logging works and raises NotEnoughDataException when threshold exceeded @@ -293,16 +277,15 @@ def test_train_predict_pipeline( # noqa: C901 { # "model": "CustomLGBM", "missing-threshold": "0.0", + "train-start": "2025-01-01T00:00+02:00", }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "end-date": "2025-01-30T00:00+02:00", + "end": "2025-01-30T00:00+02:00", "sensor-to-save": None, - "start-predict-date": "2025-01-25T00:00+02:00", - "retrain-frequency": "P1D", + "start": "2025-01-25T00:00+02:00", "max-forecast-horizon": "PT1H", "forecast-frequency": "PT1H", "probabilistic": False, @@ -314,16 +297,15 @@ def test_train_predict_pipeline( # noqa: C901 # "model": "CustomLGBM", "future-regressors": ["irradiance-sensor"], "missing-threshold": "0.0", + "train-start": "2025-01-01T00:00+02:00", }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "end-date": "2025-01-30T00:00+02:00", + "end": "2025-01-30T00:00+02:00", "sensor-to-save": None, - "start-predict-date": "2025-01-25T00:00+02:00", - "retrain-frequency": "P1D", + "start": "2025-01-25T00:00+02:00", "max-forecast-horizon": "PT1H", "forecast-frequency": "PT1H", "probabilistic": False, @@ -379,17 +361,17 @@ def test_missing_data_logs_warning( ( { # "model": "CustomLGBM", + "retrain-frequency": "P1D", + "train-start": "2025-01-01T00:00+02:00", + "max-training-period": "P10D", # cap at 10 days }, { "sensor": "solar-sensor", "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", "output-path": None, - "start-date": "2025-01-01T00:00+02:00", - "end-date": "2025-01-30T00:00+02:00", - "max-training-period": "P10D", # cap at 10 days + "end": "2025-01-30T00:00+02:00", "sensor-to-save": None, - "start-predict-date": "2025-01-25T00:00+02:00", - "retrain-frequency": "P1D", + "start": "2025-01-25T00:00+02:00", "max-forecast-horizon": "PT1H", "forecast-frequency": "PT1H", "probabilistic": False, @@ -419,9 +401,8 @@ def test_train_period_capped_logs_warning( for message in caplog.messages ), "Expected warning about capping train_period" - params_used = pipeline._parameters config_used = pipeline._config assert config_used["missing_threshold"] == 1 - assert params_used["train_period_in_hours"] == timedelta(days=10) / timedelta( + assert config_used["train_period_in_hours"] == timedelta(days=10) / timedelta( hours=1 ), "train_period_in_hours should be capped to max_training_period" diff --git a/flexmeasures/ui/static/openapi-specs.json b/flexmeasures/ui/static/openapi-specs.json index 5be407a019..d8d1841f2a 100644 --- a/flexmeasures/ui/static/openapi-specs.json +++ b/flexmeasures/ui/static/openapi-specs.json @@ -1189,12 +1189,11 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/forecaster_parameters_schema_openAPI" + "$ref": "#/components/schemas/forecasting_trigger_schema_openAPI" }, "example": { - "start-date": "2026-01-01T00:00:00+01:00", - "start-predict-date": "2026-01-15T00:00:00+01:00", - "end-date": "2026-01-17T00:00:00+01:00" + "start": "2026-01-15T00:00:00+01:00", + "duration": "P2D" } } } @@ -4096,7 +4095,7 @@ }, "additionalProperties": false }, - "TrainPredictPipelineConfig": { + "TrainPredictPipelineConfigSchemaOpenAPI": { "type": "object", "properties": { "model": { @@ -4157,41 +4156,15 @@ }, "additionalProperties": false }, - "forecaster_parameters_schema_openAPI": { + "forecasting_trigger_schema_openAPI": { "type": "object", "properties": { - "sensor": { - "type": "integer", - "description": "ID of the sensor to forecast.", - "example": 2092 - }, - "start-date": { - "type": [ - "string", - "null" - ], - "format": "date-time", - "description": "Timestamp marking the start of training data. Defaults to train_period before start_predict_date if not set.", - "example": "2025-01-01T00:00:00+01:00" - }, - "end-date": { - "type": [ - "string", - "null" - ], - "format": "date-time", - "description": "End date for running the pipeline.", - "example": "2025-10-15T00:00:00+01:00" - }, - "train-period": { - "type": [ - "string", - "null" - ], - "description": "Duration of the initial training period (ISO 8601 format, min 2 days). If not set, derived from start_date and start_predict_date or defaults to P30D (30 days).", - "example": "P7D" + "duration": { + "type": "string", + "description": "The duration for which to create the forecast, in ISO 8601 duration format. Defaults to the planning horizon.", + "example": "PT24H" }, - "start-predict-date": { + "start": { "type": [ "string", "null" @@ -4200,45 +4173,19 @@ "description": "Start date for predictions. Defaults to now, floored to the sensor resolution, so that the first forecast is about the ongoing event.", "example": "2025-01-08T00:00:00+01:00" }, - "max-forecast-horizon": { - "type": [ - "string", - "null" - ], - "description": "Maximum forecast horizon. Defaults to covering the whole prediction period (which itself defaults to 48 hours).", - "example": "PT48H" - }, "forecast-frequency": { "type": [ "string", "null" ], - "description": "How often to recompute forecasts. Defaults to retrain frequency.", + "description": "How often to recompute forecasts. This setting can be used to get forecasts from multiple viewpoints, which is especially useful for running simulations. Defaults to the max-forecast-horizon.", "example": "PT1H" }, - "sensor-to-save": { - "type": [ - "integer", - "null" - ], - "description": "Sensor ID where forecasts will be saved; defaults to target sensor.", - "example": 2092 - }, - "max-training-period": { - "type": [ - "string", - "null" - ], - "description": "Maximum duration of the training period. Defaults to 1 year (P1Y).", - "example": "P1Y" - }, "config": { - "$ref": "#/components/schemas/TrainPredictPipelineConfig" + "description": "Changing any of these will result in a new data source ID.", + "$ref": "#/components/schemas/TrainPredictPipelineConfigSchemaOpenAPI" } }, - "required": [ - "sensor" - ], "additionalProperties": false }, "UserAPIQuerySchema": {