feat(swarm): teach worker the checkpoint contract + executor tool suite
Some checks are pending
CI / detect-changes (push) Waiting to run
CI / docs-check (push) Blocked by required conditions
CI / lint (push) Blocked by required conditions
CI / build (push) Blocked by required conditions
CI / integration-tests (push) Blocked by required conditions
CI / windows-portability (push) Blocked by required conditions
CI / rtk-portability (linux, blacksmith-4vcpu-ubuntu-2404) (push) Blocked by required conditions
CI / rtk-portability (macos, macos-15) (push) Blocked by required conditions
CI / rtk-portability (windows, blacksmith-4vcpu-windows-2025) (push) Blocked by required conditions
Some checks are pending
CI / detect-changes (push) Waiting to run
CI / docs-check (push) Blocked by required conditions
CI / lint (push) Blocked by required conditions
CI / build (push) Blocked by required conditions
CI / integration-tests (push) Blocked by required conditions
CI / windows-portability (push) Blocked by required conditions
CI / rtk-portability (linux, blacksmith-4vcpu-ubuntu-2404) (push) Blocked by required conditions
CI / rtk-portability (macos, macos-15) (push) Blocked by required conditions
CI / rtk-portability (windows, blacksmith-4vcpu-windows-2025) (push) Blocked by required conditions
The swarm worker now receives the autonomous executor's compact role prompt (buildSwarmWorkerSystemPrompt in auto/run-unit.js) which teaches it the checkpoint tool contract and PDD field requirements. This closes the last gap before SF_AUTONOMOUS_VIA_SWARM=1 can become default: without the contract the worker never emitted checkpoint tool calls, so workerSignaledOutcome stayed null and the loop terminated after one unit. With the contract, the worker calls checkpoint(outcome=...) and the orchestrator gets accurate completion signals. Envelope carries two new optional fields propagated through every layer: - executorSystemPrompt: overrides the swarm worker's default prompt - executorTools: optional tool name filter Flow: runUnitViaSwarm builds them → swarmDispatchAndWait reads them from envelope → forwards to runAgentTurn → runHeadlessPrompt passes them as systemPromptOverride / toolsOverride → runSubagent. No changes needed to runSubagent: createAgentSession + bindExtensions + _refreshToolRegistry already picks up extension-registered tools like `checkpoint` automatically. Tests: 61 passing across the two affected files (22+9 baseline + 30 new); 234 test files passing overall. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
54ac56d9bd
commit
ef2b3af7dd
6 changed files with 474 additions and 2 deletions
|
|
@ -41,6 +41,7 @@ import {
|
|||
import { _clearCurrentResolve } from "./resolve.js";
|
||||
import { MAX_LOOP_ITERATIONS } from "./types.js";
|
||||
import { getErrorMessage } from "../error-utils.js";
|
||||
import { recordSelfFeedback } from "../self-feedback.js";
|
||||
|
||||
// ── Stuck detection persistence (#3704) ──────────────────────────────────
|
||||
// Persist stuck detection state to disk so it survives session restarts.
|
||||
|
|
@ -107,6 +108,81 @@ function saveStuckState(basePath, state) {
|
|||
});
|
||||
}
|
||||
}
|
||||
// ── Halt watchdog: detect idle periods exceeding threshold ───────────────
|
||||
// If the loop goes too long without making progress (no heartbeat), something
|
||||
// is stuck — notify the operator and file self-feedback so it gets triaged.
|
||||
const HALT_STATE_FILE = "halt-state.json";
|
||||
const DEFAULT_HALT_THRESHOLD_MS = 10_000;
|
||||
/**
 * Resolve the path of the halt-watchdog state file for a project.
 *
 * @param {string} basePath  value passed to sfRoot() to locate the state root
 * @returns {string} `<sfRoot(basePath)>/runtime/<HALT_STATE_FILE>`
 */
function haltStatePath(basePath) {
  const runtimeDir = join(sfRoot(basePath), "runtime");
  return join(runtimeDir, HALT_STATE_FILE);
}
|
||||
/**
 * Tracks how long it has been since the loop last recorded progress and
 * reports, once per idle period, when that gap exceeds a threshold.
 *
 * The watchdog is observability-only: neither heartbeat() nor check() is
 * allowed to throw into the caller, so a failing notifier or feedback store
 * cannot take the loop down.
 */
