feat(swarm): event streaming + outcome derivation for runUnitViaSwarm

- Forward onEvent through swarm-dispatch → agent-runner → runSubagent
- Collect toolcall_end events in runUnitViaSwarm to build real tool-use blocks
- Detect checkpoint tool outcome for accurate unit completion signal
- Add headless.ts graceful shutdown (async signal handler, 2.5s timeout)
- RPC client stop() now awaits flush and propagates stop to child sessions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Mikael Hugo 2026-05-15 04:54:58 +02:00
parent ffcd3d1157
commit 1115437cec
8 changed files with 446 additions and 33 deletions

View file

@ -4,7 +4,7 @@
* Spawns the agent in RPC mode and provides a typed API for all operations.
*/
import { type ChildProcess, spawn } from "node:child_process";
import { type ChildProcess, spawn, spawnSync } from "node:child_process";
import { existsSync } from "node:fs";
import { dirname, join, resolve } from "node:path";
import type {
@ -129,6 +129,110 @@ export function buildRpcLaunchSpec(cliPath: string): RpcLaunchSpec {
};
}
/**
* Return the platform-safe detached flag for RPC child processes.
*
* Purpose: put Unix RPC children in their own process group so headless
* shutdown can terminate the whole agent/tool subtree instead of only the
* immediate child process.
*
* Consumer: RpcClient.start.
*/
export function shouldDetachRpcChild(): boolean {
return process.platform !== "win32";
}
/**
* Send a signal to the RPC child process tree.
*
* Purpose: prevent bounded headless invocations from leaving autonomous agent
* or shell-tool descendants running after the headless parent receives SIGTERM.
*
* Consumer: RpcClient.stop.
*/
export function signalRpcProcessTree(
child: ChildProcess,
signal: NodeJS.Signals,
): void {
if (process.platform === "win32") {
child.kill(signal);
return;
}
const pid = child.pid;
if (pid) {
try {
process.kill(-pid, signal);
return;
} catch {
// Fall back to the direct child if it is not a process-group leader.
}
}
child.kill(signal);
}
function listChildPids(pid: number): number[] {
if (process.platform === "win32") return [];
try {
const result = spawnSync("ps", ["-o", "pid=", "--ppid", String(pid)], {
encoding: "utf8",
timeout: 1000,
});
if (result.status !== 0 || !result.stdout) return [];
return result.stdout
.split(/\s+/)
.map((value) => Number.parseInt(value, 10))
.filter((value) => Number.isFinite(value) && value > 0);
} catch {
return [];
}
}
/**
* List descendants of an RPC child before the child is signalled.
*
* Purpose: catch detached shell-tool children while they still have a parent
* link to the RPC process, so headless timeout cleanup can terminate their
* process groups even if the RPC process exits before its shutdown hook runs.
*
* Consumer: RpcClient.stop.
*/
export function collectRpcDescendantPids(rootPid: number): number[] {
const seen = new Set<number>();
const queue = [rootPid];
while (queue.length > 0) {
const pid = queue.shift();
if (!pid || seen.has(pid)) continue;
seen.add(pid);
for (const childPid of listChildPids(pid)) {
if (!seen.has(childPid)) queue.push(childPid);
}
}
seen.delete(rootPid);
return [...seen];
}
function signalRpcPidTree(pid: number, signal: NodeJS.Signals): void {
if (process.platform === "win32") {
try {
process.kill(pid, signal);
} catch {
// Process already exited.
}
return;
}
try {
process.kill(-pid, signal);
return;
} catch {
// Fall back to the direct process when it is not a process-group leader.
}
try {
process.kill(pid, signal);
} catch {
// Process already exited.
}
}
// ============================================================================
// RPC Client
// ============================================================================
@ -172,6 +276,7 @@ export class RpcClient {
this.process = spawn(launchSpec.command, [...launchSpec.args, ...args], {
cwd: this.options.cwd,
env: { ...process.env, ...this.options.env },
detached: shouldDetachRpcChild(),
stdio: ["pipe", "pipe", "pipe"],
});
@ -219,18 +324,23 @@ export class RpcClient {
async stop(): Promise<void> {
if (!this.process) return;
const rootPid = this.process.pid;
const descendantPids = rootPid ? collectRpcDescendantPids(rootPid) : [];
this.stopReadingStdout?.();
this.stopReadingStdout = null;
if (this._stderrHandler) {
this.process.stderr?.removeListener("data", this._stderrHandler);
this._stderrHandler = undefined;
}
this.process.kill("SIGTERM");
for (const pid of descendantPids) signalRpcPidTree(pid, "SIGTERM");
signalRpcProcessTree(this.process, "SIGTERM");
// Wait for process to exit
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
this.process?.kill("SIGKILL");
for (const pid of descendantPids) signalRpcPidTree(pid, "SIGKILL");
if (this.process) signalRpcProcessTree(this.process, "SIGKILL");
resolve();
}, 1000);

View file

@ -7,10 +7,16 @@
*/
import assert from "node:assert/strict";
import { spawn } from "node:child_process";
import { PassThrough } from "node:stream";
import { afterEach, beforeEach, describe, it } from "vitest";
import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js";
import { buildRpcLaunchSpec } from "./rpc-client.js";
import {
buildRpcLaunchSpec,
collectRpcDescendantPids,
shouldDetachRpcChild,
signalRpcProcessTree,
} from "./rpc-client.js";
import type {
RpcCommand,
RpcCostUpdateEvent,
@ -566,6 +572,65 @@ describe("RpcClient command serialization", () => {
assert.equal(launchSpec.command, "/tmp/bin/sf-from-source");
assert.deepEqual(launchSpec.args, []);
});
it("rpc child runs detached on Unix so stop can target its process group", () => {
assert.equal(shouldDetachRpcChild(), process.platform !== "win32");
});
it("signalRpcProcessTree targets Unix process group before direct child fallback", () => {
const calls: Array<[number, NodeJS.Signals]> = [];
const originalKill = process.kill;
const child = {
pid: 12345,
kill: (_signal: NodeJS.Signals) => {
throw new Error("direct child fallback should not be used");
},
} as unknown as import("node:child_process").ChildProcess;
try {
(process as unknown as { kill: typeof process.kill }).kill = ((
pid: number,
signal?: NodeJS.Signals | number,
) => {
calls.push([pid, signal as NodeJS.Signals]);
return true;
}) as typeof process.kill;
signalRpcProcessTree(child, "SIGTERM");
} finally {
(process as unknown as { kill: typeof process.kill }).kill = originalKill;
}
if (process.platform === "win32") {
assert.deepEqual(calls, []);
} else {
assert.deepEqual(calls, [[-12345, "SIGTERM"]]);
}
});
it("collectRpcDescendantPids finds shell-tool children before parent shutdown", async () => {
if (process.platform === "win32") return;
const child = spawn("sh", ["-c", "sleep 10 & wait"], {
stdio: "ignore",
});
try {
await tick(100);
assert.ok(child.pid);
const descendants = collectRpcDescendantPids(child.pid!);
assert.ok(
descendants.length >= 1,
"expected shell child process to be listed",
);
} finally {
for (const pid of child.pid ? collectRpcDescendantPids(child.pid) : []) {
try {
process.kill(pid, "SIGTERM");
} catch {}
}
child.kill("SIGTERM");
}
});
});
// ============================================================================

