Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ casadi-gil-comp==3.6.7
# -c requirements.txt
# mesido
# rtc-tools-gil-comp
celery==5.5.1
celery==5.5.2
# via
# -c requirements.txt
# omotes-sdk-python
certifi==2025.1.31
certifi==2025.4.26
# via
# -c requirements.txt
# requests
charset-normalizer==3.4.1
charset-normalizer==3.4.2
# via
# -c requirements.txt
# requests
click==8.1.8
click==8.2.1
# via
# -c requirements.txt
# black
Expand Down Expand Up @@ -77,7 +77,7 @@ defusedxml==0.7.1
# via
# -c requirements.txt
# rtc-tools-gil-comp
exceptiongroup==1.2.2
exceptiongroup==1.3.0
# via pytest
flake8==7.1.1
# via
Expand Down Expand Up @@ -116,31 +116,31 @@ iniconfig==2.1.0
# via pytest
isort==5.13.2
# via omotes-grow-worker (pyproject.toml)
kombu==5.5.2
kombu==5.5.3
# via
# -c requirements.txt
# celery
lxml==5.3.2
lxml==5.4.0
# via
# -c requirements.txt
# pyecore
mccabe==0.7.0
# via flake8
mesido==0.1.8.5
mesido==0.1.11
# via
# -c requirements.txt
# omotes-grow-worker (pyproject.toml)
msgpack==1.1.0
# via
# -c requirements.txt
# influxdb
multidict==6.4.3
multidict==6.4.4
# via
# -c requirements.txt
# yarl
mypy==1.13.0
# via omotes-grow-worker (pyproject.toml)
mypy-extensions==1.0.0
mypy-extensions==1.1.0
# via
# black
# mypy
Expand All @@ -152,19 +152,19 @@ numpy==1.25.2
# pymoca
# rtc-tools-gil-comp
# scipy
omotes-sdk-protocol==1.1.0
omotes-sdk-protocol==1.2.0
# via
# -c requirements.txt
# omotes-sdk-python
omotes-sdk-python==4.1.0
omotes-sdk-python==4.2.1
# via
# -c requirements.txt
# omotes-grow-worker (pyproject.toml)
ordered-set==4.1.0
# via
# -c requirements.txt
# pyecore
packaging==24.2
packaging==25.0
# via
# black
# build
Expand All @@ -181,11 +181,11 @@ pandas==1.5.3
# mesido
pathspec==0.12.1
# via black
platformdirs==4.3.7
platformdirs==4.3.8
# via black
pluggy==1.5.0
pluggy==1.6.0
# via pytest
prompt-toolkit==3.0.50
prompt-toolkit==3.0.51
# via
# -c requirements.txt
# click-repl
Expand All @@ -201,12 +201,12 @@ pycodestyle==2.12.1
# via flake8
pydocstyle==6.3.0
# via flake8-docstrings
pyecore==0.12.1
pyecore==0.13.2
# via
# -c requirements.txt
# mesido
# pyesdl
pyesdl==24.2
pyesdl==25.5.1
# via
# -c requirements.txt
# mesido
Expand Down Expand Up @@ -246,7 +246,7 @@ requests==2.32.3
# via
# -c requirements.txt
# influxdb
restrictedpython==8.1a1.dev0
restrictedpython==8.0
# via
# -c requirements.txt
# pyecore
Expand All @@ -270,7 +270,7 @@ six==1.17.0
# flake8-tuple
# influxdb
# python-dateutil
snowballstemmer==2.2.0
snowballstemmer==3.0.1
# via pydocstyle
streamcapture==1.2.5
# via
Expand All @@ -293,6 +293,7 @@ typing-extensions==4.13.2
# via
# -c requirements.txt
# black
# exceptiongroup
# multidict
# mypy
# omotes-sdk-python
Expand All @@ -316,7 +317,7 @@ wcwidth==0.2.13
# prompt-toolkit
wheel==0.45.1
# via omotes-grow-worker (pyproject.toml)
yarl==1.19.0
yarl==1.20.0
# via
# -c requirements.txt
# aio-pika
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ requires-python = ">=3.10"

