feat(platform): add CLI Tester for plugin testing and debugging#4787
feat(platform): add CLI Tester for plugin testing and debugging#4787YukiRa1n wants to merge 37 commits intoAstrBotDevs:masterfrom
Conversation
There was a problem hiding this comment.
Hey - 我发现了两个问题,并留下了一些高层面的反馈:
- 当前 CLI 适配器将诸如
/AstrBot/data/{config_file}和/tmp/astrbot.sock这样的路径写死;建议从现有的配置/根路径工具中派生这些路径,或者允许通过配置覆盖,这样在非 Docker 或非默认部署环境中该特性也能正常工作。 - 白名单检查对
cli平台无条件绕过校验;如果你预期有些环境会以更受控的方式使用 CLI,那么将该行为放在一个配置开关后面,可能会比硬编码豁免更安全。 - 对于 Unix 套接字服务器,你可能需要显式控制文件权限/所有权(例如通过
os.chmod或 umask),并在 JSON 载荷上增加最小的分帧/大小检查,以避免在多个客户端连接时因部分读取或超大读取导致的问题。
供 AI Agents 使用的提示词
请根据这次代码审查中的评论进行修改:
## 整体评论
- 当前 CLI 适配器将诸如 `/AstrBot/data/{config_file}` 和 `/tmp/astrbot.sock` 这样的路径写死;建议从现有的配置/根路径工具中派生这些路径,或者允许通过配置覆盖,这样在非 Docker 或非默认部署环境中该特性也能正常工作。
- 白名单检查对 `cli` 平台无条件绕过校验;如果你预期有些环境会以更受控的方式使用 CLI,那么将该行为放在一个配置开关后面,可能会比硬编码豁免更安全。
- 对于 Unix 套接字服务器,你可能需要显式控制文件权限/所有权(例如通过 `os.chmod` 或 umask),并在 JSON 载荷上增加最小的分帧/大小检查,以避免在多个客户端连接时因部分读取或超大读取导致的问题。
## 单独评论
### 评论 1
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:302` </location>
<code_context>
+ while self._running:
+ try:
+ # 接受连接(非阻塞)
+ loop = asyncio.get_event_loop()
+ client_socket, _ = await loop.sock_accept(server_socket)
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** 在异步代码中使用 asyncio.get_event_loop() 并不推荐;asyncio.get_running_loop() 更安全且更具前向兼容性。
在 Python 3.10+ 中,`asyncio.get_event_loop()` 在异步代码中已被弃用,并且在某些策略下可能返回错误的事件循环。在 `_run_socket_mode` 中(以及类似的 `_handle_socket_client` / `_read_input` 中),请使用 `asyncio.get_running_loop()`,以确保你获取的是当前任务所在的事件循环。
建议实现:
```python
# 接受连接(非阻塞)
loop = asyncio.get_running_loop()
client_socket, _ = await loop.sock_accept(server_socket)
```
在 `astrbot/core/platform/sources/cli/cli_adapter.py` 中搜索其他在 `async def` 函数内部使用 `asyncio.get_event_loop()` 的地方,尤其是 `_handle_socket_client` 和 `_read_input`,并以类似方式替换:
- 当上下文为运行在事件循环上的异步代码时,将 `loop = asyncio.get_event_loop()` 替换为 `loop = asyncio.get_running_loop()`。
如果在纯同步初始化代码中(不在活动事件循环内)有 `asyncio.get_event_loop()` 的使用,应单独进行审查,因为它们可能需要不同的模式(例如使用 `asyncio.new_event_loop()` 显式创建事件循环,而不是使用 `get_running_loop()`)。
</issue_to_address>
### 评论 2
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:134` </location>
<code_context>
+ logger.info("[ENTRY] CLIPlatformAdapter.run inputs={}")
+ return self._run_loop()
+
+ async def _run_loop(self) -> None:
+ """主运行循环
+
</code_context>
<issue_to_address>
**issue (complexity):** 建议通过抽取小的辅助函数、集中复用逻辑以及将模式映射到处理函数的方式来重构 CLI 适配器,使代码在不改变行为的前提下更易理解。
在不改变行为的前提下,你可以通过抽取一些聚焦的小型辅助函数并整合重复逻辑,显著降低复杂度。下面是一些具体、局部化的重构建议,可以保留现有设计,但让代码更易于推理。
---
### 1. 使用策略映射简化模式选择
`_run_loop` 目前在函数体中同时包含 TTY 检测和分支逻辑。你可以将模式解析抽取到一个小的辅助函数中,并将模式映射到可调用对象:
```python
# in __init__
self._mode_handlers: dict[str, callable[[], Awaitable[None]]] = {
"tty": self._run_tty_mode,
"file": self._run_file_mode,
"socket": self._run_socket_mode,
}
def _resolve_mode(self) -> str:
has_tty = sys.stdin.isatty()
if self.mode == "auto":
return "file" if not has_tty else "tty"
if self.mode in ("tty", "file", "socket"):
if self.mode == "tty" and not has_tty:
logger.warning(
"[PROCESS] TTY mode requested but no TTY detected. "
"CLI platform will not start."
)
return "" # or None
return self.mode
logger.error("[ERROR] Unknown mode: %s", self.mode)
return ""
```
```python
async def _run_loop(self) -> None:
logger.info("[PROCESS] Starting CLI loop")
if self.use_isolated_sessions:
self._cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())
mode = self._resolve_mode()
if not mode:
return
handler = self._mode_handlers.get(mode)
if handler:
logger.info("[PROCESS] Starting %s mode", mode)
await handler()
```
这样可以在不改变行为的前提下,压平分支结构,并把 TTY 逻辑局部化。
---
### 2. 抽取一个小型 `SessionTracker` 辅助类
`_convert_input` 和 `_cleanup_expired_sessions` 都会操作 `_session_timestamps`。将它们移入一个小型辅助类中,可以将消息构建和会话生命周期解耦:
```python
class _SessionTracker:
def __init__(self, ttl: int) -> None:
import time
self._ttl = ttl
self._timestamps: dict[str, float] = {}
self._time = time
def ensure_session(self, base_session_id: str, request_id: str | None) -> str:
if request_id is None:
return base_session_id
session_id = f"{base_session_id}_{request_id}"
if session_id not in self._timestamps:
self._timestamps[session_id] = self._time.time()
return session_id
def collect_expired(self) -> list[str]:
now = self._time.time()
expired = [
s for s, ts in list(self._timestamps.items())
if now - ts > self._ttl
]
for s in expired:
self._timestamps.pop(s, None)
return expired
```
将其接入适配器:
```python
# __init__
self._session_tracker = _SessionTracker(self.session_ttl)
```
```python
def _convert_input(self, text: str, request_id: str | None = None) -> AstrBotMessage:
...
if self.use_isolated_sessions and request_id:
message.session_id = self._session_tracker.ensure_session(
base_session_id="cli_session",
request_id=request_id,
)
else:
message.session_id = self.session_id
...
```
```python
async def _cleanup_expired_sessions(self) -> None:
logger.info("[ENTRY] _cleanup_expired_sessions started, TTL=%s seconds", self.session_ttl)
while self._running:
try:
await asyncio.sleep(10)
if not self.use_isolated_sessions:
continue
expired_sessions = self._session_tracker.collect_expired()
for session_id in expired_sessions:
logger.info("[PROCESS] Cleaning expired session: %s", session_id)
# TODO: DB cleanup if needed
if expired_sessions:
logger.info("[PROCESS] Cleaned %d expired sessions", len(expired_sessions))
except Exception as e:
logger.error("[ERROR] Session cleanup error: %s", e)
```
这样可以避免在多个位置直接修改 `_session_timestamps`,并将会话策略集中到一个地方。
---
### 3. 去重图片提取/归一化逻辑
你已经在 `_handle_socket_client` 中实现了图片提取和 base64 转换。如果 `CLIMessageEvent.send`(或类似逻辑)中有重叠代码,可以将其集中到一个可复用的辅助函数中:
```python
from astrbot.core.message.components import Image
def normalize_images_from_chain(message_chain: MessageChain) -> list[dict[str, Any]]:
import base64
images: list[dict[str, Any]] = []
for comp in message_chain.chain:
if not isinstance(comp, Image) or not comp.file:
continue
info: dict[str, Any] = {}
if comp.file.startswith("http"):
info["type"] = "url"
info["url"] = comp.file
elif comp.file.startswith("file:///"):
info["type"] = "file"
file_path = comp.file[8:]
info["path"] = file_path
try:
with open(file_path, "rb") as f:
raw = f.read()
info["base64_data"] = base64.b64encode(raw).decode("utf-8")
info["size"] = len(raw)
except Exception as e:
logger.error("[ERROR] Failed to read image file %s: %s", file_path, e)
info["error"] = str(e)
elif comp.file.startswith("base64://"):
info["type"] = "base64"
base64_data = comp.file[9:]
info["base64_data"] = base64_data
info["base64_length"] = len(base64_data)
images.append(info)
return images
```
在 `_handle_socket_client` 中使用:
```python
from .image_utils import normalize_images_from_chain # or local helper
...
message_chain = await asyncio.wait_for(response_future, timeout=30.0)
response_text = message_chain.get_plain_text()
images = normalize_images_from_chain(message_chain)
response = json.dumps(
{
"status": "success",
"response": response_text,
"images": images,
"request_id": request_id,
},
ensure_ascii=False,
)
await loop.sock_sendall(client_socket, response.encode("utf-8"))
```
并在 `CLIMessageEvent` 或其他复用该逻辑的代码路径中同样复用该辅助函数。
---
### 4. 将 `_handle_socket_client` 拆分为更小的步骤
在不改变行为的前提下,你可以把该方法拆分成解析 / 处理 / 序列化等辅助函数,使其更易测试且更短:
```python
def _parse_socket_request(self, raw: bytes) -> tuple[dict[str, Any], str, str]:
import json
request = json.loads(raw.decode("utf-8"))
message_text = request.get("message", "")
request_id = request.get("request_id", str(uuid.uuid4()))
return request, message_text, request_id
def _serialize_error(self, request_id: str, msg: str) -> bytes:
import json
return json.dumps(
{"status": "error", "error": msg, "request_id": request_id},
ensure_ascii=False,
).encode("utf-8")
def _serialize_success(self, request_id: str, chain: MessageChain) -> bytes:
import json
response_text = chain.get_plain_text()
images = normalize_images_from_chain(chain)
return json.dumps(
{
"status": "success",
"response": response_text,
"images": images,
"request_id": request_id,
},
ensure_ascii=False,
).encode("utf-8")
```
在 `_handle_socket_client` 中使用它们:
```python
async def _handle_socket_client(self, client_socket) -> None:
import json
logger.debug("[ENTRY] _handle_socket_client")
loop = asyncio.get_event_loop()
try:
data = await loop.sock_recv(client_socket, 4096)
if not data:
logger.debug("[PROCESS] Empty request, closing connection")
return
try:
_, message_text, request_id = self._parse_socket_request(data)
except json.JSONDecodeError:
await loop.sock_sendall(
client_socket, self._serialize_error("", "Invalid JSON format")
)
return
response_future: asyncio.Future[MessageChain] = asyncio.Future()
message = self._convert_input(message_text, request_id=request_id)
message_event = CLIMessageEvent(
message_str=message.message_str,
message_obj=message,
platform_meta=self.meta(),
session_id=message.session_id,
output_queue=self._output_queue,
response_future=response_future,
)
self.commit_event(message_event)
try:
chain = await asyncio.wait_for(response_future, timeout=30.0)
await loop.sock_sendall(
client_socket, self._serialize_success(request_id, chain)
)
except asyncio.TimeoutError:
await loop.sock_sendall(
client_socket, self._serialize_error(request_id, "Request timeout")
)
except Exception as e:
logger.error("[ERROR] Socket client handler error: %s", e)
import traceback
logger.error(traceback.format_exc())
finally:
client_socket.close()
logger.debug("[EXIT] _handle_socket_client return=None")
```
这样可以在保持现有行为的同时,将这个较长的方法拆分为更小、可单独测试的步骤。
---
这些改动在保持现有特性和流程的前提下,可以:
- 将会话策略从消息转换逻辑中分离出来;
- 降低模式选择时的分支复杂度;
- 去除图片处理中的重复代码;
- 将套接字客户端处理逻辑拆分得更短、更清晰。
</issue_to_address>帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据这些反馈来改进后续的评审。
Original comment in English
Hey - I've found 2 issues, and left some high level feedback:
- The CLI adapter currently hardcodes paths like
/AstrBot/data/{config_file}and/tmp/astrbot.sock; consider deriving these from existing config/root-path utilities or allowing them to be overridden so the feature works in non-Docker or non-default deployments. - The whitelist check unconditionally bypasses validation for the
cliplatform; if you expect some environments to use CLI in a more controlled way, it might be safer to gate this behavior behind a config flag rather than hardcoding the exemption. - For the Unix socket server, you may want to explicitly control file permissions/ownership (e.g., via
os.chmodor umask) and add minimal framing/size checks on the JSON payload to avoid issues with partial or oversized reads when multiple clients connect.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The CLI adapter currently hardcodes paths like `/AstrBot/data/{config_file}` and `/tmp/astrbot.sock`; consider deriving these from existing config/root-path utilities or allowing them to be overridden so the feature works in non-Docker or non-default deployments.
- The whitelist check unconditionally bypasses validation for the `cli` platform; if you expect some environments to use CLI in a more controlled way, it might be safer to gate this behavior behind a config flag rather than hardcoding the exemption.
- For the Unix socket server, you may want to explicitly control file permissions/ownership (e.g., via `os.chmod` or umask) and add minimal framing/size checks on the JSON payload to avoid issues with partial or oversized reads when multiple clients connect.
## Individual Comments
### Comment 1
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:302` </location>
<code_context>
+ while self._running:
+ try:
+ # 接受连接(非阻塞)
+ loop = asyncio.get_event_loop()
+ client_socket, _ = await loop.sock_accept(server_socket)
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Using asyncio.get_event_loop() in async code is discouraged; asyncio.get_running_loop() is safer and future‑proof.
In Python 3.10+, `asyncio.get_event_loop()` is deprecated in async code and can return the wrong loop under some policies. In `_run_socket_mode` (and similarly in `_handle_socket_client` / `_read_input`), please use `asyncio.get_running_loop()` so you reliably get the loop for the current task.
Suggested implementation:
```python
# 接受连接(非阻塞)
loop = asyncio.get_running_loop()
client_socket, _ = await loop.sock_accept(server_socket)
```
Search in `astrbot/core/platform/sources/cli/cli_adapter.py` for other occurrences of `asyncio.get_event_loop()` used inside `async def` functions, especially in `_handle_socket_client` and `_read_input`, and replace them similarly:
- Change `loop = asyncio.get_event_loop()` to `loop = asyncio.get_running_loop()` where the surrounding context is async code running on the event loop.
If there are any uses of `asyncio.get_event_loop()` in purely synchronous initialization code (outside of an active event loop), those should be reviewed separately, as they may need a different pattern (e.g., explicitly creating a loop with `asyncio.new_event_loop()` rather than `get_running_loop()`).
</issue_to_address>
### Comment 2
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:134` </location>
<code_context>
+ logger.info("[ENTRY] CLIPlatformAdapter.run inputs={}")
+ return self._run_loop()
+
+ async def _run_loop(self) -> None:
+ """主运行循环
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the CLI adapter by extracting small helpers, centralizing shared logic, and mapping modes to handlers to make the code easier to follow without changing behavior.
You can reduce complexity meaningfully without changing behavior by extracting a few focused helpers and consolidating duplicated logic. Here are concrete, localized refactors that keep the existing design but make it easier to reason about.
---
### 1. Simplify mode selection with a strategy map
`_run_loop` currently mixes TTY detection and branching logic inline. You can pull the mode resolution into a small helper and map modes to callables:
```python
# in __init__
self._mode_handlers: dict[str, callable[[], Awaitable[None]]] = {
"tty": self._run_tty_mode,
"file": self._run_file_mode,
"socket": self._run_socket_mode,
}
def _resolve_mode(self) -> str:
has_tty = sys.stdin.isatty()
if self.mode == "auto":
return "file" if not has_tty else "tty"
if self.mode in ("tty", "file", "socket"):
if self.mode == "tty" and not has_tty:
logger.warning(
"[PROCESS] TTY mode requested but no TTY detected. "
"CLI platform will not start."
)
return "" # or None
return self.mode
logger.error("[ERROR] Unknown mode: %s", self.mode)
return ""
```
```python
async def _run_loop(self) -> None:
logger.info("[PROCESS] Starting CLI loop")
if self.use_isolated_sessions:
self._cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())
mode = self._resolve_mode()
if not mode:
return
handler = self._mode_handlers.get(mode)
if handler:
logger.info("[PROCESS] Starting %s mode", mode)
await handler()
```
This flattens the branching and localizes TTY logic without changing behavior.
---
### 2. Extract a small `SessionTracker` helper
`_convert_input` and `_cleanup_expired_sessions` both manipulate `_session_timestamps`. Move this into a tiny helper to decouple message construction from session lifecycle:
```python
class _SessionTracker:
def __init__(self, ttl: int) -> None:
import time
self._ttl = ttl
self._timestamps: dict[str, float] = {}
self._time = time
def ensure_session(self, base_session_id: str, request_id: str | None) -> str:
if request_id is None:
return base_session_id
session_id = f"{base_session_id}_{request_id}"
if session_id not in self._timestamps:
self._timestamps[session_id] = self._time.time()
return session_id
def collect_expired(self) -> list[str]:
now = self._time.time()
expired = [
s for s, ts in list(self._timestamps.items())
if now - ts > self._ttl
]
for s in expired:
self._timestamps.pop(s, None)
return expired
```
Wire it into the adapter:
```python
# __init__
self._session_tracker = _SessionTracker(self.session_ttl)
```
```python
def _convert_input(self, text: str, request_id: str | None = None) -> AstrBotMessage:
...
if self.use_isolated_sessions and request_id:
message.session_id = self._session_tracker.ensure_session(
base_session_id="cli_session",
request_id=request_id,
)
else:
message.session_id = self.session_id
...
```
```python
async def _cleanup_expired_sessions(self) -> None:
logger.info("[ENTRY] _cleanup_expired_sessions started, TTL=%s seconds", self.session_ttl)
while self._running:
try:
await asyncio.sleep(10)
if not self.use_isolated_sessions:
continue
expired_sessions = self._session_tracker.collect_expired()
for session_id in expired_sessions:
logger.info("[PROCESS] Cleaning expired session: %s", session_id)
# TODO: DB cleanup if needed
if expired_sessions:
logger.info("[PROCESS] Cleaned %d expired sessions", len(expired_sessions))
except Exception as e:
logger.error("[ERROR] Session cleanup error: %s", e)
```
This removes direct mutation of `_session_timestamps` from multiple places and keeps session policy in one spot.
---
### 3. Deduplicate image extraction/normalization
You already do image extraction and base64 conversion in `_handle_socket_client`. If `CLIMessageEvent.send` (or similar) has overlapping logic, centralize it in a reusable helper:
```python
from astrbot.core.message.components import Image
def normalize_images_from_chain(message_chain: MessageChain) -> list[dict[str, Any]]:
import base64
images: list[dict[str, Any]] = []
for comp in message_chain.chain:
if not isinstance(comp, Image) or not comp.file:
continue
info: dict[str, Any] = {}
if comp.file.startswith("http"):
info["type"] = "url"
info["url"] = comp.file
elif comp.file.startswith("file:///"):
info["type"] = "file"
file_path = comp.file[8:]
info["path"] = file_path
try:
with open(file_path, "rb") as f:
raw = f.read()
info["base64_data"] = base64.b64encode(raw).decode("utf-8")
info["size"] = len(raw)
except Exception as e:
logger.error("[ERROR] Failed to read image file %s: %s", file_path, e)
info["error"] = str(e)
elif comp.file.startswith("base64://"):
info["type"] = "base64"
base64_data = comp.file[9:]
info["base64_data"] = base64_data
info["base64_length"] = len(base64_data)
images.append(info)
return images
```
Then in `_handle_socket_client`:
```python
from .image_utils import normalize_images_from_chain # or local helper
...
message_chain = await asyncio.wait_for(response_future, timeout=30.0)
response_text = message_chain.get_plain_text()
images = normalize_images_from_chain(message_chain)
response = json.dumps(
{
"status": "success",
"response": response_text,
"images": images,
"request_id": request_id,
},
ensure_ascii=False,
)
await loop.sock_sendall(client_socket, response.encode("utf-8"))
```
And reuse the same helper wherever you currently replicate that logic in `CLIMessageEvent` or other code paths.
---
### 4. Factor `_handle_socket_client` into smaller steps
Without changing behavior, you can split the method into parsing / processing / serializing helpers to make it testable and shorter:
```python
def _parse_socket_request(self, raw: bytes) -> tuple[dict[str, Any], str, str]:
import json
request = json.loads(raw.decode("utf-8"))
message_text = request.get("message", "")
request_id = request.get("request_id", str(uuid.uuid4()))
return request, message_text, request_id
def _serialize_error(self, request_id: str, msg: str) -> bytes:
import json
return json.dumps(
{"status": "error", "error": msg, "request_id": request_id},
ensure_ascii=False,
).encode("utf-8")
def _serialize_success(self, request_id: str, chain: MessageChain) -> bytes:
import json
response_text = chain.get_plain_text()
images = normalize_images_from_chain(chain)
return json.dumps(
{
"status": "success",
"response": response_text,
"images": images,
"request_id": request_id,
},
ensure_ascii=False,
).encode("utf-8")
```
Use them in `_handle_socket_client`:
```python
async def _handle_socket_client(self, client_socket) -> None:
import json
logger.debug("[ENTRY] _handle_socket_client")
loop = asyncio.get_event_loop()
try:
data = await loop.sock_recv(client_socket, 4096)
if not data:
logger.debug("[PROCESS] Empty request, closing connection")
return
try:
_, message_text, request_id = self._parse_socket_request(data)
except json.JSONDecodeError:
await loop.sock_sendall(
client_socket, self._serialize_error("", "Invalid JSON format")
)
return
response_future: asyncio.Future[MessageChain] = asyncio.Future()
message = self._convert_input(message_text, request_id=request_id)
message_event = CLIMessageEvent(
message_str=message.message_str,
message_obj=message,
platform_meta=self.meta(),
session_id=message.session_id,
output_queue=self._output_queue,
response_future=response_future,
)
self.commit_event(message_event)
try:
chain = await asyncio.wait_for(response_future, timeout=30.0)
await loop.sock_sendall(
client_socket, self._serialize_success(request_id, chain)
)
except asyncio.TimeoutError:
await loop.sock_sendall(
client_socket, self._serialize_error(request_id, "Request timeout")
)
except Exception as e:
logger.error("[ERROR] Socket client handler error: %s", e)
import traceback
logger.error(traceback.format_exc())
finally:
client_socket.close()
logger.debug("[EXIT] _handle_socket_client return=None")
```
This keeps the same behavior but breaks the long method into smaller, independently testable pieces.
---
These changes preserve the existing feature set and flow, but:
- Isolate session policy from message conversion.
- Reduce branching complexity in mode selection.
- Remove duplication in image handling.
- Shorten and clarify the socket client handler into clear sub-steps.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
- Replace hardcoded paths with dynamic path resolution - Use get_astrbot_data_path() for data directory - Use get_astrbot_temp_path() for temp directory - Support ASTRBOT_ROOT environment variable - Fix asyncio deprecation warnings - Replace asyncio.get_event_loop() with get_running_loop() - Improve Python 3.10+ compatibility - Add socket file permission control - Set socket permissions to 600 (owner-only) - Add security logging - Update astrbot-cli client - Add dynamic path resolution functions - Match server-side path logic - Improve cross-environment compatibility Addresses code review feedback from PR AstrBotDevs#4787
|
建议加个Windows兼容性支持 |
4aec9ef to
b070282
Compare
|
已为该 PR 生成文档更新 PR(待人工审核): AI 改动摘要: 根据 PR 内容,我已更新了文档,增加了关于 CLI 测试器 (CLI Tester) 的说明。 修改内容:
文档要点:
|
|
比如说后续对bot本体进行开发,也可以随时部署,随时就测试,而无需繁琐的适配器配置 |
|
已为该 PR 生成文档更新 PR(待人工审核): AI 改动摘要: 根据上游 PR #4787 的改动,我已更新了文档以包含新引入的 CLI Tester (CLI测试器)。 主要改动:
文档摘要:
|
4cad0a4 to
0a865f2
Compare
|
修改pyproject.toml astr : 侧重于开发,以及更加复杂的功能,控制daemon进程等 我建议在后面加上 |
|
不要在根目录丢一个astrbot-cli |
谢谢建议,我已修改,可以看一下我新的pr描述。 |
|
astrbot既然都导入click这个命令行程序库了,你加入的这个也用click吧,就不要再用标准库的写法了,后面肯定还要加上命令补全的,按tab补全 |
谢谢建议,已经修改了~ |
已被 usage: astr [-h] [-s SOCKET] [-t TIMEOUT] [-j] [--log] [--lines LINES]
[--level LEVEL] [--pattern PATTERN]
[message]
AstrBot CLI Client - Send messages to AstrBot CLI Platform (Unix Socket or TCP Socket)
positional arguments:
message Message to send (if not provided, read from stdin)
options:
-h, --help show this help message and exit
-s SOCKET, --socket SOCKET
Unix socket path (default: {temp_dir}/astrbot.sock)
-t TIMEOUT, --timeout TIMEOUT
Timeout in seconds (default: 30.0)
-j, --json Output raw JSON response
--log Get recent console logs (instead of sending a message)
--lines LINES Number of log lines to return (default: 100, max:
1000)
--level LEVEL Filter logs by level
(DEBUG/INFO/WARNING/ERROR/CRITICAL)
--pattern PATTERN Filter logs by pattern (substring match)
Examples:
astr "你好"
astr "/help"
astr --socket /tmp/custom.sock "测试消息"
echo "你好" | astr
Connection:
Automatically detects connection type from .cli_connection file.
Falls back to default Unix Socket if file not found.
命令替代,功能已整合到 包中
…nes LINES]
[--level LEVEL] [--pattern PATTERN]
[message]
AstrBot CLI Client - 与 CLI Platform 通信的客户端工具
positional arguments:
message Message to send (if not provided, read from stdin)
options:
-h, --help show this help message and exit
-s SOCKET, --socket SOCKET
Unix socket path (default: {temp_dir}/astrbot.sock)
-t TIMEOUT, --timeout TIMEOUT
Timeout in seconds (default: 30.0)
-j, --json Output raw JSON response
--log Get recent console logs (instead of sending a message)
--lines LINES Number of log lines to return (default: 100, max:
1000)
--level LEVEL Filter logs by level
(DEBUG/INFO/WARNING/ERROR/CRITICAL)
--pattern PATTERN Filter logs by pattern (substring match)
使用示例:
发送消息:
astr "你好" # 发送消息给 AstrBot
astr "/help" # 查看内置帮助
echo "你好" | astr # 从标准输入读取
获取日志:
astr --log # 获取最近 100 行日志
astr --log --lines 50 # 获取最近 50 行
astr --log --level ERROR # 只显示 ERROR 级别
astr --log --pattern "CLI" # 只显示包含 "CLI" 的日志
astr --log --json # 以 JSON 格式输出日志
高级选项:
astr -j "测试" # 输出原始 JSON 响应
astr -t 60 "长时间任务" # 设置超时时间为 60 秒
连接说明:
- 自动从 data/.cli_connection 文件检测连接类型(Unix Socket 或 TCP)
- Token 自动从 data/.cli_token 文件读取
- 必须在 AstrBot 根目录下运行,或设置 ASTRBOT_ROOT 环境变量
输出
- 添加中文说明
- 补充详细使用示例(发送消息、获取日志、高级选项)
- 添加连接说明
- / 不是 AstrBot 的命令前缀 - help 等内置命令不需要 / 前缀 - 添加内置命令使用示例说明 - 说明带 / 的消息会发给 LLM 处理
使用 argparse.REMAINDER 模式捕获所有剩余参数, 允许 /plugin ls 这类命令正常工作。
在接收 Socket 数据时,如果多字节字符被截断在缓冲区边界会导致解码失败。 使用 errors="replace" 参数来处理截断的 UTF-8 字符。
- 添加 format_response() 函数处理分段回复和图片占位符 - 图片显示为 [图片] 或 [N张图片] 占位符 - 更新 --help 说明,添加输出说明部分 - 修复 socket_handler.py 中 action 变量未定义的问题 - 修复 _get_logs() 方法返回格式问题
当日志文件不存在时,返回中文提示信息, 说明需要在配置中启用 log_file_enable。
- 添加 noqa 注释抑制必要的警告 - 格式化代码符合 ruff 规范
Git Bash (MSYS2) 会把 /plugin ls 转换为 C:/Program Files/Git/plugin ls 添加 fix_git_bash_path() 函数检测并还原原始命令
1. astrbot/cli/client/__main__.py: argparse → click (LIghtJUNction review) - 子命令结构: astr send / astr log - DefaultToSend兼容: astr 你好 等价于 astr send 你好 - RawEpilogGroup保留帮助文本原始格式 - 支持shell tab补全(Bash/Zsh/Fish) 2. tcp_socket_server.py: get_event_loop() → get_running_loop() (sourcery-ai review) - Python 3.10+弃用get_event_loop(),async函数中应使用get_running_loop()
- astr -j "你好" → 自动路由到 astr send -j "你好" - astr -t 60 "你好" → 自动路由到 astr send -t 60 "你好" - astr --log → 自动路由到 astr log - 更新帮助文本,展示新旧两种用法
问题: - `astr log --level ERROR` 无法筛选到 [ERRO] 日志 - 日志文件使用4字符缩写 [ERRO]/[WARN]/[INFO] - 但用户输入的是完整名称 ERROR/WARN/INFO 修复: - 添加 LEVEL_MAP 映射表,将完整名称映射到缩写 ERROR -> ERRO, WARNING -> WARN, CRITICAL -> CRIT - 使用正则表达式精确匹配 [级别] 格式 - 避免 ERROR 错误匹配到 ERROR_INFO 等字符串 测试: - `astr log --level ERROR` ✅ 筛选 [ERRO] 日志 - `astr log --level WARN` ✅ 筛选 [WARN] 日志 - `astr log --level WARNING` ✅ 映射到 [WARN]
- 默认直接读取日志文件,无需 AstrBot 运行 - 添加 --socket 参数,保留通过 Socket 获取日志的功能 - 正则表达式在文件模式下完全支持,无转义问题 - 简化使用,提高可靠性 示例: astr log # 默认:直接读取文件 astr log --level ERROR # 筛选 ERROR 级别 astr log --pattern "ERRO|WARN" --regex # 正则匹配 astr log --socket # 通过 Socket 获取(需 AstrBot 运行)
- 修复 CLI 适配器插件指令执行后触发 LLM 的问题 - 新增 restart 命令,支持停止并重启 AstrBot 实例 - 添加 --window 选项,Windows 下可在新窗口启动 - Linux/macOS 默认在当前窗口运行(服务器环境) - Windows 默认在当前窗口,可选 --window 在新窗口启动
- Windows 默认在新窗口启动,使用 --no-window 在当前窗口 - Linux/macOS 始终在当前窗口运行(服务器环境) - 修改 run 和 restart 命令的行为
- CLIMessageEvent: 用 finalize() 替代 _delayed_response() 延迟机制, 管道完成后统一返回响应,解决工具调用响应截断问题 - CLIMessageEvent: 添加 send_streaming() 支持,采用收集后一次性发送策略 - SocketClientHandler: 超时从30s增加到120s,处理 finalize 返回 None 的情况 - PipelineScheduler: 管道完成后调用 event.finalize()(鸭子类型) - CLIAdapter: 添加 get_stats()/unified_webhook() 兼容 CLIConfig 数据类 - PlatformManager: 安全获取平台ID,兼容 dict 和 dataclass 类型 - PlatformRoute: 兼容 CLIConfig 的 webhook_uuid 获取方式 - MessageConverter: 补充 raw_message 字段
不是很懂?但是感觉你说的应该可以实现,你可以试试效果 |
- get_astrbot_root: 支持环境变量 ASTRBOT_ROOT 和向上查找 .astrbot 标记 - run: 默认当前窗口运行,--new-window 才开新窗口 - restart: 保持默认新窗口行为不变
|
rebase了一下,同时修复了适配器兼容性问题,优化了消息接收逻辑,稍微优化了一下astrbot cli服务端的指令,添加了restart指令以及使run和restart指令能够全局调用 |
- get_astrbot_root() 优先通过 __file__ 定位源码目录,避免匹配到错误的 .astrbot 标记 - get_temp_path() 增加 __file__ 回退,硬编码 /tmp 改为 tempfile.gettempdir() 兼容 Windows - launch_in_new_window() 用 CREATE_NEW_CONSOLE 替代 Start-Process powershell,修复环境变量传递 - taskkill 加 /T 参数杀进程树,避免子进程残留
|
命令: 结果: 当前已观察到的不足/风险:
|



CLI Tester - 命令行测试平台
概述
为 AstrBot 添加 CLI 平台适配器,支持通过命令行直接测试插件功能,无需配置 IM 平台。
功能特性
核心功能
use_isolated_sessions配置,每个请求独立会话消息处理
astr命令(基于 click 库)AOP 装饰器
@handle_exceptions: 统一异常处理@retry: 重试机制@timeout: 超时控制@log_entry_exit: 入口出口日志@log_performance: 性能监控@require_auth: Token 认证@require_whitelist: 白名单校验文件架构
平台适配器
CLI 客户端(click 子命令架构)
配置修改
pyproject.toml 添加
astr命令:对现有代码的修改
Pipeline 扩展(不影响现有适配器)
astrbot/core/pipeline/scheduler.py: 管道执行完毕后通过鸭子类型调用event.finalize(),仅 CLI 事件实现了该方法,其他适配器无此方法直接跳过平台管理器 / Dashboard 兼容
astrbot/core/platform/manager.py: 新增_get_platform_id()兼容 dataclass 配置astrbot/dashboard/routes/platform.py:webhook_uuid查找兼容 dict 和 dataclassCLI 服务端命令改进
astrbot/cli/utils/basic.py:get_astrbot_root()支持环境变量 → 向上查找.astrbot标记 → 回退 cwd 三级查找,run/restart可在任意目录执行astrbot/cli/commands/cmd_run.py:astrbot run默认当前窗口运行,新增--new-window选项astrbot/cli/commands/cmd_restart.py:astrbot restart保持 Windows 默认新窗口行为采纳的 Review 修改
1. CLI 客户端改用 click(@LIghtJUNction review)
argparse→click,与项目已有的astrbotCLI 风格统一astr send/astr log2. asyncio API 修复(@sourcery-ai review)
tcp_socket_server.py:asyncio.get_event_loop()→asyncio.get_running_loop()get_event_loop(),async 函数中应使用get_running_loop()测试覆盖
共 76 个单元测试,覆盖所有核心模块:
使用方式
安装后全局使用
安装后可在任何目录使用
astr命令。发送消息
获取日志
高级选项
Shell Tab 补全
服务端命令
以上命令支持在任意目录执行,自动向上查找 AstrBot 根目录。
配置说明
编辑
data/cmd_config.json:{ "platform": [{ "id": "cli_test", "type": "cli", "enable": true, "mode": "socket" }] }输出说明
[图片]占位符显示-j参数获取完整 JSON(包含图片 URL/base64)astr log查看详细日志astr --help查看完整帮助信息设计原则
兼容性
/plugin→C:/Program Files/Git/plugin自动还原)