Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions astrbot/core/platform/sources/aiocqhttp/aiocqhttp_platform_adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import itertools
import json
import logging
import time
import uuid
Expand Down Expand Up @@ -139,6 +140,118 @@ async def convert_message(self, event: Event) -> AstrBotMessage | None:

return abm

def _extract_forward_text_from_nodes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议将归一化、分段渲染、转发内容拉取以及 ID/占位符/截断等逻辑拆分成若干小的辅助方法,让新的转发消息处理逻辑更加声明式、易读。

你可以保留现在的功能,但通过把几个职责拆分到 helper 中来降低整体的理解成本。

1. 拆分 _extract_forward_text_from_nodes

当前该方法同时处理:

  • 发送者名称解析
  • 原始内容归一化(message / content / JSON / list / str)
  • 按段渲染
  • 对嵌套转发的递归处理

你可以把归一化 + 单段渲染逻辑抽成几个小 helper:

def _normalize_forward_content(self, node: dict[str, Any]) -> list[Any]:
    raw_content = node.get("message") or node.get("content", [])
    if isinstance(raw_content, list):
        return raw_content
    if isinstance(raw_content, str) and raw_content.strip():
        try:
            parsed = json.loads(raw_content)
            if isinstance(parsed, list):
                return parsed
        except Exception:
            pass
        return [{"type": "text", "data": {"text": raw_content}}]
    return []
def _render_forward_segment(
    self,
    seg: dict[str, Any],
    depth: int,
    max_depth: int,
) -> str:
    seg_type = seg.get("type")
    seg_data = seg.get("data", {}) if isinstance(seg.get("data"), dict) else {}

    if seg_type in ("text", "plain"):
        text = seg_data.get("text", "")
        return text if isinstance(text, str) else ""
    if seg_type == "at":
        qq = seg_data.get("qq")
        return f"@{qq}" if qq else ""
    if seg_type == "image":
        return "[图片]"
    if seg_type == "face":
        face_id = seg_data.get("id")
        return f"[表情:{face_id}]" if face_id is not None else "[表情]"
    if seg_type in ("forward", "forward_msg", "nodes"):
        nested = seg_data.get("content")
        if isinstance(nested, list):
            nested_text = self._extract_forward_text_from_nodes(
                nested,
                depth=depth + 1,
                max_depth=max_depth,
            )
            return nested_text or ""
        return "[转发消息]"
    return ""

这样 _extract_forward_text_from_nodes 就主要负责“编排”:

def _extract_forward_text_from_nodes(
    self,
    nodes: list[Any],
    depth: int = 0,
    max_depth: int = 5,
) -> str:
    if depth > max_depth or not isinstance(nodes, list):
        return ""

    lines: list[str] = []
    for node in nodes:
        if not isinstance(node, dict):
            continue

        sender = node.get("sender", {}) if isinstance(node.get("sender"), dict) else {}
        sender_name = (
            sender.get("nickname")
            or sender.get("card")
            or sender.get("user_id")
            or "未知用户"
        )

        content_chain = self._normalize_forward_content(node)
        text_parts = [
            self._render_forward_segment(seg, depth, max_depth)
            for seg in content_chain
            if isinstance(seg, dict)
        ]
        node_text = "".join(p for p in text_parts if p).strip()
        if node_text:
            lines.append(f"{sender_name}: {node_text}")

    return "\n".join(lines).strip()

在保持原有行为的前提下,让每一块逻辑更容易理解和测试。

2. 在 _fetch_forward_text 中抽取 helper

可以再加两个小 helper,让重试 + 归一化策略更清晰:

def _forward_param_candidates(self, forward_id: str) -> list[dict[str, Any]]:
    candidates: list[dict[str, Any]] = [{"id": forward_id}]
    if forward_id.isdigit():
        candidates.insert(0, {"id": int(forward_id)})
    candidates.extend([{"message_id": forward_id}, {"forward_id": forward_id}])
    return candidates
def _extract_forward_nodes(self, payload: dict[str, Any]) -> list[Any]:
    data = payload.get("data", payload)
    if not isinstance(data, dict):
        return []
    nodes = (
        data.get("messages")
        or data.get("message")
        or data.get("nodes")
        or data.get("nodeList")
    )
    return nodes if isinstance(nodes, list) else []

使用方式:

async def _fetch_forward_text(self, forward_id: str) -> str:
    if not forward_id:
        return ""

    payload: dict[str, Any] | None = None
    for params in self._forward_param_candidates(forward_id):
        try:
            payload = await self.bot.call_action("get_forward_msg", **params)
            if isinstance(payload, dict):
                break
        except Exception:
            continue

    if not isinstance(payload, dict):
        return ""

    nodes = self._extract_forward_nodes(payload)
    text = self._extract_forward_text_from_nodes(nodes)
    return text.strip()

3. 简化 "forward"/"forward_msg" 分支

可以把 ID 解析、占位符拼接和截断逻辑再拆成 helper,这样这个分支的意图就更清晰了:

def _resolve_forward_id(self, data: dict[str, Any]) -> str | None:
    fid = data.get("id") or data.get("message_id") or data.get("forward_id")
    return str(fid) if fid else None
def _append_forward_placeholder(self, message_str: str) -> str:
    if not message_str.strip():
        return "[转发消息]"
    return f"{message_str}\n[转发消息]"
def _clip_forward_text(self, text: str, limit: int = 4000) -> str:
    return text[:limit]

然后该分支可以写成:

elif t in ("forward", "forward_msg"):
    for m in m_group:
        data = m.get("data", {}) if isinstance(m.get("data"), dict) else {}
        if t in ComponentTypes:
            try:
                abm.message.append(ComponentTypes[t](**data))
            except Exception:
                pass

        fid = self._resolve_forward_id(data)
        if not fid:
            continue

        forward_text = await self._fetch_forward_text(fid)
        if not forward_text:
            message_str = self._append_forward_placeholder(message_str)
            continue

        if message_str.strip():
            message_str += "\n"
        clipped = self._clip_forward_text(forward_text)
        message_str += f"[转发消息]\n{clipped}"

整体功能保持不变,但核心分支大致就读作“解析 ID → 拉取 → 占位符/追加内容”,具体细节隐藏在几个聚焦度更高的 helper 中。

Original comment in English

issue (complexity): Consider extracting the normalization, segment rendering, forward fetching, and ID/placeholder/clipping logic into small helper methods so the new forward-message handling reads more declaratively.

You can keep the new functionality but reduce the cognitive load by splitting a few responsibilities into helpers.

1. Split _extract_forward_text_from_nodes

Right now it handles:

  • sender name resolution
  • raw content normalization (message / content / JSON / list / str)
  • per-segment rendering
  • recursion into nested forwards

You can peel off normalization + per-segment logic into small helpers:

def _normalize_forward_content(self, node: dict[str, Any]) -> list[Any]:
    raw_content = node.get("message") or node.get("content", [])
    if isinstance(raw_content, list):
        return raw_content
    if isinstance(raw_content, str) and raw_content.strip():
        try:
            parsed = json.loads(raw_content)
            if isinstance(parsed, list):
                return parsed
        except Exception:
            pass
        return [{"type": "text", "data": {"text": raw_content}}]
    return []
def _render_forward_segment(
    self,
    seg: dict[str, Any],
    depth: int,
    max_depth: int,
) -> str:
    seg_type = seg.get("type")
    seg_data = seg.get("data", {}) if isinstance(seg.get("data"), dict) else {}

    if seg_type in ("text", "plain"):
        text = seg_data.get("text", "")
        return text if isinstance(text, str) else ""
    if seg_type == "at":
        qq = seg_data.get("qq")
        return f"@{qq}" if qq else ""
    if seg_type == "image":
        return "[图片]"
    if seg_type == "face":
        face_id = seg_data.get("id")
        return f"[表情:{face_id}]" if face_id is not None else "[表情]"
    if seg_type in ("forward", "forward_msg", "nodes"):
        nested = seg_data.get("content")
        if isinstance(nested, list):
            nested_text = self._extract_forward_text_from_nodes(
                nested,
                depth=depth + 1,
                max_depth=max_depth,
            )
            return nested_text or ""
        return "[转发消息]"
    return ""

Then _extract_forward_text_from_nodes becomes mostly orchestration:

def _extract_forward_text_from_nodes(
    self,
    nodes: list[Any],
    depth: int = 0,
    max_depth: int = 5,
) -> str:
    if depth > max_depth or not isinstance(nodes, list):
        return ""

    lines: list[str] = []
    for node in nodes:
        if not isinstance(node, dict):
            continue

        sender = node.get("sender", {}) if isinstance(node.get("sender"), dict) else {}
        sender_name = (
            sender.get("nickname")
            or sender.get("card")
            or sender.get("user_id")
            or "未知用户"
        )

        content_chain = self._normalize_forward_content(node)
        text_parts = [
            self._render_forward_segment(seg, depth, max_depth)
            for seg in content_chain
            if isinstance(seg, dict)
        ]
        node_text = "".join(p for p in text_parts if p).strip()
        if node_text:
            lines.append(f"{sender_name}: {node_text}")

    return "\n".join(lines).strip()

This keeps all behavior but makes each piece easier to understand and test.

2. Extract helpers in _fetch_forward_text

Two small helpers make the retry + normalization strategy clearer:

def _forward_param_candidates(self, forward_id: str) -> list[dict[str, Any]]:
    candidates: list[dict[str, Any]] = [{"id": forward_id}]
    if forward_id.isdigit():
        candidates.insert(0, {"id": int(forward_id)})
    candidates.extend([{"message_id": forward_id}, {"forward_id": forward_id}])
    return candidates
def _extract_forward_nodes(self, payload: dict[str, Any]) -> list[Any]:
    data = payload.get("data", payload)
    if not isinstance(data, dict):
        return []
    nodes = (
        data.get("messages")
        or data.get("message")
        or data.get("nodes")
        or data.get("nodeList")
    )
    return nodes if isinstance(nodes, list) else []

Usage:

async def _fetch_forward_text(self, forward_id: str) -> str:
    if not forward_id:
        return ""

    payload: dict[str, Any] | None = None
    for params in self._forward_param_candidates(forward_id):
        try:
            payload = await self.bot.call_action("get_forward_msg", **params)
            if isinstance(payload, dict):
                break
        except Exception:
            continue

    if not isinstance(payload, dict):
        return ""

    nodes = self._extract_forward_nodes(payload)
    text = self._extract_forward_text_from_nodes(nodes)
    return text.strip()

3. Simplify the "forward"/"forward_msg" branch

You can pull out ID resolution, placeholder formatting, and clipping to helpers so the branch reads more declaratively.

def _resolve_forward_id(self, data: dict[str, Any]) -> str | None:
    fid = data.get("id") or data.get("message_id") or data.get("forward_id")
    return str(fid) if fid else None
def _append_forward_placeholder(self, message_str: str) -> str:
    if not message_str.strip():
        return "[转发消息]"
    return f"{message_str}\n[转发消息]"
def _clip_forward_text(self, text: str, limit: int = 4000) -> str:
    return text[:limit]

Then the branch:

elif t in ("forward", "forward_msg"):
    for m in m_group:
        data = m.get("data", {}) if isinstance(m.get("data"), dict) else {}
        if t in ComponentTypes:
            try:
                abm.message.append(ComponentTypes[t](**data))
            except Exception:
                pass

        fid = self._resolve_forward_id(data)
        if not fid:
            continue

        forward_text = await self._fetch_forward_text(fid)
        if not forward_text:
            message_str = self._append_forward_placeholder(message_str)
            continue

        if message_str.strip():
            message_str += "\n"
        clipped = self._clip_forward_text(forward_text)
        message_str += f"[转发消息]\n{clipped}"

Functionality remains the same, but the core branch now mostly reads as “resolve id → fetch → placeholder/append”, with the detailed logic hidden in small, focused helpers.

self,
nodes: list[Any],
depth: int = 0,
max_depth: int = 5,
) -> str:
if depth > max_depth or not isinstance(nodes, list):
return ""

lines: list[str] = []
for node in nodes:
if not isinstance(node, dict):
continue

sender = node.get("sender", {}) if isinstance(node.get("sender"), dict) else {}
sender_name = (
sender.get("nickname")
or sender.get("card")
or sender.get("user_id")
or "未知用户"
)

raw_content = node.get("message")
if raw_content is None:
raw_content = node.get("content", [])

content_chain: list[Any] = []
if isinstance(raw_content, list):
content_chain = raw_content
elif isinstance(raw_content, str) and raw_content.strip():
try:
parsed = json.loads(raw_content)
if isinstance(parsed, list):
content_chain = parsed
else:
content_chain = [{"type": "text", "data": {"text": raw_content}}]
except Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 吞掉来自 bot.call_action 的所有异常可能会掩盖真实的集成问题。

