-
Notifications
You must be signed in to change notification settings - Fork 42
Remove asyncio usage in jobs monitoring #796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES | ||
| # Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
|
|
@@ -14,7 +14,6 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| import asyncio | ||
| import datetime | ||
| import logging | ||
| from types import FrameType | ||
|
|
@@ -80,39 +79,18 @@ def create_runner(self, mode: str, system: System, test_scenario: TestScenario) | |
|
|
||
| return runner_class(mode, system, test_scenario, results_root) | ||
|
|
||
| async def run(self): | ||
| def run(self): | ||
| """Run the test scenario using the instantiated runner.""" | ||
| try: | ||
| await self.runner.run() | ||
| self.runner.run() | ||
| logging.debug("All jobs finished successfully.") | ||
| except asyncio.CancelledError: | ||
| logging.info("Runner cancelled, performing cleanup...") | ||
| await self.runner.shutdown() | ||
| return | ||
| except JobFailureError as exc: | ||
| logging.debug(f"Runner failed JobFailure exception: {exc}", exc_info=True) | ||
|
|
||
| def _cancel_all(self): | ||
| # the below code might look excessive, this is to address https://docs.astral.sh/ruff/rules/asyncio-dangling-task/ | ||
| shutdown_task = asyncio.create_task(self.runner.shutdown()) | ||
| tasks = {shutdown_task} | ||
| shutdown_task.add_done_callback(tasks.discard) | ||
|
|
||
| for task in asyncio.all_tasks(): | ||
| if task == shutdown_task: | ||
| continue | ||
|
|
||
| logging.debug(f"Cancelling task: {task}") | ||
| try: | ||
| task.cancel() | ||
| except asyncio.CancelledError as exc: | ||
| logging.debug(f"Error cancelling task: {task}, {exc}", exc_info=True) | ||
| pass | ||
|
|
||
| def cancel_on_signal( | ||
| self, | ||
| signum: int, | ||
| frame: Optional[FrameType], # noqa: Vulture | ||
| ): | ||
| logging.info(f"Signal {signum} received, shutting down...") | ||
| asyncio.get_running_loop().call_soon_threadsafe(self._cancel_all) | ||
| self.runner.shutdown() | ||
|
Comment on lines
90
to
+96
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Blocking work in signal handler
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES | ||
| # Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
|
|
@@ -20,7 +20,7 @@ | |
| from typing import Any, Dict, Optional | ||
|
|
||
| import requests | ||
| import websockets | ||
| from websockets.sync.client import connect as ws_connect | ||
|
|
||
|
|
||
| class RunAIRestClient: | ||
|
|
@@ -496,7 +496,7 @@ def is_cluster_api_available(self, cluster_domain: str) -> bool: | |
| response = requests.get(url, headers=headers) | ||
| return "OK" in response.text | ||
|
|
||
| async def fetch_training_logs( | ||
| def fetch_training_logs( | ||
| self, cluster_domain: str, project_name: str, training_task_name: str, output_file_path: Path | ||
| ): | ||
| if not self.is_cluster_api_available(cluster_domain): | ||
|
|
@@ -512,9 +512,11 @@ async def fetch_training_logs( | |
| } | ||
|
|
||
| ssl_context = ssl._create_unverified_context() | ||
| async with websockets.connect(url, extra_headers=headers, ssl=ssl_context) as websocket: | ||
| with output_file_path.open("w") as log_file: | ||
| async for message in websocket: | ||
| if isinstance(message, bytes): | ||
| message = message.decode("utf-8") | ||
| log_file.write(str(message)) | ||
| with ( | ||
| ws_connect(url, additional_headers=headers, ssl=ssl_context) as websocket, | ||
| output_file_path.open("w") as log_file, | ||
| ): | ||
| for message in websocket: | ||
| if isinstance(message, bytes): | ||
| message = message.decode("utf-8") | ||
| log_file.write(str(message)) | ||
|
Comment on lines
498
to
522
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. WebSocket can hang indefinitely
This is new behavior due to the async→sync refactor and can deadlock the overall runner completion path. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Misleading shutdown completion log
`shutdown()` logs "Waiting for all jobs to be killed." but the implementation does not wait for anything (it only loops over `self.jobs` and calls `self.system.kill(job)`). This is misleading during failures/cancel, and can also mask cases where `kill()` is async/eventually-consistent and jobs remain running. Either restore the previous “All jobs have been killed.” wording, or implement an actual wait/poll loop until `is_job_completed`/`is_job_running` indicates termination.