Skip to content
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,21 @@ Connect workers to external [MCP](https://modelcontextprotocol.io/) (Model Conte
- **API management** — full CRUD API under `/api/mcp/` for managing server definitions and monitoring connection status programmatically

```toml
[[mcp_servers]]
[[defaults.mcp]]
name = "filesystem"
transport = "stdio"
command = "npx"
args = ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]

[[mcp_servers]]
[[defaults.mcp]]
name = "sentry"
transport = "http"
url = "https://mcp.sentry.io"
headers = { Authorization = "Bearer ${SENTRY_TOKEN}" }
```

Legacy `[[mcp_servers]]` entries are still loaded for compatibility, but `[[defaults.mcp]]` is the canonical location.

---

## How It Works
Expand Down
4 changes: 3 additions & 1 deletion docs/design-docs/mcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Per-agent visibility endpoints:
- `GET /api/agents/mcp` — list configured MCP servers and their connection status
- `POST /api/agents/mcp/reconnect` — force reconnect a specific server by name

CRUD endpoints for managing `[[mcp_servers]]` in config.toml:
CRUD endpoints for managing `[[defaults.mcp]]` in config.toml:

- `GET /api/mcp/servers` — list all configured servers with live connection state
- `POST /api/mcp/servers` — add a new server definition to config.toml
Expand All @@ -255,6 +255,8 @@ CRUD endpoints for managing `[[mcp_servers]]` in config.toml:
- `POST /api/mcp/servers/{name}/reconnect` — force-reconnect a specific server across all agents
- `GET /api/mcp/status` — per-agent connection status

Legacy `[[mcp_servers]]` entries are accepted for backward compatibility during config loading, but new writes and API edits target `[[defaults.mcp]]`.

### Shutdown

`McpManager::disconnect_all()` called during agent shutdown, before database cleanup. Kills child processes, closes HTTP sessions.
Expand Down
85 changes: 54 additions & 31 deletions src/agent/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@ impl ChannelState {
/// Cancel a running worker by aborting its tokio task and cleaning up state.
/// Returns an error message if the worker is not found.
pub async fn cancel_worker(&self, worker_id: WorkerId) -> std::result::Result<(), String> {
let handle = self.worker_handles.write().await.remove(&worker_id);
let removed = self
.active_workers
.write()
.await
.remove(&worker_id)
.is_some();
let handle = self.worker_handles.write().await.remove(&worker_id);
self.worker_inputs.write().await.remove(&worker_id);
let status_removed = self.status_block.write().await.remove_worker(worker_id);

if let Some(handle) = handle {
handle.abort();
Ok(())
} else if removed {
} else if removed || status_removed {
// Worker was in active_workers but had no handle (shouldn't happen, but handle gracefully)
Ok(())
} else {
Expand Down Expand Up @@ -391,8 +392,11 @@ impl Channel {

if messages.len() == 1 {
// Single message - process normally
let message = messages.into_iter().next().unwrap();
self.handle_message(message).await
if let Some(message) = messages.into_iter().next() {
self.handle_message(message).await
} else {
Ok(())
}
} else {
// Multiple messages - batch them
self.handle_message_batch(messages).await
Expand Down Expand Up @@ -462,11 +466,18 @@ impl Channel {
.get("telegram_chat_type")
.and_then(|v| v.as_str())
});
self.conversation_context = Some(
prompt_engine
.render_conversation_context(&first.source, server_name, channel_name)
.expect("failed to render conversation context"),
);
match prompt_engine.render_conversation_context(
&first.source,
server_name,
channel_name,
) {
Ok(context) => {
self.conversation_context = Some(context);
}
Err(error) => {
tracing::warn!(%error, "failed to render conversation context");
}
}
}

// Persist each message to conversation log (individual audit trail)
Expand Down Expand Up @@ -557,7 +568,7 @@ impl Channel {
// Build system prompt with coalesce hint
let system_prompt = self
.build_system_prompt_with_coalesce(message_count, elapsed_secs, unique_sender_count)
.await;
.await?;

{
let mut reply_target = self.state.reply_target_message_id.write().await;
Expand Down Expand Up @@ -594,7 +605,7 @@ impl Channel {
message_count: usize,
elapsed_secs: f64,
unique_senders: usize,
) -> String {
) -> Result<String> {
let rc = &self.deps.runtime_config;
let prompt_engine = rc.prompts.load();

Expand All @@ -608,7 +619,7 @@ impl Channel {
let opencode_enabled = rc.opencode.load().enabled;
let worker_capabilities = prompt_engine
.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)
.expect("failed to render worker capabilities");
.map_err(|error| AgentError::Other(error.into()))?;

let status_text = {
let status = self.state.status_block.read().await;
Expand All @@ -625,7 +636,7 @@ impl Channel {

let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) };

prompt_engine
Ok(prompt_engine
.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
Expand All @@ -636,7 +647,7 @@ impl Channel {
coalesce_hint,
available_channels,
)
.expect("failed to render channel prompt")
.map_err(|error| AgentError::Other(error.into()))?)
}

/// Handle an incoming message by running the channel's LLM agent loop.
Expand Down Expand Up @@ -716,14 +727,21 @@ impl Channel {
.get("telegram_chat_type")
.and_then(|v| v.as_str())
});
self.conversation_context = Some(
prompt_engine
.render_conversation_context(&message.source, server_name, channel_name)
.expect("failed to render conversation context"),
);
match prompt_engine.render_conversation_context(
&message.source,
server_name,
channel_name,
) {
Ok(context) => {
self.conversation_context = Some(context);
}
Err(error) => {
tracing::warn!(%error, "failed to render conversation context");
}
}
}

let system_prompt = self.build_system_prompt().await;
let system_prompt = self.build_system_prompt().await?;

{
let mut reply_target = self.state.reply_target_message_id.write().await;
Expand Down Expand Up @@ -795,7 +813,7 @@ impl Channel {
}

/// Assemble the full system prompt using the PromptEngine.
async fn build_system_prompt(&self) -> String {
async fn build_system_prompt(&self) -> Result<String> {
let rc = &self.deps.runtime_config;
let prompt_engine = rc.prompts.load();

Expand All @@ -809,7 +827,7 @@ impl Channel {
let opencode_enabled = rc.opencode.load().enabled;
let worker_capabilities = prompt_engine
.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)
.expect("failed to render worker capabilities");
.map_err(|error| AgentError::Other(error.into()))?;

let status_text = {
let status = self.state.status_block.read().await;
Expand All @@ -820,7 +838,7 @@ impl Channel {

let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) };

prompt_engine
Ok(prompt_engine
.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
Expand All @@ -831,7 +849,7 @@ impl Channel {
None, // coalesce_hint - only set for batched messages
available_channels,
)
.expect("failed to render channel prompt")
.map_err(|error| AgentError::Other(error.into()))?)
}

/// Register per-turn tools, run the LLM agentic loop, and clean up.
Expand Down Expand Up @@ -1147,8 +1165,10 @@ impl Channel {
for (key, value) in retrigger_metadata {
self.pending_retrigger_metadata.insert(key, value);
}
self.retrigger_deadline =
Some(tokio::time::Instant::now() + std::time::Duration::from_millis(RETRIGGER_DEBOUNCE_MS));
self.retrigger_deadline = Some(
tokio::time::Instant::now()
+ std::time::Duration::from_millis(RETRIGGER_DEBOUNCE_MS),
);
}
}

Expand Down Expand Up @@ -1183,7 +1203,10 @@ impl Channel {
.prompts
.load()
.render_system_retrigger()
.expect("failed to render retrigger message");
.unwrap_or_else(|error| {
tracing::warn!(%error, "failed to render retrigger message");
"Background work completed; continue processing.".to_string()
});

let synthetic = InboundMessage {
id: uuid::Uuid::new_v4().to_string(),
Expand Down Expand Up @@ -1255,7 +1278,7 @@ pub async fn spawn_branch_from_state(
&rc.instance_dir.display().to_string(),
&rc.workspace_dir.display().to_string(),
)
.expect("failed to render branch prompt");
.map_err(|error| AgentError::Other(error.into()))?;

spawn_branch(
state,
Expand All @@ -1279,10 +1302,10 @@ async fn spawn_memory_persistence_branch(
let prompt_engine = deps.runtime_config.prompts.load();
let system_prompt = prompt_engine
.render_static("memory_persistence")
.expect("failed to render memory_persistence prompt");
.map_err(|error| AgentError::Other(error.into()))?;
let prompt = prompt_engine
.render_system_memory_persistence()
.expect("failed to render memory persistence prompt");
.map_err(|error| AgentError::Other(error.into()))?;

spawn_branch(
state,
Expand Down Expand Up @@ -1413,7 +1436,7 @@ pub async fn spawn_worker_from_state(
&rc.instance_dir.display().to_string(),
&rc.workspace_dir.display().to_string(),
)
.expect("failed to render worker prompt");
.map_err(|error| AgentError::Other(error.into()))?;
let skills = rc.skills.load();
let browser_config = (**rc.browser_config.load()).clone();
let brave_search_key = (**rc.brave_search_key.load()).clone();
Expand Down
20 changes: 14 additions & 6 deletions src/agent/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,19 @@ impl Compactor {
let channel_id = self.channel_id.clone();
let deps = self.deps.clone();
let prompt_engine = deps.runtime_config.prompts.load();
let compactor_prompt = prompt_engine
.render_static("compactor")
.expect("failed to render compactor prompt");
let compactor_prompt = match prompt_engine.render_static("compactor") {
Ok(prompt) => prompt,
Err(error) => {
tracing::error!(
channel_id = %self.channel_id,
%error,
"failed to render compactor prompt"
);
let mut flag = self.is_compacting.write().await;
*flag = false;
return;
}
};

tokio::spawn(async move {
let result = run_compaction(&deps, &compactor_prompt, &history, fraction).await;
Expand Down Expand Up @@ -155,9 +165,7 @@ impl Compactor {

// Insert a marker at the beginning
let prompt_engine = self.deps.runtime_config.prompts.load();
let marker = prompt_engine
.render_system_truncation(remove_count)
.expect("failed to render truncation message");
let marker = prompt_engine.render_system_truncation(remove_count)?;
history.insert(0, Message::from(marker));

tracing::warn!(
Expand Down
14 changes: 14 additions & 0 deletions src/agent/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ impl StatusBlock {
});
}

/// Remove an active worker by ID.
///
/// Returns `true` when a worker with the given ID was present and removed,
/// `false` when no matching worker was found. The relative order of the
/// remaining workers is preserved.
pub fn remove_worker(&mut self, worker_id: WorkerId) -> bool {
    let found = self
        .active_workers
        .iter()
        .position(|worker| worker.id == worker_id);
    match found {
        Some(index) => {
            // `Vec::remove` keeps the remaining entries in order, which the
            // status rendering relies on.
            self.active_workers.remove(index);
            true
        }
        None => false,
    }
}

/// Render the status block as a string for context injection.
pub fn render(&self) -> String {
let mut output = String::new();
Expand Down
31 changes: 25 additions & 6 deletions src/agent/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ impl Worker {
None
}
})
.unwrap_or_else(|| "Worker reached maximum segments without a final response.".to_string());
.unwrap_or_else(|| {
"Worker reached maximum segments without a final response."
.to_string()
});
}

self.maybe_compact_history(&mut history).await;
Expand Down Expand Up @@ -357,10 +360,19 @@ impl Worker {
self.hook.send_status("compacting (overflow recovery)");
self.force_compact_history(&mut history).await;
let prompt_engine = self.deps.runtime_config.prompts.load();
let overflow_msg = prompt_engine
.render_system_worker_overflow()
.expect("failed to render worker overflow message");
follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}");
match prompt_engine.render_system_worker_overflow() {
Ok(overflow_msg) => {
follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}");
}
Err(render_error) => {
tracing::warn!(
worker_id = %self.id,
%render_error,
"failed to render worker overflow message"
);
follow_up_prompt = follow_up.clone();
}
}
}
Err(error) => {
self.write_failure_log(&history, &format!("follow-up failed: {error}"));
Expand Down Expand Up @@ -451,7 +463,14 @@ impl Worker {
let prompt_engine = self.deps.runtime_config.prompts.load();
let marker = prompt_engine
.render_system_worker_compact(remove_count, &recap)
.expect("failed to render worker compact message");
.unwrap_or_else(|error| {
tracing::warn!(
worker_id = %self.id,
%error,
"failed to render worker compact message"
);
format!("[Compacted worker history: removed {remove_count} messages]")
});
history.insert(0, rig::message::Message::from(marker));

tracing::info!(
Expand Down
Loading