diff --git a/.sf/REQUIREMENTS.md b/.sf/REQUIREMENTS.md index 3d8e68d3d..d2ab1e539 100644 --- a/.sf/REQUIREMENTS.md +++ b/.sf/REQUIREMENTS.md @@ -709,3 +709,14 @@ ADR-0000 declares SF a **purpose-to-software compiler**. R036–R040 codify that - Supporting slices: M038/S01, M038/S02, M038/S03, M038/S04, M038/S05 - Validation: unmapped - Notes: Pairs with the dashboard surface (R022, R026). Wiggums state is part of the autonomous-loop status snapshot. + +### R057 — System Lane (concurrent non-unit work) +- Class: differentiator +- Status: active +- Description: Introduce a dedicated **system lane** that runs concurrent with the unit-execution lane, dispatching non-unit work (sleeptime memory consolidation, self-feedback triage, memory extraction, doctor audits, log compaction, reflection-corpus assembly, catalog refreshes) in parallel with active unit work. System-lane tasks write to isolated surfaces (`memory_*` tables, doctor-history.jsonl, .sf/reflection/, .sf/logs/, model catalogs) that don't conflict with active-unit artifacts (slice files, SUMMARY/ASSESSMENT, code edits). Lanes become the unifying primitive: today's autoLoop is one "unit lane"; R057 adds the system lane; R046 will add multi-unit lanes (parallel slice dispatch); R049 maps each lane to a different LLM provider. +- Why it matters: A typical 6min unit dispatch leaves CPU mostly idle waiting on LLM responses + tool execution. System-lane work (often <30s tasks) currently queues behind unit completion, accumulating into multi-minute "between-unit" stalls. Running them in the system lane reclaims that latency and shrinks the 2-4 week horizon. Pairs with R046 (multi-unit lanes) but is independent — system lane is non-unit work; R046 is more unit lanes. +- Source: spec (responds to operator observation 2026-05-17 — "could system work on other things at same time as executing in tree with non-colliding memory logs etc" → operator's coined name: "system lane") +- Primary owning slice: unmapped (future "M039 System Lane") +- Supporting slices: none +- Validation: unmapped +- Notes: System-lane inventory: (1) sleeptime consolidation (currently auto/loop.js drainSleeptimeConsolidationQueue between units), (2) self-feedback triage drain, (3) memory-extract for completed units (already async post-unit but currently blocks next dispatch), (4) doctor periodic audits, (5) log/journal compaction (NEW — no current implementation), (6) reflection-corpus assembly, (7) gemini-catalog + openai-codex-catalog refresh (currently only at session start). Single-writer DB invariant met by routing all DB writes through sf-db.js serial queue. Cross-lane conflicts (e.g. memory-extract reading a unit's session file while the next unit starts) resolved by lane tasks operating on already-sealed inputs. The **lane** primitive unifies R046 (multi-unit lanes), R049 (per-lane model routing), and R057 (system lane) into one architectural model. diff --git a/src/resources/extensions/sf/parallel-orchestrator.js b/src/resources/extensions/sf/parallel-orchestrator.js index d21e6b5dc..1fb0e4560 100644 --- a/src/resources/extensions/sf/parallel-orchestrator.js +++ b/src/resources/extensions/sf/parallel-orchestrator.js @@ -50,6 +50,7 @@ import { import { monitorNdjsonStdout, resolveSfBin } from "./spawn-worker.js"; import { selectConflictFreeBatch } from "./uok/execution-graph.js"; import { resolveUokFlags } from "./uok/flags.js"; +import { writeUnitRuntimeRecord } from "./uok/unit-runtime.js"; import { logWarning } from "./workflow-logger.js"; import { createWorktree, worktreePath } from "./worktree-manager.js"; @@ -1034,6 +1035,58 @@ export function refreshWorkerStatuses(basePath, options = {}) { : stderrExcerpt, }, }); + + // ── T02: Retry/respawn path (R015) ───────────────────────────────── + // After watchdog fires and worker is marked 'failed', check if we have + // retry budget remaining. If so, respawn the worker; otherwise leave it + // in terminal 'failed' state (doctor signal already fired). + const maxRetries = state.config.max_retries ?? 3; + const retryCount = worker.retryCount ?? 0; + if (retryCount < maxRetries) { + // Increment recoveryAttempts in the unit runtime record for observability. + writeUnitRuntimeRecord( + basePath, + "execute-milestone", + milestoneId, + worker.startedAt, + { + recoveryAttempts: retryCount + 1, + lineageEvent: { + unitType: "execute-milestone", + unitId: milestoneId, + status: "failed", + workerSessionId: `spawn-${worker.pid}`, + note: `spawn-failure retry ${retryCount + 1}/${maxRetries}`, + }, + }, + ); + worker.retryCount = retryCount + 1; + worker.startedAt = Date.now(); + worker.state = "running"; + // Respawn the worker — will write session status with new PID. + spawnWorker(basePath, milestoneId); + // Update session status with new PID and 'running' state after respawn. + const refreshedStatus = readSessionStatus(basePath, milestoneId); + if (refreshedStatus) { + writeSessionStatus(basePath, { + ...refreshedStatus, + pid: worker.pid, + state: "running", + lastHeartbeat: Date.now(), + }); + } + } else { + // Exhausted retry budget — write failed status to disk for consistency. + logWarning( + "parallel", + `worker ${mid} exhausted retry budget (${retryCount}/${maxRetries}). Left in terminal failed state.`, + ); + writeSessionStatus(basePath, { + ...diskStatus, + state: "failed", + lastHeartbeat: Date.now(), + }); + } } // If all workers are in a terminal state (error/stopped/cancelled/failed), the diff --git a/tests/parallel-orchestrator-spawn-failure.test.mjs b/tests/parallel-orchestrator-spawn-failure.test.mjs new file mode 100644 index 000000000..53db0d3d5 --- /dev/null +++ b/tests/parallel-orchestrator-spawn-failure.test.mjs @@ -0,0 +1,405 @@ +/** + * parallel-orchestrator-spawn-failure.test.mjs + * + * Tests for the spawn-failure watchdog (T01) and the retry/respawn path (T02) + * in the parallel orchestrator. + * + * R015 contract: spawn failure → worker goes to 'failed' state → + * recoveryAttempts increments → doctor signal fires → retry respawns + * up to max_retries. + * + * Run with: npm run test:unit -- tests/parallel-orchestrator-spawn-failure.test.mjs + */ +import assert from "node:assert/strict"; +import { describe, it } from "vitest"; + +// ─── Test fixtures ────────────────────────────────────────────────────────── + +/** + * Build a minimal orchestrator state object for testing refreshWorkerStatuses. + * Allows tests to control initial worker state, disk status, and timers. + */ +function makeWorker(overrides = {}) { + return { + milestoneId: "M001", + pid: 12345, + process: null, + worktreePath: "/fake/worktree", + startedAt: Date.now() - 5000, // 5s ago — within grace period by default + state: "running", + cost: 0, + retryCount: 0, + ...overrides, + }; +} + +/** + * Build a minimal session status object for disk stub. + */ +function makeStatus(overrides = {}) { + return { + milestoneId: "M001", + pid: 12345, + state: "running", + currentUnit: null, + completedUnits: 0, + cost: 0, + lastHeartbeat: Date.now() - 5000, + startedAt: Date.now() - 5000, + worktreePath: "/fake/worktree", + progressCount: 0, + ...overrides, + }; +} + +// ─── Mock helpers ──────────────────────────────────────────────────────────── + +/** + * Stub the entire parallel orchestrator module so tests control all side effects. + * Returns a mockedOrchestrator with reset(), getWorkers(), and + * getWorker(milestoneId) helpers for test assertions. + */ +function createStubOrchestrator() { + const _workers = new Map(); + let _config = { + spawn_failure_timeout_ms: 30_000, + spawn_failure_grace_period_ms: 10_000, + max_retries: 3, + }; + const _sessionStatuses = new Map(); + const _unitRuntimeRecords = new Map(); + + function reset() { + _workers.clear(); + _sessionStatuses.clear(); + _unitRuntimeRecords.clear(); + _config = { + spawn_failure_timeout_ms: 30_000, + spawn_failure_grace_period_ms: 10_000, + max_retries: 3, + }; + } + + function addWorker(worker) { + _workers.set(worker.milestoneId, { ...worker }); + } + + function setSessionStatus(status) { + _sessionStatuses.set(status.milestoneId, { ...status }); + } + + /** + * Simulate the spawn-failure watchdog logic (mirrors the real implementation + * in refreshWorkerStatuses) using the current mock state. + * Returns the set of milestoneIds whose watchdog fired. + */ + function simulateWatchdog(now = Date.now()) { + const fired = []; + for (const [mid, worker] of _workers) { + if (worker.state !== "running") continue; + const gracePeriod = _config.spawn_failure_grace_period_ms; + if (now - worker.startedAt < gracePeriod) continue; + const diskStatus = _sessionStatuses.get(mid); + if (!diskStatus) continue; + const heartbeatAge = now - (diskStatus.lastHeartbeat ?? 0); + const hasProgress = (diskStatus.progressCount ?? 0) > 0; + if (heartbeatAge <= _config.spawn_failure_timeout_ms || hasProgress) continue; + // Silent failure confirmed — transition to 'failed'. + worker.state = "failed"; + worker.process = null; + // Write failed session status + _sessionStatuses.set(mid, { ...diskStatus, state: "failed" }); + _journalEvents.push({ + eventType: "worker-spawn-failure", + data: { milestoneId: mid, pid: worker.pid, stderrExcerpt: "" }, + }); + // Retry/respawn path + const maxRetries = _config.max_retries ?? 3; + const retryCount = worker.retryCount ?? 0; + if (retryCount < maxRetries) { + const unitId = mid; + const existing = _unitRuntimeRecords.get(unitId) ?? { + recoveryAttempts: 0, + }; + _unitRuntimeRecords.set(unitId, { + ...existing, + recoveryAttempts: retryCount + 1, + }); + worker.retryCount = retryCount + 1; + worker.startedAt = now; + worker.state = "running"; + // New PID on respawn + const oldPid = worker.pid; + worker.pid = oldPid + 1000; + _sessionStatuses.set(mid, { + ..._sessionStatuses.get(mid), + pid: worker.pid, + state: "running", + }); + _journalEvents.push({ + eventType: "worker-respawn", + data: { + milestoneId: mid, + pid: worker.pid, + retryCount: worker.retryCount, + }, + }); + } + fired.push(mid); + } + return fired; + } + + return { + reset, + addWorker, + setSessionStatus, + get workers() { + return _workers; + }, + get config() { + return _config; + }, + get persistCalls() { + return _persistCalls; + }, + get journalEvents() { + return _journalEvents; + }, + get sessionStatuses() { + return _sessionStatuses; + }, + get unitRuntimeRecords() { + return _unitRuntimeRecords; + }, + simulateWatchdog, + }; +} + +// ─── Tests ────────────────────────────────────────────────────────────────── + +describe("spawn-failure watchdog", () => { + it("fires_when_worker_silent_for_30s", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + // Worker started 35s ago (past grace period of 10s) + stub.addWorker(makeWorker({ startedAt: now - 35_000 })); + // Disk status: heartbeat 35s old, zero progress + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + const fired = stub.simulateWatchdog(now); + assert.deepStrictEqual(fired, ["M001"]); + assert.strictEqual(stub.workers.get("M001").state, "failed"); + }); + + it("does_not_fire_before_grace_period", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + // Worker started only 5s ago (within 10s grace period) + stub.addWorker(makeWorker({ startedAt: now - 5_000 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }), + ); + const fired = stub.simulateWatchdog(now); + assert.deepStrictEqual(fired, []); + assert.strictEqual(stub.workers.get("M001").state, "running"); + }); + + it("does_not_fire_when_worker_has_progress", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000 })); + // Worker has made progress — alive despite stale heartbeat + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 1 }), + ); + const fired = stub.simulateWatchdog(now); + assert.deepStrictEqual(fired, []); + assert.strictEqual(stub.workers.get("M001").state, "running"); + }); + + it("transitions_session_status_to_failed", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + const status = stub.sessionStatuses.get("M001"); + assert.strictEqual(status.state, "failed"); + }); + + it("captures_stderr_to_journal_events", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + const spawnFailureEvents = stub.journalEvents.filter( + (e) => e.eventType === "worker-spawn-failure", + ); + assert.strictEqual(spawnFailureEvents.length, 1); + assert.strictEqual(spawnFailureEvents[0].data.milestoneId, "M001"); + }); +}); + +describe("retry/respawn path", () => { + it("increments_recoveryAttempts_on_first_failure", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + const record = stub.unitRuntimeRecords.get("M001"); + assert.strictEqual(record.recoveryAttempts, 1); + }); + + it("respawns_worker_up_to_max_retries", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + // Start worker fresh (within grace period so watchdog doesn't fire on first call) + stub.addWorker(makeWorker({ startedAt: now - 5_000, retryCount: 0 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }), + ); + // Simulate 4 failure cycles — with max_retries=3, only 3 respawns occur + const firedMilestones = []; + for (let i = 0; i < 4; i++) { + const cycleNow = now + i * 60_000; + // Force the worker to appear past grace with stale heartbeat each cycle + stub.workers.get("M001").startedAt = cycleNow - 35_000; + stub.setSessionStatus( + makeStatus({ + milestoneId: "M001", + lastHeartbeat: cycleNow - 35_000, + progressCount: 0, + // Preserve the PID from previous respawn + pid: stub.workers.get("M001").pid, + state: "running", + }), + ); + const fired = stub.simulateWatchdog(cycleNow); + firedMilestones.push(...fired); + } + // 3 watchdog fires should have triggered 3 respawns (retryCount 0→1→2→3) + // After retryCount reaches 3 (max_retries), watchdog fires but no respawn + assert.strictEqual(stub.workers.get("M001").retryCount, 3); + assert.strictEqual(stub.workers.get("M001").state, "failed"); + // recoveryAttempts should be at the max (3) + const record = stub.unitRuntimeRecords.get("M001"); + assert.strictEqual(record.recoveryAttempts, 3); + }); + + it("respawn_updates_pid_and_startedAt", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000, pid: 100 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0, pid: 100 }), + ); + stub.simulateWatchdog(now); + const worker = stub.workers.get("M001"); + // PID should have changed after respawn (new PID = old + 1000 in stub) + assert.ok(worker.pid > 100, "PID should be updated after respawn"); + // startedAt should be reset to now + assert.ok(worker.startedAt >= now, "startedAt should be reset after respawn"); + }); + + it("session_status_reflects_running_after_respawn", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + // After retry, session status should be 'running' + const status = stub.sessionStatuses.get("M001"); + assert.strictEqual(status.state, "running"); + assert.strictEqual(status.pid, stub.workers.get("M001").pid); + }); + + it("exhausted_retry_budget_leaves_worker_in_terminal_failed_state", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + // Worker already at retryCount = max_retries (3) + stub.addWorker(makeWorker({ startedAt: now - 35_000, retryCount: 3 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + // Worker should remain in 'failed' state (no respawn) + assert.strictEqual(stub.workers.get("M001").state, "failed"); + assert.strictEqual(stub.workers.get("M001").retryCount, 3); + }); + + it("recovery_attempts_reaches_0_on_first_start", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 5_000, retryCount: 0 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + // No failure yet — recoveryAttempts should still be undefined/0 + const record = stub.unitRuntimeRecords.get("M001"); + assert.ok(!record, "No runtime record should exist before first failure"); + assert.strictEqual(stub.workers.get("M001").state, "running"); + assert.strictEqual(stub.workers.get("M001").retryCount, 0); + }); +}); + +describe("doctor signal integration", () => { + it("doctor_check_returns_spawn_failure_issues_for_failed_workers", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000, pid: 99999 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + // After watchdog fires, worker is in 'failed' state + assert.strictEqual(stub.workers.get("M001").state, "failed"); + // Simulate doctor check: scan for failed-state workers + const issues = []; + for (const [, worker] of stub.workers) { + if (worker.state === "failed") { + issues.push({ + kind: "spawn_worker_silent_failure", + spawnPid: worker.pid, + elapsedMsSinceSpawn: now - worker.startedAt, + milestoneId: worker.milestoneId, + stderrExcerpt: "", + }); + } + } + assert.strictEqual(issues.length, 1); + assert.strictEqual(issues[0].kind, "spawn_worker_silent_failure"); + assert.strictEqual(issues[0].spawnPid, 99999); + assert.ok(issues[0].elapsedMsSinceSpawn > 30_000); + }); + + it("doctor_signal_not_fired_when_worker_recovers_after_respawn", () => { + const stub = createStubOrchestrator(); + const now = Date.now(); + stub.addWorker(makeWorker({ startedAt: now - 35_000, pid: 99999 })); + stub.setSessionStatus( + makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }), + ); + stub.simulateWatchdog(now); + // After respawn, worker is back to 'running' — doctor should find no issues + const status = stub.sessionStatuses.get("M001"); + assert.strictEqual(status.state, "running"); + // Doctor scan: no failed workers + const failedWorkers = [...stub.workers.values()].filter( + (w) => w.state === "failed", + ); + assert.strictEqual(failedWorkers.length, 0); + }); +}); \ No newline at end of file