From b88b66c651f2fd528b2609f36124029308adb442 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Fri, 15 May 2026 14:54:27 +0200 Subject: [PATCH] feat(auto): fan out swarm research units --- src/resources/extensions/sf/auto/run-unit.js | 313 ++++++++++++++++++ .../sf/tests/run-unit-via-swarm.test.mjs | 95 +++++- .../extensions/sf/tests/swarm.test.mjs | 23 ++ .../extensions/sf/uok/swarm-dispatch.js | 12 +- 4 files changed, 428 insertions(+), 15 deletions(-) diff --git a/src/resources/extensions/sf/auto/run-unit.js b/src/resources/extensions/sf/auto/run-unit.js index b7de09273..3a2963f48 100644 --- a/src/resources/extensions/sf/auto/run-unit.js +++ b/src/resources/extensions/sf/auto/run-unit.js @@ -193,6 +193,100 @@ function shouldRouteRunUnitViaSwarm(options) { ); } +/** + * Build a bounded fan-out plan for autonomous swarm units. + * + * Purpose: let headless autonomy use multiple swarm agents only when the work is + * naturally parallel and low-conflict. Research and validation can split into + * independent read-only lanes; implementation stays single-agent unless a future + * planner provides explicit disjoint write scopes. + * + * Consumer: runUnitViaSwarm before dispatching an autonomous unit envelope. + * + * @param {string} unitType + * @param {string} _unitId + * @param {string} prompt + * @param {object} [options={}] + * @returns {{ reason: string, lanes: Array<{ id: string, targetAgent: string, workMode: string, instruction: string }> } | null} + */ +export function buildSwarmFanoutPlan(unitType, _unitId, prompt, options = {}) { + const fanoutFlag = process.env.SF_SWARM_FANOUT; + if (fanoutFlag === "0" || fanoutFlag === "false") return null; + if (isCheckpointOnlyProtocolPass(options)) return null; + if (options?.keepSession === true) return null; + if (options?.swarmFanout === false) return null; + + const type = String(unitType ?? "").toLowerCase(); + const text = String(prompt ?? "").toLowerCase(); + + if (type.includes("research") || text.includes("parallel research")) { + return { + reason: + "research units are read-heavy and can be checked from independent angles", + lanes: [ + { + id: "source-scout", + targetAgent: "scout", + workMode: "research", + instruction: + "Find the most relevant repo evidence and external assumptions. Do not edit files.", + }, + { + id: "contract-review", + targetAgent: "reviewer", + workMode: "review", + instruction: + "Review the proposed direction for purpose, consumer, and missing falsifiers. Do not edit files.", + }, + { + id: "verification-map", + targetAgent: "verifier", + workMode: "verify", + instruction: + "Identify executable verification, likely tests, and failure boundaries. Do not edit files.", + }, + ], + }; + } + + if ( + type.includes("validate") || + type.includes("audit") || + type.includes("gate") || + type.includes("uat") + ) { + return { + reason: + "validation benefits from independent review, verification, and challenge lanes", + lanes: [ + { + id: "reviewer", + targetAgent: "reviewer", + workMode: "review", + instruction: + "Review the artifact against the stated contract. Do not edit files.", + }, + { + id: "verifier", + targetAgent: "verifier", + workMode: "verify", + instruction: + "Run or identify verification evidence and report residual risk. Do not edit files.", + }, + { + id: "adversary", + targetAgent: "adversary", + workMode: "challenge", + instruction: + "Challenge weak assumptions and look for counterexamples. Do not edit files.", + }, + ], + }; + } + + return null; +} + /** * Build the system prompt for a swarm worker executing an autonomous unit. * @@ -422,6 +516,225 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) { } } + const fanoutPlan = buildSwarmFanoutPlan(unitType, unitId, prompt, options); + if (fanoutPlan && fanoutPlan.lanes.length > 1) { + debugLog("runUnit[swarm]", { + phase: "fanout-dispatch", + unitType, + unitId, + lanes: fanoutPlan.lanes.map((lane) => lane.targetAgent), + }); + if (typeof ctx.ui?.notify === "function") { + ctx.ui.notify( + `[${unitType}] ${unitId} → swarm fan-out ${fanoutPlan.lanes.length} lanes`, + "info", + ); + } + + const laneResults = await Promise.all( + fanoutPlan.lanes.map(async (lane) => { + const lanePayload = [ + `Fan-out lane: ${lane.id}`, + `Target agent: ${lane.targetAgent}`, + `Reason: ${fanoutPlan.reason}`, + "", + "Lane instruction:", + lane.instruction, + "", + "Original autonomous unit prompt:", + prompt, + ].join("\n"); + const laneEnvelope = { + ...envelope, + workMode: lane.workMode, + targetAgent: lane.targetAgent, + payload: lanePayload, + }; + const result = await swarmDispatchAndWait(basePath, laneEnvelope, { + timeoutMs, + noOutputTimeoutMs, + signal: controller.signal, + onEvent, + }); + return { lane, result }; + }), + ); + + const failedLane = laneResults.find( + ({ result }) => result.error || result.reply === null, + ); + if (failedLane) { + const reason = + failedLane.result.error ?? + `lane ${failedLane.lane.id} returned no reply`; + debugLog("runUnit[swarm]", { + phase: "fanout-failed", + unitType, + unitId, + lane: failedLane.lane.id, + reason, + }); + if (typeof ctx.ui?.notify === "function") { + ctx.ui.notify( + `[${unitType}] ${unitId} → swarm fan-out failed: ${reason}`, + "error", + ); + } + return { + status: "cancelled", + requestDispatchedAt, + errorContext: { + message: reason, + category: "session-failed", + isTransient: true, + }, + }; + } + + const replyText = laneResults + .map(({ lane, result }) => { + const reply = result.reply ?? ""; + return `## ${lane.targetAgent}\n${reply}`; + }) + .join("\n\n"); + const hasCheckpointCall = collectedToolCalls.some( + (tc) => tc.name === "checkpoint", + ); + if (!hasCheckpointCall) { + const verificationEvidence = laneResults.map( + ({ lane, result }) => + `swarm-agent:${lane.targetAgent}; replyMessageId:${result.replyMessageId ?? "unknown"}`, + ); + collectedToolCalls.push({ + type: "tool_use", + id: `swarm-fanout-cp-${Date.now()}`, + name: "checkpoint", + input: { + outcome: "continue", + unitType, + unitId, + summary: `Swarm fan-out completed ${laneResults.length} lanes.`, + completedItems: fanoutPlan.lanes.map( + (lane) => `${lane.targetAgent} completed ${lane.id}`, + ), + remainingItems: [ + `Synthesize fan-out findings for ${unitType} ${unitId}.`, + ], + verificationEvidence, + pdd: { + purpose: + "Synthetic checkpoint injected because fan-out lanes returned text without a single parent checkpoint.", + consumer: + "phases-unit.js assessAutonomousSolverTurn + missing-checkpoint-repair loop", + contract: + "Outcome is 'continue' so the parent loop synthesizes independent lane findings.", + failureBoundary: + "If any lane fails, fan-out returns cancelled before synthesizing evidence.", + evidence: verificationEvidence.join("; "), + nonGoals: + "Does not claim implementation completion from read-only fan-out lanes.", + invariants: + "Read-only fan-out lanes are used only for naturally parallel research or validation units.", + assumptions: + "Independent lane replies are useful input for the next autonomous step.", + }, + }, + }); + try { + appendAutonomousSolverCheckpoint(basePath, { + unitType, + unitId, + outcome: "continue", + summary: `Swarm fan-out completed ${laneResults.length} lanes.`, + completedItems: fanoutPlan.lanes.map( + (lane) => `${lane.targetAgent} completed ${lane.id}`, + ), + remainingItems: [ + `Synthesize fan-out findings for ${unitType} ${unitId}.`, + ], + verificationEvidence, + pdd: { + purpose: + "Synthetic checkpoint from multi-agent fan-out when lanes returned without a parent checkpoint.", + consumer: "phases-unit.js assessAutonomousSolverTurn", + contract: + "Falls back to 'continue' so the loop uses the fan-out evidence without pretending completion.", + failureBoundary: + "appendAutonomousSolverCheckpoint failure is swallowed — the loop repairs via missing-checkpoint handling.", + evidence: verificationEvidence.join("; "), + nonGoals: "Does not synthesize completion.", + invariants: + "Synthetic checkpoints are only written when no real checkpoint exists.", + assumptions: "All fan-out lanes produced non-empty replies.", + }, + }); + } catch (cpErr) { + debugLog("runUnit[swarm]", { + phase: "fanout-synthesized-checkpoint-error", + unitType, + unitId, + error: getErrorMessage(cpErr), + }); + } + } + + try { + emitJournalEvent(basePath, { + ts: new Date().toISOString(), + eventType: "swarm-dispatch", + data: { + unitType, + unitId, + targetAgent: "fanout", + workMode: envelope.workMode, + toolCallCount: collectedToolCalls.length, + outcome: workerSignaledOutcome ?? "continue", + via: "autonomous-unit", + fanout: true, + lanes: fanoutPlan.lanes.map((lane) => lane.targetAgent), + }, + }); + } catch { + /* journal write failure must not break dispatch */ + } + + const contentBlocks = [ + ...collectedToolCalls, + { type: "text", text: replyText }, + ]; + return { + status: "completed", + event: { + messages: [ + { + role: "assistant", + content: contentBlocks, + _swarm: true, + replyMessageId: laneResults + .map(({ result }) => result.replyMessageId) + .filter(Boolean) + .join(","), + targetAgent: "fanout", + }, + ], + }, + requestDispatchedAt, + _via: "swarm", + _swarmResult: { + fanout: true, + reason: fanoutPlan.reason, + lanes: laneResults.map(({ lane, result }) => ({ + id: lane.id, + targetAgent: lane.targetAgent, + replyMessageId: result.replyMessageId, + })), + reply: replyText, + }, + swarmToolCallCount: + collectedToolCalls.length - (hasCheckpointCall ? 0 : 1), + }; + } + let swarmResult; try { swarmResult = await swarmDispatchAndWait(basePath, envelope, { diff --git a/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs b/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs index 46a005d0a..28077062f 100644 --- a/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs +++ b/src/resources/extensions/sf/tests/run-unit-via-swarm.test.mjs @@ -134,15 +134,18 @@ function makeS(basePath = "/tmp/test-project") { let origEnv; let origNoOutputEnv; let origHeadlessEnv; +let origFanoutEnv; beforeEach(() => { vi.clearAllMocks(); origEnv = process.env.SF_AUTONOMOUS_VIA_SWARM; origNoOutputEnv = process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS; origHeadlessEnv = process.env.SF_HEADLESS; + origFanoutEnv = process.env.SF_SWARM_FANOUT; delete process.env.SF_AUTONOMOUS_VIA_SWARM; delete process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS; delete process.env.SF_HEADLESS; + delete process.env.SF_SWARM_FANOUT; // Default implementation for the happy-path tests: return a deterministic reply. mockSwarmDispatchAndWait.mockImplementation( @@ -173,6 +176,11 @@ afterEach(() => { } else { process.env.SF_HEADLESS = origHeadlessEnv; } + if (origFanoutEnv === undefined) { + delete process.env.SF_SWARM_FANOUT; + } else { + process.env.SF_SWARM_FANOUT = origFanoutEnv; + } }); // ─── Flag ON — happy path ───────────────────────────────────────────────────── @@ -207,24 +215,19 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => { const pi = makePi(); const s = makeS("/myproject"); - await runUnit( - ctx, - pi, - s, - "research-slice", - "slice-42", - "research the topic", - { scope: "milestone-1", priority: 7 }, - ); + await runUnit(ctx, pi, s, "execute-task", "task-42", "execute the task", { + scope: "milestone-1", + priority: 7, + }); expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce(); const [basePath, envelope, opts] = mockSwarmDispatchAndWait.mock.calls[0]; expect(basePath).toBe("/myproject"); - expect(envelope.unitId).toBe("slice-42"); - expect(envelope.unitType).toBe("research-slice"); - expect(envelope.workMode).toBe("research"); // research-slice → research - expect(envelope.payload).toBe("research the topic"); + expect(envelope.unitId).toBe("task-42"); + expect(envelope.unitType).toBe("execute-task"); + expect(envelope.workMode).toBe("build"); // execute-task → build + expect(envelope.payload).toBe("execute the task"); expect(envelope.scope).toBe("milestone-1"); expect(envelope.priority).toBe(7); expect(envelope.executorPermissionLevel).toBe("low"); @@ -544,6 +547,71 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => { }); }); +describe("runUnit — swarm fan-out policy", () => { + test("research-slice_when_swarm_enabled_fans_out_to_independent_read_only_lanes", async () => { + process.env.SF_AUTONOMOUS_VIA_SWARM = "1"; + mockSwarmDispatchAndWait.mockImplementation( + async (_basePath, envelope, _opts) => ({ + messageId: `dispatch-${envelope.targetAgent}`, + targetAgent: envelope.targetAgent, + swarmName: "default", + envelope, + reply: `reply from ${envelope.targetAgent}`, + replyMessageId: `reply-${envelope.targetAgent}`, + }), + ); + + const ctx = makeCtx("/proj"); + const pi = makePi(); + const s = makeS("/proj"); + + const result = await runUnit( + ctx, + pi, + s, + "research-slice", + "M001/S01", + "research provider routing", + {}, + ); + + expect(result.status).toBe("completed"); + expect(mockSwarmDispatchAndWait).toHaveBeenCalledTimes(3); + const targetAgents = mockSwarmDispatchAndWait.mock.calls.map( + (call) => call[1].targetAgent, + ); + expect(targetAgents).toEqual(["scout", "reviewer", "verifier"]); + expect(result._swarmResult.fanout).toBe(true); + expect(result._swarmResult.lanes).toHaveLength(3); + expect(result.event.messages[0].targetAgent).toBe("fanout"); + expect(result.event.messages[0].content.at(-1).text).toContain("## scout"); + }); + + test("execute-task_when_no_disjoint_scope_remains_single_agent", async () => { + process.env.SF_AUTONOMOUS_VIA_SWARM = "1"; + + const ctx = makeCtx("/proj"); + const pi = makePi(); + const s = makeS("/proj"); + + const result = await runUnit( + ctx, + pi, + s, + "execute-task", + "M001/S01/T01", + "implement a coupled change", + {}, + ); + + expect(result.status).toBe("completed"); + expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce(); + expect( + mockSwarmDispatchAndWait.mock.calls[0][1].targetAgent, + ).toBeUndefined(); + }); +}); + // ─── Flag ON — error paths ──────────────────────────────────────────────────── describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — error paths", () => { @@ -760,6 +828,7 @@ describe("deriveWorkMode (via envelope.workMode in dispatch calls)", () => { for (const [unitType, expectedWorkMode] of cases) { test(`${unitType} → ${expectedWorkMode}`, async () => { process.env.SF_AUTONOMOUS_VIA_SWARM = "1"; + process.env.SF_SWARM_FANOUT = "0"; const ctx = makeCtx("/proj"); const pi = makePi(); diff --git a/src/resources/extensions/sf/tests/swarm.test.mjs b/src/resources/extensions/sf/tests/swarm.test.mjs index d4b75cb83..9cebac233 100644 --- a/src/resources/extensions/sf/tests/swarm.test.mjs +++ b/src/resources/extensions/sf/tests/swarm.test.mjs @@ -297,6 +297,29 @@ describe("SwarmDispatchLayer — routing", () => { expect(agent.identity.role).toBe("reviewer"); }); + test("dispatch_when_targetAgent_set_routes_to_named_agent", async () => { + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatch({ + unitId: "task-target-1", + unitType: "task", + workMode: "build", + targetAgent: "verifier", + payload: "verify task data", + priority: 5, + scope: "test-scope", + }); + + expect(result.messageId).toBeTruthy(); + expect(result.targetAgent).toBe("verifier"); + + const swarm = await layer.getOrCreateSwarm(); + const agent = swarm.get(result.targetAgent); + expect(agent).toBeDefined(); + expect(agent.identity.role).toBe("verifier"); + }); + test("dispatchBatch_dispatches_all", async () => { const root = makeProject(); const layer = new SwarmDispatchLayer(root); diff --git a/src/resources/extensions/sf/uok/swarm-dispatch.js b/src/resources/extensions/sf/uok/swarm-dispatch.js index b8c6cc751..e9c53ae6a 100644 --- a/src/resources/extensions/sf/uok/swarm-dispatch.js +++ b/src/resources/extensions/sf/uok/swarm-dispatch.js @@ -168,6 +168,8 @@ async function getA2ATransport() { * per unit type (e.g. ensure "checkpoint" is always available for execute-task units). * @property {string} [executorPermissionLevel] Optional: legacy SF permission * extension level used by in-process headless worker sessions. + * @property {string} [targetAgent] Optional explicit target agent name. When + * set, dispatch uses this agent instead of role-derived routing. */ /** @@ -266,7 +268,10 @@ export class SwarmDispatchLayer { */ async _busDispatch(envelope) { const swarm = await this.getOrCreateSwarm(); - const target = swarm.route(envelope); + const target = + envelope.targetAgent != null + ? swarm.get(envelope.targetAgent) + : swarm.route(envelope); if (!target) { throw new Error( @@ -324,7 +329,10 @@ export class SwarmDispatchLayer { */ async _a2aDispatch(envelope) { const swarm = await this.getOrCreateSwarm(); - const target = swarm.route(envelope); + const target = + envelope.targetAgent != null + ? swarm.get(envelope.targetAgent) + : swarm.route(envelope); if (!target) { throw new Error(