View file

@ -1,9 +1,16 @@
import assert from "node:assert/strict";
import { spawn } from "node:child_process";
import { PassThrough } from "node:stream";
import { describe, it } from "vitest";
import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js";
import type { SdkAgentEvent } from "./rpc-client.js";
import { buildRpcLaunchSpec, RpcClient } from "./rpc-client.js";
import {
buildRpcLaunchSpec,
collectRpcDescendantPids,
RpcClient,
shouldDetachRpcChild,
signalRpcProcessTree,
} from "./rpc-client.js";
import type {
RpcCostUpdateEvent,
RpcExecutionCompleteEvent,
@ -324,6 +331,65 @@ describe("buildRpcLaunchSpec", () => {
assert.equal(launchSpec.args[2], "--experimental-strip-types");
assert.equal(launchSpec.args[3], cliPath);
});
it("rpc child runs detached on Unix so stop can target its process group", () => {
assert.equal(shouldDetachRpcChild(), process.platform !== "win32");
});
it("signalRpcProcessTree targets Unix process group before direct child fallback", () => {
const calls: Array<[number, NodeJS.Signals]> = [];
const originalKill = process.kill;
const child = {
pid: 12345,
kill: (_signal: NodeJS.Signals) => {
throw new Error("direct child fallback should not be used");
},
} as unknown as import("node:child_process").ChildProcess;
try {
(process as unknown as { kill: typeof process.kill }).kill = ((
pid: number,
signal?: NodeJS.Signals | number,
) => {
calls.push([pid, signal as NodeJS.Signals]);
return true;
}) as typeof process.kill;
signalRpcProcessTree(child, "SIGTERM");
} finally {
(process as unknown as { kill: typeof process.kill }).kill = originalKill;
}
if (process.platform === "win32") {
assert.deepEqual(calls, []);
} else {
assert.deepEqual(calls, [[-12345, "SIGTERM"]]);
}
});
it("collectRpcDescendantPids finds shell-tool children before parent shutdown", async () => {
if (process.platform === "win32") return;
const child = spawn("sh", ["-c", "sleep 10 & wait"], {
stdio: "ignore",
});
try {
await tick(100);
assert.ok(child.pid);
const descendants = collectRpcDescendantPids(child.pid!);
assert.ok(
descendants.length >= 1,
"expected shell child process to be listed",
);
} finally {
for (const pid of child.pid ? collectRpcDescendantPids(child.pid) : []) {
try {
process.kill(pid, "SIGTERM");
} catch {}
}
child.kill("SIGTERM");
}
});
});
// ============================================================================

