diff --git a/packages/coding-agent/src/modes/rpc/rpc-client.ts b/packages/coding-agent/src/modes/rpc/rpc-client.ts index 3c10bef49..d1655712a 100644 --- a/packages/coding-agent/src/modes/rpc/rpc-client.ts +++ b/packages/coding-agent/src/modes/rpc/rpc-client.ts @@ -4,7 +4,7 @@ * Spawns the agent in RPC mode and provides a typed API for all operations. */ -import { type ChildProcess, spawn } from "node:child_process"; +import { type ChildProcess, spawn, spawnSync } from "node:child_process"; import { existsSync } from "node:fs"; import { dirname, join, resolve } from "node:path"; import type { @@ -129,6 +129,110 @@ export function buildRpcLaunchSpec(cliPath: string): RpcLaunchSpec { }; } +/** + * Return the platform-safe detached flag for RPC child processes. + * + * Purpose: put Unix RPC children in their own process group so headless + * shutdown can terminate the whole agent/tool subtree instead of only the + * immediate child process. + * + * Consumer: RpcClient.start. + */ +export function shouldDetachRpcChild(): boolean { + return process.platform !== "win32"; +} + +/** + * Send a signal to the RPC child process tree. + * + * Purpose: prevent bounded headless invocations from leaving autonomous agent + * or shell-tool descendants running after the headless parent receives SIGTERM. + * + * Consumer: RpcClient.stop. + */ +export function signalRpcProcessTree( + child: ChildProcess, + signal: NodeJS.Signals, +): void { + if (process.platform === "win32") { + child.kill(signal); + return; + } + const pid = child.pid; + if (pid) { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to the direct child if it is not a process-group leader. + } + } + child.kill(signal); +} + +function listChildPids(pid: number): number[] { + if (process.platform === "win32") return []; + try { + const result = spawnSync("ps", ["-o", "pid=", "--ppid", String(pid)], { + encoding: "utf8", + timeout: 1000, + }); + if (result.status !== 0 || !result.stdout) return []; + return result.stdout + .split(/\s+/) + .map((value) => Number.parseInt(value, 10)) + .filter((value) => Number.isFinite(value) && value > 0); + } catch { + return []; + } +} + +/** + * List descendants of an RPC child before the child is signalled. + * + * Purpose: catch detached shell-tool children while they still have a parent + * link to the RPC process, so headless timeout cleanup can terminate their + * process groups even if the RPC process exits before its shutdown hook runs. + * + * Consumer: RpcClient.stop. + */ +export function collectRpcDescendantPids(rootPid: number): number[] { + const seen = new Set(); + const queue = [rootPid]; + while (queue.length > 0) { + const pid = queue.shift(); + if (!pid || seen.has(pid)) continue; + seen.add(pid); + for (const childPid of listChildPids(pid)) { + if (!seen.has(childPid)) queue.push(childPid); + } + } + seen.delete(rootPid); + return [...seen]; +} + +function signalRpcPidTree(pid: number, signal: NodeJS.Signals): void { + if (process.platform === "win32") { + try { + process.kill(pid, signal); + } catch { + // Process already exited. + } + return; + } + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to the direct process when it is not a process-group leader. + } + try { + process.kill(pid, signal); + } catch { + // Process already exited. + } +} + // ============================================================================ // RPC Client // ============================================================================ @@ -172,6 +276,7 @@ export class RpcClient { this.process = spawn(launchSpec.command, [...launchSpec.args, ...args], { cwd: this.options.cwd, env: { ...process.env, ...this.options.env }, + detached: shouldDetachRpcChild(), stdio: ["pipe", "pipe", "pipe"], }); @@ -219,18 +324,23 @@ export class RpcClient { async stop(): Promise { if (!this.process) return; + const rootPid = this.process.pid; + const descendantPids = rootPid ? collectRpcDescendantPids(rootPid) : []; + this.stopReadingStdout?.(); this.stopReadingStdout = null; if (this._stderrHandler) { this.process.stderr?.removeListener("data", this._stderrHandler); this._stderrHandler = undefined; } - this.process.kill("SIGTERM"); + for (const pid of descendantPids) signalRpcPidTree(pid, "SIGTERM"); + signalRpcProcessTree(this.process, "SIGTERM"); // Wait for process to exit await new Promise((resolve) => { const timeout = setTimeout(() => { - this.process?.kill("SIGKILL"); + for (const pid of descendantPids) signalRpcPidTree(pid, "SIGKILL"); + if (this.process) signalRpcProcessTree(this.process, "SIGKILL"); resolve(); }, 1000); diff --git a/packages/coding-agent/src/modes/rpc/rpc-protocol-v2.test.ts b/packages/coding-agent/src/modes/rpc/rpc-protocol-v2.test.ts index 27f359caf..cf0ef411c 100644 --- a/packages/coding-agent/src/modes/rpc/rpc-protocol-v2.test.ts +++ b/packages/coding-agent/src/modes/rpc/rpc-protocol-v2.test.ts @@ -7,10 +7,16 @@ */ import assert from "node:assert/strict"; +import { spawn } from "node:child_process"; import { PassThrough } from "node:stream"; import { afterEach, beforeEach, describe, it } from "vitest"; import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js"; -import { buildRpcLaunchSpec } from "./rpc-client.js"; +import { + buildRpcLaunchSpec, + collectRpcDescendantPids, + shouldDetachRpcChild, + signalRpcProcessTree, +} from "./rpc-client.js"; import type { RpcCommand, RpcCostUpdateEvent, @@ -566,6 +572,65 @@ describe("RpcClient command serialization", () => { assert.equal(launchSpec.command, "/tmp/bin/sf-from-source"); assert.deepEqual(launchSpec.args, []); }); + + it("rpc child runs detached on Unix so stop can target its process group", () => { + assert.equal(shouldDetachRpcChild(), process.platform !== "win32"); + }); + + it("signalRpcProcessTree targets Unix process group before direct child fallback", () => { + const calls: Array<[number, NodeJS.Signals]> = []; + const originalKill = process.kill; + const child = { + pid: 12345, + kill: (_signal: NodeJS.Signals) => { + throw new Error("direct child fallback should not be used"); + }, + } as unknown as import("node:child_process").ChildProcess; + + try { + (process as unknown as { kill: typeof process.kill }).kill = (( + pid: number, + signal?: NodeJS.Signals | number, + ) => { + calls.push([pid, signal as NodeJS.Signals]); + return true; + }) as typeof process.kill; + + signalRpcProcessTree(child, "SIGTERM"); + } finally { + (process as unknown as { kill: typeof process.kill }).kill = originalKill; + } + + if (process.platform === "win32") { + assert.deepEqual(calls, []); + } else { + assert.deepEqual(calls, [[-12345, "SIGTERM"]]); + } + }); + + it("collectRpcDescendantPids finds shell-tool children before parent shutdown", async () => { + if (process.platform === "win32") return; + + const child = spawn("sh", ["-c", "sleep 10 & wait"], { + stdio: "ignore", + }); + try { + await tick(100); + assert.ok(child.pid); + const descendants = collectRpcDescendantPids(child.pid!); + assert.ok( + descendants.length >= 1, + "expected shell child process to be listed", + ); + } finally { + for (const pid of child.pid ? collectRpcDescendantPids(child.pid) : []) { + try { + process.kill(pid, "SIGTERM"); + } catch {} + } + child.kill("SIGTERM"); + } + }); }); // ============================================================================ diff --git a/packages/rpc-client/src/rpc-client.test.ts b/packages/rpc-client/src/rpc-client.test.ts index 57fdcc3a2..7c1b9d76d 100644 --- a/packages/rpc-client/src/rpc-client.test.ts +++ b/packages/rpc-client/src/rpc-client.test.ts @@ -1,9 +1,16 @@ import assert from "node:assert/strict"; +import { spawn } from "node:child_process"; import { PassThrough } from "node:stream"; import { describe, it } from "vitest"; import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js"; import type { SdkAgentEvent } from "./rpc-client.js"; -import { buildRpcLaunchSpec, RpcClient } from "./rpc-client.js"; +import { + buildRpcLaunchSpec, + collectRpcDescendantPids, + RpcClient, + shouldDetachRpcChild, + signalRpcProcessTree, +} from "./rpc-client.js"; import type { RpcCostUpdateEvent, RpcExecutionCompleteEvent, @@ -324,6 +331,65 @@ describe("buildRpcLaunchSpec", () => { assert.equal(launchSpec.args[2], "--experimental-strip-types"); assert.equal(launchSpec.args[3], cliPath); }); + + it("rpc child runs detached on Unix so stop can target its process group", () => { + assert.equal(shouldDetachRpcChild(), process.platform !== "win32"); + }); + + it("signalRpcProcessTree targets Unix process group before direct child fallback", () => { + const calls: Array<[number, NodeJS.Signals]> = []; + const originalKill = process.kill; + const child = { + pid: 12345, + kill: (_signal: NodeJS.Signals) => { + throw new Error("direct child fallback should not be used"); + }, + } as unknown as import("node:child_process").ChildProcess; + + try { + (process as unknown as { kill: typeof process.kill }).kill = (( + pid: number, + signal?: NodeJS.Signals | number, + ) => { + calls.push([pid, signal as NodeJS.Signals]); + return true; + }) as typeof process.kill; + + signalRpcProcessTree(child, "SIGTERM"); + } finally { + (process as unknown as { kill: typeof process.kill }).kill = originalKill; + } + + if (process.platform === "win32") { + assert.deepEqual(calls, []); + } else { + assert.deepEqual(calls, [[-12345, "SIGTERM"]]); + } + }); + + it("collectRpcDescendantPids finds shell-tool children before parent shutdown", async () => { + if (process.platform === "win32") return; + + const child = spawn("sh", ["-c", "sleep 10 & wait"], { + stdio: "ignore", + }); + try { + await tick(100); + assert.ok(child.pid); + const descendants = collectRpcDescendantPids(child.pid!); + assert.ok( + descendants.length >= 1, + "expected shell child process to be listed", + ); + } finally { + for (const pid of child.pid ? collectRpcDescendantPids(child.pid) : []) { + try { + process.kill(pid, "SIGTERM"); + } catch {} + } + child.kill("SIGTERM"); + } + }); }); // ============================================================================ diff --git a/packages/rpc-client/src/rpc-client.ts b/packages/rpc-client/src/rpc-client.ts index 9ff3eac37..cfee291ae 100644 --- a/packages/rpc-client/src/rpc-client.ts +++ b/packages/rpc-client/src/rpc-client.ts @@ -6,7 +6,7 @@ * package dependencies. */ -import { type ChildProcess, spawn } from "node:child_process"; +import { type ChildProcess, spawn, spawnSync } from "node:child_process"; import { existsSync } from "node:fs"; import { dirname, join, resolve } from "node:path"; import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js"; @@ -127,6 +127,110 @@ export function buildRpcLaunchSpec(cliPath: string): RpcLaunchSpec { }; } +/** + * Return the platform-safe detached flag for RPC child processes. + * + * Purpose: put Unix RPC children in their own process group so headless + * shutdown can terminate the whole agent/tool subtree instead of only the + * immediate child process. + * + * Consumer: RpcClient.start. + */ +export function shouldDetachRpcChild(): boolean { + return process.platform !== "win32"; +} + +/** + * Send a signal to the RPC child process tree. + * + * Purpose: prevent bounded headless invocations from leaving autonomous agent + * or shell-tool descendants running after the headless parent receives SIGTERM. + * + * Consumer: RpcClient.stop. + */ +export function signalRpcProcessTree( + child: ChildProcess, + signal: NodeJS.Signals, +): void { + if (process.platform === "win32") { + child.kill(signal); + return; + } + const pid = child.pid; + if (pid) { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to the direct child if it is not a process-group leader. + } + } + child.kill(signal); +} + +function listChildPids(pid: number): number[] { + if (process.platform === "win32") return []; + try { + const result = spawnSync("ps", ["-o", "pid=", "--ppid", String(pid)], { + encoding: "utf8", + timeout: 1000, + }); + if (result.status !== 0 || !result.stdout) return []; + return result.stdout + .split(/\s+/) + .map((value) => Number.parseInt(value, 10)) + .filter((value) => Number.isFinite(value) && value > 0); + } catch { + return []; + } +} + +/** + * List descendants of an RPC child before the child is signalled. + * + * Purpose: catch detached shell-tool children while they still have a parent + * link to the RPC process, so headless timeout cleanup can terminate their + * process groups even if the RPC process exits before its shutdown hook runs. + * + * Consumer: RpcClient.stop. + */ +export function collectRpcDescendantPids(rootPid: number): number[] { + const seen = new Set(); + const queue = [rootPid]; + while (queue.length > 0) { + const pid = queue.shift(); + if (!pid || seen.has(pid)) continue; + seen.add(pid); + for (const childPid of listChildPids(pid)) { + if (!seen.has(childPid)) queue.push(childPid); + } + } + seen.delete(rootPid); + return [...seen]; +} + +function signalRpcPidTree(pid: number, signal: NodeJS.Signals): void { + if (process.platform === "win32") { + try { + process.kill(pid, signal); + } catch { + // Process already exited. + } + return; + } + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to the direct process when it is not a process-group leader. + } + try { + process.kill(pid, signal); + } catch { + // Process already exited. + } +} + // ============================================================================ // RPC Client // ============================================================================ @@ -173,6 +277,7 @@ export class RpcClient { this.process = spawn(launchSpec.command, [...launchSpec.args, ...args], { cwd: this.options.cwd, env: { ...process.env, ...this.options.env }, + detached: shouldDetachRpcChild(), stdio: ["pipe", "pipe", "pipe"], }); @@ -221,6 +326,8 @@ export class RpcClient { if (!this.process) return; this._stopped = true; + const rootPid = this.process.pid; + const descendantPids = rootPid ? collectRpcDescendantPids(rootPid) : []; this.stopReadingStdout?.(); this.stopReadingStdout = null; @@ -228,12 +335,14 @@ export class RpcClient { this.process.stderr?.removeListener("data", this._stderrHandler); this._stderrHandler = undefined; } - this.process.kill("SIGTERM"); + for (const pid of descendantPids) signalRpcPidTree(pid, "SIGTERM"); + signalRpcProcessTree(this.process, "SIGTERM"); // Wait for process to exit await new Promise((resolve) => { const timeout = setTimeout(() => { - this.process?.kill("SIGKILL"); + for (const pid of descendantPids) signalRpcPidTree(pid, "SIGKILL"); + if (this.process) signalRpcProcessTree(this.process, "SIGKILL"); resolve(); }, 1000); diff --git a/src/headless.ts b/src/headless.ts index cdb008585..77d1c3521 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -1839,38 +1839,39 @@ async function runHeadlessOnce( }); // Signal handling + let signalShutdownInProgress = false; const signalHandler = () => { + if (signalShutdownInProgress) return; + signalShutdownInProgress = true; process.stderr.write( "\n[headless] Interrupted, stopping child process...\n", ); interrupted = true; exitCode = EXIT_CANCELLED; - // Kill child process — don't await, just fire and exit. - // The main flow may be awaiting a promise that resolves when the child dies, - // which would race with this handler. Exit synchronously to ensure correct exit code. // Log stop failures to stderr for head/headless parity — interactive-mode logs its // stop errors too. Exit code is already forced via process.exit, so logging is // purely observability and doesn't change shutdown semantics. - try { - client.stop().catch((err: unknown) => { + void (async () => { + try { + await Promise.race([ + client.stop(), + new Promise((resolve) => setTimeout(resolve, 2_500)), + ]); + } catch (err: unknown) { process.stderr.write( - `[headless] client.stop() rejected: ${err instanceof Error ? err.message : String(err)}\n`, + `[headless] client.stop() failed: ${err instanceof Error ? err.message : String(err)}\n`, ); - }); - } catch (err) { - process.stderr.write( - `[headless] client.stop() threw: ${err instanceof Error ? err.message : String(err)}\n`, - ); - } - if (timeoutTimer) clearTimeout(timeoutTimer); - if (idleTimer) clearTimeout(idleTimer); - if (heartbeatTimer) clearInterval(heartbeatTimer); - // Emit batch JSON result if in json mode before exiting - if (options.outputFormat === "json") { - emitBatchJsonResult(); - } - finalizeAndFlushTrace(); - process.exit(exitCode); + } + if (timeoutTimer) clearTimeout(timeoutTimer); + if (idleTimer) clearTimeout(idleTimer); + if (heartbeatTimer) clearInterval(heartbeatTimer); + // Emit batch JSON result if in json mode before exiting + if (options.outputFormat === "json") { + emitBatchJsonResult(); + } + finalizeAndFlushTrace(); + process.exit(exitCode); + })(); }; process.on("SIGINT", signalHandler); process.on("SIGTERM", signalHandler); diff --git a/src/resources/extensions/sf/auto/run-unit.js b/src/resources/extensions/sf/auto/run-unit.js index f689ee044..fe6677edb 100644 --- a/src/resources/extensions/sf/auto/run-unit.js +++ b/src/resources/extensions/sf/auto/run-unit.js @@ -185,9 +185,58 @@ async function runUnitViaSwarm(ctx, pi, s, unitType, unitId, prompt, options) { (supervisor.hard_timeout_minutes ?? 8) * 60 * 1000, ); + // ── Event collector: capture real tool calls and completion signal ────────── + // The worker agent emits events as it runs. We intercept "toolcall_end" + // events from the assistantMessageEvent stream to: + // 1. Build the real tool-call content blocks (surfaced in event.messages). + // 2. Detect the canonical `checkpoint` tool call to derive the actual outcome + // so the orchestrator sees "complete" / "blocked" / "continue" accurately. + const collectedToolCalls = []; + let workerSignaledOutcome = null; // "complete" | "progress" | "continue" | "blocked" + let workerSummary = null; + + function onEvent(event) { + if ( + event?.type === "message_update" && + event.assistantMessageEvent?.type === "toolcall_end" + ) { + const toolCall = event.assistantMessageEvent.toolCall; + if (toolCall) { + // Capture the tool call in the standard tool_use block shape + // expected by classifyExecutorRefusal / isNoOpExecutorTranscript. + collectedToolCalls.push({ + type: "tool_use", + name: toolCall.name, + input: toolCall.arguments ?? {}, + }); + + // Detect the canonical completion-signal tool: "checkpoint". + // This is the same tool inspected by the legacy runUnit path's solver pass + // (phases-unit.js line ~989: activeToolsAllowlist: ["checkpoint"]). + if (toolCall.name === "checkpoint") { + const args = toolCall.arguments ?? {}; + const rawOutcome = String(args.outcome ?? "").toLowerCase(); + if (rawOutcome === "complete" || rawOutcome === "done" || rawOutcome === "success") { + workerSignaledOutcome = "complete"; + } else if (rawOutcome === "progress" || rawOutcome === "partial") { + workerSignaledOutcome = "progress"; + } else if (rawOutcome === "blocked") { + workerSignaledOutcome = "blocked"; + } else { + workerSignaledOutcome = workerSignaledOutcome ?? "continue"; + } + // Capture the summary if provided. + if (args.summary) { + workerSummary = String(args.summary); + } + } + } + } + } + let swarmResult; try { - swarmResult = await swarmDispatchAndWait(basePath, envelope, { timeoutMs }); + swarmResult = await swarmDispatchAndWait(basePath, envelope, { timeoutMs, onEvent }); } catch (err) { const msg = `swarmDispatchAndWait threw: ${getErrorMessage(err)}`; debugLog("runUnit[swarm]", { phase: "dispatch-error", unitType, unitId, error: msg }); diff --git a/src/resources/extensions/sf/uok/agent-runner.js b/src/resources/extensions/sf/uok/agent-runner.js index 5b69b62a9..991f2e845 100644 --- a/src/resources/extensions/sf/uok/agent-runner.js +++ b/src/resources/extensions/sf/uok/agent-runner.js @@ -56,12 +56,20 @@ function buildAgentPrompt(agent, messages) { /** * Execute a prompt via in-process runSubagent. * Returns the LLM response text. + * + * @param {string} basePath + * @param {string} prompt + * @param {number} [timeoutMs] + * @param {object} [opts] + * @param {Function} [opts.onEvent] Optional event callback forwarded to runSubagent. */ async function runHeadlessPrompt( basePath, prompt, timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS, + opts = {}, ) { + const { onEvent } = opts; const result = await runSubagent( { systemPrompt: @@ -70,7 +78,7 @@ async function runHeadlessPrompt( name: "swarm-agent", }, prompt, - { timeoutMs }, + { timeoutMs, ...(onEvent ? { onEvent } : {}) }, ); if (!result.ok) { @@ -98,6 +106,7 @@ async function runHeadlessPrompt( * This makes each dispatchAndWait call surgical — the reply will target the * specified messageId exactly, and legitimately queued messages from other * senders remain unread and available for the next turn. + * @param {Function} [opts.onEvent] Optional event callback forwarded to runHeadlessPrompt. * @returns {Promise<{turnsProcessed: number, response: string|null}>} */ export async function runAgentTurn(agent, opts = {}) { @@ -105,6 +114,7 @@ export async function runAgentTurn(agent, opts = {}) { maxContextTurns = DEFAULT_MAX_CONTEXT_TURNS, timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS, onlyMessageId, + onEvent, } = opts; // When onlyMessageId is set, force-refresh the inbox from SQLite so that @@ -148,7 +158,7 @@ export async function runAgentTurn(agent, opts = {}) { let response; try { - response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs); + response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs, { onEvent }); } catch (err) { // On failure, write error back to bus so sender knows agent._bus.send( diff --git a/src/resources/extensions/sf/uok/swarm-dispatch.js b/src/resources/extensions/sf/uok/swarm-dispatch.js index 1e3488b38..46b001c65 100644 --- a/src/resources/extensions/sf/uok/swarm-dispatch.js +++ b/src/resources/extensions/sf/uok/swarm-dispatch.js @@ -279,10 +279,11 @@ export class SwarmDispatchLayer { * @param {object} [options={}] * @param {number} [options.timeoutMs=480000] Hard cap for the agent's turn. * @param {AbortSignal} [options.signal] + * @param {Function} [options.onEvent] Optional event callback forwarded to runAgentTurn. * @returns {Promise} */ async dispatchAndWait(envelope, options = {}) { - const { timeoutMs = 480_000, signal } = options; + const { timeoutMs = 480_000, signal, onEvent } = options; // A2A path: no synchronous wait support yet — return nulled reply fields. if (process.env.SF_A2A_ENABLED) { @@ -318,6 +319,7 @@ export class SwarmDispatchLayer { timeoutMs, signal, onlyMessageId: dispatchResult.messageId, + ...(onEvent ? { onEvent } : {}), }); } catch (err) { return { @@ -453,6 +455,7 @@ export async function swarmDispatch(basePath, envelope) { * @param {object} [options] * @param {number} [options.timeoutMs=480000] * @param {AbortSignal} [options.signal] + * @param {Function} [options.onEvent] Optional event callback forwarded to the agent turn. * @returns {Promise} */ export async function swarmDispatchAndWait(basePath, envelope, options) {