Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Bugfixes
* Fix: flex-context dialogue is empty when flex-context has two booleans with the same value [see `PR #1907 <https://www.github.com/FlexMeasures/flexmeasures/pull/1907>`_]
* Fixed toast notification display on the sensor detail upload form [see `PR #1921 <https://www.github.com/FlexMeasures/flexmeasures/pull/1921>`_]
* Fix: return validation errors instead of database errors for API/CLI fields that map to database objects [see `PR #1923 <https://www.github.com/FlexMeasures/flexmeasures/pull/1923>`_]
* Mutate job state when running ``flexmeasures jobs run-job <job ID>`` [see `PR #1929 <https://www.github.com/FlexMeasures/flexmeasures/pull/1929>`_]


v0.30.3 | January 12, 2026
Expand Down
1 change: 1 addition & 0 deletions documentation/cli/change_log.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ since v0.31.0 | February XX, 2026
* Let ``flexmeasures add schedule`` create schedules with only information known prior to some time using the ``prior`` option.
* New ``-dry-run`` flag for ``flexmeasures add schedule`` to avoid saving anything (printing out the results instead).
* Return validation errors instead of database errors for fields that map to database objects.
* Mutate job state when running ``flexmeasures jobs run-job <job ID>``, including updating metadata and moving between registries

since v0.30.3 | January 12, 2026
=================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flexmeasures import Sensor
from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule
from flexmeasures.data.models.planning.tests.utils import check_constraints
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import (
handle_scheduling_exception,
get_data_source_for_job,
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v3_0/tests/test_forecasting_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
get_data_source_for_job,
)
from rq.job import Job
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.data.services.forecasting import handle_forecasting_exception
from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v3_0/tests/test_sensor_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from flexmeasures.api.tests.utils import check_deprecation
from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.tests.utils import get_test_sensor
from flexmeasures.utils.unit_utils import ur
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.planning.utils import get_power_values
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import (
handle_scheduling_exception,
get_data_source_for_job,
Expand Down
4 changes: 3 additions & 1 deletion flexmeasures/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from flexmeasures.data.schemas import AssetIdField, SensorIdField
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.data.services.forecasting import handle_forecasting_exception
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.cli.utils import MsgStyle
from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list

Expand Down Expand Up @@ -66,7 +67,8 @@ def run_job(job_id: str):
"""
connection = app.queues["scheduling"].connection
job = Job.fetch(job_id, connection=connection)
result = job.func(**job.kwargs)
work_on_rq(app.queues["scheduling"], job)
result = job.perform()
click.echo(f"Job {job_id} finished with: {result}")


Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_forecasting_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.forecasting import (
create_forecasting_jobs,
handle_forecasting_exception,
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
check_failures,
get_data_source,
)
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.utils.time_utils import as_server_time


Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/tests/test_scheduling_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from flexmeasures.data.models.planning.utils import initialize_series
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import (
create_scheduling_job,
load_custom_scheduler,
Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq


def test_scheduling_a_charging_station(
Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/tests/test_scheduling_repeated_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.utils import hash_function_arguments, job_cache

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import pytz

from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.tests.utils import exception_reporter
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.models.planning import Scheduler
from flexmeasures.data.services.scheduling import load_custom_scheduler
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_scheduling_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas as pd
from rq.job import Job
from flexmeasures.data.services.scheduling import create_sequential_scheduling_job
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.data.models.time_series import Sensor

Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_scheduling_simultaneous.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
from flexmeasures.data.services.scheduling import create_simultaneous_scheduling_job
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.models.time_series import Sensor


Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/tests/test_train_predict_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
from flexmeasures.data.models.forecasting.exceptions import CustomException
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.forecasting import handle_forecasting_exception


Expand Down
23 changes: 2 additions & 21 deletions flexmeasures/data/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,10 @@
import os
from __future__ import annotations

from traceback import print_tb

import click


def work_on_rq(redis_queue, exc_handler=None, max_jobs=None):

# we only want this import distinction to matter when we actually are testing
if os.name == "nt":
from rq_win import WindowsWorker as SimpleWorker
else:
from rq import SimpleWorker

exc_handlers = []
if exc_handler is not None:
exc_handlers.append(exc_handler)
print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count)
worker = SimpleWorker(
[redis_queue],
connection=redis_queue.connection,
exception_handlers=exc_handlers,
)
worker.work(burst=True, max_jobs=max_jobs)


def exception_reporter(job, exc_type, exc_value, traceback):
print_tb(traceback)
click.echo("HANDLING RQ WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value))
37 changes: 37 additions & 0 deletions flexmeasures/utils/job_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

import os

from rq import Queue
from rq.job import Job


def work_on_rq(
redis_queue: Queue,
exc_handler=None,
max_jobs=None,
job: Job | str | None = None,
):

# we only want this import distinction to matter when we actually are testing
if os.name == "nt":
from rq_win import WindowsWorker as SimpleWorker
else:
from rq import SimpleWorker

exc_handlers = []
if exc_handler is not None:
exc_handlers.append(exc_handler)
print("STARTING SIMPLE RQ WORKER, seeing %d job(s)" % redis_queue.count)
worker = SimpleWorker(
[redis_queue],
connection=redis_queue.connection,
exception_handlers=exc_handlers,
)

if job:
if isinstance(job, str):
job = Job.fetch(job, connection=redis_queue.connection)
worker.perform_job(job, redis_queue)
else:
worker.work(burst=True, max_jobs=max_jobs)