diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py
index b7f2bac5..31d9348e 100644
--- a/parallel_orchestrator.py
+++ b/parallel_orchestrator.py
@@ -27,6 +27,7 @@
 import subprocess
 import sys
 import threading
+import time
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import Any, Callable, Literal
@@ -135,6 +136,7 @@ def _dump_database_state(feature_dicts: list[dict], label: str = ""):
 POLL_INTERVAL = 5  # seconds between checking for ready features
 MAX_FEATURE_RETRIES = 3  # Maximum times to retry a failed feature
 INITIALIZER_TIMEOUT = 1800  # 30 minutes timeout for initializer
+AGENT_SPAWN_STAGGER_SECS = 1.5  # Delay between consecutive agent spawns to avoid ~/.claude.json race condition
 
 
 class ParallelOrchestrator:
@@ -225,6 +227,9 @@ def __init__(
         self._agent_completed_event: asyncio.Event | None = None  # Created in run_loop
         self._event_loop: asyncio.AbstractEventLoop | None = None  # Stored for thread-safe signaling
 
+        # Track time of last agent spawn to enforce stagger between all spawns (coding + testing)
+        self._last_spawn_time: float = 0.0
+
         # Database session for this orchestrator
         self._engine, self._session_maker = create_database(project_dir)
 
@@ -643,7 +648,19 @@ def get_passing_count(self, feature_dicts: list[dict] | None = None) -> int:
                 session.close()
         return sum(1 for fd in feature_dicts if fd.get("passes"))
 
-    def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None:
+    async def _stagger_if_needed(self) -> None:
+        """Sleep if needed to enforce minimum gap between any two consecutive agent spawns.
+
+        Prevents ~/.claude.json race conditions when coding and testing agents
+        start in rapid succession and concurrently read/write the config file.
+        Applies across all spawn types (coding and testing).
+        """
+        elapsed = time.monotonic() - self._last_spawn_time
+        remaining = AGENT_SPAWN_STAGGER_SECS - elapsed
+        if remaining > 0:
+            await asyncio.sleep(remaining)
+
+    async def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None:
         """Maintain the desired count of testing agents independently.
 
         This runs every loop iteration and spawns testing agents as needed to maintain
@@ -698,6 +715,8 @@
 
             # Spawn outside lock (I/O bound operation)
             logger.debug("Spawning testing agent (%d/%d)", spawn_index, desired)
+            # Enforce minimum gap from last spawn of any type (coding or testing)
+            await self._stagger_if_needed()
             success, msg = self._spawn_testing_agent()
             if not success:
                 debug_log.log("TESTING", f"Spawn failed, stopping: {msg}")
@@ -862,6 +881,7 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]:
                 popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
 
             proc = subprocess.Popen(cmd, **popen_kwargs)
+            self._last_spawn_time = time.monotonic()
         except Exception as e:
             # Reset in_progress on failure
             session = self.get_session()
@@ -925,6 +945,7 @@ def _spawn_coding_agent_batch(self, feature_ids: list[int]) -> tuple[bool, str]:
                 popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
 
             proc = subprocess.Popen(cmd, **popen_kwargs)
+            self._last_spawn_time = time.monotonic()
         except Exception as e:
             # Reset in_progress on failure
             session = self.get_session()
@@ -1030,6 +1051,7 @@ def _spawn_testing_agent(self) -> tuple[bool, str]:
                 popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
 
             proc = subprocess.Popen(cmd, **popen_kwargs)
+            self._last_spawn_time = time.monotonic()
         except Exception as e:
             debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
             return False, f"Failed to start testing agent: {e}"
@@ -1549,7 +1571,7 @@ async def run_loop(self):
                 continue
 
             # Maintain testing agents independently (runs every iteration)
-            self._maintain_testing_agents(feature_dicts)
+            await self._maintain_testing_agents(feature_dicts)
 
             # Check capacity
             with self._lock:
@@ -1620,10 +1642,12 @@
                               batch_count=len(batches),
                               batches=[[f['id'] for f in b] for b in batches[:slots]])
 
-                for batch in batches[:slots]:
+                for spawn_index, batch in enumerate(batches[:slots]):
                     batch_ids = [f["id"] for f in batch]
                     batch_names = [f"{f['id']}:{f['name']}" for f in batch]
                     logger.debug("Starting batch: %s", batch_ids)
+                    # Enforce minimum gap from last spawn of any type (coding or testing)
+                    await self._stagger_if_needed()
                     success, msg = self.start_feature_batch(batch_ids)
                     if not success:
                         logger.debug("Failed to start batch %s: %s", batch_ids, msg)