fix(swarm): close multi-dispatch + checkpoint parity gaps

Two real bugs surfaced by SF_AUTONOMOUS_VIA_SWARM=1 dogfood (Round 4):

1. Second dispatch to the same swarm agent returned reply=null because
   each MessageBus instance held a 30s-stale inbox cache. runAgentTurn
   now accepts opts.onlyMessageId; when set it forces agent._inbox.refresh()
   from SQLite, processes only that message, and leaves stale messages
   untouched for later turns. dispatchAndWait passes the just-dispatched
   messageId so each call is surgical.

2. runUnitViaSwarm now writes an appendAutonomousSolverCheckpoint and
   synthesizes a swarm_unit_complete tool_use block alongside the text
   reply, so phases-unit.js stops firing claimed-checkpoint-without-tool
   repair loops. Outcome is conservatively "continue" — a real "complete"
   requires the swarm agent to emit an actual checkpoint tool call
   (future round wires runSubagent.onEvent through dispatchAndWait).

Tests: 51 passing for the two affected files (11 swarm-dispatch +
40 run-unit-via-swarm). Full suite: 1760/1760.

Known remaining gap before flipping default: synthesized outcome is
always "continue", so the loop relies on iteration caps for
termination rather than agent-signaled completion. Wiring real tool
calls through is the next round.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-15 04:37:59 +02:00
parent b428f1ab22
commit 3faa599f9d
5 changed files with 393 additions and 9 deletions

View file

