From 4f7c24cf1e7eb323db34c198b922ec975c890bcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Stuphorn?= Date: Fri, 27 Feb 2026 10:10:42 +0100 Subject: [PATCH 1/2] fix: stagger parallel agent spawns to prevent ~/.claude.json race condition When running with concurrency > 1, multiple agents were spawned in rapid succession within the same loop iteration. All agents started nearly simultaneously and concurrently read/wrote ~/.claude.json during Claude SDK initialization, causing intermittent "JSON Parse error: Unexpected EOF" errors. Fix: introduce AGENT_SPAWN_STAGGER_SECS (1.5s) delay between consecutive agent spawns in both the coding batch loop and _maintain_testing_agents. The first spawn in each burst has zero added latency; only subsequent spawns in the same burst are staggered. - Add AGENT_SPAWN_STAGGER_SECS = 1.5 constant - Make _maintain_testing_agents async; add stagger between testing agents - Add stagger between coding batch spawns in the main run_loop - Update call site to await _maintain_testing_agents Co-Authored-By: Claude Sonnet 4.6 --- parallel_orchestrator.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index b7f2bac5..35b46331 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -135,6 +135,7 @@ def _dump_database_state(feature_dicts: list[dict], label: str = ""): POLL_INTERVAL = 5 # seconds between checking for ready features MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer +AGENT_SPAWN_STAGGER_SECS = 1.5 # Delay between consecutive agent spawns to avoid ~/.claude.json race condition class ParallelOrchestrator: @@ -643,7 +644,7 @@ def get_passing_count(self, feature_dicts: list[dict] | None = None) -> int: session.close() return sum(1 for fd in feature_dicts if fd.get("passes")) - def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None: + async def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None: """Maintain the desired count of testing agents independently. This runs every loop iteration and spawns testing agents as needed to maintain @@ -698,6 +699,9 @@ def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> N # Spawn outside lock (I/O bound operation) logger.debug("Spawning testing agent (%d/%d)", spawn_index, desired) + # Stagger consecutive spawns to avoid ~/.claude.json race condition + if spawn_index > 1: + await asyncio.sleep(AGENT_SPAWN_STAGGER_SECS) success, msg = self._spawn_testing_agent() if not success: debug_log.log("TESTING", f"Spawn failed, stopping: {msg}") @@ -1549,7 +1553,7 @@ async def run_loop(self): continue # Maintain testing agents independently (runs every iteration) - self._maintain_testing_agents(feature_dicts) + await self._maintain_testing_agents(feature_dicts) # Check capacity with self._lock: @@ -1620,10 +1624,13 @@ async def run_loop(self): batch_count=len(batches), batches=[[f['id'] for f in b] for b in batches[:slots]]) - for batch in batches[:slots]: + for spawn_index, batch in enumerate(batches[:slots]): batch_ids = [f["id"] for f in batch] batch_names = [f"{f['id']}:{f['name']}" for f in batch] logger.debug("Starting batch: %s", batch_ids) + # Stagger consecutive agent spawns to avoid ~/.claude.json race condition + if spawn_index > 0: + await asyncio.sleep(AGENT_SPAWN_STAGGER_SECS) success, msg = self.start_feature_batch(batch_ids) if not success: logger.debug("Failed to start batch %s: %s", batch_ids, msg) From fc0ef204db6ba8e7482d1daf45adb60fc38d86dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Stuphorn?= Date: Fri, 27 Feb 2026 10:14:58 +0100 Subject: [PATCH 2/2] fix: enforce spawn stagger across coding AND testing agents The previous fix only staggered spawns within the same agent type. The race condition also occurred when a testing agent and a coding agent were spawned in the same loop iteration with no delay between them. Replace the per-type index guards with a single _last_spawn_time float tracked on the orchestrator instance. A new _stagger_if_needed() async helper sleeps for the remaining time before each spawn, regardless of agent type. _last_spawn_time is updated immediately after every subprocess.Popen() call in all three spawn methods (_spawn_coding_agent, _spawn_coding_agent_batch, _spawn_testing_agent). This ensures at least AGENT_SPAWN_STAGGER_SECS (1.5s) between any two consecutive agent starts, closing the cross-type race window. Co-Authored-By: Claude Sonnet 4.6 --- parallel_orchestrator.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 35b46331..31d9348e 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -27,6 +27,7 @@ import subprocess import sys import threading +import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Literal @@ -226,6 +227,9 @@ def __init__( self._agent_completed_event: asyncio.Event | None = None # Created in run_loop self._event_loop: asyncio.AbstractEventLoop | None = None # Stored for thread-safe signaling + # Track time of last agent spawn to enforce stagger between all spawns (coding + testing) + self._last_spawn_time: float = 0.0 + # Database session for this orchestrator self._engine, self._session_maker = create_database(project_dir) @@ -644,6 +648,18 @@ def get_passing_count(self, feature_dicts: list[dict] | None = None) -> int: session.close() return sum(1 for fd in feature_dicts if fd.get("passes")) + async def _stagger_if_needed(self) -> None: + """Sleep if needed to enforce minimum gap between any two consecutive agent spawns. + + Prevents ~/.claude.json race conditions when coding and testing agents + start in rapid succession and concurrently read/write the config file. + Applies across all spawn types (coding and testing). + """ + elapsed = time.monotonic() - self._last_spawn_time + remaining = AGENT_SPAWN_STAGGER_SECS - elapsed + if remaining > 0: + await asyncio.sleep(remaining) + async def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None) -> None: """Maintain the desired count of testing agents independently. @@ -699,9 +715,8 @@ async def _maintain_testing_agents(self, feature_dicts: list[dict] | None = None # Spawn outside lock (I/O bound operation) logger.debug("Spawning testing agent (%d/%d)", spawn_index, desired) - # Stagger consecutive spawns to avoid ~/.claude.json race condition - if spawn_index > 1: - await asyncio.sleep(AGENT_SPAWN_STAGGER_SECS) + # Enforce minimum gap from last spawn of any type (coding or testing) + await self._stagger_if_needed() success, msg = self._spawn_testing_agent() if not success: debug_log.log("TESTING", f"Spawn failed, stopping: {msg}") @@ -866,6 +881,7 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]: popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW proc = subprocess.Popen(cmd, **popen_kwargs) + self._last_spawn_time = time.monotonic() except Exception as e: # Reset in_progress on failure session = self.get_session() @@ -929,6 +945,7 @@ def _spawn_coding_agent_batch(self, feature_ids: list[int]) -> tuple[bool, str]: popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW proc = subprocess.Popen(cmd, **popen_kwargs) + self._last_spawn_time = time.monotonic() except Exception as e: # Reset in_progress on failure session = self.get_session() @@ -1034,6 +1051,7 @@ def _spawn_testing_agent(self) -> tuple[bool, str]: popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW proc = subprocess.Popen(cmd, **popen_kwargs) + self._last_spawn_time = time.monotonic() except Exception as e: debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}") return False, f"Failed to start testing agent: {e}" @@ -1628,9 +1646,8 @@ async def run_loop(self): batch_ids = [f["id"] for f in batch] batch_names = [f"{f['id']}:{f['name']}" for f in batch] logger.debug("Starting batch: %s", batch_ids) - # Stagger consecutive agent spawns to avoid ~/.claude.json race condition - if spawn_index > 0: - await asyncio.sleep(AGENT_SPAWN_STAGGER_SECS) + # Enforce minimum gap from last spawn of any type (coding or testing) + await self._stagger_if_needed() success, msg = self.start_feature_batch(batch_ids) if not success: logger.debug("Failed to start batch %s: %s", batch_ids, msg)