Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions documentation/plugin/customisation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,14 @@ Finally, make your scheduler be the one that FlexMeasures will use for certain s
.. code-block:: python

from flexmeasures import Sensor

scheduler_specs = {
"module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below
"class": "DummyScheduler",
}

my_sensor = Sensor.query.filter(Sensor.name == "My power sensor on a flexible asset").one_or_none()
my_sensor.attributes["custom-scheduler"] = scheduler_specs
my_sensor.attributes["custom-scheduler"] = "DummyScheduler"


From now on, all schedules (see :ref:`tut_forecasting_scheduling`) which are requested for this sensor should
get computed by your custom scheduler! For later lookup, the data will be linked to a new data source with the name "My Opinion".

.. note:: To describe the module, we used an importable module here (actually a custom scheduling function we use to test this).
You can also provide a full file path to the module, e.g. "/path/to/my_file.py".


Adding your own forecasting algorithm
-------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flexmeasures import Sensor
from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule
from flexmeasures.data.models.planning.tests.utils import check_constraints
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import (
handle_scheduling_exception,
get_data_source_for_job,
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v3_0/tests/test_forecasting_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
get_data_source_for_job,
)
from rq.job import Job
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.data.services.forecasting import handle_forecasting_exception
from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v3_0/tests/test_sensor_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from flexmeasures.api.tests.utils import check_deprecation
from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.tests.utils import get_test_sensor
from flexmeasures.utils.unit_utils import ur
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.planning.utils import get_power_values
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import (
handle_scheduling_exception,
get_data_source_for_job,
Expand Down
30 changes: 26 additions & 4 deletions flexmeasures/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from flexmeasures.data.schemas import AssetIdField, SensorIdField
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.data.services.forecasting import handle_forecasting_exception
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.cli.utils import MsgStyle
from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list

Expand Down Expand Up @@ -58,16 +59,37 @@ def fm_jobs():
required=True,
help="Job UUID of the job you want to run.",
)
@click.option(
    "--dry-run",
    is_flag=True,  # boolean flag: its mere presence enables it (is_flag=False would require a value)
    help=(
        "Run the job function directly, without executing it as an RQ job. "
        "This skips RQ bookkeeping such as job metadata updates, registry transitions, "
        "and worker context. "
        "However, if the job itself saved data to the database then that still happens."
    ),
)
def run_job(job_id: str, dry_run: bool):
    """
    Run a single RQ job by its UUID.

    We use the app context to find out which redis queues to use.

    By default, the job is executed using RQ's normal execution mechanism,
    meaning it runs with a worker context and can update job metadata, move between RQ registries and record its result.

    With the --dry-run flag, the job's callable is executed directly (job.func(**job.kwargs)) without RQ involvement.
    This bypasses all RQ bookkeeping and is useful for debugging or inspecting job behavior without mutating job state.
    """
    connection = app.queues["scheduling"].connection
    job = Job.fetch(job_id, connection=connection)
    if dry_run:
        # Call the underlying function directly; no worker, no RQ state changes.
        result = job.func(**job.kwargs)
        click.echo(f"Job {job_id} finished (dry-run) with: {result}")
    else:
        # Let a burst worker perform the job once, then read back the recorded result.
        # Do NOT call job.perform() here as well — that would execute the job a second time.
        work_on_rq(app.queues["scheduling"], job)
        result = job.return_value()
        click.echo(f"Job {job_id} finished with: {result}")


@fm_jobs.command("run-worker")
Expand Down
14 changes: 10 additions & 4 deletions flexmeasures/data/models/planning/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,12 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
for d, sensor in enumerate(sensors)
if sensor is not None
}
# Obtain the aggregate power schedule, too, if the flex-context states the associated sensor
aggregate_power_sensor = self.flex_context.get("aggregate_power", None)
if isinstance(aggregate_power_sensor, Sensor):
storage_schedule[aggregate_power_sensor] = pd.concat(
ems_schedule, axis=1
).sum(axis=1)

# Convert each device schedule to the unit of the device's power sensor
storage_schedule = {
Expand All @@ -1341,7 +1347,7 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
sensor.unit,
event_resolution=sensor.event_resolution,
)
for sensor in sensors
for sensor in storage_schedule.keys()
if sensor is not None
}

Expand Down Expand Up @@ -1378,15 +1384,15 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
sensor: storage_schedule[sensor]
.resample(sensor.event_resolution)
.mean()
for sensor in sensors
for sensor in storage_schedule.keys()
if sensor is not None
}

# Round schedule
if self.round_to_decimals:
storage_schedule = {
sensor: storage_schedule[sensor].round(self.round_to_decimals)
for sensor in sensors
for sensor in storage_schedule.keys()
if sensor is not None
}
soc_schedule = {
Expand All @@ -1402,7 +1408,7 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
"data": storage_schedule[sensor],
"unit": sensor.unit,
}
for sensor in sensors
for sensor in storage_schedule.keys()
if sensor is not None
]
commitment_costs = [
Expand Down
5 changes: 5 additions & 0 deletions flexmeasures/data/schemas/scheduling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ class FlexContextSchema(Schema):
data_key="inflexible-device-sensors",
metadata=metadata.INFLEXIBLE_DEVICE_SENSORS.to_dict(),
)
aggregate_power = VariableQuantityField(
to_unit="MW",
data_key="aggregate-power",
required=False,
)

