From 6c01b525074809d55982f3cb15f6b7875815a6e3 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Mon, 24 Mar 2025 21:18:14 +0100 Subject: [PATCH 1/4] add job priority --- src/omotes_sdk/internal/worker/worker.py | 9 ++++++--- src/omotes_sdk/omotes_interface.py | 2 ++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/omotes_sdk/internal/worker/worker.py b/src/omotes_sdk/internal/worker/worker.py index ef86204..0e63cde 100644 --- a/src/omotes_sdk/internal/worker/worker.py +++ b/src/omotes_sdk/internal/worker/worker.py @@ -316,13 +316,16 @@ def start(self) -> None: ) # Config of celery app - self.celery_app.conf.task_queues = ( - KombuQueue(WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE), - ) # Tell the worker to listen to a specific queue for 1 workflow type. + self.celery_app.conf.task_queues = [KombuQueue( + WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE, queue_arguments={"x-max-priority": 10} + )] # Tell the worker to listen to a specific queue for 1 workflow type. self.celery_app.conf.task_acks_late = True self.celery_app.conf.task_reject_on_worker_lost = True self.celery_app.conf.task_acks_on_failure_or_timeout = False self.celery_app.conf.worker_prefetch_multiplier = 1 + self.celery_app.conf.broker_transport_options = { + "priority_step": 1 + } # Prioritize higher numbers self.celery_app.conf.broker_connection_retry_on_startup = True # app.conf.worker_send_task_events = True # Tell the worker to send task events. self.celery_app.conf.worker_hijack_root_logger = False diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index c2552ac..b7d38d3 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -303,6 +303,7 @@ def submit_job( auto_disconnect_on_result: bool, job_reference: Optional[str] = None, auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL, + job_priority: JobSubmission.JobPriority | int = JobSubmission.JobPriority.LOW, ) -> Job: """Submit a new job and connect to progress and status updates and the job result. @@ -362,6 +363,7 @@ def submit_job( esdl=esdl, params_dict=convert_params_dict_to_struct(workflow_type, params_dict), job_reference=job_reference, + job_priority=job_priority, # type: ignore [arg-type] ) self.broker_if.send_message_to( exchange_name=OmotesQueueNames.omotes_exchange_name(), From 8fabb117b00df602110a0e0eda3d9f16de798768 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Fri, 11 Apr 2025 15:59:30 +0200 Subject: [PATCH 2/4] to omotes-sdk-protocol 1.1.0 and default MEDIUM job priority --- pyproject.toml | 2 +- src/omotes_sdk/omotes_interface.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f06094d..93e7617 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4, < 9.5", - "omotes-sdk-protocol ~= 1.0", + "omotes-sdk-protocol ~= 1.1", "pyesdl ~= 24.2", "pamqp ~= 3.3", "celery ~= 5.3", diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index b7d38d3..df95021 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -303,7 +303,7 @@ def submit_job( auto_disconnect_on_result: bool, job_reference: Optional[str] = None, auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL, - job_priority: JobSubmission.JobPriority | int = JobSubmission.JobPriority.LOW, + job_priority: JobSubmission.JobPriority | int = JobSubmission.JobPriority.MEDIUM, ) -> Job: """Submit a new job and connect to progress and status updates and the job result. From 56378f69715a06d9052ec5f005906148d898a07c Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Fri, 11 Apr 2025 16:05:57 +0200 Subject: [PATCH 3/4] fix python3.9 Union instead of pipe --- src/omotes_sdk/omotes_interface.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index df95021..fd3c4ef 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -303,7 +303,7 @@ def submit_job( auto_disconnect_on_result: bool, job_reference: Optional[str] = None, auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL, - job_priority: JobSubmission.JobPriority | int = JobSubmission.JobPriority.MEDIUM, + job_priority: Union[JobSubmission.JobPriority, int] = JobSubmission.JobPriority.MEDIUM, ) -> Job: """Submit a new job and connect to progress and status updates and the job result. @@ -323,6 +323,7 @@ def submit_job( `callback_on_finished`. :param job_reference: An optional reference to the submitted job which is used in the name of the output ESDL as well as in internal logging of OMOTES. + :param job_priority: An optional priority value for the job used in celery. :param auto_cleanup_after_ttl: When erroneous situations occur (e.g. client is offline), all queues pertaining to this job will be removed after the given TTL. Default to 48 hours if unset. Set to `None` to turn off auto clean up, From a58454faaf11fece35d5d697bc50c1eb4742e208 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Fri, 11 Apr 2025 16:11:21 +0200 Subject: [PATCH 4/4] add annotations for protobuf enum type --- src/omotes_sdk/omotes_interface.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index fd3c4ef..58d01c8 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -1,3 +1,4 @@ +from __future__ import annotations import logging import threading import uuid