直接捕获裸露的 Exception 然后继续循环,意味着严重问题(鉴权失败、限流、接口模式/数据结构变更等)都会被隐藏,调用方最终只会拿到一个空结果。请将捕获范围收窄到预期的、特定的错误类型(例如表示 ID 不存在的 API 异常);或者至少对非预期异常进行日志记录,以便能够诊断转发消息拉取失败的问题。

建议实现如下:

        candidates.extend([{"message_id": forward_id}, {"forward_id": forward_id}])

        payload: dict[str, Any] | None = None
        for params in candidates:
            try:
                result = await self.bot.call_action("get_forward_msg", **params)
            except Exception:
                logger.exception(
                    "Unexpected error while calling get_forward_msg with params=%s",
                    params,
                )
                continue

            if isinstance(result, dict):
                payload = result
                break

        if not payload:
            return ""

        nodes = payload.get("messages") or payload.get("nodes") or []
        return self._extract_forward_text_from_nodes(nodes)
  1. 确保在该文件中定义了模块级 logger,例如:
    • 在顶部导入:import logging
    • 在模块作用域添加:logger = logging.getLogger(__name__)
  2. 在你确认 self.bot.call_action("get_forward_msg", ...) 在预期的、非致命失败(例如“消息不存在”)情况下会抛出的具体异常类型后,将 except Exception 收窄为这些特定异常类;并且可以选择性地再添加一个单独的 except Exception 分支,用于记录日志并重新抛出真正非预期的错误。
Original comment in English

suggestion (bug_risk): Swallowing all exceptions from bot.call_action may hide real integration issues.

Catching bare Exception and continuing means serious issues (auth failures, rate limits, schema/schema changes) will be hidden and the caller just gets an empty result. Please restrict this to the specific, expected error types (e.g., the API error for a missing ID), or at least log unexpected exceptions so forward-message retrieval problems are diagnosable.

Suggested implementation:

        candidates.extend([{"message_id": forward_id}, {"forward_id": forward_id}])

        payload: dict[str, Any] | None = None
        for params in candidates:
            try:
                result = await self.bot.call_action("get_forward_msg", **params)
            except Exception:
                logger.exception(
                    "Unexpected error while calling get_forward_msg with params=%s",
                    params,
                )
                continue

            if isinstance(result, dict):
                payload = result
                break

        if not payload:
            return ""

        nodes = payload.get("messages") or payload.get("nodes") or []
        return self._extract_forward_text_from_nodes(nodes)
  1. Ensure there is a module-level logger defined in this file, e.g.:
    • import logging near the top-level imports.
    • logger = logging.getLogger(__name__) at module scope.
  2. Once you confirm the concrete exception type(s) raised by self.bot.call_action("get_forward_msg", ...) for expected, non-fatal failures (e.g. "message not found"), narrow the except Exception to those specific exception classes, and optionally add a separate except Exception block that logs and re-raises truly unexpected errors.

content_chain = [{"type": "text", "data": {"text": raw_content}}]

text_parts: list[str] = []
for seg in content_chain:
if not isinstance(seg, dict):
continue
seg_type = seg.get("type")
seg_data = seg.get("data", {}) if isinstance(seg.get("data"), dict) else {}

if seg_type in ("text", "plain"):
text = seg_data.get("text", "")
if isinstance(text, str) and text:
text_parts.append(text)
elif seg_type == "at":
qq = seg_data.get("qq")
if qq:
text_parts.append(f"@{qq}")
elif seg_type == "image":
text_parts.append("[图片]")
elif seg_type == "face":
face_id = seg_data.get("id")
text_parts.append(f"[表情:{face_id}]" if face_id is not None else "[表情]")
elif seg_type in ("forward", "forward_msg", "nodes"):
nested = seg_data.get("content")
if isinstance(nested, list):
nested_text = self._extract_forward_text_from_nodes(
nested,
depth=depth + 1,
max_depth=max_depth,
)
if nested_text:
text_parts.append(nested_text)
else:
text_parts.append("[转发消息]")

node_text = "".join(text_parts).strip()
if node_text:
lines.append(f"{sender_name}: {node_text}")

return "\n".join(lines).strip()

async def _fetch_forward_text(self, forward_id: str) -> str:
if not forward_id:
return ""

