Skip to content

Commit 7838211

Browse files
author
Chojan Shang
committed
refactor: spilt agent and client
Signed-off-by: Chojan Shang <chojan.shang@vesoft.com>
1 parent a6ab431 commit 7838211

File tree

2 files changed

+94
-74
lines changed

2 files changed

+94
-74
lines changed

examples/mini_swe_agent/client.py

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
RequestPermissionResponse,
3131
SessionNotification,
3232
SetSessionModeRequest,
33-
stdio_streams,
3433
)
3534
from acp.schema import (
3635
ContentBlock1,
@@ -45,6 +44,7 @@
4544
ToolCallContent1,
4645
ToolCallUpdate,
4746
)
47+
from acp.stdio import _WritePipeProtocol
4848

4949

5050
MODE = Literal["confirm", "yolo", "human"]
@@ -324,7 +324,7 @@ def _ask_initial_task(self) -> None:
324324
task = self.input_container.request_input("Enter your task for mini-swe-agent:")
325325
blocks = [ContentBlock1(type="text", text=task)]
326326
self._outbox.put(blocks)
327-
self._start_backend()
327+
self._start_connection_thread()
328328

329329
def on_unmount(self) -> None:
330330
if self._bg_loop:
@@ -335,57 +335,58 @@ def on_unmount(self) -> None:
335335

336336
# --- Backend comms ---
337337

338-
def _start_backend(self) -> None:
339-
def _runner():
338+
def _start_connection_thread(self) -> None:
339+
"""Start a background thread running the ACP connection event loop."""
340+
341+
def _runner() -> None:
340342
loop = asyncio.new_event_loop()
341343
self._bg_loop = loop
342344
asyncio.set_event_loop(loop)
343-
loop.run_until_complete(self._backend_main())
345+
loop.run_until_complete(self._run_connection())
344346

345347
t = threading.Thread(target=_runner, daemon=True)
346348
t.start()
347349
self._bg_thread = t
348350

349-
async def _backend_main(self) -> None:
350-
# Spawn the agent as a child process and talk over its stdio pipes
351-
import sys as _sys
352-
353-
agent_path = str(Path(__file__).parent / "agent.py")
354-
env = os.environ.copy()
355-
src_dir = str((Path(__file__).resolve().parents[2] / "src").resolve())
356-
env["PYTHONPATH"] = src_dir + os.pathsep + env.get("PYTHONPATH", "")
357-
# Load .env into environment for child agent
358-
try:
359-
from pathlib import Path as _P
360-
361-
dotenv_path = _P(__file__).resolve().parents[2] / ".env"
362-
if dotenv_path.is_file():
363-
for line in dotenv_path.read_text().splitlines():
364-
s = line.strip()
365-
if not s or s.startswith("#"):
366-
continue
367-
if s.startswith("export "):
368-
s = s[len("export ") :]
369-
if "=" not in s:
370-
continue
371-
k, v = s.split("=", 1)
372-
k = k.strip()
373-
v = v.strip().strip('"').strip("'")
374-
env.setdefault(k, v)
375-
except Exception:
376-
pass
377-
proc = await asyncio.create_subprocess_exec(
378-
_sys.executable,
379-
agent_path,
380-
stdin=asyncio.subprocess.PIPE,
381-
stdout=asyncio.subprocess.PIPE,
382-
stderr=_sys.stderr,
383-
env=env,
384-
)
385-
assert proc.stdout and proc.stdin
386-
# Use subprocess-provided streams directly
387-
reader = proc.stdout
388-
writer = proc.stdin
351+
async def _open_acp_streams_from_env(self) -> tuple[Optional[asyncio.StreamReader], Optional[asyncio.StreamWriter]]:
352+
"""If launched via duet, open ACP streams from inherited FDs; else return (None, None)."""
353+
read_fd_s = os.environ.get("MSWEA_READ_FD")
354+
write_fd_s = os.environ.get("MSWEA_WRITE_FD")
355+
if not read_fd_s or not write_fd_s:
356+
return None, None
357+
read_fd = int(read_fd_s)
358+
write_fd = int(write_fd_s)
359+
loop = asyncio.get_running_loop()
360+
# Reader
361+
reader = asyncio.StreamReader()
362+
reader_proto = asyncio.StreamReaderProtocol(reader)
363+
r_file = os.fdopen(read_fd, "rb", buffering=0)
364+
await loop.connect_read_pipe(lambda: reader_proto, r_file)
365+
# Writer
366+
write_proto = _WritePipeProtocol()
367+
w_file = os.fdopen(write_fd, "wb", buffering=0)
368+
transport, _ = await loop.connect_write_pipe(lambda: write_proto, w_file)
369+
writer = asyncio.StreamWriter(transport, write_proto, None, loop)
370+
return reader, writer
371+
372+
async def _run_connection(self) -> None:
373+
"""Run the ACP client connection using FDs provided by duet; do not fallback."""
374+
reader, writer = await self._open_acp_streams_from_env()
375+
if reader is None or writer is None: # type: ignore[truthy-bool]
376+
# Do not fallback; inform user and stop
377+
self.call_from_thread(
378+
lambda: (
379+
self.enqueue_message(
380+
UIMessage(
381+
"assistant",
382+
"Communication endpoints not provided. Please launch via examples/mini_swe_agent/duet.py",
383+
)
384+
),
385+
self.on_message_added(),
386+
)
387+
)
388+
self.agent_state = "STOPPED"
389+
return
389390

390391
self._conn = ClientSideConnection(lambda _agent: MiniSweClientImpl(self), writer, reader)
391392
try:
@@ -411,7 +412,9 @@ async def _backend_main(self) -> None:
411412
self.on_message_added(),
412413
)
413414
)
415+
self.agent_state = "STOPPED"
414416
return
417+
415418
# Autostep loop: take queued prompts and send; if none and mode != human, keep stepping
416419
while True:
417420
blocks: list[ContentBlock1]

examples/mini_swe_agent/duet.py

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,67 @@
11
import asyncio
22
import contextlib
33
import os
4-
import signal
54
import sys
65
from pathlib import Path
76

87

9-
async def _relay(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, tag: str):
10-
try:
11-
while True:
12-
line = await reader.readline()
13-
if not line:
14-
break
15-
writer.write(line)
16-
try:
17-
await writer.drain()
18-
except ConnectionError:
19-
break
20-
# Mirror minimal logs for visibility
21-
sys.stderr.write(f"[{tag}] {line.decode('utf-8', errors='replace')}")
22-
sys.stderr.flush()
23-
finally:
24-
try:
25-
writer.close()
26-
await writer.wait_closed()
27-
except Exception:
28-
pass
29-
30-
318
async def main() -> None:
32-
# Launch the Textual client only; it will spawn the agent as a child and connect to it via pipes.
9+
# Launch agent and client, wiring a dedicated pipe pair for ACP protocol.
10+
# Client keeps its own stdin/stdout for the Textual UI.
3311
root = Path(__file__).resolve().parent
12+
agent_path = str(root / "agent.py")
3413
client_path = str(root / "client.py")
3514

36-
env = os.environ.copy()
15+
# Load .env into process env so children inherit it (prefer python-dotenv if available)
16+
try:
17+
from dotenv import load_dotenv # type: ignore
18+
19+
load_dotenv(dotenv_path=str(root.parents[2] / ".env"), override=True)
20+
except Exception:
21+
pass
22+
23+
base_env = os.environ.copy()
3724
src_dir = str((root.parents[1] / "src").resolve())
38-
env["PYTHONPATH"] = src_dir + os.pathsep + env.get("PYTHONPATH", "")
25+
base_env["PYTHONPATH"] = src_dir + os.pathsep + base_env.get("PYTHONPATH", "")
26+
27+
# Create two pipes: agent->client and client->agent
28+
a2c_r, a2c_w = os.pipe()
29+
c2a_r, c2a_w = os.pipe()
30+
# Ensure the FDs we pass to children are inheritable
31+
for fd in (a2c_r, a2c_w, c2a_r, c2a_w):
32+
os.set_inheritable(fd, True)
33+
34+
# Start agent: stdin <- client (c2a_r), stdout -> client (a2c_w)
35+
agent = await asyncio.create_subprocess_exec(
36+
sys.executable,
37+
agent_path,
38+
stdin=c2a_r,
39+
stdout=a2c_w,
40+
stderr=sys.stderr,
41+
env=base_env,
42+
close_fds=True,
43+
)
44+
45+
# Start client with ACP FDs exported via environment; keep terminal IO for UI
46+
client_env = base_env.copy()
47+
client_env["MSWEA_READ_FD"] = str(a2c_r) # where client reads ACP messages
48+
client_env["MSWEA_WRITE_FD"] = str(c2a_w) # where client writes ACP messages
3949

4050
client = await asyncio.create_subprocess_exec(
4151
sys.executable,
4252
client_path,
43-
stderr=sys.stderr,
44-
env=env,
53+
env=client_env,
54+
pass_fds=(a2c_r, c2a_w), # ensure client inherits these FDs
55+
close_fds=True,
4556
)
4657

47-
await client.wait()
58+
# Close parent's copies of the pipe ends to avoid leaks
59+
for fd in (a2c_r, a2c_w, c2a_r, c2a_w):
60+
with contextlib.suppress(OSError):
61+
os.close(fd)
62+
63+
# Wait for processes to finish; no relay needed
64+
await asyncio.gather(agent.wait(), client.wait())
4865

4966

5067
if __name__ == "__main__":

0 commit comments

Comments
 (0)