
Conversation

amaslenn (Contributor) commented Feb 6, 2026

Summary

Most of the calls inside the async wrappers are blocking: synchronous REST calls and system calls. So asyncio only adds a layer of complexity without bringing any benefit.

Test Plan

  1. CI (updated)
  2. Manual runs of scenarios under conf/common + some private ones.

Additional Notes

Most of the calls inside the async wrappers are blocking: sync rest calls and system calls. So asyncio only adds a level of complexity without bringing any benefit.
coderabbitai bot (Contributor) commented Feb 6, 2026

📝 Walkthrough

The codebase transitions from asynchronous to synchronous execution patterns across multiple subsystems. Async methods are converted to sync equivalents using time.sleep instead of asyncio.sleep, WebSocket handling shifts to synchronous clients, and dev dependencies are updated to remove pytest-asyncio. Public method signatures throughout the codebase change from async to sync.
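
As a rough illustration of the pattern (a minimal, hypothetical sketch, not the actual CloudAI classes), the conversion looks roughly like this:

    import time


    class SketchRunner:
        """Hypothetical sketch of the async-to-sync conversion described above."""

        def __init__(self, jobs: list[str], monitor_interval: float = 1.0) -> None:
            self.jobs = jobs
            self.monitor_interval = monitor_interval

        def check_jobs(self) -> None:
            # Previously an awaited coroutine; the underlying REST/system calls were
            # blocking anyway, so it becomes a plain method call.
            self.jobs = [job for job in self.jobs if job != "completed"]

        def run(self) -> None:
            # Previously `async def run` with `await asyncio.sleep(...)`.
            while self.jobs:
                self.check_jobs()
                time.sleep(self.monitor_interval)  # replaces asyncio.sleep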

Changes

Cohort / File(s) | Summary
Dev Dependencies
pyproject.toml
Removed pytest-asyncio~=1.3; updated import-linter and pytest-deadfixtures to newer compatible versions (2.5 and 2.2 respectively).
Core Runner & Execution
src/cloudai/_core/base_runner.py, src/cloudai/_core/runner.py
Converted all async methods to synchronous equivalents. Replaced asyncio.sleep with time.sleep, removed await expressions, and updated method signatures (shutdown, run, submit_test, monitor_jobs, handle_job_completion, handle_dependencies, etc.). Signal handling now directly invokes shutdown instead of scheduling coroutines.
CLI & Configuration Handlers
src/cloudai/cli/handlers.py, src/cloudai/configurator/cloudai_gym.py
Replaced asyncio.run(runner.run()) with direct synchronous runner.run() calls. Removed asyncio imports. Updated copyright years.
RunAI System
src/cloudai/systems/runai/runai_rest_client.py, src/cloudai/systems/runai/runai_runner.py, src/cloudai/systems/runai/runai_system.py
Converted fetch_training_logs from async to sync using websockets.sync.client. Renamed job_completion_callback to on_job_completion as synchronous method. Converted store_logs from async to sync; replaced awaited API calls with direct invocations.
SLURM System
src/cloudai/systems/slurm/single_sbatch_runner.py
Converted run() from async to sync, replacing await asyncio.sleep() with time.sleep(). Removed asyncio import; added time import.
Tests
tests/test_acceptance.py, tests/test_base_runner.py
Updated mock target from asyncio.sleep to time.sleep in acceptance tests. Converted test methods and MyRunner overrides from async to sync; removed pytest.mark.asyncio decorators and await expressions.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 From async hops to steady stride,
The rabbit sleeps with time's own tide,
No more awaiting, just direct flow,
Through synchronous paths we now shall go!
One call, one step, the work is done—
No event loops left to run!

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The PR title 'Remove asyncio usage in jobs monitoring' accurately and specifically describes the main objective of the changeset: converting async code to synchronous implementations across multiple runner and system modules.
Description check | ✅ Passed | The PR description is relevant and clearly explains the rationale for the changes: blocking calls within async wrappers make asyncio unnecessary and add complexity without benefits.


@amaslenn amaslenn requested a review from podkidyshev February 6, 2026 10:52
podkidyshev previously approved these changes Feb 6, 2026
@amaslenn amaslenn marked this pull request as ready for review February 10, 2026 11:38
coderabbitai bot (Contributor) left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cloudai/_core/base_runner.py (1)

120-130: ⚠️ Potential issue | 🟠 Major

delayed_submit_test now blocks the monitor loop for its entire delay.

Previously with asyncio, delayed_submit_test was scheduled as a concurrent task — the monitor loop continued while the delay elapsed. Now time.sleep(delay) blocks the calling thread. This matters when multiple dependent tests are triggered (e.g., in handle_dependencies at Line 346 or check_and_schedule_start_post_init_dependent_tests at Line 179): each call blocks for delay seconds sequentially, stalling the entire monitoring loop for N × 5 seconds.

If this is acceptable given the workloads, no change needed — but it's a behavioral regression from the async version worth acknowledging.

Alternative: submit immediately with an initial grace period tracked per-job

One option is to remove the sleep entirely and track a not_before timestamp on the test run, deferring actual submission until the next monitor iteration after the delay has elapsed. This preserves the delay semantics without blocking the loop.
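
A minimal sketch of that option (hypothetical names; the real BaseRunner fields and signatures differ):

    import time
    from dataclasses import dataclass


    @dataclass
    class PendingSubmission:
        """Hypothetical record of a test run waiting out its submission delay."""

        test_run: object
        not_before: float  # monotonic timestamp before which submission is deferred


    def schedule_delayed_submit(pending: list[PendingSubmission], test_run, delay: float = 5.0) -> None:
        # Instead of time.sleep(delay), record when the submission becomes eligible.
        pending.append(PendingSubmission(test_run, time.monotonic() + delay))


    def submit_due(pending: list[PendingSubmission], submit) -> None:
        # Called once per monitor iteration: submit everything whose delay has
        # elapsed, without ever blocking the monitoring loop.
        now = time.monotonic()
        for item in [p for p in pending if p.not_before <= now]:
            pending.remove(item)
            submit(item.test_run)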

greptile-apps bot (Contributor) commented Feb 10, 2026

Greptile Overview

Greptile Summary

This PR removes asyncio from the jobs monitoring/runner path and converts the runner lifecycle (CLI entrypoints, Runner, BaseRunner, and RunAI log collection) to synchronous execution. asyncio.sleep() calls were replaced with time.sleep(), and RunAI training log streaming was migrated from async websockets.connect() to websockets.sync.client.connect().

Key integration points are cloudai.cli.handlers:handle_non_dse_job() calling Runner.run() synchronously, and BaseRunner.monitor_jobs() invoking on_job_completion() hooks which for RunAI now synchronously fetch events and stream logs into files.

Blocking issues to fix before merge:

  • RunAISystem.store_logs() appears to iterate the get_clusters() response incorrectly (for data in clusters_data instead of clusters_data.get("clusters", [])), which will likely fail at runtime when collecting logs on job completion.
  • The new synchronous RunAI WebSocket log streaming loop has no timeout/termination condition, so on_job_completion() can block indefinitely and stall overall scenario completion.
  • Runner.cancel_on_signal() now performs shutdown directly inside the signal handler, and shutdown can perform blocking operations (e.g., REST calls / subprocesses) which is unsafe and can make Ctrl-C/SIGTERM unresponsive.
  • BaseRunner.shutdown() logs "Waiting for all jobs to be killed" without actually waiting, which is misleading for cancellation/failure flows.

Confidence Score: 2/5

  • Not safe to merge until RunAI log collection and signal/shutdown behavior are corrected.
  • The sync refactor introduces at least one clear runtime error in RunAI cluster parsing during store_logs(), and the new sync websocket streaming loop can block indefinitely. Additionally, doing blocking work directly in the signal handler can make termination unreliable.
  • src/cloudai/systems/runai/runai_system.py, src/cloudai/systems/runai/runai_rest_client.py, src/cloudai/_core/runner.py, src/cloudai/_core/base_runner.py

Important Files Changed

Filename | Overview
pyproject.toml Removed pytest-asyncio from dev deps and downgraded import-linter/pytest-deadfixtures to older ranges; otherwise no functional code changes.
src/cloudai/_core/base_runner.py Converted BaseRunner async workflow to synchronous with time.sleep; also changed shutdown log message (now misleading) and dependency handling now blocks during delays.
src/cloudai/_core/runner.py Converted Runner.run/cancel path to synchronous; signal handler now performs shutdown directly (can block inside signal context).
src/cloudai/cli/handlers.py Switched CLI execution from asyncio.run to direct synchronous Runner.run invocation.
src/cloudai/configurator/cloudai_gym.py Gym environment now calls runner.run synchronously instead of asyncio.run.
src/cloudai/systems/runai/runai_rest_client.py Converted RunAI training logs retrieval from async websockets to sync websockets client; current implementation can block indefinitely while streaming logs.
src/cloudai/systems/runai/runai_runner.py Renamed completion callback override to BaseRunner.on_job_completion and made RunAI log/event collection synchronous on completion.
src/cloudai/systems/runai/runai_system.py Converted store_logs to synchronous and now calls sync websocket log fetch; also appears to iterate clusters_data incorrectly when building RunAICluster list (runtime error likely).
src/cloudai/systems/slurm/single_sbatch_runner.py Converted SingleSbatchRunner.run from async to sync and replaced asyncio.sleep with time.sleep in polling loop.
tests/test_acceptance.py Updated dry-run tests to patch time.sleep instead of asyncio.sleep after sync refactor.
tests/test_base_runner.py Converted BaseRunner dependency tests from async pytest-asyncio to synchronous tests.
uv.lock Removed pytest-asyncio and backports-asyncio-runner from lockfile; adjusted import-linter/pytest-deadfixtures versions accordingly.

Sequence Diagram

sequenceDiagram
    participant CLI as cloudai CLI
    participant Runner as _core/runner.py:Runner
    participant BaseRunner as _core/base_runner.py:BaseRunner
    participant System as systems/*/System
    participant RunAI as RunAI REST/WebSocket

    CLI->>Runner: run() (handlers.py:193-196)
    Runner->>BaseRunner: runner.run() (runner.py:82-86)

    loop while jobs exist
        BaseRunner->>BaseRunner: check_start_post_init_dependencies()
        BaseRunner->>System: is_job_running/is_job_completed()
        BaseRunner->>BaseRunner: monitor_jobs()
        BaseRunner->>System: is_job_completed(job)
        alt completed
            BaseRunner->>BaseRunner: on_job_completion(job)
            opt RunAI system
                BaseRunner->>System: store_logs()/get_workload_events()
                System->>RunAI: fetch_training_logs() (WebSocket stream)
            end
            BaseRunner->>BaseRunner: handle_job_completion(job)
            BaseRunner->>BaseRunner: handle_dependencies(job)
        end
        BaseRunner->>BaseRunner: time.sleep(monitor_interval)
    end

    Note over Runner,BaseRunner: SIGINT/SIGTERM triggers Runner.cancel_on_signal()
    Runner-->>BaseRunner: shutdown() (runner.py:90-96)
    BaseRunner->>System: kill(job) for each running job

greptile-apps bot (Contributor) left a comment


12 files reviewed, 5 comments


Comment on lines +73 to +80
     def shutdown(self):
         """Gracefully shut down the runner, terminating all outstanding jobs."""
         self.shutting_down = True
         logging.info("Terminating all jobs...")
         for job in self.jobs:
             logging.info(f"Terminating job {job.id} for test {job.test_run.name}")
             self.system.kill(job)
-        logging.info("All jobs have been killed.")
+        logging.info("Waiting for all jobs to be killed.")

Misleading shutdown completion log

shutdown() logs "Waiting for all jobs to be killed." but the implementation does not wait for anything (it only loops over self.jobs and calls self.system.kill(job)). This is misleading during failures/cancel, and can also mask cases where kill() is async/eventually-consistent and jobs remain running. Either restore the previous “All jobs have been killed.” wording, or implement an actual wait/poll loop until is_job_completed/is_job_running indicates termination.
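
One possible shape for the wait/poll variant (a sketch only; it assumes System.is_job_completed() from the surrounding API, and the poll_interval/timeout parameters are invented for illustration):

    import logging
    import time


    def shutdown(self, poll_interval: float = 5.0, timeout: float = 300.0) -> None:
        """Hypothetical sketch: kill all jobs, then actually wait for them to stop."""
        self.shutting_down = True
        logging.info("Terminating all jobs...")
        for job in self.jobs:
            logging.info(f"Terminating job {job.id} for test {job.test_run.name}")
            self.system.kill(job)

        logging.info("Waiting for all jobs to be killed.")
        deadline = time.monotonic() + timeout
        remaining = list(self.jobs)
        while remaining and time.monotonic() < deadline:
            remaining = [job for job in remaining if not self.system.is_job_completed(job)]
            if remaining:
                time.sleep(poll_interval)
        if remaining:
            logging.warning(f"{len(remaining)} job(s) still running after {timeout}s.")
        else:
            logging.info("All jobs have been killed.")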

Comment on lines 90 to +96
     def cancel_on_signal(
         self,
         signum: int,
         frame: Optional[FrameType],  # noqa: Vulture
     ):
         logging.info(f"Signal {signum} received, shutting down...")
-        asyncio.get_running_loop().call_soon_threadsafe(self._cancel_all)
+        self.runner.shutdown()

Blocking work in signal handler

cancel_on_signal() now calls self.runner.shutdown() directly from the Python signal handler. Since shutdown() loops over jobs and calls system.kill(job) (which can involve blocking subprocess/HTTP calls), this can run for a long time inside the signal handler and delay process responsiveness/termination. Previously this was queued onto the asyncio loop (call_soon_threadsafe). With the sync design, it’s safer to make the handler set a flag and let the main loop perform shutdown outside the signal handler context.
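
A sketch of that flag-based approach (hypothetical attribute and method names; the real Runner wiring differs):

    import signal
    import time
    from types import FrameType
    from typing import Optional


    class SketchRunner:
        """Hypothetical: the signal handler only sets a flag; shutdown runs in the main loop."""

        def __init__(self) -> None:
            self.cancel_requested = False
            signal.signal(signal.SIGINT, self.cancel_on_signal)
            signal.signal(signal.SIGTERM, self.cancel_on_signal)

        def cancel_on_signal(self, signum: int, frame: Optional[FrameType]) -> None:
            # No blocking work here; just record the request.
            self.cancel_requested = True

        def run(self) -> None:
            while self.has_jobs() and not self.cancel_requested:
                self.monitor_once()
                time.sleep(1)
            if self.cancel_requested:
                # shutdown() may issue blocking REST/subprocess calls; doing it here,
                # outside the signal handler, keeps Ctrl-C/SIGTERM responsive.
                self.shutdown()

        # Placeholders standing in for the real runner methods.
        def has_jobs(self) -> bool:
            return False

        def monitor_once(self) -> None:
            pass

        def shutdown(self) -> None:
            pass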

Comment on lines 498 to 522
@@ -512,9 +512,11 @@ async def fetch_training_logs(
         }

         ssl_context = ssl._create_unverified_context()
-        async with websockets.connect(url, extra_headers=headers, ssl=ssl_context) as websocket:
-            with output_file_path.open("w") as log_file:
-                async for message in websocket:
-                    if isinstance(message, bytes):
-                        message = message.decode("utf-8")
-                    log_file.write(str(message))
+        with (
+            ws_connect(url, additional_headers=headers, ssl=ssl_context) as websocket,
+            output_file_path.open("w") as log_file,
+        ):
+            for message in websocket:
+                if isinstance(message, bytes):
+                    message = message.decode("utf-8")
+                log_file.write(str(message))

WebSocket can hang indefinitely

fetch_training_logs() now uses websockets.sync.client.connect and iterates for message in websocket: with no timeout/termination condition. If the server keeps the socket open (or stalls without closing), this can block forever and prevent on_job_completion() from returning (RunAI runner calls store_logs() synchronously). Consider adding a receive timeout / max duration, or otherwise ensuring the loop terminates.

This is new behavior due to the async→sync refactor and can deadlock the overall runner completion path.
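
A sketch of one way to bound the loop (it assumes a websockets release whose sync Connection.recv() accepts a timeout argument; the function name, parameters, and limits here are illustrative, not the project's actual choices):

    import time

    from websockets.exceptions import ConnectionClosed
    from websockets.sync.client import connect as ws_connect


    def stream_logs_with_deadline(url, headers, ssl_context, output_file_path,
                                  recv_timeout: float = 30.0, max_duration: float = 600.0) -> None:
        # Bound each receive and the overall streaming duration so on_job_completion()
        # cannot hang forever if the server stalls or never closes the socket.
        deadline = time.monotonic() + max_duration
        with (
            ws_connect(url, additional_headers=headers, ssl=ssl_context) as websocket,
            output_file_path.open("w") as log_file,
        ):
            while time.monotonic() < deadline:
                try:
                    message = websocket.recv(timeout=recv_timeout)
                except TimeoutError:
                    break  # server stalled; stop rather than block the runner
                except ConnectionClosed:
                    break  # stream ended, normally or abnormally
                if isinstance(message, bytes):
                    message = message.decode("utf-8")
                log_file.write(str(message))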

greptile-apps bot (Contributor) commented Feb 10, 2026

Additional Comments (2)

src/cloudai/_core/base_runner.py
Potentially skipping job kills

handle_job_completion() removes completed_job from self.jobs and testrun_to_job_map before calling handle_dependencies(). For end_post_comp dependencies, handle_dependencies() iterates self.testrun_to_job_map.items() and schedules delayed_kill_job(dependent_job) for running dependents. If the dependent is the completed_job itself or relies on the just-removed map entry, this can silently miss dependency actions. Consider computing/scheduling dependency actions before mutating self.jobs/testrun_to_job_map, or iterating over a snapshot that includes the completed job.

Also appears in the same flow when monitor_jobs() calls on_job_completion(job) before handle_job_completion(job) (src/cloudai/_core/base_runner.py:246-274).
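
A rough sketch of that ordering (hypothetical extra argument for handle_dependencies; the real method signatures differ):

    def handle_job_completion(self, completed_job) -> None:
        # Snapshot the map while the completed job's entry is still present, so
        # end_post_comp dependents that reference it remain visible.
        dependents_snapshot = list(self.testrun_to_job_map.items())

        self.jobs.remove(completed_job)
        self.testrun_to_job_map.pop(completed_job.test_run, None)

        # Dependency handling then iterates the snapshot, not the mutated map.
        self.handle_dependencies(completed_job, dependents_snapshot)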


src/cloudai/systems/runai/runai_system.py
RunAI clusters iteration bug

clusters = [RunAICluster(**data) for data in clusters_data] iterates the top-level dict returned by get_clusters() (likely its keys), not the cluster entries themselves. In get_clusters() above you correctly use clusters_data.get("clusters", []). This mismatch will raise at runtime or produce invalid RunAICluster objects when store_logs() runs. Use the same .get("clusters", []) shape here as in get_clusters().

This affects log collection on job completion (src/cloudai/systems/runai/runai_runner.py:44-50).
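
A minimal sketch of the suggested fix (assuming clusters_data is the raw response dict described above):

    # Iterate the "clusters" list inside the response, matching get_clusters(),
    # rather than iterating the top-level dict.
    clusters = [RunAICluster(**data) for data in clusters_data.get("clusters", [])]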

