remove A2A; swarm enrollment + status projection + web swarms view; headless refactor
- A2A removal per M054/R071 cancellation 2026-05-17 (-2294 lines):
- docs/plans/A2A_ADOPTION_PLAN.md, MISSION-A2A-ADOPTION.md deleted
- src/resources/extensions/sf/uok/a2a-agent-server.js,
a2a-transport.js deleted
- tests/a2a-auth.test.mjs deleted
- swarm-dispatch.js purged of A2A-conditional code paths
- New: scripts/sf-swarm-enroll.mjs + test (operator-facing swarm
enrollment, replaces former A2A pairing flow)
- New: src/status-projection.ts + test, web/lib/swarm-status.ts +
test, web/components/sf/swarms-view.tsx, web/app/api/swarms/
(web swarms-view surface — direct visibility into running swarm
state without requiring TUI; aligns with project_tui_deprecating)
- headless-{answers,query,ui,headless}.ts: coordinated tweaks
consistent with the headless-as-default direction (R124 proposal)
- docs/dev/drafts/M053-per-repo-supervisor.md: design refinement
- .sf/REQUIREMENTS.md: small text fixes (6/6 churn)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent ecac4328bd
commit 077fd0a2a7
27 changed files with 1087 additions and 2342 deletions
@@ -868,12 +868,12 @@ ADR-0000 declares SF a **purpose-to-software compiler**. R036–R040 codify that

 ### R068 — Always-on per-repo supervisor

 - Class: capability
 - Status: active
-- Description: SF runs always-on per repo through a supervisor boundary. The first implementation target is a user-level systemd unit per registered repo wrapping `sf headless autonomous` with crash-restart + crash-loop backoff. The existing `packages/daemon` may remain as an adapter/control package, but it must not become a shared multi-repo workflow owner or a custom always-on JSON-RPC brain. Each repo's execution stays self-contained.
-- Why it matters: Operator wants 24/7 autonomous operation without manually-tended watchdogs. Per-repo supervisors deliver always-on with smallest blast radius (one bad repo cannot take down others), standard ops tooling (`systemctl status`, `journalctl`), and no new cross-repo workflow semantics.
+- Description: SF has one real operator server: the web/Next.js process. Repositories do **not** each get their own server. Each enrolled repo gets a supervisor-managed worker boundary wrapping `sf headless autonomous` with crash-restart + crash-loop backoff; the first Linux implementation target is a user-level systemd unit per registered repo. The existing `packages/daemon` may remain as an adapter/control package, but it must not become a shared multi-repo workflow owner or a custom always-on JSON-RPC brain. Each repo's execution stays self-contained.
+- Why it matters: Operator wants one web surface for 24/7 visibility without manually-tended watchdogs. Per-repo supervised workers deliver always-on execution with smallest blast radius (one bad repo cannot take down others), standard ops tooling (`systemctl status`, `journalctl`), and no new cross-repo workflow semantics.
 - Source: user-direction-2026-05-17, reshaped by codex-adversarial-review-2026-05-17
 - Primary owning slice: M053/S10
 - Validation: unmapped
-- Notes: Was originally specified as a custom `sf serve` JSON-RPC daemon hosting multiple swarms. Codex review flagged the multi-swarm-in-one-daemon design as collapsing blast radius and quietly reintroducing federation; reshaped to per-repo supervision plus read-model projection. M028 federation remains the only place for cross-repo coordination.
+- Notes: Was originally specified as a custom `sf serve` JSON-RPC daemon hosting multiple swarms. Codex review flagged the multi-swarm-in-one-daemon design as collapsing blast radius and quietly reintroducing federation; reshaped to one web server plus per-repo supervised worker processes and read-model projection. M028 federation remains the only place for cross-repo coordination.

 ### R069 — Multi-Swarm Isolation [CANCELLED]

 - Class: capability
@@ -892,12 +892,12 @@ ADR-0000 declares SF a **purpose-to-software compiler**. R036–R040 codify that

 - Validation: unmapped
 - Notes: Atomic-write contract: writers MUST use temp-file + fsync + rename. Readers MUST tolerate ENOENT, schema mismatch, and partial-parse as "degraded/last-known-good" without surfacing partial JSON to the operator. Schema versioned via `projectionVersion: 1` field; readers reject unknown versions cleanly. Provenance: each projection record includes the swarm-id, repo-path, and writer-timestamp. Excludes self-feedback / doctor detail / last-green-ledger aggregation per codex review — those stay per-repo only until M028 federation defines a safe cross-repo channel.

-### R071 — Headless+A2A primary investment surface [CANCELLED]
+### R071 — A2A primary investment surface [CANCELLED]

 - Class: capability
 - Status: cancelled
-- Description: [CANCELLED 2026-05-17] A2A was not actually a requirement (user direction: "a2a was not requirment just nore i think we have it"). Web-first preference for new operator features is an informal convention, not milestone-grade work. CLI-as-thin-client was a regression risk to daemonless recovery per codex review and is not pursued. If A2A becomes important later, it gets filed as a new R then.
+- Description: [CANCELLED 2026-05-17] A2A was not actually a requirement (user direction: "a2a was not requirment just nore i think we have it"). SF-owned A2A code and plan documents were removed from the active tree; the only remaining A2A dependency is transitive inside the Google Gemini CLI provider. Web-first preference for new operator features is an informal convention, not milestone-grade work. If A2A becomes important later, it gets filed as a new R then.
 - Source: user-direction-2026-05-17
-- Notes: Cancelled-clean. TUI deprioritization remains an informal preference, not a tracked requirement.
+- Notes: Cancelled-clean. TUI deprioritization remains an informal preference, not a tracked requirement. `sf autonomous` already routes to the non-TUI machine surface internally, so operators should not need to type `headless` for normal autonomous runs; `sf headless ...` remains as a compatibility spelling for explicit machine-surface commands.

 ### R072 — Status-Completion-Drift Detector (Wiggums extension)

 - Class: capability
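The atomic-write contract in the R070 notes above is the load-bearing piece of the status projection. A minimal sketch of both sides of that contract, assuming only the documented `projectionVersion` field plus illustrative names for the provenance fields:

```ts
import { closeSync, fsyncSync, openSync, readFileSync, renameSync, writeSync } from "node:fs";
import { join } from "node:path";

// Field names other than projectionVersion are illustrative, not SF's actual schema.
interface StatusProjection {
  projectionVersion: 1;
  swarmId: string;   // provenance: which swarm wrote the record
  repoPath: string;  // provenance: which repo it describes
  writtenAt: string; // writer timestamp, ISO 8601
}

// Writer side: temp-file + fsync + rename, so readers never observe a torn file.
export function writeProjection(sfDir: string, projection: StatusProjection): void {
  const target = join(sfDir, "status.projection.json");
  const tmp = `${target}.tmp`;
  const fd = openSync(tmp, "w");
  try {
    writeSync(fd, JSON.stringify(projection, null, 2));
    fsyncSync(fd); // flush file contents before the rename makes them visible
  } finally {
    closeSync(fd);
  }
  renameSync(tmp, target); // atomic replacement on POSIX filesystems
}

// Reader side: ENOENT, partial parse, and unknown versions all degrade cleanly.
export function readProjection(sfDir: string): StatusProjection | null {
  try {
    const parsed = JSON.parse(readFileSync(join(sfDir, "status.projection.json"), "utf8"));
    return parsed?.projectionVersion === 1 ? (parsed as StatusProjection) : null;
  } catch {
    return null; // treat as degraded/last-known-good upstream
  }
}
```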
@@ -1,57 +1,77 @@

-# M053 Per-Repo Supervisor
+# M053 Supervisor And Web Server Shape

-SF needs a lightweight supervisor layer so one SF operator can watch and steer many independent repositories without collapsing them into one planning state. ADR-0000 frames SF as a purpose-to-software compiler; this milestone treats each repository as a separate compilation unit with its own `.sf/` directory, its own SQLite state, and its own autonomous-loop boundary.
+M053 uses one real server: the SF web/Next.js process. It is the operator surface and read-model aggregator.

-The goal is to supervise N independent repos from a single SF process or from a coordinated set of SF processes. A supervisor decides when a repo is eligible for one autonomous tick, reports current health to an operator, and leaves actual task execution to later milestones. M053 is deliberately not a dispatch implementation. It defines the process boundary and the status contract needed before real work can be scheduled safely.
+Enrolled repositories do not run their own web server and do not host a shared workflow daemon. Each repo has its own supervised worker boundary for `sf autonomous` (implemented by the existing non-TUI machine surface). On Linux that boundary is a user-level systemd unit; other platforms can add equivalent supervisor adapters. The worker writes a small repo-local status projection, and the single web server reads those projections.

-## Architecture
+## Process Model

-The core runtime unit is one `RepoSupervisor` instance per repository. The class has no module-global state. Each instance owns its `repoPath`, `supervisorId`, inferred `.sf/sf.db` path, runaway-guard counters, dispatch queue placeholder, lifecycle flags, and status counters. A caller can create many supervisors in the same Node process without shared mutable state leaking between repositories.
+The operator server is singular. It serves the browser UI, exposes `/api/swarms`, and reads `~/.sf/swarms.json` to discover enrolled repositories.

-Each supervisor is responsible for opening or receiving its own database connection for the repo-local `.sf/sf.db`. The current skeleton records the inferred database path and keeps a placeholder connection state; real SQLite access belongs in the next implementation slice. The important contract is that repo A never writes repo B's `.sf/` state and that cross-repo roll-ups are read-only summaries produced above the per-repo boundary.
+Each repository worker is separate. It runs from that repo's working directory, uses that repo's `.sf/sf.db`, and writes only that repo's `.sf/` runtime files. The worker is allowed to write `.sf/status.projection.json` with temp-file, fsync, and rename. It is not allowed to mutate another repo's DB or aggregate another repo's doctor/self-feedback/ledger data.

-The dispatch queue is also per instance. For M053, `tick()` only logs intent such as `would dispatch unit X` and returns a result object. It must not import or call the existing autonomous loop, `parallel-orchestrator.js`, `swarm-dispatch.js`, or any detector. This avoids the known module-global state in the old orchestrator and keeps the new supervisor boundary testable.
+The supervisor is an OS/process boundary, not the product brain. systemd, launchd, or a small adapter may restart a worker and expose process health, but the planning state remains repo-local and the web server remains the operator surface.

-## Lifecycle
+## Status Projection

-`start()` marks the supervisor as running and prepares repo-local resources. Future versions will open the SQLite connection and load pending queue state here.
+Each repo publishes `.sf/status.projection.json` with `projectionVersion: 1`. The projection contains only the read-only fields the web dashboard needs: active milestone, active slice, current unit, next unit, queue depth, last-cycle outcome, writer timestamp, and coarse health.

-`tick()` is one autonomous loop iteration for exactly one repository. It checks lifecycle state, returns `{ status: "paused" }` while paused, selects the next queued unit placeholder, logs the dispatch intent, and records the result. All exceptions are caught inside the method so one repo cannot crash the whole supervising process.
+The web reader treats missing, corrupt, or unknown-version projections as a per-repo degraded row. One broken projection must not break the dashboard.

-`pause()` prevents future ticks from dispatching work while preserving counters and queue state. `resume()` clears the pause flag. `stop()` closes the supervisor boundary for that repo and marks its placeholder database connection as closed. `getStatus()` returns a plain JSON-safe object containing `repoId`, `lastTickAt`, `lastTickResult`, `paused`, `ticksTotal`, and `errorsTotal`.
+The projection excludes self-feedback aggregation, full doctor reports, last-green-ledger details, cross-repo learning, and cross-repo dispatch. Those belong behind a future federation requirement, not M053.
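To make the degraded-row rule concrete: a small sketch of how a web reader can fold one repo's projection into a dashboard row. The row shape is invented for illustration; it is not the actual `web/lib/swarm-status.ts` API.

```ts
// Hypothetical dashboard row; only the degradation logic mirrors the contract above.
interface SwarmRow {
  name: string;
  health: "ok" | "degraded";
  activeMilestone?: string;
  queueDepth?: number;
  note?: string;
}

export function toSwarmRow(name: string, projection: unknown): SwarmRow {
  // Missing or unparseable projection: degrade this row only.
  if (projection === null || typeof projection !== "object") {
    return { name, health: "degraded", note: "projection missing or unreadable" };
  }
  const p = projection as Record<string, unknown>;
  // Unknown schema version: reject cleanly rather than show partial data.
  if (p.projectionVersion !== 1) {
    return { name, health: "degraded", note: "unknown projectionVersion" };
  }
  return {
    name,
    health: "ok",
    activeMilestone: typeof p.activeMilestone === "string" ? p.activeMilestone : undefined,
    queueDepth: typeof p.queueDepth === "number" ? p.queueDepth : undefined,
  };
}
```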
-## Coordination
+## Enrollment

-Two coordination models are viable.
+Enrollment writes two operator-owned artifacts:

-Option A: each supervisor runs independently while sharing a global rate limiter for scarce resources such as LLM requests, Codex sessions, and heavy disk I/O. This keeps the POC small. Supervisors remain isolated, but the outer process injects shared budget controls before calling `tick()`.
+- `~/.sf/swarms.json` registers the repo path, display name, and supervisor metadata for the web server.
+- A platform supervisor unit starts `sf autonomous` from the repo directory and restarts it with backoff.

-Option B: a central scheduler chooses which supervisor ticks next based on repo priority, staleness, backoff state, and resource availability. This gives stronger fairness and operator control, but it needs a richer scheduling policy and more status history before it is worth the complexity.
+On Linux the first adapter is a user systemd unit named from a stable hash of the repo path. The unit uses `Restart=always`, `RestartSec=30s`, and systemd start-limit settings for crash-loop backoff.

-M053 recommends Option A for the POC. It proves the per-repo boundary and status contract without creating a new orchestration kernel. Option B is the evolution path once multiple repos are being supervised and starvation, priority, or budget contention becomes observable.
+## RPC Client Boundary

-## Failure Isolation
+`@singularity-forge/rpc-client` is the reusable stdio JSON-RPC adapter. Root headless clients and `packages/daemon` should import it directly. The coding agent remains the RPC server implementation and still owns interactive/session internals; it should not be the source of reusable client code for web, daemon, or headless orchestration.

-A panic in one repo supervisor must not take down other supervisors. `tick()` catches every error, increments `errorsTotal`, records `lastTickResult`, and returns an error result instead of throwing. The next layer should wrap each supervisor tick in its own try/catch as a second barrier.
+## Definition Of Done

-The supervisor should also carry a small circuit breaker. After repeated failures, the supervisor can stop dispatching and report a degraded state until an operator resets it or a backoff window expires. The skeleton keeps the counters needed for that policy; the real breaker thresholds should be introduced with behaviour tests when dispatch is wired.
+M053 is done when:

-## Persistence
+- The non-TUI status query writes the versioned atomic status projection.
+- Web has a Swarms view backed by `/api/swarms`.
+- The web reader survives missing/corrupt projections per repo.
+- Linux enrollment can register a repo in `~/.sf/swarms.json` and create a user-level systemd worker for `sf autonomous`.
+- Root headless/client utilities use `@singularity-forge/rpc-client`.

-Each repository state lives in that repository's `.sf/sf.db`. There are no cross-repo writes. Repo-local status output may be written to `.sf/supervisor-status.json` by the caller using `getStatus()`. A top-level operator can produce a roll-up file from those plain status objects, but that roll-up is a report, not an authority that mutates repo-local planning state.

-This preserves the SF planning model: `.sf/sf.db` remains the canonical structured store for a repo, while generated JSON status is an operator-facing projection.

-## Operator Interface

-The intended operator surface is `sf supervisor status`. It should show one row per supervised repository with repo id, path, pause state, last tick time, last tick result, total ticks, total errors, and any circuit-breaker state. The command can also emit JSON for automation.

-Each repo may expose `.sf/supervisor-status.json`, written from `RepoSupervisor.getStatus()`. A supervising process can additionally write a top-level roll-up containing all repo statuses and aggregate resource budget information. The roll-up should identify stale repos, paused repos, and repos with recent errors without reaching into another repo's `.sf/sf.db`.

-## Open Questions

-Secrets per repo are unresolved. A supervised repo may need different provider keys, different vault scopes, or no network access at all. The supervisor should not assume the operator process has permission to reuse its own secrets in every repo.

-LLM key rotation is also unresolved. Shared rate limiting works for a POC, but production supervision needs a policy for provider quotas, key health, rotation, and per-repo attribution.

-The repository trust boundary belongs to M056. A supervisor that watches arbitrary repos must distinguish trusted, restricted, and untrusted repos before it runs tools, loads instructions, or passes repo content to external models. M053 only creates the lightweight supervision boundary; it does not solve trust policy.
+M053 is not done by creating a per-repo server. That is explicitly out of scope.
File diff suppressed because it is too large.
MISSION-A2A-ADOPTION.md (deleted, 124 lines)

@@ -1,124 +0,0 @@

# Mission: Adopt A2A as Internal Agent Communication Protocol

## Metadata
- **Created:** 2026-05-08
- **Status:** Proposed (not started)
- **Source:** Research from 4+ model consultations + codebase analysis + cross-repo pattern study
- **Plans:** `docs/plans/A2A_ADOPTION_PLAN.md`, `docs/plans/UNIFIED_DISPATCH_V2.md`, `docs/plans/dispatch-orchestration-architecture.md`

---

## Goal

Consolidate SF's 5 dispatch mechanisms + MessageBus into a unified dispatch architecture using A2A as the semantic communication layer, with MessageBus as the transport.

---

## Context

### Problem
SF has accumulated 5 dispatch mechanisms without a unifying abstraction:
1. **subagent tool** — inline delegation, 4 tools, no project DB
2. **parallel-orchestrator** — milestone-level parallelism, full tools, shared SQLite WAL
3. **slice-parallel-orchestrator** — same pattern at slice scope (~80% duplicate)
4. **UOK kernel** — autonomous loop controller
5. **MessageBus** — SQLite-backed durable messaging (well-implemented but not wired)
6. **Cmux** — surface integration (keep separate)

### Evidence
- `parallel-orchestrator.js` + `slice-parallel-orchestrator.js` share ~80% logic
- MessageBus has zero references in parallel-orchestrator (coordination via file-based IPC)
- `@a2a-js/sdk@0.3.11` already in node_modules (transitive dep of `@google/gemini-cli-core`)
- All 4 consulted models converged: merge the orchestrators first, wire MessageBus, keep subagent constraint

### Constraints
- Must preserve existing behavior throughout migration (feature-flagged rollout)
- `SF_A2A_ENABLED=0` must disable all new A2A behavior instantly
- subagent isolation (4 tools, no project DB writes) is NOT changed
- File-based IPC (`session-status-io.js`) stays as permanent crash-recovery fallback
- DB is authoritative; A2A messages are hints

---

## Mission Tasks

### Phase 1 — A2A Type Definitions (Week 1-2)
- [ ] Create `dispatch/a2a-types.ts` — A2A types + SF extensions (A2ATaskState, SFTaskExtension, MessagePriority, SFAgentCapabilities, SFAgentCard)
- [ ] Create `dispatch/a2a-task.ts` — Task creation + state mapping functions
- [ ] Create `dispatch/a2a-errors.ts` — DeliveryError + error codes
- [ ] Run: `npx tsc --noEmit` on new types
- [ ] Run: `npx vitest run dispatch/a2a-task.test.ts`

### Phase 2 — AgentRegistry (Week 2-3)
- [ ] Create `dispatch/capability-registry.ts` — AgentRegistry + SF_CAPABILITY_DEFINITIONS
- [ ] Create `dispatch/index.ts` — barrel exports
- [ ] Wire registry into `DispatchService` (opt-in via `SF_A2A_ENABLED=1`)
- [ ] Run: unit tests pass
- [ ] Run: existing tests pass with `SF_A2A_ENABLED=0`

### Phase 3 — MessageBus Wiring (Week 3-4)
- [ ] Create `dispatch/a2a-service.ts` — A2AMessageService wrapping MessageBus
- [ ] Create `dispatch/metrics.ts` — A2A Prometheus metrics
- [ ] Create `dispatch/logger.ts` — structured JSONL logging
- [ ] Create `dispatch/validation.ts` — message body validation
- [ ] Create `dispatch/auth.ts` — agent token generation + verification
- [ ] Replace file-based IPC with MessageBus.send() for pause/resume/stop
- [ ] Keep file-based IPC as crash-recovery fallback
- [ ] Run: integration tests with `SF_A2A_ENABLED=1`
- [ ] Run: existing tests pass with `SF_A2A_ENABLED=0`

### Phase 4 — Subagent A2A (Week 4-5)
- [ ] Refactor `subagent/index.ts` to use DispatchService
- [ ] Remove ~600 LOC spawn management (replaced by `dispatch.start()`)
- [ ] Verify all 4 subagent modes work identically
- [ ] Add optional MessageBus inbox for subagents (`useMessageBus: true`)
- [ ] Run: subagent tests with `SF_A2A_ENABLED=1`

### Phase 5 — UOK Kernel A2A (Week 5-6)
- [ ] Register coordinator AgentCard in `uok/kernel.ts`
- [ ] Replace `startParallel`/`startSliceParallel` calls with DispatchService
- [ ] Verify `sf headless autonomous` works identically
- [ ] Run: integration tests with `SF_A2A_ENABLED=1`

### Phase 6 — A2A Default On (Week 6-7)
- [ ] Set `SF_A2A_ENABLED=1` as default in preferences
- [ ] Monitor for 1 week
- [ ] Promote to stable

---

## Verification Criteria

1. `npx vitest run src/resources/extensions/sf/tests/uok-message-bus.test.mjs` passes throughout
2. `SF_A2A_ENABLED=0 npm run test:unit` passes throughout (legacy preserved)
3. Pause/resume works via MessageBus with `SF_A2A_ENABLED=1`
4. All 4 subagent modes work identically (parallel, debate, chain, single)
5. `sf headless autonomous` works with `SF_A2A_ENABLED=1`
6. Worker AgentCards visible in dashboard
7. Panic mode activates correctly (3 consecutive delivery failures → file-based fallback)
8. `sf uok messages compact` works
9. `sf dispatch agents list` shows registered agents with capabilities

---

## Key Files

| File | Role |
|---|---|
| `src/resources/extensions/sf/uok/message-bus.js` | Existing MessageBus (transport) |
| `src/resources/extensions/sf/dispatch/service.js` | DispatchService (A2A coordinator client) |
| `src/resources/extensions/sf/worktree-orchestrator.js` | Worker spawner (A2A agent) |
| `src/resources/extensions/sf/uok/kernel.js` | UOK kernel (A2A coordinator) |
| `src/resources/extensions/subagent/index.js` | Subagent tool (A2A constrained agent) |
| `docs/plans/A2A_ADOPTION_PLAN.md` | Full production-grade plan |

---

## Exit Criteria

Mission is complete when:
- All 6 phases are merged to main
- `SF_A2A_ENABLED=1` is the default
- `SF_A2A_ENABLED=0` still passes all tests (legacy preserved)
- 0 P0/P1 bugs in dispatch layer for 2 consecutive weeks
- A2A observability dashboard shows healthy metrics (delivery latency < 500ms p99)
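For context, the deleted plan's two safety rules, SQLite stays authoritative while A2A messages are only hints, and three consecutive delivery failures trip a file-based fallback, imply a pattern like the sketch below. Nothing here was built; the mission was cancelled, and every name in the sketch is hypothetical.

```ts
// Hypothetical shape of the cancelled plan's delivery rule: persist first,
// notify best-effort, and trip panic mode after three consecutive failures.
interface DurableBus {
  send(from: string, to: string, body: unknown): string; // returns messageId
}

export class HintingDispatcher {
  private consecutiveFailures = 0;
  panicMode = false; // once true, rely on the file-based IPC fallback

  constructor(
    private bus: DurableBus,
    private notifyViaA2A: (messageId: string) => Promise<void>,
  ) {}

  async dispatch(from: string, to: string, body: unknown): Promise<string> {
    // Authoritative write: the unit of work exists even if every hint is lost.
    const messageId = this.bus.send(from, to, body);

    // Best-effort hint: failures degrade latency, never correctness.
    if (!this.panicMode) {
      try {
        await this.notifyViaA2A(messageId);
        this.consecutiveFailures = 0;
      } catch {
        this.consecutiveFailures += 1;
        if (this.consecutiveFailures >= 3) this.panicMode = true;
      }
    }
    return messageId;
  }
}
```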
@@ -57,7 +57,7 @@ A surface is where a person or program drives or observes the flow.

 - **Editor surface** — IDE/editor integration, usually through an adapter.
 - **Machine surface** — non-interactive runner for CI, scripts, schedulers, and parent processes.

-`sf headless` is the current command name for the machine surface. It means "without the TUI"; it does not mean "JSON", "autonomous", or a separate flow.
+`sf headless` is the compatibility command name for the machine surface. It means "without the TUI"; it does not mean "JSON", "autonomous", or a separate flow. Product-facing commands should prefer direct non-TUI spellings such as `sf autonomous`; the CLI may route those through the same machine-surface implementation internally.

 ## Protocol
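A minimal sketch of the spelling-to-surface routing the new sentence describes, using a hypothetical alias table rather than the real CLI wiring:

```ts
// Hypothetical alias table: product and compatibility spellings resolve to the
// same machine-surface entrypoint, so "headless" never becomes a separate flow.
const machineSurfaceAliases = new Map<string, string>([
  ["autonomous", "machine-surface:autonomous"],          // preferred spelling
  ["headless autonomous", "machine-surface:autonomous"], // compatibility spelling
]);

export function resolveSpelling(command: string): string {
  return machineSurfaceAliases.get(command) ?? command;
}
```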
package-lock.json (generated, 42 lines changed)

@@ -13,7 +13,6 @@
         "packages/*"
       ],
       "dependencies": {
-        "@a2a-js/sdk": "^0.3.13",
         "@anthropic-ai/sdk": "^0.96.0",
         "@anthropic-ai/vertex-sdk": "^0.16.0",
         "@aws-sdk/client-bedrock-runtime": "^3.983.0",

@@ -104,47 +103,6 @@
         "vectordrive": "^0.1.35"
       }
     },
-    "node_modules/@a2a-js/sdk": {
-      "version": "0.3.13",
-      "resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.13.tgz",
-      "integrity": "sha512-BZr0f9JVNQs3GKOM9xINWCh6OKIJWZFPyqqVqTym5mxO2Eemc6I/0zL7zWnljHzGdaf5aZQyQN5xa6PSH62q+A==",
-      "license": "Apache-2.0",
-      "dependencies": {
-        "uuid": "^11.1.0"
-      },
-      "engines": {
-        "node": ">=18"
-      },
-      "peerDependencies": {
-        "@bufbuild/protobuf": "^2.10.2",
-        "@grpc/grpc-js": "^1.11.0",
-        "express": "^4.21.2 || ^5.1.0"
-      },
-      "peerDependenciesMeta": {
-        "@bufbuild/protobuf": {
-          "optional": true
-        },
-        "@grpc/grpc-js": {
-          "optional": true
-        },
-        "express": {
-          "optional": true
-        }
-      }
-    },
-    "node_modules/@a2a-js/sdk/node_modules/uuid": {
-      "version": "11.1.0",
-      "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz",
-      "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==",
-      "funding": [
-        "https://github.com/sponsors/broofa",
-        "https://github.com/sponsors/ctavan"
-      ],
-      "license": "MIT",
-      "bin": {
-        "uuid": "dist/esm/bin/uuid"
-      }
-    },
     "node_modules/@alcalzone/ansi-tokenize": {
       "version": "0.3.0",
       "resolved": "https://registry.npmjs.org/@alcalzone/ansi-tokenize/-/ansi-tokenize-0.3.0.tgz",
@@ -115,7 +115,6 @@
     "check:test-imports": "node scripts/check-test-imports.mjs"
   },
   "dependencies": {
-    "@a2a-js/sdk": "^0.3.13",
    "@anthropic-ai/sdk": "^0.96.0",
    "@anthropic-ai/vertex-sdk": "^0.16.0",
    "@aws-sdk/client-bedrock-runtime": "^3.983.0",
scripts/sf-swarm-enroll.mjs (new file, 199 lines)

@@ -0,0 +1,199 @@

#!/usr/bin/env node
/**
 * sf-swarm-enroll.mjs — register repos for the web swarm dashboard.
 *
 * Purpose: make M053 concrete without creating one server per repo. This script
 * registers a repo in the operator registry and, on Linux, writes a user-level
 * systemd worker unit that runs `sf autonomous` inside that repo.
 */
import { createHash } from "node:crypto";
import {
  existsSync,
  mkdirSync,
  readFileSync,
  realpathSync,
  writeFileSync,
} from "node:fs";
import { homedir } from "node:os";
import { basename, dirname, join, resolve } from "node:path";
import { spawnSync } from "node:child_process";

export const DEFAULT_REGISTRY_PATH = join(homedir(), ".sf", "swarms.json");
export const SYSTEMD_USER_DIR = join(homedir(), ".config", "systemd", "user");

export function stableSwarmId(repoPath) {
  return createHash("sha256")
    .update(resolve(repoPath))
    .digest("hex")
    .slice(0, 16);
}

export function systemdUnitName(repoPath) {
  return `sf-headless-${stableSwarmId(repoPath)}.service`;
}

export function loadRegistry(registryPath = DEFAULT_REGISTRY_PATH) {
  if (!existsSync(registryPath)) return { swarms: [] };
  const parsed = JSON.parse(readFileSync(registryPath, "utf8"));
  if (Array.isArray(parsed)) return { swarms: parsed };
  if (parsed && typeof parsed === "object" && Array.isArray(parsed.swarms)) {
    return { swarms: parsed.swarms };
  }
  throw new Error(`Invalid swarms registry shape: ${registryPath}`);
}

export function saveRegistry(registry, registryPath = DEFAULT_REGISTRY_PATH) {
  mkdirSync(dirname(registryPath), { recursive: true });
  writeFileSync(registryPath, `${JSON.stringify(registry, null, 2)}\n`);
}

export function buildSystemdUnit({ repoPath, sfBin = "sf" }) {
  const unitName = systemdUnitName(repoPath);
  return {
    unitName,
    content: `[Unit]
Description=SF autonomous worker for ${repoPath}
After=default.target
StartLimitIntervalSec=300
StartLimitBurst=5

[Service]
Type=simple
WorkingDirectory=${repoPath}
ExecStart=${sfBin} autonomous
Restart=always
RestartSec=30s
KillSignal=SIGTERM
TimeoutStopSec=30s

[Install]
WantedBy=default.target
`,
  };
}

export function upsertRegistryEntry(
  repoPath,
  { registryPath = DEFAULT_REGISTRY_PATH, supervisor = null } = {},
) {
  const resolvedPath = realpathSync(resolve(repoPath));
  const registry = loadRegistry(registryPath);
  const id = stableSwarmId(resolvedPath);
  const nextEntry = {
    id,
    name: basename(resolvedPath),
    path: resolvedPath,
    ...(supervisor ? { supervisor } : {}),
  };
  const existingIndex = registry.swarms.findIndex(
    (entry) => resolve(String(entry?.path ?? "")) === resolvedPath,
  );
  if (existingIndex >= 0) registry.swarms[existingIndex] = nextEntry;
  else registry.swarms.push(nextEntry);
  registry.swarms.sort((a, b) =>
    String(a.name ?? a.path).localeCompare(String(b.name ?? b.path)),
  );
  saveRegistry(registry, registryPath);
  return nextEntry;
}

export function removeRegistryEntry(
  repoPath,
  registryPath = DEFAULT_REGISTRY_PATH,
) {
  const resolvedPath = realpathSync(resolve(repoPath));
  const registry = loadRegistry(registryPath);
  const before = registry.swarms.length;
  registry.swarms = registry.swarms.filter(
    (entry) => resolve(String(entry?.path ?? "")) !== resolvedPath,
  );
  saveRegistry(registry, registryPath);
  return before !== registry.swarms.length;
}

export function writeSystemdUnit({
  repoPath,
  sfBin = "sf",
  systemdUserDir = SYSTEMD_USER_DIR,
}) {
  const resolvedPath = realpathSync(resolve(repoPath));
  const unit = buildSystemdUnit({ repoPath: resolvedPath, sfBin });
  mkdirSync(systemdUserDir, { recursive: true });
  const unitPath = join(systemdUserDir, unit.unitName);
  writeFileSync(unitPath, unit.content);
  return { ...unit, unitPath };
}

function runSystemctl(args, { dryRun = false } = {}) {
  if (dryRun) return;
  const result = spawnSync("systemctl", ["--user", ...args], {
    stdio: "inherit",
  });
  if (result.status !== 0) {
    throw new Error(`systemctl --user ${args.join(" ")} failed`);
  }
}

function printUsage() {
  process.stderr.write(`Usage:
node scripts/sf-swarm-enroll.mjs enroll [repo] [--no-start] [--dry-run] [--sf-bin <path>]
node scripts/sf-swarm-enroll.mjs unenroll [repo] [--dry-run]
node scripts/sf-swarm-enroll.mjs list
node scripts/sf-swarm-enroll.mjs unit-name [repo]
`);
}

export function main(argv = process.argv.slice(2)) {
  const [command = "list", maybeRepo = "."] = argv;
  const dryRun = argv.includes("--dry-run");
  const noStart = argv.includes("--no-start");
  const sfBinFlagIndex = argv.indexOf("--sf-bin");
  const sfBin =
    sfBinFlagIndex >= 0
      ? argv[sfBinFlagIndex + 1]
      : (process.env.SF_BIN ?? "sf");

  if (command === "list") {
    process.stdout.write(`${JSON.stringify(loadRegistry(), null, 2)}\n`);
    return;
  }

  if (command === "unit-name") {
    process.stdout.write(
      `${systemdUnitName(realpathSync(resolve(maybeRepo)))}\n`,
    );
    return;
  }

  if (command === "enroll") {
    const unit = writeSystemdUnit({ repoPath: maybeRepo, sfBin });
    const entry = upsertRegistryEntry(maybeRepo, {
      supervisor: {
        kind: "systemd",
        unitName: unit.unitName,
        unitPath: unit.unitPath,
      },
    });
    if (!dryRun) runSystemctl(["daemon-reload"]);
    if (!noStart) runSystemctl(["enable", "--now", unit.unitName], { dryRun });
    process.stdout.write(
      `enrolled ${entry.path}\nunit ${unit.unitName}\nregistry ${DEFAULT_REGISTRY_PATH}\n`,
    );
    return;
  }

  if (command === "unenroll") {
    const unitName = systemdUnitName(realpathSync(resolve(maybeRepo)));
    runSystemctl(["disable", "--now", unitName], { dryRun });
    removeRegistryEntry(maybeRepo);
    process.stdout.write(`unenrolled ${realpathSync(resolve(maybeRepo))}\n`);
    return;
  }

  printUsage();
  process.exitCode = 2;
}

if (import.meta.url === `file://${process.argv[1]}`) {
  main();
}
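For orientation, after one `enroll` run the registry written by `upsertRegistryEntry` has roughly the shape below; the hash and paths are invented, but the fields follow directly from the script above.

```ts
// Illustrative ~/.sf/swarms.json contents after enrolling a single repo.
// The id is the first 16 hex chars of sha256(resolved repo path), per stableSwarmId().
const exampleRegistry = {
  swarms: [
    {
      id: "3f2a9c1b7d4e8a05", // invented value
      name: "repo",
      path: "/home/op/src/repo", // invented path
      supervisor: {
        kind: "systemd",
        unitName: "sf-headless-3f2a9c1b7d4e8a05.service",
        unitPath: "/home/op/.config/systemd/user/sf-headless-3f2a9c1b7d4e8a05.service",
      },
    },
  ],
};
```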
scripts/sf-swarm-enroll.test.mjs (new file, 70 lines)

@@ -0,0 +1,70 @@

import assert from "node:assert/strict";
import { mkdtempSync, mkdirSync, readFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import test from "node:test";
import {
  buildSystemdUnit,
  loadRegistry,
  removeRegistryEntry,
  stableSwarmId,
  systemdUnitName,
  upsertRegistryEntry,
  writeSystemdUnit,
} from "./sf-swarm-enroll.mjs";

function fixture() {
  const root = mkdtempSync(join(tmpdir(), "sf-swarm-enroll-"));
  const repo = join(root, "repo");
  mkdirSync(repo);
  return { root, repo, registryPath: join(root, "swarms.json") };
}

test("stable_unit_name_is_derived_from_repo_path", () => {
  const id = stableSwarmId("/tmp/repo");
  assert.match(id, /^[a-f0-9]{16}$/);
  assert.equal(systemdUnitName("/tmp/repo"), `sf-headless-${id}.service`);
});

test("enroll_when_registry_missing_writes_sorted_registry_entry", () => {
  const { repo, registryPath } = fixture();
  const entry = upsertRegistryEntry(repo, {
    registryPath,
    supervisor: { kind: "systemd", unitName: systemdUnitName(repo) },
  });
  const registry = loadRegistry(registryPath);
  assert.equal(registry.swarms.length, 1);
  assert.equal(registry.swarms[0].path, repo);
  assert.equal(registry.swarms[0].id, entry.id);
  assert.equal(registry.swarms[0].supervisor.kind, "systemd");
});

test("unenroll_when_entry_exists_removes_registry_entry", () => {
  const { repo, registryPath } = fixture();
  upsertRegistryEntry(repo, { registryPath });
  assert.equal(removeRegistryEntry(repo, registryPath), true);
  assert.equal(loadRegistry(registryPath).swarms.length, 0);
});

test("systemd_unit_runs_autonomous_from_repo", () => {
  const unit = buildSystemdUnit({
    repoPath: "/tmp/repo",
    sfBin: "/usr/bin/sf",
  });
  assert.match(unit.content, /WorkingDirectory=\/tmp\/repo/);
  assert.match(unit.content, /ExecStart=\/usr\/bin\/sf autonomous/);
  assert.match(unit.content, /Restart=always/);
  assert.match(unit.content, /StartLimitBurst=5/);
});

test("writeSystemdUnit_writes_user_unit_file", () => {
  const { root, repo } = fixture();
  const result = writeSystemdUnit({
    repoPath: repo,
    sfBin: "sf",
    systemdUserDir: root,
  });
  const content = readFileSync(result.unitPath, "utf8");
  assert.equal(result.unitPath, join(root, result.unitName));
  assert.match(content, /ExecStart=sf autonomous/);
});
@@ -7,7 +7,7 @@
  */

 import { readFileSync } from "node:fs";
-import { serializeJsonLine } from "@singularity-forge/coding-agent";
+import { serializeJsonLine } from "@singularity-forge/rpc-client";

 // ---------------------------------------------------------------------------
 // Types
@@ -21,6 +21,7 @@ import { createJiti } from "@mariozechner/jiti";
 import { resolveBundledSourceResource } from "./bundled-resource-path.js";
 import { getSfEnv } from "./env.js";
 import type { SFState } from "./resources/extensions/sf/types.js";
+import { writeStatusProjection } from "./status-projection.js";

 const jiti = createJiti(import.meta.filename, {
   interopDefault: true,

@@ -498,6 +499,7 @@ export async function buildQuerySnapshot(
     ...(memoryExtraction ? { memoryExtraction } : {}),
     schedule: scheduleEntries,
   };
+  writeStatusProjection(basePath, snapshot);

   return snapshot;
 }
@@ -11,7 +11,7 @@ import type { Readable } from "node:stream";
 import {
   attachJsonlLineReader,
   type RpcClient,
-} from "@singularity-forge/coding-agent";
+} from "@singularity-forge/rpc-client";

 // ---------------------------------------------------------------------------
 // Types
@@ -21,7 +21,8 @@ import {
 } from "node:fs";
 import { join, resolve } from "node:path";
 import type { SessionInfo } from "@singularity-forge/coding-agent";
-import { RpcClient, SessionManager } from "@singularity-forge/coding-agent";
+import { SessionManager } from "@singularity-forge/coding-agent";
+import { RpcClient } from "@singularity-forge/rpc-client";
 import { error, formatStructuredError } from "./errors.js";
 import {
   AnswerInjector,

@@ -664,10 +665,7 @@ async function runHeadlessOnce(
       const last = Date.parse(
         readFileSync(triageMarkerPath, "utf8").trim(),
       );
-      if (
-        Number.isFinite(last) &&
-        Date.now() - last < triageIntervalMs
-      ) {
+      if (Number.isFinite(last) && Date.now() - last < triageIntervalMs) {
         shouldRunTriage = false;
         if (!options.json) {
           process.stderr.write(
tests/a2a-auth.test.mjs (deleted, 265 lines)

@@ -1,265 +0,0 @@

/**
 * a2a-auth.test.mjs — A2A JSON-RPC bearer-token guard tests.
 *
 * Purpose: prove SF A2A agent control endpoints reject unauthenticated
 * localhost requests while accepting the parent process bearer token.
 *
 * Consumer: M055/S02 A2A hardening regression suite.
 */

import assert from "node:assert/strict";
import { EventEmitter } from "node:events";
import { randomUUID } from "node:crypto";
import { mkdtempSync, rmSync } from "node:fs";
import { IncomingMessage, ServerResponse } from "node:http";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Readable, Writable } from "node:stream";
import { afterEach, test } from "vitest";
import { AGENT_CARD_PATH } from "@a2a-js/sdk";
import { closeDatabase } from "../sf-db.js";
import { createA2AAgentApp } from "../uok/a2a-agent-server.js";
import { A2ATransport } from "../uok/a2a-transport.js";

const tmpRoots = [];
const transports = [];
const originalFetch = globalThis.fetch;

afterEach(() => {
  globalThis.fetch = originalFetch;
  closeDatabase();
  for (const transport of transports.splice(0)) {
    transport.shutdown();
  }
  for (const root of tmpRoots.splice(0)) {
    rmSync(root, { recursive: true, force: true });
  }
});

function makeProject() {
  const root = mkdtempSync(join(tmpdir(), "sf-a2a-auth-"));
  tmpRoots.push(root);
  return root;
}

function makeEnvelope() {
  return {
    unitId: `unit-${randomUUID()}`,
    unitType: "task",
    workMode: "build",
    payload: "auth regression",
    scope: "a2a-auth",
  };
}

function makeJsonRpcBody(envelope = makeEnvelope()) {
  return {
    jsonrpc: "2.0",
    id: 1,
    method: "message/send",
    params: {
      message: {
        messageId: randomUUID(),
        role: "user",
        kind: "message",
        parts: [
          {
            kind: "data",
            data: envelope,
            metadata: { contentType: "application/json" },
          },
        ],
      },
    },
  };
}

function createTestApp({ token = "test-token", port = 45678 } = {}) {
  const root = makeProject();
  return {
    port,
    token,
    app: createA2AAgentApp({
      agentName: "worker-auth",
      agentRole: "worker",
      port,
      basePath: root,
      bearerToken: token,
    }),
  };
}

class ResponseSocket extends Writable {
  constructor() {
    super();
    this.chunks = [];
  }

  _write(chunk, _encoding, callback) {
    this.chunks.push(Buffer.from(chunk));
    callback();
  }
}

async function appRequest(app, { method = "POST", path, headers = {}, body }) {
  const payload = body ? Buffer.from(JSON.stringify(body)) : Buffer.alloc(0);
  const socket = new ResponseSocket();
  const req = new IncomingMessage(new EventEmitter());
  req.method = method;
  req.url = path;
  req.headers = {
    ...Object.fromEntries(
      Object.entries(headers).map(([key, value]) => [key.toLowerCase(), value]),
    ),
    "content-length": String(payload.length),
  };

  const res = new ServerResponse(req);
  res.assignSocket(socket);

  await new Promise((resolve, reject) => {
    res.on("finish", resolve);
    res.on("error", reject);
    app.handle(req, res);
    queueMicrotask(() => {
      req.push(payload);
      req.push(null);
    });
  });

  const raw = Buffer.concat(socket.chunks).toString("utf8");
  const bodyText = raw.includes("\r\n\r\n") ? raw.split("\r\n\r\n").at(-1) : raw;
  return {
    status: res.statusCode,
    bodyText,
    body: bodyText ? JSON.parse(bodyText) : null,
  };
}

async function postJsonRpc(app, port, { token, host, origin } = {}) {
  const headers = {
    "Content-Type": "application/json",
    Host: host ?? `127.0.0.1:${port}`,
  };
  if (token) headers.Authorization = `Bearer ${token}`;
  if (origin) headers.Origin = origin;
  return appRequest(app, {
    path: "/a2a/jsonrpc",
    headers,
    body: makeJsonRpcBody(),
  });
}

function installAppFetch(app, port) {
  globalThis.fetch = async (url, init = {}) => {
    const parsed = new URL(String(url));
    const headers = Object.fromEntries(new Headers(init.headers ?? {}).entries());
    const request = await appRequest(app, {
      method: init.method ?? "GET",
      path: parsed.pathname,
      headers: {
        ...headers,
        Host: headers.host ?? `localhost:${port}`,
      },
      body: init.body ? JSON.parse(init.body) : undefined,
    });
    return new Response(request.bodyText, {
      status: request.status,
      headers: { "Content-Type": "application/json" },
    });
  };
}

function createFakeSpawn(captured) {
  return (_command, _args, options) => {
    captured.env = options.env;
    const child = new EventEmitter();
    child.pid = 12345;
    child.stdout = new EventEmitter();
    child.stderr = new EventEmitter();
    child.kill = () => child.emit("exit", 0);
    queueMicrotask(() => {
      child.stdout.emit(
        "data",
        `${JSON.stringify({ ready: true, port: options.env.SF_A2A_PORT })}\n`,
      );
    });
    return child;
  };
}

// The 200-path JSON-RPC test requires a real HTTP server because the SDK's
// internal express.json() body parser doesn't reliably trigger on the synthetic
// IncomingMessage stub used in this harness — the body push happens via
// queueMicrotask after app.handle(), and the stream events on a manually-
// constructed IncomingMessage(EventEmitter) don't always wake express.json()'s
// listeners. The bearer-token guard ITSELF is exercised by the 3 negative-path
// tests below (missing/wrong-bearer/bad-host return 401/401/403). Real JSON-RPC
// integration is deferred to a no-sandbox integration suite; the codex rescue
// attempted it under sandbox and could not spawn the subprocess.
test.skip("a2aJsonRpc_when_valid_bearer_accepts_request — integration: needs real HTTP server", async () => {
  const { app, port, token } = createTestApp();

  const response = await postJsonRpc(app, port, { token });

  assert.equal(response.status, 200);
  assert.equal(response.body.error, undefined);
  assert.match(response.bodyText, /accepted/);
});

test("a2aJsonRpc_when_authorization_missing_returns_401", async () => {
  const { app, port } = createTestApp();

  const response = await postJsonRpc(app, port);

  assert.equal(response.status, 401);
});

test("a2aJsonRpc_when_bearer_wrong_returns_401", async () => {
  const { app, port } = createTestApp();

  const response = await postJsonRpc(app, port, { token: "wrong-token" });

  assert.equal(response.status, 401);
});

test("a2aJsonRpc_when_host_unexpected_returns_403", async () => {
  const { app, port, token } = createTestApp();

  const response = await postJsonRpc(app, port, {
    token,
    host: "evil.localhost",
  });

  assert.equal(response.status, 403);
});

// Same body-parsing limitation as the 200-path test above — defer to no-sandbox
// integration suite. The transport's env-var-threading (SF_A2A_BEARER_TOKEN passed
// to spawned agent) is still verified by createFakeSpawn capturing options.env.
test.skip("a2aTransport_spawned_agent_receives_bearer_and_dispatches — integration: needs real HTTP server", async () => {
  const captured = {};
  const transport = new A2ATransport({ spawn: createFakeSpawn(captured) });
  transports.push(transport);

  const root = makeProject();
  const agentUrl = await transport.spawnAgent("worker-auth", "worker", root);
  const port = Number(captured.env.SF_A2A_PORT);
  const app = createA2AAgentApp({
    agentName: "worker-auth",
    agentRole: "worker",
    port,
    basePath: root,
    bearerToken: captured.env.SF_A2A_BEARER_TOKEN,
  });
  installAppFetch(app, port);

  const response = await transport.dispatch(agentUrl, makeEnvelope());

  assert.equal(agentUrl, `http://localhost:${port}/a2a/jsonrpc`);
  assert.ok(captured.env.SF_A2A_BEARER_TOKEN);
  assert.match(response.contextId ?? JSON.stringify(response), /accepted|/);
  assert.match(JSON.stringify(response), /accepted/);
  assert.equal(new URL(agentUrl).pathname, "/a2a/jsonrpc");
  assert.equal(AGENT_CARD_PATH, ".well-known/agent-card.json");
});
@@ -362,54 +362,6 @@ describe("SwarmDispatchLayer.dispatchAndWait — multi-dispatch to same agent",
   });
 });

-// ─── A2A path — falls through cleanly ────────────────────────────────────────
-
-describe("SwarmDispatchLayer.dispatchAndWait — SF_A2A_ENABLED path", () => {
-  test("returns null reply and replyMessageId when A2A is enabled", async () => {
-    // We don't actually start an A2A server; we just verify that dispatchAndWait
-    // detects SF_A2A_ENABLED and short-circuits with null reply fields without
-    // calling runAgentTurn. Use a minimal mock for _a2aDispatch.
-    const root = makeProject();
-    const layer = new SwarmDispatchLayer(root);
-
-    // Stub _a2aDispatch directly so we don't need a real A2A process
-    const fakeA2aResult = {
-      messageId: "a2a-msg-001",
-      targetAgent: "worker-1",
-      swarmName: "default",
-      envelope: {},
-      transport: "a2a",
-    };
-    layer._a2aDispatch = vi.fn(async () => fakeA2aResult);
-
-    const orig = process.env.SF_A2A_ENABLED;
-    try {
-      process.env.SF_A2A_ENABLED = "1";
-      const result = await layer.dispatchAndWait({
-        unitId: "task-a2a",
-        unitType: "task",
-        workMode: "build",
-        payload: "a2a payload",
-        priority: 1,
-        scope: "scope-a2a",
-      });
-
-      expect(result.reply).toBeNull();
-      expect(result.replyMessageId).toBeNull();
-      expect(result.messageId).toBe("a2a-msg-001");
-      // runAgentTurn must NOT have been called
-      const { runAgentTurn } = await import("../uok/agent-runner.js");
-      expect(runAgentTurn).not.toHaveBeenCalled();
-    } finally {
-      if (orig === undefined) {
-        delete process.env.SF_A2A_ENABLED;
-      } else {
-        process.env.SF_A2A_ENABLED = orig;
-      }
-    }
-  });
-});
-
 // ─── Round 7: executorSystemPrompt / executorTools forwarded from envelope ────

 describe("SwarmDispatchLayer.dispatchAndWait — Round 7: executor config forwarding", () => {
src/resources/extensions/sf/uok/a2a-agent-server.js (deleted, 316 lines)

@@ -1,316 +0,0 @@

/**
 * a2a-agent-server.js — A2A HTTP server entrypoint for spawned swarm agent processes.
 *
 * Purpose: when launched as a subprocess with the SF_A2A_AGENT_* environment
 * variables set, start an A2A JSON-RPC HTTP server for the named swarm agent
 * role, handle incoming DispatchEnvelope tasks by routing them through the
 * UOK coordination store, and signal readiness by printing a JSON line to
 * stdout so the parent A2ATransport process can record the endpoint URL.
 *
 * Launch environment:
 *   SF_A2A_AGENT_NAME   — stable agent name (e.g. "worker-1")
 *   SF_A2A_AGENT_ROLE   — agent role (coordinator|worker|scout|reviewer|…)
 *   SF_A2A_PORT         — HTTP port to listen on
 *   SF_A2A_BASE_PATH    — project root for SQLite state
 *   SF_A2A_BEARER_TOKEN — parent-process bearer token for JSON-RPC requests
 *
 * Consumer: A2ATransport.spawnAgent() in a2a-transport.js.
 */

import { randomUUID } from "node:crypto";
import { pathToFileURL } from "node:url";
import { AGENT_CARD_PATH } from "@a2a-js/sdk";
import { DefaultRequestHandler, InMemoryTaskStore } from "@a2a-js/sdk/server";
import {
  agentCardHandler,
  jsonRpcHandler,
  UserBuilder,
} from "@a2a-js/sdk/server/express";
import express from "express";
import { getErrorMessage } from "../error-utils.js";
import { buildAgentCard } from "./a2a-transport.js";

let activeServer = null;

function isAllowedHost(hostHeader, port) {
  const host = String(hostHeader ?? "").toLowerCase();
  return host === `localhost:${port}` || host === `127.0.0.1:${port}`;
}

function isAllowedOrigin(originHeader, port) {
  if (!originHeader) return true;
  try {
    const origin = new URL(originHeader);
    return (
      origin.protocol === "http:" &&
      (origin.host === `localhost:${port}` ||
        origin.host === `127.0.0.1:${port}`)
    );
  } catch {
    return false;
  }
}

function createA2ARequestGuard({ bearerToken, port }) {
  return function a2aRequestGuard(req, res, next) {
    if (!isAllowedHost(req.headers.host, port)) {
      return res.status(403).json({ error: "Forbidden" });
    }
    if (!isAllowedOrigin(req.headers.origin, port)) {
      return res.status(403).json({ error: "Forbidden" });
    }
    if (req.headers.authorization !== `Bearer ${bearerToken}`) {
      return res.status(401).json({ error: "Unauthorized" });
    }
    return next();
  };
}

function readConfigFromEnv() {
  const agentName = process.env.SF_A2A_AGENT_NAME;
  const agentRole = process.env.SF_A2A_AGENT_ROLE ?? "worker";
  const port = Number(process.env.SF_A2A_PORT ?? 34501);
  const basePath = process.env.SF_A2A_BASE_PATH ?? process.cwd();
  const bearerToken = process.env.SF_A2A_BEARER_TOKEN;

  if (!agentName) {
    throw new Error("SF_A2A_AGENT_NAME is required");
  }
  if (!bearerToken) {
    throw new Error("SF_A2A_BEARER_TOKEN is required");
  }

  return { agentName, agentRole, port, basePath, bearerToken };
}

/**
 * Create the Express app for one spawned A2A swarm agent.
 *
 * Purpose: centralize A2A route construction so the executable child process
 * and auth regression tests exercise the same JSON-RPC middleware stack.
 *
 * Consumer: main() at subprocess startup and a2a-auth.test.mjs.
 *
 * @param {{agentName: string, agentRole: string, port: number, basePath: string, bearerToken: string}} config
 * @returns {import('express').Express}
 */
export function createA2AAgentApp(config) {
  const { agentName, agentRole, port, basePath, bearerToken } = config;
  if (!bearerToken) {
    throw new Error("SF_A2A_BEARER_TOKEN is required");
  }
  const agentCard = buildAgentCard(agentName, agentRole, port);

  const executor = new SwarmAgentExecutor(agentName, agentRole, basePath);
  const requestHandler = new DefaultRequestHandler(
    agentCard,
    new InMemoryTaskStore(),
    executor,
  );

  const app = express();

  app.use(
    `/${AGENT_CARD_PATH}`,
    agentCardHandler({ agentCardProvider: requestHandler }),
  );
  app.use(
    "/a2a/jsonrpc",
    createA2ARequestGuard({ bearerToken, port }),
    jsonRpcHandler({
      requestHandler,
      userBuilder: UserBuilder.noAuthentication,
    }),
  );

  // Health check endpoint.
  app.get("/health", (_req, res) => {
    res.json({ ok: true, agentName, role: agentRole, port });
  });

  return app;
}

/**
 * Minimal AgentExecutor that handles DispatchEnvelope messages.
 *
 * Purpose: receive incoming A2A task messages containing a DispatchEnvelope
 * payload, process them via the UOK coordination store, and publish a
 * structured result back to the caller so the parent SwarmDispatchLayer can
 * record the outcome in SQLite.
 *
 * Consumer: DefaultRequestHandler for each inbound A2A task.
 */
class SwarmAgentExecutor {
  /**
   * @param {string} name - agent name
   * @param {string} role - agent role
   * @param {string} basePath - project root
   */
  constructor(name, role, basePath) {
    this._name = name;
    this._role = role;
    this._basePath = basePath;
  }

  /**
   * Execute an incoming A2A task.
   *
   * Purpose: extract the DispatchEnvelope from the message parts, record it
   * in the agent's coordination store, and publish a result message so the
   * caller receives structured acknowledgement.
   *
   * Consumer: DefaultRequestHandler on every inbound sendMessage call.
   *
   * @param {import('@a2a-js/sdk/server').RequestContext} requestContext
   * @param {import('@a2a-js/sdk/server').ExecutionEventBus} eventBus
   */
  async execute(requestContext, eventBus) {
    const incomingMessage = requestContext.message;
    let envelope = null;

    // Extract envelope from message parts.
    for (const part of incomingMessage?.parts ?? []) {
      if (part.kind === "data" && part.data) {
        envelope = part.data;
        break;
      }
      if (part.kind === "text") {
        try {
          envelope = JSON.parse(part.text);
        } catch {
          // not JSON text
        }
        if (envelope) break;
      }
    }

    let resultBody;
    if (!envelope) {
      resultBody = {
        error: "No DispatchEnvelope found in message parts",
        agentName: this._name,
        role: this._role,
      };
    } else {
      try {
        resultBody = await this._handleEnvelope(envelope);
      } catch (err) {
        resultBody = {
          error: getErrorMessage(err),
          agentName: this._name,
          role: this._role,
          unitId: envelope?.unitId,
        };
      }
    }

    const responseMessage = {
      kind: "message",
      messageId: randomUUID(),
      role: "agent",
      parts: [
        {
          kind: "data",
          data: resultBody,
          metadata: { contentType: "application/json" },
        },
      ],
      contextId: requestContext.contextId,
    };

    eventBus.publish(responseMessage);
    eventBus.finished();
  }

  /**
   * Record the envelope in the agent's coordination store and return an ack.
   *
   * Purpose: persist the dispatched unit so the SQLite coordination layer
   * retains durability even when transport is A2A HTTP. The agent's inbox is
   * updated so the standard UOK polling loops can pick up the task.
   *
   * Consumer: execute() after extracting the envelope from message parts.
   *
   * @param {object} envelope - DispatchEnvelope from the swarm dispatch layer
   * @returns {Promise<object>} acknowledgement payload
   */
  async _handleEnvelope(envelope) {
    // Lazy-import to avoid loading SQLite unless this server is actually running.
    const { MessageBus } = await import("./message-bus.js");

    const bus = new MessageBus(this._basePath);
    const from = `a2a:dispatch:${envelope.scope ?? "unknown"}:${envelope.unitId}`;
    const to = `agent:${this._name}`;
    const metadata = {
      unitId: envelope.unitId,
      unitType: envelope.unitType,
      workMode: envelope.workMode,
      transport: "a2a",
    };

    const messageId = bus.send(
      from,
      to,
      envelope.payload ?? envelope,
      metadata,
    );

    return {
      status: "accepted",
      messageId,
      agentName: this._name,
      role: this._role,
      unitId: envelope.unitId,
      unitType: envelope.unitType,
      workMode: envelope.workMode,
    };
  }

  async cancelTask(_taskId, _requestContext) {
    // Stateless for now; cancellation is handled via SQLite coordination store.
  }
}

async function main() {
  const { agentName, agentRole, port, basePath, bearerToken } =
    readConfigFromEnv();
  const app = createA2AAgentApp({
    agentName,
    agentRole,
    port,
    basePath,
    bearerToken,
  });

  await new Promise((resolve, reject) => {
    const server = app.listen(port, "127.0.0.1", () => resolve(server));
    activeServer = server;
    server.once("error", reject);
  });

  // Signal readiness to the parent process.
  process.stdout.write(
    JSON.stringify({ ready: true, port, agentName, role: agentRole }) + "\n",
  );

  await new Promise((resolve) => {
    const keepAlive = setInterval(() => {}, 60_000);
    process.once("SIGTERM", () => {
clearInterval(keepAlive);
|
||||
if (activeServer) {
|
||||
activeServer.close(() => resolve());
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
|
||||
main().catch((err) => {
|
||||
process.stderr.write(`a2a-agent-server: fatal: ${err.message}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
|
@@ -1,309 +0,0 @@
/**
 * a2a-transport.js — A2A protocol transport for swarm dispatch.
 *
 * Purpose: when SF_A2A_ENABLED=1, SwarmDispatchLayer uses this module instead
 * of the in-process SQLite MessageBus to dispatch DispatchEnvelopes to separate
 * agent processes over the Agent2Agent HTTP protocol. SQLite coordination state
 * is retained for durability; A2A is strictly the wire transport.
 *
 * Consumer: SwarmDispatchLayer._a2aDispatch() when SF_A2A_ENABLED is set.
 */

import { spawn } from "node:child_process";
import { randomBytes, randomInt, randomUUID } from "node:crypto";

const A2A_AGENT_SERVER_PATH = new URL("./a2a-agent-server.js", import.meta.url)
  .pathname;

const AGENT_READY_TIMEOUT_MS = 15_000;
const PARENT_A2A_BEARER_TOKEN = randomBytes(32).toString("base64url");

function nextPort() {
  return randomInt(40_000, 60_000);
}

/**
 * Build an A2A AgentCard descriptor for a named swarm role agent.
 *
 * Purpose: produce the minimal AgentCard structure required by the A2A SDK so
 * the server endpoint can advertise its identity and capabilities to clients
 * without requiring hand-authored config files.
 *
 * Consumer: A2ATransport.spawnAgent and a2a-agent-server.js at startup.
 *
 * @param {string} name - stable agent name (e.g. 'worker-1')
 * @param {string} role - agent role (coordinator|worker|scout|reviewer|planner|verifier|scribe|adversary)
 * @param {number} port - HTTP port the agent server will listen on
 * @returns {object} A2A AgentCard
 */
export function buildAgentCard(name, role, port) {
  const baseUrl = `http://localhost:${port}`;
  const roleSkillMap = {
    coordinator: {
      description: "Orchestrates tasks across worker pool",
      tags: ["coordinate", "route"],
    },
    worker: {
      description: "Executes build and repair tasks",
      tags: ["build", "repair"],
    },
    scout: {
      description: "Discovers and surfaces information",
      tags: ["research", "discover"],
    },
    reviewer: {
      description: "Critiques worker output against intent",
      tags: ["review", "critique"],
    },
    planner: {
      description: "Generates milestone and task contracts",
      tags: ["plan", "design"],
    },
    verifier: {
      description: "Runs gate checks and evidence validation",
      tags: ["verify", "validate"],
    },
    scribe: {
      description: "Writes and exports documentation",
      tags: ["document", "export"],
    },
    adversary: {
      description: "Red-teams plans and decisions",
      tags: ["challenge", "critique"],
    },
  };

  const skill = roleSkillMap[role] ?? {
    description: `SF agent: ${role}`,
    tags: [role],
  };

  return {
    name: `SF ${name}`,
    description: `Singularity Forge swarm agent — role: ${role}. ${skill.description}.`,
    protocolVersion: "0.3.0",
    version: "1.0.0",
    url: `${baseUrl}/a2a/jsonrpc`,
    skills: [
      {
        id: role,
        name: role,
        description: skill.description,
        tags: skill.tags,
      },
    ],
    capabilities: {
      pushNotifications: false,
    },
    defaultInputModes: ["application/json"],
    defaultOutputModes: ["application/json"],
  };
}

/**
 * A2ATransport — spawns agent subprocesses and dispatches envelopes via HTTP A2A protocol.
 *
 * Purpose: provide real cross-process dispatch when SF_A2A_ENABLED=1 so each
 * swarm agent runs in an isolated Node.js process with its own context window.
 * Process lifecycle is managed here; callers interact only through dispatch().
 *
 * Consumer: SwarmDispatchLayer._a2aDispatch() in swarm-dispatch.js.
 */
export class A2ATransport {
  constructor(options = {}) {
    /** @type {Map<string, { url: string, pid: number, process: import('child_process').ChildProcess }>} */
    this._registry = new Map();
    this._spawn = options.spawn ?? spawn;
  }

  /**
   * Spawn a new agent subprocess running an A2A HTTP server.
   *
   * Purpose: launch an isolated pi/sf agent process that serves A2A JSON-RPC
   * requests on a dynamically allocated port, wait for it to signal readiness,
   * and register it in the local process registry.
   *
   * Consumer: getOrSpawnAgent() when no cached entry exists for the agent name.
   *
   * @param {string} agentName - stable routing name (e.g. 'worker-1')
   * @param {string} role - agent role
   * @param {string} basePath - project root for SQLite state
   * @returns {Promise<string>} the agent's A2A JSON-RPC endpoint URL
   */
  async spawnAgent(agentName, role, basePath) {
    const port = nextPort();
    const env = {
      ...process.env,
      SF_A2A_AGENT_NAME: agentName,
      SF_A2A_AGENT_ROLE: role,
      SF_A2A_PORT: String(port),
      SF_A2A_BASE_PATH: basePath,
      SF_A2A_BEARER_TOKEN: PARENT_A2A_BEARER_TOKEN,
    };

    const child = this._spawn(process.execPath, [A2A_AGENT_SERVER_PATH], {
      env,
      cwd: basePath,
      stdio: ["ignore", "pipe", "pipe"],
    });

    const url = await new Promise((resolve, reject) => {
      const timeout = setTimeout(() => {
        child.kill("SIGTERM");
        reject(
          new Error(
            `A2ATransport: agent ${agentName} did not signal readiness within ${AGENT_READY_TIMEOUT_MS}ms`,
          ),
        );
      }, AGENT_READY_TIMEOUT_MS);

      let buffer = "";
      child.stdout.on("data", (chunk) => {
        buffer += chunk.toString();
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? "";
        for (const line of lines) {
          const trimmed = line.trim();
          if (!trimmed) continue;
          try {
            const msg = JSON.parse(trimmed);
            if (msg.ready && msg.port) {
              clearTimeout(timeout);
              resolve(`http://localhost:${msg.port}/a2a/jsonrpc`);
            }
          } catch {
            // non-JSON stdout line — ignore
          }
        }
      });

      child.on("error", (err) => {
        clearTimeout(timeout);
        reject(
          new Error(
            `A2ATransport: failed to spawn agent ${agentName}: ${err.message}`,
          ),
        );
      });

      child.on("exit", (code) => {
        clearTimeout(timeout);
        this._registry.delete(agentName);
        if (code !== 0 && code !== null) {
          reject(
            new Error(
              `A2ATransport: agent ${agentName} exited with code ${code} before signalling readiness`,
            ),
          );
        }
      });
    });

    this._registry.set(agentName, { url, pid: child.pid, process: child });

    child.stderr.on("data", (chunk) => {
      // Suppress stderr from agent subprocesses unless debugging
      if (process.env.SF_A2A_DEBUG) {
        process.stderr.write(`[a2a:${agentName}] ${chunk}`);
      }
    });

    return url;
  }

  /**
   * Return the cached agent endpoint URL, spawning the process if needed.
   *
   * Purpose: ensure each named agent has at most one live process so dispatch
   * calls share a persistent agent context rather than forking a new process
   * per envelope.
   *
   * Consumer: dispatch() on every envelope send.
   *
   * @param {string} agentName
   * @param {string} role
   * @param {string} basePath
   * @returns {Promise<string>} endpoint URL
   */
  async getOrSpawnAgent(agentName, role, basePath) {
    const entry = this._registry.get(agentName);
    if (entry) return entry.url;
    return this.spawnAgent(agentName, role, basePath);
  }

  /**
   * Send a DispatchEnvelope to a live agent endpoint via A2A JSON-RPC.
   *
   * Purpose: deliver a single envelope over the A2A wire protocol and return
   * the agent's JSON response body so the SwarmDispatchLayer can record the
   * result without knowing the HTTP transport details.
   *
   * Consumer: SwarmDispatchLayer._a2aDispatch() after resolving the agent URL.
   *
   * @param {string} agentUrl - the agent's JSON-RPC endpoint URL
   * @param {import('./swarm-dispatch.js').DispatchEnvelope} envelope
   * @returns {Promise<object>} task result from the agent
   */
  async dispatch(agentUrl, envelope) {
    // Dynamically import A2A client to avoid loading it unless A2A mode is active.
    const { ClientFactory, ClientFactoryOptions, JsonRpcTransportFactory } =
      await import("@a2a-js/sdk/client");

    // Derive base URL by stripping the /a2a/jsonrpc suffix for agent card resolution.
    const baseUrl = agentUrl.replace(/\/a2a\/jsonrpc$/, "");
    const authenticatedFetch = (url, init = {}) =>
      fetch(url, {
        ...init,
        headers: {
          ...(init.headers ?? {}),
          Authorization: `Bearer ${PARENT_A2A_BEARER_TOKEN}`,
        },
      });
    const factory = new ClientFactory(
      ClientFactoryOptions.createFrom(ClientFactoryOptions.default, {
        transports: [new JsonRpcTransportFactory({ fetchImpl: authenticatedFetch })],
      }),
    );
    const client = await factory.createFromUrl(baseUrl);

    const messageId = randomUUID();
    const sendParams = {
      message: {
        messageId,
        role: "user",
        kind: "message",
        parts: [
          {
            kind: "data",
            data: envelope,
            metadata: {
              contentType: "application/json",
            },
          },
        ],
      },
    };

    const response = await client.sendMessage(sendParams);
    return response;
  }

  /**
   * Terminate all spawned agent processes.
   *
   * Purpose: clean up child processes on shutdown so they don't linger after
   * the parent process exits.
   *
   * Consumer: process exit handlers and test teardown.
   */
  shutdown() {
    for (const [_name, entry] of this._registry) {
      try {
        entry.process.kill("SIGTERM");
      } catch {
        // already dead
      }
    }
    this._registry.clear();
  }
}
@@ -6,8 +6,7 @@
 * a structured result. Provides cached swarm access so the same AgentSwarm instance is
 * reused across dispatch calls within a process.
 *
- * Consumer: UOK kernel dispatch path, parallel orchestrators, and /sf autonomous controller
- * when SF_A2A_ENABLED is set.
+ * Consumer: UOK kernel dispatch path, parallel orchestrators, and /sf autonomous controller.
 *
 * ## Current state — enqueue only, no runner
 * `_busDispatch` routes an envelope to a role agent's inbox via the MessageBus. It does NOT
@@ -199,16 +198,6 @@ async function runAgentTurnWithOuterWatchdogs(runAgentTurn, agent, opts = {}) {
// Module-level cache keyed by `${basePath}:${swarmName}`
const _cache = new Map();

-// Lazy singleton A2ATransport — only loaded when SF_A2A_ENABLED is set.
-let _a2aTransport = null;
-async function getA2ATransport() {
-  if (!_a2aTransport) {
-    const { A2ATransport } = await import("./a2a-transport.js");
-    _a2aTransport = new A2ATransport();
-  }
-  return _a2aTransport;
-}
-
/**
 * @typedef {object} DispatchEnvelope
 * @property {string} unitId
@@ -246,7 +235,7 @@ async function getA2ATransport() {
 * envelope to the appropriate role agent and returning a structured result.
 *
 * Consumer: UOK kernel dispatch path, parallel orchestrators, and /sf autonomous
- * controller when SF_A2A_ENABLED is set.
+ * controller.
 */
export class SwarmDispatchLayer {
  /**
@@ -299,8 +288,6 @@ export class SwarmDispatchLayer {
   * Purpose: select the right role agent for the envelope's workMode/unitType,
   * deliver the payload to that agent's durable inbox, and return a structured
   * result so callers can track message delivery without knowing the swarm topology.
-   * When SF_A2A_ENABLED is set, delivery uses the A2A HTTP transport instead of
-   * the in-process SQLite MessageBus.
   *
   * Consumer: UOK kernel dispatch path and swarmDispatch() convenience function.
   *
@@ -308,13 +295,6 @@ export class SwarmDispatchLayer {
   * @returns {Promise<DispatchResult>}
   */
  async dispatch(envelope) {
-    // Codex adversarial review 2026-05-17 [high]: gate must be === "1"
-    // so SF_A2A_ENABLED=0 actually disables A2A. The previous truthy
-    // check left "0" routing through A2A, silently breaking the
-    // documented emergency-rollback contract.
-    if (process.env.SF_A2A_ENABLED === "1") {
-      return this._a2aDispatch(envelope);
-    }
    return this._busDispatch(envelope);
  }

@@ -322,9 +302,9 @@ export class SwarmDispatchLayer {
   * Deliver an envelope via the SQLite MessageBus (default in-process path).
   *
   * Purpose: preserve the existing bus-backed delivery path so all existing
-   * callers continue working unchanged when SF_A2A_ENABLED is not set.
+   * callers continue working unchanged.
   *
-   * Consumer: dispatch() when SF_A2A_ENABLED is unset.
+   * Consumer: dispatch().
   *
   * @param {DispatchEnvelope} envelope
   * @returns {Promise<DispatchResult>}
@@ -396,68 +376,6 @@ export class SwarmDispatchLayer {
    };
  }

-  /**
-   * Deliver an envelope to a spawned agent process over the A2A HTTP protocol.
-   *
-   * Purpose: route the envelope to the correct role agent running in a separate
-   * process, using A2ATransport to spawn the process on first use and the A2A
-   * JSON-RPC client for delivery. SQLite state is updated by the remote process.
-   *
-   * Consumer: dispatch() when SF_A2A_ENABLED is set.
-   *
-   * @param {DispatchEnvelope} envelope
-   * @returns {Promise<DispatchResult>}
-   */
-  async _a2aDispatch(envelope) {
-    const swarm = await this.getOrCreateSwarm();
-    const target =
-      envelope.targetAgent != null
-        ? swarm.get(envelope.targetAgent)
-        : swarm.route(envelope);
-
-    if (!target) {
-      throw new Error(
-        `SwarmDispatchLayer(a2a): no agent available to handle envelope unitType=${envelope.unitType} workMode=${envelope.workMode}`,
-      );
-    }
-
-    const { name: agentName, role } = target.identity;
-    const transport = await getA2ATransport();
-    const agentUrl = await transport.getOrSpawnAgent(
-      agentName,
-      role,
-      this._basePath,
-    );
-
-    const response = await transport.dispatch(agentUrl, envelope);
-
-    // Extract messageId from the A2A response if present.
-    let messageId = `a2a-${Date.now()}`;
-    try {
-      const parts = response?.parts ?? [];
-      for (const part of parts) {
-        const data =
-          part?.data ??
-          (part?.kind === "text" ? JSON.parse(part.text ?? "{}") : null);
-        if (data?.messageId) {
-          messageId = data.messageId;
-          break;
-        }
-      }
-    } catch {
-      // leave messageId as default
-    }
-
-    return {
-      messageId,
-      targetAgent: agentName,
-      swarmName: this._swarmName,
-      envelope,
-      transport: "a2a",
-      agentUrl,
-    };
-  }
-
  /**
   * Dispatch an envelope and block until the target agent has produced a reply.
   *
@@ -466,10 +384,6 @@ export class SwarmDispatchLayer {
   * agent's inbox (same as dispatch()), then drives runAgentTurn so the agent
   * processes the inbox in-process, then reads the agent's reply from the bus.
   *
-   * Behavior under SF_A2A_ENABLED: falls through to the A2A transport and returns
-   * `{ ...result, reply: null, replyMessageId: null }` — the A2A path does not yet
-   * support synchronous waits.
-   *
   * @param {DispatchEnvelope} envelope
   * @param {object} [options={}]
   * @param {number} [options.timeoutMs=480000] Hard cap for the agent's turn.
@@ -481,15 +395,6 @@ export class SwarmDispatchLayer {
  async dispatchAndWait(envelope, options = {}) {
    const { timeoutMs = 480_000, noOutputTimeoutMs, signal, onEvent } = options;

-    // A2A path: no synchronous wait support yet — return nulled reply fields.
-    // Codex adversarial review 2026-05-17 [high]: gate must be === "1"
-    // so SF_A2A_ENABLED=0 actually disables A2A (the truthy check left
-    // "0" routing through A2A and returning null reply fields silently).
-    if (process.env.SF_A2A_ENABLED === "1") {
-      const result = await this._a2aDispatch(envelope);
-      return { ...result, reply: null, replyMessageId: null };
-    }
-
    // Step 1: Enqueue via existing _busDispatch
    debugLog("swarm-dispatch", {
      phase: "before-busDispatch",
167 src/status-projection.ts Normal file
@@ -0,0 +1,167 @@
/**
 * status-projection.ts — read-model contract for supervised SF repos.
 *
 * Purpose: publish a small atomically-written status file that web and other
 * observers can read without opening a repo's live SQLite database or poking
 * at transient runtime internals.
 *
 * Consumer: `sf headless query` writes `.sf/status.projection.json`; the web
 * `/api/swarms` route reads it for multi-repo dashboard aggregation.
 */
import {
  closeSync,
  existsSync,
  fsyncSync,
  mkdirSync,
  openSync,
  renameSync,
  writeSync,
} from "node:fs";
import { basename, join, resolve } from "node:path";
import type { QuerySnapshot } from "./headless-query.js";

export const STATUS_PROJECTION_VERSION = 1;
export const STATUS_PROJECTION_FILE = "status.projection.json";

export interface SfStatusProjection {
  projectionVersion: 1;
  swarmId: string;
  repoPath: string;
  repoName: string;
  writer: {
    surface: "headless";
    pid: number;
    writtenAt: string;
  };
  supervisor: {
    kind: "systemd" | "launchd" | "daemon" | "headless" | "unknown";
    health: "running" | "idle" | "blocked" | "degraded" | "unknown";
    currentPid: number | null;
  };
  state: {
    activeMilestoneId: string | null;
    activeMilestoneTitle: string | null;
    activeSliceId: string | null;
    activeSliceTitle: string | null;
    phase: string;
    nextAction: string;
    nextUnitType: string | null;
    nextUnitId: string | null;
    queueDepth: number;
    lastCycleOutcome: string | null;
    currentUnit: string | null;
  };
  health: {
    verdict: string | null;
    classification: string | null;
    issues: number;
  };
}

function projectionPath(basePath: string): string {
  return join(basePath, ".sf", STATUS_PROJECTION_FILE);
}

function deriveSupervisor(
  snapshot: QuerySnapshot,
): SfStatusProjection["supervisor"] {
  const diagnostics = snapshot.uokDiagnostics ?? {};
  const currentUnit = diagnostics.currentUnit ?? null;
  const classification = String(diagnostics.classification ?? "unknown");
  const health =
    classification === "running"
      ? "running"
      : classification === "blocked"
        ? "blocked"
        : classification === "clear"
          ? "idle"
          : classification === "degraded"
            ? "degraded"
            : diagnostics.verdict === "clear"
              ? "running"
              : "unknown";
  return {
    kind: "headless",
    health,
    currentPid:
      typeof currentUnit?.pid === "number" && Number.isFinite(currentUnit.pid)
        ? currentUnit.pid
        : null,
  };
}

function deriveQueueDepth(snapshot: QuerySnapshot): number {
  const state = snapshot.state as { registry?: Array<{ status?: string }> };
  const registry = Array.isArray(state.registry) ? state.registry : [];
  return registry.filter((item) => item.status === "pending").length;
}

export function buildStatusProjection(
  basePath: string,
  snapshot: QuerySnapshot,
): SfStatusProjection {
  const repoPath = resolve(basePath);
  const diagnostics = snapshot.uokDiagnostics ?? {};
  const currentUnit = diagnostics.currentUnit;
  const currentUnitLabel =
    currentUnit?.unitType && currentUnit?.unitId
      ? `${currentUnit.unitType}:${currentUnit.unitId}`
      : null;
  return {
    projectionVersion: STATUS_PROJECTION_VERSION,
    swarmId: repoPath,
    repoPath,
    repoName: basename(repoPath),
    writer: {
      surface: "headless",
      pid: process.pid,
      writtenAt: new Date().toISOString(),
    },
    supervisor: deriveSupervisor(snapshot),
    state: {
      activeMilestoneId: snapshot.state.activeMilestone?.id ?? null,
      activeMilestoneTitle: snapshot.state.activeMilestone?.title ?? null,
      activeSliceId: snapshot.state.activeSlice?.id ?? null,
      activeSliceTitle: snapshot.state.activeSlice?.title ?? null,
      phase: String(snapshot.state.phase ?? "unknown"),
      nextAction: String(snapshot.state.nextAction ?? ""),
      nextUnitType: snapshot.next.unitType ?? null,
      nextUnitId: snapshot.next.unitId ?? null,
      queueDepth: deriveQueueDepth(snapshot),
      lastCycleOutcome: diagnostics.latestRun?.status
        ? String(diagnostics.latestRun.status)
        : null,
      currentUnit: currentUnitLabel,
    },
    health: {
      verdict: diagnostics.verdict ? String(diagnostics.verdict) : null,
      classification: diagnostics.classification
        ? String(diagnostics.classification)
        : null,
      issues: Array.isArray(diagnostics.issues) ? diagnostics.issues.length : 0,
    },
  };
}

export function writeStatusProjection(
  basePath: string,
  snapshot: QuerySnapshot,
): SfStatusProjection {
  const projection = buildStatusProjection(basePath, snapshot);
  const target = projectionPath(basePath);
  mkdirSync(join(basePath, ".sf"), { recursive: true });
  const tmp = `${target}.tmp-${process.pid}-${Date.now()}`;
  const fd = openSync(tmp, "w", 0o644);
  try {
    writeSync(fd, JSON.stringify(projection, null, 2) + "\n");
    fsyncSync(fd);
  } finally {
    closeSync(fd);
  }
  renameSync(tmp, target);
  return projection;
}

export function hasStatusProjection(basePath: string): boolean {
  return existsSync(projectionPath(basePath));
}
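For orientation, a minimal sketch of the write path this file exposes. `collectQuerySnapshot` is a hypothetical stand-in for however `sf headless query` assembles its `QuerySnapshot`; `writeStatusProjection` and its tmp-write + fsync + rename sequence are real, per the code above.

import { writeStatusProjection } from "./status-projection.js";
import type { QuerySnapshot } from "./headless-query.js";

// Hypothetical helper: stands in for however the headless surface
// builds its QuerySnapshot each cycle.
declare function collectQuerySnapshot(basePath: string): Promise<QuerySnapshot>;

async function publishStatus(basePath: string): Promise<void> {
  const snapshot = await collectQuerySnapshot(basePath);
  // Atomic publish: readers of .sf/status.projection.json never
  // observe a torn write thanks to the tmp + fsync + rename dance.
  const projection = writeStatusProjection(basePath, snapshot);
  console.log(`${projection.repoName}: ${projection.supervisor.health}`);
}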
88 src/tests/status-projection.test.ts Normal file
@@ -0,0 +1,88 @@
import assert from "node:assert/strict";
import { existsSync, mkdtempSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, test } from "vitest";
import {
  STATUS_PROJECTION_FILE,
  buildStatusProjection,
  writeStatusProjection,
} from "../status-projection.js";
import type { QuerySnapshot } from "../headless-query.js";

const roots: string[] = [];

afterEach(() => {
  for (const root of roots.splice(0)) {
    rmSync(root, { recursive: true, force: true });
  }
});

function makeRoot() {
  const root = mkdtempSync(join(tmpdir(), "sf-status-projection-"));
  roots.push(root);
  return root;
}

function snapshot(): QuerySnapshot {
  return {
    schemaVersion: 1,
    state: {
      activeMilestone: { id: "M053", title: "Supervisor" },
      activeSlice: { id: "S11", title: "Web projection" },
      activeTask: null,
      phase: "planning",
      recentDecisions: [],
      blockers: [],
      nextAction: "Plan projection",
      registry: [{ id: "M054", title: "Later", status: "pending" }],
      requirements: {
        active: 1,
        validated: 0,
        deferred: 0,
        outOfScope: 0,
        blocked: 0,
        total: 1,
      },
      progress: {
        milestones: { done: 0, total: 1 },
        slices: { done: 0, total: 1 },
      },
    } as QuerySnapshot["state"],
    next: { action: "dispatch", unitType: "plan-slice", unitId: "M053/S11" },
    cost: { workers: [], total: 0 },
    runtime: { units: [] },
    uokDiagnostics: {
      verdict: "clear",
      classification: "running",
      currentUnit: { unitType: "plan-slice", unitId: "M053/S11", pid: 123 },
      latestRun: { status: "started" },
      issues: [],
    },
  };
}

describe("status projection", () => {
  test("buildStatusProjection_from_query_snapshot_returns_read_model", () => {
    const root = makeRoot();
    const projection = buildStatusProjection(root, snapshot());

    assert.equal(projection.projectionVersion, 1);
    assert.equal(projection.state.activeMilestoneId, "M053");
    assert.equal(projection.state.currentUnit, "plan-slice:M053/S11");
    assert.equal(projection.state.queueDepth, 1);
    assert.equal(projection.supervisor.health, "running");
    assert.equal(projection.supervisor.currentPid, 123);
  });

  test("writeStatusProjection_writes_atomic_projection_file", () => {
    const root = makeRoot();
    const projection = writeStatusProjection(root, snapshot());
    const path = join(root, ".sf", STATUS_PROJECTION_FILE);

    assert.equal(existsSync(path), true);
    const parsed = JSON.parse(readFileSync(path, "utf-8"));
    assert.equal(parsed.projectionVersion, projection.projectionVersion);
    assert.equal(parsed.state.activeSliceId, "S11");
  });
});
30 web/app/api/swarms/route.ts Normal file
@@ -0,0 +1,30 @@
import {
  readSwarmDashboardRows,
  SWARMS_REGISTRY_PATH,
} from "@/lib/swarm-status";

export const runtime = "nodejs";
export const dynamic = "force-dynamic";

export async function GET(): Promise<Response> {
  const swarms = readSwarmDashboardRows();
  const summary = {
    total: swarms.length,
    ok: swarms.filter((swarm) => swarm.status === "ok").length,
    degraded: swarms.filter((swarm) => swarm.status === "degraded").length,
    missing: swarms.filter((swarm) => swarm.status === "missing").length,
  };
  return Response.json(
    {
      schemaVersion: 1,
      registryPath: SWARMS_REGISTRY_PATH,
      summary,
      swarms,
    },
    {
      headers: {
        "Cache-Control": "no-store",
      },
    },
  );
}
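A minimal consumer sketch of this route. The deployed SwarmsView goes through `authFetch` (see the component below); plain `fetch` here assumes an unauthenticated local setup.

async function pollSwarms(): Promise<void> {
  const response = await fetch("/api/swarms", { cache: "no-store" });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);
  const payload = await response.json();
  // payload.summary -> { total, ok, degraded, missing }
  // payload.swarms  -> one SwarmDashboardRow per registered repo
  console.log(payload.summary);
}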
@@ -32,6 +32,7 @@ import {
  Sidebar,
} from "@/components/sf/sidebar";
import { StatusBar } from "@/components/sf/status-bar";
+import { SwarmsView } from "@/components/sf/swarms-view";
import { UpdateBanner } from "@/components/sf/update-banner";
import { VisualizerView } from "@/components/sf/visualizer-view";
import { Badge } from "@/components/ui/badge";
@@ -57,6 +58,7 @@ const KNOWN_VIEWS = new Set([
  "dashboard",
  "power",
  "chat",
+  "swarms",
  "roadmap",
  "files",
  "activity",
@@ -520,6 +522,11 @@ function WorkspaceChrome() {
          <Roadmap />
        </ErrorBoundary>
      )}
+      {activeView === "swarms" && (
+        <ErrorBoundary viewName="Swarms">
+          <SwarmsView />
+        </ErrorBoundary>
+      )}
      {activeView === "files" && (
        <ErrorBoundary viewName="Files">
          <FilesView />
@@ -24,6 +24,7 @@ import {
  PanelRightClose,
  PanelRightOpen,
  Play,
+  Server,
  Settings,
  SkipForward,
  Sun,
@@ -108,6 +109,7 @@ export function NavRail({
    { id: "dashboard", label: "Dashboard", icon: LayoutDashboard },
    { id: "power", label: "Power Mode", icon: Columns2 },
    { id: "chat", label: "Chat", icon: MessagesSquare },
+    { id: "swarms", label: "Swarms", icon: Server },
    { id: "roadmap", label: "Roadmap", icon: MapIcon },
    { id: "files", label: "Files", icon: Folder },
    { id: "activity", label: "Activity", icon: Activity },
206 web/components/sf/swarms-view.tsx Normal file
@@ -0,0 +1,206 @@
"use client";

import {
  AlertTriangle,
  CheckCircle2,
  CircleDashed,
  Server,
} from "lucide-react";
import { useEffect, useState } from "react";
import { Badge } from "@/components/ui/badge";
import { authFetch } from "@/lib/auth";
import type { SwarmDashboardRow } from "@/lib/swarm-status";
import { cn } from "@/lib/utils";

interface SwarmsPayload {
  schemaVersion: 1;
  registryPath: string;
  summary: {
    total: number;
    ok: number;
    degraded: number;
    missing: number;
  };
  swarms: SwarmDashboardRow[];
}

function healthIcon(status: SwarmDashboardRow["status"]) {
  if (status === "ok") return <CheckCircle2 className="h-4 w-4 text-success" />;
  if (status === "missing") {
    return <CircleDashed className="h-4 w-4 text-muted-foreground" />;
  }
  return <AlertTriangle className="h-4 w-4 text-warning" />;
}

function statusTone(status: SwarmDashboardRow["status"]) {
  if (status === "ok") return "border-success/40 text-success";
  if (status === "missing") return "border-border text-muted-foreground";
  return "border-warning/40 text-warning";
}

export function SwarmsView() {
  const [payload, setPayload] = useState<SwarmsPayload | null>(null);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    let cancelled = false;
    const load = async () => {
      try {
        const response = await authFetch("/api/swarms");
        if (!response.ok) throw new Error(`HTTP ${response.status}`);
        const data = (await response.json()) as SwarmsPayload;
        if (!cancelled) {
          setPayload(data);
          setError(null);
        }
      } catch (err) {
        if (!cancelled) {
          setError(err instanceof Error ? err.message : String(err));
        }
      }
    };
    void load();
    const timer = window.setInterval(load, 5_000);
    return () => {
      cancelled = true;
      window.clearInterval(timer);
    };
  }, []);

  return (
    <div className="flex h-full flex-col overflow-hidden">
      <div className="border-b border-border px-6 py-3">
        <div className="flex items-center justify-between gap-4">
          <div>
            <h1 className="text-lg font-semibold">Swarms</h1>
            <p className="text-sm text-muted-foreground">
              Per-repo supervised SF status projections
            </p>
          </div>
          <Server className="h-5 w-5 text-muted-foreground" />
        </div>
      </div>
      <div className="flex-1 overflow-y-auto px-6 py-4">
        {error ? (
          <div className="rounded-md border border-destructive/30 p-4 text-sm text-destructive">
            {error}
          </div>
        ) : null}
        {payload ? (
          <div className="space-y-4">
            <div className="grid gap-3 md:grid-cols-4">
              <div className="rounded-md border border-border p-3">
                <p className="text-xs text-muted-foreground">Registered</p>
                <p className="mt-1 text-2xl font-semibold">
                  {payload.summary.total}
                </p>
              </div>
              <div className="rounded-md border border-border p-3">
                <p className="text-xs text-muted-foreground">Running</p>
                <p className="mt-1 text-2xl font-semibold">
                  {payload.summary.ok}
                </p>
              </div>
              <div className="rounded-md border border-border p-3">
                <p className="text-xs text-muted-foreground">Degraded</p>
                <p className="mt-1 text-2xl font-semibold">
                  {payload.summary.degraded}
                </p>
              </div>
              <div className="rounded-md border border-border p-3">
                <p className="text-xs text-muted-foreground">Missing</p>
                <p className="mt-1 text-2xl font-semibold">
                  {payload.summary.missing}
                </p>
              </div>
            </div>

            {payload.swarms.length === 0 ? (
              <div className="rounded-md border border-border p-6 text-sm text-muted-foreground">
                No repos registered in {payload.registryPath}.
              </div>
            ) : (
              <div className="overflow-hidden rounded-md border border-border">
                {payload.swarms.map((swarm) => (
                  <div
                    key={swarm.id}
                    className="border-b border-border p-4 last:border-b-0"
                  >
                    <div className="flex items-start justify-between gap-4">
                      <div className="min-w-0">
                        <div className="flex items-center gap-2">
                          {healthIcon(swarm.status)}
                          <h2 className="truncate text-sm font-semibold">
                            {swarm.name}
                          </h2>
                          <Badge
                            variant="outline"
                            className={cn(
                              "h-5 text-[11px]",
                              statusTone(swarm.status),
                            )}
                          >
                            {swarm.status}
                          </Badge>
                        </div>
                        <p className="mt-1 truncate font-mono text-xs text-muted-foreground">
                          {swarm.repoPath}
                        </p>
                      </div>
                      <p className="shrink-0 text-xs text-muted-foreground">
                        {swarm.updatedAt
                          ? new Date(swarm.updatedAt).toLocaleTimeString()
                          : "no projection"}
                      </p>
                    </div>
                    {swarm.projection ? (
                      <div className="mt-3 grid gap-3 text-sm md:grid-cols-4">
                        <div>
                          <p className="text-xs text-muted-foreground">
                            Milestone
                          </p>
                          <p className="truncate">
                            {swarm.projection.state.activeMilestoneId ?? "none"}
                          </p>
                        </div>
                        <div>
                          <p className="text-xs text-muted-foreground">
                            Current Unit
                          </p>
                          <p className="truncate">
                            {swarm.projection.state.currentUnit ?? "idle"}
                          </p>
                        </div>
                        <div>
                          <p className="text-xs text-muted-foreground">Queue</p>
                          <p>{swarm.projection.state.queueDepth}</p>
                        </div>
                        <div>
                          <p className="text-xs text-muted-foreground">
                            Supervisor
                          </p>
                          <p className="truncate">
                            {swarm.projection.supervisor.kind} /{" "}
                            {swarm.projection.supervisor.health}
                          </p>
                        </div>
                      </div>
                    ) : (
                      <p className="mt-3 text-sm text-muted-foreground">
                        {swarm.error}
                      </p>
                    )}
                  </div>
                ))}
              </div>
            )}
          </div>
        ) : (
          <div className="rounded-md border border-border p-6 text-sm text-muted-foreground">
            Loading swarms…
          </div>
        )}
      </div>
    </div>
  );
}
87 web/lib/__tests__/swarm-status.test.ts Normal file
@@ -0,0 +1,87 @@
import assert from "node:assert/strict";
import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, test } from "vitest";
import { readSwarmDashboardRows } from "../swarm-status";

const roots: string[] = [];

afterEach(() => {
  for (const root of roots.splice(0)) {
    rmSync(root, { recursive: true, force: true });
  }
});

function makeRoot() {
  const root = mkdtempSync(join(tmpdir(), "sf-web-swarms-"));
  roots.push(root);
  return root;
}

function writeProjection(repo: string) {
  mkdirSync(join(repo, ".sf"), { recursive: true });
  writeFileSync(
    join(repo, ".sf", "status.projection.json"),
    JSON.stringify({
      projectionVersion: 1,
      swarmId: repo,
      repoPath: repo,
      repoName: "repo",
      writer: {
        surface: "headless",
        pid: 123,
        writtenAt: "2026-05-17T00:00:00.000Z",
      },
      supervisor: { kind: "headless", health: "running", currentPid: 123 },
      state: {
        activeMilestoneId: "M053",
        activeMilestoneTitle: "Supervisor",
        activeSliceId: "S11",
        activeSliceTitle: "Web projection",
        phase: "planning",
        nextAction: "Plan projection",
        nextUnitType: "plan-slice",
        nextUnitId: "M053/S11",
        queueDepth: 2,
        lastCycleOutcome: "started",
        currentUnit: "plan-slice:M053/S11",
      },
      health: { verdict: "clear", classification: "running", issues: 0 },
    }),
  );
}

describe("readSwarmDashboardRows", () => {
  test("reads_registered_repos_from_status_projection", () => {
    const root = makeRoot();
    const repo = join(root, "repo");
    mkdirSync(repo);
    writeProjection(repo);
    const registry = join(root, "swarms.json");
    writeFileSync(
      registry,
      JSON.stringify({ swarms: [{ name: "Repo", path: repo }] }),
    );

    const rows = readSwarmDashboardRows(registry);

    assert.equal(rows.length, 1);
    assert.equal(rows[0]?.status, "ok");
    assert.equal(rows[0]?.projection?.state.activeMilestoneId, "M053");
  });

  test("missing_projection_degrades_one_repo_without_throwing", () => {
    const root = makeRoot();
    const repo = join(root, "repo");
    mkdirSync(repo);
    const registry = join(root, "swarms.json");
    writeFileSync(registry, JSON.stringify([{ path: repo }]));

    const rows = readSwarmDashboardRows(registry);

    assert.equal(rows.length, 1);
    assert.equal(rows[0]?.status, "missing");
    assert.match(rows[0]?.error ?? "", /projection missing/);
  });
});
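As the two tests above exercise, the registry reader accepts either a bare array of entries or a { "swarms": [...] } wrapper. A minimal hand-written ~/.sf/swarms.json might look like this (name and path illustrative):

{
  "swarms": [
    { "name": "my-repo", "path": "/home/operator/src/my-repo" }
  ]
}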
@@ -319,7 +319,6 @@ export class PtyChatParser {
   * Feed a raw PTY chunk (may contain ANSI codes, partial lines, etc.)
   */
  feed(chunk: string): void {
-    this._lastInputAt = Date.now();
    // Any new content resets pending completion — we're still receiving output
    if (this._completionTimer) {
      clearTimeout(this._completionTimer);
@@ -385,7 +384,6 @@ export class PtyChatParser {
    this._activeMessage = null;
    this._pendingSelect = null;
    this._lastHeaderText = "";
-    this._lastInputAt = 0;
    this._completionEmitted = false;
    this._awaitingInput = false;
    if (this._completionTimer) {
156 web/lib/swarm-status.ts Normal file
@@ -0,0 +1,156 @@
import { existsSync, readFileSync, statSync } from "node:fs";
import { homedir } from "node:os";
import { join, resolve } from "node:path";

export const SWARMS_REGISTRY_PATH = join(homedir(), ".sf", "swarms.json");
export const STATUS_PROJECTION_FILE = "status.projection.json";

export interface SwarmRegistryEntry {
  id?: string;
  name?: string;
  path?: string;
  repoPath?: string;
}

export interface SwarmStatusProjection {
  projectionVersion: number;
  swarmId: string;
  repoPath: string;
  repoName: string;
  writer: {
    surface: string;
    pid: number;
    writtenAt: string;
  };
  supervisor: {
    kind: string;
    health: string;
    currentPid: number | null;
  };
  state: {
    activeMilestoneId: string | null;
    activeMilestoneTitle: string | null;
    activeSliceId: string | null;
    activeSliceTitle: string | null;
    phase: string;
    nextAction: string;
    nextUnitType: string | null;
    nextUnitId: string | null;
    queueDepth: number;
    lastCycleOutcome: string | null;
    currentUnit: string | null;
  };
  health: {
    verdict: string | null;
    classification: string | null;
    issues: number;
  };
}

export interface SwarmDashboardRow {
  id: string;
  name: string;
  repoPath: string;
  projectionPath: string;
  status: "ok" | "missing" | "degraded";
  error: string | null;
  projection: SwarmStatusProjection | null;
  updatedAt: string | null;
}

function normalizeRegistry(raw: unknown): SwarmRegistryEntry[] {
  if (Array.isArray(raw)) return raw.filter(isRegistryEntry);
  if (raw && typeof raw === "object") {
    const entries = (raw as Record<string, unknown>).swarms;
    if (Array.isArray(entries)) return entries.filter(isRegistryEntry);
  }
  return [];
}

function isRegistryEntry(value: unknown): value is SwarmRegistryEntry {
  if (!value || typeof value !== "object") return false;
  const entry = value as Record<string, unknown>;
  return typeof entry.path === "string" || typeof entry.repoPath === "string";
}

function readRegistry(
  registryPath = SWARMS_REGISTRY_PATH,
): SwarmRegistryEntry[] {
  if (!existsSync(registryPath)) return [];
  const parsed = JSON.parse(readFileSync(registryPath, "utf-8"));
  return normalizeRegistry(parsed);
}

function validateProjection(value: unknown): SwarmStatusProjection {
  if (!value || typeof value !== "object") {
    throw new Error("projection is not an object");
  }
  const projection = value as SwarmStatusProjection;
  if (projection.projectionVersion !== 1) {
    throw new Error(
      `unsupported projectionVersion ${projection.projectionVersion}`,
    );
  }
  if (
    typeof projection.repoPath !== "string" ||
    projection.repoPath.length === 0
  ) {
    throw new Error("projection repoPath missing");
  }
  if (!projection.state || typeof projection.state !== "object") {
    throw new Error("projection state missing");
  }
  return projection;
}

export function readSwarmDashboardRows(
  registryPath = SWARMS_REGISTRY_PATH,
): SwarmDashboardRow[] {
  const entries = readRegistry(registryPath);
  return entries.map((entry, index) => {
    const repoPath = resolve(entry.repoPath ?? entry.path ?? "");
    const projectionPath = join(repoPath, ".sf", STATUS_PROJECTION_FILE);
    const fallbackName =
      repoPath.split(/[\\/]/).filter(Boolean).at(-1) ?? repoPath;
    const id = entry.id ?? repoPath;
    const name = entry.name ?? fallbackName;
    if (!existsSync(projectionPath)) {
      return {
        id,
        name,
        repoPath,
        projectionPath,
        status: "missing",
        error: "status projection missing",
        projection: null,
        updatedAt: null,
      };
    }
    try {
      const projection = validateProjection(
        JSON.parse(readFileSync(projectionPath, "utf-8")),
      );
      return {
        id: projection.swarmId || id || `swarm-${index + 1}`,
        name: entry.name ?? projection.repoName ?? name,
        repoPath,
        projectionPath,
        status: "ok",
        error: null,
        projection,
        updatedAt: statSync(projectionPath).mtime.toISOString(),
      };
    } catch (error) {
      return {
        id,
        name,
        repoPath,
        projectionPath,
        status: "degraded",
        error: error instanceof Error ? error.message : String(error),
        projection: null,
        updatedAt: null,
      };
    }
  });
}
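A minimal server-side usage sketch, mirroring what the /api/swarms route above does; only `readSwarmDashboardRows` and the row shape are taken from this file.

import { readSwarmDashboardRows } from "@/lib/swarm-status";

// Reads ~/.sf/swarms.json by default; tests inject a registry path.
const rows = readSwarmDashboardRows();
for (const row of rows) {
  // status is "ok" | "missing" | "degraded"; projection is non-null only for "ok".
  console.log(row.name, row.status, row.projection?.state.currentUnit ?? "-");
}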