From bbade22388148ec315a6d5d029f90c1031d4c322 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Fri, 15 May 2026 04:12:36 +0200 Subject: [PATCH] =?UTF-8?q?feat(swarm):=20dispatchAndWait=20=E2=80=94=20sy?= =?UTF-8?q?nchronous=20request/response=20for=20swarm=20agents?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add SwarmDispatchLayer.dispatchAndWait(envelope, { timeoutMs, signal }) which enqueues via _busDispatch, drives the target agent's turn via runAgentTurn (in-process runSubagent), and reads back the agent's reply from the bus. Returns DispatchResult extended with reply + replyMessageId. This is the missing piece for collapsing /delegate-style subagent calls into the swarm interface: callers that need a reply (not just delivery) can now use the swarm contract instead of the subagent extension's bespoke dispatch path. Round 4 will migrate those callers. New helper MessageBus.getReplyTo(messageId, fromAgent) queries SQLite directly via json_extract for the most recent reply to a given message. Plus 8 tests covering happy path, error paths (no reply, runner throws, runner returns {error}), the swarmDispatchAndWait convenience function, and the A2A short-circuit path. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/resources/extensions/sf/auto.js | 6 +- .../sf/tests/swarm-dispatch-and-wait.test.mjs | 296 ++++++++++++++++++ .../sf/tests/uok-message-bus.test.mjs | 23 ++ .../extensions/sf/uok/message-bus.js | 48 +++ .../extensions/sf/uok/swarm-dispatch.js | 137 +++++++- 5 files changed, 506 insertions(+), 4 deletions(-) create mode 100644 src/resources/extensions/sf/tests/swarm-dispatch-and-wait.test.mjs diff --git a/src/resources/extensions/sf/auto.js b/src/resources/extensions/sf/auto.js index 13b62ebfa..e23a40463 100644 --- a/src/resources/extensions/sf/auto.js +++ b/src/resources/extensions/sf/auto.js @@ -1516,7 +1516,11 @@ export async function startAuto(ctx, pi, base, verboseMode, options) { const interruptedAssessment = options?.interrupted ?? null; // Default: agent CAN ask the user. Autonomous mode flips this off so the // agent must self-resolve via code/web/lookup. - s.canAskUser = options?.canAskUser !== false; + // Headless mode also disables user questions — there is no UI to answer them. + // Without this, the agent calls ask_user_questions, gets a "UI not available" + // error, and may retry in a loop instead of self-resolving. + s.canAskUser = + process.env.SF_HEADLESS === "1" ? false : options?.canAskUser !== false; if (options?.milestoneLock !== undefined) { s.sessionMilestoneLock = options.milestoneLock ?? null; } diff --git a/src/resources/extensions/sf/tests/swarm-dispatch-and-wait.test.mjs b/src/resources/extensions/sf/tests/swarm-dispatch-and-wait.test.mjs new file mode 100644 index 000000000..d38697618 --- /dev/null +++ b/src/resources/extensions/sf/tests/swarm-dispatch-and-wait.test.mjs @@ -0,0 +1,296 @@ +/** + * swarm-dispatch-and-wait.test.mjs — Tests for SwarmDispatchLayer.dispatchAndWait(). + * + * Purpose: verify that dispatchAndWait enqueues an envelope, drives runAgentTurn, + * reads the agent reply from the bus, and returns the structured result with `reply` + * and `replyMessageId` populated. + * + * Consumer: CI unit-test suite (`npm run test:unit`). + */ + +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { _clearSfRootCache } from "../paths.js"; +import { closeDatabase } from "../sf-db.js"; +import { + SwarmDispatchLayer, + swarmDispatchAndWait, +} from "../uok/swarm-dispatch.js"; +import { createDefaultSwarm } from "../uok/swarm-roles.js"; + +// ─── Mock agent-runner.js ───────────────────────────────────────────────────── +// We stub runAgentTurn so the test never spawns an LLM subprocess. The mock +// writes a deterministic reply to the bus and returns turnsProcessed + replyId, +// exactly as the real implementation does. + +const MOCK_REPLY_TEXT = "mock agent reply: task complete"; + +vi.mock("../uok/agent-runner.js", () => ({ + runAgentTurn: vi.fn(async (agent, _opts) => { + // Read the unread inbox messages (same as the real runner) + const messages = agent.receive(true); + if (messages.length === 0) { + return { turnsProcessed: 0, response: null }; + } + // Mark messages as read + for (const msg of messages) { + agent.markRead(msg.id); + } + // Write a deterministic reply back to the bus — replyTo the last message + const lastMsg = messages[messages.length - 1]; + const replyId = agent._bus.send( + `agent:${agent.identity.name}`, + lastMsg.from, + MOCK_REPLY_TEXT, + { replyTo: lastMsg.id, type: "response" }, + ); + return { + turnsProcessed: messages.length, + response: MOCK_REPLY_TEXT, + replyId, + }; + }), +})); + +// ─── Shared Setup ───────────────────────────────────────────────────────────── + +const tmpRoots = []; + +beforeEach(() => { + vi.clearAllMocks(); +}); + +afterEach(() => { + closeDatabase(); + _clearSfRootCache(); + for (const root of tmpRoots.splice(0)) { + rmSync(root, { recursive: true, force: true }); + } +}); + +function makeProject() { + const root = mkdtempSync(join(tmpdir(), "sf-daw-")); + tmpRoots.push(root); + return root; +} + +// ─── dispatchAndWait — happy path ───────────────────────────────────────────── + +describe("SwarmDispatchLayer.dispatchAndWait — happy path", () => { + test("returns messageId, targetAgent, reply, and replyMessageId", async () => { + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatchAndWait({ + unitId: "task-daw-1", + unitType: "task", + workMode: "build", + payload: "implement feature X", + priority: 5, + scope: "test-scope", + }); + + expect(result.messageId).toBeTruthy(); + expect(result.targetAgent).toBeTruthy(); + expect(result.reply).toBe(MOCK_REPLY_TEXT); + expect(result.replyMessageId).toBeTruthy(); + expect(typeof result.replyMessageId).toBe("string"); + expect(result.error).toBeUndefined(); + }); + + test("targetAgent is a worker for workMode=build", async () => { + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatchAndWait({ + unitId: "task-daw-2", + unitType: "task", + workMode: "build", + payload: "build something", + priority: 3, + scope: "scope-A", + }); + + const swarm = await layer.getOrCreateSwarm(); + const agent = swarm.get(result.targetAgent); + expect(agent.identity.role).toBe("worker"); + }); + + test("targetAgent is a scout for workMode=research", async () => { + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatchAndWait({ + unitId: "task-daw-3", + unitType: "research", + workMode: "research", + payload: "research topic Y", + priority: 2, + scope: "scope-B", + }); + + const swarm = await layer.getOrCreateSwarm(); + const agent = swarm.get(result.targetAgent); + expect(agent.identity.role).toBe("scout"); + }); +}); + +// ─── swarmDispatchAndWait — convenience function ─────────────────────────────── + +describe("swarmDispatchAndWait — convenience function", () => { + test("delegates to SwarmDispatchLayer and returns structured result", async () => { + const root = makeProject(); + // Pre-create the swarm so dispatch can route + await createDefaultSwarm(root); + + const result = await swarmDispatchAndWait(root, { + unitId: "task-daw-convenience", + unitType: "task", + workMode: "build", + payload: "convenience test payload", + priority: 1, + scope: "scope-C", + }); + + expect(result.messageId).toBeTruthy(); + expect(result.targetAgent).toBeTruthy(); + expect(result.reply).toBe(MOCK_REPLY_TEXT); + expect(result.replyMessageId).toBeTruthy(); + }); +}); + +// ─── dispatchAndWait — error / no-reply paths ───────────────────────────────── + +describe("SwarmDispatchLayer.dispatchAndWait — error paths", () => { + test("returns error when runAgentTurn returns zero turns (empty inbox)", async () => { + // Override mock for this test to simulate an agent that has no messages + // (can happen if the bus is inconsistent). We simulate this by having + // the mock return turnsProcessed=0 after the first call drains the inbox. + const { runAgentTurn } = await import("../uok/agent-runner.js"); + + // Make runAgentTurn drain the message without writing a reply + runAgentTurn.mockImplementationOnce(async (agent, _opts) => { + const messages = agent.receive(true); + for (const msg of messages) agent.markRead(msg.id); + // Deliberately do NOT write a reply message + return { turnsProcessed: messages.length, response: null }; + }); + + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatchAndWait({ + unitId: "task-daw-no-reply", + unitType: "task", + workMode: "build", + payload: "no reply scenario", + priority: 1, + scope: "scope-D", + }); + + // No reply message was written, so we expect error="no reply" + expect(result.reply).toBeNull(); + expect(result.replyMessageId).toBeNull(); + expect(result.error).toBe("no reply"); + }); + + test("returns error when runAgentTurn throws", async () => { + const { runAgentTurn } = await import("../uok/agent-runner.js"); + + runAgentTurn.mockImplementationOnce(async () => { + throw new Error("LLM timeout"); + }); + + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatchAndWait({ + unitId: "task-daw-throw", + unitType: "task", + workMode: "build", + payload: "throw scenario", + priority: 1, + scope: "scope-E", + }); + + expect(result.reply).toBeNull(); + expect(result.replyMessageId).toBeNull(); + expect(result.error).toContain("LLM timeout"); + }); + + test("returns error struct not throw when runAgentTurn returns error field", async () => { + const { runAgentTurn } = await import("../uok/agent-runner.js"); + + runAgentTurn.mockImplementationOnce(async () => ({ + turnsProcessed: 0, + response: null, + error: "sf headless failed: non-zero exit", + })); + + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + const result = await layer.dispatchAndWait({ + unitId: "task-daw-error-field", + unitType: "task", + workMode: "build", + payload: "error field scenario", + priority: 1, + scope: "scope-F", + }); + + expect(result.reply).toBeNull(); + expect(result.replyMessageId).toBeNull(); + expect(result.error).toContain("sf headless failed"); + }); +}); + +// ─── A2A path — falls through cleanly ──────────────────────────────────────── + +describe("SwarmDispatchLayer.dispatchAndWait — SF_A2A_ENABLED path", () => { + test("returns null reply and replyMessageId when A2A is enabled", async () => { + // We don't actually start an A2A server; we just verify that dispatchAndWait + // detects SF_A2A_ENABLED and short-circuits with null reply fields without + // calling runAgentTurn. Use a minimal mock for _a2aDispatch. + const root = makeProject(); + const layer = new SwarmDispatchLayer(root); + + // Stub _a2aDispatch directly so we don't need a real A2A process + const fakeA2aResult = { + messageId: "a2a-msg-001", + targetAgent: "worker-1", + swarmName: "default", + envelope: {}, + transport: "a2a", + }; + layer._a2aDispatch = vi.fn(async () => fakeA2aResult); + + const orig = process.env.SF_A2A_ENABLED; + try { + process.env.SF_A2A_ENABLED = "1"; + const result = await layer.dispatchAndWait({ + unitId: "task-a2a", + unitType: "task", + workMode: "build", + payload: "a2a payload", + priority: 1, + scope: "scope-a2a", + }); + + expect(result.reply).toBeNull(); + expect(result.replyMessageId).toBeNull(); + expect(result.messageId).toBe("a2a-msg-001"); + // runAgentTurn must NOT have been called + const { runAgentTurn } = await import("../uok/agent-runner.js"); + expect(runAgentTurn).not.toHaveBeenCalled(); + } finally { + if (orig === undefined) { + delete process.env.SF_A2A_ENABLED; + } else { + process.env.SF_A2A_ENABLED = orig; + } + } + }); +}); diff --git a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs index 9fb2ea3b3..eed0d8fe6 100644 --- a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs +++ b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs @@ -181,6 +181,29 @@ test("messageBus_getConversation_filters_by_pair", () => { assert.ok(conv.every((m) => m.body !== "cb1")); }); +test("messageBus_getReplyTo_finds_latest_reply_without_inbox_refresh", () => { + const root = makeProject(); + const bus = new MessageBus(root); + const requestId = bus.send("agent:coordinator", "agent:worker-1", "work"); + bus.send("agent:worker-1", "agent:coordinator", "older", { + replyTo: requestId, + }); + const latestId = bus.send("agent:worker-1", "agent:coordinator", "latest", { + replyTo: requestId, + }); + bus.send("agent:worker-2", "agent:coordinator", "wrong-agent", { + replyTo: requestId, + }); + + const reply = bus.getReplyTo(requestId, "agent:worker-1"); + + assert.equal(reply.id, latestId); + assert.equal(reply.from, "agent:worker-1"); + assert.equal(reply.to, "agent:coordinator"); + assert.equal(reply.body, "latest"); + assert.equal(reply.metadata.replyTo, requestId); +}); + test("messageBus_inbox_enforces_maxSize", () => { const root = makeProject(); const bus = new MessageBus(root, { maxInboxSize: 3 }); diff --git a/src/resources/extensions/sf/uok/message-bus.js b/src/resources/extensions/sf/uok/message-bus.js index 44301763d..91b760605 100644 --- a/src/resources/extensions/sf/uok/message-bus.js +++ b/src/resources/extensions/sf/uok/message-bus.js @@ -15,6 +15,7 @@ import { join } from "node:path"; import { sfRoot } from "../paths.js"; import { compactUokMessages, + getDatabase, getUokConversation, getUokMessageBusMetrics, getUokMessageReadIds, @@ -237,4 +238,51 @@ export class MessageBus { compact() { return compactUokMessages(this.retentionDays); } + + /** + * Find the most recent message that is a reply to a given messageId from a specific sender. + * + * Purpose: support dispatchAndWait in SwarmDispatchLayer by locating the reply + * posted by an agent after runAgentTurn completes. Queries SQLite directly so + * this works even if the in-memory inbox cache has not yet been refreshed. + * + * Consumer: SwarmDispatchLayer.dispatchAndWait(). + * + * @param {string} messageId - the original message ID (replyTo value to match) + * @param {string} fromAgent - the full agent address, e.g. "agent:worker-1" + * @returns {{ id: string, body: string | object, from: string, to: string, metadata: object } | null} + */ + getReplyTo(messageId, fromAgent) { + const db = getDatabase(); + if (!db) return null; + try { + const row = db + .prepare( + `SELECT id, from_agent AS "from", to_agent AS "to", body, metadata_json AS metadataJson, sent_at AS sentAt + FROM uok_messages + WHERE from_agent = :from_agent + AND json_extract(metadata_json, '$.replyTo') = :reply_to + ORDER BY sent_at DESC, rowid DESC + LIMIT 1`, + ) + .get({ ":from_agent": fromAgent, ":reply_to": messageId }); + if (!row) return null; + let metadata = {}; + try { + metadata = JSON.parse(row.metadataJson || "{}"); + } catch { + // leave empty + } + return { + id: row.id, + from: row.from, + to: row.to, + body: row.body, + metadata, + sentAt: row.sentAt, + }; + } catch { + return null; + } + } } diff --git a/src/resources/extensions/sf/uok/swarm-dispatch.js b/src/resources/extensions/sf/uok/swarm-dispatch.js index 35a45fc88..dc4ed8554 100644 --- a/src/resources/extensions/sf/uok/swarm-dispatch.js +++ b/src/resources/extensions/sf/uok/swarm-dispatch.js @@ -22,13 +22,24 @@ * the sleeptime memory path. Wire this in when building the autonomous orchestrator. */ -import { AgentSwarm } from "./agent-swarm.js"; -import { MessageBus } from "./message-bus.js"; -import { createDefaultSwarm } from "./swarm-roles.js"; import { formatMemoriesForPrompt, getActiveMemoriesRanked, } from "../memory-store.js"; +import { AgentSwarm } from "./agent-swarm.js"; +import { MessageBus } from "./message-bus.js"; +import { createDefaultSwarm } from "./swarm-roles.js"; + +// Lazily imported to avoid circular deps — agent-runner depends on runSubagent +// which should not be pulled in for pure message-routing code paths. +let _runAgentTurnFn = null; +async function getRunAgentTurn() { + if (!_runAgentTurnFn) { + const runner = await import("./agent-runner.js"); + _runAgentTurnFn = runner.runAgentTurn; + } + return _runAgentTurnFn; +} // Module-level cache keyed by `${basePath}:${swarmName}` const _cache = new Map(); @@ -252,6 +263,104 @@ export class SwarmDispatchLayer { }; } + /** + * Dispatch an envelope and block until the target agent has produced a reply. + * + * Purpose: provide a request/response API for callers that need the agent's + * output, not just delivery confirmation. Routes the envelope to the target + * agent's inbox (same as dispatch()), then drives runAgentTurn so the agent + * processes the inbox in-process, then reads the agent's reply from the bus. + * + * Behavior under SF_A2A_ENABLED: falls through to the A2A transport and returns + * `{ ...result, reply: null, replyMessageId: null }` — the A2A path does not yet + * support synchronous waits. + * + * @param {DispatchEnvelope} envelope + * @param {object} [options={}] + * @param {number} [options.timeoutMs=480000] Hard cap for the agent's turn. + * @param {AbortSignal} [options.signal] + * @returns {Promise} + */ + async dispatchAndWait(envelope, options = {}) { + const { timeoutMs = 480_000, signal } = options; + + // A2A path: no synchronous wait support yet — return nulled reply fields. + if (process.env.SF_A2A_ENABLED) { + const result = await this._a2aDispatch(envelope); + return { ...result, reply: null, replyMessageId: null }; + } + + // Step 1: Enqueue via existing _busDispatch + const dispatchResult = await this._busDispatch(envelope); + + // Step 2: Look up the target agent in the swarm + const swarm = await this.getOrCreateSwarm(); + const agent = swarm.get(dispatchResult.targetAgent); + + if (!agent) { + return { + ...dispatchResult, + reply: null, + replyMessageId: null, + error: `dispatchAndWait: target agent "${dispatchResult.targetAgent}" not found in swarm`, + }; + } + + // Step 3: Drive one inbox-processing turn via runAgentTurn (in-process) + const runAgentTurn = await getRunAgentTurn(); + let turnResult; + try { + turnResult = await runAgentTurn(agent, { timeoutMs, signal }); + } catch (err) { + return { + ...dispatchResult, + reply: null, + replyMessageId: null, + error: `runAgentTurn threw: ${err?.message ?? String(err)}`, + }; + } + + if (turnResult?.error) { + return { + ...dispatchResult, + reply: null, + replyMessageId: null, + error: turnResult.error, + }; + } + + // Step 4: Read the agent's reply back from the bus. + // agent-runner writes the reply with metadata.replyTo set to the last processed + // message id; we look for the most recent message from "agent:" + // whose replyTo matches our dispatched messageId. + const fromAgent = `agent:${dispatchResult.targetAgent}`; + const replyMessage = this._bus.getReplyTo( + dispatchResult.messageId, + fromAgent, + ); + + if (!replyMessage) { + return { + ...dispatchResult, + reply: null, + replyMessageId: null, + error: "no reply", + }; + } + + // Step 5: Return structured result + const replyBody = + typeof replyMessage.body === "string" + ? replyMessage.body + : JSON.stringify(replyMessage.body); + + return { + ...dispatchResult, + reply: replyBody, + replyMessageId: replyMessage.id, + }; + } + /** * Route multiple DispatchEnvelopes through the swarm in parallel. * @@ -322,3 +431,25 @@ export class SwarmDispatchLayer { export async function swarmDispatch(basePath, envelope) { return SwarmDispatchLayer.getOrCreate(basePath).dispatch(envelope); } + +/** + * Dispatch an envelope and block until the target agent has produced a reply. + * + * Purpose: one-liner entry point for request/response dispatch callers that + * don't need to manage a SwarmDispatchLayer instance directly. + * + * Consumer: orchestrator flows that need the agent's output in-line. + * + * @param {string} basePath + * @param {DispatchEnvelope} envelope + * @param {object} [options] + * @param {number} [options.timeoutMs=480000] + * @param {AbortSignal} [options.signal] + * @returns {Promise} + */ +export async function swarmDispatchAndWait(basePath, envelope, options) { + return SwarmDispatchLayer.getOrCreate(basePath).dispatchAndWait( + envelope, + options, + ); +}