dependencies = [
"python-dotenv ~= 1.0.0",
"mesido ~= 0.1.8.5",
"omotes-sdk-python ~= 4.1.0"
"mesido ~= 0.1.11",
"omotes-sdk-python ~= 4.2.1"
]

[project.optional-dependencies]
Expand Down
30 changes: 15 additions & 15 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ casadi-gil-comp==3.6.7
# via
# mesido
# rtc-tools-gil-comp
celery==5.5.1
celery==5.5.2
# via omotes-sdk-python
certifi==2025.1.31
certifi==2025.4.26
# via requests
charset-normalizer==3.4.1
charset-normalizer==3.4.2
# via requests
click==8.1.8
click==8.2.1
# via
# celery
# click-didyoumean
Expand All @@ -48,15 +48,15 @@ idna==3.10
# yarl
influxdb==5.3.2
# via mesido
kombu==5.5.2
kombu==5.5.3
# via celery
lxml==5.3.2
lxml==5.4.0
# via pyecore
mesido==0.1.8.5
mesido==0.1.11
# via omotes-grow-worker (pyproject.toml)
msgpack==1.1.0
# via influxdb
multidict==6.4.3
multidict==6.4.4
# via yarl
numpy==1.25.2
# via
Expand All @@ -65,9 +65,9 @@ numpy==1.25.2
# pymoca
# rtc-tools-gil-comp
# scipy
omotes-sdk-protocol==1.1.0
omotes-sdk-protocol==1.2.0
# via omotes-sdk-python
omotes-sdk-python==4.1.0
omotes-sdk-python==4.2.1
# via omotes-grow-worker (pyproject.toml)
ordered-set==4.1.0
# via pyecore
Expand All @@ -77,17 +77,17 @@ pamqp==3.3.0
# omotes-sdk-python
pandas==1.5.3
# via mesido
prompt-toolkit==3.0.50
prompt-toolkit==3.0.51
# via click-repl
propcache==0.3.1
# via yarl
protobuf==5.29.4
# via omotes-sdk-protocol
pyecore==0.12.1
pyecore==0.13.2
# via
# mesido
# pyesdl
pyesdl==24.2
pyesdl==25.5.1
# via
# mesido
# omotes-sdk-python
Expand All @@ -109,7 +109,7 @@ pytz==2025.2
# pandas
requests==2.32.3
# via influxdb
restrictedpython==8.1a1.dev0
restrictedpython==8.0
# via pyecore
rtc-tools-channel-flow==1.2.0
# via rtc-tools-gil-comp
Expand Down Expand Up @@ -140,7 +140,7 @@ vine==5.1.0
# kombu
wcwidth==0.2.13
# via prompt-toolkit
yarl==1.19.0
yarl==1.20.0
# via
# aio-pika
# aiormq
Expand Down
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

. .venv/bin/activate
. ci/linux/_load_dot_env.sh .env.local
. ci/linux/_load_dot_env.sh .env

PYTHONPATH="src/" python3 -m grow_worker.worker
88 changes: 12 additions & 76 deletions src/grow_worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import base64
import logging
import multiprocessing
import multiprocessing.pool
import signal
import time
from multiprocessing.process import current_process
import os
from pathlib import Path
from typing import cast, Dict, List, Tuple, Optional
Expand Down Expand Up @@ -34,20 +29,26 @@
class EarlySystemExit(Exception):
"""Wrapper for `SystemExit` exception.

To ensure that the worker process does not shutdown but rather handles the `SystemExit` as an
To ensure that the worker process does not shut down but rather handles the `SystemExit` as an
error
"""

...


def run_mesido(input_esdl: str) -> Tuple[Optional[str], List[EsdlMessage]]:
"""Run mesido using the specific workflow.
def grow_worker_task(
input_esdl: str, workflow_config: ProtobufDict, update_progress_handler: UpdateProgressHandler
) -> Tuple[Optional[str], List[EsdlMessage]]:
"""Run the grow worker task and run configured specific problem type for this worker instance.

