Conversation
|
Related Documentation 1 document(s) may need updating based on files changed in this PR: AstrBotTeam's Space pr4697的改动View Suggested Changes@@ -6,10 +6,15 @@
### 1. SubAgent(子代理)架构
#### 架构说明
-主动代理系统引入了 SubAgent(子代理)架构。主代理(MainAgent)不仅专注于对话和任务委派,还可以直接使用自身工具集。具体工具和任务也可以由子代理(SubAgent)处理。这样可以有效避免 prompt 膨胀和调用失败,同时提升主代理的灵活性。
+主代理可以通过 `transfer_to_<subagent>` 工具将任务转交给指定子代理。现在该工具新增了 `background_mission` 参数:
-- 工具分配:每个 SubAgent 继承自 Persona,拥有独立的工具集、技能、名称和描述。主代理可以直接调用自身工具,也可以通过 `transfer_to_<subagent>` 工具将任务转交给指定子代理。启用 SubAgent 编排后,主代理会保留自身工具,并自动挂载 handoff 工具(transfer_to_*),可根据配置决定是否移除与子代理重复的工具。
-- 任务流转:主代理既可以直接处理任务,也可以分发任务给子代理。子代理之间可通过 transfer 工具实现任务转移。主代理和子代理的工具集可以根据配置灵活分配,支持去重和 handoff 工具自动挂载。
+- `background_mission`: 若设置为 `true`,任务将以后台模式异步执行,主代理会立即返回任务 ID,任务完成后通过后台任务唤醒机制通知用户结果。适用于长时间或非紧急任务。
+
+任务流转支持同步和异步两种模式:
+- 同步模式(默认):主代理等待子代理完成任务后返回结果。
+- 异步模式(background_mission=true):主代理立即返回任务 ID,子代理后台执行,完成后自动通知用户。
+
+这样,主代理既可以自主处理,也可以灵活委派任务,并支持异步任务流转。
#### 使用示例
假设有两个子代理:`DataAgent` 和 `ReportAgent`,分别负责数据处理和报告生成。主代理收到用户请求后,可以选择直接使用自身工具处理部分任务,或通过 `transfer_to_DataAgent` 工具将数据处理任务分配给 DataAgent,处理完成后再通过 `transfer_to_ReportAgent` 工具将结果交给 ReportAgent 生成报告。
@@ -25,6 +30,10 @@
#### 配置说明
SubAgent 的定义与 Persona 配置一致,需在配置文件中指定 tools、skills、name、description 等。
+
+transfer_to_* 工具现支持 `background_mission` 参数:
+- 在调用时可传入 `background_mission: true`,任务将以后台模式异步执行,主代理立即返回任务 ID,完成后通过后台任务唤醒机制通知用户。
+- 若未设置或为 `false`,则为同步模式,主代理等待子代理完成任务。
主代理在未启用 SubAgent 编排时,可直接挂载自身工具集(按 persona 规则,默认全部),并直接调用工具。
Note: You must be authenticated to accept/decline updates. |
There was a problem hiding this comment.
Hey - 我发现了两个问题,并留下了一些整体性的反馈:
- 后台交接(background handoff)路径在
asyncio.create_task中复用了run_context(以及其中的event/context),而该 task 的生命周期可能会长于原始请求;建议在启动后台任务前,将你需要的不可变数据拷贝到一个普通 dict 中,再传给后台任务,以避免生命周期或并发问题。 - 在
_do_handoff_background中,你重新构建了主 agent,并以类似现有后台逻辑的方式遍历会话历史;可以考虑把这部分通用的请求构建 / 初始化逻辑提取到一个 helper 中,避免前台和后台执行路径逐渐产生偏差。 - 后台任务相关的用户可见文案(例如初始的
CallToolResult消息和summary_note)目前是硬编码的英文;如果系统其它部分是支持多语言/语言自适应的,建议复用现有的本地化或语言检测工具,使输出语言与用户语言保持一致。
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The background handoff path reuses `run_context` (and its `event`/`context`) inside an `asyncio.create_task`, which may outlive the original request; consider copying only the immutable data you need into a plain dict before spawning the background task to avoid lifecycle or concurrency issues.
- In `_do_handoff_background`, you rebuild a main agent and traverse conversation history similarly to existing background logic; it may be worth extracting the shared request-building/setup code into a helper to avoid divergence between background and foreground execution paths.
- The user-facing strings for background missions (e.g., the initial `CallToolResult` message and `summary_note`) are currently hard-coded English; if the rest of the system is language-adaptive, you might want to leverage existing localization or language-detection utilities to match the user’s language.
## Individual Comments
### Comment 1
<location> `astrbot/core/astr_agent_tool_exec.py:258` </location>
<code_context>
+ req = ProviderRequest()
+ conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
+ req.conversation = conv
+ context = json.loads(conv.history)
+ if context:
+ req.contexts = context
</code_context>
<issue_to_address>
**issue (bug_risk):** 考虑让 `json.loads(conv.history)` 对异常或不合法的历史数据更加健壮。
如果 `conv.history` 可能为空、为 `None`,或者不是合法 JSON(例如来源于旧数据或部分写入),`json.loads` 会抛异常并中断后台交接流程。建议用 try/except 包裹,或者先检查是否为有效且非空的 JSON 字符串,并在历史数据无效时将其视为“无上下文(no context)”,以便后台任务仍然可以完成。
</issue_to_address>
### Comment 2
<location> `astrbot/core/astr_agent_tool_exec.py:200` </location>
<code_context>
+ yield mcp.types.CallToolResult(content=[text_content])
+
+ @classmethod
+ async def _do_handoff_background(
+ cls,
+ tool: HandoffTool,
</code_context>
<issue_to_address>
**issue (complexity):** 建议通过提取共享的后台执行和唤醒主 agent 逻辑到可复用的 helper,中幅重构 `_do_handoff_background`,让该方法只负责编排流程。
通过把通用的“运行子 agent + 用 background_task_result 唤醒主 agent”的编排逻辑拆分成小的 helper,并复用 `_execute_background` 中已有的模式,可以显著降低 `_do_handoff_background` 的复杂度和重复度。
### 1. 分离交接结果收集逻辑
下面这段逻辑:
```python
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)
```
可以移动到一个职责单一的 helper 中,这样 `_do_handoff_background` 不再同时处理流式结果与流程编排:
```python
@classmethod
async def _collect_handoff_result(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
**tool_args: Any,
) -> str:
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)
return result_text
```
然后 `_do_handoff_background` 只需调用:
```python
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)
```
### 2. 复用通用的“用后台结果唤醒主 agent”逻辑
`_do_handoff_background` 的大部分逻辑和 `_execute_background` / 通用后台模式相似(构建 `CronMessageEvent`、配置 `ProviderRequest`、挂载历史、追加 `BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT`、运行 agent、持久化历史)。
可以将这部分逻辑抽成一个通用工具函数,让 `_execute_background` 与 `_do_handoff_background` 都调用它:
```python
async def _wake_main_agent_with_background_result(
*,
ctx: AstrAgentContext,
base_event: AstrMessageEvent,
note: str,
background_task_result: dict[str, Any],
) -> None:
from astrbot.core.astr_main_agent import (
MainAgentBuildConfig,
_get_session_conv,
build_main_agent,
)
session = MessageSession.from_str(base_event.unified_msg_origin)
cron_event = CronMessageEvent(
context=ctx,
session=session,
message=note,
extras={"background_task_result": background_task_result},
message_type=session.message_type,
)
cron_event.role = base_event.role
req = ProviderRequest()
conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
req.conversation = conv
context = json.loads(conv.history)
if context:
req.contexts = context
context_dump = req._print_friendly_context()
req.contexts = []
req.system_prompt += (
"\n\nBellow is you and user previous conversation history:\n"
f"{context_dump}"
)
bg = json.dumps(background_task_result, ensure_ascii=False)
req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
background_task_result=bg
)
req.prompt = (
"Proceed according to your system instructions. "
"Output using same language as previous conversation."
" After completing your task, summarize and output your actions and results."
)
if not req.func_tool:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)
config = MainAgentBuildConfig(tool_call_timeout=3600)
result = await build_main_agent(
event=cron_event, plugin_context=ctx, config=config, req=req
)
if not result:
logger.error("Failed to build main agent for background mission.")
return
runner = result.agent_runner
async for _ in runner.step_until_done(30):
pass
llm_resp = runner.get_final_llm_resp()
summary_note = _build_background_summary_note(
tool_name=background_task_result.get("tool_name"),
subagent_name=background_task_result.get("subagent_name"),
task_id=background_task_result.get("task_id"),
result_text=background_task_result.get("result"),
llm_resp=llm_resp,
)
await persist_agent_history(
ctx.conversation_manager,
event=cron_event,
req=req,
summary_note=summary_note,
)
if not llm_resp:
logger.warning("background mission agent got no response")
```
### 3. 抽取 summary note 构造逻辑
将当前内联构造字符串的逻辑:
```python
task_meta = extras.get("background_task_result", {})
summary_note = (
f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
f"(task_id={task_meta.get('task_id', task_id)}) finished. "
f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
summary_note += (
f"I finished the task, here is the result: {llm_resp.completion_text}"
)
```
提取成一个 helper,同时供两种后台路径使用:
```python
def _build_background_summary_note(
*,
tool_name: str | None,
subagent_name: str | None,
task_id: str,
result_text: str,
llm_resp: Any | None,
) -> str:
base = (
f"[BackgroundMission] {subagent_name or tool_name or 'background_task'} "
f"(task_id={task_id}) finished. "
f"Result: {result_text or 'no content'}"
)
if llm_resp and getattr(llm_resp, "completion_text", None):
return (
base
+ " I finished the task, here is the result: "
+ llm_resp.completion_text
)
return base
```
### 4. 让 `_do_handoff_background` 只负责流程编排
有了上述 helper 之后,`_do_handoff_background` 就可以简化为只做“接线”工作:
```python
@classmethod
async def _do_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
event = run_context.context.event
ctx = run_context.context.context
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)
note = (
event.get_extra("background_note")
or f"Background subagent mission '{tool.agent.name}' finished."
)
background_task_result = {
"task_id": task_id,
"tool_name": tool.name,
"subagent_name": tool.agent.name,
"result": result_text or "",
"tool_args": tool_args,
}
await _wake_main_agent_with_background_result(
ctx=ctx,
base_event=event,
note=note,
background_task_result=background_task_result,
)
```
这样可以在保持现有行为的前提下:
- 去除重复的 ProviderRequest / 历史处理 / 唤醒逻辑;
- 将 `_do_handoff_background` 缩减为一个简短、单一职责的编排方法;
- 让未来对后台唤醒行为的修改可以同时作用于交接任务和普通后台任务。
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进评审质量。
Original comment in English
Hey - I've found 2 issues, and left some high level feedback:
- The background handoff path reuses
run_context(and itsevent/context) inside anasyncio.create_task, which may outlive the original request; consider copying only the immutable data you need into a plain dict before spawning the background task to avoid lifecycle or concurrency issues. - In
_do_handoff_background, you rebuild a main agent and traverse conversation history similarly to existing background logic; it may be worth extracting the shared request-building/setup code into a helper to avoid divergence between background and foreground execution paths. - The user-facing strings for background missions (e.g., the initial
CallToolResultmessage andsummary_note) are currently hard-coded English; if the rest of the system is language-adaptive, you might want to leverage existing localization or language-detection utilities to match the user’s language.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The background handoff path reuses `run_context` (and its `event`/`context`) inside an `asyncio.create_task`, which may outlive the original request; consider copying only the immutable data you need into a plain dict before spawning the background task to avoid lifecycle or concurrency issues.
- In `_do_handoff_background`, you rebuild a main agent and traverse conversation history similarly to existing background logic; it may be worth extracting the shared request-building/setup code into a helper to avoid divergence between background and foreground execution paths.
- The user-facing strings for background missions (e.g., the initial `CallToolResult` message and `summary_note`) are currently hard-coded English; if the rest of the system is language-adaptive, you might want to leverage existing localization or language-detection utilities to match the user’s language.
## Individual Comments
### Comment 1
<location> `astrbot/core/astr_agent_tool_exec.py:258` </location>
<code_context>
+ req = ProviderRequest()
+ conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
+ req.conversation = conv
+ context = json.loads(conv.history)
+ if context:
+ req.contexts = context
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider making `json.loads(conv.history)` more robust against malformed or empty history.
If `conv.history` can ever be empty, `None`, or non-JSON (e.g., from older data or partial writes), `json.loads` will raise and stop the background handoff flow. Consider wrapping this in a try/except or checking for a valid, non-empty JSON string first, and treating invalid history as “no context” so the mission can still complete.
</issue_to_address>
### Comment 2
<location> `astrbot/core/astr_agent_tool_exec.py:200` </location>
<code_context>
+ yield mcp.types.CallToolResult(content=[text_content])
+
+ @classmethod
+ async def _do_handoff_background(
+ cls,
+ tool: HandoffTool,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring `_do_handoff_background` by extracting shared background-execution and wake-up logic into reusable helpers so the method only orchestrates the flow.
You can significantly reduce complexity and duplication in `_do_handoff_background` by extracting the generic “run subagent + wake main agent with background_task_result” orchestration into small helpers and reusing the same pattern as `_execute_background`.
### 1. Isolate the handoff result collection
All this logic:
```python
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)
```
can be moved to a focused helper so `_do_handoff_background` no longer mixes streaming handling with orchestration:
```python
@classmethod
async def _collect_handoff_result(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
**tool_args: Any,
) -> str:
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)
return result_text
```
Then `_do_handoff_background` calls:
```python
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)
```
### 2. Reuse generic “wake main agent with background result” logic
Most of `_do_handoff_background` mirrors `_execute_background` / the general background pattern (build `CronMessageEvent`, configure `ProviderRequest`, attach history, add `BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT`, run agent, persist history).
Move that into a reusable utility that both `_execute_background` and `_do_handoff_background` can call:
```python
async def _wake_main_agent_with_background_result(
*,
ctx: AstrAgentContext,
base_event: AstrMessageEvent,
note: str,
background_task_result: dict[str, Any],
) -> None:
from astrbot.core.astr_main_agent import (
MainAgentBuildConfig,
_get_session_conv,
build_main_agent,
)
session = MessageSession.from_str(base_event.unified_msg_origin)
cron_event = CronMessageEvent(
context=ctx,
session=session,
message=note,
extras={"background_task_result": background_task_result},
message_type=session.message_type,
)
cron_event.role = base_event.role
req = ProviderRequest()
conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
req.conversation = conv
context = json.loads(conv.history)
if context:
req.contexts = context
context_dump = req._print_friendly_context()
req.contexts = []
req.system_prompt += (
"\n\nBellow is you and user previous conversation history:\n"
f"{context_dump}"
)
bg = json.dumps(background_task_result, ensure_ascii=False)
req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
background_task_result=bg
)
req.prompt = (
"Proceed according to your system instructions. "
"Output using same language as previous conversation."
" After completing your task, summarize and output your actions and results."
)
if not req.func_tool:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)
config = MainAgentBuildConfig(tool_call_timeout=3600)
result = await build_main_agent(
event=cron_event, plugin_context=ctx, config=config, req=req
)
if not result:
logger.error("Failed to build main agent for background mission.")
return
runner = result.agent_runner
async for _ in runner.step_until_done(30):
pass
llm_resp = runner.get_final_llm_resp()
summary_note = _build_background_summary_note(
tool_name=background_task_result.get("tool_name"),
subagent_name=background_task_result.get("subagent_name"),
task_id=background_task_result.get("task_id"),
result_text=background_task_result.get("result"),
llm_resp=llm_resp,
)
await persist_agent_history(
ctx.conversation_manager,
event=cron_event,
req=req,
summary_note=summary_note,
)
if not llm_resp:
logger.warning("background mission agent got no response")
```
### 3. Extract summary-note construction
Lift this inline string-building:
```python
task_meta = extras.get("background_task_result", {})
summary_note = (
f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
f"(task_id={task_meta.get('task_id', task_id)}) finished. "
f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
summary_note += (
f"I finished the task, here is the result: {llm_resp.completion_text}"
)
```
into a helper used by both background paths:
```python
def _build_background_summary_note(
*,
tool_name: str | None,
subagent_name: str | None,
task_id: str,
result_text: str,
llm_resp: Any | None,
) -> str:
base = (
f"[BackgroundMission] {subagent_name or tool_name or 'background_task'} "
f"(task_id={task_id}) finished. "
f"Result: {result_text or 'no content'}"
)
if llm_resp and getattr(llm_resp, "completion_text", None):
return (
base
+ " I finished the task, here is the result: "
+ llm_resp.completion_text
)
return base
```
### 4. Simplify `_do_handoff_background` to orchestration only
With the above helpers, `_do_handoff_background` reduces to wiring:
```python
@classmethod
async def _do_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
event = run_context.context.event
ctx = run_context.context.context
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)
note = (
event.get_extra("background_note")
or f"Background subagent mission '{tool.agent.name}' finished."
)
background_task_result = {
"task_id": task_id,
"tool_name": tool.name,
"subagent_name": tool.agent.name,
"result": result_text or "",
"tool_args": tool_args,
}
await _wake_main_agent_with_background_result(
ctx=ctx,
base_event=event,
note=note,
background_task_result=background_task_result,
)
```
This keeps all current behavior, but:
- Removes duplicated ProviderRequest/history/wake-up logic.
- Shrinks `_do_handoff_background` to a short, single-responsibility orchestration method.
- Makes future changes to background wake-up behavior apply to handoff and normal background tasks in one place.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| req = ProviderRequest() | ||
| conv = await _get_session_conv(event=cron_event, plugin_context=ctx) | ||
| req.conversation = conv | ||
| context = json.loads(conv.history) |
There was a problem hiding this comment.
issue (bug_risk): 考虑让 json.loads(conv.history) 对异常或不合法的历史数据更加健壮。
如果 conv.history 可能为空、为 None,或者不是合法 JSON(例如来源于旧数据或部分写入),json.loads 会抛异常并中断后台交接流程。建议用 try/except 包裹,或者先检查是否为有效且非空的 JSON 字符串,并在历史数据无效时将其视为“无上下文(no context)”,以便后台任务仍然可以完成。
Original comment in English
issue (bug_risk): Consider making json.loads(conv.history) more robust against malformed or empty history.
If conv.history can ever be empty, None, or non-JSON (e.g., from older data or partial writes), json.loads will raise and stop the background handoff flow. Consider wrapping this in a try/except or checking for a valid, non-empty JSON string first, and treating invalid history as “no context” so the mission can still complete.
| yield mcp.types.CallToolResult(content=[text_content]) | ||
|
|
||
| @classmethod | ||
| async def _do_handoff_background( |
There was a problem hiding this comment.
issue (complexity): 建议通过提取共享的后台执行和唤醒主 agent 逻辑到可复用的 helper,中幅重构 _do_handoff_background,让该方法只负责编排流程。
通过把通用的“运行子 agent + 用 background_task_result 唤醒主 agent”的编排逻辑拆分成小的 helper,并复用 _execute_background 中已有的模式,可以显著降低 _do_handoff_background 的复杂度和重复度。
1. 分离交接结果收集逻辑
下面这段逻辑:
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)可以移动到一个职责单一的 helper 中,这样 _do_handoff_background 不再同时处理流式结果与流程编排:
@classmethod
async def _collect_handoff_result(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
**tool_args: Any,
) -> str:
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)
return result_text然后 _do_handoff_background 只需调用:
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)2. 复用通用的“用后台结果唤醒主 agent”逻辑
_do_handoff_background 的大部分逻辑和 _execute_background / 通用后台模式相似(构建 CronMessageEvent、配置 ProviderRequest、挂载历史、追加 BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT、运行 agent、持久化历史)。
可以将这部分逻辑抽成一个通用工具函数,让 _execute_background 与 _do_handoff_background 都调用它:
async def _wake_main_agent_with_background_result(
*,
ctx: AstrAgentContext,
base_event: AstrMessageEvent,
note: str,
background_task_result: dict[str, Any],
) -> None:
from astrbot.core.astr_main_agent import (
MainAgentBuildConfig,
_get_session_conv,
build_main_agent,
)
session = MessageSession.from_str(base_event.unified_msg_origin)
cron_event = CronMessageEvent(
context=ctx,
session=session,
message=note,
extras={"background_task_result": background_task_result},
message_type=session.message_type,
)
cron_event.role = base_event.role
req = ProviderRequest()
conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
req.conversation = conv
context = json.loads(conv.history)
if context:
req.contexts = context
context_dump = req._print_friendly_context()
req.contexts = []
req.system_prompt += (
"\n\nBellow is you and user previous conversation history:\n"
f"{context_dump}"
)
bg = json.dumps(background_task_result, ensure_ascii=False)
req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
background_task_result=bg
)
req.prompt = (
"Proceed according to your system instructions. "
"Output using same language as previous conversation."
" After completing your task, summarize and output your actions and results."
)
if not req.func_tool:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)
config = MainAgentBuildConfig(tool_call_timeout=3600)
result = await build_main_agent(
event=cron_event, plugin_context=ctx, config=config, req=req
)
if not result:
logger.error("Failed to build main agent for background mission.")
return
runner = result.agent_runner
async for _ in runner.step_until_done(30):
pass
llm_resp = runner.get_final_llm_resp()
summary_note = _build_background_summary_note(
tool_name=background_task_result.get("tool_name"),
subagent_name=background_task_result.get("subagent_name"),
task_id=background_task_result.get("task_id"),
result_text=background_task_result.get("result"),
llm_resp=llm_resp,
)
await persist_agent_history(
ctx.conversation_manager,
event=cron_event,
req=req,
summary_note=summary_note,
)
if not llm_resp:
logger.warning("background mission agent got no response")3. 抽取 summary note 构造逻辑
将当前内联构造字符串的逻辑:
task_meta = extras.get("background_task_result", {})
summary_note = (
f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
f"(task_id={task_meta.get('task_id', task_id)}) finished. "
f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
summary_note += (
f"I finished the task, here is the result: {llm_resp.completion_text}"
)提取成一个 helper,同时供两种后台路径使用:
def _build_background_summary_note(
*,
tool_name: str | None,
subagent_name: str | None,
task_id: str,
result_text: str,
llm_resp: Any | None,
) -> str:
base = (
f"[BackgroundMission] {subagent_name or tool_name or 'background_task'} "
f"(task_id={task_id}) finished. "
f"Result: {result_text or 'no content'}"
)
if llm_resp and getattr(llm_resp, "completion_text", None):
return (
base
+ " I finished the task, here is the result: "
+ llm_resp.completion_text
)
return base4. 让 _do_handoff_background 只负责流程编排
有了上述 helper 之后,_do_handoff_background 就可以简化为只做“接线”工作:
@classmethod
async def _do_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper<AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
event = run_context.context.event
ctx = run_context.context.context
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)
note = (
event.get_extra("background_note")
or f"Background subagent mission '{tool.agent.name}' finished."
)
background_task_result = {
"task_id": task_id,
"tool_name": tool.name,
"subagent_name": tool.agent.name,
"result": result_text or "",
"tool_args": tool_args,
}
await _wake_main_agent_with_background_result(
ctx=ctx,
base_event=event,
note=note,
background_task_result=background_task_result,
)这样可以在保持现有行为的前提下:
- 去除重复的 ProviderRequest / 历史处理 / 唤醒逻辑;
- 将
_do_handoff_background缩减为一个简短、单一职责的编排方法; - 让未来对后台唤醒行为的修改可以同时作用于交接任务和普通后台任务。
Original comment in English
issue (complexity): Consider refactoring _do_handoff_background by extracting shared background-execution and wake-up logic into reusable helpers so the method only orchestrates the flow.
You can significantly reduce complexity and duplication in _do_handoff_background by extracting the generic “run subagent + wake main agent with background_task_result” orchestration into small helpers and reusing the same pattern as _execute_background.
1. Isolate the handoff result collection
All this logic:
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)can be moved to a focused helper so _do_handoff_background no longer mixes streaming handling with orchestration:
@classmethod
async def _collect_handoff_result(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
**tool_args: Any,
) -> str:
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)
return result_textThen _do_handoff_background calls:
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)2. Reuse generic “wake main agent with background result” logic
Most of _do_handoff_background mirrors _execute_background / the general background pattern (build CronMessageEvent, configure ProviderRequest, attach history, add BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT, run agent, persist history).
Move that into a reusable utility that both _execute_background and _do_handoff_background can call:
async def _wake_main_agent_with_background_result(
*,
ctx: AstrAgentContext,
base_event: AstrMessageEvent,
note: str,
background_task_result: dict[str, Any],
) -> None:
from astrbot.core.astr_main_agent import (
MainAgentBuildConfig,
_get_session_conv,
build_main_agent,
)
session = MessageSession.from_str(base_event.unified_msg_origin)
cron_event = CronMessageEvent(
context=ctx,
session=session,
message=note,
extras={"background_task_result": background_task_result},
message_type=session.message_type,
)
cron_event.role = base_event.role
req = ProviderRequest()
conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
req.conversation = conv
context = json.loads(conv.history)
if context:
req.contexts = context
context_dump = req._print_friendly_context()
req.contexts = []
req.system_prompt += (
"\n\nBellow is you and user previous conversation history:\n"
f"{context_dump}"
)
bg = json.dumps(background_task_result, ensure_ascii=False)
req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
background_task_result=bg
)
req.prompt = (
"Proceed according to your system instructions. "
"Output using same language as previous conversation."
" After completing your task, summarize and output your actions and results."
)
if not req.func_tool:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)
config = MainAgentBuildConfig(tool_call_timeout=3600)
result = await build_main_agent(
event=cron_event, plugin_context=ctx, config=config, req=req
)
if not result:
logger.error("Failed to build main agent for background mission.")
return
runner = result.agent_runner
async for _ in runner.step_until_done(30):
pass
llm_resp = runner.get_final_llm_resp()
summary_note = _build_background_summary_note(
tool_name=background_task_result.get("tool_name"),
subagent_name=background_task_result.get("subagent_name"),
task_id=background_task_result.get("task_id"),
result_text=background_task_result.get("result"),
llm_resp=llm_resp,
)
await persist_agent_history(
ctx.conversation_manager,
event=cron_event,
req=req,
summary_note=summary_note,
)
if not llm_resp:
logger.warning("background mission agent got no response")3. Extract summary-note construction
Lift this inline string-building:
task_meta = extras.get("background_task_result", {})
summary_note = (
f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
f"(task_id={task_meta.get('task_id', task_id)}) finished. "
f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
summary_note += (
f"I finished the task, here is the result: {llm_resp.completion_text}"
)into a helper used by both background paths:
def _build_background_summary_note(
*,
tool_name: str | None,
subagent_name: str | None,
task_id: str,
result_text: str,
llm_resp: Any | None,
) -> str:
base = (
f"[BackgroundMission] {subagent_name or tool_name or 'background_task'} "
f"(task_id={task_id}) finished. "
f"Result: {result_text or 'no content'}"
)
if llm_resp and getattr(llm_resp, "completion_text", None):
return (
base
+ " I finished the task, here is the result: "
+ llm_resp.completion_text
)
return base4. Simplify _do_handoff_background to orchestration only
With the above helpers, _do_handoff_background reduces to wiring:
@classmethod
async def _do_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
event = run_context.context.event
ctx = run_context.context.context
result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)
note = (
event.get_extra("background_note")
or f"Background subagent mission '{tool.agent.name}' finished."
)
background_task_result = {
"task_id": task_id,
"tool_name": tool.name,
"subagent_name": tool.agent.name,
"result": result_text or "",
"tool_args": tool_args,
}
await _wake_main_agent_with_background_result(
ctx=ctx,
base_event=event,
note=note,
background_task_result=background_task_result,
)This keeps all current behavior, but:
- Removes duplicated ProviderRequest/history/wake-up logic.
- Shrinks
_do_handoff_backgroundto a short, single-responsibility orchestration method. - Makes future changes to background wake-up behavior apply to handoff and normal background tasks in one place.
为 SubAgent 的 transfer_to_* 工具添加 background_mission 参数,由 LLM 在调用时自行判断是否以后台任务模式执行。设为 true 时立即返回任务 ID,subagent 异步执行,完成后通过已有的后台任务唤醒机制通知用户结果。
Modifications / 改动点
astrbot/core/agent/handoff.py:default_parameters()新增background_missionboolean 参数。astrbot/core/astr_agent_tool_exec.py:从tool_args读取该参数,为true时立即返回 task ID 并异步执行 subagent,完成后通过CronMessageEvent唤醒主 agent 通知用户结果。This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
requirements.txt和pyproject.toml文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations inrequirements.txtandpyproject.toml.Summary by Sourcery
为子代理(subagent)接管添加后台任务(background mission)支持,使长时间运行的子代理任务能够异步执行,同时立即返回任务标识符,并在完成后通知用户。
New Features:
background_mission布尔参数,用于选择启用后台执行模式。task_id,并在子代理完成后通过现有的后台任务唤醒流程通知用户。Enhancements:
Original summary in English
Summary by Sourcery
Add background mission support for subagent handoffs, allowing long-running subagent tasks to execute asynchronously while immediately returning a task identifier and notifying the user upon completion.
New Features:
Enhancements: