Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [

dependencies = [
"aio-pika ~= 9.4, < 9.5",
"omotes-sdk-protocol ~= 1.0",
"omotes-sdk-protocol ~= 1.1",
"pyesdl ~= 24.2",
"pamqp ~= 3.3",
"celery ~= 5.3",
Expand Down
9 changes: 6 additions & 3 deletions src/omotes_sdk/internal/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,16 @@ def start(self) -> None:
)

# Config of celery app
self.celery_app.conf.task_queues = (
KombuQueue(WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE),
) # Tell the worker to listen to a specific queue for 1 workflow type.
self.celery_app.conf.task_queues = [KombuQueue(
WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE, queue_arguments={"x-max-priority": 10}
)] # Tell the worker to listen to a specific queue for 1 workflow type.
self.celery_app.conf.task_acks_late = True
self.celery_app.conf.task_reject_on_worker_lost = True
self.celery_app.conf.task_acks_on_failure_or_timeout = False
self.celery_app.conf.worker_prefetch_multiplier = 1
self.celery_app.conf.broker_transport_options = {
"priority_step": 1
} # Prioritize higher numbers
self.celery_app.conf.broker_connection_retry_on_startup = True
# app.conf.worker_send_task_events = True # Tell the worker to send task events.
self.celery_app.conf.worker_hijack_root_logger = False
Expand Down
4 changes: 4 additions & 0 deletions src/omotes_sdk/omotes_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import logging
import threading
import uuid
Expand Down Expand Up @@ -303,6 +304,7 @@ def submit_job(
auto_disconnect_on_result: bool,
job_reference: Optional[str] = None,
auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL,
job_priority: Union[JobSubmission.JobPriority, int] = JobSubmission.JobPriority.MEDIUM,
) -> Job:
"""Submit a new job and connect to progress and status updates and the job result.

Expand All @@ -322,6 +324,7 @@ def submit_job(
`callback_on_finished`.
:param job_reference: An optional reference to the submitted job which is used in the
name of the output ESDL as well as in internal logging of OMOTES.
:param job_priority: An optional priority value for the job used in celery.
:param auto_cleanup_after_ttl: When erroneous situations occur (e.g. client is offline),
all queues pertaining to this job will be removed after the given TTL.
Default to 48 hours if unset. Set to `None` to turn off auto clean up,
Expand Down Expand Up @@ -362,6 +365,7 @@ def submit_job(
esdl=esdl,
params_dict=convert_params_dict_to_struct(workflow_type, params_dict),
job_reference=job_reference,
job_priority=job_priority, # type: ignore [arg-type]
)
self.broker_if.send_message_to(
exchange_name=OmotesQueueNames.omotes_exchange_name(),
Expand Down
Loading