
feat: supports 小黑盒语音机器人 #5093

Open

Soulter wants to merge 1 commit into master from feat/heibox
Conversation

@Soulter (Member) commented Feb 13, 2026

#4728

Modifications

  • This is NOT a breaking change.

Screenshots or Test Results


Checklist

  • 😊 If the PR adds new features, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, or, if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

Summary by Sourcery


Add a new WebSocket-based platform adapter for the Xiaoheihe (小黑盒) bot platform and integrate it into the platform manager.

New Features:

  • Introduce the HeihePlatformAdapter with configuration metadata and i18n resources for connecting to Xiaoheihe via WebSocket.
  • Support receiving and converting Xiaoheihe message events into AstrBot messages, including text, mentions, and attachments.
  • Support sending messages to Xiaoheihe channels and sessions through a dedicated HeiheMessageEvent bridge.
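The receive-and-convert flow described above can be sketched as a standalone function. All field names (`content`, `mentions`, `attachments`) and the segment classes below are illustrative assumptions for this sketch, not the adapter's actual API:

```python
# Hypothetical sketch of converting a raw chat payload into message
# segments (text, mentions, attachments). Field names and segment
# classes are assumptions, NOT the real HeihePlatformAdapter interface.
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass
class Plain:
    text: str


@dataclass
class At:
    qq: str


@dataclass
class Image:
    url: str


def convert_segments(payload: Mapping[str, Any]) -> list:
    """Flatten a raw payload into an ordered list of message segments."""
    segments: list = []
    # Text body: fall back across a couple of plausible keys.
    text = payload.get("content") or payload.get("text") or ""
    if text:
        segments.append(Plain(text=str(text)))
    # Mentions become At segments.
    for uid in payload.get("mentions", []):
        segments.append(At(qq=str(uid)))
    # Attachments with a URL become Image segments.
    for att in payload.get("attachments", []):
        url = att.get("url") if isinstance(att, Mapping) else None
        if url:
            segments.append(Image(url=url))
    return segments
```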

@dosubot dosubot bot added the size:XL This PR changes 500-999 lines, ignoring generated files. label Feb 13, 2026

dosubot bot commented Feb 13, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.


@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey - I've found 2 issues, and left some high level feedback:

  • In HeiheMessageEvent.send_streaming, you fully consume the generator into a buffer and then still call super().send_streaming(generator, use_fallback), which will receive an already-exhausted generator; consider either not calling super().send_streaming here or restructuring to let the base implementation handle streaming to avoid double handling and confusion.
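The pitfall called out here is easy to reproduce in isolation: an async generator yields nothing once it has been fully consumed, so anything downstream that still needs the stream must get a fresh generator built from the buffer. This is a minimal sketch of the failure mode and the fix, unrelated to the actual `send_streaming` signature:

```python
# Demonstrates the exhausted-generator pitfall: after buffering all
# chunks, iterating the SAME generator object again yields nothing,
# while a fresh generator replaying the buffer restores the stream.
import asyncio
from typing import AsyncGenerator


async def chunks() -> AsyncGenerator[str, None]:
    for part in ("Hel", "lo"):
        yield part


async def main() -> tuple[str, str, str]:
    gen = chunks()
    buffered = [part async for part in gen]   # fully consumes gen
    leftover = [part async for part in gen]   # gen is exhausted: empty

    async def replay() -> AsyncGenerator[str, None]:
        for part in buffered:                 # fresh generator over buffer
            yield part

    replayed = [part async for part in replay()]
    return "".join(buffered), "".join(leftover), "".join(replayed)


buffered, leftover, replayed = asyncio.run(main())
```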
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `HeiheMessageEvent.send_streaming`, you fully consume the `generator` into a buffer and then still call `super().send_streaming(generator, use_fallback)`, which will receive an already-exhausted generator; consider either not calling `super().send_streaming` here or restructuring to let the base implementation handle streaming to avoid double handling and confusion.

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/heihe/heihe_adapter.py:273-277` </location>
<code_context>
+            logger.debug("[heihe] skip non-json frame: %s", raw[:200])
+            return
+
+        if isinstance(data, list):
+            for item in data:
+                if isinstance(item, dict):
+                    await self._handle_packet(item)
+            return
+        if isinstance(data, dict):
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Batch packet handling does not isolate errors per item, so one bad element can drop the rest of the batch.

Because each list element is awaited directly via `_handle_packet` without its own error handling, any unexpected exception will abort processing of the remaining items. If the server can send multiple events in one frame, consider wrapping each call in `try/except` so you can log the failure and continue with later items.

```suggestion
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    try:
                        await self._handle_packet(item)
                    except Exception:
                        logger.exception("[heihe] failed to handle packet in batch; item=%r", item)
            return
```
</issue_to_address>
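The difference the suggested `try/except` makes can be shown with a self-contained loop; `handle_packet` below is a hypothetical stand-in for the adapter's `_handle_packet`:

```python
# Per-item error isolation: without try/except, the malformed second
# item would abort the batch and drop {"id": "b"}; with it, the failure
# is logged and later items are still handled. Names are hypothetical.
import asyncio
import logging

logger = logging.getLogger("heihe-demo")


async def handle_packet(item: dict) -> str:
    if "id" not in item:
        raise ValueError("malformed packet")
    return item["id"]


async def process_batch(batch: list) -> list[str]:
    handled: list[str] = []
    for item in batch:
        if isinstance(item, dict):
            try:
                handled.append(await handle_packet(item))
            except Exception:
                # Log and continue instead of dropping the rest.
                logger.exception("failed to handle packet; item=%r", item)
    return handled


batch = [{"id": "a"}, {"bad": True}, {"id": "b"}]
handled = asyncio.run(process_batch(batch))
```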

### Comment 2
<location> `astrbot/core/platform/sources/heihe/heihe_adapter.py:314` </location>
<code_context>
+        keys = payload.keys()
+        return "content" in keys or "text" in keys or "message" in keys
+
+    def _convert_message(
+        self,
+        payload: Mapping[str, Any],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting small helpers for repeated field extraction and connection cleanup to make the adapter logic more readable and maintainable without changing behavior.

You can reduce complexity without changing behavior by:

### 1. Deduplicating the repeated “first non-empty field” logic

Patterns like these appear many times:

```python
sender_id = str(
    sender_data.get("id")
    or sender_data.get("user_id")
    or payload.get("sender_id")
    or payload.get("user_id")
    or "",
).strip()
```

A small helper keeps the intent in one place and makes call sites shorter and less error-prone:

```python
def _first(
    self,
    *mappings: Mapping[str, Any],
    keys: tuple[str, ...],
    default: str = "",
) -> str:
    for mapping in mappings:
        if not mapping:
            continue
        for k in keys:
            v = mapping.get(k)
            if v is not None and v != "":
                return str(v).strip()
    return default
```

Then you can simplify the field extraction:

```python
sender_id = self._first(
    sender_data,
    payload,
    keys=("id", "user_id", "sender_id"),
)

sender_name = self._first(
    sender_data,
    keys=("nickname", "name", "username"),
    default=sender_id or "unknown",
)

self_id = self._first(
    payload,
    keys=("self_id", "bot_id"),
    default=self.bot_id or self.meta().id,
)

channel_id = self._first(
    payload,
    keys=("channel_id", "room_id", "chat_id", "session_id"),
)

guild_id = self._first(
    payload,
    keys=("guild_id", "server_id", "group_id"),
)
```

You can keep behavior identical while making `_convert_message` much easier to scan and maintain.

Similarly for timestamp and message ID you can add small helpers if desired:

```python
def _first_int(self, *values: Any, default: int | None = None) -> int | None:
    for v in values:
        if isinstance(v, int):
            return v
    return default
```

### 2. Deduplicating connection cleanup

The teardown logic is currently duplicated in `terminate` and `_connect_and_loop`’s `finally`:

```python
if self.heartbeat_task:
    self.heartbeat_task.cancel()
    try:
        await self.heartbeat_task
    except asyncio.CancelledError:
        pass
    self.heartbeat_task = None
if self.ws:
    try:
        await self.ws.close()
    except Exception:
        pass
    self.ws = None
```

Extracting a small helper keeps the shutdown path consistent:

```python
async def _cleanup_connection(self) -> None:
    if self.heartbeat_task:
        self.heartbeat_task.cancel()
        try:
            await self.heartbeat_task
        except asyncio.CancelledError:
            pass
        self.heartbeat_task = None
    if self.ws:
        try:
            await self.ws.close()
        except Exception:
            pass
        self.ws = None
```

Then use it in both places:

```python
async def terminate(self) -> None:
    self.running = False
    await self._cleanup_connection()
```

```python
async def _connect_and_loop(self) -> None:
    ...
    try:
        async for raw in websocket:
            await self._handle_incoming(raw)
    finally:
        await self._cleanup_connection()
```

### 3. Slightly structuring `_convert_message` into focused helpers

Without changing behavior, you can pull out a couple of small “read-only” helpers to reduce nesting and make intent clearer:

```python
def _extract_sender(
    self,
    payload: Mapping[str, Any],
) -> tuple[str, str]:
    sender_data_obj = (
        payload.get("sender")
        or payload.get("author")
        or payload.get("user")
        or {}
    )
    sender_data = sender_data_obj if isinstance(sender_data_obj, Mapping) else {}

    sender_id = self._first(
        sender_data,
        payload,
        keys=("id", "user_id", "sender_id"),
    )
    sender_name = self._first(
        sender_data,
        keys=("nickname", "name", "username"),
        default=sender_id or "unknown",
    )
    return sender_id, sender_name
```

And use it in `_convert_message`:

```python
def _convert_message(...):
    ...
    sender_id, sender_name = self._extract_sender(payload)
    self_id = self._first(
        payload,
        keys=("self_id", "bot_id"),
        default=self.bot_id or self.meta().id,
    )
    if self.ignore_self_message and sender_id and self_id and sender_id == self_id:
        return None

    channel_id = self._first(payload, keys=("channel_id", "room_id", "chat_id", "session_id"))
    guild_id = self._first(payload, keys=("guild_id", "server_id", "group_id"))
    ...
```

You can later follow the same pattern for timestamps or context (group vs private) if you want more separation, while keeping protocol flexibility as-is.
</issue_to_address>



@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Feb 13, 2026
