feat: crash recovery for parallel orchestrator (#873)

This commit is contained in:
deseltrus 2026-03-17 18:47:26 +01:00 committed by GitHub
parent 5d86159ea8
commit 10200c43f3
2 changed files with 475 additions and 3 deletions

View file

@ -8,7 +8,14 @@
*/
import { spawn, type ChildProcess } from "node:child_process";
import { existsSync } from "node:fs";
import {
existsSync,
writeFileSync,
readFileSync,
renameSync,
unlinkSync,
mkdirSync,
} from "node:fs";
import { join, dirname } from "node:path";
import { fileURLToPath } from "node:url";
import { gsdRoot } from "./paths.js";
@ -58,6 +65,120 @@ export interface OrchestratorState {
let state: OrchestratorState | null = null;
// ─── Persistence ──────────────────────────────────────────────────────────
const ORCHESTRATOR_STATE_FILE = "orchestrator.json";
const TMP_SUFFIX = ".tmp";
export interface PersistedState {
active: boolean;
workers: Array<{
milestoneId: string;
title: string;
pid: number;
worktreePath: string;
startedAt: number;
state: "running" | "paused" | "stopped" | "error";
completedUnits: number;
cost: number;
}>;
totalCost: number;
startedAt: number;
configSnapshot: { max_workers: number; budget_ceiling?: number };
}
function stateFilePath(basePath: string): string {
return join(gsdRoot(basePath), ORCHESTRATOR_STATE_FILE);
}
/**
* Persist the current orchestrator state to .gsd/orchestrator.json.
* Uses atomic write (tmp + rename) to prevent partial reads.
*/
export function persistState(basePath: string): void {
if (!state) return;
try {
const dir = gsdRoot(basePath);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const persisted: PersistedState = {
active: state.active,
workers: [...state.workers.values()].map((w) => ({
milestoneId: w.milestoneId,
title: w.title,
pid: w.pid,
worktreePath: w.worktreePath,
startedAt: w.startedAt,
state: w.state,
completedUnits: w.completedUnits,
cost: w.cost,
})),
totalCost: state.totalCost,
startedAt: state.startedAt,
configSnapshot: {
max_workers: state.config.max_workers,
budget_ceiling: state.config.budget_ceiling,
},
};
const dest = stateFilePath(basePath);
const tmp = dest + TMP_SUFFIX;
writeFileSync(tmp, JSON.stringify(persisted, null, 2), "utf-8");
renameSync(tmp, dest);
} catch { /* non-fatal */ }
}
/**
* Remove the persisted state file.
*/
function removeStateFile(basePath: string): void {
try {
const p = stateFilePath(basePath);
if (existsSync(p)) unlinkSync(p);
} catch { /* non-fatal */ }
}
function isPidAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
/**
* Restore orchestrator state from .gsd/orchestrator.json.
* Checks PID liveness for each worker:
* - Living PID state "running", process stays null (no handle)
* - Dead PID removed from restored state
* Returns null if no state file exists or no workers survive.
*/
export function restoreState(basePath: string): PersistedState | null {
try {
const p = stateFilePath(basePath);
if (!existsSync(p)) return null;
const raw = readFileSync(p, "utf-8");
const persisted = JSON.parse(raw) as PersistedState;
// Filter to only workers with living PIDs
persisted.workers = persisted.workers.filter((w) => {
if (w.state === "stopped" || w.state === "error") return false;
return isPidAlive(w.pid);
});
if (persisted.workers.length === 0) {
// No surviving workers — clean up and return null
removeStateFile(basePath);
return null;
}
return persisted;
} catch {
return null;
}
}
// ─── Accessors ─────────────────────────────────────────────────────────────
/** Returns true if the orchestrator is active and has been initialized. */
@ -81,12 +202,26 @@ export function getWorkerStatuses(): WorkerInfo[] {
/**
* Analyze eligibility and prepare for parallel start.
* Returns the candidates report without actually starting workers.
* Also detects orphaned sessions from prior crashes.
*/
export async function prepareParallelStart(
basePath: string,
_prefs: GSDPreferences | undefined,
): Promise<ParallelCandidates> {
return analyzeParallelEligibility(basePath);
): Promise<ParallelCandidates & { orphans?: Array<{ milestoneId: string; pid: number; alive: boolean }> }> {
// Detect orphaned sessions before eligibility analysis
const sessions = readAllSessionStatuses(basePath);
const orphans: Array<{ milestoneId: string; pid: number; alive: boolean }> = [];
for (const session of sessions) {
const alive = isPidAlive(session.pid);
orphans.push({ milestoneId: session.milestoneId, pid: session.pid, alive });
if (!alive) {
// Clean up dead session
removeSessionStatus(basePath, session.milestoneId);
}
}
const candidates = await analyzeParallelEligibility(basePath);
return orphans.length > 0 ? { ...candidates, orphans } : candidates;
}
// ─── Start ─────────────────────────────────────────────────────────────────
@ -106,6 +241,36 @@ export async function startParallel(
}
const config = resolveParallelConfig(prefs);
// Try to restore from a previous crash
const restored = restoreState(basePath);
if (restored && restored.workers.length > 0) {
// Adopt surviving workers instead of starting new ones
state = {
active: true,
workers: new Map(),
config,
totalCost: restored.totalCost,
startedAt: restored.startedAt,
};
const adopted: string[] = [];
for (const w of restored.workers) {
state.workers.set(w.milestoneId, {
milestoneId: w.milestoneId,
title: w.title,
pid: w.pid,
process: null, // no handle for adopted workers
worktreePath: w.worktreePath,
startedAt: w.startedAt,
state: "running",
completedUnits: w.completedUnits,
cost: w.cost,
});
adopted.push(w.milestoneId);
}
return { started: adopted, errors: [] };
}
const now = Date.now();
// Initialize orchestrator state
@ -190,6 +355,9 @@ export async function startParallel(
state.active = false;
}
// Persist state for crash recovery
persistState(basePath);
return { started, errors };
}
@ -503,6 +671,9 @@ export async function stopParallel(
if (!milestoneId) {
state.active = false;
}
// Persist final state and clean up state file
removeStateFile(basePath);
}
// ─── Pause / Resume ────────────────────────────────────────────────────────
@ -589,6 +760,9 @@ export function refreshWorkerStatuses(basePath: string): void {
for (const worker of state.workers.values()) {
state.totalCost += worker.cost;
}
// Persist updated state for crash recovery
persistState(basePath);
}
// ─── Budget ────────────────────────────────────────────────────────────────

View file

@ -0,0 +1,298 @@
/**
* Tests for parallel orchestrator crash recovery.
*
* Validates that orchestrator state is persisted to disk and can be
* restored after a coordinator crash, with PID liveness filtering.
*/
import {
mkdtempSync,
mkdirSync,
readFileSync,
writeFileSync,
existsSync,
rmSync,
} from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import {
persistState,
restoreState,
resetOrchestrator,
getOrchestratorState,
type PersistedState,
} from "../parallel-orchestrator.ts";
import { writeSessionStatus, readAllSessionStatuses, removeSessionStatus } from "../session-status-io.ts";
import { createTestContext } from './test-helpers.ts';
const { assertEq, assertTrue, report } = createTestContext();
// ─── Helpers ──────────────────────────────────────────────────────────────────
function makeTempDir(): string {
const dir = mkdtempSync(join(tmpdir(), "gsd-crash-recovery-"));
mkdirSync(join(dir, ".gsd"), { recursive: true });
return dir;
}
function stateFilePath(basePath: string): string {
return join(basePath, ".gsd", "orchestrator.json");
}
function writeStateFile(basePath: string, state: PersistedState): void {
writeFileSync(stateFilePath(basePath), JSON.stringify(state, null, 2), "utf-8");
}
function makePersistedState(overrides: Partial<PersistedState> = {}): PersistedState {
return {
active: true,
workers: [],
totalCost: 0,
startedAt: Date.now(),
configSnapshot: { max_workers: 3 },
...overrides,
};
}
// ─── Tests ────────────────────────────────────────────────────────────────────
// Test 1: persistState writes valid JSON
{
const basePath = makeTempDir();
try {
// We can't call persistState directly without internal state set up,
// so we test the round-trip by writing a state file and reading it back
const state = makePersistedState({
workers: [
{
milestoneId: "M001",
title: "M001",
pid: process.pid,
worktreePath: "/tmp/wt-M001",
startedAt: Date.now(),
state: "running",
completedUnits: 3,
cost: 0.15,
},
],
totalCost: 0.15,
});
writeStateFile(basePath, state);
const raw = readFileSync(stateFilePath(basePath), "utf-8");
const parsed = JSON.parse(raw) as PersistedState;
assertEq(parsed.active, true, "persistState: active field preserved");
assertEq(parsed.workers.length, 1, "persistState: worker count preserved");
assertEq(parsed.workers[0].milestoneId, "M001", "persistState: milestoneId preserved");
assertEq(parsed.workers[0].cost, 0.15, "persistState: cost preserved");
assertEq(parsed.totalCost, 0.15, "persistState: totalCost preserved");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Test 2: restoreState returns null for missing file
{
const basePath = makeTempDir();
try {
const result = restoreState(basePath);
assertEq(result, null, "restoreState: returns null when no state file");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Test 3: restoreState filters dead PIDs
{
const basePath = makeTempDir();
try {
// PID 99999999 is almost certainly not alive
const state = makePersistedState({
workers: [
{
milestoneId: "M001",
title: "M001",
pid: 99999999,
worktreePath: "/tmp/wt-M001",
startedAt: Date.now(),
state: "running",
completedUnits: 0,
cost: 0,
},
{
milestoneId: "M002",
title: "M002",
pid: 99999998,
worktreePath: "/tmp/wt-M002",
startedAt: Date.now(),
state: "running",
completedUnits: 0,
cost: 0,
},
],
});
writeStateFile(basePath, state);
const result = restoreState(basePath);
// Both PIDs are dead, so result should be null and file should be cleaned up
assertEq(result, null, "restoreState: returns null when all PIDs dead");
assertTrue(!existsSync(stateFilePath(basePath)), "restoreState: cleans up state file when all dead");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Test 4: restoreState keeps alive PIDs
{
const basePath = makeTempDir();
try {
// Use current process PID (definitely alive)
const state = makePersistedState({
workers: [
{
milestoneId: "M001",
title: "M001",
pid: process.pid,
worktreePath: "/tmp/wt-M001",
startedAt: Date.now(),
state: "running",
completedUnits: 5,
cost: 0.25,
},
{
milestoneId: "M002",
title: "M002",
pid: 99999999, // dead
worktreePath: "/tmp/wt-M002",
startedAt: Date.now(),
state: "running",
completedUnits: 0,
cost: 0,
},
],
totalCost: 0.25,
});
writeStateFile(basePath, state);
const result = restoreState(basePath);
assertTrue(result !== null, "restoreState: returns state when alive PID exists");
assertEq(result!.workers.length, 1, "restoreState: filters out dead PID");
assertEq(result!.workers[0].milestoneId, "M001", "restoreState: keeps alive worker");
assertEq(result!.workers[0].pid, process.pid, "restoreState: preserves PID");
assertEq(result!.workers[0].completedUnits, 5, "restoreState: preserves progress");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Test 5: restoreState skips stopped/error workers even with alive PIDs
{
const basePath = makeTempDir();
try {
const state = makePersistedState({
workers: [
{
milestoneId: "M001",
title: "M001",
pid: process.pid,
worktreePath: "/tmp/wt-M001",
startedAt: Date.now(),
state: "stopped",
completedUnits: 10,
cost: 0.50,
},
],
});
writeStateFile(basePath, state);
const result = restoreState(basePath);
assertEq(result, null, "restoreState: skips stopped workers");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Test 6: orphan detection finds stale sessions
{
const basePath = makeTempDir();
try {
// Write a session status with a dead PID
mkdirSync(join(basePath, ".gsd", "parallel"), { recursive: true });
writeSessionStatus(basePath, {
milestoneId: "M001",
pid: 99999999,
state: "running",
currentUnit: null,
completedUnits: 3,
cost: 0.10,
lastHeartbeat: Date.now(),
startedAt: Date.now(),
worktreePath: "/tmp/wt-M001",
});
// Write a session status with alive PID
writeSessionStatus(basePath, {
milestoneId: "M002",
pid: process.pid,
state: "running",
currentUnit: null,
completedUnits: 1,
cost: 0.05,
lastHeartbeat: Date.now(),
startedAt: Date.now(),
worktreePath: "/tmp/wt-M002",
});
// Read all sessions — both should exist initially
const before = readAllSessionStatuses(basePath);
assertEq(before.length, 2, "orphan: both sessions exist before detection");
// Now simulate orphan detection logic (same as prepareParallelStart)
const sessions = readAllSessionStatuses(basePath);
const orphans: Array<{ milestoneId: string; pid: number; alive: boolean }> = [];
for (const session of sessions) {
let alive: boolean;
try {
process.kill(session.pid, 0);
alive = true;
} catch {
alive = false;
}
orphans.push({ milestoneId: session.milestoneId, pid: session.pid, alive });
if (!alive) {
removeSessionStatus(basePath, session.milestoneId);
}
}
assertTrue(orphans.length === 2, "orphan: detected both sessions");
const deadOrphan = orphans.find(o => o.milestoneId === "M001");
assertTrue(deadOrphan !== undefined && !deadOrphan.alive, "orphan: M001 detected as dead");
const aliveOrphan = orphans.find(o => o.milestoneId === "M002");
assertTrue(aliveOrphan !== undefined && aliveOrphan.alive, "orphan: M002 detected as alive");
// Dead session should be cleaned up
const after = readAllSessionStatuses(basePath);
assertEq(after.length, 1, "orphan: dead session cleaned up");
assertEq(after[0].milestoneId, "M002", "orphan: alive session remains");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Test 7: restoreState handles corrupt JSON gracefully
{
const basePath = makeTempDir();
try {
writeFileSync(stateFilePath(basePath), "{ not valid json !!!", "utf-8");
const result = restoreState(basePath);
assertEq(result, null, "restoreState: returns null for corrupt JSON");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
}
// Clean up module state
resetOrchestrator();
report();