Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 106 additions & 21 deletions pi/skills/debug-agent/debug-dashboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ interface DashboardData {
baudbotSha: string | null;
bridgeUp: boolean;
bridgeType: string | null;
sessions: { name: string; alive: boolean }[];
bridgeUptimeMs: number | null;
sessions: { name: string; alive: boolean; uptimeMs: number | null }[];
devAgentCount: number;
devAgentNames: string[];
todosInProgress: number;
todosDone: number;
todosTotal: number;
worktreeCount: number;
uptimeMs: number;
serviceUptimeMs: number | null;
lastRefresh: Date;
heartbeat: HeartbeatInfo;
lastEvent: LastEvent | null;
Expand Down Expand Up @@ -142,6 +143,37 @@ function detectBridgeType(): string | null {
}
}

function getBridgeUptime(): number | null {
try {
const out = execSync("ps -eo etime,cmd 2>/dev/null | grep -E 'broker-bridge|bridge\\.mjs' | grep -v grep", {
encoding: "utf-8", timeout: 3000,
}).trim();
if (!out) return null;

// Parse etime format: [[dd-]hh:]mm:ss
const etimeStr = out.split(/\s+/)[0];
const parts = etimeStr.split(/[-:]/);

let seconds = 0;
if (parts.length === 4) {
// dd-hh:mm:ss
seconds = parseInt(parts[0]) * 86400 + parseInt(parts[1]) * 3600 + parseInt(parts[2]) * 60 + parseInt(parts[3]);
} else if (parts.length === 3) {
// hh:mm:ss
seconds = parseInt(parts[0]) * 3600 + parseInt(parts[1]) * 60 + parseInt(parts[2]);
} else if (parts.length === 2) {
// mm:ss
seconds = parseInt(parts[0]) * 60 + parseInt(parts[1]);
} else {
return null;
}

return seconds * 1000;
} catch {
return null;
}
}

async function checkBridge(): Promise<boolean> {
try {
const controller = new AbortController();
Expand All @@ -159,29 +191,55 @@ async function checkBridge(): Promise<boolean> {
}
}

function getSessions(): { name: string; alive: boolean }[] {
const results: { name: string; alive: boolean }[] = [];
function getSessionUptime(sessionName: string): number | null {
try {
const aliasFile = join(SOCKET_DIR, `${sessionName}.alias`);
const target = readlinkSync(aliasFile);
const sessionId = basename(target, ".sock");

// Find session file
const subdirs = readdirSync(SESSION_DIR);
for (const subdir of subdirs) {
const dirPath = join(SESSION_DIR, subdir);
try {
const files = readdirSync(dirPath);
const match = files.find((f) => f.includes(sessionId) && f.endsWith(".jsonl"));
if (match) {
const filePath = join(dirPath, match);
const stat = statSync(filePath);
return Date.now() - stat.birthtimeMs;
}
} catch { continue; }
}
} catch {}
return null;
}

function getSessions(): { name: string; alive: boolean; uptimeMs: number | null }[] {
const results: { name: string; alive: boolean; uptimeMs: number | null }[] = [];
const expected = ["control-agent", "sentry-agent"];
try {
const files = readdirSync(SOCKET_DIR);
const aliases = files.filter((f) => f.endsWith(".alias"));
for (const alias of expected) {
const aliasFile = `${alias}.alias`;
if (!aliases.includes(aliasFile)) {
results.push({ name: alias, alive: false });
results.push({ name: alias, alive: false, uptimeMs: null });
continue;
}
try {
const target = readlinkSync(join(SOCKET_DIR, aliasFile));
const sockPath = join(SOCKET_DIR, target);
results.push({ name: alias, alive: existsSync(sockPath) });
const alive = existsSync(sockPath);
const uptimeMs = alive ? getSessionUptime(alias) : null;
results.push({ name: alias, alive, uptimeMs });
} catch {
results.push({ name: alias, alive: false });
results.push({ name: alias, alive: false, uptimeMs: null });
}
}
} catch {
for (const alias of expected) {
results.push({ name: alias, alive: false });
results.push({ name: alias, alive: false, uptimeMs: null });
}
}
return results;
Expand Down Expand Up @@ -230,6 +288,24 @@ function getWorktreeCount(): number {
}
}

function getServiceUptime(): number | null {
try {
const out = execSync("systemctl show baudbot --property=ActiveEnterTimestamp --value 2>/dev/null", {
encoding: "utf-8",
timeout: 3000,
}).trim();

if (!out || out === "" || out === "0") return null;

const startTime = new Date(out);
if (isNaN(startTime.getTime())) return null;

return Date.now() - startTime.getTime();
} catch {
return null;
}
}

function readHeartbeatState(ctx: ExtensionContext): HeartbeatInfo {
const info: HeartbeatInfo = { enabled: true, lastRunAt: null, totalRuns: 0, healthy: true };
for (const entry of ctx.sessionManager.getEntries()) {
Expand Down Expand Up @@ -533,18 +609,29 @@ function renderDashboard(
}

const bridgeIcon = data.bridgeUp ? theme.fg("success", "●") : theme.fg("error", "●");
const bridgeLabel = data.bridgeUp ? "up" : theme.fg("error", "DOWN");
const bridgeTypeStr = data.bridgeType ? dim(` ${data.bridgeType}`) : "";
let bridgeLabel: string;
if (!data.bridgeUp) {
bridgeLabel = theme.fg("error", "bridge DOWN");
} else if (data.bridgeType && data.bridgeUptimeMs !== null) {
bridgeLabel = `bridge ${data.bridgeType} ${dim(`(up ${formatUptime(data.bridgeUptimeMs)})`)}`;
} else if (data.bridgeType) {
bridgeLabel = `bridge ${data.bridgeType}`;
} else {
bridgeLabel = "bridge up";
}

const row1Left = ` baudbot ${bbDisplay} ${dim("│")} pi ${piDisplay} ${dim("│")} ${bridgeIcon} bridge ${bridgeLabel}${bridgeTypeStr}`;
const row1Right = dim(`up ${formatUptime(data.uptimeMs)}`);
lines.push(pad(row1Left, row1Right, width));
const row1Left = ` baudbot ${bbDisplay} ${dim("│")} pi ${piDisplay} ${dim("│")} ${bridgeIcon} ${bridgeLabel}`;
lines.push(pad(row1Left, "", width));

// ── Row 2: sessions ──
// ── Row 2: sessions with uptimes ──
const parts: string[] = [];
for (const s of data.sessions) {
const icon = s.alive ? theme.fg("success", "●") : theme.fg("error", "●");
const label = s.alive ? dim(s.name) : theme.fg("error", s.name);
const name = s.alive ? s.name : theme.fg("error", s.name);
const uptimeStr = s.alive && s.uptimeMs !== null
? dim(`(up ${formatUptime(s.uptimeMs)})`)
: "";
const label = uptimeStr ? `${name} ${uptimeStr}` : name;
parts.push(`${icon} ${label}`);
}
if (data.devAgentCount > 0) {
Expand Down Expand Up @@ -631,17 +718,13 @@ function renderDashboard(
}
}

// ── Bottom border ──
lines.push(truncateToWidth(dim(bar.repeat(width)), width));

return lines;
}

// ── Extension ───────────────────────────────────────────────────────────────

export default function dashboardExtension(pi: ExtensionAPI): void {
let timer: ReturnType<typeof setInterval> | null = null;
const startTime = Date.now();
const piVersion = getPiVersion();

let data: DashboardData | null = null;
Expand All @@ -661,6 +744,8 @@ export default function dashboardExtension(pi: ExtensionAPI): void {
const worktreeCount = getWorktreeCount();
const baudbot = getBaudbotVersion();
const bridgeType = detectBridgeType();
const bridgeUptimeMs = getBridgeUptime();
const serviceUptimeMs = getServiceUptime();
const heartbeat = savedCtx ? readHeartbeatState(savedCtx) : { enabled: true, lastRunAt: null, totalRuns: 0, healthy: true };

data = {
Expand All @@ -670,14 +755,15 @@ export default function dashboardExtension(pi: ExtensionAPI): void {
baudbotSha: baudbot.sha,
bridgeUp,
bridgeType,
bridgeUptimeMs,
sessions,
devAgentCount: devAgents.count,
devAgentNames: devAgents.names,
todosInProgress: todoStats.inProgress,
todosDone: todoStats.done,
todosTotal: todoStats.total,
worktreeCount,
uptimeMs: Date.now() - startTime,
serviceUptimeMs,
lastRefresh: new Date(),
heartbeat,
lastEvent,
Expand All @@ -696,7 +782,6 @@ export default function dashboardExtension(pi: ExtensionAPI): void {
theme.fg("dim", "─".repeat(width)),
];
}
data.uptimeMs = Date.now() - startTime;
return renderDashboard(data, activityFeed.getLines(), theme, width);
},
invalidate() {},
Expand Down
17 changes: 12 additions & 5 deletions slack-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,18 @@ function verifyBrokerEnvelope(message) {
}

function decryptEnvelope(message) {
const plaintext = sodium.crypto_box_seal_open(
fromBase64(message.encrypted),
cryptoState.serverBoxPublicKey,
cryptoState.serverBoxSecretKey,
);
let plaintext;
try {
plaintext = sodium.crypto_box_seal_open(
fromBase64(message.encrypted),
cryptoState.serverBoxPublicKey,
cryptoState.serverBoxSecretKey,
);
} catch (err) {
// Wrap libsodium errors (e.g., "incorrect key pair for the given ciphertext")
// into a format that isPoisonMessageError() can detect
throw new Error(`failed to decrypt broker envelope (message_id: ${message.message_id || "unknown"})`);
}
if (!plaintext) {
throw new Error(`failed to decrypt broker envelope (message_id: ${message.message_id || "unknown"})`);
}
Expand Down
133 changes: 133 additions & 0 deletions test/broker-bridge.integration.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,139 @@ describe("broker pull bridge semi-integration", () => {
expect(valid).toBe(true);
});

it("acks poison messages with decryption failures (wrong keys)", async () => {
await sodium.ready;

let pullCount = 0;
let ackPayload = null;

// Generate valid encryption with mismatched keys to trigger "incorrect key pair"
const wrongBoxKeypair = sodium.crypto_box_keypair();
const serverBoxKeypair = sodium.crypto_box_seed_keypair(new Uint8Array(Buffer.alloc(32, 11)));
const brokerSignKeypair = sodium.crypto_sign_seed_keypair(new Uint8Array(Buffer.alloc(32, 15)));

const payload = JSON.stringify({ source: "slack", type: "message", payload: { text: "test" }, broker_timestamp: 123 });
// Encrypt with WRONG key (wrongBoxKeypair) but bridge will try to decrypt with serverBoxKeypair
const encrypted = sodium.crypto_box_seal(new TextEncoder().encode(payload), wrongBoxKeypair.publicKey);
const encryptedB64 = toBase64(encrypted);

const broker = createServer(async (req, res) => {
if (req.method === "POST" && req.url === "/api/inbox/pull") {
pullCount += 1;
const brokerTimestamp = Math.floor(Date.now() / 1000);

const canonical = canonicalizeEnvelope("T123BROKER", brokerTimestamp, encryptedB64);
const signature = sodium.crypto_sign_detached(canonical, brokerSignKeypair.privateKey);

const messages = pullCount === 1
? [{
message_id: "m-decrypt-fail-1",
workspace_id: "T123BROKER",
encrypted: encryptedB64,
broker_timestamp: brokerTimestamp,
broker_signature: toBase64(signature),
}]
: [];

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, messages }));
return;
}

if (req.method === "POST" && req.url === "/api/inbox/ack") {
let raw = "";
for await (const chunk of req) raw += chunk;
ackPayload = JSON.parse(raw);

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, acked: ackPayload.message_ids?.length ?? 0 }));
return;
}

if (req.method === "POST" && req.url === "/api/send") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: "1234.5678" }));
return;
}

