singularity-forge/src/resources/extensions/subagent/index.js

2614 lines
77 KiB
JavaScript

/**
* Subagent Tool - Delegate tasks to specialized agents
*
* Spawns a separate `pi` process for each subagent invocation,
* giving it an isolated context window.
*
* Supports four modes:
* - Single: { agent: "name", task: "..." }
* - Parallel: { tasks: [{ agent: "name", task: "..." }, ...] }
* - Debate: { mode: "debate", rounds: 3, tasks: [{ agent: "name", task: "..." }, ...] }
* - Chain: { chain: [{ agent: "name", task: "... {previous} ..." }, ...] }
*
* Uses JSON mode to capture structured output from subagents.
*/
import { spawn } from "node:child_process";
import * as crypto from "node:crypto";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import { Type } from "@sinclair/typebox";
import { StringEnum } from "@singularity-forge/pi-ai";
import { getMarkdownTheme } from "@singularity-forge/pi-coding-agent";
import { Container, Markdown, Spacer, Text } from "@singularity-forge/pi-tui";
import { CmuxClient, shellEscape } from "../cmux/index.js";
import {
buildSiftEnv,
ensureSiftRuntimeDirs,
resolveSiftBinary,
resolveSiftSearchScope,
} from "../sf/code-intelligence.js";
import { loadEffectiveSFPreferences } from "../sf/preferences.js";
import { recordRetrievalEvidence } from "../sf/retrieval-evidence.js";
import { formatTokenCount } from "../shared/mod.js";
import { getCurrentPhase } from "../shared/sf-phase-state.js";
import { discoverAgents } from "./agents.js";
import { SubagentBackgroundJobManager } from "./background-jobs.js";
import {
createIsolation,
mergeDeltaPatches,
readIsolationMode,
} from "./isolation.js";
import { registerWorker, updateWorker } from "./worker-registry.js";
// Upper bound on the number of entries accepted in `params.tasks`; larger
// batches are rejected before any subagent is started.
const MAX_PARALLEL_TASKS = 8;
// Number of subagents allowed to run at the same time within one batch.
const MAX_CONCURRENCY = 4;
// NOTE(review): not referenced in the visible part of this file; presumably
// caps how many items a collapsed result view renders — confirm at the use site.
const COLLAPSED_ITEM_COUNT = 10;
/**
* Bounds Sift-backed code search so a failed model/runtime path cannot leave the
* TUI showing an eternal running tool.
*
* Purpose: keep codebase exploration responsive when Sift stalls, builds a cold
* cache, or waits on an unavailable local model.
*
* Consumer: the `codebase_search` extension tool registered below.
*/
const CODEBASE_SEARCH_TIMEOUT_MS = 120_000;
// Child processes spawned by runSingleAgent that have not closed yet.
// stopLiveSubagents iterates this set to terminate them.
const liveSubagentProcesses = new Set();
// Fallback names consulted by resolveAgentByName when no discovered agent has
// the requested name: requested name -> name of the agent to use instead.
const AGENT_ALIASES = {
default: "worker",
code: "reviewer",
coder: "typescript-pro",
"product-manager": "planner",
"user-advocate": "reviewer",
"customer-panel": "reviewer",
business: "planner",
"delivery-lead": "planner",
partner: "reviewer",
combatant: "reviewer",
architect: "planner",
moderator: "planner",
// Computed key; evaluates to "gsd-executor".
["g" + "sd-executor"]: "worker",
"sf-worker": "worker",
"sf-scout": "scout",
"sf-reviewer": "reviewer",
};
/**
 * Finds the agent definition to run for a requested agent name.
 *
 * An agent whose `name` equals `agentName` always wins. Otherwise the name is
 * looked up in `AGENT_ALIASES` and the aliased agent is searched instead.
 *
 * @param {Array<{name: string}>} agents - Agent definitions to search.
 * @param {string} agentName - Name requested by the caller.
 * @returns {{agent: object | undefined, effectiveName: string}} The matched
 *   agent (or `undefined`) and the name that was actually searched for.
 */
function resolveAgentByName(agents, agentName) {
  const direct = agents.find((a) => a.name === agentName);
  if (direct) return { agent: direct, effectiveName: agentName };
  // Own-property lookup only: a plain `AGENT_ALIASES[agentName]` would return
  // inherited Object.prototype members for names such as "constructor" or
  // "toString" and leak a function out as `effectiveName`.
  const alias = Object.prototype.hasOwnProperty.call(AGENT_ALIASES, agentName)
    ? AGENT_ALIASES[agentName]
    : undefined;
  if (!alias) return { agent: undefined, effectiveName: agentName };
  return {
    agent: agents.find((a) => a.name === alias),
    effectiveName: alias,
  };
}
/**
 * Terminates every process currently tracked in `liveSubagentProcesses`.
 *
 * Sends SIGTERM to each process, waits up to 500 ms per process for its
 * `exit` event, then sends SIGKILL to any process that still has not exited.
 * Errors thrown by `kill` are ignored on purpose: this is best-effort cleanup.
 *
 * @returns {Promise<void>} Resolves once the grace period is over.
 */
async function stopLiveSubagents() {
  const active = Array.from(liveSubagentProcesses);
  if (active.length === 0) return;
  // A process terminated by a signal keeps `exitCode === null` and reports the
  // signal through `signalCode`, so both fields must be checked.
  const hasExited = (proc) =>
    proc.exitCode !== null || (proc.signalCode ?? null) !== null;
  for (const proc of active) {
    try {
      proc.kill("SIGTERM");
    } catch {
      /* ignore */
    }
  }
  await Promise.all(
    active.map(
      (proc) =>
        new Promise((resolve) => {
          // No `exit` event will come from a process that is already gone.
          if (hasExited(proc)) {
            resolve();
            return;
          }
          const onExit = () => {
            clearTimeout(timer);
            resolve();
          };
          const timer = setTimeout(() => {
            // Do not leave a stale listener behind once the wait is over.
            proc.removeListener("exit", onExit);
            resolve();
          }, 500);
          proc.once("exit", onExit);
        }),
    ),
  );
  for (const proc of active) {
    if (hasExited(proc)) continue;
    try {
      proc.kill("SIGKILL");
    } catch {
      /* ignore */
    }
  }
}
/**
 * Decides whether a Sift search result should be rendered as a failure.
 *
 * Purpose: derive the UI status from the execution details alone rather than
 * from provider-specific `AgentToolResult` fields that custom renderers do not
 * receive consistently.
 *
 * Consumer: `codebase_search.renderResult`.
 *
 * @param {object | null | undefined} details - Execution details of the search.
 * @returns {boolean} True when the search timed out, was aborted, or reported
 *   a numeric exit code other than zero.
 */
function isCodebaseSearchError(details) {
  if (details?.timedOut) return true;
  if (details?.aborted) return true;
  const exitCode = details?.exitCode;
  return typeof exitCode === "number" && exitCode !== 0;
}
/**
 * Assembles the Sift argv used for autonomous local retrieval.
 *
 * Purpose: keep the contract explicit — `codebase_search` is Sift-backed
 * retrieval over a local scope, while `scout` stays the broader explorer
 * subagent role that may pick this tool among others.
 *
 * Consumer: `codebase_search.execute`.
 *
 * @param {string} strategy - Value placed after `--strategy`.
 * @param {string} query - Search query, placed after `--agent`.
 * @param {string} scope - Search scope, the final argument.
 * @returns {string[]} Argument vector in the fixed order shown below.
 */
function buildCodebaseSearchArgs(strategy, query, scope) {
  const argv = ["search"];
  argv.push("--strategy", strategy);
  argv.push("--agent", query, scope);
  return argv;
}
/**
 * Renders a usage record as a single space-separated summary line.
 *
 * Fields that are zero or missing are left out. Segment order is fixed:
 * turns, input, output, cache read (`R`), cache write (`W`), cost (`$`),
 * context tokens (`ctx:`), model.
 *
 * @param {object} usage - Usage counters (`turns`, `input`, `output`,
 *   `cacheRead`, `cacheWrite`, `cost`, `contextTokens`).
 * @param {string} [model] - Model name appended last when truthy.
 * @returns {string} The summary, or an empty string when nothing applies.
 */
function formatUsageStats(usage, model) {
  const segments = [];
  const { turns, input, output, cacheRead, cacheWrite, cost, contextTokens } =
    usage;
  if (turns) {
    const plural = turns > 1 ? "s" : "";
    segments.push(`${turns} turn${plural}`);
  }
  if (input) segments.push(`${formatTokenCount(input)}`);
  if (output) segments.push(`${formatTokenCount(output)}`);
  if (cacheRead) segments.push(`R${formatTokenCount(cacheRead)}`);
  if (cacheWrite) segments.push(`W${formatTokenCount(cacheWrite)}`);
  if (cost) {
    const amount = Number(cost) || 0;
    segments.push(`$${amount.toFixed(4)}`);
  }
  if (contextTokens && contextTokens > 0) {
    segments.push(`ctx:${formatTokenCount(contextTokens)}`);
  }
  if (model) segments.push(model);
  return segments.join(" ");
}
/**
 * Renders a one-line, themed preview of a tool call made by a subagent.
 *
 * Known tools (`bash`, `read`, `write`, `edit`, `ls`, `find`, `grep`) get a
 * tailored preview; any other tool shows its name followed by a truncated
 * JSON dump of its arguments.
 *
 * @param {string} toolName - Name of the tool that was called.
 * @param {object} args - Arguments passed to the tool.
 * @param {(color: string, text: string) => string} themeFg - Applies a theme
 *   foreground color to a text fragment.
 * @returns {string} The formatted preview.
 */
function formatToolCall(toolName, args, themeFg) {
  // Replaces a leading home directory with "~". The match must end on a path
  // boundary: with home "/home/bob", "/home/bobby/x" is NOT inside home and
  // must stay untouched rather than becoming "~by/x".
  const shortenPath = (p) => {
    const home = os.homedir();
    if (!home || !p.startsWith(home)) return p;
    const rest = p.slice(home.length);
    const onBoundary =
      rest === "" || rest.startsWith("/") || rest.startsWith("\\");
    return onBoundary ? `~${rest}` : p;
  };
  switch (toolName) {
    case "bash": {
      const command = args.command || "...";
      const preview =
        command.length > 60 ? `${command.slice(0, 60)}...` : command;
      return themeFg("muted", "$ ") + themeFg("toolOutput", preview);
    }
    case "read": {
      const rawPath = args.file_path || args.path || "...";
      const filePath = shortenPath(rawPath);
      const offset = args.offset;
      const limit = args.limit;
      let text = themeFg("accent", filePath);
      if (offset !== undefined || limit !== undefined) {
        // Show the line range as ":start" or ":start-end".
        const startLine = offset ?? 1;
        const endLine = limit !== undefined ? startLine + limit - 1 : "";
        text += themeFg(
          "warning",
          `:${startLine}${endLine ? `-${endLine}` : ""}`,
        );
      }
      return themeFg("muted", "read ") + text;
    }
    case "write": {
      const rawPath = args.file_path || args.path || "...";
      const filePath = shortenPath(rawPath);
      const content = args.content || "";
      const lines = content.split("\n").length;
      let text = themeFg("muted", "write ") + themeFg("accent", filePath);
      if (lines > 1) text += themeFg("dim", ` (${lines} lines)`);
      return text;
    }
    case "edit": {
      const rawPath = args.file_path || args.path || "...";
      return (
        themeFg("muted", "edit ") + themeFg("accent", shortenPath(rawPath))
      );
    }
    case "ls": {
      const rawPath = args.path || ".";
      return themeFg("muted", "ls ") + themeFg("accent", shortenPath(rawPath));
    }
    case "find": {
      const pattern = args.pattern || "*";
      const rawPath = args.path || ".";
      return (
        themeFg("muted", "find ") +
        themeFg("accent", pattern) +
        themeFg("dim", ` in ${shortenPath(rawPath)}`)
      );
    }
    case "grep": {
      const pattern = args.pattern || "";
      const rawPath = args.path || ".";
      return (
        themeFg("muted", "grep ") +
        themeFg("accent", `/${pattern}/`) +
        themeFg("dim", ` in ${shortenPath(rawPath)}`)
      );
    }
    default: {
      // JSON.stringify returns undefined for undefined input; fall back to an
      // empty preview instead of throwing on `.length`.
      const argsStr = JSON.stringify(args) ?? "";
      const preview =
        argsStr.length > 50 ? `${argsStr.slice(0, 50)}...` : argsStr;
      return themeFg("accent", toolName) + themeFg("dim", ` ${preview}`);
    }
  }
}
/**
 * Returns the text of the first `text` item in a tool result.
 *
 * @param {{content: Array<{type: string, text?: string}>}} result - Tool result.
 * @returns {string} The first text item's text, or "(no output)" when the
 *   result contains no text item.
 */
