Operator coined "system lane" — better than my "side-track". Frames the architecture cleanly. The lane primitive unifies: - R046 (multi-unit lanes) parallel slice dispatch - R049 (per-lane model routing) different LLM per lane - R057 (system lane) non-unit work alongside unit lane Today autoLoop is 1 unit lane. System lane runs alongside for memory consolidation, triage drain, doctor audits, log compaction, reflection assembly, catalog refresh — all currently queued between units. Single-writer DB met by sf-db.js serial queue. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
405 lines
No EOL
14 KiB
JavaScript
405 lines
No EOL
14 KiB
JavaScript
/**
|
|
* parallel-orchestrator-spawn-failure.test.mjs
|
|
*
|
|
* Tests for the spawn-failure watchdog (T01) and the retry/respawn path (T02)
|
|
* in the parallel orchestrator.
|
|
*
|
|
* R015 contract: spawn failure → worker goes to 'failed' state →
|
|
* recoveryAttempts increments → doctor signal fires → retry respawns
|
|
* up to max_retries.
|
|
*
|
|
* Run with: npm run test:unit -- tests/parallel-orchestrator-spawn-failure.test.mjs
|
|
*/
|
|
import assert from "node:assert/strict";
|
|
import { describe, it } from "vitest";
|
|
|
|
// ─── Test fixtures ──────────────────────────────────────────────────────────
|
|
|
|
/**
 * Build a minimal in-memory worker record, as stored in the stub
 * orchestrator's worker map.
 *
 * Defaults describe a running worker for milestone M001 that started five
 * seconds before the call (inside the default 10s grace period).
 *
 * @param {object} [overrides] - fields that replace the defaults.
 * @returns {object} a fresh worker object.
 */
function makeWorker(overrides = {}) {
  const fiveSecondsAgo = Date.now() - 5000;
  const defaults = {
    milestoneId: "M001",
    pid: 12345,
    process: null,
    worktreePath: "/fake/worktree",
    startedAt: fiveSecondsAgo,
    state: "running",
    cost: 0,
    retryCount: 0,
  };
  return { ...defaults, ...overrides };
}
|
|
|
|
/**
 * Build a minimal session status record, used as the disk-status stub that
 * the watchdog compares against.
 *
 * Defaults describe a running M001 session with no progress, whose heartbeat
 * and start time are five seconds before the call.
 *
 * @param {object} [overrides] - fields that replace the defaults.
 * @returns {object} a fresh status object.
 */
function makeStatus(overrides = {}) {
  const heartbeatAt = Date.now() - 5000;
  const startedAt = Date.now() - 5000;
  const defaults = {
    milestoneId: "M001",
    pid: 12345,
    state: "running",
    currentUnit: null,
    completedUnits: 0,
    cost: 0,
    lastHeartbeat: heartbeatAt,
    startedAt,
    worktreePath: "/fake/worktree",
    progressCount: 0,
  };
  return { ...defaults, ...overrides };
}
|
|
|
|
// ─── Mock helpers ────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Build an in-memory stand-in for the parallel orchestrator so tests control
 * every side effect. Nothing here touches the filesystem or spawns processes.
 *
 * The returned object exposes:
 * - reset(): clear workers, session statuses, runtime records, journal events
 *   and persist calls, and restore the default config;
 * - addWorker(worker) / setSessionStatus(status): seed state, stored as
 *   shallow copies keyed by milestoneId;
 * - getters workers, config, persistCalls, journalEvents, sessionStatuses,
 *   unitRuntimeRecords: live references into the stub's state (mutating
 *   `config` changes how simulateWatchdog behaves);
 * - simulateWatchdog(now): run one watchdog pass.
 *
 * @returns {object} the stub orchestrator.
 */