View file

@ -6,7 +6,7 @@
* package dependencies.
*/
import { type ChildProcess, spawn } from "node:child_process";
import { type ChildProcess, spawn, spawnSync } from "node:child_process";
import { existsSync } from "node:fs";
import { dirname, join, resolve } from "node:path";
import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js";
@ -127,6 +127,110 @@ export function buildRpcLaunchSpec(cliPath: string): RpcLaunchSpec {
};
}
/**
* Return the platform-safe detached flag for RPC child processes.
*
* Purpose: put Unix RPC children in their own process group so headless
* shutdown can terminate the whole agent/tool subtree instead of only the
* immediate child process.
*
* Consumer: RpcClient.start.
*/
export function shouldDetachRpcChild(): boolean {
return process.platform !== "win32";
}
/**
* Send a signal to the RPC child process tree.
*
* Purpose: prevent bounded headless invocations from leaving autonomous agent
* or shell-tool descendants running after the headless parent receives SIGTERM.
*
* Consumer: RpcClient.stop.
*/
export function signalRpcProcessTree(
child: ChildProcess,
signal: NodeJS.Signals,
): void {
if (process.platform === "win32") {
child.kill(signal);
return;
}
const pid = child.pid;
if (pid) {
try {
process.kill(-pid, signal);
return;
} catch {
// Fall back to the direct child if it is not a process-group leader.
}
}
child.kill(signal);
}
function listChildPids(pid: number): number[] {
if (process.platform === "win32") return [];
try {
const result = spawnSync("ps", ["-o", "pid=", "--ppid", String(pid)], {
encoding: "utf8",
timeout: 1000,
});
if (result.status !== 0 || !result.stdout) return [];
return result.stdout
.split(/\s+/)
.map((value) => Number.parseInt(value, 10))
.filter((value) => Number.isFinite(value) && value > 0);
} catch {
return [];
}
}
/**
* List descendants of an RPC child before the child is signalled.
*
* Purpose: catch detached shell-tool children while they still have a parent
* link to the RPC process, so headless timeout cleanup can terminate their
* process groups even if the RPC process exits before its shutdown hook runs.
*
* Consumer: RpcClient.stop.
*/
export function collectRpcDescendantPids(rootPid: number): number[] {
const seen = new Set<number>();
const queue = [rootPid];
while (queue.length > 0) {
const pid = queue.shift();
if (!pid || seen.has(pid)) continue;
seen.add(pid);
for (const childPid of listChildPids(pid)) {
if (!seen.has(childPid)) queue.push(childPid);
}
}
seen.delete(rootPid);
return [...seen];
}
function signalRpcPidTree(pid: number, signal: NodeJS.Signals): void {
if (process.platform === "win32") {
try {
process.kill(pid, signal);
} catch {
// Process already exited.
}
return;
}
try {
process.kill(-pid, signal);
return;
} catch {
// Fall back to the direct process when it is not a process-group leader.
}
try {
process.kill(pid, signal);
} catch {
// Process already exited.
}
}
// ============================================================================
// RPC Client
// ============================================================================
@ -173,6 +277,7 @@ export class RpcClient {
this.process = spawn(launchSpec.command, [...launchSpec.args, ...args], {
cwd: this.options.cwd,
env: { ...process.env, ...this.options.env },
detached: shouldDetachRpcChild(),
stdio: ["pipe", "pipe", "pipe"],
});
@ -221,6 +326,8 @@ export class RpcClient {
if (!this.process) return;
this._stopped = true;
const rootPid = this.process.pid;
const descendantPids = rootPid ? collectRpcDescendantPids(rootPid) : [];
this.stopReadingStdout?.();
this.stopReadingStdout = null;
@ -228,12 +335,14 @@ export class RpcClient {
this.process.stderr?.removeListener("data", this._stderrHandler);
this._stderrHandler = undefined;
}
this.process.kill("SIGTERM");
for (const pid of descendantPids) signalRpcPidTree(pid, "SIGTERM");
signalRpcProcessTree(this.process, "SIGTERM");
// Wait for process to exit
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
this.process?.kill("SIGKILL");
for (const pid of descendantPids) signalRpcPidTree(pid, "SIGKILL");
if (this.process) signalRpcProcessTree(this.process, "SIGKILL");
resolve();
}, 1000);

View file

@ -1839,27 +1839,27 @@ async function runHeadlessOnce(
});
// Signal handling
let signalShutdownInProgress = false;
const signalHandler = () => {
if (signalShutdownInProgress) return;
signalShutdownInProgress = true;
process.stderr.write(
"\n[headless] Interrupted, stopping child process...\n",
);
interrupted = true;
exitCode = EXIT_CANCELLED;
// Kill child process — don't await, just fire and exit.
// The main flow may be awaiting a promise that resolves when the child dies,
// which would race with this handler. Exit synchronously to ensure correct exit code.
// Log stop failures to stderr for head/headless parity — interactive-mode logs its
// stop errors too. Exit code is already forced via process.exit, so logging is
// purely observability and doesn't change shutdown semantics.
void (async () => {
try {
client.stop().catch((err: unknown) => {
await Promise.race([
client.stop(),
new Promise((resolve) => setTimeout(resolve, 2_500)),
]);
} catch (err: unknown) {
process.stderr.write(
`[headless] client.stop() rejected: ${err instanceof Error ? err.message : String(err)}\n`,
);
});
} catch (err) {
process.stderr.write(
`[headless] client.stop() threw: ${err instanceof Error ? err.message : String(err)}\n`,
`[headless] client.stop() failed: ${err instanceof Error ? err.message : String(err)}\n`,
);
}
if (timeoutTimer) clearTimeout(timeoutTimer);
@ -1871,6 +1871,7 @@ async function runHeadlessOnce(
}
finalizeAndFlushTrace();
process.exit(exitCode);
})();
};
process.on("SIGINT", signalHandler);
process.on("SIGTERM", signalHandler);

View file