function getPrimaryTextContent(result) {
  for (const item of result.content) {
    if (item.type === "text") return item.text;
  }
  return "(no output)";
}
/**
 * Builds a short label describing which mode and agents an invocation uses.
 *
 * Precedence matches the dispatcher: a non-empty `chain` first, then a
 * non-empty `tasks` list (debate or parallel), then a single `agent`.
 *
 * @param {object} params - Subagent tool parameters.
 * @returns {string} Label such as `chain:a→b`, `debate:a,b`, `parallel:a,b`,
 *   `single:a`, or `subagent` when nothing matches.
 */
function summarizeBackgroundInvocation(params) {
  const agentNames = (entries) => entries.map((entry) => entry.agent);
  const { chain, tasks, agent } = params;
  if (chain && chain.length > 0) {
    return `chain:${agentNames(chain).join("→")}`;
  }
  if (tasks && tasks.length > 0) {
    const label = params.mode === "debate" ? "debate" : "parallel";
    return `${label}:${agentNames(tasks).join(",")}`;
  }
  return agent ? `single:${agent}` : "subagent";
}
/**
 * Runs one subagent tool invocation and returns its tool result.
 *
 * The mode is chosen from `params`, in this order:
 * 1. Chain (`params.chain`): steps run sequentially; `{previous}` in a step's
 *    task is replaced with the previous step's final output. Stops at the
 *    first failing step.
 * 2. Debate (`params.tasks` with `mode: "debate"`): every participant runs
 *    once per round and sees the transcript of earlier rounds. Stops after
 *    the first round that contains a failure.
 * 3. Parallel (`params.tasks`): tasks run with bounded concurrency; a failed
 *    task is retried once unless the invocation was aborted.
 * 4. Single (`params.agent` + `params.task`): one agent, optionally inside an
 *    isolated working copy whose changes are merged back afterwards.
 *
 * @param {object} options
 * @param {string} options.defaultCwd - Working directory used when a task
 *   does not specify its own.
 * @param {Array<object>} options.agents - Discovered agent definitions.
 * @param {string} options.agentScope - Echoed into the result details.
 * @param {string | null} options.projectAgentsDir - Echoed into the details.
 * @param {object} options.params - Tool parameters selecting mode and tasks.
 * @param {AbortSignal} [options.signal] - Cancels running subagents.
 * @param {(partial: object) => void} [options.onUpdate] - Progress callback.
 * @param {object} options.cmuxClient - Client used when splits are enabled.
 * @param {boolean} options.cmuxSplitsEnabled - Run agents in cmux splits.
 * @param {boolean} options.useIsolation - Isolate single-mode runs.
 * @returns {Promise<object>} Result with `content`, `details`, and
 *   `isError: true` on failure.
 */
async function executeSubagentInvocation({
  defaultCwd,
  agents,
  agentScope,
  projectAgentsDir,
  params,
  signal,
  onUpdate,
  cmuxClient,
  cmuxSplitsEnabled,
  useIsolation,
}) {
  const makeDetails = (mode) => (results) => ({
    mode,
    agentScope,
    projectAgentsDir,
    results,
  });
  // A run failed when it exited non-zero or stopped with error/aborted.
  const hasFailed = (r) =>
    r.exitCode !== 0 || r.stopReason === "error" || r.stopReason === "aborted";
  // Best available explanation for a failed run.
  const describeFailure = (r) =>
    r.errorMessage || r.stderr || getFinalOutput(r.messages) || "(no output)";
  // Standard error-shaped tool result.
  const errorResponse = (mode, text, results = []) => ({
    content: [{ type: "text", text }],
    details: makeDetails(mode)(results),
    isError: true,
  });
  // Placeholder shown while a task is queued or running (exitCode -1).
  const createPendingResult = (task) => ({
    agent: task.agent,
    agentSource: "unknown",
    task: task.task,
    exitCode: -1,
    messages: [],
    stderr: "",
    usage: {
      input: 0,
      output: 0,
      cacheRead: 0,
      cacheWrite: 0,
      cost: 0,
      contextTokens: 0,
      turns: 0,
    },
  });

  // ── Chain mode ──────────────────────────────────────────────────────────
  if (params.chain && params.chain.length > 0) {
    const results = [];
    let previousOutput = "";
    for (let i = 0; i < params.chain.length; i++) {
      const step = params.chain[i];
      // Replacer function, not a replacement string: agent output may contain
      // "$&", "$1", "$'" etc., which String.prototype.replace would otherwise
      // interpret as special replacement patterns and corrupt the task.
      const taskWithContext = step.task.replace(
        /\{previous\}/g,
        () => previousOutput,
      );
      // Parent trace is only injected on the first chain step.
      // Subsequent steps see {previous} from the predecessor — adding
      // parent_trace again would duplicate audit context the chain has
      // already moved past.
      const taskForStep =
        i === 0
          ? composeTaskWithParentTrace(
              taskWithContext,
              step.parentTrace ?? params.parentTrace,
            )
          : taskWithContext;
      const chainUpdate = onUpdate
        ? (partial) => {
            const currentResult = partial.details?.results[0];
            if (!currentResult) return;
            onUpdate({
              content: partial.content,
              details: makeDetails("chain")([...results, currentResult]),
            });
          }
        : undefined;
      const result = await runSingleAgent(
        defaultCwd,
        agents,
        step.agent,
        taskForStep,
        step.cwd,
        i + 1,
        signal,
        chainUpdate,
        makeDetails("chain"),
        step.model ?? params.model,
      );
      results.push(result);
      if (hasFailed(result)) {
        return errorResponse(
          "chain",
          `Chain stopped at step ${i + 1} (${step.agent}): ${describeFailure(result)}`,
          results,
        );
      }
      previousOutput = getFinalOutput(result.messages);
    }
    return {
      content: [
        {
          type: "text",
          text:
            getFinalOutput(results[results.length - 1].messages) ||
            "(no output)",
        },
      ],
      details: makeDetails("chain")(results),
    };
  }

  // ── Batch modes (debate / parallel) ─────────────────────────────────────
  if (params.tasks && params.tasks.length > 0) {
    if (params.tasks.length > MAX_PARALLEL_TASKS) {
      return errorResponse(
        params.mode === "debate" ? "debate" : "parallel",
        `Too many parallel tasks (${params.tasks.length}). Max is ${MAX_PARALLEL_TASKS}.`,
      );
    }
    const batchTasks = params.tasks;
    const taskMode = params.mode ?? "parallel";

    if (taskMode === "debate") {
      const rounds = params.rounds ?? 2;
      if (!Number.isInteger(rounds) || rounds < 1 || rounds > 5) {
        return errorResponse(
          "debate",
          "Invalid debate rounds. Use an integer from 1 to 5.",
        );
      }
      if (batchTasks.length < 2) {
        return errorResponse(
          "debate",
          "Debate mode requires at least two tasks/participants.",
        );
      }
      // One slot per (round, participant); slots of rounds that have not
      // started yet stay empty and are filtered out when reporting.
      const debateResults = new Array(rounds * batchTasks.length);
      const transcriptEntries = [];
      const emitDebateUpdate = () => {
        if (!onUpdate) return;
        const knownResults = debateResults.filter(Boolean);
        const running = knownResults.filter((r) => r.exitCode === -1).length;
        const done = knownResults.filter((r) => r.exitCode !== -1).length;
        onUpdate({
          content: [
            {
              type: "text",
              text: `Debate: ${done}/${debateResults.length} turns done, ${running} running...`,
            },
          ],
          details: makeDetails("debate")([...knownResults]),
        });
      };
      const buildDebatePrompt = (task, round, transcript) => {
        // Parent trace is only injected at round 1.
        // In later rounds, the debate transcript carries the relevant
        // context; repeating parent_trace would crowd it out.
        const assignment =
          round === 1
            ? composeTaskWithParentTrace(
                task.task,
                task.parentTrace ?? params.parentTrace,
              )
            : task.task;
        return [
          `You are participant "${task.agent}" in a structured multi-agent debate.`,
          `Round ${round} of ${rounds}.`,
          "Original assignment:",
          assignment,
          "Debate transcript so far:",
          transcript.trim() || "(no prior rounds)",
          round === rounds
            ? "This is the final round. Engage the strongest opposing claims, state what changed your mind if anything did, and end with FINAL_VERDICT: <PROCEED | CHANGE | BLOCK> plus one sentence."
            : "Engage the strongest opposing claims directly. Add new evidence or a sharper objection; do not repeat prior points.",
        ].join("\n\n");
      };
      for (let round = 1; round <= rounds; round++) {
        const roundOffset = (round - 1) * batchTasks.length;
        for (let i = 0; i < batchTasks.length; i++) {
          debateResults[roundOffset + i] = {
            ...createPendingResult(batchTasks[i]),
            step: round,
          };
        }
        emitDebateUpdate();
        const transcript = transcriptEntries.join("\n\n");
        const roundResults = await mapWithConcurrencyLimit(
          batchTasks,
          MAX_CONCURRENCY,
          async (t, index) => {
            const resultIndex = roundOffset + index;
            const prompt = buildDebatePrompt(t, round, transcript);
            const taskModelOverride = t.model ?? params.model;
            // Results report the participant's original task, not the full
            // debate prompt that was actually sent.
            const onPartial = (partial) => {
              const currentResult = partial.details?.results[0];
              if (!currentResult) return;
              currentResult.task = t.task;
              currentResult.step = round;
              debateResults[resultIndex] = currentResult;
              emitDebateUpdate();
            };
            const result = cmuxSplitsEnabled
              ? await runSingleAgentInCmuxSplit(
                  cmuxClient,
                  index % 2 === 0 ? "right" : "down",
                  defaultCwd,
                  agents,
                  t.agent,
                  prompt,
                  t.cwd,
                  round,
                  signal,
                  onPartial,
                  makeDetails("debate"),
                  taskModelOverride,
                )
              : await runSingleAgent(
                  defaultCwd,
                  agents,
                  t.agent,
                  prompt,
                  t.cwd,
                  round,
                  signal,
                  onPartial,
                  makeDetails("debate"),
                  taskModelOverride,
                );
            result.task = t.task;
            result.step = round;
            debateResults[resultIndex] = result;
            emitDebateUpdate();
            return result;
          },
        );
        const failed = roundResults.find(hasFailed);
        transcriptEntries.push(
          ...roundResults.map((r) => {
            const output =
              getFinalOutput(r.messages) ||
              r.errorMessage ||
              r.stderr ||
              "(no output)";
            // Separator keeps round number and agent name from fusing into
            // headings such as "Round 1reviewer".
            return `## Round ${round}: ${r.agent}\n\n${output}`;
          }),
        );
        if (failed) {
          return errorResponse(
            "debate",
            `Debate stopped in round ${round} (${failed.agent}): ${describeFailure(failed)}`,
            debateResults.filter(Boolean),
          );
        }
      }
      const finalResults = debateResults.filter(Boolean);
      const summaries = finalResults.map((r) => {
        const output =
          getFinalOutput(r.messages) ||
          r.errorMessage ||
          r.stderr ||
          "(no output)";
        return `[round ${r.step}] [${r.agent}] ${r.exitCode === 0 ? "completed" : `failed (exit ${r.exitCode})`}: ${output}`;
      });
      return {
        content: [
          {
            type: "text",
            text: `Debate: ${finalResults.length}/${debateResults.length} turns succeeded over ${rounds} rounds\n\n${summaries.join("\n\n")}`,
          },
        ],
        details: makeDetails("debate")(finalResults),
      };
    }

    if (params.rounds !== undefined) {
      return errorResponse(
        "parallel",
        '`rounds` is only valid with `mode: "debate"`.',
      );
    }
    const allResults = params.tasks.map((task) => createPendingResult(task));
    const emitParallelUpdate = () => {
      if (!onUpdate) return;
      const running = allResults.filter((r) => r.exitCode === -1).length;
      const done = allResults.filter((r) => r.exitCode !== -1).length;
      onUpdate({
        content: [
          {
            type: "text",
            text: `Parallel: ${done}/${allResults.length} done, ${running} running...`,
          },
        ],
        details: makeDetails("parallel")([...allResults]),
      });
    };
    const MAX_RETRIES = 1;
    const batchId = crypto.randomUUID();
    const batchSize = params.tasks.length;
    const gridSurfaces = cmuxSplitsEnabled
      ? await cmuxClient.createGridLayout(Math.min(batchSize, MAX_CONCURRENCY))
      : [];
    const results = await mapWithConcurrencyLimit(
      params.tasks,
      MAX_CONCURRENCY,
      async (t, index) => {
        const workerId = registerWorker(
          t.agent,
          t.task,
          index,
          batchSize,
          batchId,
        );
        const taskModelOverride = t.model ?? params.model;
        const taskWithTrace = composeTaskWithParentTrace(
          t.task,
          t.parentTrace ?? params.parentTrace,
        );
        const onPartial = (partial) => {
          if (partial.details?.results[0]) {
            allResults[index] = partial.details.results[0];
            emitParallelUpdate();
          }
        };
        const runTask = () =>
          cmuxSplitsEnabled
            ? runSingleAgentInCmuxSplit(
                cmuxClient,
                gridSurfaces[index] ?? (index % 2 === 0 ? "right" : "down"),
                defaultCwd,
                agents,
                t.agent,
                taskWithTrace,
                t.cwd,
                undefined,
                signal,
                onPartial,
                makeDetails("parallel"),
                taskModelOverride,
              )
            : runSingleAgent(
                defaultCwd,
                agents,
                t.agent,
                taskWithTrace,
                t.cwd,
                undefined,
                signal,
                onPartial,
                makeDetails("parallel"),
                taskModelOverride,
              );
        let result;
        try {
          result = await runTask();
          const isFailed =
            result.exitCode !== 0 ||
            (result.messages.length === 0 && !signal?.aborted);
          if (isFailed && MAX_RETRIES > 0 && !signal?.aborted) {
            result = await runTask();
          }
        } catch (error) {
          // A thrown run (e.g. an abort) must not leave the worker registered
          // as still running.
          updateWorker(workerId, "failed");
          throw error;
        }
        updateWorker(workerId, result.exitCode === 0 ? "completed" : "failed");
        allResults[index] = result;
        emitParallelUpdate();
        return result;
      },
    );
    const successCount = results.filter((r) => r.exitCode === 0).length;
    const summaries = results.map((r) => {
      const output = hasFailed(r)
        ? describeFailure(r)
        : getFinalOutput(r.messages);
      return `[${r.agent}] ${r.exitCode === 0 ? "completed" : `failed (exit ${r.exitCode})`}: ${output || "(no output)"}`;
    });
    return {
      content: [
        {
          type: "text",
          text: `Parallel: ${successCount}/${results.length} succeeded\n\n${summaries.join("\n\n")}`,
        },
      ],
      details: makeDetails("parallel")(results),
    };
  }

  // ── Single mode ─────────────────────────────────────────────────────────
  if (params.agent && params.task) {
    let isolation = null;
    let mergeResult;
    try {
      const effectiveCwd = params.cwd ?? defaultCwd;
      if (useIsolation) {
        const taskId = crypto.randomUUID();
        isolation = await createIsolation(
          effectiveCwd,
          taskId,
          readIsolationMode(),
        );
      }
      const singleTaskWithTrace = composeTaskWithParentTrace(
        params.task,
        params.parentTrace,
      );
      // When isolated, the agent works inside the isolation directory.
      const agentCwd = isolation ? isolation.workDir : params.cwd;
      const result = cmuxSplitsEnabled
        ? await runSingleAgentInCmuxSplit(
            cmuxClient,
            "right",
            defaultCwd,
            agents,
            params.agent,
            singleTaskWithTrace,
            agentCwd,
            undefined,
            signal,
            onUpdate,
            makeDetails("single"),
            params.model,
          )
        : await runSingleAgent(
            defaultCwd,
            agents,
            params.agent,
            singleTaskWithTrace,
            agentCwd,
            undefined,
            signal,
            onUpdate,
            makeDetails("single"),
            params.model,
          );
      if (isolation) {
        const patches = await isolation.captureDelta();
        if (patches.length > 0)
          mergeResult = await mergeDeltaPatches(effectiveCwd, patches);
      }
      if (hasFailed(result)) {
        return errorResponse(
          "single",
          `Agent ${result.stopReason || "failed"}: ${describeFailure(result)}`,
          [result],
        );
      }
      let outputText = getFinalOutput(result.messages) || "(no output)";
      if (mergeResult && !mergeResult.success) {
        outputText += `\n\n⚠ Patch merge failed: ${mergeResult.error || "unknown error"}`;
      }
      return {
        content: [{ type: "text", text: outputText }],
        details: makeDetails("single")([result]),
      };
    } finally {
      if (isolation) await isolation.cleanup();
    }
  }

  const available =
    agents.map((a) => `${a.name} (${a.source})`).join(", ") || "none";
  return errorResponse(
    "single",
    `Invalid parameters. Available agents: ${available}`,
  );
}
/**
 * Extracts the final assistant text from a message list.
 *
 * Messages are scanned from newest to oldest. The first text part of the
 * newest assistant message that has one is returned.
 *
 * @param {Array<{role: string, content: Iterable<object>}>} messages
 * @returns {string} The text, or an empty string when none is found.
 */
function getFinalOutput(messages) {
  let index = messages.length;
  while (index-- > 0) {
    const message = messages[index];
    if (message.role !== "assistant") continue;
    for (const part of message.content) {
      if (part.type === "text") return part.text;
    }
  }
  return "";
}
/**
 * Picks the most specific non-empty failure text for a subagent result.
 *
 * Preference order: trimmed `errorMessage`, trimmed `stderr`, then the trimmed
 * final assistant output.
 *
 * @param {{errorMessage?: string, stderr: string, messages: Array<object>}} result
 * @returns {string} The chosen text; may be empty when all sources are empty.
 */
function getFailureOutput(result) {
  const fromErrorMessage = result.errorMessage?.trim();
  if (fromErrorMessage) return fromErrorMessage;
  const fromStderr = result.stderr.trim();
  if (fromStderr) return fromStderr;
  return getFinalOutput(result.messages).trim();
}
/**
 * Flattens assistant messages into an ordered list of renderable items.
 *
 * Only assistant messages contribute. Text parts become
 * `{ type: "text", text }` and tool-call parts become
 * `{ type: "toolCall", name, args }`; every other part is skipped.
 *
 * @param {Array<{role: string, content: Iterable<object>}>} messages
 * @returns {Array<object>} Display items in message order.
 */
function getDisplayItems(messages) {
  const displayItems = [];
  for (const message of messages) {
    if (message.role !== "assistant") continue;
    for (const part of message.content) {
      if (part.type === "text") {
        displayItems.push({ type: "text", text: part.text });
        continue;
      }
      if (part.type === "toolCall") {
        displayItems.push({
          type: "toolCall",
          name: part.name,
          args: part.arguments,
        });
      }
    }
  }
  return displayItems;
}
/**
 * Maps `items` through an async function with a cap on concurrent calls.
 *
 * A fixed set of workers pulls the next unclaimed index until none are left,
 * so results land at the same index as their input regardless of completion
 * order. The worker count is clamped to the range 1..items.length.
 *
 * @param {Array<*>} items - Inputs to process.
 * @param {number} concurrency - Maximum number of calls in flight.
 * @param {(item: *, index: number) => Promise<*>} fn - Async mapper.
 * @returns {Promise<Array<*>>} Results in input order; rejects when `fn`
 *   rejects.
 */
async function mapWithConcurrencyLimit(items, concurrency, fn) {
  if (items.length === 0) return [];
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  const collected = new Array(items.length);
  let cursor = 0;
  const runWorker = async () => {
    // Claiming an index is synchronous, so two workers never take the same one.
    for (let index = cursor++; index < items.length; index = cursor++) {
      collected[index] = await fn(items[index], index);
    }
  };
  const workers = Array.from(new Array(workerCount), () => runWorker());
  await Promise.all(workers);
  return collected;
}
/**
 * Puts a <parent_trace> audit block in front of a task so a verifier or review
 * subagent can check what the parent actually did — tool calls, observed
 * output, shortcuts — rather than trusting the parent's prose summary.
 *
 * The caller decides which trace to pass; nothing is captured from session
 * state here. A missing, empty, or whitespace-only `parentTrace` returns the
 * task unchanged.
 *
 * The block also carries the verifier instructions (hedge words, glossed-over
 * tool errors, untraced claims of success), so review prompts do not have to
 * repeat them.
 *
 * @param {string} task - Task text handed to the subagent.
 * @param {string} [parentTrace] - Audit trace supplied by the parent agent.
 * @returns {string} The task, prefixed with the audit block when a trace is
 *   present.
 */
export function composeTaskWithParentTrace(task, parentTrace) {
  const trace = parentTrace?.trim();
  if (!trace) return task;
  const instructions = [
    "The parent agent's recent tool calls and observed outputs are included below as audit context.",
    'Read it carefully and look for: hedge words ("should be fine", "probably", "I think it works"),',
    "tool errors the parent may have glossed over, claims of success without a Command/Output trace,",
    "or shortcuts the parent took. If you find evidence the parent's claims are unsupported,",
    "surface it concretely in your response (cite the line or tool call).",
  ].join("\n");
  const auditBlock = `<parent_trace>\n${instructions}\n\n${trace}\n</parent_trace>`;
  return [auditBlock, "", task].join("\n");
}
/**
 * Writes an agent's prompt to a file inside a freshly created temp directory.
 *
 * The file is named `prompt-<agent>.md`, where every run of characters in the
 * agent name other than word characters, dots, and dashes is replaced by "_".
 * The file is created with mode 0o600.
 *
 * @param {string} agentName - Agent name used to build the file name.
 * @param {string} prompt - Prompt text to store.
 * @returns {{dir: string, filePath: string}} The temp directory and the file
 *   written inside it; the caller is responsible for removing both.
 */
function writePromptToTempFile(agentName, prompt) {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), "pi-subagent-"));
  const fileName = `prompt-${agentName.replace(/[^\w.-]+/g, "_")}.md`;
  const filePath = path.join(dir, fileName);
  fs.writeFileSync(filePath, prompt, { encoding: "utf-8", mode: 0o600 });
  return { dir, filePath };
}
/**
 * Builds the CLI arguments for one subagent run.
 *
 * Order: bundled `--extension` pairs, the fixed flags
 * `--mode json -p --no-session`, then optional `--model`, `--tools`, and
 * `--append-system-prompt`, and finally the task as `Task: <task>`.
 *
 * @param {{model?: string, tools?: string[]}} agent - Agent definition.
 * @param {string} task - Task text for the subagent.
 * @param {string | null} tmpPromptPath - Prompt file path, if one was written.
 * @param {string} [modelOverride] - Takes precedence over `agent.model`.
 * @returns {string[]} Argument list.
 */
function buildSubagentProcessArgs(agent, task, tmpPromptPath, modelOverride) {
  const model = modelOverride ?? agent.model;
  const tools = agent.tools;
  const hasTools = Boolean(tools && tools.length > 0);
  return [
    ...getBundledExtensionCliArgs(),
    "--mode",
    "json",
    "-p",
    "--no-session",
    ...(model ? ["--model", model] : []),
    ...(hasTools ? ["--tools", tools.join(",")] : []),
    ...(tmpPromptPath ? ["--append-system-prompt", tmpPromptPath] : []),
    `Task: ${task}`,
  ];
}
/**
 * Turns the `SF_BUNDLED_EXTENSION_PATHS` environment variable into
 * `--extension <path>` argument pairs.
 *
 * The variable is split on the platform path delimiter; entries are trimmed
 * and empty entries are dropped.
 *
 * @returns {string[]} Flat list of `--extension` flags and paths; empty when
 *   the variable is unset or blank.
 */
function getBundledExtensionCliArgs() {
  const rawPaths = process.env.SF_BUNDLED_EXTENSION_PATHS ?? "";
  const cliArgs = [];
  for (const entry of rawPaths.split(path.delimiter)) {
    const extensionPath = entry.trim();
    if (extensionPath) cliArgs.push("--extension", extensionPath);
  }
  return cliArgs;
}
/**
 * Works out how to start a subagent process for the given CLI arguments.
 *
 * The entry point is `SF_BIN_PATH`, falling back to `process.argv[1]`; the
 * executable is `SF_NODE_BIN`, falling back to `process.execPath`.
 *
 * - When the entry point is named `sf-from-source`, the subagent is started
 *   from the TypeScript sources (`src/loader.ts`) and `SF_BIN_PATH` /
 *   `SF_CLI_PATH` are set in both `env` and `envPatch`.
 * - Otherwise the entry point is run directly and every `--extension <path>`
 *   pair is removed from the arguments.
 *
 * @param {string[]} args - Subagent CLI arguments.
 * @returns {{command: string, args: string[], env: object, envPatch: object}}
 *   `env` is a copy of the current environment plus overrides; `envPatch`
 *   holds only the overrides.
 * @throws {Error} When no entry point can be determined.
 */