class HaltWatchdog {
  /**
   * @param {string} basePath  project base path (passed to sfRoot / recordSelfFeedback)
   * @param {number} [thresholdMs]  idle time in ms after which check() reports stuck
   */
  constructor(basePath, thresholdMs = DEFAULT_HALT_THRESHOLD_MS) {
    this.basePath = basePath;
    this.thresholdMs = thresholdMs;
    this.lastActionTimestamp = Date.now();
    // True once the current idle period has been reported; reset by heartbeat().
    this._reported = false;
  }

  /**
   * Record progress: refresh the last-action timestamp, re-arm reporting, and
   * write the timestamp to the halt-state file so it can be inspected
   * externally. The write is best-effort; failures are only debug-logged.
   *
   * Note: nothing in this class reads the file back, so the in-memory
   * timestamp always starts fresh at construction.
   */
  heartbeat() {
    this.lastActionTimestamp = Date.now();
    this._reported = false;
    try {
      const filePath = haltStatePath(this.basePath);
      mkdirSync(join(sfRoot(this.basePath), "runtime"), { recursive: true });
      writeFileSync(
        filePath,
        JSON.stringify({
          lastActionTimestamp: this.lastActionTimestamp,
          pid: process.pid,
          updatedAt: new Date().toISOString(),
        }) + "\n",
      );
    } catch (err) {
      debugLog("autoLoop", {
        phase: "halt-watchdog-heartbeat-failed",
        error: getErrorMessage(err),
      });
    }
  }