@ -185,9 +185,58 @@ async function runUnitViaSwarm(ctx, pi, s, unitType, unitId, prompt, options) {
(supervisor.hard_timeout_minutes ?? 8) * 60 * 1000,
);
// ── Event collector: capture real tool calls and completion signal ──────────
// The worker agent emits events as it runs. We intercept "toolcall_end"
// events from the assistantMessageEvent stream to:
// 1. Build the real tool-call content blocks (surfaced in event.messages).
// 2. Detect the canonical `checkpoint` tool call to derive the actual outcome
// so the orchestrator sees "complete" / "blocked" / "continue" accurately.
const collectedToolCalls = [];
let workerSignaledOutcome = null; // "complete" | "progress" | "continue" | "blocked"
let workerSummary = null;
function onEvent(event) {
if (
event?.type === "message_update" &&
event.assistantMessageEvent?.type === "toolcall_end"
) {
const toolCall = event.assistantMessageEvent.toolCall;
if (toolCall) {
// Capture the tool call in the standard tool_use block shape
// expected by classifyExecutorRefusal / isNoOpExecutorTranscript.
collectedToolCalls.push({
type: "tool_use",
name: toolCall.name,
input: toolCall.arguments ?? {},
});
// Detect the canonical completion-signal tool: "checkpoint".
// This is the same tool inspected by the legacy runUnit path's solver pass
// (phases-unit.js line ~989: activeToolsAllowlist: ["checkpoint"]).
if (toolCall.name === "checkpoint") {
const args = toolCall.arguments ?? {};
const rawOutcome = String(args.outcome ?? "").toLowerCase();
if (rawOutcome === "complete" || rawOutcome === "done" || rawOutcome === "success") {
workerSignaledOutcome = "complete";
} else if (rawOutcome === "progress" || rawOutcome === "partial") {
workerSignaledOutcome = "progress";
} else if (rawOutcome === "blocked") {
workerSignaledOutcome = "blocked";
} else {
workerSignaledOutcome = workerSignaledOutcome ?? "continue";
}
// Capture the summary if provided.
if (args.summary) {
workerSummary = String(args.summary);
}
}
}
}
}
let swarmResult;
try {
swarmResult = await swarmDispatchAndWait(basePath, envelope, { timeoutMs });
swarmResult = await swarmDispatchAndWait(basePath, envelope, { timeoutMs, onEvent });
} catch (err) {
const msg = `swarmDispatchAndWait threw: ${getErrorMessage(err)}`;
debugLog("runUnit[swarm]", { phase: "dispatch-error", unitType, unitId, error: msg });

View file

@ -56,12 +56,20 @@ function buildAgentPrompt(agent, messages) {
/**
* Execute a prompt via in-process runSubagent.
* Returns the LLM response text.
*
* @param {string} basePath
* @param {string} prompt
* @param {number} [timeoutMs]
* @param {object} [opts]
* @param {Function} [opts.onEvent] Optional event callback forwarded to runSubagent.
*/
async function runHeadlessPrompt(
basePath,
prompt,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
opts = {},
) {
const { onEvent } = opts;
const result = await runSubagent(
{
systemPrompt:
@ -70,7 +78,7 @@ async function runHeadlessPrompt(
name: "swarm-agent",
},
prompt,
{ timeoutMs },
{ timeoutMs, ...(onEvent ? { onEvent } : {}) },
);
if (!result.ok) {
@ -98,6 +106,7 @@ async function runHeadlessPrompt(
* This makes each dispatchAndWait call surgical the reply will target the
* specified messageId exactly, and legitimately queued messages from other
* senders remain unread and available for the next turn.
* @param {Function} [opts.onEvent] Optional event callback forwarded to runHeadlessPrompt.
* @returns {Promise<{turnsProcessed: number, response: string|null}>}
*/
export async function runAgentTurn(agent, opts = {}) {
@ -105,6 +114,7 @@ export async function runAgentTurn(agent, opts = {}) {
maxContextTurns = DEFAULT_MAX_CONTEXT_TURNS,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
onlyMessageId,
onEvent,
} = opts;
// When onlyMessageId is set, force-refresh the inbox from SQLite so that
@ -148,7 +158,7 @@ export async function runAgentTurn(agent, opts = {}) {
let response;
try {
response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs);
response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs, { onEvent });
} catch (err) {
// On failure, write error back to bus so sender knows
agent._bus.send(

View file

@ -279,10 +279,11 @@ export class SwarmDispatchLayer {
* @param {object} [options={}]
* @param {number} [options.timeoutMs=480000] Hard cap for the agent's turn.
* @param {AbortSignal} [options.signal]
* @param {Function} [options.onEvent] Optional event callback forwarded to runAgentTurn.
* @returns {Promise<DispatchResult & { reply: string | null; replyMessageId: string | null }>}
*/
async dispatchAndWait(envelope, options = {}) {
const { timeoutMs = 480_000, signal } = options;
const { timeoutMs = 480_000, signal, onEvent } = options;
// A2A path: no synchronous wait support yet — return nulled reply fields.
if (process.env.SF_A2A_ENABLED) {
@ -318,6 +319,7 @@ export class SwarmDispatchLayer {
timeoutMs,
signal,
onlyMessageId: dispatchResult.messageId,
...(onEvent ? { onEvent } : {}),
});
} catch (err) {
return {
@ -453,6 +455,7 @@ export async function swarmDispatch(basePath, envelope) {
* @param {object} [options]
* @param {number} [options.timeoutMs=480000]
* @param {AbortSignal} [options.signal]
* @param {Function} [options.onEvent] Optional event callback forwarded to the agent turn.
* @returns {Promise<DispatchResult & { reply: string | null; replyMessageId: string | null }>}
*/
export async function swarmDispatchAndWait(basePath, envelope, options) {