feat(autonomous): SF_AUTONOMOUS_VIA_SWARM=1 routes unit dispatch through swarm

Add runUnitViaSwarm as an opt-in path in auto/run-unit.js. When
SF_AUTONOMOUS_VIA_SWARM=1 (or =true), each unit dispatch builds a
DispatchEnvelope (unitType -> workMode via deriveWorkMode), calls
swarmDispatchAndWait, and returns the agent reply as a synthetic
{status: "completed", event.messages: [{role: "assistant", content: reply}]}
matching the shape phases-unit.js / classifyExecutorRefusal already expect.

Default (flag unset) is byte-identical to today — no regression in the
default path, 1751/1751 tests pass.

Known gaps (acceptable for an experimental opt-in; must be closed before
swarm becomes default):
- Tool-call events from the swarm worker do NOT surface to the
  orchestrator UI (runAgentTurn handles them internally).
- The worker emits a plain text reply, not a structured checkpoint,
  so phases-unit.js' checkpoint-missing repair path will not trigger
  and classifyExecutorRefusal will not detect refusals.

This is the first concrete step toward routing autonomous unit work
through swarm: role-based agent selection, memory inheritance via the
envelope, and a durable bus audit trail of every unit dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-15 04:27:00 +02:00
parent bbade22388
commit 78d52d7967
2 changed files with 577 additions and 0 deletions

View file

