From 055f8df325cd0f36f35b18c6c0571a496edecb9c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 26 Jan 2026 09:55:34 +0100 Subject: [PATCH 1/5] fix: mutate job state Signed-off-by: F.N. Claessen --- flexmeasures/cli/jobs.py | 4 +++- flexmeasures/data/tests/utils.py | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/flexmeasures/cli/jobs.py b/flexmeasures/cli/jobs.py index d0df364122..41ef7b1d4c 100644 --- a/flexmeasures/cli/jobs.py +++ b/flexmeasures/cli/jobs.py @@ -31,6 +31,7 @@ from flexmeasures.data.schemas import AssetIdField, SensorIdField from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.data.services.forecasting import handle_forecasting_exception +from flexmeasures.data.tests.utils import work_on_rq from flexmeasures.cli.utils import MsgStyle from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list @@ -66,7 +67,8 @@ def run_job(job_id: str): """ connection = app.queues["scheduling"].connection job = Job.fetch(job_id, connection=connection) - result = job.func(**job.kwargs) + work_on_rq(app.queues["scheduling"], job) + result = job.perform() click.echo(f"Job {job_id} finished with: {result}") diff --git a/flexmeasures/data/tests/utils.py b/flexmeasures/data/tests/utils.py index c6df948a10..02c2deed43 100644 --- a/flexmeasures/data/tests/utils.py +++ b/flexmeasures/data/tests/utils.py @@ -1,10 +1,16 @@ +from __future__ import annotations + import os from traceback import print_tb import click +from rq import Queue +from rq.job import Job -def work_on_rq(redis_queue, exc_handler=None, max_jobs=None): +def work_on_rq( + redis_queue: Queue, job: Job | str | None = None, exc_handler=None, max_jobs=None +): # we only want this import distinction to matter when we actually are testing if os.name == "nt": @@ -21,7 +27,13 @@ def work_on_rq(redis_queue, exc_handler=None, max_jobs=None): connection=redis_queue.connection, exception_handlers=exc_handlers, ) - worker.work(burst=True, max_jobs=max_jobs) + + if job: + if isinstance(job, str): + job = Job.fetch(job, connection=redis_queue.connection) + worker.perform_job(job, redis_queue) + else: + worker.work(burst=True, max_jobs=max_jobs) def exception_reporter(job, exc_type, exc_value, traceback): From 90e266764113dd301109276adb18c192973ade1f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 26 Jan 2026 10:02:49 +0100 Subject: [PATCH 2/5] refactor: move util function (now no longer exclusively used in tests) Signed-off-by: F.N. Claessen --- .../tests/test_asset_schedules_fresh_db.py | 2 +- .../api/v3_0/tests/test_forecasting_api.py | 2 +- .../api/v3_0/tests/test_sensor_schedules.py | 2 +- .../tests/test_sensor_schedules_fresh_db.py | 2 +- flexmeasures/cli/jobs.py | 2 +- .../data/tests/test_forecasting_jobs.py | 2 +- .../tests/test_forecasting_jobs_fresh_db.py | 2 +- .../data/tests/test_scheduling_jobs.py | 3 +- .../tests/test_scheduling_jobs_fresh_db.py | 3 +- .../tests/test_scheduling_repeated_jobs.py | 3 +- .../test_scheduling_repeated_jobs_fresh_db.py | 3 +- .../data/tests/test_scheduling_sequential.py | 2 +- .../tests/test_scheduling_simultaneous.py | 2 +- .../data/tests/test_train_predict_pipeline.py | 2 +- flexmeasures/data/tests/utils.py | 31 ----------------- flexmeasures/utils/job_utils.py | 34 +++++++++++++++++++ 16 files changed, 52 insertions(+), 45 deletions(-) create mode 100644 flexmeasures/utils/job_utils.py diff --git a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py index db5f82fcdc..0ebf0548ef 100644 --- a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py @@ -11,7 +11,7 @@ from flexmeasures import Sensor from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.planning.tests.utils import check_constraints -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( handle_scheduling_exception, get_data_source_for_job, diff --git a/flexmeasures/api/v3_0/tests/test_forecasting_api.py b/flexmeasures/api/v3_0/tests/test_forecasting_api.py index f4e4460523..a621be8290 100644 --- a/flexmeasures/api/v3_0/tests/test_forecasting_api.py +++ b/flexmeasures/api/v3_0/tests/test_forecasting_api.py @@ -6,7 +6,7 @@ get_data_source_for_job, ) from rq.job import Job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.api.tests.utils import get_auth_token from flexmeasures.data.services.forecasting import handle_forecasting_exception from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py index 5710ee493e..0b0e8c14ee 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py @@ -9,7 +9,7 @@ from flexmeasures.api.tests.utils import check_deprecation from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.data_sources import DataSource -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.tests.utils import get_test_sensor from flexmeasures.utils.unit_utils import ur diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py index 9ee352bb4c..3b4db45fb1 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py @@ -11,7 +11,7 @@ from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.planning.utils import get_power_values from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( handle_scheduling_exception, get_data_source_for_job, diff --git a/flexmeasures/cli/jobs.py b/flexmeasures/cli/jobs.py index 41ef7b1d4c..86bb184038 100644 --- a/flexmeasures/cli/jobs.py +++ b/flexmeasures/cli/jobs.py @@ -31,7 +31,7 @@ from flexmeasures.data.schemas import AssetIdField, SensorIdField from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.data.services.forecasting import handle_forecasting_exception -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.cli.utils import MsgStyle from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list diff --git a/flexmeasures/data/tests/test_forecasting_jobs.py b/flexmeasures/data/tests/test_forecasting_jobs.py index b20cc1aac8..7901aadb2e 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs.py +++ b/flexmeasures/data/tests/test_forecasting_jobs.py @@ -9,7 +9,7 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import ( create_forecasting_jobs, handle_forecasting_exception, diff --git a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py index 19cf6a758a..25e862fd1f 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py @@ -14,7 +14,7 @@ check_failures, get_data_source, ) -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.utils.time_utils import as_server_time diff --git a/flexmeasures/data/tests/test_scheduling_jobs.py b/flexmeasures/data/tests/test_scheduling_jobs.py index f2a9198a3b..7384bd4efd 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_jobs.py @@ -14,7 +14,8 @@ from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( create_scheduling_job, load_custom_scheduler, diff --git a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py index 096fa7ebd2..812bf44301 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py @@ -7,7 +7,8 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief from flexmeasures.data.services.scheduling import create_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq def test_scheduling_a_charging_station( diff --git a/flexmeasures/data/tests/test_scheduling_repeated_jobs.py b/flexmeasures/data/tests/test_scheduling_repeated_jobs.py index 61bf291054..c7ad33eea9 100644 --- a/flexmeasures/data/tests/test_scheduling_repeated_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_repeated_jobs.py @@ -12,7 +12,8 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.services.utils import hash_function_arguments, job_cache diff --git a/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py index 98e7533c0d..63d5a4424a 100644 --- a/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py @@ -4,7 +4,8 @@ import pytz -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.services.scheduling import load_custom_scheduler diff --git a/flexmeasures/data/tests/test_scheduling_sequential.py b/flexmeasures/data/tests/test_scheduling_sequential.py index 3e0f17a197..254c386c86 100644 --- a/flexmeasures/data/tests/test_scheduling_sequential.py +++ b/flexmeasures/data/tests/test_scheduling_sequential.py @@ -4,7 +4,7 @@ import pandas as pd from rq.job import Job from flexmeasures.data.services.scheduling import create_sequential_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.data.models.time_series import Sensor diff --git a/flexmeasures/data/tests/test_scheduling_simultaneous.py b/flexmeasures/data/tests/test_scheduling_simultaneous.py index 5e31e598f1..57671f4fa6 100644 --- a/flexmeasures/data/tests/test_scheduling_simultaneous.py +++ b/flexmeasures/data/tests/test_scheduling_simultaneous.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd from flexmeasures.data.services.scheduling import create_simultaneous_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.models.time_series import Sensor diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 06979c8c88..5af5722a47 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -10,7 +10,7 @@ from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline from flexmeasures.data.models.forecasting.exceptions import CustomException -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import handle_forecasting_exception diff --git a/flexmeasures/data/tests/utils.py b/flexmeasures/data/tests/utils.py index 02c2deed43..ec022ade49 100644 --- a/flexmeasures/data/tests/utils.py +++ b/flexmeasures/data/tests/utils.py @@ -1,39 +1,8 @@ from __future__ import annotations -import os from traceback import print_tb import click -from rq import Queue -from rq.job import Job - - -def work_on_rq( - redis_queue: Queue, job: Job | str | None = None, exc_handler=None, max_jobs=None -): - - # we only want this import distinction to matter when we actually are testing - if os.name == "nt": - from rq_win import WindowsWorker as SimpleWorker - else: - from rq import SimpleWorker - - exc_handlers = [] - if exc_handler is not None: - exc_handlers.append(exc_handler) - print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count) - worker = SimpleWorker( - [redis_queue], - connection=redis_queue.connection, - exception_handlers=exc_handlers, - ) - - if job: - if isinstance(job, str): - job = Job.fetch(job, connection=redis_queue.connection) - worker.perform_job(job, redis_queue) - else: - worker.work(burst=True, max_jobs=max_jobs) def exception_reporter(job, exc_type, exc_value, traceback): diff --git a/flexmeasures/utils/job_utils.py b/flexmeasures/utils/job_utils.py new file mode 100644 index 0000000000..6cf5d11b25 --- /dev/null +++ b/flexmeasures/utils/job_utils.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import os + +from rq import Queue +from rq.job import Job + + +def work_on_rq( + redis_queue: Queue, job: Job | str | None = None, exc_handler=None, max_jobs=None +): + + # we only want this import distinction to matter when we actually are testing + if os.name == "nt": + from rq_win import WindowsWorker as SimpleWorker + else: + from rq import SimpleWorker + + exc_handlers = [] + if exc_handler is not None: + exc_handlers.append(exc_handler) + print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count) + worker = SimpleWorker( + [redis_queue], + connection=redis_queue.connection, + exception_handlers=exc_handlers, + ) + + if job: + if isinstance(job, str): + job = Job.fetch(job, connection=redis_queue.connection) + worker.perform_job(job, redis_queue) + else: + worker.work(burst=True, max_jobs=max_jobs) From 93ddfc3348f282a37a5099f65a725e48afb7914d Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 26 Jan 2026 11:39:59 +0100 Subject: [PATCH 3/5] docs: changelog entries Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 1 + documentation/cli/change_log.rst | 1 + 2 files changed, 2 insertions(+) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 296d8aab76..373bb21fc2 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -48,6 +48,7 @@ Bugfixes * Fix: flex-context dialogue is empty when flex-context has two booleans with the same value [see `PR #1907 `_] * Fixed toast notification display on the sensor detail upload form [see `PR #1921 `_] * Fix: return validation errors instead of database errors for API/CLI fields that map to database objects [see `PR #1923 `_] +* Mutate job state when running ``flexmeasures jobs run-job `` [see `PR #1929 `_] v0.30.3 | January 12, 2026 diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst index eb21a0354c..dec8e00cf4 100644 --- a/documentation/cli/change_log.rst +++ b/documentation/cli/change_log.rst @@ -11,6 +11,7 @@ since v0.31.0 | February XX, 2026 * Let ``flexmeasures add schedule`` create schedules with only information known prior to some time using the ``prior`` option. * New ``-dry-run`` flag for ``flexmeasures add schedule`` to avoid saving anything (printing out the results instead). * Return validation errors instead of database errors for fields that map to database objects. +* Mutate job state when running ``flexmeasures jobs run-job ``, including updating metadata and moving between registries since v0.30.3 | January 12, 2026 ================================= From c860e9d81f247e1826a76fe2a6ffa56c18a14b03 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 26 Jan 2026 12:17:22 +0100 Subject: [PATCH 4/5] fix: move new kwarg to last position to be backwards compatible with using work_on_rq with positional arguments Signed-off-by: F.N. Claessen --- flexmeasures/utils/job_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/utils/job_utils.py b/flexmeasures/utils/job_utils.py index 6cf5d11b25..f1b5cbb029 100644 --- a/flexmeasures/utils/job_utils.py +++ b/flexmeasures/utils/job_utils.py @@ -7,7 +7,7 @@ def work_on_rq( - redis_queue: Queue, job: Job | str | None = None, exc_handler=None, max_jobs=None + redis_queue: Queue, exc_handler=None, max_jobs=None, job: Job | str | None = None, ): # we only want this import distinction to matter when we actually are testing From 667ae0227f0e2905f4479b77572f109b3c977dd4 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 26 Jan 2026 12:19:29 +0100 Subject: [PATCH 5/5] style: black Signed-off-by: F.N. Claessen --- flexmeasures/utils/job_utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/utils/job_utils.py b/flexmeasures/utils/job_utils.py index f1b5cbb029..19a32b9699 100644 --- a/flexmeasures/utils/job_utils.py +++ b/flexmeasures/utils/job_utils.py @@ -7,7 +7,10 @@ def work_on_rq( - redis_queue: Queue, exc_handler=None, max_jobs=None, job: Job | str | None = None, + redis_queue: Queue, + exc_handler=None, + max_jobs=None, + job: Job | str | None = None, ): # we only want this import distinction to matter when we actually are testing