feat(swarm): wire @a2a-js/sdk as real A2A transport for SF_A2A_ENABLED dispatch path

- Install @a2a-js/sdk v0.3.13 as a dependency
- Add a2a-transport.js: A2ATransport class with spawnAgent, dispatch,
  getOrSpawnAgent, and buildAgentCard; spawns pi subprocesses with
  SF_A2A_AGENT_* env vars and dispatches envelopes via A2A JSON-RPC
- Add a2a-agent-server.js: A2A HTTP server entrypoint for spawned agent
  processes; starts express + A2AExpressApp with DefaultRequestHandler,
  handles incoming DispatchEnvelopes via SwarmAgentExecutor, writes
  envelope to SQLite MessageBus, and signals readiness via stdout JSON
- Update swarm-dispatch.js: split dispatch() into _busDispatch()
  (existing SQLite path) and _a2aDispatch() (new A2A path); lazy-load
  A2ATransport singleton only when SF_A2A_ENABLED is set; default
  path unchanged for all existing callers

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Mikael Hugo 2026-05-10 22:33:01 +02:00
parent 3fba4bcb03
commit a798aa1f6e
5 changed files with 656 additions and 3 deletions

48
package-lock.json generated
View file

@ -13,6 +13,7 @@
"packages/*" "packages/*"
], ],
"dependencies": { "dependencies": {
"@a2a-js/sdk": "^0.3.13",
"@anthropic-ai/sdk": "^0.95.1", "@anthropic-ai/sdk": "^0.95.1",
"@anthropic-ai/vertex-sdk": "^0.14.4", "@anthropic-ai/vertex-sdk": "^0.14.4",
"@aws-sdk/client-bedrock-runtime": "^3.983.0", "@aws-sdk/client-bedrock-runtime": "^3.983.0",
@ -104,9 +105,9 @@
} }
}, },
"node_modules/@a2a-js/sdk": { "node_modules/@a2a-js/sdk": {
"version": "0.3.11", "version": "0.3.13",
"resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.11.tgz", "resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.13.tgz",
"integrity": "sha512-pXjjlL0ZYHgAxObov1J+W3ylfQV0rOrDBB8Eo4a9eCunqs7iNW5OIfMcV8YnZQdzeVSRomj8jHeudVz0zc4RNw==", "integrity": "sha512-BZr0f9JVNQs3GKOM9xINWCh6OKIJWZFPyqqVqTym5mxO2Eemc6I/0zL7zWnljHzGdaf5aZQyQN5xa6PSH62q+A==",
"license": "Apache-2.0", "license": "Apache-2.0",
"dependencies": { "dependencies": {
"uuid": "^11.1.0" "uuid": "^11.1.0"
@ -2762,6 +2763,47 @@
"node-pty": "^1.0.0" "node-pty": "^1.0.0"
} }
}, },
"node_modules/@google/gemini-cli-core/node_modules/@a2a-js/sdk": {
"version": "0.3.11",
"resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.11.tgz",
"integrity": "sha512-pXjjlL0ZYHgAxObov1J+W3ylfQV0rOrDBB8Eo4a9eCunqs7iNW5OIfMcV8YnZQdzeVSRomj8jHeudVz0zc4RNw==",
"license": "Apache-2.0",
"dependencies": {
"uuid": "^11.1.0"
},
"engines": {
"node": ">=18"
},
"peerDependencies": {
"@bufbuild/protobuf": "^2.10.2",
"@grpc/grpc-js": "^1.11.0",
"express": "^4.21.2 || ^5.1.0"
},
"peerDependenciesMeta": {
"@bufbuild/protobuf": {
"optional": true
},
"@grpc/grpc-js": {
"optional": true
},
"express": {
"optional": true
}
}
},
"node_modules/@google/gemini-cli-core/node_modules/@a2a-js/sdk/node_modules/uuid": {
"version": "11.1.1",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.1.tgz",
"integrity": "sha512-vIYxrBCC/N/K+Js3qSN88go7kIfNPssr/hHCesKCQNAjmgvYS2oqr69kIufEG+O4+PfezOH4EbIeHCfFov8ZgQ==",
"funding": [
"https://github.com/sponsors/broofa",
"https://github.com/sponsors/ctavan"
],
"license": "MIT",
"bin": {
"uuid": "dist/esm/bin/uuid"
}
},
"node_modules/@google/gemini-cli-core/node_modules/@google/genai": { "node_modules/@google/gemini-cli-core/node_modules/@google/genai": {
"version": "1.30.0", "version": "1.30.0",
"resolved": "https://registry.npmjs.org/@google/genai/-/genai-1.30.0.tgz", "resolved": "https://registry.npmjs.org/@google/genai/-/genai-1.30.0.tgz",

View file

@ -114,6 +114,7 @@
"check:circular:ext": "node scripts/check-circular-deps.mjs --ext" "check:circular:ext": "node scripts/check-circular-deps.mjs --ext"
}, },
"dependencies": { "dependencies": {
"@a2a-js/sdk": "^0.3.13",
"@anthropic-ai/sdk": "^0.95.1", "@anthropic-ai/sdk": "^0.95.1",
"@anthropic-ai/vertex-sdk": "^0.14.4", "@anthropic-ai/vertex-sdk": "^0.14.4",
"@aws-sdk/client-bedrock-runtime": "^3.983.0", "@aws-sdk/client-bedrock-runtime": "^3.983.0",

View file

@ -0,0 +1,227 @@
/**
* a2a-agent-server.js A2A HTTP server entrypoint for spawned swarm agent processes.
*
* Purpose: when launched as a subprocess with the SF_A2A_AGENT_* environment
* variables set, start an A2A JSON-RPC HTTP server for the named swarm agent
* role, handle incoming DispatchEnvelope tasks by routing them through the
* UOK coordination store, and signal readiness by printing a JSON line to
* stdout so the parent A2ATransport process can record the endpoint URL.
*
* Launch environment:
* SF_A2A_AGENT_NAME stable agent name (e.g. "worker-1")
* SF_A2A_AGENT_ROLE agent role (coordinator|worker|scout|reviewer|)
* SF_A2A_PORT HTTP port to listen on
* SF_A2A_BASE_PATH project root for SQLite state
*
* Consumer: A2ATransport.spawnAgent() in a2a-transport.js.
*/
import { randomUUID } from "node:crypto";
import { createServer } from "node:http";
import express from "express";
import {
DefaultRequestHandler,
InMemoryTaskStore,
} from "@a2a-js/sdk/server";
import {
agentCardHandler,
jsonRpcHandler,
UserBuilder,
} from "@a2a-js/sdk/server/express";
import { AGENT_CARD_PATH } from "@a2a-js/sdk";
import { buildAgentCard } from "./a2a-transport.js";
const agentName = process.env.SF_A2A_AGENT_NAME;
const agentRole = process.env.SF_A2A_AGENT_ROLE ?? "worker";
const port = Number(process.env.SF_A2A_PORT ?? 34501);
const basePath = process.env.SF_A2A_BASE_PATH ?? process.cwd();
if (!agentName) {
process.stderr.write(
"a2a-agent-server: SF_A2A_AGENT_NAME is required\n",
);
process.exit(1);
}
/**
* Minimal AgentExecutor that handles DispatchEnvelope messages.
*
* Purpose: receive incoming A2A task messages containing a DispatchEnvelope
* payload, process them via the UOK coordination store, and publish a
* structured result back to the caller so the parent SwarmDispatchLayer can
* record the outcome in SQLite.
*
* Consumer: DefaultRequestHandler for each inbound A2A task.
*/
class SwarmAgentExecutor {
/**
* @param {string} name - agent name
* @param {string} role - agent role
* @param {string} basePath - project root
*/
constructor(name, role, basePath) {
this._name = name;
this._role = role;
this._basePath = basePath;
}
/**
* Execute an incoming A2A task.
*
* Purpose: extract the DispatchEnvelope from the message parts, record it
* in the agent's coordination store, and publish a result message so the
* caller receives structured acknowledgement.
*
* Consumer: DefaultRequestHandler on every inbound sendMessage call.
*
* @param {import('@a2a-js/sdk/server').RequestContext} requestContext
* @param {import('@a2a-js/sdk/server').ExecutionEventBus} eventBus
*/
async execute(requestContext, eventBus) {
const incomingMessage = requestContext.message;
let envelope = null;
// Extract envelope from message parts.
for (const part of incomingMessage?.parts ?? []) {
if (part.kind === "data" && part.data) {
envelope = part.data;
break;
}
if (part.kind === "text") {
try {
envelope = JSON.parse(part.text);
} catch {
// not JSON text
}
if (envelope) break;
}
}
let resultBody;
if (!envelope) {
resultBody = {
error: "No DispatchEnvelope found in message parts",
agentName: this._name,
role: this._role,
};
} else {
try {
resultBody = await this._handleEnvelope(envelope);
} catch (err) {
resultBody = {
error: err instanceof Error ? err.message : String(err),
agentName: this._name,
role: this._role,
unitId: envelope?.unitId,
};
}
}
const responseMessage = {
kind: "message",
messageId: randomUUID(),
role: "agent",
parts: [
{
kind: "data",
data: resultBody,
metadata: { contentType: "application/json" },
},
],
contextId: requestContext.contextId,
};
eventBus.publish(responseMessage);
eventBus.finished();
}
/**
* Record the envelope in the agent's coordination store and return an ack.
*
* Purpose: persist the dispatched unit so the SQLite coordination layer
* retains durability even when transport is A2A HTTP. The agent's inbox is
* updated so the standard UOK polling loops can pick up the task.
*
* Consumer: execute() after extracting the envelope from message parts.
*
* @param {object} envelope - DispatchEnvelope from the swarm dispatch layer
* @returns {Promise<object>} acknowledgement payload
*/
async _handleEnvelope(envelope) {
// Lazy-import to avoid loading SQLite unless this server is actually running.
const { MessageBus } = await import("./message-bus.js");
const bus = new MessageBus(this._basePath);
const from = `a2a:dispatch:${envelope.scope ?? "unknown"}:${envelope.unitId}`;
const to = `agent:${this._name}`;
const metadata = {
unitId: envelope.unitId,
unitType: envelope.unitType,
workMode: envelope.workMode,
transport: "a2a",
};
const messageId = bus.send(from, to, envelope.payload ?? envelope, metadata);
return {
status: "accepted",
messageId,
agentName: this._name,
role: this._role,
unitId: envelope.unitId,
unitType: envelope.unitType,
workMode: envelope.workMode,
};
}
async cancelTask(_taskId, _requestContext) {
// Stateless for now; cancellation is handled via SQLite coordination store.
}
}
async function main() {
const agentCard = buildAgentCard(agentName, agentRole, port);
const executor = new SwarmAgentExecutor(agentName, agentRole, basePath);
const requestHandler = new DefaultRequestHandler(
agentCard,
new InMemoryTaskStore(),
executor,
);
const app = express();
app.use(
`/${AGENT_CARD_PATH}`,
agentCardHandler({ agentCardProvider: requestHandler }),
);
app.use(
"/a2a/jsonrpc",
jsonRpcHandler({
requestHandler,
userBuilder: UserBuilder.noAuthentication,
}),
);
// Health check endpoint.
app.get("/health", (_req, res) => {
res.json({ ok: true, agentName, role: agentRole, port });
});
await new Promise((resolve, reject) => {
const server = app.listen(port, "127.0.0.1", () => resolve(server));
server.once("error", reject);
});
// Signal readiness to the parent process.
process.stdout.write(JSON.stringify({ ready: true, port, agentName, role: agentRole }) + "\n");
process.on("SIGTERM", () => {
process.exit(0);
});
}
main().catch((err) => {
process.stderr.write(`a2a-agent-server: fatal: ${err.message}\n`);
process.exit(1);
});

View file

@ -0,0 +1,294 @@
/**
* a2a-transport.js A2A protocol transport for swarm dispatch.
*
* Purpose: when SF_A2A_ENABLED=1, SwarmDispatchLayer uses this module instead
* of the in-process SQLite MessageBus to dispatch DispatchEnvelopes to separate
* agent processes over the Agent2Agent HTTP protocol. SQLite coordination state
* is retained for durability; A2A is strictly the wire transport.
*
* Consumer: SwarmDispatchLayer._a2aDispatch() when SF_A2A_ENABLED is set.
*/
import { randomUUID } from "node:crypto";
import { spawn } from "node:child_process";
import { join, dirname, resolve } from "node:path";
const A2A_AGENT_SERVER_PATH = new URL("./a2a-agent-server.js", import.meta.url)
.pathname;
const AGENT_READY_TIMEOUT_MS = 15_000;
const BASE_PORT = 34500;
let _portCounter = BASE_PORT;
function nextPort() {
return ++_portCounter;
}
/**
* Build an A2A AgentCard descriptor for a named swarm role agent.
*
* Purpose: produce the minimal AgentCard structure required by the A2A SDK so
* the server endpoint can advertise its identity and capabilities to clients
* without requiring hand-authored config files.
*
* Consumer: A2ATransport.spawnAgent and a2a-agent-server.js at startup.
*
* @param {string} name - stable agent name (e.g. 'worker-1')
* @param {string} role - agent role (coordinator|worker|scout|reviewer|planner|verifier|scribe|adversary)
* @param {number} port - HTTP port the agent server will listen on
* @returns {object} A2A AgentCard
*/
export function buildAgentCard(name, role, port) {
const baseUrl = `http://localhost:${port}`;
const roleSkillMap = {
coordinator: {
description: "Orchestrates tasks across worker pool",
tags: ["coordinate", "route"],
},
worker: {
description: "Executes build and repair tasks",
tags: ["build", "repair"],
},
scout: {
description: "Discovers and surfaces information",
tags: ["research", "discover"],
},
reviewer: {
description: "Critiques worker output against intent",
tags: ["review", "critique"],
},
planner: {
description: "Generates milestone and task contracts",
tags: ["plan", "design"],
},
verifier: {
description: "Runs gate checks and evidence validation",
tags: ["verify", "validate"],
},
scribe: {
description: "Writes and exports documentation",
tags: ["document", "export"],
},
adversary: {
description: "Red-teams plans and decisions",
tags: ["challenge", "critique"],
},
};
const skill = roleSkillMap[role] ?? {
description: `SF agent: ${role}`,
tags: [role],
};
return {
name: `SF ${name}`,
description: `Singularity Forge swarm agent — role: ${role}. ${skill.description}.`,
protocolVersion: "0.3.0",
version: "1.0.0",
url: `${baseUrl}/a2a/jsonrpc`,
skills: [
{
id: role,
name: role,
description: skill.description,
tags: skill.tags,
},
],
capabilities: {
pushNotifications: false,
},
defaultInputModes: ["application/json"],
defaultOutputModes: ["application/json"],
};
}
/**
* A2ATransport spawns agent subprocesses and dispatches envelopes via HTTP A2A protocol.
*
* Purpose: provide real cross-process dispatch when SF_A2A_ENABLED=1 so each
* swarm agent runs in an isolated Node.js process with its own context window.
* Process lifecycle is managed here; callers interact only through dispatch().
*
* Consumer: SwarmDispatchLayer._a2aDispatch() in swarm-dispatch.js.
*/
export class A2ATransport {
constructor() {
/** @type {Map<string, { url: string, pid: number, process: import('child_process').ChildProcess }>} */
this._registry = new Map();
}
/**
* Spawn a new agent subprocess running an A2A HTTP server.
*
* Purpose: launch an isolated pi/sf agent process that serves A2A JSON-RPC
* requests on a dynamically allocated port, wait for it to signal readiness,
* and register it in the local process registry.
*
* Consumer: getOrSpawnAgent() when no cached entry exists for the agent name.
*
* @param {string} agentName - stable routing name (e.g. 'worker-1')
* @param {string} role - agent role
* @param {string} basePath - project root for SQLite state
* @returns {Promise<string>} the agent's A2A JSON-RPC endpoint URL
*/
async spawnAgent(agentName, role, basePath) {
const port = nextPort();
const env = {
...process.env,
SF_A2A_AGENT_NAME: agentName,
SF_A2A_AGENT_ROLE: role,
SF_A2A_PORT: String(port),
SF_A2A_BASE_PATH: basePath,
};
const child = spawn(process.execPath, [A2A_AGENT_SERVER_PATH], {
env,
cwd: basePath,
stdio: ["ignore", "pipe", "pipe"],
});
const url = await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
child.kill("SIGTERM");
reject(
new Error(
`A2ATransport: agent ${agentName} did not signal readiness within ${AGENT_READY_TIMEOUT_MS}ms`,
),
);
}, AGENT_READY_TIMEOUT_MS);
let buffer = "";
child.stdout.on("data", (chunk) => {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const msg = JSON.parse(trimmed);
if (msg.ready && msg.port) {
clearTimeout(timeout);
resolve(`http://localhost:${msg.port}/a2a/jsonrpc`);
}
} catch {
// non-JSON stdout line — ignore
}
}
});
child.on("error", (err) => {
clearTimeout(timeout);
reject(
new Error(`A2ATransport: failed to spawn agent ${agentName}: ${err.message}`),
);
});
child.on("exit", (code) => {
clearTimeout(timeout);
this._registry.delete(agentName);
if (code !== 0 && code !== null) {
reject(
new Error(
`A2ATransport: agent ${agentName} exited with code ${code} before signalling readiness`,
),
);
}
});
});
this._registry.set(agentName, { url, pid: child.pid, process: child });
child.stderr.on("data", (chunk) => {
// Suppress stderr from agent subprocesses unless debugging
if (process.env.SF_A2A_DEBUG) {
process.stderr.write(`[a2a:${agentName}] ${chunk}`);
}
});
return url;
}
/**
* Return the cached agent endpoint URL, spawning the process if needed.
*
* Purpose: ensure each named agent has at most one live process so dispatch
* calls share a persistent agent context rather than forking a new process
* per envelope.
*
* Consumer: dispatch() on every envelope send.
*
* @param {string} agentName
* @param {string} role
* @param {string} basePath
* @returns {Promise<string>} endpoint URL
*/
async getOrSpawnAgent(agentName, role, basePath) {
const entry = this._registry.get(agentName);
if (entry) return entry.url;
return this.spawnAgent(agentName, role, basePath);
}
/**
* Send a DispatchEnvelope to a live agent endpoint via A2A JSON-RPC.
*
* Purpose: deliver a single envelope over the A2A wire protocol and return
* the agent's JSON response body so the SwarmDispatchLayer can record the
* result without knowing the HTTP transport details.
*
* Consumer: SwarmDispatchLayer._a2aDispatch() after resolving the agent URL.
*
* @param {string} agentUrl - the agent's JSON-RPC endpoint URL
* @param {import('./swarm-dispatch.js').DispatchEnvelope} envelope
* @returns {Promise<object>} task result from the agent
*/
async dispatch(agentUrl, envelope) {
// Dynamically import A2A client to avoid loading it unless A2A mode is active.
const { ClientFactory } = await import("@a2a-js/sdk/client");
// Derive base URL by stripping the /a2a/jsonrpc suffix for agent card resolution.
const baseUrl = agentUrl.replace(/\/a2a\/jsonrpc$/, "");
const factory = new ClientFactory();
const client = await factory.createFromUrl(baseUrl);
const messageId = randomUUID();
const sendParams = {
message: {
messageId,
role: "user",
kind: "message",
parts: [
{
kind: "data",
data: envelope,
metadata: {
contentType: "application/json",
},
},
],
},
};
const response = await client.sendMessage(sendParams);
return response;
}
/**
* Terminate all spawned agent processes.
*
* Purpose: clean up child processes on shutdown so they don't linger after
* the parent process exits.
*
* Consumer: process exit handlers and test teardown.
*/
shutdown() {
for (const [name, entry] of this._registry) {
try {
entry.process.kill("SIGTERM");
} catch {
// already dead
}
}
this._registry.clear();
}
}