res.writeHead(404, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: false, error: "not found" }));
});

await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve));
servers.push(broker);

const address = broker.address();
if (!address || typeof address === "string") {
throw new Error("failed to get broker test server address");
}
const brokerUrl = `http://127.0.0.1:${address.port}`;

const testFileDir = path.dirname(fileURLToPath(import.meta.url));
const repoRoot = path.dirname(testFileDir);
const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs");
const bridgeCwd = path.join(repoRoot, "slack-bridge");

let bridgeStdout = "";
let bridgeStderr = "";
let bridgeExit = null;

const bridge = spawn("node", [bridgePath], {
cwd: bridgeCwd,
env: {
...cleanEnv(),
SLACK_BROKER_URL: brokerUrl,
SLACK_BROKER_WORKSPACE_ID: "T123BROKER",
SLACK_BROKER_SERVER_PRIVATE_KEY: toBase64(serverBoxKeypair.privateKey),
SLACK_BROKER_SERVER_PUBLIC_KEY: toBase64(serverBoxKeypair.publicKey),
SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: b64(32, 13),
SLACK_BROKER_PUBLIC_KEY: b64(32, 14),
SLACK_BROKER_SIGNING_PUBLIC_KEY: toBase64(brokerSignKeypair.publicKey),
SLACK_BROKER_ACCESS_TOKEN: "test-broker-token",
SLACK_ALLOWED_USERS: "U_ALLOWED",
SLACK_BROKER_POLL_INTERVAL_MS: "50",
BRIDGE_API_PORT: "0",
},
stdio: ["ignore", "pipe", "pipe"],
});

bridge.stdout.on("data", (chunk) => {
bridgeStdout += chunk.toString();
});
bridge.stderr.on("data", (chunk) => {
bridgeStderr += chunk.toString();
});
const bridgeExited = new Promise((_, reject) => {
bridge.on("error", (err) => {
if (ackPayload !== null) return;
reject(new Error(`bridge spawn error: ${err.message}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`));
});
bridge.on("exit", (code, signal) => {
bridgeExit = { code, signal };
if (ackPayload !== null) return;
reject(new Error(`bridge exited early: code=${code} signal=${signal}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`));
});
});

children.push(bridge);

const ackWait = waitFor(
() => ackPayload !== null,
10_000,
50,
`timeout waiting for ack; pullCount=${pullCount}; exit=${JSON.stringify(bridgeExit)}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`,
);

await Promise.race([ackWait, bridgeExited]);

expect(ackPayload.workspace_id).toBe("T123BROKER");
expect(ackPayload.protocol_version).toBe("2026-02-1");
expect(ackPayload.message_ids).toContain("m-decrypt-fail-1");

// Verify that the bridge logged a decryption error before acking
expect(bridgeStderr).toContain("failed to decrypt");
});

it("forwards user messages to agent in fire-and-forget mode without get_message/turn_end RPCs", async () => {
await sodium.ready;

Expand Down