candidates: list[dict[str, Any]] = [{"id": forward_id}]
if str(forward_id).isdigit():
candidates.insert(0, {"id": int(forward_id)})
candidates.extend([{"message_id": forward_id}, {"forward_id": forward_id}])

payload: dict[str, Any] | None = None
for params in candidates:
try:
payload = await self.bot.call_action("get_forward_msg", **params)
if isinstance(payload, dict):
break
except Exception:
continue

if not isinstance(payload, dict):
return ""

data = payload.get("data", payload)
if not isinstance(data, dict):
return ""

nodes = (
data.get("messages")
or data.get("message")
or data.get("nodes")
or data.get("nodeList")
)
text = self._extract_forward_text_from_nodes(nodes if isinstance(nodes, list) else [])
return text.strip()

async def _convert_handle_request_event(self, event: Event) -> AstrBotMessage:
"""OneBot V11 请求类事件"""
abm = AstrBotMessage()
Expand Down Expand Up @@ -393,6 +506,33 @@ async def _convert_handle_message_event(
text = m["data"].get("markdown") or m["data"].get("content", "")
abm.message.append(Plain(text=text))
message_str += text
elif t in ("forward", "forward_msg"):
for m in m_group:
data = m.get("data", {}) if isinstance(m.get("data"), dict) else {}
if t in ComponentTypes:
try:
abm.message.append(ComponentTypes[t](**data))
except Exception:
pass

fid = data.get("id") or data.get("message_id") or data.get("forward_id")
if not fid:
continue

forward_text = await self._fetch_forward_text(str(fid))
if not forward_text:
# 至少保留占位,避免纯转发被识别为空输入
if not message_str.strip():
message_str = "[转发消息]"
else:
message_str += "\n[转发消息]"
continue

if message_str.strip():
message_str += "\n"
# 限制长度,避免超长转发导致上下文爆炸
clipped = forward_text[:4000]
message_str += f"[转发消息]\n{clipped}"
Comment on lines +534 to +535
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: 直接做硬性长度截断而不给出任何提示,可能会让下游行为变得难以理解。

4000 字符限制可以保护上下文大小,但下游使用方会看到一个看上去“完整”的 [转发消息] 区块,却无法知道内容其实被截断了。当 len(forward_text) > 4000 时,建议追加一个明确的截断标记(例如 ... [内容已截断]),这样用户以及后续使用 message_str 的逻辑都能识别这是部分内容。

建议实现如下:

                    if message_str.strip():
                        message_str += "\n"
                    # 限制长度,避免超长转发导致上下文爆炸,并在被截断时追加标记
                    if isinstance(forward_text, str) and forward_text:
                        if len(forward_text) > 4000:
                            clipped = forward_text[:4000] + "... [内容已截断]"
                        else:
                            clipped = forward_text
                        message_str += f"[转发消息]\n{clipped}"

                    data = m.get("data", {}) if isinstance(m.get("data"), dict) else {}

这里假设 forward_text 已经在同一作用域的前面构造为一个 str
如果 forward_text 可能为非字符串(例如 None),那么 isinstance(forward_text, str) 这一层保护会直接跳过该代码块;如果你需要更严格的行为,请确保在此之前始终把 forward_text 处理为字符串,或者根据需要调整条件。

Original comment in English

suggestion: Clipping forwarded text at a hard limit without indication may confuse downstream behavior.

The 4000-character limit protects context size, but downstream consumers see a full-looking [转发消息] block with no indication it’s incomplete. When len(forward_text) > 4000, consider appending an explicit truncation marker (e.g., ... [内容已截断]) so both users and any logic using message_str can detect partial content.

Suggested implementation:

                    if message_str.strip():
                        message_str += "\n"
                    # 限制长度,避免超长转发导致上下文爆炸,并在被截断时追加标记
                    if isinstance(forward_text, str) and forward_text:
                        if len(forward_text) > 4000:
                            clipped = forward_text[:4000] + "... [内容已截断]"
                        else:
                            clipped = forward_text
                        message_str += f"[转发消息]\n{clipped}"

                    data = m.get("data", {}) if isinstance(m.get("data"), dict) else {}

This edit assumes forward_text is already constructed as a str earlier in the same scope.
If forward_text can be non-string (e.g. None), the isinstance(forward_text, str) guard will skip the block; if you need stricter behavior, ensure forward_text is always a string before this point or adapt the condition accordingly.

else:
for m in m_group:
try:
Expand Down