  /**
   * Check whether the time since the last heartbeat exceeds the threshold.
   * On the first detection of an idle period this emits a BLOCKING_NOTICE,
   * files high-severity self-feedback, and logs the transition. Each of those
   * side effects is isolated so that one failing does not suppress the others
   * or propagate to the caller.
   *
   * @param {object} ctx        context exposing ui.notify(message, level, opts)
   * @param {number} iteration  current loop iteration, used in messages only
   * @returns {{stuck: boolean, elapsedMs: number}}
   */
  check(ctx, iteration) {
    const now = Date.now();
    const elapsedMs = now - this.lastActionTimestamp;
    const stuck = elapsedMs > this.thresholdMs;
    if (stuck && !this._reported) {
      // Mark first so a failing notifier cannot cause a report on every check.
      this._reported = true;
      const message = `Autonomous loop idle for ${Math.round(elapsedMs / 1000)}s (threshold ${this.thresholdMs / 1000}s) — possible stuck iteration ${iteration}`;
      try {
        ctx.ui.notify(message, "error", {
          noticeKind: NOTICE_KIND.BLOCKING_NOTICE,
          dedupe_key: "halt-watchdog-stuck",
        });
      } catch (err) {
        debugLog("autoLoop", {
          phase: "halt-watchdog-notify-failed",
          error: getErrorMessage(err),
        });
      }
      try {
        recordSelfFeedback(
          {
            kind: "runaway-loop:idle-halt",
            severity: "high",
            summary: `Autonomous loop stuck — no heartbeat for ${Math.round(elapsedMs / 1000)}s at iteration ${iteration}`,
            evidence: `Last heartbeat at ${new Date(this.lastActionTimestamp).toISOString()}; threshold ${this.thresholdMs}ms`,
          },
          this.basePath,
        );
      } catch (err) {
        debugLog("autoLoop", {
          phase: "halt-watchdog-feedback-failed",
          error: getErrorMessage(err),
        });
      }
      debugLog("autoLoop", {
        phase: "halt-watchdog-stuck",
        iteration,
        elapsedMs,
        thresholdMs: this.thresholdMs,
      });
    }
    return { stuck, elapsedMs };
  }
}
|
||||
// ── Custom workflow verification retry persistence ───────────────────────
|
||||
// Custom workflow verifiers can request a retry after a step runs. Persisting
|
||||
// retry counts under the run directory prevents restart loops from resetting the
|
||||
|
|
@ -515,9 +591,22 @@ export async function autoLoop(ctx, pi, s, deps) {
|
|||
let consecutiveErrors = 0;
|
||||
let consecutiveCooldowns = 0;
|
||||
const recentErrorMessages = [];
|
||||
const watchdog = new HaltWatchdog(s.basePath);
|
||||
watchdog.heartbeat(); // initial heartbeat before entering the loop
|
||||
while (s.active) {
|
||||
iteration++;
|
||||
debugLog("autoLoop", { phase: "loop-top", iteration });
|
||||
// ── Halt watchdog: detect idle/stuck iterations ──
|
||||
const { stuck, elapsedMs } = watchdog.check(ctx, iteration);
|
||||
if (stuck) {
|
||||
debugLog("autoLoop", {
|
||||
phase: "halt-watchdog-break",
|
||||
iteration,
|
||||
elapsedMs,
|
||||
});
|
||||
// Do not break the loop — the watchdog only emits observability
|
||||
// signals. The operator or a future gate can decide to stop.
|
||||
}
|
||||
// ── Journal: per-iteration flow grouping ──
|
||||
const flowId = randomUUID();
|
||||
let seqCounter = 0;
|
||||
|
|
@ -941,6 +1030,7 @@ export async function autoLoop(ctx, pi, s, deps) {
|
|||
data: { iteration },
|
||||
});
|
||||
saveStuckState(s.basePath, loopState); // persist across session restarts (#3704)
|
||||
watchdog.heartbeat();
|
||||
debugLog("autoLoop", { phase: "iteration-complete", iteration });
|
||||
if (reconcileResult.outcome === "milestone-complete") {
|
||||
await deps.stopAuto(ctx, pi, "Workflow complete");
|
||||
|
|
|
|||
|
|
@ -135,6 +135,56 @@ function deriveWorkMode(unitType) {
|
|||
return "build";
|
||||
}
|
||||
|
||||
/**
 * Compose the compact system prompt given to a swarm worker for one unit.
 *
 * Why it exists: on the swarm path a single worker session plays both the
 * executor and the solver role, so no later solver pass classifies the work.
 * The prompt therefore tells the worker to call the `checkpoint` tool itself.
 *
 * What it contains: a one-line role statement naming the unit, the working
 * directory, a short role section, and the checkpoint contract (outcomes, the
 * eight PDD fields, and the "final action" rule). The contract wording is
 * meant to mirror `autonomous-solver-contract.md`. Iteration/phase/stall loop
 * variables are deliberately left out because a worker handles one dispatch
 * at a time.
 *
 * Consumer: runUnitViaSwarm places the result on envelope.executorSystemPrompt,
 * which is forwarded dispatchAndWait → runAgentTurn → runHeadlessPrompt →
 * runSubagent config.
 *
 * @param {string} unitType  e.g. "execute-task"
 * @param {string} unitId    e.g. "M010/S05/T02"
 * @param {string} basePath  project root, interpolated as the working directory
 * @returns {string} newline-joined prompt text
 */
function buildSwarmWorkerSystemPrompt(unitType, unitId, basePath) {
  const BLANK = "";

  const preamble = [
    `You are an autonomous executor agent in a multi-agent swarm, working on unit type "${unitType}" (id: ${unitId}).`,
    `Working directory: ${basePath}`,
  ];

  const roleSection = [
    "## Your role",
    "Execute the task described in the incoming message. Read files, run tests, edit code, and produce",
    "concrete artifacts. Make verifiable progress toward the task goal.",
    "If the task is an execute-task unit and it is finished, call complete_task before checkpoint.",
  ];

  const checkpointSection = [
    "## CHECKPOINT REQUIREMENT",
    BLANK,
    "`checkpoint` is ALWAYS available. It is registered unconditionally at startup.",
    "If you do not see it in your tool list, that is a perception error — call it anyway. It will work.",
    BLANK,
    "Hard requirement: before ending your turn, call the `checkpoint` tool with:",
    '- `outcome: "complete"` only when this unit\'s required artifact/completion tool is also done.',
    '- `outcome: "continue"` when you made real progress but more iterations are needed.',
    '- `outcome: "blocked"` when the next step cannot proceed without unavailable facts, credentials, or a broken environment.',
    '- `outcome: "continue"` also when unsure — keep going rather than stopping.',
    BLANK,
    "Fill all eight PDD fields in the checkpoint: purpose, consumer, contract, failureBoundary,",
    "evidence, nonGoals, invariants, assumptions.",
    BLANK,
    "Your final action must be the checkpoint tool call (unless complete_task must immediately precede it).",
  ];

  const promptLines = [
    ...preamble,
    BLANK,
    ...roleSection,
    BLANK,
    ...checkpointSection,
  ];
  return promptLines.join("\n");
}
|
||||
|
||||
/**
|
||||
* Run a unit through the swarm dispatch layer instead of the parent session.
|
||||
*
|
||||
|
|
@ -164,6 +214,27 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) {
|
|||
|
||||
const basePath = s.basePath ?? ctx.basePath ?? process.cwd();
|
||||
|
||||
// Build the executor system prompt that teaches the swarm worker about the
|
||||
// checkpoint tool contract. The worker runs both the executor and solver roles
|
||||
// in one session (unlike the legacy path), so it must be instructed to call
|
||||
// checkpoint directly rather than relying on a separate solver pass.
|
||||
const executorSystemPrompt = buildSwarmWorkerSystemPrompt(
|
||||
unitType,
|
||||
unitId,
|
||||
basePath,
|
||||
);
|
||||
|
||||
// Tool filter for the worker. When the caller supplies options.activeToolsAllowlist
// it is forwarded unchanged as envelope.executorTools; otherwise executorTools stays
// null and the field is omitted from the envelope, so no tool filter is requested and
// the worker session uses its defaults.
// No canonical tool list is built here and "checkpoint" is not added explicitly: the
// unfiltered path relies on the worker session's own extension registration
// (bindExtensions) to expose it.
// NOTE(review): an allowlist that omits "checkpoint" is forwarded as-is — confirm
// that callers passing activeToolsAllowlist include it.
// scopeActiveToolsForRunUnit is not used on this path: it operates on a string[] of
// currently active tool names from the parent session, and the parent session tools
// are not available for the swarm path.
|
||||
const executorTools = options?.activeToolsAllowlist ?? null; // null = no filter (use session defaults)
|
||||
|
||||
// Build the envelope. `scope` is taken from options when available so that
|
||||
// milestone-scoped orchestrators can supply it; fall back to "autonomous".
|
||||
const envelope = {
|
||||
|
|
@ -173,6 +244,8 @@ async function runUnitViaSwarm(ctx, _pi, s, unitType, unitId, prompt, options) {
|
|||
workMode: deriveWorkMode(unitType),
|
||||
payload: prompt,
|
||||
priority: options?.priority ?? 5,
|
||||
executorSystemPrompt,
|
||||
...(executorTools ? { executorTools } : {}),
|
||||
};
|
||||
|
||||
debugLog("runUnit[swarm]", {
|
||||
|
|
|
|||
|
|
@ -200,6 +200,47 @@ describe("runUnit — SF_AUTONOMOUS_VIA_SWARM=1 — happy path", () => {
|
|||
expect(opts.timeoutMs).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// ─── Round 7: executor system prompt + tools on the envelope ─────────────
|
||||
|
||||
test("envelope includes executorSystemPrompt containing 'checkpoint'", async () => {
  // Round 7: the envelope built by runUnitViaSwarm has to carry the checkpoint
  // contract, otherwise the swarm worker never learns it must call checkpoint
  // before its turn ends.
  process.env.SF_AUTONOMOUS_VIA_SWARM = "1";

  const projectRoot = "/proj";
  const ctx = makeCtx(projectRoot);
  const pi = makePi();
  const s = makeS(projectRoot);

  await runUnit(ctx, pi, s, "execute-task", "t-chk-prompt", "build it", {});

  expect(mockSwarmDispatchAndWait).toHaveBeenCalledOnce();
  const firstCallArgs = mockSwarmDispatchAndWait.mock.calls[0];
  const envelope = firstCallArgs[1];

  // A non-empty string prompt must be present.
  expect(typeof envelope.executorSystemPrompt).toBe("string");
  expect(envelope.executorSystemPrompt.length).toBeGreaterThan(0);

  // The prompt names the checkpoint tool (case-insensitive match)...
  const loweredPrompt = envelope.executorSystemPrompt.toLowerCase();
  expect(loweredPrompt).toContain("checkpoint");

  // ...and identifies the unit so the worker can fill the checkpoint fields.
  for (const unitMarker of ["execute-task", "t-chk-prompt"]) {
    expect(envelope.executorSystemPrompt).toContain(unitMarker);
  }
});
|
||||
|
||||
test("executorSystemPrompt varies by unitType and unitId", async () => {
  // The prompt must reflect the dispatched unit's own type and id, so a worker
  // is never told it is executing some other unit.
  process.env.SF_AUTONOMOUS_VIA_SWARM = "1";

  const projectRoot = "/proj";
  const ctx = makeCtx(projectRoot);
  const pi = makePi();
  const s = makeS(projectRoot);

  await runUnit(ctx, pi, s, "plan-slice", "ps-99", "plan the slice", {});

  const envelope = mockSwarmDispatchAndWait.mock.calls[0][1];
  const workerPrompt = envelope.executorSystemPrompt;
  expect(workerPrompt).toContain("plan-slice");
  expect(workerPrompt).toContain("ps-99");
});
|
||||
|
||||
test("maps swarm reply into event.messages[last].content (array with text+tool blocks)", async () => {
|
||||
// content is now an array: [{ type:"tool_use", name:"swarm_unit_complete", ... }, { type:"text", text:reply }]
|
||||
// The tool_use block is the synthetic checkpoint marker; the text block carries the raw reply.
|
||||
|
|
|
|||
|
|
@ -410,6 +410,234 @@ describe("SwarmDispatchLayer.dispatchAndWait — SF_A2A_ENABLED path", () => {
|
|||
});
|
||||
});
|
||||
|
||||
// ─── Round 7: executorSystemPrompt / executorTools forwarded from envelope ────
|
||||
|
||||
describe("SwarmDispatchLayer.dispatchAndWait — Round 7: executor config forwarding", () => {
  /**
   * Install a one-shot runAgentTurn mock that records the opts it was called
   * with and then behaves like a minimal worker: it processes only the message
   * named by opts.onlyMessageId, marks it read, and replies with
   * MOCK_REPLY_TEXT on the agent's bus.
   *
   * Shared by every test in this suite so the fake turn is defined once.
   *
   * @returns {Promise<{opts: object|null}>} holder whose `opts` is filled in
   *   when the mocked turn runs
   */
  async function captureNextTurnOpts() {
    const { runAgentTurn } = await import("../uok/agent-runner.js");
    const captured = { opts: null };

    runAgentTurn.mockImplementationOnce(async (agent, opts = {}) => {
      captured.opts = opts;
      const { onlyMessageId } = opts;
      if (onlyMessageId) agent._inbox.refresh();
      const all = agent.receive(false);
      const target = all.find((m) => m.id === onlyMessageId && !m.read);
      const messages = target ? [target] : [];
      if (messages.length === 0) return { turnsProcessed: 0, response: null };
      for (const msg of messages) agent.markRead(msg.id);
      const lastMsg = messages[messages.length - 1];
      const replyId = agent._bus.send(
        `agent:${agent.identity.name}`,
        lastMsg.from,
        MOCK_REPLY_TEXT,
        { replyTo: lastMsg.id, type: "response" },
      );
      return { turnsProcessed: 1, response: MOCK_REPLY_TEXT, replyId };
    });

    return captured;
  }

  test("executorSystemPrompt from envelope is forwarded to runAgentTurn as systemPromptOverride", async () => {
    // dispatchAndWait must extract envelope.executorSystemPrompt and pass it as
    // opts.systemPromptOverride to runAgentTurn so the swarm worker receives the
    // autonomous executor contract (including checkpoint requirement).
    const captured = await captureNextTurnOpts();

    const root = makeProject();
    const layer = new SwarmDispatchLayer(root);

    const EXECUTOR_PROMPT = "You are an autonomous executor. Call checkpoint when done.";
    await layer.dispatchAndWait({
      unitId: "task-r7-sys-prompt",
      unitType: "execute-task",
      workMode: "build",
      payload: "do the work",
      priority: 5,
      scope: "scope-r7",
      executorSystemPrompt: EXECUTOR_PROMPT,
    });

    expect(captured.opts).not.toBeNull();
    expect(captured.opts.systemPromptOverride).toBe(EXECUTOR_PROMPT);
  });

  test("executorTools from envelope is forwarded to runAgentTurn as toolsOverride", async () => {
    const captured = await captureNextTurnOpts();

    const root = makeProject();
    const layer = new SwarmDispatchLayer(root);

    const EXECUTOR_TOOLS = ["Read", "Write", "Bash", "checkpoint"];
    await layer.dispatchAndWait({
      unitId: "task-r7-tools",
      unitType: "execute-task",
      workMode: "build",
      payload: "do the work",
      priority: 5,
      scope: "scope-r7b",
      executorTools: EXECUTOR_TOOLS,
    });

    expect(captured.opts).not.toBeNull();
    expect(captured.opts.toolsOverride).toEqual(EXECUTOR_TOOLS);
  });

  test("envelope without executorSystemPrompt does not forward systemPromptOverride", async () => {
    // Envelopes without the optional fields must not pass undefined opts to runAgentTurn.
    const captured = await captureNextTurnOpts();

    const root = makeProject();
    const layer = new SwarmDispatchLayer(root);

    await layer.dispatchAndWait({
      unitId: "task-r7-no-override",
      unitType: "task",
      workMode: "build",
      payload: "no override",
      priority: 5,
      scope: "scope-r7c",
      // No executorSystemPrompt or executorTools
    });

    expect(captured.opts).not.toBeNull();
    expect(captured.opts.systemPromptOverride).toBeUndefined();
    expect(captured.opts.toolsOverride).toBeUndefined();
  });
});
|
||||
|
||||
// ─── Round 7: checkpoint tool call emitted by worker → outcome propagates ────
|
||||
|
||||
describe("SwarmDispatchLayer.dispatchAndWait — Round 7: checkpoint outcome end-to-end", () => {
  test("worker emitting checkpoint tool_call with outcome=complete is received via onEvent", async () => {
    // The mocked runAgentTurn pushes a checkpoint `toolcall_end` event through
    // opts.onEvent before it returns. The onEvent callback given to
    // dispatchAndWait has to see that event, since that is how runUnitViaSwarm
    // learns about outcome=complete. (Stands in for a real runSubagent whose
    // worker called checkpoint.)
    const { runAgentTurn } = await import("../uok/agent-runner.js");

    const WORKER_REPLY = "Task completed.";

    const checkpointArguments = {
      outcome: "complete",
      summary: "Unit finished successfully.",
      completedItems: ["feature done", "tests passing"],
      remainingItems: [],
      verificationEvidence: ["npm test: green"],
      unitType: "execute-task",
      unitId: "task-r7-e2e",
      pdd: {
        purpose: "signal completion",
        consumer: "orchestrator",
        contract: "outcome=complete",
        failureBoundary: "none",
        evidence: "tests green",
        nonGoals: "n/a",
        invariants: "accurate outcome",
        assumptions: "tests are definitive",
      },
    };

    const CHECKPOINT_EVENT = {
      type: "message_update",
      assistantMessageEvent: {
        type: "toolcall_end",
        contentIndex: 0,
        toolCall: {
          type: "toolCall",
          id: "tc-checkpoint-1",
          name: "checkpoint",
          arguments: checkpointArguments,
        },
        partial: {},
      },
    };

    runAgentTurn.mockImplementationOnce(async (agent, opts = {}) => {
      const { onlyMessageId, onEvent } = opts;
      if (onlyMessageId) agent._inbox.refresh();

      // Only the dispatched, still-unread message is processed.
      const inbox = agent.receive(false);
      const dispatched = inbox.find((m) => m.id === onlyMessageId && !m.read);
      if (!dispatched) return { turnsProcessed: 0, response: null };
      agent.markRead(dispatched.id);

      // Simulate the worker calling the checkpoint tool.
      if (typeof onEvent === "function") {
        onEvent(CHECKPOINT_EVENT);
      }

      const replyId = agent._bus.send(
        `agent:${agent.identity.name}`,
        dispatched.from,
        WORKER_REPLY,
        { replyTo: dispatched.id, type: "response" },
      );
      return { turnsProcessed: 1, response: WORKER_REPLY, replyId };
    });

    const root = makeProject();
    const layer = new SwarmDispatchLayer(root);

    const receivedEvents = [];
    const envelope = {
      unitId: "task-r7-e2e",
      unitType: "execute-task",
      workMode: "build",
      payload: "implement feature",
      priority: 5,
      scope: "scope-r7-e2e",
      executorSystemPrompt: "You must call checkpoint when done.",
    };
    const result = await layer.dispatchAndWait(envelope, {
      onEvent: (evt) => receivedEvents.push(evt),
    });

    expect(result.reply).toBe("Task completed.");
    expect(result.error).toBeUndefined();

    // The caller must have been handed the checkpoint event.
    const isCompleteCheckpoint = (e) => {
      const toolCall = e?.assistantMessageEvent?.toolCall;
      return (
        toolCall?.name === "checkpoint" &&
        toolCall?.arguments?.outcome === "complete"
      );
    };
    const checkpointEvent = receivedEvents.find(isCompleteCheckpoint);
    expect(checkpointEvent).toBeDefined();
  });
});
|
||||
|
||||
// ─── onEvent propagation through dispatchAndWait ─────────────────────────────
|
||||
|
||||
describe("SwarmDispatchLayer.dispatchAndWait — onEvent propagation", () => {
|
||||
|
|
|
|||
|
|
@ -62,6 +62,13 @@ function buildAgentPrompt(agent, messages) {
|
|||
* @param {number} [timeoutMs]
|
||||
* @param {object} [opts]
|
||||
* @param {Function} [opts.onEvent] Optional event callback forwarded to runSubagent.
|
||||
* @param {string} [opts.systemPromptOverride] Override the default swarm-agent system prompt.
|
||||
* When set (e.g. from envelope.executorSystemPrompt), this replaces the generic
|
||||
* "persistent agent in a swarm" prompt so the worker receives the full autonomous
|
||||
* executor contract including the checkpoint requirement.
|
||||
* @param {string[]} [opts.toolsOverride] Override the default tool filter passed to
|
||||
* runSubagent. When set (e.g. from envelope.executorTools), the worker's session is
|
||||
* filtered to this specific set of tool names before the prompt is sent.
|
||||
*/
|
||||
async function runHeadlessPrompt(
|
||||
basePath,
|
||||
|
|
@ -69,13 +76,17 @@ async function runHeadlessPrompt(
|
|||
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
|
||||
opts = {},
|
||||
) {
|
||||
const { onEvent } = opts;
|
||||
const { onEvent, systemPromptOverride, toolsOverride } = opts;
|
||||
const result = await runSubagent(
|
||||
{
|
||||
systemPrompt:
|
||||
systemPromptOverride ??
|
||||
"You are a persistent agent in a multi-agent swarm. Process the incoming messages and produce a structured response.",
|
||||
cwd: basePath,
|
||||
name: "swarm-agent",
|
||||
...(toolsOverride && toolsOverride.length > 0
|
||||
? { tools: toolsOverride }
|
||||
: {}),
|
||||
},
|
||||
prompt,
|
||||
{ timeoutMs, ...(onEvent ? { onEvent } : {}) },
|
||||
|
|
@ -107,6 +118,12 @@ async function runHeadlessPrompt(
|
|||
* specified messageId exactly, and legitimately queued messages from other
|
||||
* senders remain unread and available for the next turn.
|
||||
* @param {Function} [opts.onEvent] Optional event callback forwarded to runHeadlessPrompt.
|
||||
* @param {string} [opts.systemPromptOverride] Override the worker's system prompt.
|
||||
* Forwarded to runHeadlessPrompt so executor-specific contracts (e.g. the autonomous
|
||||
* checkpoint requirement) reach the LLM session unchanged.
|
||||
* @param {string[]} [opts.toolsOverride] Override the worker's tool filter.
|
||||
* Forwarded to runHeadlessPrompt so executor-specific tool sets (e.g. including
|
||||
* "checkpoint") are applied in the runSubagent config.
|
||||
* @returns {Promise<{turnsProcessed: number, response: string|null}>}
|
||||
*/
|
||||
export async function runAgentTurn(agent, opts = {}) {
|
||||
|
|
@ -115,6 +132,8 @@ export async function runAgentTurn(agent, opts = {}) {
|
|||
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
|
||||
onlyMessageId,
|
||||
onEvent,
|
||||
systemPromptOverride,
|
||||
toolsOverride,
|
||||
} = opts;
|
||||
|
||||
// When onlyMessageId is set, force-refresh the inbox from SQLite so that
|
||||
|
|
@ -158,7 +177,11 @@ export async function runAgentTurn(agent, opts = {}) {
|
|||
|
||||
let response;
|
||||
try {
|
||||
response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs, { onEvent });
|
||||
response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs, {
|
||||
onEvent,
|
||||
...(systemPromptOverride ? { systemPromptOverride } : {}),
|
||||
...(toolsOverride ? { toolsOverride } : {}),
|
||||
});
|
||||
} catch (err) {
|
||||
// On failure, write error back to bus so sender knows
|
||||
agent._bus.send(
|
||||
|
|
|
|||
|
|
@ -62,6 +62,14 @@ async function getA2ATransport() {
|
|||
* @property {object} payload - arbitrary task data
|
||||
* @property {number} priority - 0-10
|
||||
* @property {string} scope - project scope string
|
||||
* @property {string} [executorSystemPrompt] Optional: override the worker agent's
|
||||
* default system prompt. When set, the worker receives this prompt instead of the
|
||||
* generic "swarm agent" prompt. Used by runUnitViaSwarm to inject the autonomous
|
||||
* executor contract (including checkpoint requirement) into the swarm worker session.
|
||||
* @property {string[]} [executorTools] Optional: override the worker agent's tool
|
||||
* filter. When set, runSubagent is configured with this tool name list so the worker
|
||||
* session only exposes the specified tools. Allows callers to grant/restrict tools
|
||||
* per unit type (e.g. ensure "checkpoint" is always available for execute-task units).
|
||||
*/
|
||||
|
||||
/**
|
||||
|
|
@ -312,6 +320,13 @@ export class SwarmDispatchLayer {
|
|||
// messages delivered via a different bus instance) and processes ONLY this
|
||||
// specific dispatch. Any legitimately queued messages from other senders
|
||||
// are left unread — they will be picked up on the next runAgentTurn call.
|
||||
//
|
||||
// executorSystemPrompt / executorTools: if the envelope carries them (set by
|
||||
// runUnitViaSwarm to inject the autonomous executor contract + checkpoint
|
||||
// requirement), forward them to runAgentTurn → runHeadlessPrompt → runSubagent
|
||||
// so the worker session receives the correct prompt and tool set.
|
||||
const executorSystemPrompt = envelope.executorSystemPrompt;
|
||||
const executorTools = envelope.executorTools;
|
||||
const runAgentTurn = await getRunAgentTurn();
|
||||
let turnResult;
|
||||
try {
|
||||
|
|
@ -320,6 +335,8 @@ export class SwarmDispatchLayer {
|
|||
signal,
|
||||
onlyMessageId: dispatchResult.messageId,
|
||||
...(onEvent ? { onEvent } : {}),
|
||||
...(executorSystemPrompt ? { systemPromptOverride: executorSystemPrompt } : {}),
|
||||
...(executorTools ? { toolsOverride: executorTools } : {}),
|
||||
});
|
||||
} catch (err) {
|
||||
return {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue