Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
Expand All @@ -39,7 +39,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
Expand All @@ -64,7 +64,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
Expand Down Expand Up @@ -112,7 +112,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This repository is part of the 'Nieuwe Warmte Nu Design Toolkit' project.

Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or cancel submitted jobs.
Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or delete submitted jobs.

## Protobuf
Please install `protoc` on your machine and make sure it is available in your `PATH`.
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = [
{ name = "Sebastiaan la Fleur", email = "sebastiaan.lafleur@tno.nl" },
{ name = "Mark Vrijlandt", email = "mark.vrijlandt@tno.nl" },
]
description = "Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or cancel submitted jobs."
description = "Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or delete submitted jobs."
readme = "README.md"
license = {file = "LICENSE"}
classifiers = [
Expand All @@ -25,7 +25,7 @@ classifiers = [

dependencies = [
"aio-pika ~= 9.4",
"omotes-sdk-protocol ~= 0.1",
"omotes-sdk-protocol ~= 0.1,>=0.1.15",
"pyesdl ~= 24.2",
"pamqp ~= 3.3",
"celery ~= 5.3",
Expand Down
13 changes: 13 additions & 0 deletions src/omotes_sdk/internal/common/esdl_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from esdl.esdl_handler import EnergySystemHandler


def pyesdl_from_string(input_str: str) -> EnergySystemHandler:
"""
Loads esdl file from a string into memory.

Please note that it is not checked if the contents of the string is a valid esdl.
:param input_str: string containing the contents of an esdl file.
"""
esh = EnergySystemHandler()
esh.load_from_string(input_str)
return esh
14 changes: 1 addition & 13 deletions src/omotes_sdk/internal/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from celery.apps.worker import Worker as CeleryWorker
from kombu import Queue as KombuQueue
from esdl import EnergySystem
from esdl.esdl_handler import EnergySystemHandler

from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage
from omotes_sdk.internal.common.esdl_util import pyesdl_from_string
from omotes_sdk.internal.worker.configs import WorkerConfig
from omotes_sdk.internal.common.broker_interface import BrokerInterface
from omotes_sdk_protocol.internal.task_pb2 import (
Expand Down Expand Up @@ -241,18 +241,6 @@ def on_failure(
)


def pyesdl_from_string(input_str: str) -> EnergySystemHandler:
"""
Loads esdl file from a string into memory.

Please note that it is not checked if the contents of the string is a valid esdl.
:param input_str: string containing the contents of an esdl file.
"""
esh = EnergySystemHandler()
esh.load_from_string(input_str)
return esh


def wrapped_worker_task(
task: WorkerTask, job_id: UUID, job_reference: str, input_esdl: str, params_dict: ProtobufDict
) -> None:
Expand Down
29 changes: 18 additions & 11 deletions src/omotes_sdk/omotes_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
JobProgressUpdate,
JobStatusUpdate,
JobSubmission,
JobCancel,
JobDelete,
)
from omotes_sdk_protocol.workflow_pb2 import AvailableWorkflows, RequestAvailableWorkflows

Expand Down Expand Up @@ -372,21 +372,28 @@ def submit_job(

return job

def cancel_job(self, job: Job) -> None:
"""Cancel a job.
def delete_job(self, job: Job) -> None:
"""Delete a job and all of its resources.

If this succeeds or not will be send as a job status update through the
`callback_on_status_update` handler. This method will not disconnect from the submitted job
events. This will need to be done separately using `disconnect_from_submitted_job`.
This will delete the job regardless of its current state. If it is running, it will be
cancelled. If the job produced any timeseries data, it will be deleted eventually.

:param job: The job to cancel.
Developers note:
If the jobs is successfully cancelled or not will be sent as a job status update through
the `callback_on_status_update` handler. This method will not disconnect from the submitted
job events. This will need to be done separately using `disconnect_from_submitted_job`
after receiving the job status update.
Deletion of the timeseries is done by the orchestrator. See:
https://github.com/Project-OMOTES/architecture-documentation/blob/main/Feature_Time_Series_DB_Cleanup/Feature_Time_Series_DB_Cleanup.md

:param job: The job to delete.
"""
logger.info("Cancelling job %s", job.id)
cancel_msg = JobCancel(uuid=str(job.id))
logger.info("Deleting job %s", job.id)
delete_msg = JobDelete(uuid=str(job.id))
self.broker_if.send_message_to(
exchange_name=OmotesQueueNames.omotes_exchange_name(),
routing_key=OmotesQueueNames.job_cancel_queue_name(),
message=cancel_msg.SerializeToString(),
routing_key=OmotesQueueNames.job_delete_queue_name(),
message=delete_msg.SerializeToString(),
)

def connect_to_available_workflows_updates(self) -> None:
Expand Down
6 changes: 3 additions & 3 deletions src/omotes_sdk/queue_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ def job_status_queue_name(job_uuid: uuid.UUID) -> str:
return f"jobs.{job_uuid}.status"

@staticmethod
def job_cancel_queue_name() -> str:
"""Generate the job cancellation queue name.
def job_delete_queue_name() -> str:
"""Generate the job deletion queue name.

:return: The queue name.
"""
return "job_cancellations"
return "job_deletions"

@staticmethod
def available_workflows_routing_key() -> str:
Expand Down
Loading