diff --git a/dev-requirements.txt b/dev-requirements.txt index f80e705..73829b9 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -35,19 +35,19 @@ casadi-gil-comp==3.6.7 # -c requirements.txt # mesido # rtc-tools-gil-comp -celery==5.5.1 +celery==5.5.2 # via # -c requirements.txt # omotes-sdk-python -certifi==2025.1.31 +certifi==2025.4.26 # via # -c requirements.txt # requests -charset-normalizer==3.4.1 +charset-normalizer==3.4.2 # via # -c requirements.txt # requests -click==8.1.8 +click==8.2.1 # via # -c requirements.txt # black @@ -77,7 +77,7 @@ defusedxml==0.7.1 # via # -c requirements.txt # rtc-tools-gil-comp -exceptiongroup==1.2.2 +exceptiongroup==1.3.0 # via pytest flake8==7.1.1 # via @@ -116,17 +116,17 @@ iniconfig==2.1.0 # via pytest isort==5.13.2 # via omotes-grow-worker (pyproject.toml) -kombu==5.5.2 +kombu==5.5.3 # via # -c requirements.txt # celery -lxml==5.3.2 +lxml==5.4.0 # via # -c requirements.txt # pyecore mccabe==0.7.0 # via flake8 -mesido==0.1.8.5 +mesido==0.1.11 # via # -c requirements.txt # omotes-grow-worker (pyproject.toml) @@ -134,13 +134,13 @@ msgpack==1.1.0 # via # -c requirements.txt # influxdb -multidict==6.4.3 +multidict==6.4.4 # via # -c requirements.txt # yarl mypy==1.13.0 # via omotes-grow-worker (pyproject.toml) -mypy-extensions==1.0.0 +mypy-extensions==1.1.0 # via # black # mypy @@ -152,11 +152,11 @@ numpy==1.25.2 # pymoca # rtc-tools-gil-comp # scipy -omotes-sdk-protocol==1.1.0 +omotes-sdk-protocol==1.2.0 # via # -c requirements.txt # omotes-sdk-python -omotes-sdk-python==4.1.0 +omotes-sdk-python==4.2.1 # via # -c requirements.txt # omotes-grow-worker (pyproject.toml) @@ -164,7 +164,7 @@ ordered-set==4.1.0 # via # -c requirements.txt # pyecore -packaging==24.2 +packaging==25.0 # via # black # build @@ -181,11 +181,11 @@ pandas==1.5.3 # mesido pathspec==0.12.1 # via black -platformdirs==4.3.7 +platformdirs==4.3.8 # via black -pluggy==1.5.0 +pluggy==1.6.0 # via pytest -prompt-toolkit==3.0.50 +prompt-toolkit==3.0.51 # via # -c requirements.txt # click-repl @@ -201,12 +201,12 @@ pycodestyle==2.12.1 # via flake8 pydocstyle==6.3.0 # via flake8-docstrings -pyecore==0.12.1 +pyecore==0.13.2 # via # -c requirements.txt # mesido # pyesdl -pyesdl==24.2 +pyesdl==25.5.1 # via # -c requirements.txt # mesido @@ -246,7 +246,7 @@ requests==2.32.3 # via # -c requirements.txt # influxdb -restrictedpython==8.1a1.dev0 +restrictedpython==8.0 # via # -c requirements.txt # pyecore @@ -270,7 +270,7 @@ six==1.17.0 # flake8-tuple # influxdb # python-dateutil -snowballstemmer==2.2.0 +snowballstemmer==3.0.1 # via pydocstyle streamcapture==1.2.5 # via @@ -293,6 +293,7 @@ typing-extensions==4.13.2 # via # -c requirements.txt # black + # exceptiongroup # multidict # mypy # omotes-sdk-python @@ -316,7 +317,7 @@ wcwidth==0.2.13 # prompt-toolkit wheel==0.45.1 # via omotes-grow-worker (pyproject.toml) -yarl==1.19.0 +yarl==1.20.0 # via # -c requirements.txt # aio-pika diff --git a/pyproject.toml b/pyproject.toml index eadd7e4..14683f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,8 +25,8 @@ requires-python = ">=3.10" dependencies = [ "python-dotenv ~= 1.0.0", - "mesido ~= 0.1.8.5", - "omotes-sdk-python ~= 4.1.0" + "mesido ~= 0.1.11", + "omotes-sdk-python ~= 4.2.1" ] [project.optional-dependencies] diff --git a/requirements.txt b/requirements.txt index 86d509f..8033f51 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,13 +18,13 @@ casadi-gil-comp==3.6.7 # via # mesido # rtc-tools-gil-comp -celery==5.5.1 +celery==5.5.2 # via omotes-sdk-python -certifi==2025.1.31 +certifi==2025.4.26 # via requests -charset-normalizer==3.4.1 +charset-normalizer==3.4.2 # via requests -click==8.1.8 +click==8.2.1 # via # celery # click-didyoumean @@ -48,15 +48,15 @@ idna==3.10 # yarl influxdb==5.3.2 # via mesido -kombu==5.5.2 +kombu==5.5.3 # via celery -lxml==5.3.2 +lxml==5.4.0 # via pyecore -mesido==0.1.8.5 +mesido==0.1.11 # via omotes-grow-worker (pyproject.toml) msgpack==1.1.0 # via influxdb -multidict==6.4.3 +multidict==6.4.4 # via yarl numpy==1.25.2 # via @@ -65,9 +65,9 @@ numpy==1.25.2 # pymoca # rtc-tools-gil-comp # scipy -omotes-sdk-protocol==1.1.0 +omotes-sdk-protocol==1.2.0 # via omotes-sdk-python -omotes-sdk-python==4.1.0 +omotes-sdk-python==4.2.1 # via omotes-grow-worker (pyproject.toml) ordered-set==4.1.0 # via pyecore @@ -77,17 +77,17 @@ pamqp==3.3.0 # omotes-sdk-python pandas==1.5.3 # via mesido -prompt-toolkit==3.0.50 +prompt-toolkit==3.0.51 # via click-repl propcache==0.3.1 # via yarl protobuf==5.29.4 # via omotes-sdk-protocol -pyecore==0.12.1 +pyecore==0.13.2 # via # mesido # pyesdl -pyesdl==24.2 +pyesdl==25.5.1 # via # mesido # omotes-sdk-python @@ -109,7 +109,7 @@ pytz==2025.2 # pandas requests==2.32.3 # via influxdb -restrictedpython==8.1a1.dev0 +restrictedpython==8.0 # via pyecore rtc-tools-channel-flow==1.2.0 # via rtc-tools-gil-comp @@ -140,7 +140,7 @@ vine==5.1.0 # kombu wcwidth==0.2.13 # via prompt-toolkit -yarl==1.19.0 +yarl==1.20.0 # via # aio-pika # aiormq diff --git a/run.sh b/run.sh index e754c64..29b426f 100755 --- a/run.sh +++ b/run.sh @@ -1,6 +1,6 @@ #!/bin/bash . .venv/bin/activate -. ci/linux/_load_dot_env.sh .env.local +. ci/linux/_load_dot_env.sh .env PYTHONPATH="src/" python3 -m grow_worker.worker diff --git a/src/grow_worker/worker.py b/src/grow_worker/worker.py index b9662c4..7cc3cb1 100644 --- a/src/grow_worker/worker.py +++ b/src/grow_worker/worker.py @@ -1,10 +1,5 @@ import base64 import logging -import multiprocessing -import multiprocessing.pool -import signal -import time -from multiprocessing.process import current_process import os from pathlib import Path from typing import cast, Dict, List, Tuple, Optional @@ -34,20 +29,26 @@ class EarlySystemExit(Exception): """Wrapper for `SystemExit` exception. - To ensure that the worker process does not shutdown but rather handles the `SystemExit` as an + To ensure that the worker process does not shut down but rather handles the `SystemExit` as an error """ ... -def run_mesido(input_esdl: str) -> Tuple[Optional[str], List[EsdlMessage]]: - """Run mesido using the specific workflow. +def grow_worker_task( + input_esdl: str, workflow_config: ProtobufDict, update_progress_handler: UpdateProgressHandler +) -> Tuple[Optional[str], List[EsdlMessage]]: + """Run the grow worker task and run configured specific problem type for this worker instance. - Note: This is run without a subprocess! Casadi does not yield the GIL and therefore - causes starved thread issues. + Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent + process. You cannot open sockets and other resources in the main process and expect + it to be copied to subprocess. Any resources e.g. connections/sockets need to be opened + in this task by the subprocess. :param input_esdl: The input ESDL XML string. + :param workflow_config: Extra parameters to configure this run. + :param update_progress_handler: Handler to notify of any progress changes. :return: GROW optimized or simulated ESDL and a list of ESDL feedback messages. """ mesido_func = get_problem_function(GROW_TASK_TYPE) @@ -77,7 +78,7 @@ def run_mesido(input_esdl: str) -> Tuple[Optional[str], List[EsdlMessage]]: influxdb_password=os.environ.get("INFLUXDB_PASSWORD"), influxdb_ssl=False, influxdb_verify_ssl=False, - update_progress_function=None, + update_progress_function=update_progress_handler, profile_reader=InfluxDBProfileReader, ) esdl_str = cast(str, solution.optimized_esdl_string) @@ -121,70 +122,5 @@ def parse_mesido_esdl_messages( return esdl_messages -def kill_pool(pool: multiprocessing.pool.Pool) -> None: - """Terminate all the process of a multiprocessing.Pool with SIGKILL. - - Found here: https://stackoverflow.com/a/47580796 - - multiprocessing.Pool.terminate does not provide a way to give a different signal than SIGTERM - so this function hooks into the internals to properly handle sending SIGKILL to all processes in - the pool. - - :param pool: The multiprocessing to kill all processes in. - """ - # - # stop repopulating new child - pool._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined] - pool._worker_handler._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined] - for p in pool._pool: # type: ignore[attr-defined] - if p.is_alive(): - logger.warning("Sending SIGKILL to pool process with pid %s", p.pid) - os.kill(p.pid, signal.SIGKILL) - # .is_alive() will reap dead process - wait_till = time.time() + 5.0 - while ( - any(p.is_alive() for p in pool._pool) # type: ignore[attr-defined] - and time.time() < wait_till - ): - pass - logger.warning("All processes in pool have been terminated.") - pool.terminate() - logger.warning("Forceful pool termination completed.") - - -def grow_worker_task( - input_esdl: str, workflow_config: ProtobufDict, update_progress_handler: UpdateProgressHandler -) -> Tuple[Optional[str], List[EsdlMessage]]: - """Run the grow worker task and run configured specific problem type for this worker instance. - - Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent - process. You cannot open sockets and other resources in the main process and expect - it to be copied to subprocess. Any resources e.g. connections/sockets need to be opened - in this task by the subprocess. - - :param input_esdl: The input ESDL XML string. - :param workflow_config: Extra parameters to configure this run. - :param update_progress_handler: Handler to notify of any progress changes. - :return: GROW optimized or simulated ESDL and a list of ESDL feedback messages. - """ - # TODO Very nasty hack. Celery unfortunately starts the worker subprocesses as 'daemons' - # which prevents this process from creating any other subprocesses. Therefore, we - # acknowledge this process is a daemon and turn of the protectioon that prevents new - # subprocesses from being created. This does introduce the issue that if this - # process is killed/cancelled/revoked, the subprocess will continue as a zombie process. - # See https://github.com/Project-OMOTES/optimizer-worker/issues/54 - current_process()._config["daemon"] = False # type: ignore[attr-defined] - - with multiprocessing.Pool(1) as pool: - try: - output = pool.map(run_mesido, [input_esdl])[0] - except SystemExit as e: - logger.warning("During pool the worker was requested to quit: %s %s", type(e), e) - kill_pool(pool) - raise - - return output - - if __name__ == "__main__": initialize_worker(GROW_TASK_TYPE.value, grow_worker_task)