Note: This is run without a subprocess! Casadi does not yield the GIL and therefore
causes starved thread issues.
Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent
process. You cannot open sockets and other resources in the main process and expect
it to be copied to subprocess. Any resources e.g. connections/sockets need to be opened
in this task by the subprocess.

:param input_esdl: The input ESDL XML string.
:param workflow_config: Extra parameters to configure this run.
:param update_progress_handler: Handler to notify of any progress changes.
:return: GROW optimized or simulated ESDL and a list of ESDL feedback messages.
"""
mesido_func = get_problem_function(GROW_TASK_TYPE)
Expand Down Expand Up @@ -77,7 +78,7 @@ def run_mesido(input_esdl: str) -> Tuple[Optional[str], List[EsdlMessage]]:
influxdb_password=os.environ.get("INFLUXDB_PASSWORD"),
influxdb_ssl=False,
influxdb_verify_ssl=False,
update_progress_function=None,
update_progress_function=update_progress_handler,
profile_reader=InfluxDBProfileReader,
)
esdl_str = cast(str, solution.optimized_esdl_string)
Expand Down Expand Up @@ -121,70 +122,5 @@ def parse_mesido_esdl_messages(
return esdl_messages


def kill_pool(pool: multiprocessing.pool.Pool) -> None:
"""Terminate all the process of a multiprocessing.Pool with SIGKILL.

Found here: https://stackoverflow.com/a/47580796

multiprocessing.Pool.terminate does not provide a way to give a different signal than SIGTERM
so this function hooks into the internals to properly handle sending SIGKILL to all processes in
the pool.

:param pool: The multiprocessing to kill all processes in.
"""
#
# stop repopulating new child
pool._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined]
pool._worker_handler._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined]
for p in pool._pool: # type: ignore[attr-defined]
if p.is_alive():
logger.warning("Sending SIGKILL to pool process with pid %s", p.pid)
os.kill(p.pid, signal.SIGKILL)
# .is_alive() will reap dead process
wait_till = time.time() + 5.0
while (
any(p.is_alive() for p in pool._pool) # type: ignore[attr-defined]
and time.time() < wait_till
):
pass
logger.warning("All processes in pool have been terminated.")
pool.terminate()
logger.warning("Forceful pool termination completed.")


def grow_worker_task(
input_esdl: str, workflow_config: ProtobufDict, update_progress_handler: UpdateProgressHandler
) -> Tuple[Optional[str], List[EsdlMessage]]:
"""Run the grow worker task and run configured specific problem type for this worker instance.

Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent
process. You cannot open sockets and other resources in the main process and expect
it to be copied to subprocess. Any resources e.g. connections/sockets need to be opened
in this task by the subprocess.

:param input_esdl: The input ESDL XML string.
:param workflow_config: Extra parameters to configure this run.
:param update_progress_handler: Handler to notify of any progress changes.
:return: GROW optimized or simulated ESDL and a list of ESDL feedback messages.
"""
# TODO Very nasty hack. Celery unfortunately starts the worker subprocesses as 'daemons'
# which prevents this process from creating any other subprocesses. Therefore, we
# acknowledge this process is a daemon and turn of the protectioon that prevents new
# subprocesses from being created. This does introduce the issue that if this
# process is killed/cancelled/revoked, the subprocess will continue as a zombie process.
# See https://github.com/Project-OMOTES/optimizer-worker/issues/54
current_process()._config["daemon"] = False # type: ignore[attr-defined]

with multiprocessing.Pool(1) as pool:
try:
output = pool.map(run_mesido, [input_esdl])[0]
except SystemExit as e:
logger.warning("During pool the worker was requested to quit: %s %s", type(e), e)
kill_pool(pool)
raise

return output


if __name__ == "__main__":
initialize_worker(GROW_TASK_TYPE.value, grow_worker_task)
Loading