Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions parallel_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Literal
Expand Down Expand Up @@ -135,6 +136,7 @@ def _dump_database_state(feature_dicts: list[dict], label: str = ""):
# Orchestrator tuning constants; all durations are expressed in seconds.
POLL_INTERVAL = 5  # Seconds between checks for ready features
MAX_FEATURE_RETRIES = 3  # Maximum number of times a failed feature is retried
INITIALIZER_TIMEOUT = 1800  # Initializer timeout in seconds (30 minutes)
# Minimum delay, in seconds, between consecutive agent spawns; spacing the
# spawns out avoids a race condition on ~/.claude.json.
AGENT_SPAWN_STAGGER_SECS = 1.5


class ParallelOrchestrator:
Expand Down Expand Up @@ -225,6 +227,9 @@ def __init__(
self._agent_completed_event: asyncio.Event | None = None # Created in run_loop
self._event_loop: asyncio.AbstractEventLoop | None = None # Stored for thread-safe signaling

# Track time of last agent spawn to enforce stagger between all spawns (coding + testing)
self._last_spawn_time: float = 0.0

# Database session for this orchestrator
self._engine, self._session_maker = create_database(project_dir)

Expand Down Expand Up @@ -643,7 +648,19 @@ def get_passing_count(self, feature_dicts: list[dict] | None = None) -> int:
session.close()
return sum(1 for fd in feature_dicts if fd.get("passes"))

def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None:
async def _stagger_if_needed(self) -> None:
    """Wait out whatever is left of the minimum gap since the last agent spawn.

    The gap is AGENT_SPAWN_STAGGER_SECS, measured on the monotonic clock
    from ``self._last_spawn_time``. It is shared by every spawn type
    (coding and testing), so agents launched in quick succession do not
    read and write ~/.claude.json at the same moment.

    Returns immediately when the gap has already elapsed.
    """
    # Time still owed before the next spawn is allowed; non-positive means
    # the previous spawn is already far enough in the past.
    wait_secs = AGENT_SPAWN_STAGGER_SECS - (time.monotonic() - self._last_spawn_time)
    if wait_secs <= 0:
        return
    await asyncio.sleep(wait_secs)

async def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None:
"""Maintain the desired count of testing agents independently.

This runs every loop iteration and spawns testing agents as needed to maintain
Expand Down Expand Up @@ -698,6 +715,8 @@ def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> N

# Spawn outside lock (I/O bound operation)
logger.debug("Spawning testing agent (%d/%d)", spawn_index, desired)
# Enforce minimum gap from last spawn of any type (coding or testing)
await self._stagger_if_needed()
success, msg = self._spawn_testing_agent()
if not success:
debug_log.log("TESTING", f"Spawn failed, stopping: {msg}")
Expand Down Expand Up @@ -862,6 +881,7 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]:
popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW

proc = subprocess.Popen(cmd, **popen_kwargs)
self._last_spawn_time = time.monotonic()
except Exception as e:
# Reset in_progress on failure
session = self.get_session()
Expand Down Expand Up @@ -925,6 +945,7 @@ def _spawn_coding_agent_batch(self, feature_ids: list[int]) -> tuple[bool, str]:
popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW

proc = subprocess.Popen(cmd, **popen_kwargs)
self._last_spawn_time = time.monotonic()
except Exception as e:
# Reset in_progress on failure
session = self.get_session()
Expand Down Expand Up @@ -1030,6 +1051,7 @@ def _spawn_testing_agent(self) -> tuple[bool, str]:
popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW

proc = subprocess.Popen(cmd, **popen_kwargs)
self._last_spawn_time = time.monotonic()
except Exception as e:
debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
return False, f"Failed to start testing agent: {e}"
Expand Down Expand Up @@ -1549,7 +1571,7 @@ async def run_loop(self):
continue

# Maintain testing agents independently (runs every iteration)
self._maintain_testing_agents(feature_dicts)
await self._maintain_testing_agents(feature_dicts)

# Check capacity
with self._lock:
Expand Down Expand Up @@ -1620,10 +1642,12 @@ async def run_loop(self):
batch_count=len(batches),
batches=[[f['id'] for f in b] for b in batches[:slots]])

for batch in batches[:slots]:
for spawn_index, batch in enumerate(batches[:slots]):
batch_ids = [f["id"] for f in batch]
batch_names = [f"{f['id']}:{f['name']}" for f in batch]
logger.debug("Starting batch: %s", batch_ids)
# Enforce minimum gap from last spawn of any type (coding or testing)
await self._stagger_if_needed()
success, msg = self.start_feature_batch(batch_ids)
if not success:
logger.debug("Failed to start batch %s: %s", batch_ids, msg)
Expand Down