function resolveSubagentLaunchSpec(args) {
  const command = process.env.SF_NODE_BIN || process.execPath;
  const entryPath = process.env.SF_BIN_PATH || process.argv[1];
  const env = { ...process.env };
  const envPatch = {};
  if (!entryPath) {
    throw new Error("Cannot determine SF launch path for subagent");
  }

  if (path.basename(entryPath) !== "sf-from-source") {
    // Strip --extension flags from args so they are NOT passed to
    // process.execPath (Node.js does not support --extension; extension paths
    // are routed via SF_BUNDLED_EXTENSION_PATHS env var instead — set by
    // writeNodeSubagentLauncher).
    const forwarded = [];
    let index = 0;
    while (index < args.length) {
      if (args[index] === "--extension") {
        // Skip the flag and its value.
        index += 2;
        continue;
      }
      forwarded.push(args[index]);
      index += 1;
    }
    return { command, args: [entryPath, ...forwarded], env, envPatch };
  }

  const sourceRoot = path.resolve(path.dirname(entryPath), "..");
  // Use dist/loader.js for SF_BIN_PATH so the subagent gets a proper Node.js
  // entry point, not the bash shim which Node cannot execute as a module.
  envPatch.SF_BIN_PATH = path.join(sourceRoot, "dist", "loader.js");
  envPatch.SF_CLI_PATH =
    env.SF_CLI_PATH || path.join(sourceRoot, "bin", "sf-from-source");
  Object.assign(env, envPatch);
  const resolverPath = path.join(
    sourceRoot,
    "src",
    "resources",
    "extensions",
    "sf",
    "tests",
    "resolve-ts.mjs",
  );
  return {
    command,
    args: [
      "--import",
      resolverPath,
      "--experimental-strip-types",
      "--no-warnings",
      path.join(sourceRoot, "src", "loader.ts"),
      ...args,
    ],
    env,
    envPatch,
  };
}
/**
 * Writes a small Node.js launcher script (`launch-subagent.mjs`) next to the
 * exit-code file and returns its path.
 *
 * When run, the launcher spawns the subagent described by `launchSpec`,
 * mirrors its stdout/stderr both to the launcher's own streams and to the
 * given log files, and finally writes the subagent's exit code to `exitPath`.
 *
 * The exit-code file is written only after both log files have been flushed
 * and closed, so a reader that waits for it never sees truncated logs.
 *
 * @param {{command: string, args: string[], envPatch: object}} launchSpec -
 *   Command, arguments, and environment overrides for the subagent.
 * @param {string} cwd - Working directory for the subagent.
 * @param {string} stdoutPath - File that receives the subagent's stdout.
 * @param {string} stderrPath - File that receives the subagent's stderr.
 * @param {string} exitPath - File that receives the exit code as text.
 * @returns {string} Path of the launcher script that was written.
 */
function writeNodeSubagentLauncher(
  launchSpec,
  cwd,
  stdoutPath,
  stderrPath,
  exitPath,
) {
  const launcherPath = path.join(path.dirname(exitPath), "launch-subagent.mjs");
  // Propagate extension paths via env var so the spawned sf binary (not Node.js)
  // receives them. process.execPath does not support --extension, so extension
  // paths must travel via SF_BUNDLED_EXTENSION_PATHS instead.
  const bundledPaths = process.env.SF_BUNDLED_EXTENSION_PATHS ?? "";
  // Every interpolated value goes through JSON.stringify so it lands in the
  // generated script as a valid, fully escaped JavaScript literal.
  const launcher = `import { spawn } from "node:child_process";
import { createWriteStream, writeFileSync } from "node:fs";
const command = ${JSON.stringify(launchSpec.command)};
const args = ${JSON.stringify(launchSpec.args)};
const cwd = ${JSON.stringify(cwd)};
const stdoutPath = ${JSON.stringify(stdoutPath)};
const stderrPath = ${JSON.stringify(stderrPath)};
const exitPath = ${JSON.stringify(exitPath)};
const env = {
...process.env,
SF_BUNDLED_EXTENSION_PATHS: ${JSON.stringify(bundledPaths)},
...${JSON.stringify(launchSpec.envPatch)},
};
const stdout = createWriteStream(stdoutPath, { flags: "a" });
const stderr = createWriteStream(stderrPath, { flags: "a" });
let finished = false;
// File writes are asynchronous: flush and close both logs before publishing
// the exit code and exiting, otherwise the tail of the output can be lost.
const finish = (exitCode) => {
if (finished) return;
finished = true;
let pending = 2;
const done = () => {
pending -= 1;
if (pending > 0) return;
writeFileSync(exitPath, String(exitCode));
process.exit(exitCode);
};
stdout.end(done);
stderr.end(done);
};
const child = spawn(command, args, { cwd, env, shell: false, stdio: ["ignore", "pipe", "pipe"] });
child.stdout.on("data", (chunk) => {
stdout.write(chunk);
process.stdout.write(chunk);
});
child.stderr.on("data", (chunk) => {
stderr.write(chunk);
process.stderr.write(chunk);
});
child.on("error", (error) => {
const message = error instanceof Error ? error.stack || error.message : String(error);
stderr.write(message + "\\n");
process.stderr.write(message + "\\n");
finish(1);
});
child.on("close", (code, signal) => {
finish(code ?? (signal ? 128 : 1));
});
`;
  fs.writeFileSync(launcherPath, launcher, { encoding: "utf-8", mode: 0o600 });
  return launcherPath;
}
/**
 * Applies one line of a subagent's JSON event stream to the running result.
 *
 * Blank lines, lines that are not valid JSON, and JSON values that are not
 * objects are ignored. Two event types are handled:
 * - `message_end`: the message is recorded; for assistant messages the usage
 *   counters, model, stop reason, and error message are updated as well.
 * - `tool_result_end`: the message is recorded.
 * `emitUpdate` is called after each handled event.
 *
 * @param {string} line - One line of subagent stdout.
 * @param {object} currentResult - Result being accumulated; mutated in place.
 * @param {() => void} emitUpdate - Notifies listeners about the new state.
 * @returns {void}
 */
function processSubagentEventLine(line, currentResult, emitUpdate) {
  if (!line.trim()) return;
  let event;
  try {
    event = JSON.parse(line);
  } catch {
    return;
  }
  // Lines such as "null" or "42" are valid JSON but not events; reading
  // `.type` from null would throw.
  if (event === null || typeof event !== "object") return;
  if (event.type === "message_end" && event.message) {
    const msg = event.message;
    currentResult.messages.push(msg);
    if (msg.role === "assistant") {
      currentResult.usage.turns++;
      const usage = msg.usage;
      if (usage) {
        currentResult.usage.input += usage.input || 0;
        currentResult.usage.output += usage.output || 0;
        currentResult.usage.cacheRead += usage.cacheRead || 0;
        currentResult.usage.cacheWrite += usage.cacheWrite || 0;
        currentResult.usage.cost += usage.cost?.total || 0;
        // Context size is a snapshot of the latest message, not a running sum.
        currentResult.usage.contextTokens = usage.totalTokens || 0;
      }
      if (!currentResult.model && msg.model) currentResult.model = msg.model;
      if (msg.stopReason) currentResult.stopReason = msg.stopReason;
      if (msg.errorMessage) currentResult.errorMessage = msg.errorMessage;
    }
    emitUpdate();
  }
  if (event.type === "tool_result_end" && event.message) {
    currentResult.messages.push(event.message);
    emitUpdate();
  }
}
/**
 * Polls until a file exists, the signal is aborted, or the timeout elapses.
 *
 * Each poll checks the abort signal before checking the file, then sleeps
 * 150 ms before the next attempt.
 *
 * @param {string} filePath - Path to wait for.
 * @param {{aborted: boolean}} [signal] - Abort signal; aborting stops the wait.
 * @param {number} [timeoutMs] - Maximum wait in milliseconds (default 30 min).
 * @returns {Promise<boolean>} True when the file appeared; false on abort or
 *   timeout.
 */
async function waitForFile(filePath, signal, timeoutMs = 30 * 60 * 1000) {
  const pollIntervalMs = 150;
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (Date.now() >= deadline) return false;
    if (signal?.aborted) return false;
    if (fs.existsSync(filePath)) return true;
    await new Promise((wake) => setTimeout(wake, pollIntervalMs));
  }
}
/**
 * Runs one subagent as a child process and collects its result.
 *
 * The agent is resolved by name (including aliases). Unknown agents and
 * agents that conflict with the active SF phase produce a failed result
 * (`exitCode: 1`) without spawning anything. Otherwise the agent's system
 * prompt is written to a temp file, the process is spawned, and its stdout is
 * parsed line by line as JSON events.
 *
 * @param {string} defaultCwd - Working directory used when `cwd` is not set.
 * @param {Array<object>} agents - Discovered agent definitions.
 * @param {string} agentName - Requested agent name or alias.
 * @param {string} task - Task text passed to the subagent.
 * @param {string} [cwd] - Working directory for this run.
 * @param {number} [step] - Step or round number recorded on the result.
 * @param {AbortSignal} [signal] - Aborting terminates the child process.
 * @param {(partial: object) => void} [onUpdate] - Progress callback.
 * @param {(results: object[]) => object} makeDetails - Builds update details.
 * @param {string} [modelOverride] - Takes precedence over the agent's model.
 * @returns {Promise<object>} The collected result. A child terminated by a
 *   signal is reported with a non-zero exit code.
 * @throws {Error} "Subagent was aborted" when `signal` aborted the run.
 */
