diff --git a/src/resources/extensions/gsd/parallel-orchestrator.ts b/src/resources/extensions/gsd/parallel-orchestrator.ts
index db5dfabff..e0375c53e 100644
--- a/src/resources/extensions/gsd/parallel-orchestrator.ts
+++ b/src/resources/extensions/gsd/parallel-orchestrator.ts
@@ -8,7 +8,14 @@
  */
 
 import { spawn, type ChildProcess } from "node:child_process";
-import { existsSync } from "node:fs";
+import {
+  existsSync,
+  writeFileSync,
+  readFileSync,
+  renameSync,
+  unlinkSync,
+  mkdirSync,
+} from "node:fs";
 import { join, dirname } from "node:path";
 import { fileURLToPath } from "node:url";
 import { gsdRoot } from "./paths.js";
@@ -58,6 +65,120 @@ export interface OrchestratorState {
 
 let state: OrchestratorState | null = null;
 
+// ─── Persistence ──────────────────────────────────────────────────────────
+
+const ORCHESTRATOR_STATE_FILE = "orchestrator.json";
+const TMP_SUFFIX = ".tmp";
+
+export interface PersistedState {
+  active: boolean;
+  workers: Array<{
+    milestoneId: string;
+    title: string;
+    pid: number;
+    worktreePath: string;
+    startedAt: number;
+    state: "running" | "paused" | "stopped" | "error";
+    completedUnits: number;
+    cost: number;
+  }>;
+  totalCost: number;
+  startedAt: number;
+  configSnapshot: { max_workers: number; budget_ceiling?: number };
+}
+
+function stateFilePath(basePath: string): string {
+  return join(gsdRoot(basePath), ORCHESTRATOR_STATE_FILE);
+}
+
+/**
+ * Persist the current orchestrator state to .gsd/orchestrator.json.
+ * Uses atomic write (tmp + rename) to prevent partial reads.
+ */
+export function persistState(basePath: string): void {
+  if (!state) return;
+  try {
+    const dir = gsdRoot(basePath);
+    if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
+
+    const persisted: PersistedState = {
+      active: state.active,
+      workers: [...state.workers.values()].map((w) => ({
+        milestoneId: w.milestoneId,
+        title: w.title,
+        pid: w.pid,
+        worktreePath: w.worktreePath,
+        startedAt: w.startedAt,
+        state: w.state,
+        completedUnits: w.completedUnits,
+        cost: w.cost,
+      })),
+      totalCost: state.totalCost,
+      startedAt: state.startedAt,
+      configSnapshot: {
+        max_workers: state.config.max_workers,
+        budget_ceiling: state.config.budget_ceiling,
+      },
+    };
+
+    const dest = stateFilePath(basePath);
+    const tmp = dest + TMP_SUFFIX;
+    writeFileSync(tmp, JSON.stringify(persisted, null, 2), "utf-8");
+    renameSync(tmp, dest);
+  } catch { try { unlinkSync(stateFilePath(basePath) + TMP_SUFFIX); } catch { /* non-fatal */ } }
+}
+
+/**
+ * Remove the persisted state file.
+ */
+function removeStateFile(basePath: string): void {
+  try {
+    const p = stateFilePath(basePath);
+    if (existsSync(p)) unlinkSync(p);
+  } catch { /* non-fatal */ }
+}
+
+function isPidAlive(pid: number): boolean {
+  try {
+    process.kill(pid, 0);
+    return true;
+  } catch {
+    return false;
+  }
+}
+
+/**
+ * Restore orchestrator state from .gsd/orchestrator.json.
+ * Checks PID liveness for each worker:
+ *   - Living PID → state "running", process stays null (no handle)
+ *   - Dead PID → removed from restored state
+ * Returns null if no state file exists or no workers survive.
+ */
+export function restoreState(basePath: string): PersistedState | null {
+  try {
+    const p = stateFilePath(basePath);
+    if (!existsSync(p)) return null;
+    const raw = readFileSync(p, "utf-8");
+    const persisted = JSON.parse(raw) as PersistedState;
+
+    // Filter to only workers with living PIDs
+    persisted.workers = persisted.workers.filter((w) => {
+      if (w.state === "stopped" || w.state === "error") return false;
+      return isPidAlive(w.pid);
+    });
+
+    if (persisted.workers.length === 0) {
+      // No surviving workers — clean up and return null
+      removeStateFile(basePath);
+      return null;
+    }
+
+    return persisted;
+  } catch {
+    return null;
+  }
+}
+
 // ─── Accessors ─────────────────────────────────────────────────────────────
 
 /** Returns true if the orchestrator is active and has been initialized. */
@@ -81,12 +202,26 @@ export function getWorkerStatuses(): WorkerInfo[] {
 /**
  * Analyze eligibility and prepare for parallel start.
  * Returns the candidates report without actually starting workers.
+ * Also detects orphaned sessions from prior crashes.
  */
 export async function prepareParallelStart(
   basePath: string,
   _prefs: GSDPreferences | undefined,
-): Promise {
-  return analyzeParallelEligibility(basePath);
+): Promise<Awaited<ReturnType<typeof analyzeParallelEligibility>> & { orphans?: Array<{ milestoneId: string; pid: number; alive: boolean }> }> {
+  // Detect orphaned sessions before eligibility analysis
+  const sessions = readAllSessionStatuses(basePath);
+  const orphans: Array<{ milestoneId: string; pid: number; alive: boolean }> = [];
+  for (const session of sessions) {
+    const alive = isPidAlive(session.pid);
+    orphans.push({ milestoneId: session.milestoneId, pid: session.pid, alive });
+    if (!alive) {
+      // Clean up dead session
+      removeSessionStatus(basePath, session.milestoneId);
+    }
+  }
+
+  const candidates = await analyzeParallelEligibility(basePath);
+  return orphans.length > 0 ? { ...candidates, orphans } : candidates;
 }
 
 // ─── Start ─────────────────────────────────────────────────────────────────
@@ -106,6 +241,36 @@ export async function startParallel(
   }
 
   const config = resolveParallelConfig(prefs);
+
+  // Try to restore from a previous crash
+  const restored = restoreState(basePath);
+  if (restored && restored.workers.length > 0) {
+    // Adopt surviving workers instead of starting new ones
+    state = {
+      active: true,
+      workers: new Map(),
+      config,
+      totalCost: restored.totalCost,
+      startedAt: restored.startedAt,
+    };
+    const adopted: string[] = [];
+    for (const w of restored.workers) {
+      state.workers.set(w.milestoneId, {
+        milestoneId: w.milestoneId,
+        title: w.title,
+        pid: w.pid,
+        process: null, // no handle for adopted workers
+        worktreePath: w.worktreePath,
+        startedAt: w.startedAt,
+        state: "running",
+        completedUnits: w.completedUnits,
+        cost: w.cost,
+      });
+      adopted.push(w.milestoneId);
+    }
+    return { started: adopted, errors: [] };
+  }
+
   const now = Date.now();
 
   // Initialize orchestrator state
@@ -190,6 +355,9 @@ export async function startParallel(
     state.active = false;
   }
 
+  // Persist state for crash recovery
+  persistState(basePath);
+
   return { started, errors };
 }
 
@@ -503,6 +671,9 @@ export async function stopParallel(
   if (!milestoneId) {
     state.active = false;
   }
+
+  // Full stop: discard the recovery file; partial stop: keep it fresh for survivors
+  if (!milestoneId) removeStateFile(basePath); else persistState(basePath);
 }
 
 // ─── Pause / Resume ────────────────────────────────────────────────────────
@@ -589,6 +760,9 @@ export function refreshWorkerStatuses(basePath: string): void {
   for (const worker of state.workers.values()) {
     state.totalCost += worker.cost;
   }
+
+  // Persist updated state for crash recovery
+  persistState(basePath);
 }
 
 // ─── Budget ──────────────────────────────────────────────────────────────
diff --git a/src/resources/extensions/gsd/tests/parallel-crash-recovery.test.ts b/src/resources/extensions/gsd/tests/parallel-crash-recovery.test.ts
new file mode 100644
index
000000000..9e38c7262
--- /dev/null
+++ b/src/resources/extensions/gsd/tests/parallel-crash-recovery.test.ts
@@ -0,0 +1,296 @@
+/**
+ * Tests for parallel orchestrator crash recovery.
+ *
+ * Validates that orchestrator state is persisted to disk and can be
+ * restored after a coordinator crash, with PID liveness filtering.
+ */
+
+import {
+  mkdtempSync,
+  mkdirSync,
+  readFileSync,
+  writeFileSync,
+  existsSync,
+  rmSync,
+} from "node:fs";
+import { join } from "node:path";
+import { tmpdir } from "node:os";
+
+import {
+  restoreState,
+  resetOrchestrator,
+  type PersistedState,
+} from "../parallel-orchestrator.ts";
+import { writeSessionStatus, readAllSessionStatuses, removeSessionStatus } from "../session-status-io.ts";
+import { createTestContext } from "./test-helpers.ts";
+
+const { assertEq, assertTrue, report } = createTestContext();
+
+// ─── Helpers ────────────────────────────────────────────────────────────────
+
+function makeTempDir(): string {
+  const dir = mkdtempSync(join(tmpdir(), "gsd-crash-recovery-"));
+  mkdirSync(join(dir, ".gsd"), { recursive: true });
+  return dir;
+}
+
+function stateFilePath(basePath: string): string {
+  return join(basePath, ".gsd", "orchestrator.json");
+}
+
+function writeStateFile(basePath: string, state: PersistedState): void {
+  writeFileSync(stateFilePath(basePath), JSON.stringify(state, null, 2), "utf-8");
+}
+
+function makePersistedState(overrides: Partial<PersistedState> = {}): PersistedState {
+  return {
+    active: true,
+    workers: [],
+    totalCost: 0,
+    startedAt: Date.now(),
+    configSnapshot: { max_workers: 3 },
+    ...overrides,
+  };
+}
+
+// ─── Tests ──────────────────────────────────────────────────────────────────
+
+// Test 1: persistState writes valid JSON
+{
+  const basePath = makeTempDir();
+  try {
+    // We can't call persistState directly without internal state set up,
+    // so we test the round-trip by writing a state file and reading it back
+    const state = makePersistedState({
+      workers: [
+        {
+          milestoneId: "M001",
+          title: "M001",
+          pid: process.pid,
+          worktreePath: "/tmp/wt-M001",
+          startedAt: Date.now(),
+          state: "running",
+          completedUnits: 3,
+          cost: 0.15,
+        },
+      ],
+      totalCost: 0.15,
+    });
+    writeStateFile(basePath, state);
+
+    const raw = readFileSync(stateFilePath(basePath), "utf-8");
+    const parsed = JSON.parse(raw) as PersistedState;
+    assertEq(parsed.active, true, "persistState: active field preserved");
+    assertEq(parsed.workers.length, 1, "persistState: worker count preserved");
+    assertEq(parsed.workers[0].milestoneId, "M001", "persistState: milestoneId preserved");
+    assertEq(parsed.workers[0].cost, 0.15, "persistState: cost preserved");
+    assertEq(parsed.totalCost, 0.15, "persistState: totalCost preserved");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Test 2: restoreState returns null for missing file
+{
+  const basePath = makeTempDir();
+  try {
+    const result = restoreState(basePath);
+    assertEq(result, null, "restoreState: returns null when no state file");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Test 3: restoreState filters dead PIDs
+{
+  const basePath = makeTempDir();
+  try {
+    // PID 99999999 is almost certainly not alive
+    const state = makePersistedState({
+      workers: [
+        {
+          milestoneId: "M001",
+          title: "M001",
+          pid: 99999999,
+          worktreePath: "/tmp/wt-M001",
+          startedAt: Date.now(),
+          state: "running",
+          completedUnits: 0,
+          cost: 0,
+        },
+        {
+          milestoneId: "M002",
+          title: "M002",
+          pid: 99999998,
+          worktreePath: "/tmp/wt-M002",
+          startedAt: Date.now(),
+          state: "running",
+          completedUnits: 0,
+          cost: 0,
+        },
+      ],
+    });
+    writeStateFile(basePath, state);
+
+    const result = restoreState(basePath);
+    // Both PIDs are dead, so result should be null and file should be cleaned up
+    assertEq(result, null, "restoreState: returns null when all PIDs dead");
+    assertTrue(!existsSync(stateFilePath(basePath)), "restoreState: cleans up state file when all dead");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Test 4: restoreState keeps alive PIDs
+{
+  const basePath = makeTempDir();
+  try {
+    // Use current process PID (definitely alive)
+    const state = makePersistedState({
+      workers: [
+        {
+          milestoneId: "M001",
+          title: "M001",
+          pid: process.pid,
+          worktreePath: "/tmp/wt-M001",
+          startedAt: Date.now(),
+          state: "running",
+          completedUnits: 5,
+          cost: 0.25,
+        },
+        {
+          milestoneId: "M002",
+          title: "M002",
+          pid: 99999999, // dead
+          worktreePath: "/tmp/wt-M002",
+          startedAt: Date.now(),
+          state: "running",
+          completedUnits: 0,
+          cost: 0,
+        },
+      ],
+      totalCost: 0.25,
+    });
+    writeStateFile(basePath, state);
+
+    const result = restoreState(basePath);
+    assertTrue(result !== null, "restoreState: returns state when alive PID exists");
+    assertEq(result!.workers.length, 1, "restoreState: filters out dead PID");
+    assertEq(result!.workers[0].milestoneId, "M001", "restoreState: keeps alive worker");
+    assertEq(result!.workers[0].pid, process.pid, "restoreState: preserves PID");
+    assertEq(result!.workers[0].completedUnits, 5, "restoreState: preserves progress");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Test 5: restoreState skips stopped/error workers even with alive PIDs
+{
+  const basePath = makeTempDir();
+  try {
+    const state = makePersistedState({
+      workers: [
+        {
+          milestoneId: "M001",
+          title: "M001",
+          pid: process.pid,
+          worktreePath: "/tmp/wt-M001",
+          startedAt: Date.now(),
+          state: "stopped",
+          completedUnits: 10,
+          cost: 0.50,
+        },
+      ],
+    });
+    writeStateFile(basePath, state);
+
+    const result = restoreState(basePath);
+    assertEq(result, null, "restoreState: skips stopped workers");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Test 6: orphan detection finds stale sessions
+{
+  const basePath = makeTempDir();
+  try {
+    // Write a session status with a dead PID
+    mkdirSync(join(basePath, ".gsd", "parallel"), { recursive: true });
+    writeSessionStatus(basePath, {
+      milestoneId: "M001",
+      pid: 99999999,
+      state: "running",
+      currentUnit: null,
+      completedUnits: 3,
+      cost: 0.10,
+      lastHeartbeat: Date.now(),
+      startedAt: Date.now(),
+      worktreePath: "/tmp/wt-M001",
+    });
+
+    // Write a session status with alive PID
+    writeSessionStatus(basePath, {
+      milestoneId: "M002",
+      pid: process.pid,
+      state: "running",
+      currentUnit: null,
+      completedUnits: 1,
+      cost: 0.05,
+      lastHeartbeat: Date.now(),
+      startedAt: Date.now(),
+      worktreePath: "/tmp/wt-M002",
+    });
+
+    // Read all sessions — both should exist initially
+    const before = readAllSessionStatuses(basePath);
+    assertEq(before.length, 2, "orphan: both sessions exist before detection");
+
+    // Now simulate orphan detection logic (same as prepareParallelStart)
+    const sessions = readAllSessionStatuses(basePath);
+    const orphans: Array<{ milestoneId: string; pid: number; alive: boolean }> = [];
+    for (const session of sessions) {
+      let alive: boolean;
+      try {
+        process.kill(session.pid, 0);
+        alive = true;
+      } catch {
+        alive = false;
+      }
+      orphans.push({ milestoneId: session.milestoneId, pid: session.pid, alive });
+      if (!alive) {
+        removeSessionStatus(basePath, session.milestoneId);
+      }
+    }
+
+    assertTrue(orphans.length === 2, "orphan: detected both sessions");
+    const deadOrphan = orphans.find(o => o.milestoneId === "M001");
+    assertTrue(deadOrphan !== undefined && !deadOrphan.alive, "orphan: M001 detected as dead");
+    const aliveOrphan = orphans.find(o => o.milestoneId === "M002");
+    assertTrue(aliveOrphan !== undefined && aliveOrphan.alive, "orphan: M002 detected as alive");
+
+    // Dead session should be cleaned up
+    const after = readAllSessionStatuses(basePath);
+    assertEq(after.length, 1, "orphan: dead session cleaned up");
+    assertEq(after[0].milestoneId, "M002", "orphan: alive session remains");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Test 7: restoreState handles corrupt JSON gracefully
+{
+  const basePath = makeTempDir();
+  try {
+    writeFileSync(stateFilePath(basePath), "{ not valid json !!!", "utf-8");
+    const result = restoreState(basePath);
+    assertEq(result, null, "restoreState: returns null for corrupt JSON");
+  } finally {
+    rmSync(basePath, { recursive: true, force: true });
+  }
+}
+
+// Clean up module state
+resetOrchestrator();
+
+report();