# A2A Adoption Plan for Singularity-Forge — Production Grade
**Author:** Research synthesis
**Date:** 2026-05-08
**Status:** Draft — for review
**Scope:** A2A as the internal agent communication protocol for SF dispatch layer
---
## Executive Summary
SF's 5 dispatch mechanisms + MessageBus are functionally complete but architecturally siloed. A2A provides a standardized protocol that maps 1:1 onto SF's semantics. The existing MessageBus is preserved as the transport; A2A is the semantic layer on top.
**This is a production-grade plan.** It covers error handling, failure modes, rollback procedures, observability, and testing strategy.
---
## Quick Reference
| Concern | Decision |
|---|---|
| A2A as internal protocol | YES — standardizes Task state, priority, capability discovery |
| MessageBus | Wrap as `A2AMessageService` transport; add `AgentRegistry` |
| Transport | SQLite-backed MessageBus (not HTTP/WebSocket) for local process agents |
| External A2A | Optional; wired later when HTTP exposure is needed |
| Migration | 6 phases; each phase is independently deployable and rollback-safe |
| Feature flag | `SF_A2A_ENABLED` — gates all new A2A behavior; default OFF until Phase 6 |
---
## 1. Architecture Overview
### 1.1 System Diagram
```
┌──────────────────────────────────────────────────────────────────────┐
│ Coordinator (UOK Kernel or subagent tool) │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ DispatchService │ │
│ │ ├── A2AClient (send/receive) │ │
│ │ ├── AgentRegistry (capability lookup) │ │
│ │ └── AgentCard (self-description) │ │
│ └────────────────────────────────────────────────────────────┘ │
└───────────────────────────┬──────────────────────────────────────────┘
│ A2AMessageService (wraps MessageBus)
│ bus.send(), bus.broadcast(), bus.sendOnce()
┌──────────────────────────────────────────────────────────────────────┐
│ MessageBus (SQLite-backed, existing) │
│ ├── Durable at-least-once delivery │
│ ├── TTL-based auto-compaction │
│ ├── AgentInbox per agent (per-queue) │
│ └── sendOnce for idempotent delivery │
└───────────────────────────┬──────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────┐
│ Worker Agents (git worktrees, one per milestone/slice) │
│ ├── AgentCard (role: worker, isolation: full) │
│ ├── AgentInbox subscription │
│ ├── Project SQLite WAL (read/write) │
│ └── Emits: task_updated, cost, heartbeat │
└──────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────┐
│ Constrained Subagents (no project DB) │
│ ├── AgentCard (role: subagent, isolation: constrained) │
│ ├── Limited tool scope (4 tools) │
│ ├── AgentInbox (optional, opt-in via useMessageBus) │
│ └── Returns structured output via A2A message │
└──────────────────────────────────────────────────────────────────────┘
```
### 1.2 A2A Semantic Mapping
| SF Concept | A2A Concept |
|---|---|
| milestone / slice / task | A2A Task (`id`, `status`, `metadata`) |
| UOK Kernel | A2A Client + Coordinator Agent |
| Worker (parallel orchestrator) | A2A Agent |
| MessageBus.send() | A2A MessageService.send() |
| MessageBus.sendOnce() | A2A idempotent delivery |
| MessageBus.broadcast() | A2A MessageService.broadcast() |
| AgentInbox per worker | A2A per-agent subscription queue |
| File-based status files | A2A AgentStatus (online/busy/idle/offline/error) |
| adversarial-partner/combatant/architect | A2A Agent with specialized capabilities |
| parallel / debate / chain modes | A2A CommunicationPattern |
---
## 2. A2A Type System
### 2.1 Core Types
```typescript
// File: src/resources/extensions/sf/dispatch/a2a-types.ts
import type {
AgentCard,
AgentCapabilities,
Task,
TaskStatus,
Message,
} from "@a2a-js/sdk";
/**
* A2A Task state — maps directly from SF unit runtime status.
* These are the ONLY authoritative task states.
*/
export const A2A_TASK_STATES = [
"submitted",
"working",
"completed",
"failed",
"cancelled",
] as const;
export type A2ATaskState = (typeof A2A_TASK_STATES)[number];
/**
* SF-specific task extensions — runtime states that A2A doesn't model.
* These live in task.metadata.sf_state and are NOT authoritative.
* DB is the authority for these.
*/
export const SF_TASK_EXTENSIONS = [
"verifying",
"reviewing",
"blocked",
"paused",
"retrying",
"pending_input",
] as const;
export type SFTaskExtension = (typeof SF_TASK_EXTENSIONS)[number];
/**
* Message priority levels — determines delivery urgency and retry budget.
*/
export const MESSAGE_PRIORITIES = ["low", "normal", "high", "urgent"] as const;
export type MessagePriority = (typeof MESSAGE_PRIORITIES)[number];
/**
* Dispatch mode → A2A CommunicationPattern mapping.
*/
export const DISPATCH_TO_PATTERN: Record<string, string> = {
single: "request_response",
parallel: "notification",
debate: "streaming",
chain: "request_response",
};
/**
* SF-specific capability extensions on top of A2A AgentCapabilities.
*/
export interface SFAgentCapabilities extends AgentCapabilities {
/** Domain role */
role: "coordinator" | "worker" | "subagent" | "reviewer" | "adversary" | "architect" | "researcher";
/** Isolation level — determines DB access */
isolation: "full" | "constrained";
/** For constrained agents — which tools are permitted */
toolScope?: Array<"file_read" | "file_write" | "execute" | "query" | "memory_read" | "memory_write">;
/** Model tier for cost and routing decisions */
modelTier: "primary" | "validation" | "worker";
/** Domain specializations */
specializations?: Array<
| "milestone_planning"
| "slice_planning"
| "code_review"
| "security_review"
| "adversarial_review"
| "architecture_analysis"
| "research"
| "verification"
>;
}
/**
* SF AgentCard — extends A2A AgentCard with SF-specific capabilities.
* Published by each agent on startup; cached in AgentRegistry.
*/
export interface SFAgentCard extends AgentCard {
capabilities: SFAgentCapabilities;
metadata?: {
basePath?: string;
milestoneId?: string;
sliceId?: string;
worktreePath?: string;
pid?: number;
startedAt?: string;
};
}
/**
* SF Task metadata — stored in A2A Task.metadata.
* sf_state is NOT authoritative — DB is the authority.
*/
export interface SFTaskMetadata {
scope: "milestone" | "slice" | "task" | "inline";
milestoneId: string;
sliceId?: string;
taskId?: string;
title: string;
/** Non-authoritative runtime hint — DB is authority */
sf_state?: SFTaskExtension;
/** Base path for DB access */
basePath: string;
}
/**
* A2A Message envelope used internally.
* Wraps MessageBus messages with A2A metadata.
*/
export interface SFA2AMessage {
id: string;
type: "message" | "task_submitted" | "task_updated" | "task_completed" | "control" | "error";
from: string;
to: string | string[];
body: Record<string, unknown>;
priority: MessagePriority;
sentAt: string;
deliveredAt?: string;
correlationId?: string;
conversationId?: string;
ttlMs?: number;
taskId?: string;
metadata?: Record<string, unknown>;
}
```
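
The split between authoritative A2A states and non-authoritative SF hints can be illustrated with a small projection function. This is a minimal sketch, not the planned `a2a-task.ts`: the function name `projectStatus` is hypothetical, and projecting every SF extension onto `working` (because the task is still in flight) is an assumption made for illustration.

```typescript
const A2A_TASK_STATES = ["submitted", "working", "completed", "failed", "cancelled"] as const;
type A2ATaskState = (typeof A2A_TASK_STATES)[number];

const SF_TASK_EXTENSIONS = [
  "verifying", "reviewing", "blocked", "paused", "retrying", "pending_input",
] as const;
type SFTaskExtension = (typeof SF_TASK_EXTENSIONS)[number];

interface ProjectedState {
  /** Authoritative A2A state */
  state: A2ATaskState;
  /** Non-authoritative hint; would be stored in task.metadata.sf_state */
  sf_state?: SFTaskExtension;
}

function isCoreState(s: string): s is A2ATaskState {
  return (A2A_TASK_STATES as readonly string[]).includes(s);
}

function isExtension(s: string): s is SFTaskExtension {
  return (SF_TASK_EXTENSIONS as readonly string[]).includes(s);
}

// Hypothetical helper: maps a runtime status string onto the A2A state model.
function projectStatus(status: string): ProjectedState {
  if (isCoreState(status)) return { state: status };
  // Assumption: an extension state means the task is still in flight.
  if (isExtension(status)) return { state: "working", sf_state: status };
  throw new Error(`Unknown status: ${status}`);
}

console.log(projectStatus("verifying")); // { state: "working", sf_state: "verifying" }
```

Keeping the extension in `sf_state` lets generic A2A consumers ignore it while SF-aware consumers can still show the finer-grained status.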
---
## 3. Error Handling
### 3.1 Message Delivery Errors
| Error | Detection | Response |
|---|---|---|
| Recipient offline | `AgentRegistry.getStatus() === "offline"` | Buffer message; deliver on reconnect |
| Inbox full (max 1000) | `AgentInbox.unreadCount >= maxInboxSize` | Reject with `INBOX_OVERFLOW`; caller retries with backoff |
| TTL exceeded | `Date.now() - sentAt > ttlMs` | Discard; caller notified via error response |
| DB write conflict | SQLite `SQLITE_BUSY` | Retry with exponential backoff (max 3 attempts, 100ms base) |
| Invalid recipient | `AgentRegistry.getCard(to) === undefined` | Return `AGENT_NOT_FOUND` error; do not retry |
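
The table above can be read as a lookup from error code to response. The sketch below encodes it that way. `INBOX_OVERFLOW`, `DB_BUSY` and `AGENT_NOT_FOUND` appear elsewhere in this plan; `RECIPIENT_OFFLINE`, `TTL_EXCEEDED`, the action names and the function names are assumptions introduced for illustration.

```typescript
type DeliveryAction = "buffer" | "retry_with_backoff" | "discard" | "fail";

// One entry per row of the delivery-error table.
const ERROR_ACTIONS: Record<string, DeliveryAction> = {
  RECIPIENT_OFFLINE: "buffer",          // assumed code: deliver on reconnect
  INBOX_OVERFLOW: "retry_with_backoff", // inbox may drain
  TTL_EXCEEDED: "discard",              // assumed code: message is stale
  DB_BUSY: "retry_with_backoff",        // SQLITE_BUSY is transient
  AGENT_NOT_FOUND: "fail",              // retrying cannot help
};

// Unknown codes fail fast rather than retrying blindly.
function actionFor(code: string): DeliveryAction {
  return ERROR_ACTIONS[code] ?? "fail";
}

function isRetryable(code: string): boolean {
  return actionFor(code) === "retry_with_backoff";
}

console.log(actionFor("DB_BUSY"), isRetryable("AGENT_NOT_FOUND"));
```

Defaulting unknown codes to `fail` keeps a new, unclassified error from silently consuming the retry budget.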
### 3.2 Retry Strategy
```typescript
// File: src/resources/extensions/sf/dispatch/a2a-service.ts
import { setTimeout as sleep } from "node:timers/promises";

const RETRY_CONFIG = {
  maxAttempts: 3,
  baseDelayMs: 100,
  maxDelayMs: 5000,
  backoffMultiplier: 2.0,
  jitterFactor: 0.1, // 10% random jitter to prevent thundering herd
} as const;

export class DeliveryError extends Error {
  constructor(
    message: string,
    public readonly code: string,
    public readonly retryable: boolean,
    public readonly attempts: number,
  ) {
    super(message);
    this.name = "DeliveryError";
  }
}

// doSend is assumed to perform a single MessageBus write and throw DeliveryError on failure.
async function sendWithRetry(
  params: SendParams,
  attempt = 1,
): Promise<string> {
  const { from, to, body, metadata = {} } = params;
  try {
    return await doSend(from, to, body, metadata);
  } catch (err) {
    // Retry only errors marked retryable, and only while attempts remain.
    const isRetryable =
      err instanceof DeliveryError && err.retryable && attempt < RETRY_CONFIG.maxAttempts;
    if (!isRetryable) {
      throw err;
    }
    // Exponential backoff: 100ms, then 200ms, capped at maxDelayMs.
    const delay = Math.min(
      RETRY_CONFIG.baseDelayMs * Math.pow(RETRY_CONFIG.backoffMultiplier, attempt - 1),
      RETRY_CONFIG.maxDelayMs,
    );
    const jitter = delay * RETRY_CONFIG.jitterFactor * Math.random();
    await sleep(delay + jitter);
    return sendWithRetry(params, attempt + 1);
  }
}
```
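
As a worked example of the schedule this configuration produces, the sketch below computes the delay before each retry. The helper name `backoffDelayMs` and the injectable `random` parameter are assumptions added so the arithmetic can be checked deterministically.

```typescript
const RETRY_CONFIG = {
  maxAttempts: 3,
  baseDelayMs: 100,
  maxDelayMs: 5000,
  backoffMultiplier: 2.0,
  jitterFactor: 0.1,
} as const;

// Delay slept after a failed attempt number `attempt` (1-based).
// `random` is injectable so the jitter can be pinned in tests.
function backoffDelayMs(attempt: number, random: () => number = Math.random): number {
  const base = Math.min(
    RETRY_CONFIG.baseDelayMs * Math.pow(RETRY_CONFIG.backoffMultiplier, attempt - 1),
    RETRY_CONFIG.maxDelayMs,
  );
  return base + base * RETRY_CONFIG.jitterFactor * random();
}

// With maxAttempts = 3 there are only two sleeps: after attempt 1 and after attempt 2.
const schedule = [1, 2].map((attempt) => backoffDelayMs(attempt, () => 0));
console.log(schedule); // [100, 200] with jitter pinned to zero
```

With jitter at its maximum the two sleeps stay below 110 ms and 220 ms, so retries add at most about 330 ms before the final attempt.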
### 3.3 Agent Crash Handling
```
Worker crash detection:
1. Worker process exits → SIGCHLD handler
2. Update AgentRegistry status: "offline"
3. MessageBus retains undelivered messages (TTL not expired)
4. Coordinator polls AgentRegistry.getStatus() every 30s
5. On reconnect: worker re-registers AgentCard
6. Buffered messages delivered to reconnected AgentInbox
7. Coordinator re-sends any unacknowledged task_updated messages
```
### 3.4 Panic Mode
When `messageService` fails to deliver HIGH/URGENT messages 3 times consecutively:
1. Log `A2A_DELIVERY_PANIC` event to `.sf/journal/`
2. Fall back to file-based signal (`session-status-io.js`)
3. Emit `sf_dispatch_degraded` event
4. Dashboard shows "dispatch degraded" warning
5. Auto-recovery when MessageBus recovers
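
The trigger and recovery conditions above can be sketched as a small counter. This is an illustration under stated assumptions, not the planned implementation: the class name `PanicTracker` is hypothetical, and treating any successful delivery as proof that the MessageBus has recovered is an assumption.

```typescript
type Priority = "low" | "normal" | "high" | "urgent";

class PanicTracker {
  private consecutiveFailures = 0;
  private panicMode = false;
  private readonly threshold: number;

  constructor(threshold = 3) {
    this.threshold = threshold;
  }

  /** Records a failed delivery; only HIGH/URGENT failures count toward panic. */
  recordFailure(priority: Priority): boolean {
    if (priority === "high" || priority === "urgent") {
      this.consecutiveFailures += 1;
      if (this.consecutiveFailures >= this.threshold) this.panicMode = true;
    }
    return this.panicMode;
  }

  /** Assumption: any successful delivery resets the streak and ends panic mode. */
  recordSuccess(): void {
    this.consecutiveFailures = 0;
    this.panicMode = false;
  }

  get isPanicMode(): boolean {
    return this.panicMode;
  }
}

const tracker = new PanicTracker();
tracker.recordFailure("high");
tracker.recordFailure("urgent");
console.log(tracker.recordFailure("high")); // true: third consecutive HIGH/URGENT failure
```

A caller would log `A2A_DELIVERY_PANIC` and switch to the file-based signal on the transition to `true`, then switch back after `recordSuccess()`.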
---
## 4. Backpressure and Flow Control
### 4.1 Per-Agent Inbox Backpressure
```typescript
// File: src/resources/extensions/sf/dispatch/a2a-service.ts
const INBOX_CONFIG = {
maxInboxSize: 1000, // Per-agent queue limit
maxMessageSizeBytes: 64 * 1024, // 64 KB per message body
highWaterMark: 800, // Warn when inbox reaches 80%
overflowAction: "reject", // "reject" | "drop_oldest"
} as const;
interface SendParams {
from: string;
to: string;
body: Record<string, unknown>;
metadata?: {
priority?: MessagePriority;
ttlMs?: number;
replyTo?: string;
taskId?: string;
};
}
function validateSend(params: SendParams): void {
  // Measure the UTF-8 encoded size; String.length counts UTF-16 code units, not bytes.
  const bodySize = Buffer.byteLength(JSON.stringify(params.body), "utf8");
  if (bodySize > INBOX_CONFIG.maxMessageSizeBytes) {
    throw new DeliveryError(
      `Message body ${bodySize} bytes exceeds limit ${INBOX_CONFIG.maxMessageSizeBytes}`,
      "MESSAGE_TOO_LARGE",
      false, // Not retryable
      0,
    );
  }
  const inbox = bus.getInbox(params.to);
  if (inbox.unreadCount >= INBOX_CONFIG.maxInboxSize) {
    throw new DeliveryError(
      `Inbox for ${params.to} is full (${inbox.unreadCount}/${INBOX_CONFIG.maxInboxSize})`,
      "INBOX_OVERFLOW",
      true, // Retryable after inbox drains
      0,
    );
  }
  // Below the hard limit but above 80%: deliver, and warn so operators can react.
  if (inbox.unreadCount >= INBOX_CONFIG.highWaterMark) {
    logWarning("dispatch", `Inbox for ${params.to} at ${inbox.unreadCount}/${INBOX_CONFIG.maxInboxSize}`);
  }
}
```
### 4.2 Coordinator Outbox Backpressure
When the coordinator sends faster than workers can consume:
```typescript
// Coordinator: batch outgoing messages, flush on interval
const outbox = new Map<string, SFA2AMessage[]>();
const FLUSH_INTERVAL_MS = 500;

setInterval(() => {
  for (const [to, messages] of outbox) {
    if (messages.length === 0) continue;
    // splice(0) drains the queue and hands the bus its own array,
    // so later scheduleSend() calls cannot mutate a batch already in flight.
    const batch = messages.splice(0);
    bus.broadcast(coordinatorId, [to], { batch });
  }
}, FLUSH_INTERVAL_MS);

// Caller adds to outbox instead of sending immediately
function scheduleSend(params: SendParams): void {
  const queue = outbox.get(params.to) ?? [];
  queue.push(wrapAsA2AMessage(params));
  outbox.set(params.to, queue);
}
```
### 4.3 Memory Budget Per Worker
Each worker has a memory budget for buffering messages it cannot process immediately:
```
MAX_BUFFERED_MESSAGES_PER_WORKER = 100
MAX_BUFFERED_BYTES_PER_WORKER = 10 * 1024 * 1024 // 10 MB
```
If a worker's local buffer exceeds either limit, the oldest messages are dropped rather than rejected, because the sender has already moved on.
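
The drop-oldest policy with two simultaneous limits can be sketched as a bounded queue. The class name `WorkerBuffer` and the `sizeBytes` field on each message are assumptions for illustration; the defaults mirror the two limits above.

```typescript
interface BufferedMessage {
  id: string;
  sizeBytes: number; // assumed to be computed by the caller
}

class WorkerBuffer {
  private readonly queue: BufferedMessage[] = [];
  private totalBytes = 0;
  private readonly maxMessages: number;
  private readonly maxBytes: number;

  constructor(maxMessages = 100, maxBytes = 10 * 1024 * 1024) {
    this.maxMessages = maxMessages;
    this.maxBytes = maxBytes;
  }

  /** Adds a message, then evicts oldest entries until both limits hold. Returns evicted ids. */
  push(msg: BufferedMessage): string[] {
    this.queue.push(msg);
    this.totalBytes += msg.sizeBytes;
    const dropped: string[] = [];
    while (this.queue.length > this.maxMessages || this.totalBytes > this.maxBytes) {
      const oldest = this.queue.shift();
      if (oldest === undefined) break;
      this.totalBytes -= oldest.sizeBytes;
      dropped.push(oldest.id);
    }
    return dropped;
  }

  get size(): number {
    return this.queue.length;
  }

  get bytes(): number {
    return this.totalBytes;
  }
}

// Small limits make the eviction visible: 3 messages or 100 bytes.
const buffer = new WorkerBuffer(3, 100);
["a", "b", "c"].forEach((id) => buffer.push({ id, sizeBytes: 10 }));
console.log(buffer.push({ id: "d", sizeBytes: 10 })); // ["a"]: count limit exceeded
console.log(buffer.push({ id: "e", sizeBytes: 90 })); // ["b", "c"]: byte limit exceeded
```

Returning the evicted ids lets the worker log each drop, which keeps silent message loss visible in the journal.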
---
## 5. Observability
### 5.1 Metrics
```typescript
// File: src/resources/extensions/sf/dispatch/metrics.ts
export const A2A_METRICS = {
// Message throughput
"sf_a2a_messages_sent_total": {
type: "counter",
help: "Total A2A messages sent",
labels: ["priority", "from_role", "to_role"],
},
"sf_a2a_messages_delivered_total": {
type: "counter",
help: "Total A2A messages delivered to recipient inbox",
labels: ["priority", "from_role", "to_role"],
},
"sf_a2a_messages_failed_total": {
type: "counter",
help: "Total A2A message delivery failures",
labels: ["priority", "error_code"],
},
"sf_a2a_message_delivery_latency_ms": {
type: "histogram",
help: "End-to-end message delivery latency (send to inbox receipt)",
buckets: [10, 50, 100, 500, 1000, 5000],
},
"sf_a2a_inbox_size": {
type: "gauge",
help: "Current inbox size per agent",
labels: ["agent_id", "role"],
},
"sf_a2a_retry_total": {
type: "counter",
help: "Total retry attempts",
labels: ["priority", "attempt_number"],
},
"sf_a2a_agent_status": {
type: "gauge",
help: "Agent status (1=online, 0.5=busy, 0.1=idle, 0=offline/error)",
labels: ["agent_id", "role"],
},
} as const;
```
### 5.2 Structured Logging
Every A2A operation emits structured log lines:
```typescript
// File: src/resources/extensions/sf/dispatch/logger.ts
type A2ALogEvent =
| { event: "a2a.send"; from: string; to: string; priority: string; messageId: string; sizeBytes: number }
| { event: "a2a.delivered"; messageId: string; to: string; latencyMs: number }
| { event: "a2a.delivery_failed"; messageId: string; error: string; retryable: boolean; attempt: number }
| { event: "a2a.agent_registered"; agentId: string; role: string; capabilities: string[] }
| { event: "a2a.agent_offline"; agentId: string; reason: string }
| { event: "a2a.inbox_overflow"; agentId: string; size: number; action: string }
| { event: "a2a.panic_mode"; reason: string; fallback_used: boolean };
function logA2A(event: A2ALogEvent): void {
const line = JSON.stringify({
ts: new Date().toISOString(),
...event,
});
workflowLogger.log("dispatch", line);
}
```
### 5.3 Trace Context
Propagate trace context through A2A messages for debugging:
```typescript
interface TraceContext {
traceId: string; // ULID — unique per dispatch session
spanId: string; // Per-message ID
parentSpanId?: string;
}
function injectTraceContext(msg: SFA2AMessage): SFA2AMessage {
const spanId = ulid();
return {
...msg,
metadata: {
...msg.metadata,
trace: {
traceId: currentTraceId(),
spanId,
parentSpanId: currentSpanId(),
},
},
};
}
```
Traces are stored in `.sf/journal/a2a-traces/{date}.jsonl` and queryable via `sf trace <traceId>`.
---
## 6. Security
### 6.1 Agent Authentication
Every A2A message must carry a valid agent identity. Identity is established at agent startup:
```typescript
// File: src/resources/extensions/sf/dispatch/auth.ts
import { createHmac } from "node:crypto";

// Tokens issued to running agents, keyed as `${agentId}:${token}`.
// Assumed to be populated by the issuing process (registration flow not shown).
const validTokens = new Set<string>();

/**
 * Agent identity token: HMAC-SHA256 of agent ID + basePath + startup timestamp.
 * Used to authenticate messages from agents.
 * Generated once at agent startup; stored in process.env.SF_AGENT_TOKEN.
 */
function generateAgentToken(agentId: string, basePath: string): string {
  // The hard-coded fallback is for local development only; set SF_A2A_SHARED_SECRET in production.
  const secret = process.env.SF_A2A_SHARED_SECRET ?? process.env.SF_DB_KEY ?? "sf-insecure-dev-secret";
  const payload = `${agentId}:${basePath}:${Date.now()}`;
  return createHmac("sha256", secret).update(payload).digest("hex").slice(0, 32);
}

function verifyAgentToken(token: string, agentId: string): boolean {
  // Tokens are issued once per agent startup and stay valid for that process lifetime.
  // Verification is a membership check: the token must have been issued for this agentId.
  return validTokens.has(`${agentId}:${token}`);
}
```
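
An alternative to the membership check is to verify by recomputing the HMAC, which avoids sharing a token set between processes. The sketch below shows that variant; it is not the scheme described above. It assumes the token carries its issuance timestamp (`issuedAt`) so the verifier can rebuild the payload, and the names `AgentToken`, `issueToken` and `verifyToken` are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";
import { Buffer } from "node:buffer";

interface AgentToken {
  agentId: string;
  issuedAt: number; // startup timestamp, carried with the token (assumption)
  mac: string;      // hex-encoded HMAC-SHA256
}

function sign(secret: string, agentId: string, basePath: string, issuedAt: number): string {
  return createHmac("sha256", secret).update(`${agentId}:${basePath}:${issuedAt}`).digest("hex");
}

function issueToken(
  secret: string,
  agentId: string,
  basePath: string,
  issuedAt: number = Date.now(),
): AgentToken {
  return { agentId, issuedAt, mac: sign(secret, agentId, basePath, issuedAt) };
}

function verifyToken(secret: string, basePath: string, token: AgentToken): boolean {
  const expected = Buffer.from(sign(secret, token.agentId, basePath, token.issuedAt), "hex");
  const presented = Buffer.from(token.mac, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === presented.length && timingSafeEqual(expected, presented);
}

const token = issueToken("dev-secret", "worker:1", "/repo", 1_000);
console.log(verifyToken("dev-secret", "/repo", token));                           // true
console.log(verifyToken("dev-secret", "/repo", { ...token, agentId: "worker:2" })); // false
```

The constant-time comparison avoids leaking how many leading bytes of a forged MAC were correct.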
### 6.2 Input Validation
```typescript
// File: src/resources/extensions/sf/dispatch/validation.ts
const MAX_BODY_DEPTH = 20; // Nested object depth
const MAX_ARRAY_LENGTH = 1000; // Max array items in body
const MAX_STRING_LENGTH = 100_000; // Max string value length
const FORBIDDEN_KEYS = ["__proto__", "constructor", "prototype"]; // Prototype pollution
function validateMessageBody(body: unknown, depth = 0): void {
if (depth > MAX_BODY_DEPTH) throw new ValidationError("BODY_TOO_DEEP");
if (Array.isArray(body)) {
if (body.length > MAX_ARRAY_LENGTH) throw new ValidationError("ARRAY_TOO_LARGE");
for (const item of body) validateMessageBody(item, depth + 1);
return;
}
if (typeof body === "object" && body !== null) {
for (const [k, v] of Object.entries(body)) {
if (FORBIDDEN_KEYS.includes(k)) throw new ValidationError(`FORBIDDEN_KEY: ${k}`);
validateMessageBody(v, depth + 1);
}
return;
}
if (typeof body === "string" && body.length > MAX_STRING_LENGTH) {
throw new ValidationError("STRING_TOO_LONG");
}
}
```
### 6.3 Capability Enforcement
The `AgentRegistry` enforces that agents only perform actions consistent with their registered capabilities:
```typescript
// File: src/resources/extensions/sf/dispatch/capability-enforcer.ts
function enforceCapabilities(agentId: string, action: string): void {
const card = registry.getCard(agentId);
if (!card) throw new DeliveryError(`Unknown agent: ${agentId}`, "AGENT_NOT_FOUND", false, 0);
const caps = card.capabilities as SFAgentCapabilities;
switch (action) {
case "write_project_db":
if (caps.isolation !== "full") {
throw new DeliveryError(
`${agentId} cannot write project DB (isolation: ${caps.isolation})`,
"ISOLATION_VIOLATION",
false,
0,
);
}
break;
case "send_to_worker":
if (caps.role === "subagent") {
// Constrained subagents can only send to their parent
throw new DeliveryError("Subagent cannot send to workers", "CAPABILITY_DENIED", false, 0);
}
break;
case "read_project_context":
// All agents can read project context (it's in the prompt)
break;
}
}
```
---
## 7. Testing Strategy
### 7.1 Unit Tests
```typescript
// File: src/resources/extensions/sf/dispatch/a2a-service.test.ts
import { beforeEach, describe, expect, test, vi } from "vitest";

describe("A2AMessageService", () => {
  let bus: MessageBus;
  let registry: AgentRegistry;
  let service: A2AMessageService;

  beforeEach(() => {
    // One shared directory so bus, registry, and service use the same backing store.
    const dir = tmpDir();
    bus = new MessageBus(dir);
    registry = new AgentRegistry(dir, bus);
    service = new A2AMessageService(dir, registry);
  });

  test("send_delivers_to_recipient_inbox", async () => {
    registry.register(workerCard("worker:1"));
    const id = await service.send({
      from: "coordinator",
      to: "worker:1",
      body: { type: "task_submitted", taskId: "M01" },
    });
    const inbox = service.getInbox("worker:1");
    const msgs = inbox.list();
    expect(msgs).toHaveLength(1);
    expect(msgs[0].id).toBe(id);
  });

  test("sendWithRetry_retries_on_retryable_error", async () => {
    // Simulate transient DB busy: the first call rejects, the second resolves.
    vi.spyOn(bus, "send")
      .mockRejectedValueOnce(new DeliveryError("busy", "DB_BUSY", true, 1))
      .mockResolvedValueOnce("msg-1");
    const id = await sendWithRetry({ from: "c", to: "w", body: { test: true } });
    expect(id).toBe("msg-1");
    expect(bus.send).toHaveBeenCalledTimes(2);
  });

  test("sendWithRetry_does_not_retry_non_retryable_error", async () => {
    vi.spyOn(bus, "send").mockRejectedValueOnce(
      new DeliveryError("unknown agent", "AGENT_NOT_FOUND", false, 1),
    );
    // Assert on the error code; toThrow("...") would match against the message text.
    await expect(sendWithRetry({ from: "c", to: "w", body: { test: true } }))
      .rejects.toMatchObject({ code: "AGENT_NOT_FOUND" });
    expect(bus.send).toHaveBeenCalledTimes(1);
  });

  test("sendOnce_same_key_returns_same_id", async () => {
    const id1 = await service.sendOnce({ from: "c", to: "w", body: { beat: 1 }, dedupeKey: "heartbeat" });
    const id2 = await service.sendOnce({ from: "c", to: "w", body: { beat: 2 }, dedupeKey: "heartbeat" });
    expect(id1).toBe(id2); // Idempotent
  });

  test("validateMessageBody_rejects_deep_objects", () => {
    // Nest one level deeper than MAX_BODY_DEPTH allows.
    let deep: Record<string, unknown> = {};
    for (let i = 0; i <= MAX_BODY_DEPTH; i++) deep = { child: deep };
    expect(() => validateMessageBody(deep)).toThrow("BODY_TOO_DEEP");
  });

  test("validateMessageBody_rejects_prototype_pollution", () => {
    // JSON.parse creates "__proto__" as an own property; an object literal would set the prototype instead.
    const body = JSON.parse('{"__proto__": {"evil": true}}');
    expect(() => validateMessageBody(body)).toThrow("FORBIDDEN_KEY");
  });
});
```
### 7.2 Integration Tests
```typescript
// File: src/resources/extensions/sf/tests/a2a-integration.test.ts
describe("A2A Integration", () => {
  test("worker_registers_and_receives_task", async () => {
    // setupTwoAgentSystem is assumed to also expose the shared registry.
    const { coordinator, worker, service, registry } = setupTwoAgentSystem();
    // Worker starts, registers
    await worker.start();
    await waitFor(() => registry.getStatus("worker:1") === "online");
    // Coordinator sends task
    await service.send({
      from: "coordinator",
      to: "worker:1",
      body: { type: "task_submitted", taskId: "M01" },
    });
    // Worker receives
    const msg = await worker.waitForMessage("task_submitted");
    expect(msg.body.taskId).toBe("M01");
  });

  test("worker_crash_does_not_lose_messages", async () => {
    const { coordinator, worker, service } = setupTwoAgentSystem();
    await worker.start();
    await service.send({ from: "coordinator", to: "worker:1", body: { type: "task_submitted" } });
    // Worker crashes and restarts
    await worker.kill();
    await worker.start();
    // Message should still be in inbox after restart
    const msg = await worker.waitForMessage("task_submitted");
    expect(msg).toBeDefined();
  });

  test("coordinator_receives_worker_heartbeat", async () => {
    const { coordinator, worker, service } = setupTwoAgentSystem();
    await worker.start();
    worker.sendHeartbeat();
    const msg = await coordinator.waitForMessage("worker.heartbeat");
    expect(msg.from).toBe("worker:1");
  });
});
```
### 7.3 Chaos Tests
```typescript
// File: src/resources/extensions/sf/tests/a2a-chaos.test.ts
describe("A2A Chaos", () => {
  test("messages_delivered_despite_slow_worker", async () => {
    const { worker, service } = setupTwoAgentSystem();
    // Worker is slow to process (simulate 10s processing time)
    worker.simulateSlowProcessing(10_000);
    // Send 100 messages while worker is slow
    const sends = Array.from({ length: 100 }, (_, i) =>
      service.send({ from: "c", to: "w", body: { seq: i } }),
    );
    const results = await Promise.allSettled(sends);
    // All succeed (buffered, not rejected)
    expect(results.filter(r => r.status === "fulfilled")).toHaveLength(100);
    // Worker processes all after recovery
    worker.simulateFastProcessing();
    await worker.processAllBuffered();
    const received = await worker.getAllMessages();
    expect(received).toHaveLength(100);
  });

  test("panic_mode_activates_on_repeated_failure", async () => {
    // setupTwoAgentSystem is assumed to also expose the underlying bus.
    const { bus, service } = setupTwoAgentSystem();
    bus.simulatePermanentFailure();
    for (let i = 0; i < 3; i++) {
      try {
        // Panic mode only counts HIGH/URGENT delivery failures (section 3.4).
        await service.send({
          from: "c",
          to: "w",
          body: { test: true },
          metadata: { priority: "high" },
        });
      } catch {
        // Expected: every delivery fails while the bus is down.
      }
    }
    // Panic mode should be active
    expect(service.isPanicMode).toBe(true);
    // File-based fallback should be active
    expect(sessionStatusSignalWasUsed()).toBe(true);
  });
});
```
---
## 8. Rollback Procedures
### 8.1 Feature Flag
All A2A behavior is gated by `SF_A2A_ENABLED`:
```typescript
// File: src/resources/extensions/sf/dispatch/service.ts
const A2A_ENABLED = process.env.SF_A2A_ENABLED === "1";

export class DispatchService {
  private readonly basePath: string;
  // Stays null when the flag is off, so every call site falls through to the legacy path.
  private messageService: A2AMessageService | null = null;

  constructor(opts: DispatchOptions) {
    this.basePath = opts.basePath;
    if (A2A_ENABLED) {
      this.messageService = new A2AMessageService(opts.basePath, this.registry);
    }
    // ...
  }

  async pause(workerId: string): Promise<void> {
    if (this.messageService) {
      await this.messageService.send({
        from: "coordinator",
        to: workerId,
        body: { type: "control", action: "pause" },
        metadata: { priority: "high" },
      });
    } else {
      // Legacy file-based signal
      sendSignal(this.basePath, workerId, "pause");
    }
  }
}
```
### 8.2 Per-Phase Rollback
| Phase | Rollback |
|---|---|
| Phase 1: A2A adapter types | Delete `a2a-types.ts`, `a2a-task.ts`, and `a2a-errors.ts`. No behavior change; the code is not wired yet. |
| Phase 2: AgentRegistry | Delete `capability-registry.ts`. Remove registry from `DispatchService` constructor. No behavior change. |
| Phase 3: MessageBus wiring | Set `SF_A2A_ENABLED=0`. File-based IPC (`sendSignal`) is the automatic fallback. |
| Phase 4: Subagent A2A | Revert `subagent/index.ts` and `dispatch/service.ts` to their pre-Phase-4 state from git. |
| Phase 5: UOK kernel A2A | Revert `uok/kernel.js` to pre-Phase-5 state from git. |
| Phase 6: A2A default on | Revert the default to `SF_A2A_ENABLED=0` in preferences. `session-status-io.js` is never removed; it stays as the crash-recovery fallback permanently. |
### 8.3 Emergency Rollback
```bash
# Emergency: disable A2A entirely
SF_A2A_ENABLED=0 sf headless autonomous
# Emergency: revert to specific phase
git stash
git checkout phase2-end # tag or branch at end of Phase 2
SF_A2A_ENABLED=0 sf headless autonomous
# Verify rollback
npx vitest run src/resources/extensions/sf/tests/uok-message-bus.test.mjs
```
---
## 9. Migration Phases (Detailed)
### Phase 1: A2A Type Definitions (Week 1-2)
**Risk: Zero | Behavior: identical**
```
Files created:
dispatch/a2a-types.ts — A2A types + SF extensions
dispatch/a2a-task.ts — Task creation + state mapping
dispatch/a2a-errors.ts — DeliveryError + error codes
Files modified:
None (types are additive, not wired)
```
**Verification:**
```bash
npx tsc --noEmit src/resources/extensions/sf/dispatch/a2a-types.ts
npx vitest run src/resources/extensions/sf/dispatch/a2a-task.test.ts
```
---
### Phase 2: AgentRegistry (Week 2-3)
**Risk: Low | Behavior: additive**
```
Files created:
dispatch/capability-registry.ts — AgentRegistry + SF_CAPABILITY_DEFINITIONS
Files modified:
dispatch/service.ts — Add registry to DispatchService (opt-in via feature flag)
dispatch/index.ts — Export new types
```
**Verification:**
```bash
npx vitest run src/resources/extensions/sf/dispatch/capability-registry.test.ts
SF_A2A_ENABLED=0 npm run test:unit # existing tests pass
```
---
### Phase 3: MessageBus Wiring (Week 3-4)
**Risk: Medium | Behavior: pause/resume/stop now use MessageBus**
```
Files created:
dispatch/a2a-service.ts — A2AMessageService wrapping MessageBus
Files modified:
dispatch/service.ts — Wire MessageBus into pause/resume/stop
dispatch/worker-*.ts — Register AgentCard on spawn
session-status-io.ts — Mark as crash-recovery fallback (never primary)
```
**Before:** `sendSignal(basePath, id, "pause")` → signal file
**After:** `messageService.send({ from, to, body: { type: "control", action: "pause" }, metadata: { priority: "high" } })`
**Fallback:** File signal if MessageBus delivery fails 3 times
**Verification:**
```bash
SF_A2A_ENABLED=1 npx vitest run src/resources/extensions/sf/tests/a2a-integration.test.ts
SF_A2A_ENABLED=0 npm run test:unit # existing tests pass
```
---
### Phase 4: Subagent A2A (Week 4-5)
**Risk: Medium | Behavior: subagent modes unchanged**
```
Files modified:
subagent/index.ts — Use DispatchService internally
dispatch/service.ts — Handle isolation: constrained
```
**Verification:**
```bash
SF_A2A_ENABLED=1 npx vitest run src/resources/extensions/sf/tests/subagent-a2a.test.ts
SF_A2A_ENABLED=0 npm run test:unit # existing tests pass
```
---
### Phase 5: UOK Kernel A2A (Week 5-6)
**Risk: Medium | Behavior: UOK autonomous loop uses A2A**
```
Files modified:
uok/kernel.ts — Use DispatchService + A2AMessageService
uok/index.ts — Export new A2A types
```
**Verification:**
```bash
SF_A2A_ENABLED=1 npm run test:integration # Full integration suite
SF_A2A_ENABLED=0 npm run test:integration # Legacy still works
```
---
### Phase 6: A2A Default On (Week 6-7)
**Risk: Low | Behavior: A2A is now the default**
```
Actions:
1. Set SF_A2A_ENABLED=1 as default in preferences
2. Document in CHANGELOG.md
3. Monitor for 1 week before declaring stable
```
---
## 10. Operational Runbooks
### 10.1 Dispatch Degraded
**Symptoms:** Dashboard shows "dispatch degraded"; `sf_dispatch_degraded` events in journal
**Diagnosis:**
```bash
# Check MessageBus health
node -e "import('./src/resources/extensions/sf/uok/message-bus.js').then(m => {
  const metrics = m.getUokMessageBusMetrics();
  console.log(JSON.stringify(metrics, null, 2));
})"
# Check for panic mode
cat .sf/journal/*.jsonl | jq 'select(.event == "a2a.panic_mode")' | tail -5
```
**Fix:**
```bash
# Restart with A2A disabled; dispatch falls back to file-based IPC
SF_A2A_ENABLED=0 sf headless autonomous
# After the fix: re-enable A2A
sf config set SF_A2A_ENABLED=1
```
### 10.2 Worker Not Receiving Messages
**Symptoms:** Worker shows "offline" but process is running
**Diagnosis:**
```bash
# Check worker AgentCard registration
curl -s http://localhost:3030/api/dispatch/agents | jq '.[] | select(.role == "worker")'
# Check worker inbox size
node -e "const m = require('./src/resources/extensions/sf/dispatch/metrics'); console.log(m.getInboxMetrics('worker:M01'))"
# Check recent MessageBus delivery failures
cat .sf/journal/*.jsonl | jq 'select(.event == "a2a.delivery_failed")' | tail -20
```
**Fix:**
```bash
# Restart the worker process
sf parallel stop M01
sf parallel start M01
# Or: send SIGUSR1 to worker to re-register its AgentCard
kill -USR1 $(pgrep -f "sf.*M01")
```
### 10.3 Inbox Overflow
**Symptoms:** `"INBOX_OVERFLOW"` errors in logs; workers missing messages
**Diagnosis:**
```bash
# Find overflowing inboxes
node -e "import('./src/resources/extensions/sf/dispatch/metrics').then(m => {
Object.entries(m.getAllInboxSizes()).forEach(([id, size]) => {
if (size > 900) console.log(id, size);
});
})"
```
**Fix:**
```bash
# Compact all message buses (removes messages older than retention)
sf uok messages compact
# Or: increase inbox size limit temporarily
SF_INBOX_MAX_SIZE=5000 sf headless autonomous
```
---
## 11. Performance Targets
| Metric | Target | Critical Threshold |
|---|---|---|
| Message delivery latency (local) | < 50ms p50, < 500ms p99 | > 2000ms |
| Inbox delivery for 100 parallel workers | < 5s end-to-end | > 15s |
| Agent registration time | < 100ms | > 1000ms |
| Message throughput | > 1000 msg/s per coordinator | < 100 msg/s |
| Memory per worker (idle) | < 50 MB | > 200 MB |
| Memory per coordinator (10 workers) | < 200 MB | > 500 MB |
| DB WAL size growth | < 10 MB/day | > 100 MB/day |
| Recovery time after coordinator crash | < 5s | > 30s |
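
To show how the delivery-latency row could be checked against recorded samples, the sketch below computes p50 and p99 with the nearest-rank method. The function names, the `warn` status, and applying the 2000 ms critical threshold to p99 are assumptions; the table does not say which percentile the critical threshold refers to.

```typescript
// Nearest-rank percentile: the smallest sample with at least p% of samples at or below it.
function percentile(samples: readonly number[], p: number): number {
  if (samples.length === 0) throw new Error("no samples");
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.max(1, Math.ceil((p / 100) * sorted.length));
  return sorted[rank - 1] as number;
}

interface LatencyReport {
  p50: number;
  p99: number;
  status: "ok" | "warn" | "critical";
}

function evaluateDeliveryLatency(samplesMs: readonly number[]): LatencyReport {
  const p50 = percentile(samplesMs, 50);
  const p99 = percentile(samplesMs, 99);
  let status: LatencyReport["status"] = "ok";
  if (p99 > 2000) {
    status = "critical"; // assumed: critical threshold applies to p99
  } else if (p50 >= 50 || p99 >= 500) {
    status = "warn";     // target missed but not critical
  }
  return { p50, p99, status };
}

console.log(evaluateDeliveryLatency([10, 20, 30, 40])); // { p50: 20, p99: 40, status: "ok" }
```

The samples could come from the `latencyMs` field of `a2a.delivered` log events.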
---
## 12. File Manifest
### New Files
| File | Lines (est) | Purpose |
|---|---|---|
| `dispatch/a2a-types.ts` | 120 | Core A2A types + SF extensions |
| `dispatch/a2a-task.ts` | 80 | Task creation + state mapping |
| `dispatch/a2a-errors.ts` | 60 | DeliveryError + error codes |
| `dispatch/a2a-service.ts` | 250 | A2AMessageService wrapping MessageBus |
| `dispatch/capability-registry.ts` | 180 | AgentRegistry + SF_CAPABILITY_DEFINITIONS |
| `dispatch/metrics.ts` | 60 | A2A Prometheus metrics |
| `dispatch/logger.ts` | 40 | A2A structured logging |
| `dispatch/validation.ts` | 70 | Message body validation |
| `dispatch/auth.ts` | 50 | Agent token generation + verification |
| `dispatch/index.ts` | 30 | Barrel exports |
| `dispatch/a2a-service.test.ts` | 200 | Unit tests |
| `tests/a2a-integration.test.ts` | 300 | Integration tests |
| `tests/a2a-chaos.test.ts` | 150 | Chaos tests |
| **Total new** | **~1600 LOC** | |
### Modified Files
| File | Change |
|---|---|
| `dispatch/service.ts` | Add registry + messageService; wire pause/resume/stop |
| `dispatch/worker-orchestrator.ts` | Register AgentCard on spawn; open AgentInbox |
| `uok/kernel.ts` | Register coordinator AgentCard; use DispatchService |
| `uok/message-bus.js` | Add AgentCard types (no behavior change) |
| `uok/index.ts` | Export A2A types |
| `subagent/index.ts` | Use DispatchService; remove ~600 LOC spawn management |
| `session-status-io.ts` | Mark as crash-recovery fallback only |
---
## Summary
| Question | Answer |
|---|---|
| A2A as internal protocol | YES — Task state, priority, capability discovery |
| Transport | SQLite MessageBus (not HTTP/WebSocket) |
| External A2A | Optional; wired later |
| Feature flag | `SF_A2A_ENABLED` gates all behavior |
| Migration | 6 phases; each independently rollback-safe |
| Error handling | Retry with exponential backoff; panic mode with file-based fallback |
| Backpressure | Per-inbox limits; coordinator outbox batching |
| Observability | Prometheus metrics + structured JSONL logging |
| Security | Agent tokens, input validation, capability enforcement |
| Testing | Unit + integration + chaos tests for every phase |
| Rollback | `SF_A2A_ENABLED=0` disables all new behavior instantly |