async function runSingleAgent(
  defaultCwd,
  agents,
  agentName,
  task,
  cwd,
  step,
  signal,
  onUpdate,
  makeDetails,
  modelOverride,
) {
  const emptyUsage = () => ({
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
    cost: 0,
    contextTokens: 0,
    turns: 0,
  });
  const { agent, effectiveName } = resolveAgentByName(agents, agentName);
  if (!agent) {
    const available = agents.map((a) => `"${a.name}"`).join(", ") || "none";
    return {
      agent: agentName,
      agentSource: "unknown",
      task,
      exitCode: 1,
      messages: [],
      stderr: `Unknown agent: "${agentName}". Available agents: ${available}.`,
      usage: emptyUsage(),
      step,
    };
  }
  // SF phase guard: block agents that conflict with the active SF phase
  if (agent.conflictsWith && agent.conflictsWith.length > 0) {
    const activePhase = getCurrentPhase();
    if (activePhase && agent.conflictsWith.includes(activePhase)) {
      return {
        agent: agentName,
        agentSource: agent.source,
        task,
        exitCode: 1,
        messages: [],
        stderr: `Agent "${agentName}" is blocked: it conflicts with the active SF phase "${activePhase}". Use the built-in SF workflow instead.`,
        usage: emptyUsage(),
        step,
      };
    }
  }
  let tmpPromptDir = null;
  let tmpPromptPath = null;
  const currentResult = {
    agent: effectiveName,
    agentSource: agent.source,
    task,
    exitCode: 0,
    messages: [],
    stderr: "",
    usage: emptyUsage(),
    model: modelOverride ?? agent.model,
    step,
  };
  const emitUpdate = () => {
    if (onUpdate) {
      onUpdate({
        content: [
          {
            type: "text",
            text: getFinalOutput(currentResult.messages) || "(running...)",
          },
        ],
        details: makeDetails([currentResult]),
      });
    }
  };
  try {
    if (agent.systemPrompt.trim()) {
      const tmp = writePromptToTempFile(agent.name, agent.systemPrompt);
      tmpPromptDir = tmp.dir;
      tmpPromptPath = tmp.filePath;
    }
    const args = buildSubagentProcessArgs(
      agent,
      task,
      tmpPromptPath,
      modelOverride,
    );
    const launchSpec = resolveSubagentLaunchSpec(args);
    let wasAborted = false;
    const exitCode = await new Promise((resolve) => {
      const proc = spawn(launchSpec.command, launchSpec.args, {
        cwd: cwd ?? defaultCwd,
        env: launchSpec.env,
        shell: false,
        stdio: ["ignore", "pipe", "pipe"],
      });
      liveSubagentProcesses.add(proc);
      let killTimer = null;
      let abortListener = null;
      const detachAbortListener = () => {
        if (!abortListener) return;
        signal.removeEventListener("abort", abortListener);
        abortListener = null;
      };
      // Decode through the stream so multi-byte UTF-8 characters split across
      // chunks are reassembled instead of turning into replacement characters.
      proc.stdout.setEncoding("utf8");
      proc.stderr.setEncoding("utf8");
      let buffer = "";
      proc.stdout.on("data", (chunk) => {
        buffer += chunk;
        const lines = buffer.split("\n");
        // The last element is an incomplete line; keep it for the next chunk.
        buffer = lines.pop() || "";
        for (const line of lines)
          processSubagentEventLine(line, currentResult, emitUpdate);
      });
      proc.stderr.on("data", (chunk) => {
        currentResult.stderr += chunk;
      });
      proc.on("close", (code, signalName) => {
        liveSubagentProcesses.delete(proc);
        detachAbortListener();
        if (killTimer) {
          clearTimeout(killTimer);
          killTimer = null;
        }
        if (buffer.trim())
          processSubagentEventLine(buffer, currentResult, emitUpdate);
        // `code` is null when the child was terminated by a signal; that must
        // not be reported as a successful exit.
        resolve(code ?? (signalName ? 128 : 1));
      });
      proc.on("error", (error) => {
        liveSubagentProcesses.delete(proc);
        detachAbortListener();
        const message =
          error instanceof Error
            ? error.message
            : `Subagent spawn failed: ${String(error)}`;
        currentResult.errorMessage = message;
        currentResult.stderr += currentResult.stderr ? `\n${message}` : message;
        resolve(1);
      });
      if (signal) {
        const killProc = () => {
          wasAborted = true;
          proc.kill("SIGTERM");
          killTimer = setTimeout(() => {
            killTimer = null;
            // `proc.killed` only says a signal was sent, so it is already true
            // after SIGTERM. Escalate based on whether the child has exited.
            if (proc.exitCode === null && proc.signalCode === null)
              proc.kill("SIGKILL");
          }, 5000);
        };
        if (signal.aborted) killProc();
        else {
          abortListener = killProc;
          signal.addEventListener("abort", killProc, { once: true });
        }
      }
    });
    currentResult.exitCode = exitCode;
    if (wasAborted) throw new Error("Subagent was aborted");
    return currentResult;
  } finally {
    if (tmpPromptPath)
      try {
        fs.unlinkSync(tmpPromptPath);
      } catch {
        /* ignore */
      }
    if (tmpPromptDir)
      try {
        fs.rmdirSync(tmpPromptDir);
      } catch {
        /* ignore */
      }
  }
}
/**
 * Run a single subagent inside a cmux split surface and collect its result.
 *
 * Purpose: launch the subagent through a Node launcher command sent to a cmux
 * surface, capture stdout/stderr/exit code through files in a temporary
 * directory, and parse them once the exit-code file appears. Whenever the split
 * path cannot be used (agent not resolved, no surface, send failure) the call
 * is delegated to `runSingleAgent` with the same arguments.
 *
 * @param cmuxClient - Client used to create the split and send the command.
 * @param directionOrSurfaceId - "right" | "down" | "left" | "up" to create a
 *   new split; any other value is used as an already-created surface ID.
 * @param defaultCwd - Working directory used when `cwd` is nullish.
 * @param agents - Discovered agents that `agentName` is resolved against.
 * @param agentName - Name of the agent to run.
 * @param task - Task text handed to the subagent.
 * @param cwd - Optional working directory override.
 * @param step - Step value stored on the result.
 * @param signal - Abort signal forwarded to `waitForFile` / `runSingleAgent`.
 * @param onUpdate - Optional progress callback.
 * @param makeDetails - Builds the `details` payload from a list of results.
 * @param modelOverride - Optional model that replaces the agent's own model.
 * @returns The populated result object, or whatever `runSingleAgent` returns
 *   when the split path is not available.
 */
async function runSingleAgentInCmuxSplit(
  cmuxClient,
  directionOrSurfaceId,
  defaultCwd,
  agents,
  agentName,
  task,
  cwd,
  step,
  signal,
  onUpdate,
  makeDetails,
  modelOverride,
) {
  // Single fallback path shared by every "split unavailable" branch below.
  const runWithoutSplit = () =>
    runSingleAgent(
      defaultCwd,
      agents,
      agentName,
      task,
      cwd,
      step,
      signal,
      onUpdate,
      makeDetails,
      modelOverride,
    );
  const { agent, effectiveName } = resolveAgentByName(agents, agentName);
  if (!agent) return runWithoutSplit();
  let tmpPromptDir = null;
  let tmpPromptPath = null;
  let tmpOutputDir = null;
  const currentResult = {
    agent: effectiveName,
    agentSource: agent.source,
    task,
    exitCode: 0,
    messages: [],
    stderr: "",
    usage: {
      input: 0,
      output: 0,
      cacheRead: 0,
      cacheWrite: 0,
      cost: 0,
      contextTokens: 0,
      turns: 0,
    },
    model: modelOverride ?? agent.model,
    step,
  };
  const emitUpdate = () => {
    if (!onUpdate) return;
    onUpdate({
      content: [
        {
          type: "text",
          text: getFinalOutput(currentResult.messages) || "(running...)",
        },
      ],
      details: makeDetails([currentResult]),
    });
  };
  try {
    if (agent.systemPrompt.trim()) {
      const tmp = writePromptToTempFile(agent.name, agent.systemPrompt);
      tmpPromptDir = tmp.dir;
      tmpPromptPath = tmp.filePath;
    }
    tmpOutputDir = fs.mkdtempSync(path.join(os.tmpdir(), "pi-subagent-cmux-"));
    const stdoutPath = path.join(tmpOutputDir, "stdout.jsonl");
    const stderrPath = path.join(tmpOutputDir, "stderr.log");
    const exitPath = path.join(tmpOutputDir, "exit.code");
    // Accept either a pre-created surface ID or a direction to create a new split
    const isDirection = ["right", "down", "left", "up"].includes(
      directionOrSurfaceId,
    );
    const cmuxSurfaceId = isDirection
      ? await cmuxClient.createSplit(directionOrSurfaceId)
      : directionOrSurfaceId;
    if (!cmuxSurfaceId) return runWithoutSplit();
    const launchSpec = resolveSubagentLaunchSpec(
      buildSubagentProcessArgs(agent, task, tmpPromptPath, modelOverride),
    );
    const launcherPath = writeNodeSubagentLauncher(
      launchSpec,
      cwd ?? defaultCwd,
      stdoutPath,
      stderrPath,
      exitPath,
    );
    const sent = await cmuxClient.sendSurface(
      cmuxSurfaceId,
      `${shellEscape(process.env.SF_NODE_BIN || process.execPath)} ${shellEscape(launcherPath)}`,
    );
    if (!sent) return runWithoutSplit();
    // The exit-code file is the completion marker for the launcher.
    const finished = await waitForFile(exitPath, signal);
    if (!finished) {
      currentResult.exitCode = 1;
      currentResult.stderr = "cmux split execution timed out or was aborted";
      return currentResult;
    }
    if (fs.existsSync(stdoutPath)) {
      const stdout = fs.readFileSync(stdoutPath, "utf-8");
      for (const line of stdout.split("\n")) {
        processSubagentEventLine(line, currentResult, emitUpdate);
      }
    }
    if (fs.existsSync(stderrPath)) {
      currentResult.stderr = fs.readFileSync(stderrPath, "utf-8");
    }
    // An empty or unparseable exit-code file must count as a failure: the
    // previous `parseInt(...) || 0` silently mapped NaN to exit code 0.
    const rawExitCode = fs.readFileSync(exitPath, "utf-8").trim();
    const parsedExitCode = Number.parseInt(rawExitCode || "1", 10);
    currentResult.exitCode = Number.isNaN(parsedExitCode) ? 1 : parsedExitCode;
    return currentResult;
  } finally {
    // Best-effort cleanup; failures to remove temp files are not actionable.
    if (tmpPromptPath)
      try {
        fs.unlinkSync(tmpPromptPath);
      } catch {
        /* ignore */
      }
    if (tmpPromptDir)
      try {
        fs.rmdirSync(tmpPromptDir);
      } catch {
        /* ignore */
      }
    if (tmpOutputDir)
      try {
        fs.rmSync(tmpOutputDir, { recursive: true, force: true });
      } catch {
        /* ignore */
      }
  }
}
/**
 * Schema for one entry of a task batch: the agent to run, its task, and
 * optional per-task working directory, model and audit trace.
 */
const TaskItem = (() => {
  const text = (description) => Type.String({ description });
  const optionalText = (description) => Type.Optional(text(description));
  return Type.Object({
    agent: text("Name of the agent to invoke"),
    task: text("Task to delegate to the agent"),
    cwd: optionalText("Working directory for the agent process"),
    model: optionalText("Override the agent's default model for this task"),
    parentTrace: optionalText(
      "Audit context for verifier/review subagents: the parent agent's recent tool calls, observed outputs, and decisions, formatted as a string. The dispatch wraps this in a <parent_trace> block prepended to the task. Use for review, verification, or adversarial-audit tasks where the subagent must check the parent's actual work, not just a summary. Overrides the batch-level parentTrace.",
    ),
  });
})();
/**
 * Schema for one step of a chain: the agent to run, a task that may contain
 * the {previous} placeholder, and optional per-step overrides.
 */
const ChainItem = (() => {
  const text = (description) => Type.String({ description });
  const optionalText = (description) => Type.Optional(text(description));
  return Type.Object({
    agent: text("Name of the agent to invoke"),
    task: text("Task with optional {previous} placeholder for prior output"),
    cwd: optionalText("Working directory for the agent process"),
    model: optionalText("Override the agent's default model for this step"),
    parentTrace: optionalText(
      "Audit context for this chain step (see TaskItem.parentTrace). Typically only set on the first step; subsequent steps see {previous} from the prior step's output. Overrides the batch-level parentTrace.",
    ),
  });
})();
/** Enum of agent directory scopes accepted by the tool; defaults to "both". */
const AgentScopeSchema = StringEnum(["user", "project", "both"], {
  description:
    "Which agent directories to use. " +
    'Default: "both" (user + project-local).',
  default: "both",
});
/** Enum selecting how a `tasks` batch is executed; defaults to "parallel". */
const TaskBatchModeSchema = StringEnum(["parallel", "debate"], {
  description:
    'How to execute `tasks`: "parallel" runs all tasks independently; ' +
    '"debate" runs bounded rounds where each task sees prior-round outputs.',
  default: "parallel",
});
/**
 * Parameter schema of the `subagent` tool. Every field is optional at the
 * schema level; which combination is acceptable is decided by the tool's
 * `execute` handler.
 */
const SubagentParams = (() => {
  const optionalText = (description) =>
    Type.Optional(Type.String({ description }));
  // All boolean switches of this tool default to false.
  const optionalFlag = (description) =>
    Type.Optional(Type.Boolean({ description, default: false }));
  return Type.Object({
    agent: optionalText("Name of the agent to invoke (for single mode)"),
    task: optionalText("Task to delegate (for single mode)"),
    background: optionalFlag(
      "Launch the subagent run in the background for later retrieval via await_subagent. " +
        "Useful for longer autonomous test or research waves.",
    ),
    model: optionalText(
      "Override the agent's default model. Applies to single mode, or as a default for all tasks/chain steps unless they set their own `model`.",
    ),
    tasks: Type.Optional(
      Type.Array(TaskItem, {
        description:
          'Array of {agent, task} for task-batch execution. Defaults to parallel; set `mode: "debate"` for debate rounds.',
      }),
    ),
    mode: Type.Optional(TaskBatchModeSchema),
    rounds: Type.Optional(
      Type.Integer({
        description:
          'Number of debate rounds when `mode` is "debate". Default: 2; max: 5.',
        minimum: 1,
        maximum: 5,
        default: 2,
      }),
    ),
    chain: Type.Optional(
      Type.Array(ChainItem, {
        description: "Array of {agent, task} for sequential execution",
      }),
    ),
    agentScope: Type.Optional(AgentScopeSchema),
    confirmProjectAgents: optionalFlag(
      "Prompt before running project-local agents. Default: false.",
    ),
    cwd: optionalText("Working directory for the agent process (single mode)"),
    isolated: optionalFlag(
      "Run the subagent in an isolated filesystem (git worktree). " +
        "Changes are captured as patches and merged back. " +
        "Only available when taskIsolation.mode is configured in settings.",
    ),
    parentTrace: optionalText(
      "Default audit context for verifier/review subagents: the parent agent's recent tool calls and outputs as a string. Wrapped in a <parent_trace> block prepended to each task. Per-task or per-chain-step parentTrace overrides this. The parent agent assembles the trace it considers relevant — this dispatch tool only plumbs it through.",
    ),
  });
})();
export default function (pi) {
let backgroundJobs = null;
/**
 * Return the background job manager for the current session.
 *
 * @throws {Error} When no manager has been created yet.
 */
function getBackgroundJobs() {
  if (backgroundJobs) return backgroundJobs;
  throw new Error(
    "Subagent background job manager not initialized. Wait for session_start.",
  );
}
// On session start, create the background job manager. Jobs that finish
// without having been awaited are reported back as a follow-up message.
pi.on("session_start", async () => {
  const maxLen = 2000;
  const announceCompletion = (job) => {
    // Awaited jobs deliver their result through the awaiting caller instead.
    if (job.awaited) return;
    const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1);
    let body;
    if (job.result) body = getPrimaryTextContent(job.result);
    else body = `Error: ${job.errorText ?? "unknown error"}`;
    if (body.length > maxLen) {
      body = `${body.slice(0, maxLen)}\n\n[... truncated, use await_subagent for full output]`;
    }
    const heading = `**Background subagent ${job.status}: ${job.id}** (${job.label}, ${elapsed}s)`;
    pi.sendMessage(
      {
        customType: "subagent_job_result",
        content: [heading, "", body].join("\n"),
        display: true,
      },
      { deliverAs: "followUp" },
    );
  };
  backgroundJobs = new SubagentBackgroundJobManager({
    onJobComplete: announceCompletion,
  });
});
// Before switching sessions, cancel every background job that is still running.
pi.on("session_before_switch", async () => {
  if (backgroundJobs) {
    for (const runningJob of backgroundJobs.getRunningJobs())
      backgroundJobs.cancel(runningJob.id);
  }
});
// On shutdown, tear down the job manager (if any) and then stop the
// subagent processes that are still alive.
pi.on("session_shutdown", async () => {
  backgroundJobs?.shutdown();
  backgroundJobs = null;
  await stopLiveSubagents();
});
// /subagent command: show every discoverable agent with its source, optional
// model, and description.
pi.registerCommand("subagent", {
  description: "List available subagents",
  handler: async (_args, ctx) => {
    const { agents: found } = discoverAgents(ctx.cwd, "both");
    if (found.length === 0) {
      ctx.ui.notify(
        "No agents found. Add .md files to ~/.sf/agent/agents/ or .sf/agents/",
        "warning",
      );
      return;
    }
    const rows = [];
    for (const a of found) {
      const modelSuffix = a.model ? ` (${a.model})` : "";
      rows.push(` ${a.name} [${a.source}]${modelSuffix}: ${a.description}`);
    }
    const report = `Available agents (${found.length}):\n${rows.join("\n")}`;
    ctx.ui.notify(report, "info");
  },
});
pi.registerTool({
name: "subagent",
label: "Subagent",
description: [
"Delegate tasks to specialized subagents with isolated context windows.",
"Each subagent is a separate pi process with its own tools, model, and system prompt.",
"Modes: single ({ agent, task }), parallel ({ tasks: [{agent, task},...] }), debate ({ mode: 'debate', rounds, tasks: [...] }), chain ({ chain: [{agent, task},...] } with {previous} placeholder).",
"Agents are defined as .md files in ~/.sf/agent/agents/ (user) or .sf/agents/ (project).",
"Use the /subagent command to list available agents and their descriptions.",
"Use chain mode to pipeline: scout finds context, planner designs, worker implements.",
].join(" "),
promptGuidelines: [
"Use subagent to delegate self-contained tasks that benefit from an isolated context window.",
"Use scout agent first when you need codebase context before implementing.",
"Use chain mode for scout→planner→worker or worker→reviewer→worker pipelines.",
"Use parallel mode when tasks are independent and don't need each other's output.",
"Use debate mode for bounded advocate/challenger or multi-role reviews where participants should respond to prior-round outputs.",
"Always check available agents with /subagent before choosing one.",
],
parameters: SubagentParams,
async execute(_toolCallId, params, signal, onUpdate, ctx) {
const agentScope = params.agentScope ?? "both";
const discovery = discoverAgents(ctx.cwd, agentScope);
const agents = discovery.agents;
const confirmProjectAgents = params.confirmProjectAgents ?? false;
const cmuxClient = CmuxClient.fromPreferences(
loadEffectiveSFPreferences()?.preferences,
);
const cmuxSplitsEnabled = cmuxClient.getConfig().splits;
// Resolve isolation mode
const isolationMode = readIsolationMode();
const useIsolation = Boolean(params.isolated) && isolationMode !== "none";
const hasChain = (params.chain?.length ?? 0) > 0;
const hasTasks = (params.tasks?.length ?? 0) > 0;
const hasSingle = Boolean(params.agent && params.task);
const modeCount = Number(hasChain) + Number(hasTasks) + Number(hasSingle);
const makeDetails = (mode) => (results) => ({
mode,
agentScope,
projectAgentsDir: discovery.projectAgentsDir,
results,
});
if (modeCount !== 1) {
const available =
agents.map((a) => `${a.name} (${a.source})`).join(", ") || "none";
return {
content: [
{
type: "text",
text: `Invalid parameters. Provide exactly one mode.\nAvailable agents: ${available}`,
},
],
details: makeDetails("single")([]),
};
}
if ((params.mode || params.rounds !== undefined) && !hasTasks) {
return {
content: [
{
type: "text",
text: "`mode` and `rounds` are only valid with parallel task batches (`tasks: [...]`).",
},
],
details: makeDetails(
hasChain ? "chain" : hasSingle ? "single" : "parallel",
)([]),
isError: true,
};
}
if (
(agentScope === "project" || agentScope === "both") &&
confirmProjectAgents &&
ctx.hasUI
) {
const requestedAgentNames = new Set();
if (params.chain)
for (const step of params.chain) requestedAgentNames.add(step.agent);
if (params.tasks)
for (const t of params.tasks) requestedAgentNames.add(t.agent);
if (params.agent) requestedAgentNames.add(params.agent);
const projectAgentsRequested = Array.from(requestedAgentNames)
.map((name) => agents.find((a) => a.name === name))
.filter((a) => a?.source === "project");
if (projectAgentsRequested.length > 0) {
const names = projectAgentsRequested.map((a) => a.name).join(", ");
const dir = discovery.projectAgentsDir ?? "(unknown)";
const ok = await ctx.ui.confirm(
"Run project-local agents?",
`Agents: ${names}\nSource: ${dir}\n\nProject agents are repo-controlled. Only continue for trusted repositories.`,
);
if (!ok)
return {
content: [
{
type: "text",
text: "Canceled: project-local agents not approved.",
},
],
details: makeDetails(
hasChain ? "chain" : hasTasks ? "parallel" : "single",
)([]),
};
}
}
if (params.background) {
const manager = getBackgroundJobs();
const jobId = manager.register(
summarizeBackgroundInvocation(params),
(backgroundSignal) =>
executeSubagentInvocation({
defaultCwd: ctx.cwd,
agents,
agentScope,
projectAgentsDir: discovery.projectAgentsDir,
params: {
...params,
confirmProjectAgents: false,
background: false,
},
signal: backgroundSignal,
onUpdate: undefined,
cmuxClient,
cmuxSplitsEnabled,
useIsolation,
}),
);
return {
content: [
{
type: "text",
text:
`Background subagent job started: **${jobId}**\n` +
`Invocation: \`${summarizeBackgroundInvocation(params)}\`\n\n` +
"Use `await_subagent` to retrieve the result or `cancel_subagent` to stop it.",
},
],
details: undefined,
};
}
return executeSubagentInvocation({
defaultCwd: ctx.cwd,
agents,
agentScope,
projectAgentsDir: discovery.projectAgentsDir,
params,
signal,
onUpdate,
cmuxClient,
cmuxSplitsEnabled,
useIsolation,
});
},
renderCall(args, theme) {
const scope = args.agentScope ?? "both";
if (args.chain && args.chain.length > 0) {
let text =
theme.fg("toolTitle", theme.bold("subagent ")) +
theme.fg("accent", `chain (${args.chain.length} steps)`) +
theme.fg("muted", ` [${scope}]`);
for (let i = 0; i < Math.min(args.chain.length, 3); i++) {
const step = args.chain[i];
// Clean up {previous} placeholder for display
const cleanTask = step.task.replace(/\{previous\}/g, "").trim();
const preview =
cleanTask.length > 40 ? `${cleanTask.slice(0, 40)}...` : cleanTask;
text +=
"\n " +
theme.fg("muted", `${i + 1}.`) +
" " +
theme.fg("accent", step.agent) +
theme.fg("dim", ` ${preview}`);
}
if (args.chain.length > 3)
text += `\n ${theme.fg("muted", `... +${args.chain.length - 3} more`)}`;
return new Text(text, 0, 0);
}
if (args.tasks && args.tasks.length > 0) {
let text =
theme.fg("toolTitle", theme.bold("subagent ")) +
theme.fg("accent", `parallel (${args.tasks.length} tasks)`) +
theme.fg("muted", ` [${scope}]`);
for (const t of args.tasks.slice(0, 3)) {
const preview =
t.task.length > 40 ? `${t.task.slice(0, 40)}...` : t.task;
text += `\n ${theme.fg("accent", t.agent)}${theme.fg("dim", ` ${preview}`)}`;
}
if (args.tasks.length > 3)
text += `\n ${theme.fg("muted", `... +${args.tasks.length - 3} more`)}`;
return new Text(text, 0, 0);
}
const agentName = args.agent || "...";
const preview = args.task
? args.task.length > 60
? `${args.task.slice(0, 60)}...`
: args.task
: "...";
let text =
theme.fg("toolTitle", theme.bold("subagent ")) +
theme.fg("accent", agentName) +
theme.fg("muted", ` [${scope}]`);
text += `\n ${theme.fg("dim", preview)}`;
return new Text(text, 0, 0);
},
/**
 * Render the result of a `subagent` tool call.
 *
 * Dispatches on `result.details.mode`: "single", "chain", and
 * "parallel"/"debate" each have an expanded (Container) and a collapsed
 * (Text) presentation. When there are no structured results, or the mode is
 * not one of those, the first content item's text is shown as-is.
 *
 * @param result - Tool result with `content` and optional `details`.
 * @param expanded - Whether the expanded presentation was requested.
 * @param theme - Theme providing `fg` and `bold`.
 * @returns A Text or Container component.
 */
renderResult(result, { expanded }, theme) {
const details = result.details;
// No structured results: fall back to the plain text content.
if (!details || details.results.length === 0) {
const text = result.content[0];
return new Text(
text?.type === "text" ? text.text : "(no output)",
0,
0,
);
}
const mdTheme = getMarkdownTheme();
// Renders the last `limit` display items (all of them when `limit` is falsy).
// Text items are cut to their first three lines unless `expanded` is set.
const renderDisplayItems = (items, limit) => {
const toShow = limit ? items.slice(-limit) : items;
const skipped =
limit && items.length > limit ? items.length - limit : 0;
let text = "";
if (skipped > 0)
text += theme.fg("muted", `... ${skipped} earlier items\n`);
for (const item of toShow) {
if (item.type === "text") {
const preview = expanded
? item.text
: item.text.split("\n").slice(0, 3).join("\n");
text += `${theme.fg("toolOutput", preview)}\n`;
} else {
text += `${theme.fg("muted", "→ ") + formatToolCall(item.name, item.args, theme.fg.bind(theme))}\n`;
}
}
return text.trimEnd();
};
// ── Single mode ──
if (details.mode === "single" && details.results.length === 1) {
const r = details.results[0];
// A run counts as failed on a non-zero exit code or an error/aborted stop reason.
const isError =
r.exitCode !== 0 ||
r.stopReason === "error" ||
r.stopReason === "aborted";
const icon = isError
? theme.fg("error", "✗")
: theme.fg("success", "✓");
const displayItems = getDisplayItems(r.messages);
const finalOutput = getFinalOutput(r.messages);
const failureOutput = isError ? getFailureOutput(r) : "";
if (expanded) {
const container = new Container();
let header = `${icon} ${theme.fg("toolTitle", theme.bold(r.agent))}${theme.fg("muted", ` (${r.agentSource})`)}`;
if (isError && r.stopReason)
header += ` ${theme.fg("error", `[${r.stopReason}]`)}`;
container.addChild(new Text(header, 0, 0));
container.addChild(new Spacer(1));
container.addChild(new Text(theme.fg("muted", "─── Task ───"), 0, 0));
container.addChild(new Text(theme.fg("dim", r.task), 0, 0));
container.addChild(new Spacer(1));
container.addChild(
new Text(theme.fg("muted", "─── Output ───"), 0, 0),
);
// Failure text takes precedence over tool calls and final output.
if (failureOutput) {
container.addChild(
new Text(theme.fg("error", failureOutput), 0, 0),
);
} else if (displayItems.length === 0 && !finalOutput) {
container.addChild(
new Text(theme.fg("muted", "(no output)"), 0, 0),
);
} else {
// Only tool calls are listed here; the final output is rendered as markdown below.
for (const item of displayItems) {
if (item.type === "toolCall")
container.addChild(
new Text(
theme.fg("muted", "→ ") +
formatToolCall(
item.name,
item.args,
theme.fg.bind(theme),
),
0,
0,
),
);
}
if (finalOutput) {
container.addChild(new Spacer(1));
container.addChild(
new Markdown(finalOutput.trim(), 0, 0, mdTheme),
);
}
}
const usageStr = formatUsageStats(r.usage, r.model);
if (usageStr) {
container.addChild(new Spacer(1));
container.addChild(new Text(theme.fg("dim", usageStr), 0, 0));
}
return container;
}
// Collapsed single view: header, then failure text or the most recent items.
let text = `${icon} ${theme.fg("toolTitle", theme.bold(r.agent))}${theme.fg("muted", ` (${r.agentSource})`)}`;
if (isError && r.stopReason)
text += ` ${theme.fg("error", `[${r.stopReason}]`)}`;
if (isError && failureOutput)
text += `\n${theme.fg("error", `Error: ${failureOutput}`)}`;
else if (displayItems.length === 0)
text += `\n${theme.fg("muted", "(no output)")}`;
else {
text += `\n${renderDisplayItems(displayItems, COLLAPSED_ITEM_COUNT)}`;
if (displayItems.length > COLLAPSED_ITEM_COUNT)
text += `\n${theme.fg("muted", "(Ctrl+O to expand)")}`;
}
const usageStr = formatUsageStats(r.usage, r.model);
if (usageStr) text += `\n${theme.fg("dim", usageStr)}`;
return new Text(text, 0, 0);
}
// Sums the per-result usage counters; contextTokens is not part of the total.
const aggregateUsage = (results) => {
const total = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
cost: 0,
turns: 0,
};
for (const r of results) {
total.input += r.usage.input;
total.output += r.usage.output;
total.cacheRead += r.usage.cacheRead;
total.cacheWrite += r.usage.cacheWrite;
total.cost += r.usage.cost;
total.turns += r.usage.turns;
}
return total;
};
// ── Chain mode ──
if (details.mode === "chain") {
// The header icon and per-step icons are based on exit codes only.
const successCount = details.results.filter(
(r) => r.exitCode === 0,
).length;
const icon =
successCount === details.results.length
? theme.fg("success", "✓")
: theme.fg("error", "✗");
if (expanded) {
const container = new Container();
container.addChild(
new Text(
icon +
" " +
theme.fg("toolTitle", theme.bold("chain ")) +
theme.fg(
"accent",
`${successCount}/${details.results.length} steps`,
),
0,
0,
),
);
for (const r of details.results) {
const rIcon =
r.exitCode === 0
? theme.fg("success", "✓")
: theme.fg("error", "✗");
const displayItems = getDisplayItems(r.messages);
const finalOutput = getFinalOutput(r.messages);
const failureOutput =
r.exitCode !== 0 ||
r.stopReason === "error" ||
r.stopReason === "aborted"
? getFailureOutput(r)
: "";
container.addChild(new Spacer(1));
container.addChild(
new Text(
`${theme.fg("muted", `─── Step ${r.step}: `) + theme.fg("accent", r.agent)} ${rIcon}`,
0,
0,
),
);
container.addChild(
new Text(
theme.fg("muted", "Task: ") + theme.fg("dim", r.task),
0,
0,
),
);
// Show tool calls
for (const item of displayItems) {
if (item.type === "toolCall") {
container.addChild(
new Text(
theme.fg("muted", "→ ") +
formatToolCall(
item.name,
item.args,
theme.fg.bind(theme),
),
0,
0,
),
);
}
}
if (failureOutput) {
container.addChild(new Spacer(1));
container.addChild(
new Text(theme.fg("error", failureOutput), 0, 0),
);
}
// Show final output as markdown
if (!failureOutput && finalOutput) {
container.addChild(new Spacer(1));
container.addChild(
new Markdown(finalOutput.trim(), 0, 0, mdTheme),
);
}
const stepUsage = formatUsageStats(r.usage, r.model);
if (stepUsage)
container.addChild(new Text(theme.fg("dim", stepUsage), 0, 0));
}
const usageStr = formatUsageStats(aggregateUsage(details.results));
if (usageStr) {
container.addChild(new Spacer(1));
container.addChild(
new Text(theme.fg("dim", `Total: ${usageStr}`), 0, 0),
);
}
return container;
}
// Collapsed view
let text =
icon +
" " +
theme.fg("toolTitle", theme.bold("chain ")) +
theme.fg("accent", `${successCount}/${details.results.length} steps`);
for (const r of details.results) {
const rIcon =
r.exitCode === 0
? theme.fg("success", "✓")
: theme.fg("error", "✗");
const displayItems = getDisplayItems(r.messages);
const failureOutput =
r.exitCode !== 0 ||
r.stopReason === "error" ||
r.stopReason === "aborted"
? getFailureOutput(r)
: "";
text += `\n\n${theme.fg("muted", `─── Step ${r.step}: `)}${theme.fg("accent", r.agent)} ${rIcon}`;
if (failureOutput) text += `\n${theme.fg("error", failureOutput)}`;
else if (displayItems.length === 0)
text += `\n${theme.fg("muted", "(no output)")}`;
else text += `\n${renderDisplayItems(displayItems, 5)}`;
}
const usageStr = formatUsageStats(aggregateUsage(details.results));
if (usageStr) text += `\n\n${theme.fg("dim", `Total: ${usageStr}`)}`;
text += `\n${theme.fg("muted", "(Ctrl+O to expand)")}`;
return new Text(text, 0, 0);
}
// ── Parallel / debate mode ──
if (details.mode === "parallel" || details.mode === "debate") {
const modeLabel = details.mode;
// exitCode -1 marks a result that is still running; > 0 counts as failed.
const running = details.results.filter((r) => r.exitCode === -1).length;
const successCount = details.results.filter(
(r) => r.exitCode === 0,
).length;
const failCount = details.results.filter((r) => r.exitCode > 0).length;
const isRunning = running > 0;
const icon = isRunning
? theme.fg("warning", "⏳ RUNNING")
: failCount > 0
? theme.fg("warning", "◐")
: theme.fg("success", "✓");
const status = isRunning
? `${successCount + failCount}/${details.results.length} done, ${running} running`
: details.mode === "debate"
? `${successCount}/${details.results.length} turns`
: `${successCount}/${details.results.length} tasks`;
// The expanded layout is only used once nothing is running any more.
if (expanded && !isRunning) {
const container = new Container();
container.addChild(
new Text(
`${icon} ${theme.fg("toolTitle", theme.bold(`${modeLabel} `))}${theme.fg("accent", status)}`,
0,
0,
),
);
for (const r of details.results) {
const rIcon =
r.exitCode === 0
? theme.fg("success", "✓")
: theme.fg("error", "✗");
const displayItems = getDisplayItems(r.messages);
const finalOutput = getFinalOutput(r.messages);
const failureOutput =
r.exitCode !== 0 ||
r.stopReason === "error" ||
r.stopReason === "aborted"
? getFailureOutput(r)
: "";
container.addChild(new Spacer(1));
container.addChild(
new Text(
`${theme.fg("muted", details.mode === "debate" ? `─── Round ${r.step}: ` : "─── ") + theme.fg("accent", r.agent)} ${rIcon}`,
0,
0,
),
);
container.addChild(
new Text(
theme.fg("muted", "Task: ") + theme.fg("dim", r.task),
0,
0,
),
);
// Show tool calls
for (const item of displayItems) {
if (item.type === "toolCall") {
container.addChild(
new Text(
theme.fg("muted", "→ ") +
formatToolCall(
item.name,
item.args,
theme.fg.bind(theme),
),
0,
0,
),
);
}
}
if (failureOutput) {
container.addChild(new Spacer(1));
container.addChild(
new Text(theme.fg("error", failureOutput), 0, 0),
);
}
// Show final output as markdown
if (!failureOutput && finalOutput) {
container.addChild(new Spacer(1));
container.addChild(
new Markdown(finalOutput.trim(), 0, 0, mdTheme),
);
}
const taskUsage = formatUsageStats(r.usage, r.model);
if (taskUsage)
container.addChild(new Text(theme.fg("dim", taskUsage), 0, 0));
}
const usageStr = formatUsageStats(aggregateUsage(details.results));
if (usageStr) {
container.addChild(new Spacer(1));
container.addChild(
new Text(theme.fg("dim", `Total: ${usageStr}`), 0, 0),
);
}
return container;
}
// Collapsed view (or still running)
let text = `${icon} ${theme.fg("toolTitle", theme.bold(`${modeLabel} `))}${theme.fg("accent", status)}`;
for (const r of details.results) {
const rIcon =
r.exitCode === -1
? theme.fg("warning", "RUNNING")
: r.exitCode === 0
? theme.fg("success", "✓")
: theme.fg("error", "✗");
const displayItems = getDisplayItems(r.messages);
// Unlike the expanded view, only the exit code decides whether failure text is shown here.
const failureOutput =
r.exitCode !== 0 && r.exitCode !== -1 ? getFailureOutput(r) : "";
const prefix =
details.mode === "debate" ? `─── Round ${r.step}: ` : "─── ";
text += `\n\n${theme.fg("muted", prefix)}${theme.fg("accent", r.agent)} ${rIcon}`;
if (failureOutput) text += `\n${theme.fg("error", failureOutput)}`;
else if (displayItems.length === 0)
text += `\n${theme.fg("muted", r.exitCode === -1 ? "still running; waiting for first output..." : "(no output)")}`;
else text += `\n${renderDisplayItems(displayItems, 5)}`;
}
// Totals are only shown once every result has finished.
if (!isRunning) {
const usageStr = formatUsageStats(aggregateUsage(details.results));
if (usageStr) text += `\n\n${theme.fg("dim", `Total: ${usageStr}`)}`;
}
if (!expanded) text += `\n${theme.fg("muted", "(Ctrl+O to expand)")}`;
return new Text(text, 0, 0);
}
// Unrecognised mode (or a single-mode result list that is not exactly one entry): plain text fallback.
const text = result.content[0];
return new Text(text?.type === "text" ? text.text : "(no output)", 0, 0);
},
});
// ── Codebase Search Tool ───────────────────────────────────────────────────────
// Sift-backed local retrieval. This is intentionally not named "scout":
// `scout` is the explorer subagent role; `codebase_search` is the retrieval
// primitive that scouts, planners, and parent agents can call for evidence.
/**
 * Parameter schema of the `codebase_search` tool.
 *
 * The `strategy` description names 'page-index-hybrid' as the default because
 * that is the value the tool's renderCall and execute handlers fall back to
 * when no strategy is supplied.
 */
