Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 58 additions & 39 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,46 +74,65 @@ async def run_agent_session(
await client.query(message)

# Collect response text and show tool use
# Retry receive_response() on MessageParseError — the SDK raises this for
# unknown CLI message types (e.g. "rate_limit_event") which kills the async
# generator. The subprocess is still alive so we restart to read remaining
# messages from the buffered channel.
response_text = ""
async for msg in client.receive_response():
msg_type = type(msg).__name__

# Handle AssistantMessage (text and tool use)
if msg_type == "AssistantMessage" and hasattr(msg, "content"):
for block in msg.content:
block_type = type(block).__name__

if block_type == "TextBlock" and hasattr(block, "text"):
response_text += block.text
print(block.text, end="", flush=True)
elif block_type == "ToolUseBlock" and hasattr(block, "name"):
print(f"\n[Tool: {block.name}]", flush=True)
if hasattr(block, "input"):
input_str = str(block.input)
if len(input_str) > 200:
print(f" Input: {input_str[:200]}...", flush=True)
else:
print(f" Input: {input_str}", flush=True)

# Handle UserMessage (tool results)
elif msg_type == "UserMessage" and hasattr(msg, "content"):
for block in msg.content:
block_type = type(block).__name__

if block_type == "ToolResultBlock":
result_content = getattr(block, "content", "")
is_error = getattr(block, "is_error", False)

# Check if command was blocked by security hook
if "blocked" in str(result_content).lower():
print(f" [BLOCKED] {result_content}", flush=True)
elif is_error:
# Show errors (truncated)
error_str = str(result_content)[:500]
print(f" [Error] {error_str}", flush=True)
else:
# Tool succeeded - just show brief confirmation
print(" [Done]", flush=True)
max_parse_retries = 50
parse_retries = 0
while True:
try:
async for msg in client.receive_response():
msg_type = type(msg).__name__

# Handle AssistantMessage (text and tool use)
if msg_type == "AssistantMessage" and hasattr(msg, "content"):
for block in msg.content:
block_type = type(block).__name__

if block_type == "TextBlock" and hasattr(block, "text"):
response_text += block.text
print(block.text, end="", flush=True)
elif block_type == "ToolUseBlock" and hasattr(block, "name"):
print(f"\n[Tool: {block.name}]", flush=True)
if hasattr(block, "input"):
input_str = str(block.input)
if len(input_str) > 200:
print(f" Input: {input_str[:200]}...", flush=True)
else:
print(f" Input: {input_str}", flush=True)

# Handle UserMessage (tool results)
elif msg_type == "UserMessage" and hasattr(msg, "content"):
for block in msg.content:
block_type = type(block).__name__

if block_type == "ToolResultBlock":
result_content = getattr(block, "content", "")
is_error = getattr(block, "is_error", False)

# Check if command was blocked by security hook
if "blocked" in str(result_content).lower():
print(f" [BLOCKED] {result_content}", flush=True)
elif is_error:
# Show errors (truncated)
error_str = str(result_content)[:500]
print(f" [Error] {error_str}", flush=True)
else:
# Tool succeeded - just show brief confirmation
print(" [Done]", flush=True)

break # Normal completion
except Exception as inner_exc:
if type(inner_exc).__name__ == "MessageParseError":
parse_retries += 1
if parse_retries > max_parse_retries:
print(f"Too many unrecognized CLI messages ({parse_retries}), stopping")
break
print(f"Ignoring unrecognized message from Claude CLI: {inner_exc}")
continue
raise # Re-raise to outer except

print("\n" + "-" * 70 + "\n")
return "continue", response_text
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "autoforge-ai",
"version": "0.1.14",
"version": "0.1.15",
"description": "Autonomous coding agent with web UI - build complete apps with AI",
"license": "AGPL-3.0",
"bin": {
Expand Down
105 changes: 42 additions & 63 deletions server/services/assistant_chat_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
but cannot modify any files.
"""

import asyncio
import json
import logging
import os
Expand All @@ -27,10 +26,9 @@
get_messages,
)
from .chat_constants import (
MAX_CHAT_RATE_LIMIT_RETRIES,
ROOT_DIR,
calculate_rate_limit_backoff,
check_rate_limit_error,
safe_receive_response,
)

# Load environment variables from .env file if present
Expand Down Expand Up @@ -399,66 +397,47 @@ async def _query_claude(self, message: str) -> AsyncGenerator[dict, None]:

full_response = ""

# Stream the response (with rate-limit retry)
for _attempt in range(MAX_CHAT_RATE_LIMIT_RETRIES + 1):
try:
async for msg in self.client.receive_response():
msg_type = type(msg).__name__

if msg_type == "AssistantMessage" and hasattr(msg, "content"):
for block in msg.content:
block_type = type(block).__name__

if block_type == "TextBlock" and hasattr(block, "text"):
text = block.text
if text:
full_response += text
yield {"type": "text", "content": text}

elif block_type == "ToolUseBlock" and hasattr(block, "name"):
tool_name = block.name
tool_input = getattr(block, "input", {})

# Intercept ask_user tool calls -> yield as question message
if tool_name == "mcp__features__ask_user":
questions = tool_input.get("questions", [])
if questions:
yield {
"type": "question",
"questions": questions,
}
continue

yield {
"type": "tool_call",
"tool": tool_name,
"input": tool_input,
}
# Completed successfully — break out of retry loop
break
except Exception as exc:
is_rate_limit, retry_secs = check_rate_limit_error(exc)
if is_rate_limit and _attempt < MAX_CHAT_RATE_LIMIT_RETRIES:
delay = retry_secs if retry_secs else calculate_rate_limit_backoff(_attempt)
logger.warning(f"Rate limited (attempt {_attempt + 1}/{MAX_CHAT_RATE_LIMIT_RETRIES}), retrying in {delay}s")
yield {
"type": "rate_limited",
"retry_in": delay,
"attempt": _attempt + 1,
"max_attempts": MAX_CHAT_RATE_LIMIT_RETRIES,
}
await asyncio.sleep(delay)
await self.client.query(message)
continue
if is_rate_limit:
logger.error("Rate limit retries exhausted for assistant chat")
yield {"type": "error", "content": "Rate limited. Please try again later."}
return
# Non-rate-limit MessageParseError: log and break (don't crash)
if type(exc).__name__ == "MessageParseError":
logger.warning(f"Ignoring unrecognized message from Claude CLI: {exc}")
break
raise
# Stream the response
try:
async for msg in safe_receive_response(self.client, logger):
msg_type = type(msg).__name__

if msg_type == "AssistantMessage" and hasattr(msg, "content"):
for block in msg.content:
block_type = type(block).__name__

if block_type == "TextBlock" and hasattr(block, "text"):
text = block.text
if text:
full_response += text
yield {"type": "text", "content": text}

elif block_type == "ToolUseBlock" and hasattr(block, "name"):
tool_name = block.name
tool_input = getattr(block, "input", {})

# Intercept ask_user tool calls -> yield as question message
if tool_name == "mcp__features__ask_user":
questions = tool_input.get("questions", [])
if questions:
yield {
"type": "question",
"questions": questions,
}
continue

yield {
"type": "tool_call",
"tool": tool_name,
"input": tool_input,
}
except Exception as exc:
is_rate_limit, _ = check_rate_limit_error(exc)
if is_rate_limit:
logger.warning(f"Rate limited: {exc}")
yield {"type": "error", "content": "Rate limited. Please try again later."}
return
raise

# Store the complete response in the database
if full_response and self.conversation_id:
Expand Down
60 changes: 37 additions & 23 deletions server/services/chat_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import logging
import sys
from pathlib import Path
from typing import AsyncGenerator
from typing import Any, AsyncGenerator

# -------------------------------------------------------------------
# Root directory of the autoforge project (repository root).
Expand All @@ -33,47 +33,61 @@
# imports continue to work unchanged.
# -------------------------------------------------------------------
from env_constants import API_ENV_VARS # noqa: E402, F401
from rate_limit_utils import calculate_rate_limit_backoff, is_rate_limit_error, parse_retry_after # noqa: E402, F401
from rate_limit_utils import is_rate_limit_error, parse_retry_after # noqa: E402, F401

logger = logging.getLogger(__name__)

# -------------------------------------------------------------------
# Rate-limit handling for chat sessions
# -------------------------------------------------------------------
MAX_CHAT_RATE_LIMIT_RETRIES = 3


def check_rate_limit_error(exc: Exception) -> tuple[bool, int | None]:
"""Inspect an exception and determine if it represents a rate-limit.

Returns ``(is_rate_limit, retry_seconds)``. ``retry_seconds`` is the
parsed Retry-After value when available, otherwise ``None`` (caller
should use exponential backoff).

Handles:
- ``MessageParseError`` whose raw *data* dict has
``type == "rate_limit_event"`` (Claude CLI sends this).
- Any exception whose string representation matches known rate-limit
patterns (via ``rate_limit_utils.is_rate_limit_error``).
"""
exc_str = str(exc)

# Check for MessageParseError with a rate_limit_event payload
cls_name = type(exc).__name__
if cls_name == "MessageParseError":
raw_data = getattr(exc, "data", None)
if isinstance(raw_data, dict) and raw_data.get("type") == "rate_limit_event":
retry = parse_retry_after(str(raw_data)) if raw_data else None
return True, retry
# MessageParseError = unknown CLI message type (e.g. "rate_limit_event").
# These are informational events, NOT actual rate limit errors.
# The word "rate_limit" in the type name would false-positive the regex.
if type(exc).__name__ == "MessageParseError":
return False, None

# Fallback: match error text against known rate-limit patterns
# For all other exceptions: match error text against known rate-limit patterns
exc_str = str(exc)
if is_rate_limit_error(exc_str):
retry = parse_retry_after(exc_str)
return True, retry

return False, None


async def safe_receive_response(client: Any, log: logging.Logger) -> AsyncGenerator:
"""Wrap ``client.receive_response()`` to skip ``MessageParseError``.

The Claude Code CLI may emit message types (e.g. ``rate_limit_event``)
that the installed Python SDK does not recognise, causing
``MessageParseError`` which kills the async generator. The CLI
subprocess is still alive and the SDK uses a buffered memory channel,
so we restart ``receive_response()`` to continue reading remaining
messages without losing data.
"""
max_retries = 50
retries = 0
while True:
try:
async for msg in client.receive_response():
yield msg
return # Normal completion
except Exception as exc:
if type(exc).__name__ == "MessageParseError":
retries += 1
if retries > max_retries:
log.error(f"Too many unrecognized CLI messages ({retries}), stopping")
return
log.warning(f"Ignoring unrecognized message from Claude CLI: {exc}")
continue
raise


async def make_multimodal_message(content_blocks: list[dict]) -> AsyncGenerator[dict, None]:
"""Yield a single multimodal user message in Claude Agent SDK format.

Expand Down
Loading
Loading