From e244bc0f37ee3300afd37575782b5bf2adba249a Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 30 Jan 2025 15:20:04 +0100 Subject: [PATCH 1/6] add time series delete callback, create esdl util --- src/omotes_sdk/internal/common/esdl_util.py | 13 +++++++++++++ src/omotes_sdk/internal/worker/worker.py | 14 +------------- src/omotes_sdk/omotes_interface.py | 19 ++++++++++++++++++- src/omotes_sdk/queue_names.py | 10 ++++++++++ 4 files changed, 42 insertions(+), 14 deletions(-) create mode 100644 src/omotes_sdk/internal/common/esdl_util.py diff --git a/src/omotes_sdk/internal/common/esdl_util.py b/src/omotes_sdk/internal/common/esdl_util.py new file mode 100644 index 0000000..7b2b6ac --- /dev/null +++ b/src/omotes_sdk/internal/common/esdl_util.py @@ -0,0 +1,13 @@ +from esdl.esdl_handler import EnergySystemHandler + + +def pyesdl_from_string(input_str: str) -> EnergySystemHandler: + """ + Loads esdl file from a string into memory. + + Please note that it is not checked if the contents of the string is a valid esdl. + :param input_str: string containing the contents of an esdl file. + """ + esh = EnergySystemHandler() + esh.load_from_string(input_str) + return esh diff --git a/src/omotes_sdk/internal/worker/worker.py b/src/omotes_sdk/internal/worker/worker.py index 322b5ea..ef86204 100644 --- a/src/omotes_sdk/internal/worker/worker.py +++ b/src/omotes_sdk/internal/worker/worker.py @@ -11,9 +11,9 @@ from celery.apps.worker import Worker as CeleryWorker from kombu import Queue as KombuQueue from esdl import EnergySystem -from esdl.esdl_handler import EnergySystemHandler from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage +from omotes_sdk.internal.common.esdl_util import pyesdl_from_string from omotes_sdk.internal.worker.configs import WorkerConfig from omotes_sdk.internal.common.broker_interface import BrokerInterface from omotes_sdk_protocol.internal.task_pb2 import ( @@ -241,18 +241,6 @@ def on_failure( ) -def pyesdl_from_string(input_str: str) -> EnergySystemHandler: - """ - Loads esdl file from a string into memory. - - Please note that it is not checked if the contents of the string is a valid esdl. - :param input_str: string containing the contents of an esdl file. - """ - esh = EnergySystemHandler() - esh.load_from_string(input_str) - return esh - - def wrapped_worker_task( task: WorkerTask, job_id: UUID, job_reference: str, input_esdl: str, params_dict: ProtobufDict ) -> None: diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 972a077..6741107 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -17,6 +17,7 @@ JobStatusUpdate, JobSubmission, JobCancel, + TimeSeriesDelete, ) from omotes_sdk_protocol.workflow_pb2 import AvailableWorkflows, RequestAvailableWorkflows @@ -375,7 +376,7 @@ def submit_job( def cancel_job(self, job: Job) -> None: """Cancel a job. - If this succeeds or not will be send as a job status update through the + If this succeeds or not will be sent as a job status update through the `callback_on_status_update` handler. This method will not disconnect from the submitted job events. This will need to be done separately using `disconnect_from_submitted_job`. @@ -389,6 +390,22 @@ def cancel_job(self, job: Job) -> None: message=cancel_msg.SerializeToString(), ) + def delete_time_series_data(self, ouput_esdl_id: str) -> None: + """Delete time series data. + + The id of the omotes output ESDL is used as parameters since the omotes job id is not known + anymore after completion of a job. + + :param esdl_id: The id of the output ESDL for which to delete the time series data. + """ + logger.info("Deleting time series data for output ESDL id %s", ouput_esdl_id) + delete_time_series_msg = TimeSeriesDelete(ouput_esdl_id=str(ouput_esdl_id)) + self.broker_if.send_message_to( + exchange_name=OmotesQueueNames.omotes_exchange_name(), + routing_key=OmotesQueueNames.time_series_delete_queue_name(), + message=delete_time_series_msg.SerializeToString(), + ) + def connect_to_available_workflows_updates(self) -> None: """Connect to updates of the available workflows.""" self.broker_if.declare_queue_and_add_subscription( diff --git a/src/omotes_sdk/queue_names.py b/src/omotes_sdk/queue_names.py index 45467f0..c81e7d8 100644 --- a/src/omotes_sdk/queue_names.py +++ b/src/omotes_sdk/queue_names.py @@ -55,6 +55,16 @@ def job_cancel_queue_name() -> str: """ return "job_cancellations" + + @staticmethod + def time_series_delete_queue_name() -> str: + """Generate the time series deletion queue name. + + :return: The queue name. + """ + return "time_series_deletions" + + @staticmethod def available_workflows_routing_key() -> str: """Generate the available work flows routing key. From ad7a20bbcb955f66bf12336ac85966179051fc0d Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 30 Jan 2025 16:48:49 +0100 Subject: [PATCH 2/6] update job cancel to job delete, to also delete time series --- README.md | 2 +- pyproject.toml | 2 +- src/omotes_sdk/omotes_interface.py | 36 +++++++++--------------------- src/omotes_sdk/queue_names.py | 15 +++---------- 4 files changed, 15 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index d2ce5fe..058a065 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ This repository is part of the 'Nieuwe Warmte Nu Design Toolkit' project. -Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or cancel submitted jobs. +Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or delete submitted jobs. ## Protobuf Please install `protoc` on your machine and make sure it is available in your `PATH`. diff --git a/pyproject.toml b/pyproject.toml index 3cbd301..341a6c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ authors = [ { name = "Sebastiaan la Fleur", email = "sebastiaan.lafleur@tno.nl" }, { name = "Mark Vrijlandt", email = "mark.vrijlandt@tno.nl" }, ] -description = "Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or cancel submitted jobs." +description = "Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or delete submitted jobs." readme = "README.md" license = {file = "LICENSE"} classifiers = [ diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 6741107..be072ff 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -16,8 +16,7 @@ JobProgressUpdate, JobStatusUpdate, JobSubmission, - JobCancel, - TimeSeriesDelete, + JobDelete, ) from omotes_sdk_protocol.workflow_pb2 import AvailableWorkflows, RequestAvailableWorkflows @@ -373,37 +372,22 @@ def submit_job( return job - def cancel_job(self, job: Job) -> None: - """Cancel a job. + def delete_job(self, job: Job) -> None: + """Delete a job. - If this succeeds or not will be sent as a job status update through the + If the jobs is cancelled or not will be sent as a job status update through the `callback_on_status_update` handler. This method will not disconnect from the submitted job events. This will need to be done separately using `disconnect_from_submitted_job`. + Time series data, if present, will be deleted (after a waiting period). - :param job: The job to cancel. + :param job: The job to delete. """ - logger.info("Cancelling job %s", job.id) - cancel_msg = JobCancel(uuid=str(job.id)) + logger.info("Deleting job %s", job.id) + delete_msg = JobDelete(uuid=str(job.id)) self.broker_if.send_message_to( exchange_name=OmotesQueueNames.omotes_exchange_name(), - routing_key=OmotesQueueNames.job_cancel_queue_name(), - message=cancel_msg.SerializeToString(), - ) - - def delete_time_series_data(self, ouput_esdl_id: str) -> None: - """Delete time series data. - - The id of the omotes output ESDL is used as parameters since the omotes job id is not known - anymore after completion of a job. - - :param esdl_id: The id of the output ESDL for which to delete the time series data. - """ - logger.info("Deleting time series data for output ESDL id %s", ouput_esdl_id) - delete_time_series_msg = TimeSeriesDelete(ouput_esdl_id=str(ouput_esdl_id)) - self.broker_if.send_message_to( - exchange_name=OmotesQueueNames.omotes_exchange_name(), - routing_key=OmotesQueueNames.time_series_delete_queue_name(), - message=delete_time_series_msg.SerializeToString(), + routing_key=OmotesQueueNames.job_delete_queue_name(), + message=delete_msg.SerializeToString(), ) def connect_to_available_workflows_updates(self) -> None: diff --git a/src/omotes_sdk/queue_names.py b/src/omotes_sdk/queue_names.py index c81e7d8..19b2222 100644 --- a/src/omotes_sdk/queue_names.py +++ b/src/omotes_sdk/queue_names.py @@ -48,21 +48,12 @@ def job_status_queue_name(job_uuid: uuid.UUID) -> str: return f"jobs.{job_uuid}.status" @staticmethod - def job_cancel_queue_name() -> str: - """Generate the job cancellation queue name. + def job_delete_queue_name() -> str: + """Generate the job deletion queue name. :return: The queue name. """ - return "job_cancellations" - - - @staticmethod - def time_series_delete_queue_name() -> str: - """Generate the time series deletion queue name. - - :return: The queue name. - """ - return "time_series_deletions" + return "job_deletions" @staticmethod From 5ca676e539ac50c65eceeba0b5b3a74027628c6c Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Thu, 13 Feb 2025 12:46:28 +0100 Subject: [PATCH 3/6] 94: Update omotes-sdk-protocol package to include rename from JobCancel to JobDeletion. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 341a6c8..2e1ffbf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4", - "omotes-sdk-protocol ~= 0.1", + "omotes-sdk-protocol ~= 0.1,>=0.1.15", "pyesdl ~= 24.2", "pamqp ~= 3.3", "celery ~= 5.3", From b681815f61055a242c73a590d96ad78475ccd717 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Thu, 13 Feb 2025 13:12:58 +0100 Subject: [PATCH 4/6] 94: Fix documentation for delete_job. --- src/omotes_sdk/omotes_interface.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index be072ff..dd12428 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -373,12 +373,17 @@ def submit_job( return job def delete_job(self, job: Job) -> None: - """Delete a job. + """Delete a job and all of its resources. - If the jobs is cancelled or not will be sent as a job status update through the - `callback_on_status_update` handler. This method will not disconnect from the submitted job - events. This will need to be done separately using `disconnect_from_submitted_job`. - Time series data, if present, will be deleted (after a waiting period). + This will delete the job regardless of its current state. If it is running, it will be + cancelled. If the job produced any timeseries data, it will be deleted (after a waiting + period). + + Developers note: + If the jobs is successfully cancelled or not will be sent as a job status update through + the `callback_on_status_update` handler. This method will not disconnect from the submitted + job events. This will need to be done separately using `disconnect_from_submitted_job` + after receiving the job status update. :param job: The job to delete. """ From 5e95a7afc49d60bb2fb4852896621e2317dd0b22 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Thu, 13 Feb 2025 13:16:21 +0100 Subject: [PATCH 5/6] 94: Fix doc of delete_job some more. --- src/omotes_sdk/omotes_interface.py | 5 +++-- src/omotes_sdk/queue_names.py | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index dd12428..c2552ac 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -376,14 +376,15 @@ def delete_job(self, job: Job) -> None: """Delete a job and all of its resources. This will delete the job regardless of its current state. If it is running, it will be - cancelled. If the job produced any timeseries data, it will be deleted (after a waiting - period). + cancelled. If the job produced any timeseries data, it will be deleted eventually. Developers note: If the jobs is successfully cancelled or not will be sent as a job status update through the `callback_on_status_update` handler. This method will not disconnect from the submitted job events. This will need to be done separately using `disconnect_from_submitted_job` after receiving the job status update. + Deletion of the timeseries is done by the orchestrator. See: + https://github.com/Project-OMOTES/architecture-documentation/blob/main/Feature_Time_Series_DB_Cleanup/Feature_Time_Series_DB_Cleanup.md :param job: The job to delete. """ diff --git a/src/omotes_sdk/queue_names.py b/src/omotes_sdk/queue_names.py index 19b2222..4da620e 100644 --- a/src/omotes_sdk/queue_names.py +++ b/src/omotes_sdk/queue_names.py @@ -55,7 +55,6 @@ def job_delete_queue_name() -> str: """ return "job_deletions" - @staticmethod def available_workflows_routing_key() -> str: """Generate the available work flows routing key. From 9e2424b23ed90d2c8fff4facdfa3e1186c0e7171 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Thu, 13 Feb 2025 13:25:11 +0100 Subject: [PATCH 6/6] 94: Update setup-python action in ci. --- .github/workflows/ci_linux.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci_linux.yml b/.github/workflows/ci_linux.yml index 0384b69..e2ffa4f 100644 --- a/.github/workflows/ci_linux.yml +++ b/.github/workflows/ci_linux.yml @@ -13,7 +13,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -39,7 +39,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -64,7 +64,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -112,7 +112,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -140,7 +140,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip'