feat(subagent): event streaming for in-process runSubagent

Add RunSubagentOptions.onEvent callback so callers (TUI live update panel
for /delegate, /rubber-duck, etc.) get every session event without polling.
Errors from the callback are caught so a buggy caller cannot crash the agent.

Chain caller-supplied AbortSignal through a local AbortController in
runSingleAgent and register it in a new liveSubagentControllers set so
stopLiveSubagents aborts in-process subagents alongside the legacy spawn-based
processes (cmux split, sift codebase_search).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-15 04:04:52 +02:00
parent 62f886430c
commit 903cdd4d9d
3 changed files with 79 additions and 13 deletions

View file

@ -79,18 +79,29 @@ function createSubagentUIContext(): ExtensionUIContext {
};
}
export interface RunSubagentOptions {
signal?: AbortSignal;
timeoutMs?: number;
/**
* Called for each agent session event (forwarded from session.subscribe).
* Use this to drive live UI updates without polling.
* Errors thrown from this callback are caught and logged but do not abort the subagent.
*/
onEvent?: (event: AgentSessionEvent) => void;
}
/**
* Run a subagent in-process with an isolated AgentSession.
*
* @param config - Subagent configuration (system prompt, model, tools, cwd).
* @param task - Task string to send as the first prompt.
* @param options - Optional AbortSignal and timeout.
* @param options - Optional AbortSignal, timeout, and event callback.
* @returns SubagentResult with ok, output, stderr, exitCode.
*/
export async function runSubagent(
config: SubagentConfig,
task: string,
options?: { signal?: AbortSignal; timeoutMs?: number },
options?: RunSubagentOptions,
): Promise<SubagentResult> {
const name = config.name ?? "subagent";
const cwd = config.cwd ?? process.cwd();
@ -182,6 +193,17 @@ export async function runSubagent(
partialOutput += streamEvent.delta;
}
}
// Forward every event to the caller's callback (if provided).
// Errors from the callback are caught so a buggy caller cannot crash the agent.
if (options?.onEvent) {
try {
options.onEvent(event);
} catch (cbErr) {
process.stderr.write(
`[subagent:${name}] onEvent callback error: ${cbErr instanceof Error ? cbErr.message : String(cbErr)}\n`,
);
}
}
});
// Helper to extract final output from session state messages.

View file

@ -14,6 +14,7 @@ export {
parseSkillBlock,
type SessionStats,
} from "./core/agent-session.js";
export { ArtifactManager } from "./core/artifact-manager.js";
// Auth and model registry
export {
@ -304,6 +305,7 @@ export {
} from "./core/skills.js";
// Subagent runner
export {
type RunSubagentOptions,
runSubagent,
type SubagentConfig,
type SubagentResult,

View file

@ -62,6 +62,7 @@ const COLLAPSED_ITEM_COUNT = 10;
*/
const CODEBASE_SEARCH_TIMEOUT_MS = 120_000;
const liveSubagentProcesses = new Set();
const liveSubagentControllers = new Set();
const AGENT_ALIASES = {
default: "worker",
code: "reviewer",
@ -91,6 +92,16 @@ function resolveAgentByName(agents, agentName) {
};
}
async function stopLiveSubagents() {
// Abort in-process subagents (RunSubagentOptions-based controllers)
for (const controller of liveSubagentControllers) {
try {
controller.abort();
} catch {
/* ignore */
}
}
liveSubagentControllers.clear();
// Kill spawned processes (sift codebase_search, cmux split path, etc.)
const active = Array.from(liveSubagentProcesses);
if (active.length === 0) return;
for (const proc of active) {
@ -1206,6 +1217,18 @@ async function runSingleAgent(
});
}
};
const controller = new AbortController();
// If the caller supplied a signal, chain it into our local controller
if (signal) {
if (signal.aborted) {
controller.abort();
} else {
signal.addEventListener("abort", () => controller.abort(), {
once: true,
});
}
}
liveSubagentControllers.add(controller);
try {
const systemPrompt = agent.systemPrompt.trim()
? composeAgentPrompt(agent, {
@ -1223,18 +1246,35 @@ async function runSingleAgent(
name: agent.name,
},
task,
{ signal },
{
signal: controller.signal,
onEvent: (event) => {
processSubagentEventLine(
JSON.stringify(event),
currentResult,
emitUpdate,
);
},
},
);
currentResult.exitCode = result.exitCode;
currentResult.stderr = result.stderr ?? "";
currentResult.messages.push({
role: "assistant",
content: [{ type: "text", text: result.output }],
model: modelOverride ?? agent.model,
stopReason: result.ok ? "stop" : "error",
errorMessage: result.ok ? undefined : result.stderr,
});
currentResult.usage.turns = result.output ? 1 : 0;
if (result.stderr) {
currentResult.stderr += currentResult.stderr
? `\n${result.stderr}`
: result.stderr;
}
// If processSubagentEventLine didn't capture messages from events,
// fall back to synthesising one from the final output text.
if (currentResult.messages.length === 0 && result.output) {
currentResult.messages.push({
role: "assistant",
content: [{ type: "text", text: result.output }],
model: modelOverride ?? agent.model,
stopReason: result.ok ? "stop" : "error",
errorMessage: result.ok ? undefined : result.stderr,
});
currentResult.usage.turns = 1;
}
if (!result.ok) currentResult.errorMessage = result.stderr;
emitUpdate();
return currentResult;
@ -1243,11 +1283,13 @@ async function runSingleAgent(
error instanceof Error
? error.message
: `Subagent failed: ${String(error)}`;
currentResult.exitCode = signal?.aborted ? 1 : 1;
currentResult.exitCode = 1;
currentResult.stderr += currentResult.stderr ? `\n${message}` : message;
currentResult.errorMessage = message;
emitUpdate();
return currentResult;
} finally {
liveSubagentControllers.delete(controller);
}
}
async function runSingleAgentInCmuxSplit(