From 091168303c5aa6dc82a5bc11e94d710394fd7e36 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Fri, 15 May 2026 10:55:37 +0200 Subject: [PATCH] fix(auto): abort swarm checkpoint loops --- scripts/copy-resources.cjs | 43 +++++++-- src/resources/extensions/sf/auto/run-unit.js | 88 ++++++++++++++++++- .../extensions/sf/autonomous-solver.js | 34 ++----- .../sf/tests/run-unit-via-swarm.test.mjs | 72 ++++++++++++++- 4 files changed, 199 insertions(+), 38 deletions(-) diff --git a/scripts/copy-resources.cjs b/scripts/copy-resources.cjs index 74d697e8e..5849b21dd 100644 --- a/scripts/copy-resources.cjs +++ b/scripts/copy-resources.cjs @@ -80,18 +80,49 @@ if (hasTsFiles) { "bin", "tsgo.js", ); - const compile = spawnSync( - process.execPath, - [tsgoBin, "--project", resourcesTsconfig], - { + const compileWithTsgo = () => + spawnSync(process.execPath, [tsgoBin, "--project", resourcesTsconfig], { cwd: root, stdio: "inherit", - }, - ); + }); + const compileWithTsc = () => { + const tscBin = require.resolve("typescript/bin/tsc"); + return spawnSync( + process.execPath, + [tscBin, "--project", resourcesTsconfig], + { + cwd: root, + stdio: "inherit", + }, + ); + }; + + let compile = compileWithTsgo(); if (compile.status !== 0) { process.exit(compile.status ?? 1); } + + // Native tsgo has reported successful emits while leaving .ts resources copied + // without their .js runtime output. Verify a known TS-only resource before the + // launcher tries to import it, and fall back to local tsc if needed. + const requiredJs = join( + distResources, + "extensions", + "sf", + "model-registry.js", + ); + try { + readFileSync(requiredJs); + } catch { + console.warn( + "[copy-resources] tsgo did not emit model-registry.js; retrying resources build with tsc", + ); + compile = compileWithTsc(); + if (compile.status !== 0) { + process.exit(compile.status ?? 1); + } + } } else { // No .ts files — just create the dist/resources directory and copy .js files mkdirSync(distResources, { recursive: true }); diff --git a/src/resources/extensions/sf/auto/run-unit.js b/src/resources/extensions/sf/auto/run-unit.js index b06b8f096..d91c05973 100644 --- a/src/resources/extensions/sf/auto/run-unit.js +++ b/src/resources/extensions/sf/auto/run-unit.js @@ -9,7 +9,10 @@ * path. Default behavior is unchanged when the flag is unset. */ -import { appendAutonomousSolverCheckpoint } from "../autonomous-solver.js"; +import { + appendAutonomousSolverCheckpoint, + MAX_CHECKPOINTS_PER_ITERATION, +} from "../autonomous-solver.js"; import { scopeActiveToolsForUnitType } from "../constants.js"; import { debugLog } from "../debug-logger.js"; import { getErrorMessage } from "../error-utils.js"; @@ -298,6 +301,16 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { ? Math.floor(configuredNoOutputTimeoutMs) : 180_000, ); + const controller = new AbortController(); + if (options?.signal) { + if (options.signal.aborted) { + controller.abort(); + } else { + options.signal.addEventListener("abort", () => controller.abort(), { + once: true, + }); + } + } // ── Event collector: capture real tool calls and completion signal ────────── // The worker agent emits events as it runs. We intercept "toolcall_end" @@ -311,6 +324,8 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { let workerCompletedItems = null; let workerRemainingItems = null; let workerVerificationEvidence = null; + let checkpointToolCallCount = 0; + let checkpointAbortReason = null; function onEvent(event) { if ( @@ -331,6 +346,7 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { // This is the same tool inspected by the legacy runUnit path's solver pass // (phases-unit.js line ~989: activeToolsAllowlist: ["checkpoint"]). if (toolCall.name === "checkpoint") { + checkpointToolCallCount += 1; const args = toolCall.arguments ?? {}; const rawOutcome = String(args.outcome ?? "").toLowerCase(); if ( @@ -359,6 +375,13 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { if (Array.isArray(args.verificationEvidence)) { workerVerificationEvidence = args.verificationEvidence.map(String); } + if ( + checkpointToolCallCount >= MAX_CHECKPOINTS_PER_ITERATION && + !checkpointAbortReason + ) { + checkpointAbortReason = `checkpoint limit reached (${checkpointToolCallCount}/${MAX_CHECKPOINTS_PER_ITERATION})`; + controller.abort(); + } } } } @@ -369,9 +392,41 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { swarmResult = await swarmDispatchAndWait(basePath, envelope, { timeoutMs, noOutputTimeoutMs, + signal: controller.signal, onEvent, }); } catch (err) { + if (checkpointAbortReason) { + return { + status: "completed", + event: { + messages: [ + { + role: "assistant", + content: [ + ...collectedToolCalls, + { + type: "text", + text: `Swarm worker stopped: ${checkpointAbortReason}.`, + }, + ], + _swarm: true, + targetAgent: "unknown", + }, + ], + }, + requestDispatchedAt, + _via: "swarm", + _swarmResult: { + error: checkpointAbortReason, + reply: null, + replyMessageId: null, + }, + swarmToolCallCount: collectedToolCalls.filter( + (tc) => tc.name !== "checkpoint", + ).length, + }; + } const msg = `swarmDispatchAndWait threw: ${getErrorMessage(err)}`; debugLog("runUnit[swarm]", { phase: "dispatch-error", @@ -395,6 +450,34 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { // Error from the swarm (no reply, runAgentTurn failure, etc.) if (swarmResult.error || swarmResult.reply === null) { + if (checkpointAbortReason) { + return { + status: "completed", + event: { + messages: [ + { + role: "assistant", + content: [ + ...collectedToolCalls, + { + type: "text", + text: `Swarm worker stopped: ${checkpointAbortReason}.`, + }, + ], + _swarm: true, + replyMessageId: swarmResult.replyMessageId, + targetAgent: swarmResult.targetAgent, + }, + ], + }, + requestDispatchedAt, + _via: "swarm", + _swarmResult: swarmResult, + swarmToolCallCount: collectedToolCalls.filter( + (tc) => tc.name !== "checkpoint", + ).length, + }; + } const reason = swarmResult.error ?? "swarm returned no reply"; debugLog("runUnit[swarm]", { phase: "dispatch-no-reply", @@ -505,8 +588,7 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { failureBoundary: "If the loop's solver disagrees, it will override via its own assessment.", evidence: `Worker produced ${replyText.length} chars and ${collectedToolCalls.length} tool calls but no checkpoint.`, - nonGoals: - "Never claims completion from a synthetic checkpoint.", + nonGoals: "Never claims completion from a synthetic checkpoint.", invariants: "Synthetic checkpoints are only injected when no real checkpoint exists.", assumptions: diff --git a/src/resources/extensions/sf/autonomous-solver.js b/src/resources/extensions/sf/autonomous-solver.js index c02d7965e..7c2b38c70 100644 --- a/src/resources/extensions/sf/autonomous-solver.js +++ b/src/resources/extensions/sf/autonomous-solver.js @@ -16,8 +16,8 @@ import { } from "node:fs"; import { dirname, join } from "node:path"; import { atomicWriteSync } from "./atomic-write.js"; -import { sfRoot } from "./paths.js"; import { emitJournalEvent } from "./journal.js"; +import { sfRoot } from "./paths.js"; import { loadPrompt } from "./prompt-loader.js"; export const AUTONOMOUS_SOLVER_OUTCOMES = [ @@ -32,7 +32,7 @@ const DEFAULT_SOLVER_MAX_ITERATIONS = 30000; const MIN_SOLVER_MAX_ITERATIONS = 1; const MAX_SOLVER_MAX_ITERATIONS = 100000; const DEFAULT_MISSING_CHECKPOINT_REPAIR_ATTEMPTS = 4; -const MAX_CHECKPOINTS_PER_ITERATION = 5; +export const MAX_CHECKPOINTS_PER_ITERATION = 5; const SOLVER_CHECKPOINT_SCHEMA_VERSION = 1; const SOLVER_STEERING_SCHEMA_VERSION = 1; const STALL_THRESHOLD_ITERATIONS = 3; @@ -347,7 +347,7 @@ function buildAutonomousLoopVars(state) { close: `CLOSE PHASE (final ${CLOSE_PHASE_LOOKAHEAD} iterations): You are approaching the iteration budget.\n` + "Priority: verify all acceptance criteria, run the test suite, and confirm the unit is complete.\n" + - "If the unit cannot be completed in the remaining iterations, checkpoint with outcome=\"blocked\" and a precise reason.\n" + + 'If the unit cannot be completed in the remaining iterations, checkpoint with outcome="blocked" and a precise reason.\n' + "Do NOT start new work — finish and verify existing work.", }; @@ -534,25 +534,6 @@ export function isNoOpExecutorTranscript(messages) { if (block.type === "tool_use" && isWorkToolName(block.name)) { return false; } - if (block.type === "tool_result") { - // tool_result has no name on the block itself; presence of a - // non-checkpoint tool_result implies a non-checkpoint tool_use - // preceded it. The pair-match would require backward scan; for - // robustness, treat ANY tool_result as evidence of work unless - // it's a checkpoint result (which would have been emitted by - // the assistant's checkpoint tool_use earlier in this same - // transcript — but that's protocol, not work). Without the - // block name we can't distinguish, so be conservative: a - // tool_result is non-no-op work UNLESS the entire transcript's - // only tool_use was `checkpoint`. We carry that check via the - // tool_use scan above — if a non-checkpoint tool_use exists, - // we've already returned false. If only `checkpoint` was used, - // the tool_result here is the checkpoint reply and we should - // keep scanning. - // Simpler approach: ignore tool_result blocks for the - // classification; the tool_use scan is authoritative. - continue; - } } } @@ -610,8 +591,7 @@ export function appendAutonomousSolverCheckpoint(basePath, params) { persisted.unitId && persisted.status !== "complete"; const isMismatch = - hasActiveIdentity && - !sameUnit(persisted, params.unitType, params.unitId); + hasActiveIdentity && !sameUnit(persisted, params.unitType, params.unitId); if (isMismatch) { emitJournalEvent(basePath, { flowId: `${state.unitType}-${state.unitId}-${Date.now()}`, @@ -714,8 +694,7 @@ export function appendAutonomousSolverCheckpoint(basePath, params) { checkpoint.summary, ].slice(-ROLLING_SUMMARY_WINDOW), // Increment checkpoint count for this iteration (safety cap) - checkpointCountThisIteration: - (state.checkpointCountThisIteration || 0) + 1, + checkpointCountThisIteration: (state.checkpointCountThisIteration || 0) + 1, }; mkdirSync(dirname(historyPath(basePath)), { recursive: true }); writeFileSync(historyPath(basePath), `${JSON.stringify(checkpoint)}\n`, { @@ -1099,8 +1078,7 @@ export function assessAutonomousSolverTurn( } // Hard cap on excessive checkpoints within a single iteration if ( - (state.checkpointCountThisIteration || 0) >= - MAX_CHECKPOINTS_PER_ITERATION + (state.checkpointCountThisIteration || 0) >= MAX_CHECKPOINTS_PER_ITERATION ) { return { action: "pause", diff --git a/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs b/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs index 89f951d97..52f195f94 100644 --- a/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs +++ b/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs @@ -51,6 +51,7 @@ const { mockAppendCheckpoint } = vi.hoisted(() => { vi.mock("../autonomous-solver.js", () => ({ appendAutonomousSolverCheckpoint: mockAppendCheckpoint, + MAX_CHECKPOINTS_PER_ITERATION: 5, })); // ─── Mock everything runUnit imports that touches DB / session infrastructure ─ @@ -911,10 +912,79 @@ describe("runUnit — Round 6: real tool calls captured from onEvent", () => { expect(toolUseBlocks[0]).toMatchObject({ type: "tool_use", name: "Bash" }); expect(toolUseBlocks[1]).toMatchObject({ type: "tool_use", name: "Read" }); - expect(toolUseBlocks[2]).toMatchObject({ type: "tool_use", name: "checkpoint" }); + expect(toolUseBlocks[2]).toMatchObject({ + type: "tool_use", + name: "checkpoint", + }); expect(textBlocks[0].text).toBe(MOCK_REPLY); }); + test("checkpoint loop aborts swarm turn at the per-iteration cap", async () => { + process.env.SF_AUTONOMOUS_VIA_SWARM = "1"; + + mockSwarmDispatchAndWait.mockImplementationOnce( + async (_basePath, envelope, opts = {}) => { + for (let i = 0; i < 6; i++) { + opts.onEvent?.({ + type: "message_update", + assistantMessageEvent: { + type: "toolcall_end", + contentIndex: i, + toolCall: { + type: "toolCall", + id: `tc-checkpoint-${i}`, + name: "checkpoint", + arguments: { + outcome: "continue", + unitType: "plan-milestone", + unitId: "M002", + summary: "still discovering", + completedItems: ["read files"], + remainingItems: ["keep planning"], + verificationEvidence: ["read:file.py"], + }, + }, + partial: {}, + }, + }); + if (opts.signal?.aborted) break; + } + return { + messageId: "msg-dispatch-001", + targetAgent: MOCK_TARGET, + swarmName: "default", + envelope, + error: "aborted", + reply: null, + replyMessageId: null, + }; + }, + ); + + const ctx = makeCtx("/proj"); + const pi = makePi(); + const s = makeS("/proj"); + + const result = await runUnit( + ctx, + pi, + s, + "plan-slice", + "M002/S01", + "plan it", + {}, + ); + + expect(result.status).toBe("completed"); + expect(mockAppendCheckpoint).not.toHaveBeenCalled(); + const lastMsg = result.event.messages[result.event.messages.length - 1]; + const toolUseBlocks = lastMsg.content.filter((b) => b.type === "tool_use"); + expect(toolUseBlocks).toHaveLength(5); + expect(toolUseBlocks.every((b) => b.name === "checkpoint")).toBe(true); + expect(lastMsg.content.at(-1).text).toContain("checkpoint limit reached"); + expect(result.swarmToolCallCount).toBe(0); + }); + test("checkpoint tool call with outcome=complete → appendCheckpoint called with outcome=complete", async () => { // The canonical completion detection: when the worker calls checkpoint with // outcome='complete', runUnitViaSwarm should pass outcome='complete' to