@ -16,6 +16,7 @@ import {
resolveAutoSupervisorConfig,
resolvePersistModelChanges,
} from "../preferences.js";
import { appendAutonomousSolverCheckpoint } from "../autonomous-solver.js";
import {
collectSessionTokenUsage,
collectWorktreeFingerprint,
@ -235,6 +236,82 @@ async function runUnitViaSwarm(ctx, pi, s, unitType, unitId, prompt, options) {
ctx.ui.notify(`[${unitType}] ${unitId} → swarm agent ${swarmResult.targetAgent} completed`, "info");
}
// ── Synthesize checkpoint tool call ──────────────────────────────────────
// phases-unit.js's solver pass calls assessAutonomousSolverTurn() which reads
// the solver state file (written by appendAutonomousSolverCheckpoint). When
// the swarm agent replies as plain text (no real `checkpoint` tool call), no
// state record exists and the orchestrator fires a spurious repair loop
// ("claimed-checkpoint-without-tool" / "missing-checkpoint-retry").
//
// Fix: synthesize a checkpoint by calling appendAutonomousSolverCheckpoint
// directly so the solver state matches what phases-unit.js expects after a
// completed executor turn. Use outcome="continue" — the most conservative
// choice: it is correct even if the unit isn't fully done, causes the loop
// to re-dispatch rather than complete, and avoids false "complete" claims.
//
// Also write a synthetic tool-call block into event.messages so that
// classifyExecutorRefusal and isNoOpExecutorTranscript see structured tool
// activity rather than bare text (which they would classify as a no-op,
// triggering an additional repair path on top of the missing-checkpoint one).
const replyText = swarmResult.reply ?? "";
const checkpointSummary = replyText.length > 500
? `${replyText.slice(0, 497)}...`
: replyText;
try {
appendAutonomousSolverCheckpoint(basePath, {
unitType,
unitId,
outcome: "continue",
summary: checkpointSummary || "Swarm agent completed unit turn.",
completedItems: ["Swarm agent processed unit and replied."],
remainingItems: [],
verificationEvidence: [
`swarm-agent:${swarmResult.targetAgent}`,
`replyMessageId:${swarmResult.replyMessageId ?? "unknown"}`,
],
pdd: {
purpose: "Synthesized checkpoint from swarm agent reply to prevent missing-checkpoint repair loop.",
consumer: "phases-unit.js assessAutonomousSolverTurn",
contract: "outcome=continue is used conservatively so the loop re-evaluates rather than assuming completion.",
failureBoundary: "appendAutonomousSolverCheckpoint failure is swallowed — the loop will repair via its own missing-checkpoint retry path.",
evidence: `swarm-agent ${swarmResult.targetAgent} replied with ${replyText.length} chars`,
nonGoals: "Does not replace real tool-call plumbing from the swarm agent.",
invariants: "Never claims outcome=complete for a swarm reply — callers that truly complete a task should use the real checkpoint tool.",
assumptions: "The swarm agent processed the unit prompt and returned a non-empty reply.",
},
});
debugLog("runUnit[swarm]", {
phase: "synthesized-checkpoint",
unitType,
unitId,
outcome: "continue",
});
} catch (cpErr) {
// Fail-open: if checkpoint synthesis fails, the repair loop will handle it.
debugLog("runUnit[swarm]", {
phase: "synthesized-checkpoint-error",
unitType,
unitId,
error: getErrorMessage(cpErr),
});
}
// Synthetic tool-call block for classifyExecutorRefusal / isNoOpExecutorTranscript.
// Presence of a tool_use block with name != "checkpoint" prevents isNoOpExecutorTranscript
// from flagging the swarm turn as a no-op (which would trigger a second repair path).
// We use a special swarm-completion tool name that does not clash with any real SF tool.
const syntheticCheckpointBlock = {
type: "tool_use",
id: `swarm-${swarmResult.replyMessageId ?? "unknown"}`,
name: "swarm_unit_complete",
input: {
outcome: "continue",
summary: checkpointSummary,
targetAgent: swarmResult.targetAgent,
},
};
// Map swarm reply back to the UnitResult shape. The reply text becomes the
// last message in a synthetic event.messages array so callers that inspect
// event.messages (e.g. for refusal classification) see content.
@ -242,7 +319,10 @@ async function runUnitViaSwarm(ctx, pi, s, unitType, unitId, prompt, options) {
messages: [
{
role: "assistant",
content: swarmResult.reply,
content: [
{ type: "text", text: replyText },
syntheticCheckpointBlock,
],
_swarm: true,
replyMessageId: swarmResult.replyMessageId,
targetAgent: swarmResult.targetAgent,

View file

@ -39,6 +39,20 @@ vi.mock("../preferences.js", () => ({
resolvePersistModelChanges: vi.fn(() => false),
}));
// ─── Mock autonomous-solver.js ────────────────────────────────────────────────
// appendAutonomousSolverCheckpoint is called in runUnitViaSwarm to synthesize a
// checkpoint so phases-unit.js's assessAutonomousSolverTurn does not fire the
// missing-checkpoint repair loop (Bug 2 fix).
const { mockAppendCheckpoint } = vi.hoisted(() => {
const fn = vi.fn();
return { mockAppendCheckpoint: fn };
});
vi.mock("../autonomous-solver.js", () => ({
appendAutonomousSolverCheckpoint: mockAppendCheckpoint,
}));
// ─── Mock everything runUnit imports that touches DB / session infrastructure ─
vi.mock("../debug-logger.js", () => ({ debugLog: vi.fn() }));
@ -176,7 +190,9 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
expect(opts.timeoutMs).toBeGreaterThan(0);
});
test("maps swarm reply into event.messages[last].content", async () => {
test("maps swarm reply into event.messages[last].content (array with text+tool blocks)", async () => {
// content is now an array: [{ type:"text", text:reply }, { type:"tool_use", name:"swarm_unit_complete", ... }]
// The text block carries the raw reply; the tool_use block is the synthetic checkpoint marker.
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
const ctx = makeCtx("/proj");
@ -186,7 +202,11 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
const result = await runUnit(ctx, pi, s, "execute-task", "u-1", "do stuff", {});
const lastMsg = result.event.messages[result.event.messages.length - 1];
expect(lastMsg.content).toBe(MOCK_REPLY);
expect(Array.isArray(lastMsg.content)).toBe(true);
// Text block is first
expect(lastMsg.content[0]).toMatchObject({ type: "text", text: MOCK_REPLY });
// Tool-use block is second
expect(lastMsg.content[1]).toMatchObject({ type: "tool_use", name: "swarm_unit_complete" });
expect(lastMsg.role).toBe("assistant");
expect(lastMsg.replyMessageId).toBe(MOCK_REPLY_ID);
expect(lastMsg.targetAgent).toBe(MOCK_TARGET);
@ -204,6 +224,135 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
expect(pi.sendMessage).not.toHaveBeenCalled();
});
// ─── Bug 2: synthesized checkpoint prevents repair loop ───────────────────
test("calls appendAutonomousSolverCheckpoint with outcome=continue on success", async () => {
// Bug 2 fix: runUnitViaSwarm must write a synthetic checkpoint so that
// phases-unit.js's assessAutonomousSolverTurn does not find an empty solver
// state and fire the missing-checkpoint repair loop.
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
await runUnit(ctx, pi, s, "execute-task", "synth-chk-1", "do the work", {});
expect(mockAppendCheckpoint).toHaveBeenCalledOnce();
const [basePath, params] = mockAppendCheckpoint.mock.calls[0];
expect(basePath).toBe("/proj");
expect(params.unitType).toBe("execute-task");
expect(params.unitId).toBe("synth-chk-1");
expect(params.outcome).toBe("continue");
expect(typeof params.summary).toBe("string");
expect(params.summary.length).toBeGreaterThan(0);
expect(Array.isArray(params.completedItems)).toBe(true);
expect(Array.isArray(params.remainingItems)).toBe(true);
expect(Array.isArray(params.verificationEvidence)).toBe(true);
});
test("synthesized checkpoint summary is truncated at 500 chars", async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
const longReply = "x".repeat(600);
mockSwarmDispatchAndWait.mockResolvedValueOnce({
messageId: "msg-long-1",
targetAgent: MOCK_TARGET,
swarmName: "default",
envelope: {},
reply: longReply,
replyMessageId: "rmsg-long-1",
});
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
await runUnit(ctx, pi, s, "execute-task", "synth-long", "long reply test", {});
const [, params] = mockAppendCheckpoint.mock.calls[0];
expect(params.summary.length).toBeLessThanOrEqual(500);
expect(params.summary.endsWith("...")).toBe(true);
});
test("event.messages[last].content includes swarm_unit_complete tool_use block", async () => {
// Bug 2 fix: the synthetic tool-call block with name=swarm_unit_complete
// prevents isNoOpExecutorTranscript from flagging the swarm turn as a no-op
// (which would trigger a second repair path via "solver-noop-continue").
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
const result = await runUnit(ctx, pi, s, "execute-task", "synth-tool-1", "build it", {});
const lastMsg = result.event.messages[result.event.messages.length - 1];
expect(Array.isArray(lastMsg.content)).toBe(true);
const toolBlock = lastMsg.content.find(
(b) => b.type === "tool_use" && b.name === "swarm_unit_complete",
);
expect(toolBlock).toBeDefined();
expect(toolBlock.input.outcome).toBe("continue");
expect(typeof toolBlock.input.summary).toBe("string");
expect(toolBlock.input.targetAgent).toBe(MOCK_TARGET);
});
test("event.messages[last].content[0] is text block with swarm reply", async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
const result = await runUnit(ctx, pi, s, "execute-task", "synth-text-1", "build it", {});
const lastMsg = result.event.messages[result.event.messages.length - 1];
const textBlock = lastMsg.content.find((b) => b.type === "text");
expect(textBlock).toBeDefined();
expect(textBlock.text).toBe(MOCK_REPLY);
});
test("appendAutonomousSolverCheckpoint failure is silently swallowed (fail-open)", async () => {
// If checkpoint synthesis fails, the loop should still return status=completed
// rather than crashing or returning cancelled.
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
mockAppendCheckpoint.mockImplementationOnce(() => {
throw new Error("solver state write failed");
});
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
const result = await runUnit(ctx, pi, s, "execute-task", "synth-fail-1", "build it", {});
// Must still return completed — checkpoint synthesis failure is non-fatal
expect(result.status).toBe("completed");
});
test("appendAutonomousSolverCheckpoint is NOT called when swarm returns no reply", async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
mockSwarmDispatchAndWait.mockResolvedValueOnce({
messageId: "m-noreply",
targetAgent: "worker-1",
swarmName: "default",
reply: null,
replyMessageId: null,
error: "no reply",
});
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
await runUnit(ctx, pi, s, "execute-task", "synth-noreply", "prompt", {});
expect(mockAppendCheckpoint).not.toHaveBeenCalled();
});
test("accepts SF_AUTONOMOUS_VIA_SWARM=true (string)", async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "true";

View file

@ -28,9 +28,27 @@ import { createDefaultSwarm } from "../uok/swarm-roles.js";
const MOCK_REPLY_TEXT = "mock agent reply: task complete";
vi.mock("../uok/agent-runner.js", () => ({
runAgentTurn: vi.fn(async (agent, _opts) => {
// Read the unread inbox messages (same as the real runner)
const messages = agent.receive(true);
runAgentTurn: vi.fn(async (agent, opts = {}) => {
const { onlyMessageId } = opts;
// When onlyMessageId is set, force-refresh the inbox from SQLite so
// messages delivered via a different MessageBus instance are visible.
// This mirrors the real runAgentTurn's Bug 1 fix.
if (onlyMessageId) {
agent._inbox.refresh();
}
// Isolate to the target message when onlyMessageId is provided, otherwise
// read all unread messages — mirrors the real runAgentTurn logic.
let messages;
if (onlyMessageId) {
const all = agent.receive(false); // all messages (read + unread)
const target = all.find((m) => m.id === onlyMessageId && !m.read);
messages = target ? [target] : [];
} else {
messages = agent.receive(true);
}
if (messages.length === 0) {
return { turnsProcessed: 0, response: null };
}
@ -247,6 +265,103 @@ describe("SwarmDispatchLayer.dispatchAndWait — error paths", () => {
});
});
// ─── Multi-dispatch to the same agent ────────────────────────────────────────
describe("SwarmDispatchLayer.dispatchAndWait — multi-dispatch to same agent", () => {
test("second dispatch to same agent returns correct reply (not null)", async () => {
// This covers Bug 1: the agent's in-memory inbox was stale after the first
// dispatch+turn because INBOX_REFRESH_INTERVAL_MS had not elapsed. The second
// message arrived via a different MessageBus instance (SwarmDispatchLayer._bus)
// and was in SQLite but not in agent._inbox._messages. The fix forces a
// refresh via opts.onlyMessageId before runAgentTurn reads the inbox.
const root = makeProject();
const layer = new SwarmDispatchLayer(root);
// First dispatch — must succeed
const result1 = await layer.dispatchAndWait({
unitId: "task-multi-1",
unitType: "task",
workMode: "build",
payload: "first task",
priority: 5,
scope: "scope-multi",
});
expect(result1.reply).toBe(MOCK_REPLY_TEXT);
expect(result1.error).toBeUndefined();
// Second dispatch to the SAME agent type (build → worker) — must also succeed.
// Before the fix, runAgentTurn would find the agent's inbox stale and return
// turnsProcessed=0, causing getReplyTo to find no reply → error="no reply".
const result2 = await layer.dispatchAndWait({
unitId: "task-multi-2",
unitType: "task",
workMode: "build",
payload: "second task",
priority: 5,
scope: "scope-multi",
});
expect(result2.reply).toBe(MOCK_REPLY_TEXT);
expect(result2.error).toBeUndefined();
expect(result2.replyMessageId).toBeTruthy();
// The two dispatches must have produced different reply message ids
expect(result2.replyMessageId).not.toBe(result1.replyMessageId);
});
test("third dispatch to same agent returns correct reply", async () => {
const root = makeProject();
const layer = new SwarmDispatchLayer(root);
for (let i = 1; i <= 3; i++) {
const result = await layer.dispatchAndWait({
unitId: `task-triple-${i}`,
unitType: "task",
workMode: "build",
payload: `task ${i}`,
priority: 5,
scope: "scope-triple",
});
expect(result.reply).toBe(MOCK_REPLY_TEXT);
expect(result.error).toBeUndefined();
}
});
test("runAgentTurn receives onlyMessageId and does not pick up stale messages", async () => {
// Verify that the onlyMessageId option causes the mock runner to process
// only the target message. The mock in this test file calls agent.receive(false)
// internally, but here we verify the option is forwarded by checking that the
// reply targets the correct messageId via getReplyTo.
const root = makeProject();
const layer = new SwarmDispatchLayer(root);
// Dispatch two tasks to the same agent sequentially
const r1 = await layer.dispatchAndWait({
unitId: "task-isolation-1",
unitType: "task",
workMode: "build",
payload: "isolation task 1",
priority: 5,
scope: "scope-iso",
});
const r2 = await layer.dispatchAndWait({
unitId: "task-isolation-2",
unitType: "task",
workMode: "build",
payload: "isolation task 2",
priority: 5,
scope: "scope-iso",
});
// Both replies are correctly wired to their respective dispatch messageIds
expect(r1.reply).toBe(MOCK_REPLY_TEXT);
expect(r2.reply).toBe(MOCK_REPLY_TEXT);
expect(r1.error).toBeUndefined();
expect(r2.error).toBeUndefined();
});
});
// ─── A2A path — falls through cleanly ────────────────────────────────────────
describe("SwarmDispatchLayer.dispatchAndWait — SF_A2A_ENABLED path", () => {

View file

@ -92,15 +92,47 @@ async function runHeadlessPrompt(
* @param {object} [opts]
* @param {number} [opts.maxContextTurns=10]
* @param {number} [opts.timeoutMs=120000]
* @param {string} [opts.onlyMessageId]
* When set, the agent forces an inbox refresh from SQLite and processes ONLY
* the message with this id, leaving any other unread messages untouched.
* This makes each dispatchAndWait call surgical the reply will target the
* specified messageId exactly, and legitimately queued messages from other
* senders remain unread and available for the next turn.
* @returns {Promise<{turnsProcessed: number, response: string|null}>}
*/
export async function runAgentTurn(agent, opts = {}) {
const {
maxContextTurns = DEFAULT_MAX_CONTEXT_TURNS,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
onlyMessageId,
} = opts;
const messages = agent.receive(true);
// When onlyMessageId is set, force-refresh the inbox from SQLite so that
// messages delivered via a different MessageBus instance (i.e. the
// SwarmDispatchLayer's bus) are visible even within the 30s cache window.
// This is the root cause of Bug 1: the agent's in-memory inbox is stale on
// a second dispatch because INBOX_REFRESH_INTERVAL_MS has not elapsed.
if (onlyMessageId) {
agent._inbox.refresh();
}
// When onlyMessageId is provided, isolate this message for surgical processing.
// Other unread messages are left untouched so they can be processed in a
// subsequent turn. Without this isolation a stale message from an earlier
// dispatch could be included and its replyTo would mismatch the caller's
// expected dispatchResult.messageId.
let messages;
if (onlyMessageId) {
const allMessages = agent.receive(false); // all messages (read + unread)
const target = allMessages.find((m) => m.id === onlyMessageId && !m.read);
if (!target) {
return { turnsProcessed: 0, response: null };
}
messages = [target];
} else {
messages = agent.receive(true);
}
if (messages.length === 0) {
return { turnsProcessed: 0, response: null };
}

View file

@ -306,11 +306,19 @@ export class SwarmDispatchLayer {
};
}
// Step 3: Drive one inbox-processing turn via runAgentTurn (in-process)
// Step 3: Drive one inbox-processing turn via runAgentTurn (in-process).
// Pass onlyMessageId so runAgentTurn forces an inbox refresh (catching
// messages delivered via a different bus instance) and processes ONLY this
// specific dispatch. Any legitimately queued messages from other senders
// are left unread — they will be picked up on the next runAgentTurn call.
const runAgentTurn = await getRunAgentTurn();
let turnResult;
try {
turnResult = await runAgentTurn(agent, { timeoutMs, signal });
turnResult = await runAgentTurn(agent, {
timeoutMs,
signal,
onlyMessageId: dispatchResult.messageId,
});
} catch (err) {
return {
...dispatchResult,