diff --git a/package-lock.json b/package-lock.json index 459f73f5d..f0d17a4d6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "packages/*" ], "dependencies": { + "@a2a-js/sdk": "^0.3.13", "@anthropic-ai/sdk": "^0.95.1", "@anthropic-ai/vertex-sdk": "^0.14.4", "@aws-sdk/client-bedrock-runtime": "^3.983.0", @@ -104,9 +105,9 @@ } }, "node_modules/@a2a-js/sdk": { - "version": "0.3.11", - "resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.11.tgz", - "integrity": "sha512-pXjjlL0ZYHgAxObov1J+W3ylfQV0rOrDBB8Eo4a9eCunqs7iNW5OIfMcV8YnZQdzeVSRomj8jHeudVz0zc4RNw==", + "version": "0.3.13", + "resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.13.tgz", + "integrity": "sha512-BZr0f9JVNQs3GKOM9xINWCh6OKIJWZFPyqqVqTym5mxO2Eemc6I/0zL7zWnljHzGdaf5aZQyQN5xa6PSH62q+A==", "license": "Apache-2.0", "dependencies": { "uuid": "^11.1.0" @@ -2762,6 +2763,47 @@ "node-pty": "^1.0.0" } }, + "node_modules/@google/gemini-cli-core/node_modules/@a2a-js/sdk": { + "version": "0.3.11", + "resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.11.tgz", + "integrity": "sha512-pXjjlL0ZYHgAxObov1J+W3ylfQV0rOrDBB8Eo4a9eCunqs7iNW5OIfMcV8YnZQdzeVSRomj8jHeudVz0zc4RNw==", + "license": "Apache-2.0", + "dependencies": { + "uuid": "^11.1.0" + }, + "engines": { + "node": ">=18" + }, + "peerDependencies": { + "@bufbuild/protobuf": "^2.10.2", + "@grpc/grpc-js": "^1.11.0", + "express": "^4.21.2 || ^5.1.0" + }, + "peerDependenciesMeta": { + "@bufbuild/protobuf": { + "optional": true + }, + "@grpc/grpc-js": { + "optional": true + }, + "express": { + "optional": true + } + } + }, + "node_modules/@google/gemini-cli-core/node_modules/@a2a-js/sdk/node_modules/uuid": { + "version": "11.1.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.1.tgz", + "integrity": "sha512-vIYxrBCC/N/K+Js3qSN88go7kIfNPssr/hHCesKCQNAjmgvYS2oqr69kIufEG+O4+PfezOH4EbIeHCfFov8ZgQ==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/esm/bin/uuid" + } + }, "node_modules/@google/gemini-cli-core/node_modules/@google/genai": { "version": "1.30.0", "resolved": "https://registry.npmjs.org/@google/genai/-/genai-1.30.0.tgz", diff --git a/package.json b/package.json index 46618c3c2..fa9641cb1 100644 --- a/package.json +++ b/package.json @@ -114,6 +114,7 @@ "check:circular:ext": "node scripts/check-circular-deps.mjs --ext" }, "dependencies": { + "@a2a-js/sdk": "^0.3.13", "@anthropic-ai/sdk": "^0.95.1", "@anthropic-ai/vertex-sdk": "^0.14.4", "@aws-sdk/client-bedrock-runtime": "^3.983.0", diff --git a/src/resources/extensions/sf/uok/a2a-agent-server.js b/src/resources/extensions/sf/uok/a2a-agent-server.js new file mode 100644 index 000000000..746e6845a --- /dev/null +++ b/src/resources/extensions/sf/uok/a2a-agent-server.js @@ -0,0 +1,227 @@ +/** + * a2a-agent-server.js — A2A HTTP server entrypoint for spawned swarm agent processes. + * + * Purpose: when launched as a subprocess with the SF_A2A_AGENT_* environment + * variables set, start an A2A JSON-RPC HTTP server for the named swarm agent + * role, handle incoming DispatchEnvelope tasks by routing them through the + * UOK coordination store, and signal readiness by printing a JSON line to + * stdout so the parent A2ATransport process can record the endpoint URL. + * + * Launch environment: + * SF_A2A_AGENT_NAME — stable agent name (e.g. "worker-1") + * SF_A2A_AGENT_ROLE — agent role (coordinator|worker|scout|reviewer|…) + * SF_A2A_PORT — HTTP port to listen on + * SF_A2A_BASE_PATH — project root for SQLite state + * + * Consumer: A2ATransport.spawnAgent() in a2a-transport.js. + */ + +import { randomUUID } from "node:crypto"; +import { createServer } from "node:http"; +import express from "express"; +import { + DefaultRequestHandler, + InMemoryTaskStore, +} from "@a2a-js/sdk/server"; +import { + agentCardHandler, + jsonRpcHandler, + UserBuilder, +} from "@a2a-js/sdk/server/express"; +import { AGENT_CARD_PATH } from "@a2a-js/sdk"; +import { buildAgentCard } from "./a2a-transport.js"; + +const agentName = process.env.SF_A2A_AGENT_NAME; +const agentRole = process.env.SF_A2A_AGENT_ROLE ?? "worker"; +const port = Number(process.env.SF_A2A_PORT ?? 34501); +const basePath = process.env.SF_A2A_BASE_PATH ?? process.cwd(); + +if (!agentName) { + process.stderr.write( + "a2a-agent-server: SF_A2A_AGENT_NAME is required\n", + ); + process.exit(1); +} + +/** + * Minimal AgentExecutor that handles DispatchEnvelope messages. + * + * Purpose: receive incoming A2A task messages containing a DispatchEnvelope + * payload, process them via the UOK coordination store, and publish a + * structured result back to the caller so the parent SwarmDispatchLayer can + * record the outcome in SQLite. + * + * Consumer: DefaultRequestHandler for each inbound A2A task. + */ +class SwarmAgentExecutor { + /** + * @param {string} name - agent name + * @param {string} role - agent role + * @param {string} basePath - project root + */ + constructor(name, role, basePath) { + this._name = name; + this._role = role; + this._basePath = basePath; + } + + /** + * Execute an incoming A2A task. + * + * Purpose: extract the DispatchEnvelope from the message parts, record it + * in the agent's coordination store, and publish a result message so the + * caller receives structured acknowledgement. + * + * Consumer: DefaultRequestHandler on every inbound sendMessage call. + * + * @param {import('@a2a-js/sdk/server').RequestContext} requestContext + * @param {import('@a2a-js/sdk/server').ExecutionEventBus} eventBus + */ + async execute(requestContext, eventBus) { + const incomingMessage = requestContext.message; + let envelope = null; + + // Extract envelope from message parts. + for (const part of incomingMessage?.parts ?? []) { + if (part.kind === "data" && part.data) { + envelope = part.data; + break; + } + if (part.kind === "text") { + try { + envelope = JSON.parse(part.text); + } catch { + // not JSON text + } + if (envelope) break; + } + } + + let resultBody; + if (!envelope) { + resultBody = { + error: "No DispatchEnvelope found in message parts", + agentName: this._name, + role: this._role, + }; + } else { + try { + resultBody = await this._handleEnvelope(envelope); + } catch (err) { + resultBody = { + error: err instanceof Error ? err.message : String(err), + agentName: this._name, + role: this._role, + unitId: envelope?.unitId, + }; + } + } + + const responseMessage = { + kind: "message", + messageId: randomUUID(), + role: "agent", + parts: [ + { + kind: "data", + data: resultBody, + metadata: { contentType: "application/json" }, + }, + ], + contextId: requestContext.contextId, + }; + + eventBus.publish(responseMessage); + eventBus.finished(); + } + + /** + * Record the envelope in the agent's coordination store and return an ack. + * + * Purpose: persist the dispatched unit so the SQLite coordination layer + * retains durability even when transport is A2A HTTP. The agent's inbox is + * updated so the standard UOK polling loops can pick up the task. + * + * Consumer: execute() after extracting the envelope from message parts. + * + * @param {object} envelope - DispatchEnvelope from the swarm dispatch layer + * @returns {Promise} acknowledgement payload + */ + async _handleEnvelope(envelope) { + // Lazy-import to avoid loading SQLite unless this server is actually running. + const { MessageBus } = await import("./message-bus.js"); + + const bus = new MessageBus(this._basePath); + const from = `a2a:dispatch:${envelope.scope ?? "unknown"}:${envelope.unitId}`; + const to = `agent:${this._name}`; + const metadata = { + unitId: envelope.unitId, + unitType: envelope.unitType, + workMode: envelope.workMode, + transport: "a2a", + }; + + const messageId = bus.send(from, to, envelope.payload ?? envelope, metadata); + + return { + status: "accepted", + messageId, + agentName: this._name, + role: this._role, + unitId: envelope.unitId, + unitType: envelope.unitType, + workMode: envelope.workMode, + }; + } + + async cancelTask(_taskId, _requestContext) { + // Stateless for now; cancellation is handled via SQLite coordination store. + } +} + +async function main() { + const agentCard = buildAgentCard(agentName, agentRole, port); + + const executor = new SwarmAgentExecutor(agentName, agentRole, basePath); + const requestHandler = new DefaultRequestHandler( + agentCard, + new InMemoryTaskStore(), + executor, + ); + + const app = express(); + + app.use( + `/${AGENT_CARD_PATH}`, + agentCardHandler({ agentCardProvider: requestHandler }), + ); + app.use( + "/a2a/jsonrpc", + jsonRpcHandler({ + requestHandler, + userBuilder: UserBuilder.noAuthentication, + }), + ); + + // Health check endpoint. + app.get("/health", (_req, res) => { + res.json({ ok: true, agentName, role: agentRole, port }); + }); + + await new Promise((resolve, reject) => { + const server = app.listen(port, "127.0.0.1", () => resolve(server)); + server.once("error", reject); + }); + + // Signal readiness to the parent process. + process.stdout.write(JSON.stringify({ ready: true, port, agentName, role: agentRole }) + "\n"); + + process.on("SIGTERM", () => { + process.exit(0); + }); +} + +main().catch((err) => { + process.stderr.write(`a2a-agent-server: fatal: ${err.message}\n`); + process.exit(1); +}); diff --git a/src/resources/extensions/sf/uok/a2a-transport.js b/src/resources/extensions/sf/uok/a2a-transport.js new file mode 100644 index 000000000..8729664e8 --- /dev/null +++ b/src/resources/extensions/sf/uok/a2a-transport.js @@ -0,0 +1,294 @@ +/** + * a2a-transport.js — A2A protocol transport for swarm dispatch. + * + * Purpose: when SF_A2A_ENABLED=1, SwarmDispatchLayer uses this module instead + * of the in-process SQLite MessageBus to dispatch DispatchEnvelopes to separate + * agent processes over the Agent2Agent HTTP protocol. SQLite coordination state + * is retained for durability; A2A is strictly the wire transport. + * + * Consumer: SwarmDispatchLayer._a2aDispatch() when SF_A2A_ENABLED is set. + */ + +import { randomUUID } from "node:crypto"; +import { spawn } from "node:child_process"; +import { join, dirname, resolve } from "node:path"; + +const A2A_AGENT_SERVER_PATH = new URL("./a2a-agent-server.js", import.meta.url) + .pathname; + +const AGENT_READY_TIMEOUT_MS = 15_000; +const BASE_PORT = 34500; +let _portCounter = BASE_PORT; + +function nextPort() { + return ++_portCounter; +} + +/** + * Build an A2A AgentCard descriptor for a named swarm role agent. + * + * Purpose: produce the minimal AgentCard structure required by the A2A SDK so + * the server endpoint can advertise its identity and capabilities to clients + * without requiring hand-authored config files. + * + * Consumer: A2ATransport.spawnAgent and a2a-agent-server.js at startup. + * + * @param {string} name - stable agent name (e.g. 'worker-1') + * @param {string} role - agent role (coordinator|worker|scout|reviewer|planner|verifier|scribe|adversary) + * @param {number} port - HTTP port the agent server will listen on + * @returns {object} A2A AgentCard + */ +export function buildAgentCard(name, role, port) { + const baseUrl = `http://localhost:${port}`; + const roleSkillMap = { + coordinator: { + description: "Orchestrates tasks across worker pool", + tags: ["coordinate", "route"], + }, + worker: { + description: "Executes build and repair tasks", + tags: ["build", "repair"], + }, + scout: { + description: "Discovers and surfaces information", + tags: ["research", "discover"], + }, + reviewer: { + description: "Critiques worker output against intent", + tags: ["review", "critique"], + }, + planner: { + description: "Generates milestone and task contracts", + tags: ["plan", "design"], + }, + verifier: { + description: "Runs gate checks and evidence validation", + tags: ["verify", "validate"], + }, + scribe: { + description: "Writes and exports documentation", + tags: ["document", "export"], + }, + adversary: { + description: "Red-teams plans and decisions", + tags: ["challenge", "critique"], + }, + }; + + const skill = roleSkillMap[role] ?? { + description: `SF agent: ${role}`, + tags: [role], + }; + + return { + name: `SF ${name}`, + description: `Singularity Forge swarm agent — role: ${role}. ${skill.description}.`, + protocolVersion: "0.3.0", + version: "1.0.0", + url: `${baseUrl}/a2a/jsonrpc`, + skills: [ + { + id: role, + name: role, + description: skill.description, + tags: skill.tags, + }, + ], + capabilities: { + pushNotifications: false, + }, + defaultInputModes: ["application/json"], + defaultOutputModes: ["application/json"], + }; +} + +/** + * A2ATransport — spawns agent subprocesses and dispatches envelopes via HTTP A2A protocol. + * + * Purpose: provide real cross-process dispatch when SF_A2A_ENABLED=1 so each + * swarm agent runs in an isolated Node.js process with its own context window. + * Process lifecycle is managed here; callers interact only through dispatch(). + * + * Consumer: SwarmDispatchLayer._a2aDispatch() in swarm-dispatch.js. + */ +export class A2ATransport { + constructor() { + /** @type {Map} */ + this._registry = new Map(); + } + + /** + * Spawn a new agent subprocess running an A2A HTTP server. + * + * Purpose: launch an isolated pi/sf agent process that serves A2A JSON-RPC + * requests on a dynamically allocated port, wait for it to signal readiness, + * and register it in the local process registry. + * + * Consumer: getOrSpawnAgent() when no cached entry exists for the agent name. + * + * @param {string} agentName - stable routing name (e.g. 'worker-1') + * @param {string} role - agent role + * @param {string} basePath - project root for SQLite state + * @returns {Promise} the agent's A2A JSON-RPC endpoint URL + */ + async spawnAgent(agentName, role, basePath) { + const port = nextPort(); + const env = { + ...process.env, + SF_A2A_AGENT_NAME: agentName, + SF_A2A_AGENT_ROLE: role, + SF_A2A_PORT: String(port), + SF_A2A_BASE_PATH: basePath, + }; + + const child = spawn(process.execPath, [A2A_AGENT_SERVER_PATH], { + env, + cwd: basePath, + stdio: ["ignore", "pipe", "pipe"], + }); + + const url = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + child.kill("SIGTERM"); + reject( + new Error( + `A2ATransport: agent ${agentName} did not signal readiness within ${AGENT_READY_TIMEOUT_MS}ms`, + ), + ); + }, AGENT_READY_TIMEOUT_MS); + + let buffer = ""; + child.stdout.on("data", (chunk) => { + buffer += chunk.toString(); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const msg = JSON.parse(trimmed); + if (msg.ready && msg.port) { + clearTimeout(timeout); + resolve(`http://localhost:${msg.port}/a2a/jsonrpc`); + } + } catch { + // non-JSON stdout line — ignore + } + } + }); + + child.on("error", (err) => { + clearTimeout(timeout); + reject( + new Error(`A2ATransport: failed to spawn agent ${agentName}: ${err.message}`), + ); + }); + + child.on("exit", (code) => { + clearTimeout(timeout); + this._registry.delete(agentName); + if (code !== 0 && code !== null) { + reject( + new Error( + `A2ATransport: agent ${agentName} exited with code ${code} before signalling readiness`, + ), + ); + } + }); + }); + + this._registry.set(agentName, { url, pid: child.pid, process: child }); + + child.stderr.on("data", (chunk) => { + // Suppress stderr from agent subprocesses unless debugging + if (process.env.SF_A2A_DEBUG) { + process.stderr.write(`[a2a:${agentName}] ${chunk}`); + } + }); + + return url; + } + + /** + * Return the cached agent endpoint URL, spawning the process if needed. + * + * Purpose: ensure each named agent has at most one live process so dispatch + * calls share a persistent agent context rather than forking a new process + * per envelope. + * + * Consumer: dispatch() on every envelope send. + * + * @param {string} agentName + * @param {string} role + * @param {string} basePath + * @returns {Promise} endpoint URL + */ + async getOrSpawnAgent(agentName, role, basePath) { + const entry = this._registry.get(agentName); + if (entry) return entry.url; + return this.spawnAgent(agentName, role, basePath); + } + + /** + * Send a DispatchEnvelope to a live agent endpoint via A2A JSON-RPC. + * + * Purpose: deliver a single envelope over the A2A wire protocol and return + * the agent's JSON response body so the SwarmDispatchLayer can record the + * result without knowing the HTTP transport details. + * + * Consumer: SwarmDispatchLayer._a2aDispatch() after resolving the agent URL. + * + * @param {string} agentUrl - the agent's JSON-RPC endpoint URL + * @param {import('./swarm-dispatch.js').DispatchEnvelope} envelope + * @returns {Promise} task result from the agent + */ + async dispatch(agentUrl, envelope) { + // Dynamically import A2A client to avoid loading it unless A2A mode is active. + const { ClientFactory } = await import("@a2a-js/sdk/client"); + + // Derive base URL by stripping the /a2a/jsonrpc suffix for agent card resolution. + const baseUrl = agentUrl.replace(/\/a2a\/jsonrpc$/, ""); + const factory = new ClientFactory(); + const client = await factory.createFromUrl(baseUrl); + + const messageId = randomUUID(); + const sendParams = { + message: { + messageId, + role: "user", + kind: "message", + parts: [ + { + kind: "data", + data: envelope, + metadata: { + contentType: "application/json", + }, + }, + ], + }, + }; + + const response = await client.sendMessage(sendParams); + return response; + } + + /** + * Terminate all spawned agent processes. + * + * Purpose: clean up child processes on shutdown so they don't linger after + * the parent process exits. + * + * Consumer: process exit handlers and test teardown. + */ + shutdown() { + for (const [name, entry] of this._registry) { + try { + entry.process.kill("SIGTERM"); + } catch { + // already dead + } + } + this._registry.clear(); + } +} diff --git a/src/resources/extensions/sf/uok/swarm-dispatch.js b/src/resources/extensions/sf/uok/swarm-dispatch.js index 5301842b1..3a7e66723 100644 --- a/src/resources/extensions/sf/uok/swarm-dispatch.js +++ b/src/resources/extensions/sf/uok/swarm-dispatch.js @@ -17,6 +17,16 @@ import { createDefaultSwarm } from "./swarm-roles.js"; // Module-level cache keyed by `${basePath}:${swarmName}` const _cache = new Map(); +// Lazy singleton A2ATransport — only loaded when SF_A2A_ENABLED is set. +let _a2aTransport = null; +async function getA2ATransport() { + if (!_a2aTransport) { + const { A2ATransport } = await import("./a2a-transport.js"); + _a2aTransport = new A2ATransport(); + } + return _a2aTransport; +} + /** * @typedef {object} DispatchEnvelope * @property {string} unitId @@ -95,6 +105,8 @@ export class SwarmDispatchLayer { * Purpose: select the right role agent for the envelope's workMode/unitType, * deliver the payload to that agent's durable inbox, and return a structured * result so callers can track message delivery without knowing the swarm topology. + * When SF_A2A_ENABLED is set, delivery uses the A2A HTTP transport instead of + * the in-process SQLite MessageBus. * * Consumer: UOK kernel dispatch path and swarmDispatch() convenience function. * @@ -102,6 +114,24 @@ export class SwarmDispatchLayer { * @returns {Promise} */ async dispatch(envelope) { + if (process.env.SF_A2A_ENABLED) { + return this._a2aDispatch(envelope); + } + return this._busDispatch(envelope); + } + + /** + * Deliver an envelope via the SQLite MessageBus (default in-process path). + * + * Purpose: preserve the existing bus-backed delivery path so all existing + * callers continue working unchanged when SF_A2A_ENABLED is not set. + * + * Consumer: dispatch() when SF_A2A_ENABLED is unset. + * + * @param {DispatchEnvelope} envelope + * @returns {Promise} + */ + async _busDispatch(envelope) { const swarm = await this.getOrCreateSwarm(); const target = swarm.route(envelope); @@ -129,6 +159,65 @@ export class SwarmDispatchLayer { }; } + /** + * Deliver an envelope to a spawned agent process over the A2A HTTP protocol. + * + * Purpose: route the envelope to the correct role agent running in a separate + * process, using A2ATransport to spawn the process on first use and the A2A + * JSON-RPC client for delivery. SQLite state is updated by the remote process. + * + * Consumer: dispatch() when SF_A2A_ENABLED is set. + * + * @param {DispatchEnvelope} envelope + * @returns {Promise} + */ + async _a2aDispatch(envelope) { + const swarm = await this.getOrCreateSwarm(); + const target = swarm.route(envelope); + + if (!target) { + throw new Error( + `SwarmDispatchLayer(a2a): no agent available to handle envelope unitType=${envelope.unitType} workMode=${envelope.workMode}`, + ); + } + + const { name: agentName, role } = target.identity; + const transport = await getA2ATransport(); + const agentUrl = await transport.getOrSpawnAgent( + agentName, + role, + this._basePath, + ); + + const response = await transport.dispatch(agentUrl, envelope); + + // Extract messageId from the A2A response if present. + let messageId = `a2a-${Date.now()}`; + try { + const parts = response?.parts ?? []; + for (const part of parts) { + const data = + part?.data ?? + (part?.kind === "text" ? JSON.parse(part.text ?? "{}") : null); + if (data?.messageId) { + messageId = data.messageId; + break; + } + } + } catch { + // leave messageId as default + } + + return { + messageId, + targetAgent: agentName, + swarmName: this._swarmName, + envelope, + transport: "a2a", + agentUrl, + }; + } + /** * Route multiple DispatchEnvelopes through the swarm in parallel. *