const CodebaseSearchParams = Type.Object({
  query: Type.String({
    description:
      "Natural-language query describing what to explore (e.g. 'find where the write gate tool_call hooks are registered')",
  }),
  scope: Type.Optional(
    Type.String({
      description:
        "Path to search within. Defaults to repository root ('.'); absolute paths inside the repo are normalized to repo-relative paths so .siftignore applies.",
    }),
  ),
  strategy: Type.Optional(
    Type.String({
      description:
        "Search strategy: 'page-index-hybrid' (default), 'path-hybrid', 'bm25', or 'path'",
    }),
  ),
  timeoutMs: Type.Optional(
    Type.Number({
      description:
        "Maximum time to wait for Sift before aborting. Defaults to 120000.",
    }),
  ),
});
pi.registerTool({
name: "codebase_search",
label: "Code Search",
description: [
"Perform Sift-backed hybrid (BM25 + Vector) retrieval over a scoped codebase path.",
" Use this for conceptual, behavioral, or cross-cutting questions only after choosing a narrow scope",
" (e.g. 'how is X handled?', 'where is the logic for Y?', 'find examples of Z').",
" If Sift status is degraded or the scope is broad, prefer grep/find/ls and retry with a narrower scope.",
].join(""),
promptGuidelines: [
"Use grep/find/ls for broad orientation first, then codebase_search with a specific scope for conceptual patterns.",
" page-index-hybrid (default): Use for 'How' and 'Why' questions (logic, implementation, reasoning).",
" path-hybrid: Use for 'Where' questions (architecture, directory structure, file location).",
" Keep scope narrow enough to avoid root-level Sift timeouts; each repo uses its own SIFT_SEARCH_CACHE under .sf/runtime/sift/.",
" Be descriptive in your query: include function names, types, or intent (e.g. 'auth middleware validation').",
" This tool is read-only and optimized for evidence gathering before you plan or edit.",
],
parameters: CodebaseSearchParams,
renderCall(args, theme) {
const query = typeof args.query === "string" ? args.query : "";
const scope = resolveSiftSearchScope(
process.cwd(),
typeof args.scope === "string" ? args.scope : undefined,
);
const strategy =
typeof args.strategy === "string" ? args.strategy : "page-index-hybrid";
const preview =
query.length > 90 ? `${query.slice(0, 89).trimEnd()}` : query;
const scopeLabel =
scope.length > 70
? `${scope.slice(Math.max(0, scope.length - 69))}`
: scope;
return new Text(
[
theme.fg("toolTitle", theme.bold("Code search is querying Sift")),
preview ? ` ${theme.fg("toolOutput", preview)}` : "",
` ${theme.fg("muted", `scope: ${scopeLabel}`)}`,
` ${theme.fg("muted", `strategy: ${strategy}`)}`,
]
.filter(Boolean)
.join("\n"),
0,
0,
);
},
renderResult(result, { expanded }, theme) {
const text =
result.content.find((item) => item.type === "text")?.text ??
"(code search returned no text)";
const details = result.details;
const isError = isCodebaseSearchError(details);
const icon = isError ? theme.fg("error", "✗") : theme.fg("success", "✓");
const status = details?.timedOut
? "timed out"
: details?.aborted
? "aborted"
: isError
? "failed"
: "done";
const lines = text.split("\n");
const maxLines = expanded ? lines.length : 12;
const shown = lines.slice(0, maxLines).join("\n");
const hidden = Math.max(0, lines.length - maxLines);
let rendered = `${icon} ${theme.fg("toolTitle", theme.bold(`Code search ${status}`))}`;
if (details?.strategy) {
rendered += theme.fg("muted", ` (${details.strategy})`);
}
rendered += `\n${theme.fg(isError ? "error" : "toolOutput", shown)}`;
if (hidden > 0) {
rendered += `\n${theme.fg("muted", `${hidden} more lines hidden · Ctrl+O expands`)}`;
}
return new Text(rendered, 0, 0);
},
async execute(_toolCallId, params, signal) {
const projectRoot = process.cwd();
const scope = resolveSiftSearchScope(projectRoot, params.scope);
const strategy = params.strategy ?? "page-index-hybrid";
const query = params.query;
const startedAt = Date.now();
const timeoutMs =
typeof params.timeoutMs === "number" &&
Number.isFinite(params.timeoutMs)
? Math.max(1_000, params.timeoutMs)
: CODEBASE_SEARCH_TIMEOUT_MS;
const siftBin = resolveSiftBinary();
if (!siftBin) {
await recordRetrievalEvidence(projectRoot, {
backend: "codebase_search",
sourceKind: "code",
query,
strategy,
scope,
status: "error",
hitCount: 0,
elapsedMs: Date.now() - startedAt,
error: "sift binary not found",
});
return {
content: [
{
type: "text",
text: "codebase_search unavailable: sift binary not found. Use grep/find/ls or set SIFT_PATH.",
},
],
details: {
operation: "codebase_search",
exitCode: 127,
query,
scope,
strategy,
timeoutMs,
},
};
}
const args = buildCodebaseSearchArgs(strategy, query, scope);
const stderr = [];
const stdout = [];
let wasAborted = false;
let timedOut = false;
const runtimeDirs = ensureSiftRuntimeDirs(projectRoot);
const childEnv = buildSiftEnv(projectRoot, process.env);
const proc = spawn(siftBin, args, {
cwd: projectRoot,
env: childEnv,
shell: false,
stdio: ["ignore", "pipe", "pipe"],
});
liveSubagentProcesses.add(proc);
// Collect output
proc.stdout.on("data", (chunk) => stdout.push(chunk.toString()));
proc.stderr.on("data", (chunk) => stderr.push(chunk.toString()));
// Handle abort signal
const killProc = () => {
wasAborted = true;
try {
proc.kill("SIGTERM");
} catch {
// ignore
}
setTimeout(() => {
if (proc.exitCode === null) {
try {
proc.kill("SIGKILL");
} catch {
// ignore
}
}
}, 5000).unref?.();
};
const timeout = setTimeout(() => {
timedOut = true;
killProc();
}, timeoutMs);
timeout.unref?.();
if (signal) {
if (signal.aborted) killProc();
else signal.addEventListener("abort", killProc, { once: true });
}
const exitCode = await new Promise((resolve) => {
proc.on("close", (code) => {
clearTimeout(timeout);
liveSubagentProcesses.delete(proc);
if (signal) signal.removeEventListener("abort", killProc);
resolve(code ?? 0);
});
proc.on("error", () => {
clearTimeout(timeout);
liveSubagentProcesses.delete(proc);
if (signal) signal.removeEventListener("abort", killProc);
resolve(1);
});
});
if (wasAborted) {
const text = timedOut
? `Code search timed out after ${Math.round(timeoutMs / 1000)}s. Narrow the query or scope and retry.`
: "Code search aborted.";
await recordRetrievalEvidence(projectRoot, {
backend: "codebase_search",
sourceKind: "code",
query,
strategy,
scope,
status: timedOut ? "timeout" : "aborted",
hitCount: 0,
elapsedMs: Date.now() - startedAt,
cachePath: runtimeDirs.searchCache,
error: text,
result: {
siftBin,
timeoutMs,
},
});
return {
content: [
{
type: "text",
text,
},
],
details: {
operation: "codebase_search",
aborted: true,
timedOut,
siftBin,
query,
scope,
strategy,
timeoutMs,
searchCache: runtimeDirs.searchCache,
},
};
}
const out = stdout.join("");
const err = stderr.join("").trim();
if (exitCode !== 0 && !out) {
const hint =
err.includes("not found") || err.includes("No such file")
? "\n\nHint: install rupurt/sift and ensure `sift` is on PATH."
: err
? `\n\nsift stderr: ${err.slice(0, 500)}`
: "";
await recordRetrievalEvidence(projectRoot, {
backend: "codebase_search",
sourceKind: "code",
query,
strategy,
scope,
status: "error",
hitCount: 0,
elapsedMs: Date.now() - startedAt,
cachePath: runtimeDirs.searchCache,
error: err || `exit ${exitCode}`,
result: {
siftBin,
exitCode,
timeoutMs,
},
});
return {
content: [
{
type: "text",
text: `codebase_search failed (exit ${exitCode}). Is sift installed?${hint}`,
},
],
details: {
operation: "codebase_search",
exitCode,
siftBin,
query,
scope,
strategy,
timeoutMs,
searchCache: runtimeDirs.searchCache,
},
};
}
await recordRetrievalEvidence(projectRoot, {
backend: "codebase_search",
sourceKind: "code",
query,
strategy,
scope,
status: exitCode === 0 ? "ok" : "partial",
hitCount: out.trim() ? 1 : 0,
elapsedMs: Date.now() - startedAt,
cachePath: runtimeDirs.searchCache,
error: err || null,
result: {
siftBin,
exitCode,
timeoutMs,
outputPreview: out.slice(0, 2_000),
},
});
return {
content: [
{
type: "text",
text: out || "(sift returned no output)",
},
],
details: {
operation: "codebase_search",
query,
scope,
strategy,
exitCode,
siftBin,
timeoutMs,
searchCache: runtimeDirs.searchCache,
},
};
},
});
/**
 * await_subagent tool: wait for a background subagent job to settle, or for a
 * caller-supplied timeout to elapse, whichever happens first.
 *
 * Purpose: let the caller collect a background job's result without polling
 * while still regaining control when the job runs long.
 *
 * Returns the job's stored result when one exists, otherwise a short text
 * summary of the job's status. When the timeout wins the race the job is not
 * cancelled here; the caller is told to await again or cancel explicitly.
 */
pi.registerTool({
	name: "await_subagent",
	label: "Await Subagent",
	description:
		"Wait for a background subagent job to complete and return its full result.",
	parameters: Type.Object({
		job_id: Type.String({
			description: "Background subagent job ID (for example sub_a1b2c3d4)",
		}),
		timeout: Type.Optional(
			Type.Number({
				description:
					"Maximum seconds to wait before returning control. Defaults to 120. " +
					"If the timeout is reached the subagent keeps running in the background.",
			}),
		),
	}),
	async execute(_toolCallId, params) {
		const DEFAULT_TIMEOUT_SECONDS = 120;
		// Node coerces any setTimeout delay above this value to 1ms, which would
		// turn a very long requested wait into an immediate "timed out" reply.
		const MAX_TIMER_DELAY_MS = 2_147_483_647;

		// Shared reply for a job that is no longer running: prefer the stored
		// result, otherwise summarise status and error text. Used both before and
		// after waiting so the two paths report the same state the same way.
		const describeSettledJob = (settled) => {
			if (settled?.result) return settled.result;
			return {
				content: [
					{
						type: "text",
						text: `Background subagent ${settled?.status ?? "failed"}: ${settled?.errorText ?? "unknown error"}`,
					},
				],
				details: undefined,
				isError: settled?.status !== "completed",
			};
		};

		const manager = getBackgroundJobs();
		const job = manager.getJob(params.job_id);
		if (!job) {
			return {
				content: [
					{
						type: "text",
						text: `Background subagent job not found: ${params.job_id}`,
					},
				],
				details: undefined,
			};
		}
		// NOTE(review): presumably stops the manager from also announcing this
		// job's completion on its own, since the caller is now waiting for it
		// directly - confirm against SubagentBackgroundJobManager.
		manager.suppressFollowUp(job.id);
		if (job.status !== "running") {
			return describeSettledJob(job);
		}

		// Normalise the requested wait: fall back to the default for missing or
		// non-finite values, treat negatives as zero, and cap at the largest delay
		// the timer implementation honours.
		const requested = Number(params.timeout ?? DEFAULT_TIMEOUT_SECONDS);
		const requestedSeconds = Number.isFinite(requested)
			? Math.max(0, requested)
			: DEFAULT_TIMEOUT_SECONDS;
		const timeoutMs = Math.min(requestedSeconds * 1000, MAX_TIMER_DELAY_MS);
		const timeoutSeconds = Math.min(
			requestedSeconds,
			MAX_TIMER_DELAY_MS / 1000,
		);

		const TIMEOUT_SENTINEL = Symbol("timeout");
		let timer;
		const timeoutPromise = new Promise((resolve) => {
			timer = setTimeout(() => resolve(TIMEOUT_SENTINEL), timeoutMs);
			// Do not let a pending wait keep the process alive.
			timer?.unref?.();
		});
		let raceResult;
		try {
			raceResult = await Promise.race([
				job.promise.then(() => "completed"),
				timeoutPromise,
			]);
		} finally {
			// Release the timer as soon as the race settles (or throws) so a job
			// that finishes early does not leave a pending timeout behind.
			clearTimeout(timer);
		}
		if (raceResult === TIMEOUT_SENTINEL) {
			const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1);
			return {
				content: [
					{
						type: "text",
						text:
							`Background subagent still running: **${job.id}** (${job.label}, ${elapsed}s)\n\n` +
							`Timed out after ${timeoutSeconds}s. Call \`await_subagent\` again later or \`cancel_subagent\` to stop it.`,
					},
				],
				details: undefined,
			};
		}
		// Re-read the job from the manager after waiting rather than reusing the
		// pre-wait reference; a missing entry is reported as "failed".
		return describeSettledJob(manager.getJob(job.id));
	},
});
/**
 * cancel_subagent tool: ask the background job manager to cancel a job by ID
 * and report the outcome as a single text message.
 *
 * The manager's `cancel` return value is used as a key into a fixed table of
 * replies; any value without an entry is echoed back as an unknown result.
 */
pi.registerTool({
	name: "cancel_subagent",
	label: "Cancel Subagent",
	description: "Cancel a running background subagent job by ID.",
	parameters: Type.Object({
		job_id: Type.String({
			description: "Background subagent job ID (for example sub_a1b2c3d4)",
		}),
	}),
	async execute(_toolCallId, params) {
		const outcome = getBackgroundJobs().cancel(params.job_id);
		// One reply per outcome key looked up below.
		const replyByOutcome = {
			cancelled: `Background subagent ${params.job_id} has been cancelled.`,
			not_found: `Background subagent ${params.job_id} not found.`,
			already_completed: `Background subagent ${params.job_id} has already completed (or failed/cancelled).`,
		};
		const text = replyByOutcome[outcome] ?? `Unknown result: ${outcome}`;
		return {
			content: [{ type: "text", text }],
			details: undefined,
		};
	},
});
}