diff --git a/flexmeasures/data/models/planning/__init__.py b/flexmeasures/data/models/planning/__init__.py index e5336b00fa..e7a1980d92 100644 --- a/flexmeasures/data/models/planning/__init__.py +++ b/flexmeasures/data/models/planning/__init__.py @@ -44,6 +44,7 @@ class Scheduler: start: datetime end: datetime resolution: timedelta + load_resolution: timedelta belief_time: datetime round_to_decimals: int @@ -74,6 +75,7 @@ def __init__( flex_model: dict | None = None, flex_context: dict | None = None, return_multiple: bool = False, + load_time_series: bool = True, ): """ Initialize a new Scheduler. @@ -113,6 +115,7 @@ def __init__( self.start = start self.end = end self.resolution = resolution + self.load_resolution = resolution self.belief_time = belief_time self.round_to_decimals = round_to_decimals if flex_model is None: @@ -126,6 +129,7 @@ def __init__( self.info = dict(scheduler=self.__class__.__name__) self.return_multiple = return_multiple + self.load_time_series = load_time_series def compute_schedule(self) -> pd.Series | None: """ diff --git a/flexmeasures/data/models/planning/storage.py b/flexmeasures/data/models/planning/storage.py index 3fab171dec..968c12b4d2 100644 --- a/flexmeasures/data/models/planning/storage.py +++ b/flexmeasures/data/models/planning/storage.py @@ -32,6 +32,7 @@ from flexmeasures.data.schemas.scheduling.storage import StorageFlexModelSchema from flexmeasures.data.schemas.scheduling import ( FlexContextSchema, + FlexContextTimeSeriesSchema, MultiSensorFlexModelSchema, ) from flexmeasures.utils.calculations import ( @@ -96,11 +97,9 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 resolution = determine_minimum_resampling_resolution( [s.event_resolution for s in sensors] ) - asset = self.asset else: # For backwards compatibility with the single asset scheduler sensors = [self.sensor] - asset = self.sensor.generic_asset # For backwards compatibility with the single asset scheduler flex_model = self.flex_model 
@@ -160,29 +159,9 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 power_capacity_in_mw = self._get_device_power_capacity(flex_model, sensors) # Check for known prices or price forecasts - up_deviation_prices = get_continuous_series_sensor_or_quantity( - variable_quantity=consumption_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["consumption_price"] - ._get_unit(consumption_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ).to_frame(name="event_value") + up_deviation_prices = consumption_price.to_frame(name="event_value") ensure_prices_are_not_empty(up_deviation_prices, consumption_price) - down_deviation_prices = get_continuous_series_sensor_or_quantity( - variable_quantity=production_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["production_price"] - ._get_unit(production_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ).to_frame(name="event_value") + down_deviation_prices = production_price.to_frame(name="event_value") ensure_prices_are_not_empty(down_deviation_prices, production_price) start = pd.Timestamp(start).tz_convert("UTC") @@ -200,35 +179,21 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 ) # Create Series with EMS capacities - ems_power_capacity_in_mw = get_continuous_series_sensor_or_quantity( - variable_quantity=self.flex_context.get("ems_power_capacity_in_mw"), - actuator=asset, - unit="MW", - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - resolve_overlaps="min", - ) - ems_consumption_capacity = get_continuous_series_sensor_or_quantity( - variable_quantity=self.flex_context.get("ems_consumption_capacity_in_mw"), - actuator=asset, - unit="MW", - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - max_value=ems_power_capacity_in_mw, - 
resolve_overlaps="min", - ) - ems_production_capacity = -1 * get_continuous_series_sensor_or_quantity( - variable_quantity=self.flex_context.get("ems_production_capacity_in_mw"), - actuator=asset, - unit="MW", - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - max_value=ems_power_capacity_in_mw, - resolve_overlaps="min", - ) + ems_power_capacity_in_mw = self.flex_context.get("ems_power_capacity_in_mw") + if (cap := self.flex_context.get("ems_consumption_capacity_in_mw")) is not None: + ems_consumption_capacity = cap.clip(upper=ems_power_capacity_in_mw).fillna( + ems_power_capacity_in_mw + ) + else: + ems_consumption_capacity = ems_power_capacity_in_mw + if (cap := self.flex_context.get("ems_production_capacity_in_mw")) is not None: + ems_production_capacity = -1 * ( + cap.clip(upper=ems_power_capacity_in_mw).fillna( + ems_power_capacity_in_mw + ) + ) + else: + ems_production_capacity = -ems_power_capacity_in_mw # Set up commitments to optimise for commitments = [] @@ -260,30 +225,13 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 # Set up peak commitments if self.flex_context.get("ems_peak_consumption_price") is not None: - ems_peak_consumption = get_continuous_series_sensor_or_quantity( - variable_quantity=self.flex_context.get("ems_peak_consumption_in_mw"), - actuator=asset, - unit="MW", - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - max_value=np.inf, # np.nan -> np.inf to ignore commitment if no quantity is given - fill_sides=True, - ) + ems_peak_consumption = self.flex_context.get("ems_peak_consumption_in_mw") + ems_peak_consumption = ems_peak_consumption.fillna( + np.inf + ) # np.nan -> np.inf to ignore commitment if no quantity is given ems_peak_consumption_price = self.flex_context.get( "ems_peak_consumption_price" ) - ems_peak_consumption_price = get_continuous_series_sensor_or_quantity( - variable_quantity=ems_peak_consumption_price, - actuator=asset, - unit=FlexContextSchema()
- .declared_fields["ems_peak_consumption_price"] - ._get_unit(ems_peak_consumption_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ) # Set up commitments DataFrame commitment = FlowCommitment( @@ -296,30 +244,13 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 ) commitments.append(commitment) if self.flex_context.get("ems_peak_production_price") is not None: - ems_peak_production = get_continuous_series_sensor_or_quantity( - variable_quantity=self.flex_context.get("ems_peak_production_in_mw"), - actuator=asset, - unit="MW", - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - max_value=np.inf, # np.nan -> np.inf to ignore commitment if no quantity is given - fill_sides=True, - ) + ems_peak_production = self.flex_context.get("ems_peak_production_in_mw") + ems_peak_production = ems_peak_production.fillna( + np.inf + ) # np.nan -> np.inf to ignore commitment if no quantity is given ems_peak_production_price = self.flex_context.get( "ems_peak_production_price" ) - ems_peak_production_price = get_continuous_series_sensor_or_quantity( - variable_quantity=ems_peak_production_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["ems_peak_production_price"] - ._get_unit(ems_peak_production_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ) # Set up commitments DataFrame commitment = FlowCommitment( @@ -347,29 +278,10 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 if ems_consumption_breach_price is not None: # Convert to Series - any_ems_consumption_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=ems_consumption_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["ems_consumption_breach_price"] - ._get_unit(ems_consumption_breach_price), - query_window=(start, end), - resolution=resolution, -
beliefs_before=belief_time, - fill_sides=True, - ) - all_ems_consumption_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=ems_consumption_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["ems_consumption_breach_price"] - ._get_unit(ems_consumption_breach_price) - + "*h", # from EUR/MWh to EUR/MW/resolution - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ) + any_ems_consumption_breach_price = ems_consumption_breach_price + all_ems_consumption_breach_price = ( + ems_consumption_breach_price * resolution / timedelta(hours=1) + ) # from EUR/MWh to EUR/MW/resolution # Set up commitments DataFrame to penalize any breach commitment = FlowCommitment( @@ -401,29 +313,10 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 if ems_production_breach_price is not None: # Convert to Series - any_ems_production_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=ems_production_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["ems_production_breach_price"] - ._get_unit(ems_production_breach_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ) - all_ems_production_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=ems_production_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["ems_production_breach_price"] - ._get_unit(ems_production_breach_price) - + "*h", # from EUR/MWh to EUR/MW/resolution - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fill_sides=True, - ) + any_ems_production_breach_price = ems_production_breach_price + all_ems_production_breach_price = ( + ems_production_breach_price * resolution / timedelta(hours=1) + ) # from EUR/MWh to EUR/MW/resolution # Set up commitments DataFrame to penalize any breach commitment = 
FlowCommitment( @@ -521,31 +414,14 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 and soc_minima[d] is not None ): soc_minima_breach_price = self.flex_context["soc_minima_breach_price"] - any_soc_minima_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=soc_minima_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["soc_minima_breach_price"] - ._get_unit(soc_minima_breach_price), - query_window=(start + resolution, end + resolution), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="soc-minima-breach-price", - fill_sides=True, - ).shift(-1, freq=resolution) - all_soc_minima_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=soc_minima_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["soc_minima_breach_price"] - ._get_unit(soc_minima_breach_price) - + "*h", # from EUR/MWh² to EUR/MWh/resolution - query_window=(start + resolution, end + resolution), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="soc-minima-breach-price", - fill_sides=True, - ).shift(-1, freq=resolution) + any_soc_minima_breach_price = soc_minima_breach_price.shift( + -1, freq=resolution + ) + all_soc_minima_breach_price = ( + soc_minima_breach_price.shift(-1, freq=resolution) + * resolution + / timedelta(hours=1) + ) # from EUR/MWh² to EUR/MWh/resolution # Set up commitments DataFrame # soc_minima_d is a temp variable because add_storage_constraints can't deal with Series yet soc_minima_d = get_continuous_series_sensor_or_quantity( @@ -605,31 +481,14 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 and soc_maxima[d] is not None ): soc_maxima_breach_price = self.flex_context["soc_maxima_breach_price"] - any_soc_maxima_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=soc_maxima_breach_price, - actuator=asset, - unit=FlexContextSchema() - 
.declared_fields["soc_maxima_breach_price"] - ._get_unit(soc_maxima_breach_price), - query_window=(start + resolution, end + resolution), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="soc-maxima-breach-price", - fill_sides=True, - ).shift(-1, freq=resolution) - all_soc_maxima_breach_price = get_continuous_series_sensor_or_quantity( - variable_quantity=soc_maxima_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["soc_maxima_breach_price"] - ._get_unit(soc_maxima_breach_price) - + "*h", # from EUR/MWh² to EUR/MWh/resolution - query_window=(start + resolution, end + resolution), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="soc-maxima-breach-price", - fill_sides=True, - ).shift(-1, freq=resolution) + any_soc_maxima_breach_price = soc_maxima_breach_price.shift( + -1, freq=resolution + ) + all_soc_maxima_breach_price = ( + soc_maxima_breach_price.shift(-1, freq=resolution) + * resolution + / timedelta(hours=1) + ) # from EUR/MWh² to EUR/MWh/resolution # Set up commitments DataFrame # soc_maxima_d is a temp variable because add_storage_constraints can't deal with Series yet soc_maxima_d = get_continuous_series_sensor_or_quantity( @@ -722,35 +581,10 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 production_breach_price = self.flex_context[ "production_breach_price" ] - any_production_breach_price = ( - get_continuous_series_sensor_or_quantity( - variable_quantity=production_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["production_breach_price"] - ._get_unit(production_breach_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="production-breach-price", - fill_sides=True, - ) - ) + any_production_breach_price = production_breach_price all_production_breach_price = ( - get_continuous_series_sensor_or_quantity( - variable_quantity=production_breach_price, - 
actuator=asset, - unit=FlexContextSchema() - .declared_fields["production_breach_price"] - ._get_unit(production_breach_price) - + "*h", # from EUR/MWh to EUR/MW/resolution - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="production-breach-price", - fill_sides=True, - ) - ) + production_breach_price * resolution / timedelta(hours=1) + ) # from EUR/MWh to EUR/MW/resolution # Set up commitments DataFrame commitment = FlowCommitment( name=f"any production breach device {d}", @@ -798,35 +632,10 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 consumption_breach_price = self.flex_context[ "consumption_breach_price" ] - any_consumption_breach_price = ( - get_continuous_series_sensor_or_quantity( - variable_quantity=consumption_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["consumption_breach_price"] - ._get_unit(consumption_breach_price), - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="consumption-breach-price", - fill_sides=True, - ) - ) + any_consumption_breach_price = consumption_breach_price all_consumption_breach_price = ( - get_continuous_series_sensor_or_quantity( - variable_quantity=consumption_breach_price, - actuator=asset, - unit=FlexContextSchema() - .declared_fields["consumption_breach_price"] - ._get_unit(consumption_breach_price) - + "*h", # from EUR/MWh to EUR/MW/resolution - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - fallback_attribute="consumption-breach-price", - fill_sides=True, - ) - ) + consumption_breach_price * resolution / timedelta(hours=1) + ) # from EUR/MWh to EUR/MW/resolution # Set up commitments DataFrame commitment = FlowCommitment( name=f"any consumption breach device {d}", @@ -986,6 +795,26 @@ def persist_flex_model(self): "soc_in_mwh", self.flex_model["soc_at_start"] ) + def _load_time_series(self, asset): + if not 
self.load_resolution: + self._determine_load_resolution() + self.flex_context = FlexContextTimeSeriesSchema( + asset=asset, + query_window=(self.start, self.end), + resolution=self.load_resolution, + belief_time=self.belief_time, + ).load(self.flex_context) + + def _determine_load_resolution(self): + """Determine resolution for loading time series data.""" + if self.asset is not None: + sensors = [flex_model_d["sensor"] for flex_model_d in self.flex_model] + self.load_resolution = determine_minimum_resampling_resolution( + [s.event_resolution for s in sensors] + ) + else: + self.load_resolution = self.resolution + def deserialize_flex_config(self): """ Deserialize storage flex model and the flex context against schemas. @@ -1063,6 +892,11 @@ def deserialize_flex_config(self): f"Unsupported type of flex-model: '{type(self.flex_model)}'" ) + # Load time series from flex-context + self._determine_load_resolution() + if self.load_time_series: + self._load_time_series(asset) + return self.flex_model def possibly_extend_end(self, soc_targets, sensor: Sensor = None): diff --git a/flexmeasures/data/models/planning/tests/test_solver.py b/flexmeasures/data/models/planning/tests/test_solver.py index fb563f020c..a91b5b1569 100644 --- a/flexmeasures/data/models/planning/tests/test_solver.py +++ b/flexmeasures/data/models/planning/tests/test_solver.py @@ -14,6 +14,7 @@ from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.exceptions import InfeasibleProblemException from flexmeasures.data.models.planning.storage import ( + MetaStorageScheduler, StorageScheduler, add_storage_constraints, validate_storage_constraints, @@ -424,6 +425,7 @@ def test_charging_station_solver_day_2( "consumption_price": epex_da, }, ) + scheduler._load_time_series(charging_station) scheduler.config_deserialized = ( True # soc targets are already a DataFrame, names get underscore ) @@ -508,6 +510,7 @@ def test_fallback_to_unsolvable_problem( }, } scheduler = 
StorageScheduler(**kwargs) + scheduler._load_time_series(charging_station) scheduler.config_deserialized = ( True # soc targets are already a DataFrame, names get underscore ) @@ -517,7 +520,10 @@ def test_fallback_to_unsolvable_problem( consumption_schedule = scheduler.compute(skip_validation=True) # check that the fallback scheduler provides a sensible fallback policy - fallback_scheduler = scheduler.fallback_scheduler_class(**kwargs) + fallback_scheduler: MetaStorageScheduler = scheduler.fallback_scheduler_class( + **kwargs + ) + fallback_scheduler._load_time_series(charging_station) fallback_scheduler.config_deserialized = True consumption_schedule = fallback_scheduler.compute(skip_validation=True) @@ -608,6 +614,7 @@ def test_building_solver_day_2( "consumption_price": consumption_price_sensor, }, ) + scheduler._load_time_series(battery) scheduler.config_deserialized = ( True # inflexible device sensors are already objects, names get underscore ) diff --git a/flexmeasures/data/schemas/scheduling/__init__.py b/flexmeasures/data/schemas/scheduling/__init__.py index 4dc080a958..d003e35365 100644 --- a/flexmeasures/data/schemas/scheduling/__init__.py +++ b/flexmeasures/data/schemas/scheduling/__init__.py @@ -1,5 +1,7 @@ from __future__ import annotations +from datetime import datetime, timedelta + from marshmallow import ( Schema, fields, @@ -9,8 +11,9 @@ pre_load, post_dump, ) +import pandas as pd -from flexmeasures import Sensor +from flexmeasures import Asset, Sensor from flexmeasures.data.schemas.generic_assets import GenericAssetIdField from flexmeasures.data.schemas.sensors import ( VariableQuantityField, @@ -29,37 +32,76 @@ ) +def series_range_validator(min=None, max=None): + range_validator = validate.Range(min=min, max=max) + + def _validate_series(value): + if isinstance(value, pd.Series): + invalid_mask = pd.Series([False] * len(value), index=value.index) + + if min is not None: + invalid_mask |= value < min + if max is not None: + invalid_mask |= value 
> max + + if invalid_mask.any(): + invalid_indexes = value.index[invalid_mask].tolist() + invalid_values = value[invalid_mask].tolist() + raise ValidationError( + f"Series contains values outside the allowed range (min={min}, max={max}).\n" + f"Invalid entries:\n" + f"Indexes: {invalid_indexes}\n" + f"Values: {invalid_values}" + ) + else: + range_validator(value) + + return _validate_series + + class FlexContextSchema(Schema): """This schema defines fields that provide context to the portfolio to be optimized.""" + shared_currency_unit = fields.String( + data_key="shared-currency-unit", + required=False, + load_default="EUR", + ) + # Device commitments consumption_breach_price = VariableQuantityField( "/MW", data_key="consumption-breach-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + fill_sides=True, ) production_breach_price = VariableQuantityField( "/MW", data_key="production-breach-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + fill_sides=True, ) soc_minima_breach_price = VariableQuantityField( "/MWh", data_key="soc-minima-breach-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + add_resolution=True, + fill_sides=True, ) soc_maxima_breach_price = VariableQuantityField( "/MWh", data_key="soc-maxima-breach-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + add_resolution=True, + fill_sides=True, ) # Dev fields relax_soc_constraints = fields.Bool( @@ -74,22 +116,29 @@ class FlexContextSchema(Schema): "MW", required=False, data_key="site-power-capacity", - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), + resolve_overlaps="min", ) # todo: deprecated since flexmeasures==0.23 - consumption_price_sensor = 
SensorIdField(data_key="consumption-price-sensor") - production_price_sensor = SensorIdField(data_key="production-price-sensor") + consumption_price_sensor = SensorIdField( + data_key="consumption-price-sensor", fill_sides=True + ) + production_price_sensor = SensorIdField( + data_key="production-price-sensor", fill_sides=True + ) consumption_price = VariableQuantityField( "/MWh", required=False, data_key="consumption-price", return_magnitude=False, + fill_sides=True, ) production_price = VariableQuantityField( "/MWh", required=False, data_key="production-price", return_magnitude=False, + fill_sides=True, ) # Capacity breach commitments @@ -97,27 +146,31 @@ class FlexContextSchema(Schema): "MW", required=False, data_key="site-production-capacity", - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), + resolve_overlaps="min", ) ems_consumption_capacity_in_mw = VariableQuantityField( "MW", required=False, data_key="site-consumption-capacity", - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), + resolve_overlaps="min", ) ems_consumption_breach_price = VariableQuantityField( "/MW", data_key="site-consumption-breach-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + fill_sides=True, ) ems_production_breach_price = VariableQuantityField( "/MW", data_key="site-production-breach-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + fill_sides=True, ) # Peak consumption commitment @@ -125,15 +178,17 @@ class FlexContextSchema(Schema): "MW", required=False, data_key="site-peak-consumption", - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default="0 kW", + fill_sides=True, ) ems_peak_consumption_price = VariableQuantityField( "/MW", data_key="site-peak-consumption-price", required=False, - 
value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + fill_sides=True, ) # Peak production commitment @@ -141,15 +196,17 @@ class FlexContextSchema(Schema): "MW", required=False, data_key="site-peak-production", - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default="0 kW", + fill_sides=True, ) ems_peak_production_price = VariableQuantityField( "/MW", data_key="site-peak-production-price", required=False, - value_validator=validate.Range(min=0), + value_validator=series_range_validator(min=0), default=None, + fill_sides=True, ) # todo: group by month start (MS), something like a commitment resolution, or a list of datetimes representing splits of the commitments @@ -157,6 +214,25 @@ class FlexContextSchema(Schema): SensorIdField(), data_key="inflexible-device-sensors" ) + def __init__( + self, + asset: Asset | None = None, + load_time_series: bool = False, + query_window: tuple[datetime, datetime] | None = None, + resolution: timedelta | None = None, + belief_time: datetime | None = None, + *args, + **kwargs, + ): + if load_time_series and asset is None: + raise NotImplementedError("Cannot load time series from an unknown asset.") + self.asset = asset + self.load_time_series = load_time_series + self.query_window = query_window + self.resolution = resolution + self.belief_time = belief_time + super().__init__(*args, **kwargs) + def set_default_breach_prices( self, data: dict, fields: list[str], price: ur.Quantity ): @@ -180,6 +256,8 @@ def check_prices(self, data: dict, **kwargs): 1. The flex-context must contain at most 1 consumption price and at most 1 production price field. 2. All prices must share the same currency. 
""" + if self.load_time_series: + return data # The flex-context must contain at most 1 consumption price and at most 1 production price field if "consumption_price_sensor" in data and "consumption_price" in data: @@ -284,7 +362,8 @@ def _try_to_convert_price_units(self, data): f"Prices must share the same monetary unit. '{field_name}' uses '{currency_unit}', but '{previous_field_name}' used '{shared_currency_unit}'.", field_name=field_name, ) - data["shared_currency_unit"] = shared_currency_unit + if shared_currency_unit is not None: + data["shared_currency_unit"] = shared_currency_unit return data @@ -408,6 +487,29 @@ def _forbid_fixed_prices(self, data: dict, **kwargs): ) +def passthrough_deserializer(): + return lambda value, attr, data, **kwargs: value + + +class FlexContextTimeSeriesSchema(FlexContextSchema): + """Schema for loading time series data for each VariableQuantityField in an already deserialized flex-context.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.load_time_series = True + for field_var, field in self.declared_fields.items(): + if isinstance(field, VariableQuantityField): + field.load_time_series = True + # Compatibility with deprecated fields + elif field_var in ("consumption_price_sensor", "production_price_sensor"): + field.load_time_series = True + else: + # Skip deserialization + field._deserialize = passthrough_deserializer() + field.data_key = field_var + setattr(self, field_var, field) + + class MultiSensorFlexModelSchema(Schema): """ diff --git a/flexmeasures/data/schemas/sensors.py b/flexmeasures/data/schemas/sensors.py index 46f642979c..eb55029616 100644 --- a/flexmeasures/data/schemas/sensors.py +++ b/flexmeasures/data/schemas/sensors.py @@ -3,6 +3,7 @@ import numbers from pytz.exceptions import UnknownTimeZoneError +import timely_beliefs from flask import current_app from marshmallow import ( Schema, @@ -21,6 +22,9 @@ from flexmeasures.data import ma, db from 
flexmeasures.data.models.generic_assets import GenericAsset +from flexmeasures.data.models.planning.utils import ( + get_continuous_series_sensor_or_quantity, +) from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.schemas.utils import ( FMValidationError, @@ -202,7 +206,15 @@ class Meta: class SensorIdField(MarshmallowClickMixin, fields.Int): """Field that deserializes to a Sensor and serializes back to an integer.""" - def __init__(self, *args, unit: str | ur.Quantity | None = None, **kwargs): + def __init__( + self, + *args, + unit: str | ur.Quantity | None = None, + fill_sides: bool = False, + add_resolution: bool = False, + resolve_overlaps: str = "first", + **kwargs, + ): super().__init__(*args, **kwargs) if isinstance(unit, str): @@ -211,28 +223,53 @@ def __init__(self, *args, unit: str | ur.Quantity | None = None, **kwargs): self.to_unit = unit else: self.to_unit = None + self.load_time_series = False + self.fill_sides = fill_sides + self.add_resolution = add_resolution + self.resolve_overlaps = resolve_overlaps @with_appcontext_if_needed() - def _deserialize(self, value: int, attr, obj, **kwargs) -> Sensor: + def _deserialize( + self, value: int, attr, obj, **kwargs + ) -> Sensor | timely_beliefs.BeliefsSeries: """Turn a sensor id into a Sensor.""" - sensor = db.session.get(Sensor, value) - if sensor is None: - raise FMValidationError(f"No sensor found with id {value}.") - - # lazy loading now (sensor is somehow not in session after this) - sensor.generic_asset - sensor.generic_asset.generic_asset_type + if not self.load_time_series: + sensor = db.session.get(Sensor, value) + if sensor is None: + raise FMValidationError(f"No sensor found with id {value}.") + + # lazy loading now (sensor is somehow not in session after this) + sensor.generic_asset + sensor.generic_asset.generic_asset_type + + # if the units are defined, check if the sensor data is convertible to the target units + if self.to_unit is not None and not 
units_are_convertible( + sensor.unit, str(self.to_unit.units) + ): + raise FMValidationError( + f"Cannot convert {sensor.unit} to {self.to_unit.units}" + ) - # if the units are defined, check if the sensor data is convertible to the target units - if self.to_unit is not None and not units_are_convertible( - sensor.unit, str(self.to_unit.units) - ): - raise FMValidationError( - f"Cannot convert {sensor.unit} to {self.to_unit.units}" + return sensor + else: + query_window = self.parent.query_window + resolution = self.parent.resolution + if self.add_resolution: + query_window = ( + query_window[0] + resolution, + query_window[1] + resolution, + ) + return get_continuous_series_sensor_or_quantity( + variable_quantity=value, + actuator=self.parent.asset, + unit=value.unit, + query_window=query_window, + resolution=resolution, + beliefs_before=self.parent.belief_time, + fill_sides=self.fill_sides, + resolve_overlaps=self.resolve_overlaps, ) - return sensor - def _serialize(self, sensor: Sensor, attr, data, **kwargs) -> int: """Turn a Sensor into a sensor id.""" return sensor.id @@ -246,6 +283,9 @@ def __init__( default_src_unit: str | None = None, return_magnitude: bool = False, timezone: str | None = None, + fill_sides: bool = False, + add_resolution: bool = False, + resolve_overlaps: str = "first", value_validator: Validator | None = None, **kwargs, ): @@ -281,6 +321,9 @@ def __init__( value_validator = RepurposeValidatorToIgnoreSensorsAndLists(value_validator) self.validators.insert(0, value_validator) self.timezone = timezone + self.fill_sides = fill_sides + self.add_resolution = add_resolution + self.resolve_overlaps = resolve_overlaps self.value_validator = value_validator if to_unit.startswith("/") and len(to_unit) < 2: raise ValueError( @@ -293,24 +336,47 @@ def __init__( default_src_unit = "dimensionless" self.default_src_unit = default_src_unit self.return_magnitude = return_magnitude + self.load_time_series = False @with_appcontext_if_needed() def 
_deserialize( self, value: dict[str, int] | list[dict] | str, attr, obj, **kwargs - ) -> Sensor | list[dict] | ur.Quantity: - - if isinstance(value, dict): - return self._deserialize_dict(value) - elif isinstance(value, list): - return self._deserialize_list(value) - elif isinstance(value, str): - return self._deserialize_str(value) - elif isinstance(value, numbers.Real) and self.default_src_unit is not None: - return self._deserialize_numeric(value, attr, obj, **kwargs) - else: - raise FMValidationError( - f"Unsupported value type. `{type(value)}` was provided but only dict, list and str are supported." + ) -> Sensor | list[dict] | ur.Quantity | timely_beliefs.BeliefsSeries: + + if not self.load_time_series: + if isinstance(value, dict): + value = self._deserialize_dict(value) + elif isinstance(value, list): + value = self._deserialize_list(value) + elif isinstance(value, str): + value = self._deserialize_str(value) + elif isinstance(value, numbers.Real) and self.default_src_unit is not None: + value = self._deserialize_numeric(value, attr, obj, **kwargs) + else: + raise FMValidationError( + f"Unsupported value type. `{type(value)}` was provided but only dict, list and str are supported." 
+ ) + # The schema can be initialized to load time series, rather than just the Sensor, time series specs or Quantity + # if hasattr(self.parent, "load_time_series") and self.parent.load_time_series: + if self.load_time_series: + query_window = self.parent.query_window + resolution = self.parent.resolution + if self.add_resolution: + query_window = ( + query_window[0] + resolution, + query_window[1] + resolution, + ) + return get_continuous_series_sensor_or_quantity( + variable_quantity=value, + actuator=self.parent.asset, + unit=self._get_unit(value) if self.to_unit[0] == "/" else self.to_unit, + query_window=query_window, + resolution=resolution, + beliefs_before=self.parent.belief_time, + fill_sides=self.fill_sides, + resolve_overlaps=self.resolve_overlaps, ) + return value def _deserialize_dict(self, value: dict[str, int]) -> Sensor: """Deserialize a sensor reference to a Sensor.""" @@ -382,7 +448,7 @@ def convert(self, value, param, ctx, **kwargs): return super().convert(_value, param, ctx, **kwargs) - def _get_unit(self, variable_quantity: ur.Quantity | list[dict | Sensor]) -> str: + def _get_unit(self, variable_quantity: ur.Quantity | list[dict] | Sensor) -> str: """Obtain the unit from the variable quantity.""" if isinstance(variable_quantity, ur.Quantity): unit = str(variable_quantity.units) diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index fe607ed4c8..25d68cfc0a 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -165,6 +165,7 @@ def trigger_optional_fallback(job, connection, type, value, traceback): def create_scheduling_job( asset_or_sensor: Asset | Sensor | None = None, sensor: Sensor | None = None, + load_time_series: bool = False, job_id: str | None = None, enqueue: bool = True, requeue: bool = False, @@ -189,6 +190,7 @@ def create_scheduling_job( Arguments: :param asset_or_sensor: Asset or sensor for which the schedule is computed. 
+ :param load_time_series: If True, also loads data when the job gets created. todo: does this have a practical use case? :param job_id: Optionally, set a job id explicitly. :param enqueue: If True, enqueues the job in case it is new. :param requeue: If True, requeues the job in case it is not new and had previously failed @@ -214,6 +216,7 @@ def create_scheduling_job( else: scheduler_class: Type[Scheduler] = find_scheduler_class(asset_or_sensor) + scheduler_kwargs["load_time_series"] = load_time_series scheduler = get_scheduler_instance( scheduler_class=scheduler_class, asset_or_sensor=asset_or_sensor, @@ -519,6 +522,7 @@ def make_schedule( if belief_time is None: belief_time = server_now() + scheduler_kwargs["load_time_series"] = True scheduler_params = dict( start=start, end=end,