fix(swarm): bound silent worker responses

This commit is contained in:
Mikael Hugo 2026-05-15 07:35:31 +02:00
parent 81425230f5
commit e464a1bd6e
6 changed files with 150 additions and 6 deletions

View file

@ -87,6 +87,13 @@ function createSubagentUIContext(): ExtensionUIContext {
export interface RunSubagentOptions {
signal?: AbortSignal;
timeoutMs?: number;
/**
* Abort when the subagent produces no session events for this many
* milliseconds. Omitted or non-positive values disable the check.
* This is separate from timeoutMs: a long-running worker may keep making
* useful progress, but a silent model call should fail fast so callers can
* retry or route to a different model.
*/
noOutputTimeoutMs?: number;
/**
* Called for each agent session event (forwarded from session.subscribe).
* Use this to drive live UI updates without polling.
@ -111,6 +118,7 @@ export async function runSubagent(
const name = config.name ?? "subagent";
const cwd = config.cwd ?? process.cwd();
const timeoutMs = options?.timeoutMs ?? DEFAULT_SUBAGENT_TIMEOUT_MS;
const noOutputTimeoutMs = options?.noOutputTimeoutMs ?? 0;
// Build an isolated resource loader with the caller's system prompt appended.
const agentDir = getAgentDir();
@ -246,12 +254,18 @@ export async function runSubagent(
};
let timer: ReturnType<typeof setTimeout> | undefined;
let noOutputTimer: ReturnType<typeof setTimeout> | undefined;
let noOutputTimedOut = false;
/**
 * Tears down per-run resources: cancels whichever of the `timer` and
 * `noOutputTimer` handles is still pending, resets both to undefined, and
 * then calls `unsubscribe()`.
 */
const cleanup = (): void => {
  // Snapshot the handles first so both slots can be reset in one place.
  const pendingTimer = timer;
  const pendingNoOutputTimer = noOutputTimer;
  timer = undefined;
  noOutputTimer = undefined;
  if (pendingTimer) clearTimeout(pendingTimer);
  if (pendingNoOutputTimer) clearTimeout(pendingNoOutputTimer);
  unsubscribe();
};
@ -270,7 +284,12 @@ export async function runSubagent(
}
// Build race competitors.
type RaceResult = { timedOut?: true; cancelled?: true; error?: unknown };
// Outcome tag produced by a race competitor. The prompt competitor resolves
// with an empty object, so "no flag set" means the prompt itself finished.
type RaceResult = {
// Presumably set by the hard timeoutMs competitor — that code is outside this view; confirm.
timedOut?: true;
// Set when the no-output timer fires, i.e. the subagent stayed silent too long.
noOutputTimedOut?: true;
// Presumably set when the caller's AbortSignal fires — confirm against the signal competitor.
cancelled?: true;
// NOTE(review): the producer of this field is not visible here; likely a captured rejection.
error?: unknown;
};
const competitors: Promise<RaceResult>[] = [
promptPromise.then(() => ({}) as RaceResult),
];
@ -286,6 +305,30 @@ export async function runSubagent(
);
}
// Silence watchdog: only installed when a positive noOutputTimeoutMs was requested.
if (noOutputTimeoutMs > 0) {
competitors.push(
new Promise<RaceResult>((resolve) => {
// (Re)starts the countdown, discarding any countdown that is already pending.
const armNoOutputTimer = () => {
if (noOutputTimer) clearTimeout(noOutputTimer);
noOutputTimer = setTimeout(() => {
// Latch the flag first so events arriving after this point cannot re-arm the timer.
noOutputTimedOut = true;
// Best-effort abort: a rejection from abort() is deliberately ignored.
void session.abort().catch(() => {});
resolve({ noOutputTimedOut: true });
}, noOutputTimeoutMs);
};
// Start counting immediately, before the first event arrives.
armNoOutputTimer();
// Wrap the caller's onEvent so each event restarts the countdown before being forwarded.
// NOTE(review): this only takes effect if the session subscription looks up
// options.onEvent at event time; a handler captured before this reassignment
// would never re-arm the timer — confirm against the subscribe code.
const previousOnEvent = options?.onEvent;
options = {
...options,
onEvent: (event) => {
if (!noOutputTimedOut) armNoOutputTimer();
previousOnEvent?.(event);
},
};
}),
);
}
if (options?.signal) {
const sig = options.signal;
if (sig.aborted) {
@ -322,6 +365,15 @@ export async function runSubagent(
};
}
// The silence watchdog won the race: report a failure whose stderr names the
// silent interval, alongside whatever extractFinalOutput() returns.
// NOTE(review): 124 looks like the same exit code used for the hard timeout, so
// callers testing exitCode === 124 can only tell the two apart via stderr — confirm.
if (result.noOutputTimedOut) {
return {
ok: false,
output: extractFinalOutput(),
stderr: `${name} produced no output for ${noOutputTimeoutMs}ms`,
exitCode: 124,
};
}
if (result.cancelled) {
return {
ok: false,

View file

@ -288,6 +288,16 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) {
30_000,
(supervisor.hard_timeout_minutes ?? 8) * 60 * 1000,
);
// Optional override for the worker silence budget, read from the environment.
// An unset variable becomes Number("") === 0, which fails the > 0 check below.
const configuredNoOutputTimeoutMs = Number(
process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS ?? "",
);
// Use the floored override when it is a finite positive number, otherwise
// 180_000 ms; either way the result never exceeds the hard timeoutMs.
const noOutputTimeoutMs = Math.min(
timeoutMs,
Number.isFinite(configuredNoOutputTimeoutMs) &&
configuredNoOutputTimeoutMs > 0
? Math.floor(configuredNoOutputTimeoutMs)
: 180_000,
);
// ── Event collector: capture real tool calls and completion signal ──────────
// The worker agent emits events as it runs. We intercept "toolcall_end"
@ -358,6 +368,7 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) {
try {
swarmResult = await swarmDispatchAndWait(basePath, envelope, {
timeoutMs,
noOutputTimeoutMs,
onEvent,
});
} catch (err) {

View file

@ -118,11 +118,14 @@ function makeS(basePath = "/tmp/test-project") {
// ─── Save / restore env ───────────────────────────────────────────────────────
let origEnv;
let origNoOutputEnv;
beforeEach(() => {
vi.clearAllMocks();
origEnv = process.env.SF_AUTONOMOUS_VIA_SWARM;
origNoOutputEnv = process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS;
delete process.env.SF_AUTONOMOUS_VIA_SWARM;
delete process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS;
// Default implementation for the happy-path tests: return a deterministic reply.
mockSwarmDispatchAndWait.mockImplementation(
@ -143,6 +146,11 @@ afterEach(() => {
} else {
process.env.SF_AUTONOMOUS_VIA_SWARM = origEnv;
}
if (origNoOutputEnv === undefined) {
delete process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS;
} else {
process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS = origNoOutputEnv;
}
});
// ─── Flag ON — happy path ─────────────────────────────────────────────────────
@ -199,6 +207,23 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
expect(envelope.priority).toBe(7);
expect(envelope.executorPermissionLevel).toBe("low");
expect(opts.timeoutMs).toBeGreaterThan(0);
expect(opts.noOutputTimeoutMs).toBe(180_000);
expect(opts.noOutputTimeoutMs).toBeLessThanOrEqual(opts.timeoutMs);
});
// With SF_SWARM_NO_OUTPUT_TIMEOUT_MS set, the dispatched options must carry
// that value, and it must stay within the hard timeout.
test("uses configured no-output timeout without exceeding hard timeout", async () => {
  process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
  process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS = "45000";

  await runUnit(
    makeCtx("/proj"),
    makePi(),
    makeS("/proj"),
    "execute-task",
    "unit-timeout",
    "do work",
    {},
  );

  // Third positional argument of the first dispatch call is the options bag.
  const dispatchOptions = mockSwarmDispatchAndWait.mock.calls[0][2];
  expect(dispatchOptions.noOutputTimeoutMs).toBe(45_000);
  expect(dispatchOptions.noOutputTimeoutMs).toBeLessThanOrEqual(
    dispatchOptions.timeoutMs,
  );
});
// ─── Round 7: executor system prompt + tools on the envelope ─────────────

View file

@ -539,6 +539,48 @@ describe("SwarmDispatchLayer.dispatchAndWait — Round 7: executor config forwar
expect(capturedOpts.permissionLevel).toBe("low");
});
// dispatchAndWait must hand a caller-supplied noOutputTimeoutMs through to runAgentTurn.
test("noOutputTimeoutMs option is forwarded to runAgentTurn", async () => {
  const { runAgentTurn } = await import("../uok/agent-runner.js");
  let capturedOpts = null;

  // One-shot stand-in for runAgentTurn: records the options it was given, then
  // answers the single targeted message (if it is present and unread).
  runAgentTurn.mockImplementationOnce(async (agent, opts = {}) => {
    capturedOpts = opts;
    const wantedId = opts.onlyMessageId;
    if (wantedId) agent._inbox.refresh();

    const pending = agent
      .receive(false)
      .find((candidate) => candidate.id === wantedId && !candidate.read);
    if (!pending) return { turnsProcessed: 0, response: null };

    agent.markRead(pending.id);
    const replyId = agent._bus.send(
      `agent:${agent.identity.name}`,
      pending.from,
      MOCK_REPLY_TEXT,
      { replyTo: pending.id, type: "response" },
    );
    return { turnsProcessed: 1, response: MOCK_REPLY_TEXT, replyId };
  });

  const layer = new SwarmDispatchLayer(makeProject());
  const envelope = {
    unitId: "task-no-output-timeout",
    unitType: "execute-task",
    workMode: "build",
    payload: "edit files",
    priority: 5,
    scope: "scope-timeout",
  };
  await layer.dispatchAndWait(envelope, { noOutputTimeoutMs: 45_000 });

  expect(capturedOpts).not.toBeNull();
  expect(capturedOpts.noOutputTimeoutMs).toBe(45_000);
});
test("envelope without executorSystemPrompt does not forward systemPromptOverride", async () => {
// Envelopes without the optional fields must not pass undefined opts to runAgentTurn.
const { runAgentTurn } = await import("../uok/agent-runner.js");

View file

@ -18,6 +18,7 @@ import { runSubagent } from "@singularity-forge/coding-agent";
// Module-level defaults for the agent runner; all durations are in milliseconds.
const DEFAULT_MAX_CONTEXT_TURNS = 10;
const DEFAULT_MAX_TURNS_PER_RUN = 5;
const DEFAULT_RUNNER_TIMEOUT_MS = 120_000;
// NOTE(review): this default is larger than DEFAULT_RUNNER_TIMEOUT_MS, so when a
// caller relies on both defaults the hard timeout elapses before the silence
// budget can — confirm that is intended.
const DEFAULT_NO_OUTPUT_TIMEOUT_MS = 180_000;
const DEFAULT_POLL_INTERVAL_MS = 1_000;
/**
@ -62,6 +63,7 @@ function buildAgentPrompt(agent, messages) {
* @param {number} [timeoutMs]
* @param {object} [opts]
* @param {Function} [opts.onEvent] Optional event callback forwarded to runSubagent.
* @param {number} [opts.noOutputTimeoutMs] Abort a silent model call after this long.
* @param {string} [opts.systemPromptOverride] Override the default swarm-agent system prompt.
* When set (e.g. from envelope.executorSystemPrompt), this replaces the generic
* "persistent agent in a swarm" prompt so the worker receives the full autonomous
@ -78,8 +80,13 @@ async function runHeadlessPrompt(
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
opts = {},
) {
const { onEvent, systemPromptOverride, toolsOverride, permissionLevel } =
opts;
const {
onEvent,
noOutputTimeoutMs = DEFAULT_NO_OUTPUT_TIMEOUT_MS,
systemPromptOverride,
toolsOverride,
permissionLevel,
} = opts;
const result = await runSubagent(
{
systemPrompt:
@ -93,12 +100,14 @@ async function runHeadlessPrompt(
: {}),
},
prompt,
{ timeoutMs, ...(onEvent ? { onEvent } : {}) },
{ timeoutMs, noOutputTimeoutMs, ...(onEvent ? { onEvent } : {}) },
);
if (!result.ok) {
if (result.exitCode === 124) {
throw new Error(`Agent runner timed out after ${timeoutMs}ms`);
throw new Error(
result.stderr ?? `Agent runner timed out after ${timeoutMs}ms`,
);
}
throw new Error(
`sf headless failed: ${result.stderr || result.output || "unknown error"}`,
@ -122,6 +131,7 @@ async function runHeadlessPrompt(
* specified messageId exactly, and legitimately queued messages from other
* senders remain unread and available for the next turn.
* @param {Function} [opts.onEvent] Optional event callback forwarded to runHeadlessPrompt.
* @param {number} [opts.noOutputTimeoutMs] Abort a silent model call after this long.
* @param {string} [opts.systemPromptOverride] Override the worker's system prompt.
* Forwarded to runHeadlessPrompt so executor-specific contracts (e.g. the autonomous
* checkpoint requirement) reach the LLM session unchanged.
@ -136,6 +146,7 @@ export async function runAgentTurn(agent, opts = {}) {
const {
maxContextTurns = DEFAULT_MAX_CONTEXT_TURNS,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
noOutputTimeoutMs = DEFAULT_NO_OUTPUT_TIMEOUT_MS,
onlyMessageId,
onEvent,
systemPromptOverride,
@ -186,6 +197,7 @@ export async function runAgentTurn(agent, opts = {}) {
try {
response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs, {
onEvent,
noOutputTimeoutMs,
...(systemPromptOverride ? { systemPromptOverride } : {}),
...(toolsOverride ? { toolsOverride } : {}),
...(permissionLevel ? { permissionLevel } : {}),

View file

@ -288,12 +288,13 @@ export class SwarmDispatchLayer {
* @param {DispatchEnvelope} envelope
* @param {object} [options={}]
* @param {number} [options.timeoutMs=480000] Hard cap for the agent's turn.
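* @param {number} [options.noOutputTimeoutMs] Abort the turn when the worker stays silent for this many
*   milliseconds. Omitted or 0 is not forwarded, so runAgentTurn's own default applies.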
* @param {AbortSignal} [options.signal]
* @param {Function} [options.onEvent] Optional event callback forwarded to runAgentTurn.
* @returns {Promise<DispatchResult & { reply: string | null; replyMessageId: string | null }>}
*/
async dispatchAndWait(envelope, options = {}) {
const { timeoutMs = 480_000, signal, onEvent } = options;
const { timeoutMs = 480_000, noOutputTimeoutMs, signal, onEvent } = options;
// A2A path: no synchronous wait support yet — return nulled reply fields.
if (process.env.SF_A2A_ENABLED) {
@ -335,6 +336,7 @@ export class SwarmDispatchLayer {
try {
turnResult = await runAgentTurn(agent, {
timeoutMs,
...(noOutputTimeoutMs ? { noOutputTimeoutMs } : {}),
signal,
onlyMessageId: dispatchResult.messageId,
...(onEvent ? { onEvent } : {}),