feat(auto): fan out swarm research units

This commit is contained in:
Mikael Hugo 2026-05-15 14:54:27 +02:00
parent c8854ca896
commit b88b66c651
4 changed files with 428 additions and 15 deletions

View file

@ -193,6 +193,100 @@ function shouldRouteRunUnitViaSwarm(options) {
);
}
/**
* Build a bounded fan-out plan for autonomous swarm units.
*
* Purpose: let headless autonomy use multiple swarm agents only when the work is
* naturally parallel and low-conflict. Research and validation can split into
* independent read-only lanes; implementation stays single-agent unless a future
* planner provides explicit disjoint write scopes.
*
* Consumer: runUnitViaSwarm before dispatching an autonomous unit envelope.
*
* @param {string} unitType
* @param {string} _unitId
* @param {string} prompt
* @param {object} [options={}]
* @returns {{ reason: string, lanes: Array<{ id: string, targetAgent: string, workMode: string, instruction: string }> } | null}
*/
export function buildSwarmFanoutPlan(unitType, _unitId, prompt, options = {}) {
const fanoutFlag = process.env.SF_SWARM_FANOUT;
if (fanoutFlag === "0" || fanoutFlag === "false") return null;
if (isCheckpointOnlyProtocolPass(options)) return null;
if (options?.keepSession === true) return null;
if (options?.swarmFanout === false) return null;
const type = String(unitType ?? "").toLowerCase();
const text = String(prompt ?? "").toLowerCase();
if (type.includes("research") || text.includes("parallel research")) {
return {
reason:
"research units are read-heavy and can be checked from independent angles",
lanes: [
{
id: "source-scout",
targetAgent: "scout",
workMode: "research",
instruction:
"Find the most relevant repo evidence and external assumptions. Do not edit files.",
},
{
id: "contract-review",
targetAgent: "reviewer",
workMode: "review",
instruction:
"Review the proposed direction for purpose, consumer, and missing falsifiers. Do not edit files.",
},
{
id: "verification-map",
targetAgent: "verifier",
workMode: "verify",
instruction:
"Identify executable verification, likely tests, and failure boundaries. Do not edit files.",
},
],
};
}
if (
type.includes("validate") ||
type.includes("audit") ||
type.includes("gate") ||
type.includes("uat")
) {
return {
reason:
"validation benefits from independent review, verification, and challenge lanes",
lanes: [
{
id: "reviewer",
targetAgent: "reviewer",
workMode: "review",
instruction:
"Review the artifact against the stated contract. Do not edit files.",
},
{
id: "verifier",
targetAgent: "verifier",
workMode: "verify",
instruction:
"Run or identify verification evidence and report residual risk. Do not edit files.",
},
{
id: "adversary",
targetAgent: "adversary",
workMode: "challenge",
instruction:
"Challenge weak assumptions and look for counterexamples. Do not edit files.",
},
],
};
}
return null;
}
/**
* Build the system prompt for a swarm worker executing an autonomous unit.
*
@ -422,6 +516,225 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) {
}
}
const fanoutPlan = buildSwarmFanoutPlan(unitType, unitId, prompt, options);
if (fanoutPlan && fanoutPlan.lanes.length > 1) {
debugLog("runUnit[swarm]", {
phase: "fanout-dispatch",
unitType,
unitId,
lanes: fanoutPlan.lanes.map((lane) => lane.targetAgent),
});
if (typeof ctx.ui?.notify === "function") {
ctx.ui.notify(
`[${unitType}] ${unitId} → swarm fan-out ${fanoutPlan.lanes.length} lanes`,
"info",
);
}
const laneResults = await Promise.all(
fanoutPlan.lanes.map(async (lane) => {
const lanePayload = [
`Fan-out lane: ${lane.id}`,
`Target agent: ${lane.targetAgent}`,
`Reason: ${fanoutPlan.reason}`,
"",
"Lane instruction:",
lane.instruction,
"",
"Original autonomous unit prompt:",
prompt,
].join("\n");
const laneEnvelope = {
...envelope,
workMode: lane.workMode,
targetAgent: lane.targetAgent,
payload: lanePayload,
};
const result = await swarmDispatchAndWait(basePath, laneEnvelope, {
timeoutMs,
noOutputTimeoutMs,
signal: controller.signal,
onEvent,
});
return { lane, result };
}),
);
const failedLane = laneResults.find(
({ result }) => result.error || result.reply === null,
);
if (failedLane) {
const reason =
failedLane.result.error ??
`lane ${failedLane.lane.id} returned no reply`;
debugLog("runUnit[swarm]", {
phase: "fanout-failed",
unitType,
unitId,
lane: failedLane.lane.id,
reason,
});
if (typeof ctx.ui?.notify === "function") {
ctx.ui.notify(
`[${unitType}] ${unitId} → swarm fan-out failed: ${reason}`,
"error",
);
}
return {
status: "cancelled",
requestDispatchedAt,
errorContext: {
message: reason,
category: "session-failed",
isTransient: true,
},
};
}
const replyText = laneResults
.map(({ lane, result }) => {
const reply = result.reply ?? "";
return `## ${lane.targetAgent}\n${reply}`;
})
.join("\n\n");
const hasCheckpointCall = collectedToolCalls.some(
(tc) => tc.name === "checkpoint",
);
if (!hasCheckpointCall) {
const verificationEvidence = laneResults.map(
({ lane, result }) =>
`swarm-agent:${lane.targetAgent}; replyMessageId:${result.replyMessageId ?? "unknown"}`,
);
collectedToolCalls.push({
type: "tool_use",
id: `swarm-fanout-cp-${Date.now()}`,
name: "checkpoint",
input: {
outcome: "continue",
unitType,
unitId,
summary: `Swarm fan-out completed ${laneResults.length} lanes.`,
completedItems: fanoutPlan.lanes.map(
(lane) => `${lane.targetAgent} completed ${lane.id}`,
),
remainingItems: [
`Synthesize fan-out findings for ${unitType} ${unitId}.`,
],
verificationEvidence,
pdd: {
purpose:
"Synthetic checkpoint injected because fan-out lanes returned text without a single parent checkpoint.",
consumer:
"phases-unit.js assessAutonomousSolverTurn + missing-checkpoint-repair loop",
contract:
"Outcome is 'continue' so the parent loop synthesizes independent lane findings.",
failureBoundary:
"If any lane fails, fan-out returns cancelled before synthesizing evidence.",
evidence: verificationEvidence.join("; "),
nonGoals:
"Does not claim implementation completion from read-only fan-out lanes.",
invariants:
"Read-only fan-out lanes are used only for naturally parallel research or validation units.",
assumptions:
"Independent lane replies are useful input for the next autonomous step.",
},
},
});
try {
appendAutonomousSolverCheckpoint(basePath, {
unitType,
unitId,
outcome: "continue",
summary: `Swarm fan-out completed ${laneResults.length} lanes.`,
completedItems: fanoutPlan.lanes.map(
(lane) => `${lane.targetAgent} completed ${lane.id}`,
),
remainingItems: [
`Synthesize fan-out findings for ${unitType} ${unitId}.`,
],
verificationEvidence,
pdd: {
purpose:
"Synthetic checkpoint from multi-agent fan-out when lanes returned without a parent checkpoint.",
consumer: "phases-unit.js assessAutonomousSolverTurn",
contract:
"Falls back to 'continue' so the loop uses the fan-out evidence without pretending completion.",
failureBoundary:
"appendAutonomousSolverCheckpoint failure is swallowed — the loop repairs via missing-checkpoint handling.",
evidence: verificationEvidence.join("; "),
nonGoals: "Does not synthesize completion.",
invariants:
"Synthetic checkpoints are only written when no real checkpoint exists.",
assumptions: "All fan-out lanes produced non-empty replies.",
},
});
} catch (cpErr) {
debugLog("runUnit[swarm]", {
phase: "fanout-synthesized-checkpoint-error",
unitType,
unitId,
error: getErrorMessage(cpErr),
});
}
}
try {
emitJournalEvent(basePath, {
ts: new Date().toISOString(),
eventType: "swarm-dispatch",
data: {
unitType,
unitId,
targetAgent: "fanout",
workMode: envelope.workMode,
toolCallCount: collectedToolCalls.length,
outcome: workerSignaledOutcome ?? "continue",
via: "autonomous-unit",
fanout: true,
lanes: fanoutPlan.lanes.map((lane) => lane.targetAgent),
},
});
} catch {
/* journal write failure must not break dispatch */
}
const contentBlocks = [
...collectedToolCalls,
{ type: "text", text: replyText },
];
return {
status: "completed",
event: {
messages: [
{
role: "assistant",
content: contentBlocks,
_swarm: true,
replyMessageId: laneResults
.map(({ result }) => result.replyMessageId)
.filter(Boolean)
.join(","),
targetAgent: "fanout",
},
],
},
requestDispatchedAt,
_via: "swarm",
_swarmResult: {
fanout: true,
reason: fanoutPlan.reason,
lanes: laneResults.map(({ lane, result }) => ({
id: lane.id,
targetAgent: lane.targetAgent,
replyMessageId: result.replyMessageId,
})),
reply: replyText,
},
swarmToolCallCount:
collectedToolCalls.length - (hasCheckpointCall ? 0 : 1),
};
}
let swarmResult;
try {
swarmResult = await swarmDispatchAndWait(basePath, envelope, {

View file

@ -134,15 +134,18 @@ function makeS(basePath = "/tmp/test-project") {
let origEnv;
let origNoOutputEnv;
let origHeadlessEnv;
let origFanoutEnv;
beforeEach(() => {
vi.clearAllMocks();
origEnv = process.env.SF_AUTONOMOUS_VIA_SWARM;
origNoOutputEnv = process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS;
origHeadlessEnv = process.env.SF_HEADLESS;
origFanoutEnv = process.env.SF_SWARM_FANOUT;
delete process.env.SF_AUTONOMOUS_VIA_SWARM;
delete process.env.SF_SWARM_NO_OUTPUT_TIMEOUT_MS;
delete process.env.SF_HEADLESS;
delete process.env.SF_SWARM_FANOUT;
// Default implementation for the happy-path tests: return a deterministic reply.
mockSwarmDispatchAndWait.mockImplementation(
@ -173,6 +176,11 @@ afterEach(() => {
} else {
process.env.SF_HEADLESS = origHeadlessEnv;
}
if (origFanoutEnv === undefined) {
delete process.env.SF_SWARM_FANOUT;
} else {
process.env.SF_SWARM_FANOUT = origFanoutEnv;
}
});
// ─── Flag ON — happy path ─────────────────────────────────────────────────────
@ -207,24 +215,19 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
const pi = makePi();
const s = makeS("/myproject");
await runUnit(
ctx,
pi,
s,
"research-slice",
"slice-42",
"research the topic",
{ scope: "milestone-1", priority: 7 },
);
await runUnit(ctx, pi, s, "execute-task", "task-42", "execute the task", {
scope: "milestone-1",
priority: 7,
});
expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce();
const [basePath, envelope, opts] = mockSwarmDispatchAndWait.mock.calls[0];
expect(basePath).toBe("/myproject");
expect(envelope.unitId).toBe("slice-42");
expect(envelope.unitType).toBe("research-slice");
expect(envelope.workMode).toBe("research"); // research-slice → research
expect(envelope.payload).toBe("research the topic");
expect(envelope.unitId).toBe("task-42");
expect(envelope.unitType).toBe("execute-task");
expect(envelope.workMode).toBe("build"); // execute-task → build
expect(envelope.payload).toBe("execute the task");
expect(envelope.scope).toBe("milestone-1");
expect(envelope.priority).toBe(7);
expect(envelope.executorPermissionLevel).toBe("low");
@ -544,6 +547,71 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
});
});
describe("runUnit — swarm fan-out policy", () => {
test("research-slice_when_swarm_enabled_fans_out_to_independent_read_only_lanes", async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
mockSwarmDispatchAndWait.mockImplementation(
async (_basePath, envelope, _opts) => ({
messageId: `dispatch-${envelope.targetAgent}`,
targetAgent: envelope.targetAgent,
swarmName: "default",
envelope,
reply: `reply from ${envelope.targetAgent}`,
replyMessageId: `reply-${envelope.targetAgent}`,
}),
);
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
const result = await runUnit(
ctx,
pi,
s,
"research-slice",
"M001/S01",
"research provider routing",
{},
);
expect(result.status).toBe("completed");
expect(mockSwarmDispatchAndWait).toHaveBeenCalledTimes(3);
const targetAgents = mockSwarmDispatchAndWait.mock.calls.map(
(call) => call[1].targetAgent,
);
expect(targetAgents).toEqual(["scout", "reviewer", "verifier"]);
expect(result._swarmResult.fanout).toBe(true);
expect(result._swarmResult.lanes).toHaveLength(3);
expect(result.event.messages[0].targetAgent).toBe("fanout");
expect(result.event.messages[0].content.at(-1).text).toContain("## scout");
});
test("execute-task_when_no_disjoint_scope_remains_single_agent", async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
const ctx = makeCtx("/proj");
const pi = makePi();
const s = makeS("/proj");
const result = await runUnit(
ctx,
pi,
s,
"execute-task",
"M001/S01/T01",
"implement a coupled change",
{},
);
expect(result.status).toBe("completed");
expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce();
expect(
mockSwarmDispatchAndWait.mock.calls[0][1].targetAgent,
).toBeUndefined();
});
});
// ─── Flag ON — error paths ────────────────────────────────────────────────────
describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — error paths", () => {
@ -760,6 +828,7 @@ describe("deriveWorkMode (via envelope.workMode in dispatch calls)", () => {
for (const [unitType, expectedWorkMode] of cases) {
test(`${unitType}${expectedWorkMode}`, async () => {
process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
process.env.SF_SWARM_FANOUT = "0";
const ctx = makeCtx("/proj");
const pi = makePi();

View file

@ -297,6 +297,29 @@ describe("SwarmDispatchLayer — routing", () => {
expect(agent.identity.role).toBe("reviewer");
});
test("dispatch_when_targetAgent_set_routes_to_named_agent", async () => {
const root = makeProject();
const layer = new SwarmDispatchLayer(root);
const result = await layer.dispatch({
unitId: "task-target-1",
unitType: "task",
workMode: "build",
targetAgent: "verifier",
payload: "verify task data",
priority: 5,
scope: "test-scope",
});
expect(result.messageId).toBeTruthy();
expect(result.targetAgent).toBe("verifier");
const swarm = await layer.getOrCreateSwarm();
const agent = swarm.get(result.targetAgent);
expect(agent).toBeDefined();
expect(agent.identity.role).toBe("verifier");
});
test("dispatchBatch_dispatches_all", async () => {
const root = makeProject();
const layer = new SwarmDispatchLayer(root);

View file

@ -168,6 +168,8 @@ async function getA2ATransport() {
* per unit type (e.g. ensure "checkpoint" is always available for execute-task units).
* @property {string} [executorPermissionLevel] Optional: legacy SF permission
* extension level used by in-process headless worker sessions.
* @property {string} [targetAgent] Optional explicit target agent name. When
* set, dispatch uses this agent instead of role-derived routing.
*/
/**
@ -266,7 +268,10 @@ export class SwarmDispatchLayer {
*/
async _busDispatch(envelope) {
const swarm = await this.getOrCreateSwarm();
const target = swarm.route(envelope);
const target =
envelope.targetAgent != null
? swarm.get(envelope.targetAgent)
: swarm.route(envelope);
if (!target) {
throw new Error(
@ -324,7 +329,10 @@ export class SwarmDispatchLayer {
*/
async _a2aDispatch(envelope) {
const swarm = await this.getOrCreateSwarm();
const target = swarm.route(envelope);
const target =
envelope.targetAgent != null
? swarm.get(envelope.targetAgent)
: swarm.route(envelope);
if (!target) {
throw new Error(