def set_default_breach_prices(
self, data: dict, fields: list[str], price: ur.Quantity
Expand Down
28 changes: 22 additions & 6 deletions flexmeasures/data/services/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
)


def load_custom_scheduler(scheduler_specs: dict) -> type:
def load_custom_scheduler(scheduler_specs: dict | str) -> type:
"""
Read in custom scheduling spec.
Attempt to load the Scheduler class to use.
Expand All @@ -59,12 +59,28 @@ def load_custom_scheduler(scheduler_specs: dict) -> type:
"class": "NameOfSchedulerClass",
}

or if the scheduler is already subclassing flexmeasures.Scheduler, simply:

"NameOfSchedulerClass"

"""
assert isinstance(
scheduler_specs, dict
), f"Scheduler specs is {type(scheduler_specs)}, should be a dict"
assert "module" in scheduler_specs, "scheduler specs have no 'module'."
assert "class" in scheduler_specs, "scheduler specs have no 'class'"
if isinstance(scheduler_specs, dict):
assert "module" in scheduler_specs, "scheduler specs have no 'module'."
assert "class" in scheduler_specs, "scheduler specs have no 'class'"
elif isinstance(scheduler_specs, str):
scheduler_class = current_app.data_generators["scheduler"].get(scheduler_specs)
scheduler_specs = {
"class": scheduler_class.__name__,
"module": scheduler_class.__module__,
}
if scheduler_specs is None:
raise ValueError(
f"Scheduler {scheduler_specs} does not seem to be registered."
)
else:
raise TypeError(
f"Scheduler specs is {type(scheduler_specs)}, should be a dict or str"
)

scheduler_name = scheduler_specs["class"]

Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_forecasting_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.forecasting import (
create_forecasting_jobs,
handle_forecasting_exception,
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
check_failures,
get_data_source,
)
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.utils.time_utils import as_server_time


Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/tests/test_scheduling_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from flexmeasures.data.models.planning.utils import initialize_series
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import (
create_scheduling_job,
load_custom_scheduler,
Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq


def test_scheduling_a_charging_station(
Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/tests/test_scheduling_repeated_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.utils import hash_function_arguments, job_cache

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import pytz

from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.models.planning import Scheduler
from flexmeasures.data.services.scheduling import load_custom_scheduler
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_scheduling_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas as pd
from rq.job import Job
from flexmeasures.data.services.scheduling import create_sequential_scheduling_job
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.data.models.time_series import Sensor

Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_scheduling_simultaneous.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
from flexmeasures.data.services.scheduling import create_simultaneous_scheduling_job
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.models.time_series import Sensor


Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_train_predict_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
from flexmeasures.data.models.forecasting.exceptions import CustomException
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.forecasting import handle_forecasting_exception


Expand Down
23 changes: 2 additions & 21 deletions flexmeasures/data/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,10 @@
import os
from __future__ import annotations

from traceback import print_tb

import click


def work_on_rq(redis_queue, exc_handler=None, max_jobs=None):

# we only want this import distinction to matter when we actually are testing
if os.name == "nt":
from rq_win import WindowsWorker as SimpleWorker
else:
from rq import SimpleWorker

exc_handlers = []
if exc_handler is not None:
exc_handlers.append(exc_handler)
print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count)
worker = SimpleWorker(
[redis_queue],
connection=redis_queue.connection,
exception_handlers=exc_handlers,
)
worker.work(burst=True, max_jobs=max_jobs)


def exception_reporter(job, exc_type, exc_value, traceback):
    """RQ exception handler for use in tests: print the traceback and echo the error."""
    print_tb(traceback)
    click.echo("HANDLING RQ WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value))
34 changes: 34 additions & 0 deletions flexmeasures/utils/job_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import os

from rq import Queue
from rq.job import Job


def work_on_rq(
    redis_queue: Queue, job: Job | str | None = None, exc_handler=None, max_jobs=None
):
    """
    Work on an RQ queue with a burst SimpleWorker, or perform one specific job.

    :param redis_queue: the RQ queue to work on (also supplies the redis connection).
    :param job:         optional Job instance or job UUID string; if given, only that
                        job is performed (instead of draining the queue).
    :param exc_handler: optional exception handler callable, passed to the worker.
    :param max_jobs:    optional cap on the number of jobs worked on in burst mode;
                        ignored when a specific job is given.
    """

    # we only want this import distinction to matter when we actually are testing
    if os.name == "nt":
        from rq_win import WindowsWorker as SimpleWorker
    else:
        from rq import SimpleWorker

    exc_handlers = []
    if exc_handler is not None:
        exc_handlers.append(exc_handler)
    print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count)
    worker = SimpleWorker(
        [redis_queue],
        connection=redis_queue.connection,
        exception_handlers=exc_handlers,
    )

    if job:
        if isinstance(job, str):
            # A UUID was passed in; look the job up on the queue's connection.
            job = Job.fetch(job, connection=redis_queue.connection)
        worker.perform_job(job, redis_queue)
    else:
        # Burst mode: work until the queue is empty (or max_jobs is reached).
        worker.work(burst=True, max_jobs=max_jobs)
Loading