View file

@ -17,6 +17,16 @@ import { createDefaultSwarm } from "./swarm-roles.js";
// Module-level cache keyed by `${basePath}:${swarmName}` // Module-level cache keyed by `${basePath}:${swarmName}`
const _cache = new Map(); const _cache = new Map();
// Lazy singleton A2ATransport — only loaded when SF_A2A_ENABLED is set.
let _a2aTransport = null;
async function getA2ATransport() {
if (!_a2aTransport) {
const { A2ATransport } = await import("./a2a-transport.js");
_a2aTransport = new A2ATransport();
}
return _a2aTransport;
}
/** /**
* @typedef {object} DispatchEnvelope * @typedef {object} DispatchEnvelope
* @property {string} unitId * @property {string} unitId
@ -95,6 +105,8 @@ export class SwarmDispatchLayer {
* Purpose: select the right role agent for the envelope's workMode/unitType, * Purpose: select the right role agent for the envelope's workMode/unitType,
* deliver the payload to that agent's durable inbox, and return a structured * deliver the payload to that agent's durable inbox, and return a structured
* result so callers can track message delivery without knowing the swarm topology. * result so callers can track message delivery without knowing the swarm topology.
* When SF_A2A_ENABLED is set, delivery uses the A2A HTTP transport instead of
* the in-process SQLite MessageBus.
* *
* Consumer: UOK kernel dispatch path and swarmDispatch() convenience function. * Consumer: UOK kernel dispatch path and swarmDispatch() convenience function.
* *
@ -102,6 +114,24 @@ export class SwarmDispatchLayer {
* @returns {Promise<DispatchResult>} * @returns {Promise<DispatchResult>}
*/ */
async dispatch(envelope) { async dispatch(envelope) {
if (process.env.SF_A2A_ENABLED) {
return this._a2aDispatch(envelope);
}
return this._busDispatch(envelope);
}
/**
* Deliver an envelope via the SQLite MessageBus (default in-process path).
*
* Purpose: preserve the existing bus-backed delivery path so all existing
* callers continue working unchanged when SF_A2A_ENABLED is not set.
*
* Consumer: dispatch() when SF_A2A_ENABLED is unset.
*
* @param {DispatchEnvelope} envelope
* @returns {Promise<DispatchResult>}
*/
async _busDispatch(envelope) {
const swarm = await this.getOrCreateSwarm(); const swarm = await this.getOrCreateSwarm();
const target = swarm.route(envelope); const target = swarm.route(envelope);
@ -129,6 +159,65 @@ export class SwarmDispatchLayer {
}; };
} }
/**
* Deliver an envelope to a spawned agent process over the A2A HTTP protocol.
*
* Purpose: route the envelope to the correct role agent running in a separate
* process, using A2ATransport to spawn the process on first use and the A2A
* JSON-RPC client for delivery. SQLite state is updated by the remote process.
*
* Consumer: dispatch() when SF_A2A_ENABLED is set.
*
* @param {DispatchEnvelope} envelope
* @returns {Promise<DispatchResult>}
*/
async _a2aDispatch(envelope) {
const swarm = await this.getOrCreateSwarm();
const target = swarm.route(envelope);
if (!target) {
throw new Error(
`SwarmDispatchLayer(a2a): no agent available to handle envelope unitType=${envelope.unitType} workMode=${envelope.workMode}`,
);
}
const { name: agentName, role } = target.identity;
const transport = await getA2ATransport();
const agentUrl = await transport.getOrSpawnAgent(
agentName,
role,
this._basePath,
);
const response = await transport.dispatch(agentUrl, envelope);
// Extract messageId from the A2A response if present.
let messageId = `a2a-${Date.now()}`;
try {
const parts = response?.parts ?? [];
for (const part of parts) {
const data =
part?.data ??
(part?.kind === "text" ? JSON.parse(part.text ?? "{}") : null);
if (data?.messageId) {
messageId = data.messageId;
break;
}
}
} catch {
// leave messageId as default
}
return {
messageId,
targetAgent: agentName,
swarmName: this._swarmName,
envelope,
transport: "a2a",
agentUrl,
};
}
/** /**
* Route multiple DispatchEnvelopes through the swarm in parallel. * Route multiple DispatchEnvelopes through the swarm in parallel.
* *