@ -2,6 +2,11 @@
* auto/run-unit.ts — Single unit execution: session create → prompt → await agent_end.
*
* Imports from: auto/types, auto/resolve
*
* ## Swarm dispatch path (opt-in)
* When `SF_AUTONOMOUS_VIA_SWARM=1` (or `=true`) is set, `runUnit` routes each
* unit through `swarmDispatchAndWait` instead of the parent-session `pi.sendMessage`
* path. Default behavior is unchanged when the flag is unset.
*/
import { scopeActiveToolsForUnitType } from "../constants.js";
@ -18,6 +23,7 @@ import {
resetRunawayGuardState,
} from "../uok/auto-runaway-guard.js";
import { logWarning } from "../workflow-logger.js";
import { swarmDispatchAndWait } from "../uok/swarm-dispatch.js";
import {
_clearCurrentResolve,
_setCurrentResolve,
@ -82,6 +88,177 @@ export function scopeActiveToolsForRunUnit(
return scopeActiveToolsForUnitType(unitType, currentTools);
}
/**
 * Translate an autonomous unitType string into a swarm workMode.
 *
 * Purpose: envelopes built by runUnitViaSwarm carry a workMode so the swarm
 * can route them to a role agent; this function produces that value from the
 * unit type. It is intended to mirror AgentSwarm._deriveWorkMode while keeping
 * run-unit self-contained.
 *
 * Matching is case-insensitive and substring-based. Rules are evaluated in
 * the order listed and the first rule with any matching keyword wins, so a
 * type such as "adversary-review" resolves to "review" (earlier rule) rather
 * than "challenge". Falsy input and unmatched types resolve to "build".
 *
 * Consumer: runUnitViaSwarm.
 *
 * @param {string} unitType
 * @returns {string} one of "research", "review", "verify", "repair", "build",
 *   "plan", "document", "challenge"
 */
function deriveWorkMode(unitType) {
  if (!unitType) return "build";

  const lowered = unitType.toLowerCase();

  // Ordered [workMode, keywords] pairs — order is significant (first hit wins).
  const rules = [
    ["research", ["research", "scout"]],
    ["review", ["review"]],
    ["verify", ["audit", "validate", "gate", "uat"]],
    ["repair", ["repair", "fix"]],
    ["build", ["build", "code", "implement", "execute"]],
    ["plan", ["plan", "slice", "milestone", "roadmap"]],
    ["document", ["doc", "rewrite", "scribe"]],
    ["challenge", ["challenge", "adversar"]],
  ];

  for (const [workMode, keywords] of rules) {
    if (keywords.some((keyword) => lowered.includes(keyword))) {
      return workMode;
    }
  }

  return "build";
}
/**
 * Run a unit through the swarm dispatch layer instead of the parent session.
 *
 * Purpose: alternate execution path activated by `SF_AUTONOMOUS_VIA_SWARM=1`.
 * Wraps the unit prompt in a DispatchEnvelope, hands it to
 * `swarmDispatchAndWait`, and maps the outcome onto the UnitResult shape that
 * `runUnit` returns to its callers.
 *
 * Outcomes:
 * - `swarmDispatchAndWait` throws → `status: "cancelled"`, transient
 *   `session-failed` errorContext.
 * - the result is missing, carries an `error`, or has a null/undefined
 *   `reply` → `status: "cancelled"`, transient `session-failed` errorContext.
 * - otherwise → `status: "completed"`; the reply text is the single assistant
 *   message of a synthetic `event.messages` array.
 *
 * Every phase is reported through debugLog; terminal outcomes are also sent to
 * `ctx.ui.notify` when that function exists.
 *
 * Consumer: runUnit when SF_AUTONOMOUS_VIA_SWARM is set.
 *
 * @param {object} ctx - provides the `basePath` fallback and optional `ui.notify`
 * @param {object} pi - unused on this path; kept so the signature matches runUnit
 * @param {object} s - session state; `s.basePath` is preferred over `ctx.basePath`
 * @param {string} unitType
 * @param {string} unitId
 * @param {string} prompt - sent as the envelope payload
 * @param {object} [options] - optional `scope` (default "autonomous") and
 *   `priority` (default 5)
 * @returns {Promise<object>} UnitResult
 */
async function runUnitViaSwarm(ctx, pi, s, unitType, unitId, prompt, options) {
  const requestDispatchedAt = Date.now();
  debugLog("runUnit[swarm]", { phase: "start", unitType, unitId });
  const basePath = s.basePath ?? ctx.basePath ?? process.cwd();

  // Operator notification is best-effort: skipped when ctx.ui.notify is absent.
  const notify = (text, level) => {
    if (typeof ctx.ui?.notify === "function") {
      ctx.ui.notify(text, level);
    }
  };

  // Both failure branches report the same transient "session-failed" shape.
  const cancelledResult = (message) => ({
    status: "cancelled",
    requestDispatchedAt,
    errorContext: {
      message,
      category: "session-failed",
      isTransient: true,
    },
  });

  // Build the envelope. `scope` is taken from options when available so that
  // milestone-scoped orchestrators can supply it; fall back to "autonomous".
  const envelope = {
    scope: options?.scope ?? "autonomous",
    unitId,
    unitType,
    workMode: deriveWorkMode(unitType),
    payload: prompt,
    priority: options?.priority ?? 5,
  };
  debugLog("runUnit[swarm]", { phase: "dispatch", unitType, unitId, workMode: envelope.workMode });

  // Timeout comes from the supervisor's hard timeout (default 8 minutes),
  // with a 30 second floor.
  const supervisor = resolveAutoSupervisorConfig();
  const timeoutMs = Math.max(
    30_000,
    (supervisor.hard_timeout_minutes ?? 8) * 60 * 1000,
  );

  let swarmResult;
  try {
    swarmResult = await swarmDispatchAndWait(basePath, envelope, { timeoutMs });
  } catch (err) {
    const msg = `swarmDispatchAndWait threw: ${getErrorMessage(err)}`;
    debugLog("runUnit[swarm]", { phase: "dispatch-error", unitType, unitId, error: msg });
    notify(`[${unitType}] ${unitId} → swarm error: ${msg}`, "error");
    return cancelledResult(msg);
  }

  // Failure reported by the swarm itself. `== null` deliberately covers both
  // null and undefined so a result without a `reply` field is never reported
  // as completed; a missing result object is treated the same way.
  if (!swarmResult || swarmResult.error || swarmResult.reply == null) {
    const reason = swarmResult?.error || "swarm returned no reply";
    debugLog("runUnit[swarm]", { phase: "dispatch-no-reply", unitType, unitId, reason });
    notify(`[${unitType}] ${unitId} → swarm dispatch failed: ${reason}`, "error");
    return cancelledResult(reason);
  }

  debugLog("runUnit[swarm]", {
    phase: "agent-end-received",
    unitType,
    unitId,
    status: "completed",
    targetAgent: swarmResult.targetAgent,
    replyMessageId: swarmResult.replyMessageId,
  });
  notify(`[${unitType}] ${unitId} → swarm agent ${swarmResult.targetAgent} completed`, "info");

  // Map swarm reply back to the UnitResult shape. The reply text becomes the
  // last message in a synthetic event.messages array so callers that inspect
  // event.messages (e.g. for refusal classification) see content.
  const syntheticEvent = {
    messages: [
      {
        role: "assistant",
        content: swarmResult.reply,
        _swarm: true,
        replyMessageId: swarmResult.replyMessageId,
        targetAgent: swarmResult.targetAgent,
      },
    ],
  };
  return {
    status: "completed",
    event: syntheticEvent,
    requestDispatchedAt,
    _via: "swarm",
    _swarmResult: swarmResult,
  };
}
/**
* Execute a single unit: create a new session, send the prompt, and await
* the agent_end promise. Returns a UnitResult describing what happened.
@ -97,6 +274,14 @@ export function scopeActiveToolsForRunUnit(
* Default: false (each new unit starts with a clean session).
*/
export async function runUnit(ctx, pi, s, unitType, unitId, prompt, options) {
// Feature-flagged swarm path — no-op when unset so default behavior is unchanged.
if (
process.env.SF_AUTONOMOUS_VIA_SWARM === "1" ||
process.env.SF_AUTONOMOUS_VIA_SWARM === "true"
) {
return runUnitViaSwarm(ctx, pi, s, unitType, unitId, prompt, options);
}
const keepSession = options?.keepSession === true;
// promptCacheSplit: {before, after} — stable prefix (to cache) + dynamic suffix.
// When present, passes the content as a two-block array so providers can mark

View file

@ -0,0 +1,392 @@
/**
* run-unit-via-swarm.test.mjs — Tests for the SF_AUTONOMOUS_VIA_SWARM opt-in path in runUnit.
*
* Purpose: verify that when SF_AUTONOMOUS_VIA_SWARM=1 is set, runUnit routes through
* swarmDispatchAndWait instead of the legacy pi.sendMessage path, and that the swarm
* result is mapped back to the correct UnitResult shape.
*
* When the flag is unset, the swarm path must NOT be called (default behavior unchanged).
*
* Consumer: CI unit-test suite (`npm run test:unit`).
*/
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
// ─── Mock swarm-dispatch.js ───────────────────────────────────────────────────
// Stub swarmDispatchAndWait so tests never touch SQLite, swarm topology, or LLM.
// Uses vi.hoisted so the factory reference is available when vi.mock is hoisted.
// The MOCK_* constants are the canned values returned by the default stub
// implementation that beforeEach installs; happy-path tests assert against them.
const MOCK_REPLY = "swarm agent reply: unit complete";
const MOCK_REPLY_ID = "msg-swarm-001";
const MOCK_TARGET = "worker-1";
// Create the spy inside vi.hoisted so it already exists when the vi.mock
// factory below is evaluated.
const { mockSwarmDispatchAndWait } = vi.hoisted(() => {
const fn = vi.fn();
return { mockSwarmDispatchAndWait: fn };
});
// Only swarmDispatchAndWait is asserted on in this file; the other two exports
// are replaced with inert spies.
vi.mock("../uok/swarm-dispatch.js", () => ({
swarmDispatchAndWait: mockSwarmDispatchAndWait,
swarmDispatch: vi.fn(),
SwarmDispatchLayer: vi.fn(),
}));
// ─── Mock preferences.js ─────────────────────────────────────────────────────
// resolveAutoSupervisorConfig is called in runUnitViaSwarm to compute timeoutMs.
// A fixed 8-minute hard timeout keeps that computation deterministic.
vi.mock("../preferences.js", () => ({
resolveAutoSupervisorConfig: vi.fn(() => ({ hard_timeout_minutes: 8 })),
resolvePersistModelChanges: vi.fn(() => false),
}));
// ─── Mock everything runUnit imports that touches DB / session infrastructure ─
// Logging is silenced; getErrorMessage is reduced to "message if present, else
// stringified value" so error-path tests can match on the thrown message text.
vi.mock("../debug-logger.js", () => ({ debugLog: vi.fn() }));
vi.mock("../error-utils.js", () => ({ getErrorMessage: (e) => String(e?.message ?? e) }));
vi.mock("../workflow-logger.js", () => ({ logWarning: vi.fn() }));
// Tool scoping becomes a pass-through: the stub returns the tools it was given.
vi.mock("../constants.js", () => ({
scopeActiveToolsForUnitType: vi.fn((_, tools) => tools),
}));
// Runaway-guard collectors return neutral values (0 / empty string).
vi.mock("../uok/auto-runaway-guard.js", () => ({
collectSessionTokenUsage: vi.fn(() => 0),
collectWorktreeFingerprint: vi.fn(() => ""),
countChangedFiles: vi.fn(() => 0),
resetRunawayGuardState: vi.fn(),
}));
// Resolve-state setters are inert spies.
vi.mock("./resolve.js", () => ({
_clearCurrentResolve: vi.fn(),
_setCurrentResolve: vi.fn(),
_setSessionSwitchInFlight: vi.fn(),
}));
vi.mock("./session.js", () => ({ NEW_SESSION_TIMEOUT_MS: 5000 }));
// runWithTurnGeneration simply invokes the callback; generation is pinned to 1.
vi.mock("./turn-epoch.js", () => ({
getCurrentTurnGeneration: vi.fn(() => 1),
runWithTurnGeneration: vi.fn(async (_gen, fn) => fn()),
}));
// ─── Import runUnit AFTER mocks are set up ────────────────────────────────────
import { runUnit } from "../auto/run-unit.js";
// ─── Helpers ──────────────────────────────────────────────────────────────────
/**
 * Build a minimal `ctx` stub for runUnit.
 *
 * @param {string} [basePath="/tmp/test-project"] - value exposed as ctx.basePath
 * @returns {object} ctx with a spy `ui.notify` and null model fields
 */
function makeCtx(basePath = "/tmp/test-project") {
  const ui = { notify: vi.fn() };
  return { basePath, ui, modelRegistry: null, model: null };
}
/**
 * Build a `pi` stub whose methods are all spies.
 *
 * sendMessage resolves to undefined, getActiveTools returns an empty array,
 * and setModel resolves to true; the remaining methods are bare spies.
 *
 * @returns {object} pi stub
 */
function makePi() {
  const sendMessage = vi.fn(async () => {});
  const getActiveTools = vi.fn(() => []);
  const setActiveTools = vi.fn();
  const setModel = vi.fn(async () => true);
  const setFallbackUnitContext = vi.fn();
  return {
    sendMessage,
    getActiveTools,
    setActiveTools,
    setModel,
    setFallbackUnitContext,
  };
}
/**
 * Build a minimal session-state (`s`) stub for runUnit.
 *
 * @param {string} [basePath="/tmp/test-project"] - value exposed as s.basePath
 * @returns {object} active, non-verbose state whose cmdCtx.newSession resolves
 *   to an empty object and whose cmdCtx.clearQueue is a bare spy
 */
function makeS(basePath = "/tmp/test-project") {
  const cmdCtx = {
    newSession: vi.fn(async () => ({})),
    clearQueue: vi.fn(),
  };
  return {
    basePath,
    active: true,
    verbose: false,
    currentUnitModel: null,
    cmdCtx,
  };
}
// ─── Save / restore env ───────────────────────────────────────────────────────
// Value of SF_AUTONOMOUS_VIA_SWARM captured before each test so afterEach can
// put the process environment back exactly as it was.
let savedSwarmFlag;

beforeEach(() => {
  vi.clearAllMocks();

  // Every test starts with the flag absent; tests opt in explicitly.
  savedSwarmFlag = process.env.SF_AUTONOMOUS_VIA_SWARM;
  delete process.env.SF_AUTONOMOUS_VIA_SWARM;

  // Happy-path default: echo the envelope back alongside a deterministic reply.
  mockSwarmDispatchAndWait.mockImplementation(async (_basePath, envelope, _opts) => {
    const cannedResult = {
      messageId: "msg-dispatch-001",
      targetAgent: MOCK_TARGET,
      swarmName: "default",
      envelope,
      reply: MOCK_REPLY,
      replyMessageId: MOCK_REPLY_ID,
    };
    return cannedResult;
  });
});

afterEach(() => {
  if (savedSwarmFlag !== undefined) {
    process.env.SF_AUTONOMOUS_VIA_SWARM = savedSwarmFlag;
    return;
  }
  delete process.env.SF_AUTONOMOUS_VIA_SWARM;
});
// ─── Flag ON — happy path ─────────────────────────────────────────────────────
describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
  // Sets the flag, builds fresh stubs rooted at `basePath`, runs the unit, and
  // hands back both the stubs and the result for assertions.
  const runWithFlag = async (flagValue, basePath, unitType, unitId, prompt, options) => {
    process.env.SF_AUTONOMOUS_VIA_SWARM = flagValue;
    const stubs = {
      ctx: makeCtx(basePath),
      pi: makePi(),
      s: makeS(basePath),
    };
    const result = await runUnit(
      stubs.ctx,
      stubs.pi,
      stubs.s,
      unitType,
      unitId,
      prompt,
      options,
    );
    return { ...stubs, result };
  };

  test("routes through swarmDispatchAndWait and returns status=completed", async () => {
    const { result } = await runWithFlag(
      "1",
      "/proj",
      "execute-task",
      "unit-001",
      "implement the feature",
      {},
    );

    expect(result.status).toBe("completed");
    expect(result.requestDispatchedAt).toBeDefined();
    expect(result._via).toBe("swarm");
  });

  test("calls swarmDispatchAndWait with correct envelope shape", async () => {
    await runWithFlag(
      "1",
      "/myproject",
      "research-slice",
      "slice-42",
      "research the topic",
      { scope: "milestone-1", priority: 7 },
    );

    expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce();
    const firstCall = mockSwarmDispatchAndWait.mock.calls[0];
    const basePath = firstCall[0];
    const envelope = firstCall[1];
    const opts = firstCall[2];

    expect(basePath).toBe("/myproject");
    expect(envelope.unitId).toBe("slice-42");
    expect(envelope.unitType).toBe("research-slice");
    // "research-slice" contains "research", which is checked before "slice".
    expect(envelope.workMode).toBe("research");
    expect(envelope.payload).toBe("research the topic");
    expect(envelope.scope).toBe("milestone-1");
    expect(envelope.priority).toBe(7);
    expect(opts.timeoutMs).toBeGreaterThan(0);
  });

  test("maps swarm reply into event.messages[last].content", async () => {
    const { result } = await runWithFlag("1", "/proj", "execute-task", "u-1", "do stuff", {});

    const { messages } = result.event;
    const finalMessage = messages[messages.length - 1];
    expect(finalMessage.content).toBe(MOCK_REPLY);
    expect(finalMessage.role).toBe("assistant");
    expect(finalMessage.replyMessageId).toBe(MOCK_REPLY_ID);
    expect(finalMessage.targetAgent).toBe(MOCK_TARGET);
  });

  test("does NOT call pi.sendMessage in the swarm path", async () => {
    const { pi } = await runWithFlag("1", "/proj", "execute-task", "u-2", "prompt", {});

    expect(pi.sendMessage).not.toHaveBeenCalled();
  });

  test("accepts SF_AUTONOMOUS_VIA_SWARM=true (string)", async () => {
    const { result } = await runWithFlag("true", "/proj", "plan-slice", "ps-1", "plan it", {});

    expect(result.status).toBe("completed");
    expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce();
  });

  test("workMode defaults to 'build' for unknown unitType", async () => {
    await runWithFlag("1", "/proj", "unknown-type-xyz", "u-3", "do it", {});

    const dispatchedEnvelope = mockSwarmDispatchAndWait.mock.calls[0][1];
    expect(dispatchedEnvelope.workMode).toBe("build");
  });
});
// ─── Flag ON — error paths ────────────────────────────────────────────────────
describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — error paths", () => {
  // Runs an "execute-task" unit under /proj with the flag on. Callers queue the
  // one-shot failing dispatch behaviour on the mock before invoking this.
  const runFlaggedUnit = async (unitId) => {
    process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
    const ctx = makeCtx("/proj");
    const result = await runUnit(ctx, makePi(), makeS("/proj"), "execute-task", unitId, "prompt", {});
    return { ctx, result };
  };

  // A resolved swarm result that carries no reply, only an error string.
  const replylessResult = (messageId, error) => ({
    messageId,
    targetAgent: "worker-1",
    swarmName: "default",
    reply: null,
    replyMessageId: null,
    error,
  });

  test("returns cancelled when swarmDispatchAndWait throws", async () => {
    mockSwarmDispatchAndWait.mockRejectedValueOnce(new Error("bus failure"));

    const { result } = await runFlaggedUnit("u-err-1");

    expect(result.status).toBe("cancelled");
    expect(result.errorContext.message).toContain("bus failure");
    expect(result.errorContext.category).toBe("session-failed");
    expect(result.requestDispatchedAt).toBeDefined();
  });

  test("returns cancelled when swarm returns reply=null", async () => {
    mockSwarmDispatchAndWait.mockResolvedValueOnce(replylessResult("m-1", "no reply"));

    const { result } = await runFlaggedUnit("u-err-2");

    expect(result.status).toBe("cancelled");
    expect(result.errorContext.message).toContain("no reply");
  });

  test("returns cancelled when swarm result has error field", async () => {
    mockSwarmDispatchAndWait.mockResolvedValueOnce(
      replylessResult("m-2", "runAgentTurn threw: LLM timeout"),
    );

    const { result } = await runFlaggedUnit("u-err-3");

    expect(result.status).toBe("cancelled");
    expect(result.errorContext.message).toContain("LLM timeout");
  });

  test("notifies ctx.ui on swarm error", async () => {
    mockSwarmDispatchAndWait.mockRejectedValueOnce(new Error("network failure"));

    const { ctx } = await runFlaggedUnit("u-err-4");

    expect(ctx.ui.notify).toHaveBeenCalledWith(
      expect.stringContaining("network failure"),
      "error",
    );
  });
});
// ─── Flag OFF — default path preserved ───────────────────────────────────────
describe("runUnit — SF_AUTONOMOUS_VIA_SWARM unset — default path", () => {
  // Drives the legacy path with a newSession() that rejects immediately, so
  // runUnit returns without hanging on a real session.
  const runWithFailingSession = async (unitId) => {
    const state = makeS("/tmp");
    state.cmdCtx.newSession = vi.fn(async () => {
      throw new Error("session-failed");
    });
    return runUnit(makeCtx("/tmp"), makePi(), state, "execute-task", unitId, "prompt", {});
  };

  test("does NOT call swarmDispatchAndWait when flag is absent", async () => {
    // The file-level beforeEach has already removed the flag.
    expect(process.env.SF_AUTONOMOUS_VIA_SWARM).toBeUndefined();

    const outcome = await runWithFailingSession("u-default-1");

    // Swarm path was not taken...
    expect(mockSwarmDispatchAndWait).not.toHaveBeenCalled();
    // ...and the legacy path was entered: it reports cancelled because
    // newSession threw.
    expect(outcome.status).toBe("cancelled");
  });

  test("does NOT call swarmDispatchAndWait when flag is '0'", async () => {
    process.env.SF_AUTONOMOUS_VIA_SWARM = "0";

    const outcome = await runWithFailingSession("u-default-2");

    expect(mockSwarmDispatchAndWait).not.toHaveBeenCalled();
    expect(outcome.status).toBe("cancelled");
  });
});
// ─── workMode derivation unit tests ──────────────────────────────────────────
// deriveWorkMode is not exported, so each case is observed through the
// envelope.workMode that runUnit hands to swarmDispatchAndWait.
describe("deriveWorkMode (via envelope.workMode in dispatch calls)", () => {
  // [unitType, expected workMode]
  const cases = [
    ["research-slice", "research"],
    ["research-project", "research"],
    ["scout-context", "research"],
    ["review-milestone", "review"],
    ["gate-evaluate", "verify"],
    ["run-uat", "verify"],
    ["validate-milestone", "verify"],
    ["repair-slice", "repair"],
    ["fix-bug", "repair"],
    ["execute-task", "build"],
    ["implement-feature", "build"],
    ["plan-milestone", "plan"],
    ["plan-slice", "plan"],
    ["refine-slice", "plan"],
    ["replan-slice", "plan"],
    ["rewrite-docs", "document"],
    ["rewrite-spec", "document"], // contains "rewrite"
    ["scribe-notes", "document"], // contains "scribe"
    ["promote-spec", "build"], // no matching keyword → build (fallback)
    ["adversary-review", "review"], // "review" check comes before "adversar"
    ["challenge-mission", "challenge"], // "challenge" matches; no plan/slice/etc.
    ["unknown-type", "build"], // fallback
  ];

  for (const [unitType, expectedWorkMode] of cases) {
    // Separator keeps the input and the expectation readable in reporter
    // output (without it the two strings fuse into one token).
    test(`${unitType} → ${expectedWorkMode}`, async () => {
      process.env.SF_AUTONOMOUS_VIA_SWARM = "1";
      const ctx = makeCtx("/proj");
      const pi = makePi();
      const s = makeS("/proj");

      await runUnit(ctx, pi, s, unitType, "u-wm-test", "prompt", {});

      // The file-level beforeEach clears mock state before every test, so
      // calls[0] is always this test's dispatch.
      expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce();
      const [, envelope] = mockSwarmDispatchAndWait.mock.calls[0];
      expect(envelope.workMode).toBe(expectedWorkMode);
    });
  }
});