diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 296d8aab76..373bb21fc2 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -48,6 +48,7 @@ Bugfixes * Fix: flex-context dialogue is empty when flex-context has two booleans with the same value [see `PR #1907 `_] * Fixed toast notification display on the sensor detail upload form [see `PR #1921 `_] * Fix: return validation errors instead of database errors for API/CLI fields that map to database objects [see `PR #1923 `_] +* Mutate job state when running ``flexmeasures jobs run-job `` [see `PR #1929 `_] v0.30.3 | January 12, 2026 diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst index eb21a0354c..dec8e00cf4 100644 --- a/documentation/cli/change_log.rst +++ b/documentation/cli/change_log.rst @@ -11,6 +11,7 @@ since v0.31.0 | February XX, 2026 * Let ``flexmeasures add schedule`` create schedules with only information known prior to some time using the ``prior`` option. * New ``-dry-run`` flag for ``flexmeasures add schedule`` to avoid saving anything (printing out the results instead). * Return validation errors instead of database errors for fields that map to database objects. +* Mutate job state when running ``flexmeasures jobs run-job ``, including updating metadata and moving between registries since v0.30.3 | January 12, 2026 ================================= diff --git a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py index db5f82fcdc..0ebf0548ef 100644 --- a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py @@ -11,7 +11,7 @@ from flexmeasures import Sensor from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.planning.tests.utils import check_constraints -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( handle_scheduling_exception, get_data_source_for_job, diff --git a/flexmeasures/api/v3_0/tests/test_forecasting_api.py b/flexmeasures/api/v3_0/tests/test_forecasting_api.py index f4e4460523..a621be8290 100644 --- a/flexmeasures/api/v3_0/tests/test_forecasting_api.py +++ b/flexmeasures/api/v3_0/tests/test_forecasting_api.py @@ -6,7 +6,7 @@ get_data_source_for_job, ) from rq.job import Job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.api.tests.utils import get_auth_token from flexmeasures.data.services.forecasting import handle_forecasting_exception from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py index 5710ee493e..0b0e8c14ee 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py @@ -9,7 +9,7 @@ from flexmeasures.api.tests.utils import check_deprecation from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.data_sources import DataSource -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.tests.utils import get_test_sensor from flexmeasures.utils.unit_utils import ur diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py index 9ee352bb4c..3b4db45fb1 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py @@ -11,7 +11,7 @@ from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.planning.utils import get_power_values from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( handle_scheduling_exception, get_data_source_for_job, diff --git a/flexmeasures/cli/jobs.py b/flexmeasures/cli/jobs.py index d0df364122..86bb184038 100644 --- a/flexmeasures/cli/jobs.py +++ b/flexmeasures/cli/jobs.py @@ -31,6 +31,7 @@ from flexmeasures.data.schemas import AssetIdField, SensorIdField from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.data.services.forecasting import handle_forecasting_exception +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.cli.utils import MsgStyle from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list @@ -66,7 +67,8 @@ def run_job(job_id: str): """ connection = app.queues["scheduling"].connection job = Job.fetch(job_id, connection=connection) - result = job.func(**job.kwargs) + work_on_rq(app.queues["scheduling"], job) + result = job.perform() click.echo(f"Job {job_id} finished with: {result}") diff --git a/flexmeasures/data/tests/test_forecasting_jobs.py b/flexmeasures/data/tests/test_forecasting_jobs.py index b20cc1aac8..7901aadb2e 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs.py +++ b/flexmeasures/data/tests/test_forecasting_jobs.py @@ -9,7 +9,7 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import ( create_forecasting_jobs, handle_forecasting_exception, diff --git a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py index 19cf6a758a..25e862fd1f 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py @@ -14,7 +14,7 @@ check_failures, get_data_source, ) -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.utils.time_utils import as_server_time diff --git a/flexmeasures/data/tests/test_scheduling_jobs.py b/flexmeasures/data/tests/test_scheduling_jobs.py index f2a9198a3b..7384bd4efd 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_jobs.py @@ -14,7 +14,8 @@ from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import ( create_scheduling_job, load_custom_scheduler, diff --git a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py index 096fa7ebd2..812bf44301 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py @@ -7,7 +7,8 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief from flexmeasures.data.services.scheduling import create_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq def test_scheduling_a_charging_station( diff --git a/flexmeasures/data/tests/test_scheduling_repeated_jobs.py b/flexmeasures/data/tests/test_scheduling_repeated_jobs.py index 61bf291054..c7ad33eea9 100644 --- a/flexmeasures/data/tests/test_scheduling_repeated_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_repeated_jobs.py @@ -12,7 +12,8 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.services.utils import hash_function_arguments, job_cache diff --git a/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py index 98e7533c0d..63d5a4424a 100644 --- a/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_repeated_jobs_fresh_db.py @@ -4,7 +4,8 @@ import pytz -from flexmeasures.data.tests.utils import work_on_rq, exception_reporter +from flexmeasures.data.tests.utils import exception_reporter +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.services.scheduling import load_custom_scheduler diff --git a/flexmeasures/data/tests/test_scheduling_sequential.py b/flexmeasures/data/tests/test_scheduling_sequential.py index 3e0f17a197..254c386c86 100644 --- a/flexmeasures/data/tests/test_scheduling_sequential.py +++ b/flexmeasures/data/tests/test_scheduling_sequential.py @@ -4,7 +4,7 @@ import pandas as pd from rq.job import Job from flexmeasures.data.services.scheduling import create_sequential_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.scheduling import handle_scheduling_exception from flexmeasures.data.models.time_series import Sensor diff --git a/flexmeasures/data/tests/test_scheduling_simultaneous.py b/flexmeasures/data/tests/test_scheduling_simultaneous.py index 5e31e598f1..57671f4fa6 100644 --- a/flexmeasures/data/tests/test_scheduling_simultaneous.py +++ b/flexmeasures/data/tests/test_scheduling_simultaneous.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd from flexmeasures.data.services.scheduling import create_simultaneous_scheduling_job -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.models.time_series import Sensor diff --git a/flexmeasures/data/tests/test_train_predict_pipeline.py b/flexmeasures/data/tests/test_train_predict_pipeline.py index 06979c8c88..5af5722a47 100644 --- a/flexmeasures/data/tests/test_train_predict_pipeline.py +++ b/flexmeasures/data/tests/test_train_predict_pipeline.py @@ -10,7 +10,7 @@ from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline from flexmeasures.data.models.forecasting.exceptions import CustomException -from flexmeasures.data.tests.utils import work_on_rq +from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import handle_forecasting_exception diff --git a/flexmeasures/data/tests/utils.py b/flexmeasures/data/tests/utils.py index c6df948a10..ec022ade49 100644 --- a/flexmeasures/data/tests/utils.py +++ b/flexmeasures/data/tests/utils.py @@ -1,29 +1,10 @@ -import os +from __future__ import annotations + from traceback import print_tb import click -def work_on_rq(redis_queue, exc_handler=None, max_jobs=None): - - # we only want this import distinction to matter when we actually are testing - if os.name == "nt": - from rq_win import WindowsWorker as SimpleWorker - else: - from rq import SimpleWorker - - exc_handlers = [] - if exc_handler is not None: - exc_handlers.append(exc_handler) - print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count) - worker = SimpleWorker( - [redis_queue], - connection=redis_queue.connection, - exception_handlers=exc_handlers, - ) - worker.work(burst=True, max_jobs=max_jobs) - - def exception_reporter(job, exc_type, exc_value, traceback): print_tb(traceback) click.echo("HANDLING RQ WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value)) diff --git a/flexmeasures/utils/job_utils.py b/flexmeasures/utils/job_utils.py new file mode 100644 index 0000000000..19a32b9699 --- /dev/null +++ b/flexmeasures/utils/job_utils.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import os + +from rq import Queue +from rq.job import Job + + +def work_on_rq( + redis_queue: Queue, + exc_handler=None, + max_jobs=None, + job: Job | str | None = None, +): + + # we only want this import distinction to matter when we actually are testing + if os.name == "nt": + from rq_win import WindowsWorker as SimpleWorker + else: + from rq import SimpleWorker + + exc_handlers = [] + if exc_handler is not None: + exc_handlers.append(exc_handler) + print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count) + worker = SimpleWorker( + [redis_queue], + connection=redis_queue.connection, + exception_handlers=exc_handlers, + ) + + if job: + if isinstance(job, str): + job = Job.fetch(job, connection=redis_queue.connection) + worker.perform_job(job, redis_queue) + else: + worker.work(burst=True, max_jobs=max_jobs)