feat(swarm): dispatchAndWait — synchronous request/response for swarm agents

Add SwarmDispatchLayer.dispatchAndWait(envelope, { timeoutMs, signal })
which enqueues via _busDispatch, drives the target agent's turn via
runAgentTurn (in-process runSubagent), and reads back the agent's reply
from the bus. Returns DispatchResult extended with reply + replyMessageId.

This is the missing piece for collapsing /delegate-style subagent calls
into the swarm interface: callers that need a reply (not just delivery)
can now use the swarm contract instead of the subagent extension's
bespoke dispatch path. Round 4 will migrate those callers.

New helper MessageBus.getReplyTo(messageId, fromAgent) queries SQLite
directly via json_extract for the most recent reply to a given message.

Plus 8 tests covering happy path, error paths (no reply, runner throws,
runner returns {error}), the swarmDispatchAndWait convenience function,
and the A2A short-circuit path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-15 04:12:36 +02:00
parent 903cdd4d9d
commit bbade22388
5 changed files with 506 additions and 4 deletions

View file

@ -1516,7 +1516,11 @@ export async function startAuto(ctx, pi, base, verboseMode, options) {
const interruptedAssessment = options?.interrupted ?? null;
// Default: agent CAN ask the user. Autonomous mode flips this off so the
// agent must self-resolve via code/web/lookup.
s.canAskUser = options?.canAskUser !== false;
// Headless mode also disables user questions — there is no UI to answer them.
// Without this, the agent calls ask_user_questions, gets a "UI not available"
// error, and may retry in a loop instead of self-resolving.
s.canAskUser =
process.env.SF_HEADLESS === "1" ? false : options?.canAskUser !== false;
if (options?.milestoneLock !== undefined) {
s.sessionMilestoneLock = options.milestoneLock ?? null;
}

View file

@ -0,0 +1,296 @@
/**
* swarm-dispatch-and-wait.test.mjs — Tests for SwarmDispatchLayer.dispatchAndWait().
*
* Purpose: verify that dispatchAndWait enqueues an envelope, drives runAgentTurn,
* reads the agent reply from the bus, and returns the structured result with `reply`
* and `replyMessageId` populated.
*
* Consumer: CI unit-test suite (`npm run test:unit`).
*/
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { _clearSfRootCache } from "../paths.js";
import { closeDatabase } from "../sf-db.js";
import {
SwarmDispatchLayer,
swarmDispatchAndWait,
} from "../uok/swarm-dispatch.js";
import { createDefaultSwarm } from "../uok/swarm-roles.js";
// ─── Mock agent-runner.js ─────────────────────────────────────────────────────
// runAgentTurn is replaced by an in-process stub so these tests never spawn an
// LLM subprocess. The stub drains the agent's inbox, posts one deterministic
// reply to the sender of the newest message, and resolves with
// { turnsProcessed, response, replyId }.
const MOCK_REPLY_TEXT = "mock agent reply: task complete";
vi.mock("../uok/agent-runner.js", () => {
  const runAgentTurn = vi.fn(async (agent, _opts) => {
    // receive(true): the unread inbox messages, per the original stub's notes.
    const unread = agent.receive(true);
    if (unread.length === 0) {
      return { turnsProcessed: 0, response: null };
    }

    // Acknowledge everything that was picked up.
    unread.forEach((message) => agent.markRead(message.id));

    // Reply once, threaded (metadata.replyTo) to the newest message.
    const newest = unread[unread.length - 1];
    const sender = `agent:${agent.identity.name}`;
    const replyMeta = { replyTo: newest.id, type: "response" };
    const replyId = agent._bus.send(
      sender,
      newest.from,
      MOCK_REPLY_TEXT,
      replyMeta,
    );

    return {
      turnsProcessed: unread.length,
      response: MOCK_REPLY_TEXT,
      replyId,
    };
  });
  return { runAgentTurn };
});
// ─── Shared Setup ─────────────────────────────────────────────────────────────
// Temp project roots created during a test; emptied and deleted in afterEach.
const tmpRoots = [];

// Clear recorded mock calls so per-test call assertions start from zero.
beforeEach(() => {
  vi.clearAllMocks();
});

// Teardown order: close the database, drop the sf-root cache, then remove
// every temp directory registered during the test.
afterEach(() => {
  closeDatabase();
  _clearSfRootCache();
  const doomed = tmpRoots.splice(0);
  doomed.forEach((dir) => rmSync(dir, { recursive: true, force: true }));
});
/**
 * Create a uniquely named temporary directory (prefix "sf-daw-") under the OS
 * temp dir and register it in tmpRoots so teardown can remove it.
 *
 * @returns {string} path of the newly created directory
 */
function makeProject() {
  const prefix = join(tmpdir(), "sf-daw-");
  const projectDir = mkdtempSync(prefix);
  tmpRoots.push(projectDir);
  return projectDir;
}
// ─── dispatchAndWait — happy path ─────────────────────────────────────────────
describe("SwarmDispatchLayer.dispatchAndWait — happy path", () => {
  // Dispatch `envelope` through a brand-new layer rooted in a fresh temp project.
  const dispatchOnFreshLayer = async (envelope) => {
    const layer = new SwarmDispatchLayer(makeProject());
    const result = await layer.dispatchAndWait(envelope);
    return { layer, result };
  };

  // Look up the role of the agent the dispatch was routed to.
  const roleOfTarget = async ({ layer, result }) => {
    const swarm = await layer.getOrCreateSwarm();
    const target = swarm.get(result.targetAgent);
    return target.identity.role;
  };

  test("returns messageId, targetAgent, reply, and replyMessageId", async () => {
    const { result } = await dispatchOnFreshLayer({
      unitId: "task-daw-1",
      unitType: "task",
      workMode: "build",
      payload: "implement feature X",
      priority: 5,
      scope: "test-scope",
    });

    expect(result.messageId).toBeTruthy();
    expect(result.targetAgent).toBeTruthy();
    expect(result.reply).toBe(MOCK_REPLY_TEXT);
    expect(result.replyMessageId).toBeTruthy();
    expect(typeof result.replyMessageId).toBe("string");
    expect(result.error).toBeUndefined();
  });

  test("targetAgent is a worker for workMode=build", async () => {
    const dispatched = await dispatchOnFreshLayer({
      unitId: "task-daw-2",
      unitType: "task",
      workMode: "build",
      payload: "build something",
      priority: 3,
      scope: "scope-A",
    });

    expect(await roleOfTarget(dispatched)).toBe("worker");
  });

  test("targetAgent is a scout for workMode=research", async () => {
    const dispatched = await dispatchOnFreshLayer({
      unitId: "task-daw-3",
      unitType: "research",
      workMode: "research",
      payload: "research topic Y",
      priority: 2,
      scope: "scope-B",
    });

    expect(await roleOfTarget(dispatched)).toBe("scout");
  });
});
// ─── swarmDispatchAndWait — convenience function ───────────────────────────────
describe("swarmDispatchAndWait — convenience function", () => {
  test("delegates to SwarmDispatchLayer and returns structured result", async () => {
    const projectRoot = makeProject();
    // Create the default swarm up front so the dispatch has agents to route to.
    await createDefaultSwarm(projectRoot);

    const envelope = {
      unitId: "task-daw-convenience",
      unitType: "task",
      workMode: "build",
      payload: "convenience test payload",
      priority: 1,
      scope: "scope-C",
    };
    const outcome = await swarmDispatchAndWait(projectRoot, envelope);

    const { messageId, targetAgent, reply, replyMessageId } = outcome;
    expect(messageId).toBeTruthy();
    expect(targetAgent).toBeTruthy();
    expect(reply).toBe(MOCK_REPLY_TEXT);
    expect(replyMessageId).toBeTruthy();
  });
});
// ─── dispatchAndWait — error / no-reply paths ─────────────────────────────────
// Every failure mode must resolve (never reject) with reply and replyMessageId
// set to null and a descriptive `error` string.
describe("SwarmDispatchLayer.dispatchAndWait — error paths", () => {
  // Resolve the vi.fn() installed for runAgentTurn by the vi.mock factory so a
  // test can override its behavior for a single call.
  const getMockedRunner = async () => {
    const { runAgentTurn } = await import("../uok/agent-runner.js");
    return runAgentTurn;
  };

  // Dispatch a priority-1 build task through a fresh layer in a fresh project.
  const dispatchBuildTask = (unitId, payload, scope) => {
    const layer = new SwarmDispatchLayer(makeProject());
    return layer.dispatchAndWait({
      unitId,
      unitType: "task",
      workMode: "build",
      payload,
      priority: 1,
      scope,
    });
  };

  // Shared assertion: both reply fields are nulled on any failure.
  const expectNoReply = (result) => {
    expect(result.reply).toBeNull();
    expect(result.replyMessageId).toBeNull();
  };

  test("returns error 'no reply' when runAgentTurn processes the inbox but writes no reply", async () => {
    const runAgentTurn = await getMockedRunner();
    // The runner consumes and acknowledges the dispatched message but never
    // posts a reply to the bus, so the reply lookup comes back empty.
    runAgentTurn.mockImplementationOnce(async (agent, _opts) => {
      const messages = agent.receive(true);
      for (const msg of messages) agent.markRead(msg.id);
      return { turnsProcessed: messages.length, response: null };
    });

    const result = await dispatchBuildTask(
      "task-daw-no-reply",
      "no reply scenario",
      "scope-D",
    );

    expectNoReply(result);
    expect(result.error).toBe("no reply");
  });

  test("returns error 'no reply' when runAgentTurn reports zero turns without an error", async () => {
    const runAgentTurn = await getMockedRunner();
    // The runner claims it found nothing to process and leaves the inbox
    // untouched; with no error field set, the outcome is still "no reply".
    runAgentTurn.mockImplementationOnce(async () => ({
      turnsProcessed: 0,
      response: null,
    }));

    const result = await dispatchBuildTask(
      "task-daw-zero-turns",
      "zero turns scenario",
      "scope-D0",
    );

    expectNoReply(result);
    expect(result.error).toBe("no reply");
  });

  test("returns error when runAgentTurn throws", async () => {
    const runAgentTurn = await getMockedRunner();
    runAgentTurn.mockImplementationOnce(async () => {
      throw new Error("LLM timeout");
    });

    const result = await dispatchBuildTask(
      "task-daw-throw",
      "throw scenario",
      "scope-E",
    );

    expectNoReply(result);
    expect(result.error).toContain("LLM timeout");
  });

  test("returns error struct not throw when runAgentTurn returns error field", async () => {
    const runAgentTurn = await getMockedRunner();
    runAgentTurn.mockImplementationOnce(async () => ({
      turnsProcessed: 0,
      response: null,
      error: "sf headless failed: non-zero exit",
    }));

    const result = await dispatchBuildTask(
      "task-daw-error-field",
      "error field scenario",
      "scope-F",
    );

    expectNoReply(result);
    expect(result.error).toContain("sf headless failed");
  });
});
// ─── A2A path — falls through cleanly ────────────────────────────────────────
describe("SwarmDispatchLayer.dispatchAndWait — SF_A2A_ENABLED path", () => {
  test("returns null reply and replyMessageId when A2A is enabled", async () => {
    // No A2A server is started. The transport method is replaced on this one
    // instance, and the test checks that dispatchAndWait short-circuits on
    // SF_A2A_ENABLED: reply fields nulled, runAgentTurn never invoked.
    const layer = new SwarmDispatchLayer(makeProject());
    const cannedA2aResult = {
      messageId: "a2a-msg-001",
      targetAgent: "worker-1",
      swarmName: "default",
      envelope: {},
      transport: "a2a",
    };
    layer._a2aDispatch = vi.fn(async () => cannedA2aResult);

    // Remember the incoming flag so the environment is restored exactly,
    // including the "was unset" case.
    const previousFlag = process.env.SF_A2A_ENABLED;
    const restoreFlag = () => {
      if (previousFlag === undefined) {
        delete process.env.SF_A2A_ENABLED;
        return;
      }
      process.env.SF_A2A_ENABLED = previousFlag;
    };

    try {
      process.env.SF_A2A_ENABLED = "1";
      const outcome = await layer.dispatchAndWait({
        unitId: "task-a2a",
        unitType: "task",
        workMode: "build",
        payload: "a2a payload",
        priority: 1,
        scope: "scope-a2a",
      });

      expect(outcome.reply).toBeNull();
      expect(outcome.replyMessageId).toBeNull();
      expect(outcome.messageId).toBe("a2a-msg-001");

      // The in-process runner must be bypassed entirely on the A2A path.
      const runnerModule = await import("../uok/agent-runner.js");
      expect(runnerModule.runAgentTurn).not.toHaveBeenCalled();
    } finally {
      restoreFlag();
    }
  });
});

View file

@ -181,6 +181,29 @@ test("messageBus_getConversation_filters_by_pair", () => {
assert.ok(conv.every((m) => m.body !== "cb1"));
});
test("messageBus_getReplyTo_finds_latest_reply_without_inbox_refresh", () => {
  const bus = new MessageBus(makeProject());
  const coordinator = "agent:coordinator";
  const worker = "agent:worker-1";

  // One request, then two replies from the addressed worker and one reply
  // from a different worker that must be ignored by the sender filter.
  const requestId = bus.send(coordinator, worker, "work");
  const threaded = () => ({ replyTo: requestId });
  bus.send(worker, coordinator, "older", threaded());
  const latestId = bus.send(worker, coordinator, "latest", threaded());
  bus.send("agent:worker-2", coordinator, "wrong-agent", threaded());

  const { id, from, to, body, metadata } = bus.getReplyTo(requestId, worker);

  assert.equal(id, latestId);
  assert.equal(from, "agent:worker-1");
  assert.equal(to, "agent:coordinator");
  assert.equal(body, "latest");
  assert.equal(metadata.replyTo, requestId);
});
test("messageBus_inbox_enforces_maxSize", () => {
const root = makeProject();
const bus = new MessageBus(root, { maxInboxSize: 3 });

View file

@ -15,6 +15,7 @@ import { join } from "node:path";
import { sfRoot } from "../paths.js";
import {
compactUokMessages,
getDatabase,
getUokConversation,
getUokMessageBusMetrics,
getUokMessageReadIds,
@ -237,4 +238,51 @@ export class MessageBus {
compact() {
return compactUokMessages(this.retentionDays);
}
/**
* Find the most recent message that is a reply to a given messageId from a specific sender.
*
* Purpose: support dispatchAndWait in SwarmDispatchLayer by locating the reply
* posted by an agent after runAgentTurn completes. Queries SQLite directly so
* this works even if the in-memory inbox cache has not yet been refreshed.
*
* Consumer: SwarmDispatchLayer.dispatchAndWait().
*
* @param {string} messageId - the original message ID (replyTo value to match)
* @param {string} fromAgent - the full agent address, e.g. "agent:worker-1"
* @returns {{ id: string, body: string | object, from: string, to: string, metadata: object } | null}
*/
getReplyTo(messageId, fromAgent) {
const db = getDatabase();
if (!db) return null;
try {
const row = db
.prepare(
`SELECT id, from_agent AS "from", to_agent AS "to", body, metadata_json AS metadataJson, sent_at AS sentAt
FROM uok_messages
WHERE from_agent = :from_agent
AND json_extract(metadata_json, '$.replyTo') = :reply_to
ORDER BY sent_at DESC, rowid DESC
LIMIT 1`,
)
.get({ ":from_agent": fromAgent, ":reply_to": messageId });
if (!row) return null;
let metadata = {};
try {
metadata = JSON.parse(row.metadataJson || "{}");
} catch {
// leave empty
}
return {
id: row.id,
from: row.from,
to: row.to,
body: row.body,
metadata,
sentAt: row.sentAt,
};
} catch {
return null;
}
}
}

View file

@ -22,13 +22,24 @@
* the sleeptime memory path. Wire this in when building the autonomous orchestrator.
*/
import { AgentSwarm } from "./agent-swarm.js";
import { MessageBus } from "./message-bus.js";
import { createDefaultSwarm } from "./swarm-roles.js";
import {
formatMemoriesForPrompt,
getActiveMemoriesRanked,
} from "../memory-store.js";
import { AgentSwarm } from "./agent-swarm.js";
import { MessageBus } from "./message-bus.js";
import { createDefaultSwarm } from "./swarm-roles.js";
// agent-runner.js is loaded on first use rather than statically: it depends on
// runSubagent, which pure message-routing code paths should not pull in, and a
// static import would risk a circular dependency.
let _runAgentTurnFn = null;

/**
 * Resolve the runAgentTurn function, importing ./agent-runner.js on the first
 * call and returning the cached function on later calls.
 *
 * @returns {Promise<Function>} the runAgentTurn export of ./agent-runner.js
 */
async function getRunAgentTurn() {
  if (_runAgentTurnFn) {
    return _runAgentTurnFn;
  }
  const { runAgentTurn } = await import("./agent-runner.js");
  _runAgentTurnFn = runAgentTurn;
  return _runAgentTurnFn;
}
// Module-level cache keyed by `${basePath}:${swarmName}`
const _cache = new Map();
@ -252,6 +263,104 @@ export class SwarmDispatchLayer {
};
}
/**
* Dispatch an envelope and block until the target agent has produced a reply.
*
* Purpose: provide a request/response API for callers that need the agent's
* output, not just delivery confirmation. Routes the envelope to the target
* agent's inbox (same as dispatch()), then drives runAgentTurn so the agent
* processes the inbox in-process, then reads the agent's reply from the bus.
*
* Behavior under SF_A2A_ENABLED: falls through to the A2A transport and returns
* `{ ...result, reply: null, replyMessageId: null }` the A2A path does not yet
* support synchronous waits.
*
* @param {DispatchEnvelope} envelope
* @param {object} [options={}]
* @param {number} [options.timeoutMs=480000] Hard cap for the agent's turn.
* @param {AbortSignal} [options.signal]
* @returns {Promise<DispatchResult & { reply: string | null; replyMessageId: string | null }>}
*/
async dispatchAndWait(envelope, options = {}) {
const { timeoutMs = 480_000, signal } = options;
// A2A path: no synchronous wait support yet — return nulled reply fields.
if (process.env.SF_A2A_ENABLED) {
const result = await this._a2aDispatch(envelope);
return { ...result, reply: null, replyMessageId: null };
}
// Step 1: Enqueue via existing _busDispatch
const dispatchResult = await this._busDispatch(envelope);
// Step 2: Look up the target agent in the swarm
const swarm = await this.getOrCreateSwarm();
const agent = swarm.get(dispatchResult.targetAgent);
if (!agent) {
return {
...dispatchResult,
reply: null,
replyMessageId: null,
error: `dispatchAndWait: target agent "${dispatchResult.targetAgent}" not found in swarm`,
};
}
// Step 3: Drive one inbox-processing turn via runAgentTurn (in-process)
const runAgentTurn = await getRunAgentTurn();
let turnResult;
try {
turnResult = await runAgentTurn(agent, { timeoutMs, signal });
} catch (err) {
return {
...dispatchResult,
reply: null,
replyMessageId: null,
error: `runAgentTurn threw: ${err?.message ?? String(err)}`,
};
}
if (turnResult?.error) {
return {
...dispatchResult,
reply: null,
replyMessageId: null,
error: turnResult.error,
};
}
// Step 4: Read the agent's reply back from the bus.
// agent-runner writes the reply with metadata.replyTo set to the last processed
// message id; we look for the most recent message from "agent:<targetAgent>"
// whose replyTo matches our dispatched messageId.
const fromAgent = `agent:${dispatchResult.targetAgent}`;
const replyMessage = this._bus.getReplyTo(
dispatchResult.messageId,
fromAgent,
);
if (!replyMessage) {
return {
...dispatchResult,
reply: null,
replyMessageId: null,
error: "no reply",
};
}
// Step 5: Return structured result
const replyBody =
typeof replyMessage.body === "string"
? replyMessage.body
: JSON.stringify(replyMessage.body);
return {
...dispatchResult,
reply: replyBody,
replyMessageId: replyMessage.id,
};
}
/**
* Route multiple DispatchEnvelopes through the swarm in parallel.
*
@ -322,3 +431,25 @@ export class SwarmDispatchLayer {
export async function swarmDispatch(basePath, envelope) {
  // Obtain the dispatch layer for basePath, then route the envelope through it.
  const layer = SwarmDispatchLayer.getOrCreate(basePath);
  return layer.dispatch(envelope);
}
/**
 * Dispatch an envelope and wait for the target agent's reply.
 *
 * Purpose: single-call entry point for request/response dispatch, for callers
 * that do not want to hold a SwarmDispatchLayer instance themselves. The layer
 * is obtained through SwarmDispatchLayer.getOrCreate(basePath) and the call is
 * forwarded to its dispatchAndWait method unchanged.
 *
 * Consumer: orchestrator flows that need the agent's output in-line.
 *
 * @param {string} basePath
 * @param {DispatchEnvelope} envelope
 * @param {object} [options]
 * @param {number} [options.timeoutMs=480000]
 * @param {AbortSignal} [options.signal]
 * @returns {Promise<DispatchResult & { reply: string | null; replyMessageId: string | null }>}
 */
export async function swarmDispatchAndWait(basePath, envelope, options) {
  const layer = SwarmDispatchLayer.getOrCreate(basePath);
  return layer.dispatchAndWait(envelope, options);
}