diff --git a/documentation/plugin/customisation.rst b/documentation/plugin/customisation.rst index 3faa1e303c..5f452df13e 100644 --- a/documentation/plugin/customisation.rst +++ b/documentation/plugin/customisation.rst @@ -61,22 +61,14 @@ Finally, make your scheduler be the one that FlexMeasures will use for certain s .. code-block:: python from flexmeasures import Sensor - - scheduler_specs = { - "module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below - "class": "DummyScheduler", - } my_sensor = Sensor.query.filter(Sensor.name == "My power sensor on a flexible asset").one_or_none() - my_sensor.attributes["custom-scheduler"] = scheduler_specs + my_sensor.attributes["custom-scheduler"] = "DummyScheduler" From now on, all schedules (see :ref:`tut_forecasting_scheduling`) which are requested for this sensor should get computed by your custom function! For later lookup, the data will be linked to a new data source with the name "My Opinion". -.. note:: To describe the module, we used an importable module here (actually a custom scheduling function we use to test this). - You can also provide a full file path to the module, e.g. "/path/to/my_file.py". 
- Adding your own forecasting algorithm ------------------------------------- diff --git a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py index db5f82fcdc..0ebf0548ef 100644 --- a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py @@ -11,7 +11,7 @@ from flexmeasures import Sensor from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.planning.tests.utils import check_constraints -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( handle_scheduling_exception, get_data_source_for_job, diff --git a/flexmeasures/api/v3_0/tests/test_forecasting_api.py b/flexmeasures/api/v3_0/tests/test_forecasting_api.py index f4e4460523..a621be8290 100644 --- a/flexmeasures/api/v3_0/tests/test_forecasting_api.py +++ b/flexmeasures/api/v3_0/tests/test_forecasting_api.py @@ -6,7 +6,7 @@ get_data_source_for_job, ) from rq.job import Job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.api.tests.utils import get_auth_token from flexmeasures.data.services.forecasting import handle_forecasting_exception from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py index 5710ee493e..0b0e8c14ee 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py @@ -9,7 +9,7 @@ from flexmeasures.api.tests.utils import check_deprecation from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.data_sources import DataSource -from flexmeasures.data.tests.utils import work_on_rq 
+from flexmeasures.utils.job_utils import work_on_rq
 from flexmeasures.data.services.scheduling import handle_scheduling_exception
 from flexmeasures.tests.utils import get_test_sensor
 from flexmeasures.utils.unit_utils import ur
diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py
index 9ee352bb4c..3b4db45fb1 100644
--- a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py
+++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py
@@ -11,7 +11,7 @@
 from flexmeasures.data.models.generic_assets import GenericAsset
 from flexmeasures.data.models.planning.utils import get_power_values
 from flexmeasures.data.models.time_series import Sensor, TimedBelief
-from flexmeasures.data.tests.utils import work_on_rq
+from flexmeasures.utils.job_utils import work_on_rq
 from flexmeasures.data.services.scheduling import (
     handle_scheduling_exception,
     get_data_source_for_job,
diff --git a/flexmeasures/cli/jobs.py b/flexmeasures/cli/jobs.py
index d0df364122..e0d97e217a 100644
--- a/flexmeasures/cli/jobs.py
+++ b/flexmeasures/cli/jobs.py
@@ -31,6 +31,7 @@
 from flexmeasures.data.schemas import AssetIdField, SensorIdField
 from flexmeasures.data.services.scheduling import handle_scheduling_exception
 from flexmeasures.data.services.forecasting import handle_forecasting_exception
+from flexmeasures.utils.job_utils import work_on_rq
 from flexmeasures.cli.utils import MsgStyle
 from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list
 
@@ -58,16 +59,37 @@ def fm_jobs():
     required=True,
     help="Job UUID of the job you want to run.",
 )
-def run_job(job_id: str):
+@click.option(
+    "--dry-run",
+    is_flag=True,
+    help=(
+        "Run the job function directly, without executing it as an RQ job. "
+        "This skips RQ bookkeeping such as job metadata updates, registry transitions, "
+        "and worker context. "
+        "However, if the job itself saved data to the database then that still happens."
+    ),
+)
+def run_job(job_id: str, dry_run: bool):
     """
-    Run a single job.
+    Run a single RQ job by its UUID.
 
     We use the app context to find out which redis queues to use.
+
+    By default, the job is executed using RQ's normal execution mechanism,
+    meaning it runs with a worker context and can update job metadata, move between RQ registries and record its result.
+
+    With the --dry-run flag, the job's callable is executed directly (job.func(**job.kwargs)) without RQ involvement.
+    This bypasses all RQ bookkeeping and is useful for debugging or inspecting job behavior without mutating job state.
     """
     connection = app.queues["scheduling"].connection
     job = Job.fetch(job_id, connection=connection)
-    result = job.func(**job.kwargs)
-    click.echo(f"Job {job_id} finished with: {result}")
+    if dry_run:
+        result = job.func(**job.kwargs)
+        click.echo(f"Job {job_id} finished (dry-run) with: {result}")
+    else:
+        work_on_rq(app.queues["scheduling"], job)
+        # Read the result the worker stored; calling job.perform() here would execute the job a second time.
+        result = job.return_value(refresh=True)
+        click.echo(f"Job {job_id} finished with: {result}")
 
 
 @fm_jobs.command("run-worker")
diff --git a/flexmeasures/data/models/planning/storage.py b/flexmeasures/data/models/planning/storage.py
index a0d6f8da19..f31fdb2f16 100644
--- a/flexmeasures/data/models/planning/storage.py
+++ b/flexmeasures/data/models/planning/storage.py
@@ -1332,6 +1332,12 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
             for d, sensor in enumerate(sensors)
             if sensor is not None
         }
+        # Obtain the aggregate power schedule, too, if the flex-context states the associated sensor
+        aggregate_power_sensor = self.flex_context.get("aggregate_power", None)
+        if isinstance(aggregate_power_sensor, Sensor):
+            storage_schedule[aggregate_power_sensor] = pd.concat(
+                ems_schedule, axis=1
+            ).sum(axis=1)
 
         # Convert each device schedule to the unit of the device's power sensor
         storage_schedule = {
@@ -1341,7 +1347,7 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
                 sensor.unit,
event_resolution=sensor.event_resolution, ) - for sensor in sensors + for sensor in storage_schedule.keys() if sensor is not None } @@ -1378,7 +1384,7 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType: sensor: storage_schedule[sensor] .resample(sensor.event_resolution) .mean() - for sensor in sensors + for sensor in storage_schedule.keys() if sensor is not None } @@ -1386,7 +1392,7 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType: if self.round_to_decimals: storage_schedule = { sensor: storage_schedule[sensor].round(self.round_to_decimals) - for sensor in sensors + for sensor in storage_schedule.keys() if sensor is not None } soc_schedule = { @@ -1402,7 +1408,7 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType: "data": storage_schedule[sensor], "unit": sensor.unit, } - for sensor in sensors + for sensor in storage_schedule.keys() if sensor is not None ] commitment_costs = [ diff --git a/flexmeasures/data/schemas/scheduling/__init__.py b/flexmeasures/data/schemas/scheduling/__init__.py index 763bd08f50..29f5761866 100644 --- a/flexmeasures/data/schemas/scheduling/__init__.py +++ b/flexmeasures/data/schemas/scheduling/__init__.py @@ -289,6 +289,11 @@ class FlexContextSchema(Schema): data_key="inflexible-device-sensors", metadata=metadata.INFLEXIBLE_DEVICE_SENSORS.to_dict(), ) + aggregate_power = VariableQuantityField( + to_unit="MW", + data_key="aggregate-power", + required=False, + ) def set_default_breach_prices( self, data: dict, fields: list[str], price: ur.Quantity diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index 55423287ca..5c408890fc 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -44,7 +44,7 @@ ) -def load_custom_scheduler(scheduler_specs: dict) -> type: +def load_custom_scheduler(scheduler_specs: dict | str) -> type: """ Read in custom scheduling spec. 
    Attempt to load the Scheduler class to use.
 
@@ -59,12 +59,29 @@ def load_custom_scheduler(scheduler_specs: dict) -> type:
         "class": "NameOfSchedulerClass",
     }
 
+    or if the scheduler is already subclassing flexmeasures.Scheduler, simply:
+
+    "NameOfSchedulerClass"
+
     """