function createStubOrchestrator() {
  // Fresh copy per stub and per reset so tests that tweak config never leak.
  const defaultConfig = () => ({
    spawn_failure_timeout_ms: 30_000,
    spawn_failure_grace_period_ms: 10_000,
    max_retries: 3,
  });

  const _workers = new Map();
  const _sessionStatuses = new Map();
  const _unitRuntimeRecords = new Map();
  // Both arrays must be declared here: simulateWatchdog writes journal
  // events and the getters below expose them. Without these declarations
  // every access throws a ReferenceError (ES modules are strict).
  const _journalEvents = [];
  const _persistCalls = [];
  let _config = defaultConfig();

  function reset() {
    _workers.clear();
    _sessionStatuses.clear();
    _unitRuntimeRecords.clear();
    // Truncate in place so arrays already obtained via the getters stay in sync.
    _journalEvents.length = 0;
    _persistCalls.length = 0;
    _config = defaultConfig();
  }

  function addWorker(worker) {
    _workers.set(worker.milestoneId, { ...worker });
  }

  function setSessionStatus(status) {
    _sessionStatuses.set(status.milestoneId, { ...status });
  }

  /**
   * Simulate one pass of the spawn-failure watchdog over the stub state.
   * This is intended to mirror refreshWorkerStatuses in the real
   * orchestrator; keep the two in sync.
   *
   * A running worker is judged silently failed when it is past the grace
   * period, has a session status, reports no progress, and its heartbeat is
   * older than spawn_failure_timeout_ms. A failed worker is moved to
   * 'failed' and a "worker-spawn-failure" event is journaled. While
   * retryCount < max_retries, the worker is then immediately respawned: its
   * retry counters are bumped, it gets a fake new PID (old + 1000), it is set
   * back to 'running', and a "worker-respawn" event is journaled.
   *
   * @param {number} [now=Date.now()] - the clock value to judge against.
   * @returns {string[]} milestoneIds whose watchdog fired on this pass,
   *   including those that were respawned.
   */
  function simulateWatchdog(now = Date.now()) {
    const fired = [];
    for (const [mid, worker] of _workers) {
      if (worker.state !== "running") continue;
      // A freshly (re)spawned worker is never judged within the grace period.
      if (now - worker.startedAt < _config.spawn_failure_grace_period_ms) continue;

      const diskStatus = _sessionStatuses.get(mid);
      if (!diskStatus) continue;

      const heartbeatAge = now - (diskStatus.lastHeartbeat ?? 0);
      const hasProgress = (diskStatus.progressCount ?? 0) > 0;
      // Any progress, or a heartbeat within the timeout, means "alive".
      if (heartbeatAge <= _config.spawn_failure_timeout_ms || hasProgress) continue;

      // Silent failure confirmed — transition to 'failed'.
      worker.state = "failed";
      worker.process = null;
      _sessionStatuses.set(mid, { ...diskStatus, state: "failed" });
      _journalEvents.push({
        eventType: "worker-spawn-failure",
        data: { milestoneId: mid, pid: worker.pid, stderrExcerpt: "" },
      });

      // Retry/respawn path, only while the retry budget is not exhausted.
      const maxRetries = _config.max_retries ?? 3;
      const retryCount = worker.retryCount ?? 0;
      if (retryCount < maxRetries) {
        const existing = _unitRuntimeRecords.get(mid) ?? { recoveryAttempts: 0 };
        _unitRuntimeRecords.set(mid, {
          ...existing,
          recoveryAttempts: retryCount + 1,
        });
        worker.retryCount = retryCount + 1;
        worker.startedAt = now;
        worker.state = "running";
        // Fake a new PID for the respawned process.
        worker.pid += 1000;
        _sessionStatuses.set(mid, {
          ..._sessionStatuses.get(mid),
          pid: worker.pid,
          state: "running",
        });
        _journalEvents.push({
          eventType: "worker-respawn",
          data: {
            milestoneId: mid,
            pid: worker.pid,
            retryCount: worker.retryCount,
          },
        });
      }

      fired.push(mid);
    }
    return fired;
  }

  return {
    reset,
    addWorker,
    setSessionStatus,
    get workers() {
      return _workers;
    },
    get config() {
      return _config;
    },
    get persistCalls() {
      return _persistCalls;
    },
    get journalEvents() {
      return _journalEvents;
    },
    get sessionStatuses() {
      return _sessionStatuses;
    },
    get unitRuntimeRecords() {
      return _unitRuntimeRecords;
    },
    simulateWatchdog,
  };
}
|
|
|
|
// ─── Tests ──────────────────────────────────────────────────────────────────
|
|
|
|
describe("spawn-failure watchdog", () => {
  // These tests isolate the watchdog (T01). With the default max_retries (3),
  // a fired watchdog immediately respawns the worker back to 'running'.
  // Tests that assert the 'failed' state therefore disable retries first.
  // The respawn path is covered by the "retry/respawn path" suite.

  it("fires_when_worker_silent_for_30s", () => {
    const stub = createStubOrchestrator();
    stub.config.max_retries = 0;
    const now = Date.now();
    // Worker started 35s ago (past grace period of 10s)
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    // Disk status: heartbeat 35s old, zero progress
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    const fired = stub.simulateWatchdog(now);
    assert.deepStrictEqual(fired, ["M001"]);
    assert.strictEqual(stub.workers.get("M001").state, "failed");
  });

  it("does_not_fire_before_grace_period", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    // Worker started only 5s ago (within 10s grace period)
    stub.addWorker(makeWorker({ startedAt: now - 5_000 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }),
    );
    const fired = stub.simulateWatchdog(now);
    assert.deepStrictEqual(fired, []);
    assert.strictEqual(stub.workers.get("M001").state, "running");
  });

  it("does_not_fire_when_heartbeat_is_fresh", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    // Past the grace period, but the heartbeat is well within the 30s timeout
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }),
    );
    const fired = stub.simulateWatchdog(now);
    assert.deepStrictEqual(fired, []);
    assert.strictEqual(stub.workers.get("M001").state, "running");
  });

  it("does_not_fire_when_worker_has_progress", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    // Worker has made progress — alive despite stale heartbeat
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 1 }),
    );
    const fired = stub.simulateWatchdog(now);
    assert.deepStrictEqual(fired, []);
    assert.strictEqual(stub.workers.get("M001").state, "running");
  });

  it("transitions_session_status_to_failed", () => {
    const stub = createStubOrchestrator();
    stub.config.max_retries = 0;
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    const status = stub.sessionStatuses.get("M001");
    assert.strictEqual(status.state, "failed");
  });

  it("captures_stderr_to_journal_events", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    const spawnFailureEvents = stub.journalEvents.filter(
      (e) => e.eventType === "worker-spawn-failure",
    );
    assert.strictEqual(spawnFailureEvents.length, 1);
    assert.strictEqual(spawnFailureEvents[0].data.milestoneId, "M001");
    assert.strictEqual(typeof spawnFailureEvents[0].data.stderrExcerpt, "string");
  });
});
|
|
|
|
describe("retry/respawn path", () => {
  it("increments_recoveryAttempts_on_first_failure", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    const record = stub.unitRuntimeRecords.get("M001");
    assert.strictEqual(record.recoveryAttempts, 1);
  });

  it("respawns_worker_up_to_max_retries", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    // Seed the worker and its status. startedAt and the heartbeat are
    // overwritten at the top of every cycle below, so these initial values
    // do not matter.
    stub.addWorker(makeWorker({ startedAt: now - 5_000, retryCount: 0 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }),
    );

    // Simulate 4 failure cycles — with max_retries=3, only 3 respawns occur
    const firedMilestones = [];
    for (let i = 0; i < 4; i++) {
      const cycleNow = now + i * 60_000;
      // Force the worker to appear past grace with stale heartbeat each cycle
      stub.workers.get("M001").startedAt = cycleNow - 35_000;
      stub.setSessionStatus(
        makeStatus({
          milestoneId: "M001",
          lastHeartbeat: cycleNow - 35_000,
          progressCount: 0,
          // Preserve the PID from previous respawn
          pid: stub.workers.get("M001").pid,
          state: "running",
        }),
      );
      const fired = stub.simulateWatchdog(cycleNow);
      firedMilestones.push(...fired);
    }

    // The watchdog fires on every cycle...
    assert.deepStrictEqual(firedMilestones, ["M001", "M001", "M001", "M001"]);
    // ...but only the first 3 fires respawn (retryCount 0→1→2→3)
    const respawnEvents = stub.journalEvents.filter(
      (e) => e.eventType === "worker-respawn",
    );
    assert.deepStrictEqual(
      respawnEvents.map((e) => e.data.retryCount),
      [1, 2, 3],
    );
    // After retryCount reaches 3 (max_retries), the 4th fire leaves it failed
    assert.strictEqual(stub.workers.get("M001").retryCount, 3);
    assert.strictEqual(stub.workers.get("M001").state, "failed");
    // recoveryAttempts should be at the max (3)
    const record = stub.unitRuntimeRecords.get("M001");
    assert.strictEqual(record.recoveryAttempts, 3);
  });

  it("respawn_updates_pid_and_startedAt", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000, pid: 100 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0, pid: 100 }),
    );
    stub.simulateWatchdog(now);
    const worker = stub.workers.get("M001");
    // PID should have changed after respawn (new PID = old + 1000 in stub)
    assert.ok(worker.pid > 100, "PID should be updated after respawn");
    // startedAt should be reset to the watchdog's clock
    assert.ok(worker.startedAt >= now, "startedAt should be reset after respawn");
  });

  it("session_status_reflects_running_after_respawn", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    // After retry, session status should be 'running'
    const status = stub.sessionStatuses.get("M001");
    assert.strictEqual(status.state, "running");
    assert.strictEqual(status.pid, stub.workers.get("M001").pid);
  });

  it("exhausted_retry_budget_leaves_worker_in_terminal_failed_state", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    // Worker already at retryCount = max_retries (3)
    stub.addWorker(makeWorker({ startedAt: now - 35_000, retryCount: 3 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    // Worker should remain in 'failed' state (no respawn)
    assert.strictEqual(stub.workers.get("M001").state, "failed");
    assert.strictEqual(stub.workers.get("M001").retryCount, 3);
    const respawnEvents = stub.journalEvents.filter(
      (e) => e.eventType === "worker-respawn",
    );
    assert.strictEqual(respawnEvents.length, 0);
  });

  it("recovery_attempts_reaches_0_on_first_start", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 5_000, retryCount: 0 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 5_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    // Still within the grace period, so no failure yet and no runtime record
    const record = stub.unitRuntimeRecords.get("M001");
    assert.ok(!record, "No runtime record should exist before first failure");
    assert.strictEqual(stub.workers.get("M001").state, "running");
    assert.strictEqual(stub.workers.get("M001").retryCount, 0);
  });
});
|
|
|
|
describe("doctor signal integration", () => {
  it("doctor_check_returns_spawn_failure_issues_for_failed_workers", () => {
    const stub = createStubOrchestrator();
    // Exhaust the retry budget so the failure is terminal; otherwise the
    // watchdog respawns the worker straight back to 'running' and there is
    // nothing for the doctor to report.
    stub.config.max_retries = 0;
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000, pid: 99999 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    // After watchdog fires with no retries left, worker is in 'failed' state
    assert.strictEqual(stub.workers.get("M001").state, "failed");

    // Simulate doctor check: scan for failed-state workers
    const issues = [...stub.workers.values()]
      .filter((worker) => worker.state === "failed")
      .map((worker) => ({
        kind: "spawn_worker_silent_failure",
        spawnPid: worker.pid,
        elapsedMsSinceSpawn: now - worker.startedAt,
        milestoneId: worker.milestoneId,
        stderrExcerpt: "",
      }));

    assert.strictEqual(issues.length, 1);
    assert.strictEqual(issues[0].kind, "spawn_worker_silent_failure");
    assert.strictEqual(issues[0].spawnPid, 99999);
    assert.ok(issues[0].elapsedMsSinceSpawn > 30_000);
  });

  it("doctor_signal_not_fired_when_worker_recovers_after_respawn", () => {
    const stub = createStubOrchestrator();
    const now = Date.now();
    stub.addWorker(makeWorker({ startedAt: now - 35_000, pid: 99999 }));
    stub.setSessionStatus(
      makeStatus({ lastHeartbeat: now - 35_000, progressCount: 0 }),
    );
    stub.simulateWatchdog(now);
    // The watchdog did fire, and recovery came from a respawn
    assert.strictEqual(stub.workers.get("M001").retryCount, 1);

    // After respawn, worker is back to 'running' — doctor should find no issues
    const status = stub.sessionStatuses.get("M001");
    assert.strictEqual(status.state, "running");

    // Doctor scan: no failed workers
    const failedWorkers = [...stub.workers.values()].filter(
      (w) => w.state === "failed",
    );
    assert.strictEqual(failedWorkers.length, 0);
  });
});