diff --git a/pyproject.toml b/pyproject.toml index f06094d..93e7617 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4, < 9.5", - "omotes-sdk-protocol ~= 1.0", + "omotes-sdk-protocol ~= 1.1", "pyesdl ~= 24.2", "pamqp ~= 3.3", "celery ~= 5.3", diff --git a/src/omotes_sdk/internal/worker/worker.py b/src/omotes_sdk/internal/worker/worker.py index ef86204..0e63cde 100644 --- a/src/omotes_sdk/internal/worker/worker.py +++ b/src/omotes_sdk/internal/worker/worker.py @@ -316,13 +316,16 @@ def start(self) -> None: ) # Config of celery app - self.celery_app.conf.task_queues = ( - KombuQueue(WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE), - ) # Tell the worker to listen to a specific queue for 1 workflow type. + self.celery_app.conf.task_queues = [KombuQueue( + WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE, queue_arguments={"x-max-priority": 10} + )] # Tell the worker to listen to a specific queue for 1 workflow type. self.celery_app.conf.task_acks_late = True self.celery_app.conf.task_reject_on_worker_lost = True self.celery_app.conf.task_acks_on_failure_or_timeout = False self.celery_app.conf.worker_prefetch_multiplier = 1 + self.celery_app.conf.broker_transport_options = { + "priority_step": 1 + } # Prioritize higher numbers self.celery_app.conf.broker_connection_retry_on_startup = True # app.conf.worker_send_task_events = True # Tell the worker to send task events. self.celery_app.conf.worker_hijack_root_logger = False diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index c2552ac..58d01c8 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -1,3 +1,4 @@ +from __future__ import annotations import logging import threading import uuid @@ -303,6 +304,7 @@ def submit_job( auto_disconnect_on_result: bool, job_reference: Optional[str] = None, auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL, + job_priority: Union[JobSubmission.JobPriority, int] = JobSubmission.JobPriority.MEDIUM, ) -> Job: """Submit a new job and connect to progress and status updates and the job result. @@ -322,6 +324,7 @@ def submit_job( `callback_on_finished`. :param job_reference: An optional reference to the submitted job which is used in the name of the output ESDL as well as in internal logging of OMOTES. + :param job_priority: An optional priority value for the job used in celery. :param auto_cleanup_after_ttl: When erroneous situations occur (e.g. client is offline), all queues pertaining to this job will be removed after the given TTL. Default to 48 hours if unset. Set to `None` to turn off auto clean up, @@ -362,6 +365,7 @@ def submit_job( esdl=esdl, params_dict=convert_params_dict_to_struct(workflow_type, params_dict), job_reference=job_reference, + job_priority=job_priority, # type: ignore [arg-type] ) self.broker_if.send_message_to( exchange_name=OmotesQueueNames.omotes_exchange_name(),