-    assert isinstance(
-        scheduler_specs, dict
-    ), f"Scheduler specs is {type(scheduler_specs)}, should be a dict"
-    assert "module" in scheduler_specs, "scheduler specs have no 'module'."
-    assert "class" in scheduler_specs, "scheduler specs have no 'class'"
+    if isinstance(scheduler_specs, dict):
+        assert "module" in scheduler_specs, "scheduler specs have no 'module'."
+        assert "class" in scheduler_specs, "scheduler specs have no 'class'"
+    elif isinstance(scheduler_specs, str):
+        scheduler_class = current_app.data_generators["scheduler"].get(scheduler_specs)
+        if scheduler_class is not None:
+            scheduler_specs = {
+                "class": scheduler_class.__name__,
+                "module": scheduler_class.__module__,
+            }
+        else:
+            raise ValueError(
+                f"Scheduler {scheduler_specs} does not seem to be registered."
+ ) + else: + raise TypeError( + f"Scheduler specs is {type(scheduler_specs)}, should be a dict or str" + ) scheduler_name = scheduler_specs["class"] diff --git a/flexmeasures/data/tests/test_forecasting_jobs.py b/flexmeasures/data/tests/test_forecasting_jobs.py index b20cc1aac8..7901aadb2e 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs.py +++ b/flexmeasures/data/tests/test_forecasting_jobs.py @@ -9,7 +9,7 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import ( create_forecasting_jobs, handle_forecasting_exception, diff --git a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py index 19cf6a758a..25e862fd1f 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py @@ -14,7 +14,7 @@ check_failures, get_data_source, ) -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.utils.time_utils import as_server_time diff --git a/flexmeasures/data/tests/test_scheduling_jobs.py b/flexmeasures/data/tests/test_scheduling_jobs.py index f2a9198a3b..7384bd4efd 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_jobs.py @@ -14,7 +14,8 @@ from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( 
create_scheduling_job, load_custom_scheduler, diff --git a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py index 096fa7ebd2..812bf44301 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py @@ -7,7 +7,8 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief from flexmeasures.data.services.scheduling import create_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq def test_scheduling_a_charging_station( diff --git a/flexmeasures/data/tests/test_scheduling_repeated_jobs.py b/flexmeasures/data/tests/test_scheduling_repeated_jobs.py index 61bf291054..c7ad33eea9 100644 --- a/flexmeasures/data/tests/test_scheduling_repeated_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_repeated_jobs.py @@ -12,7 +12,8 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.services.utils import hash_function_arguments, job_cache diff --git a/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py index 98e7533c0d..63d5a4424a 100644 --- a/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py @@ -4,7 +4,8 @@ import pytz -from flexmeasures.data.tests.utils 
import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.services.scheduling import load_custom_scheduler diff --git a/flexmeasures/data/tests/test_scheduling_sequential.py b/flexmeasures/data/tests/test_scheduling_sequential.py index 3e0f17a197..254c386c86 100644 --- a/flexmeasures/data/tests/test_scheduling_sequential.py +++ b/flexmeasures/data/tests/test_scheduling_sequential.py @@ -4,7 +4,7 @@ import pandas as pd from rq.job import Job from flexmeasures.data.services.scheduling import create_sequential_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.data.models.time_series import Sensor diff --git a/flexmeasures/data/tests/test_scheduling_simultaneous.py b/flexmeasures/data/tests/test_scheduling_simultaneous.py index 5e31e598f1..57671f4fa6 100644 --- a/flexmeasures/data/tests/test_scheduling_simultaneous.py +++ b/flexmeasures/data/tests/test_scheduling_simultaneous.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd from flexmeasures.data.services.scheduling import create_simultaneous_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.models.time_series import Sensor diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 06979c8c88..5af5722a47 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -10,7 +10,7 @@ from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline from 
flexmeasures.data.models.forecasting.exceptions import CustomException -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import handle_forecasting_exception diff --git a/flexmeasures/data/tests/utils.py b/flexmeasures/data/tests/utils.py index c6df948a10..ec022ade49 100644 --- a/flexmeasures/data/tests/utils.py +++ b/flexmeasures/data/tests/utils.py @@ -1,29 +1,10 @@ -import os +from __future__ import annotations + from traceback import print_tb import click -def work_on_rq(redis_queue, exc_handler=None, max_jobs=None): - - # we only want this import distinction to matter when we actually are testing - if os.name == "nt": - from rq_win import WindowsWorker as SimpleWorker - else: - from rq import SimpleWorker - - exc_handlers = [] - if exc_handler is not None: - exc_handlers.append(exc_handler) - print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count) - worker = SimpleWorker( - [redis_queue], - connection=redis_queue.connection, - exception_handlers=exc_handlers, - ) - worker.work(burst=True, max_jobs=max_jobs) - - def exception_reporter(job, exc_type, exc_value, traceback): print_tb(traceback) click.echo("HANDLING RQ WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value)) diff --git a/flexmeasures/utils/job_utils.py b/flexmeasures/utils/job_utils.py new file mode 100644 index 0000000000..6cf5d11b25 --- /dev/null +++ b/flexmeasures/utils/job_utils.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import os + +from rq import Queue +from rq.job import Job + + +def work_on_rq( + redis_queue: Queue, job: Job | str | None = None, exc_handler=None, max_jobs=None +): + + # we only want this import distinction to matter when we actually are testing + if os.name == "nt": + from rq_win import WindowsWorker as SimpleWorker + else: + from rq import SimpleWorker + + exc_handlers = [] + if exc_handler is not None: + exc_handlers.append(exc_handler) + 
print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count) + worker = SimpleWorker( + [redis_queue], + connection=redis_queue.connection, + exception_handlers=exc_handlers, + ) + + if job: + if isinstance(job, str): + job = Job.fetch(job, connection=redis_queue.connection) + worker.perform_job(job, redis_queue) + else: + worker.work(burst=True, max_jobs=max_jobs)