From 27ef4fcc407fdebeb545e15fa49bdaa2ad241ce9 Mon Sep 17 00:00:00 2001
From: mastertyko <11311479+mastertyko@users.noreply.github.com>
Date: Sat, 21 Mar 2026 16:28:11 +0100
Subject: [PATCH] fix(parallel): restore orchestrator state from session files
 and add worker stderr logging (#1748)

When the coordinator process restarts after a crash, the in-memory
orchestrator state is lost even though workers may still be running.
restoreState() only reads orchestrator.json, which can be missing or
corrupt. This adds restoreRuntimeState() as a fallback that rebuilds
coordinator state from live session status files under .gsd/parallel/.
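
The consumer-facing effect, as a sketch (these call sites appear in the
diff below; error handling elided):

    // After a coordinator restart, the status path now self-heals:
    refreshWorkerStatuses(projectRoot(), { restoreIfNeeded: true });
    // getWorkerStatuses(basePath) triggers the same refresh internally:
    const workers = getWorkerStatuses(projectRoot());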

Also adds:
- Worker stderr logging to per-milestone .stderr.log files for
  post-mortem diagnostics (see the read example below)
- refreshWorkerStatuses(restoreIfNeeded) option for lazy state recovery
  from the /gsd parallel status command path
- getWorkerStatuses(basePath) auto-refreshes before returning
- Dead workers with no session file are marked stopped/error instead of
  staying permanently 'running' (see the liveness sketch below)
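
A post-mortem read of a worker log might look like this (path layout
per workerLogPath(); "M001" is only an illustrative milestone id):

    import { readFileSync } from "node:fs";
    // Tail the stderr captured for worker M001:
    const log = readFileSync(".gsd/parallel/M001.stderr.log", "utf-8");
    console.log(log.split("\n").slice(-20).join("\n"));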

Builds on #873 (crash recovery) and #932 (PID tracking).
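
The stopped/error fallback relies on the zero-signal liveness probe
that isPidAlive() already uses; a simplified sketch, not the exact
implementation:

    function isAlive(pid: number): boolean {
      // Signal 0 checks existence/permissions without delivering a signal.
      try { process.kill(pid, 0); return true; } catch { return false; }
    }
    // A dead worker with progress becomes "stopped", otherwise "error".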

---
 .../gsd/commands/handlers/parallel.ts         |  49 ++++----
 .../extensions/gsd/parallel-orchestrator.ts   | 117 +++++++++++++++++-
 .../gsd/tests/parallel-orchestration.test.ts  |  73 +++++++++--
 .../tests/parallel-worker-monitoring.test.ts  |  82 ++++++++----
 4 files changed, 259 insertions(+), 62 deletions(-)

diff --git a/src/resources/extensions/gsd/commands/handlers/parallel.ts b/src/resources/extensions/gsd/commands/handlers/parallel.ts
index 0aa27c385..a2acb5367 100644
--- a/src/resources/extensions/gsd/commands/handlers/parallel.ts
+++ b/src/resources/extensions/gsd/commands/handlers/parallel.ts
@@ -6,6 +6,7 @@ import {
   isParallelActive,
   pauseWorker,
   prepareParallelStart,
+  refreshWorkerStatuses,
   resumeWorker,
   startParallel,
   stopParallel,
@@ -14,6 +15,9 @@ import { formatEligibilityReport } from "../../parallel-eligibility.js";
 import { formatMergeResults, mergeAllCompleted, mergeCompletedMilestone } from "../../parallel-merge.js";
 import { loadEffectiveGSDPreferences, resolveParallelConfig } from "../../preferences.js";
 import { projectRoot } from "../context.js";
+function emitParallelMessage(pi: ExtensionAPI, content: string): void {
+  pi.sendMessage({ customType: "gsd-parallel", content, display: true });
+}
 
 export async function handleParallelCommand(trimmed: string, _ctx: ExtensionCommandContext, pi: ExtensionAPI): Promise<boolean> {
   if (!trimmed.startsWith("parallel")) return false;
@@ -23,24 +27,21 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
   const rest = restParts.join(" ");
 
   if (subcommand === "start" || subcommand === "") {
+    const root = projectRoot();
     const loaded = loadEffectiveGSDPreferences();
     const config = resolveParallelConfig(loaded?.preferences);
     if (!config.enabled) {
-      pi.sendMessage({
-        customType: "gsd-parallel",
-        content: "Parallel mode is not enabled. Set `parallel.enabled: true` in your preferences.",
-        display: false,
-      });
+      emitParallelMessage(pi, "Parallel mode is not enabled. Set `parallel.enabled: true` in your preferences.");
       return true;
     }
-    const candidates = await prepareParallelStart(projectRoot(), loaded?.preferences);
+    const candidates = await prepareParallelStart(root, loaded?.preferences);
     const report = formatEligibilityReport(candidates);
     if (candidates.eligible.length === 0) {
-      pi.sendMessage({ customType: "gsd-parallel", content: `${report}\n\nNo milestones are eligible for parallel execution.`, display: false });
+      emitParallelMessage(pi, `${report}\n\nNo milestones are eligible for parallel execution.`);
       return true;
     }
    const result = await startParallel(
-      projectRoot(),
+      root,
       candidates.eligible.map((candidate) => candidate.milestoneId),
       loaded?.preferences,
     );
@@ -48,16 +49,18 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
     if (result.errors.length > 0) {
       lines.push(`Errors: ${result.errors.map((entry) => `${entry.mid}: ${entry.error}`).join("; ")}`);
     }
-    pi.sendMessage({ customType: "gsd-parallel", content: `${report}\n\n${lines.join("\n")}`, display: false });
+    emitParallelMessage(pi, `${report}\n\n${lines.join("\n")}`);
     return true;
   }
 
   if (subcommand === "status") {
-    if (!isParallelActive()) {
-      pi.sendMessage({ customType: "gsd-parallel", content: "No parallel orchestration is currently active.", display: false });
+    const root = projectRoot();
+    refreshWorkerStatuses(root, { restoreIfNeeded: true });
+    const workers = getWorkerStatuses(root);
+    if (workers.length === 0 || !isParallelActive()) {
+      emitParallelMessage(pi, "No parallel orchestration is currently active.");
       return true;
     }
-    const workers = getWorkerStatuses();
     const lines = ["# Parallel Workers\n"];
     for (const worker of workers) {
       lines.push(`- **${worker.milestoneId}** (${worker.title}) — ${worker.state} — ${worker.completedUnits} units — $${worker.cost.toFixed(2)}`);
     }
@@ -66,28 +69,28 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
     if (state) {
       lines.push(`\nTotal cost: $${state.totalCost.toFixed(2)}`);
     }
-    pi.sendMessage({ customType: "gsd-parallel", content: lines.join("\n"), display: false });
+    emitParallelMessage(pi, lines.join("\n"));
     return true;
   }
 
   if (subcommand === "stop") {
     const milestoneId = rest.trim() || undefined;
     await stopParallel(projectRoot(), milestoneId);
-    pi.sendMessage({ customType: "gsd-parallel", content: milestoneId ? `Stopped worker for ${milestoneId}.` : "All parallel workers stopped.", display: false });
+    emitParallelMessage(pi, milestoneId ? `Stopped worker for ${milestoneId}.` : "All parallel workers stopped.");
     return true;
   }
 
   if (subcommand === "pause") {
     const milestoneId = rest.trim() || undefined;
     pauseWorker(projectRoot(), milestoneId);
-    pi.sendMessage({ customType: "gsd-parallel", content: milestoneId ? `Paused worker for ${milestoneId}.` : "All parallel workers paused.", display: false });
+    emitParallelMessage(pi, milestoneId ? `Paused worker for ${milestoneId}.` : "All parallel workers paused.");
     return true;
   }
 
   if (subcommand === "resume") {
     const milestoneId = rest.trim() || undefined;
     resumeWorker(projectRoot(), milestoneId);
-    pi.sendMessage({ customType: "gsd-parallel", content: milestoneId ? `Resumed worker for ${milestoneId}.` : "All parallel workers resumed.", display: false });
+    emitParallelMessage(pi, milestoneId ? `Resumed worker for ${milestoneId}.` : "All parallel workers resumed.");
     return true;
   }
 
@@ -95,24 +98,20 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
   if (subcommand === "merge") {
     const milestoneId = rest.trim() || undefined;
     if (milestoneId) {
       const result = await mergeCompletedMilestone(projectRoot(), milestoneId);
-      pi.sendMessage({ customType: "gsd-parallel", content: formatMergeResults([result]), display: false });
+      emitParallelMessage(pi, formatMergeResults([result]));
       return true;
     }
-    const workers = getWorkerStatuses();
+    const workers = getWorkerStatuses(projectRoot());
     if (workers.length === 0) {
-      pi.sendMessage({ customType: "gsd-parallel", content: "No parallel workers to merge.", display: false });
+      emitParallelMessage(pi, "No parallel workers to merge.");
      return true;
    }
    const results = await mergeAllCompleted(projectRoot(), workers);
-    pi.sendMessage({ customType: "gsd-parallel", content: formatMergeResults(results), display: false });
+    emitParallelMessage(pi, formatMergeResults(results));
     return true;
   }
 
-  pi.sendMessage({
-    customType: "gsd-parallel",
-    content: `Unknown parallel subcommand "${subcommand}". Usage: /gsd parallel [start|status|stop|pause|resume|merge]`,
-    display: false,
-  });
+  emitParallelMessage(pi, `Unknown parallel subcommand "${subcommand}". Usage: /gsd parallel [start|status|stop|pause|resume|merge]`);
   return true;
 }
"stopped" : "error"; + worker.process = null; + } + continue; + } worker.state = diskStatus.state; worker.completedUnits = diskStatus.completedUnits; diff --git a/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts b/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts index fcc81ca45..aabd9736c 100644 --- a/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts +++ b/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts @@ -8,7 +8,15 @@ import { describe, it, beforeEach, afterEach } from "node:test"; import assert from "node:assert/strict"; -import { mkdtempSync, mkdirSync, rmSync } from "node:fs"; +import { + mkdtempSync, + mkdirSync, + rmSync, + writeFileSync, + existsSync, + readFileSync, + lstatSync, +} from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; @@ -41,6 +49,7 @@ import { getAggregateCost, isBudgetExceeded, resetOrchestrator, + refreshWorkerStatuses, } from "../parallel-orchestrator.js"; import { validatePreferences, resolveParallelConfig } from "../preferences.js"; @@ -275,8 +284,37 @@ describe("parallel-orchestrator: lifecycle", () => { assert.equal(isParallelActive(), false); }); - it("getOrchestratorState returns null initially", () => { - assert.equal(getOrchestratorState(), null); + it("getWorkerStatuses restores persisted workers from disk", async () => { + const base = makeTmpBase(); + try { + const persisted = { + active: true, + workers: [ + { + milestoneId: "M001", + title: "M001", + pid: process.pid, + worktreePath: "/tmp/wt-M001", + startedAt: Date.now(), + state: "running", + completedUnits: 2, + cost: 0.25, + }, + ], + totalCost: 0.25, + startedAt: Date.now(), + configSnapshot: { max_workers: 2 }, + }; + writeFileSync(join(base, ".gsd", "orchestrator.json"), JSON.stringify(persisted, null, 2), "utf-8"); + const workers = getWorkerStatuses(base); + assert.equal(workers.length, 1); + assert.equal(workers[0].milestoneId, "M001"); + assert.equal(workers[0].completedUnits, 2); + assert.equal(isParallelActive(), true); + } finally { + resetOrchestrator(); + rmSync(base, { recursive: true, force: true }); + } }); it("startParallel initializes orchestrator state", async () => { @@ -360,12 +398,29 @@ describe("parallel-orchestrator: lifecycle", () => { } }); - it("shutdownParallel deactivates the orchestrator state", async () => { - await startParallel(base, ["M001"], undefined); - assert.equal(isParallelActive(), true); - await shutdownParallel(base); - assert.equal(isParallelActive(), false); - assert.equal(getOrchestratorState(), null); + it("refreshWorkerStatuses restores live workers from session status files when orchestrator state is absent", async () => { + const base = makeTmpBase(); + try { + writeSessionStatus(base, { + milestoneId: "M001", + pid: process.pid, + state: "running", + currentUnit: null, + completedUnits: 4, + cost: 0.33, + lastHeartbeat: Date.now(), + startedAt: Date.now() - 1000, + worktreePath: "/tmp/wt-M001", + }); + refreshWorkerStatuses(base, { restoreIfNeeded: true }); + const workers = getWorkerStatuses(); + assert.equal(workers.length, 1); + assert.equal(workers[0].state, "running"); + assert.equal(workers[0].completedUnits, 4); + } finally { + resetOrchestrator(); + rmSync(base, { recursive: true, force: true }); + } }); }); diff --git a/src/resources/extensions/gsd/tests/parallel-worker-monitoring.test.ts b/src/resources/extensions/gsd/tests/parallel-worker-monitoring.test.ts index abee1929d..ba7920645 100644 --- 
diff --git a/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts b/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts
index fcc81ca45..aabd9736c 100644
--- a/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts
+++ b/src/resources/extensions/gsd/tests/parallel-orchestration.test.ts
@@ -8,7 +8,15 @@ import { describe, it, beforeEach, afterEach } from "node:test";
 import assert from "node:assert/strict";
-import { mkdtempSync, mkdirSync, rmSync } from "node:fs";
+import {
+  mkdtempSync,
+  mkdirSync,
+  rmSync,
+  writeFileSync,
+  existsSync,
+  readFileSync,
+  lstatSync,
+} from "node:fs";
 import { tmpdir } from "node:os";
 import { join } from "node:path";
@@ -41,6 +49,7 @@ import {
   getAggregateCost,
   isBudgetExceeded,
   resetOrchestrator,
+  refreshWorkerStatuses,
 } from "../parallel-orchestrator.js";
 
 import { validatePreferences, resolveParallelConfig } from "../preferences.js";
@@ -275,8 +284,37 @@ describe("parallel-orchestrator: lifecycle", () => {
     assert.equal(isParallelActive(), false);
   });
 
-  it("getOrchestratorState returns null initially", () => {
-    assert.equal(getOrchestratorState(), null);
+  it("getWorkerStatuses restores persisted workers from disk", async () => {
+    const base = makeTmpBase();
+    try {
+      const persisted = {
+        active: true,
+        workers: [
+          {
+            milestoneId: "M001",
+            title: "M001",
+            pid: process.pid,
+            worktreePath: "/tmp/wt-M001",
+            startedAt: Date.now(),
+            state: "running",
+            completedUnits: 2,
+            cost: 0.25,
+          },
+        ],
+        totalCost: 0.25,
+        startedAt: Date.now(),
+        configSnapshot: { max_workers: 2 },
+      };
+      writeFileSync(join(base, ".gsd", "orchestrator.json"), JSON.stringify(persisted, null, 2), "utf-8");
+      const workers = getWorkerStatuses(base);
+      assert.equal(workers.length, 1);
+      assert.equal(workers[0].milestoneId, "M001");
+      assert.equal(workers[0].completedUnits, 2);
+      assert.equal(isParallelActive(), true);
+    } finally {
+      resetOrchestrator();
+      rmSync(base, { recursive: true, force: true });
+    }
   });
 
   it("startParallel initializes orchestrator state", async () => {
@@ -360,12 +398,29 @@ describe("parallel-orchestrator: lifecycle", () => {
     }
   });
 
-  it("shutdownParallel deactivates the orchestrator state", async () => {
-    await startParallel(base, ["M001"], undefined);
-    assert.equal(isParallelActive(), true);
-    await shutdownParallel(base);
-    assert.equal(isParallelActive(), false);
-    assert.equal(getOrchestratorState(), null);
+  it("refreshWorkerStatuses restores live workers from session status files when orchestrator state is absent", async () => {
+    const base = makeTmpBase();
+    try {
+      writeSessionStatus(base, {
+        milestoneId: "M001",
+        pid: process.pid,
+        state: "running",
+        currentUnit: null,
+        completedUnits: 4,
+        cost: 0.33,
+        lastHeartbeat: Date.now(),
+        startedAt: Date.now() - 1000,
+        worktreePath: "/tmp/wt-M001",
+      });
+      refreshWorkerStatuses(base, { restoreIfNeeded: true });
+      const workers = getWorkerStatuses();
+      assert.equal(workers.length, 1);
+      assert.equal(workers[0].state, "running");
+      assert.equal(workers[0].completedUnits, 4);
+    } finally {
+      resetOrchestrator();
+      rmSync(base, { recursive: true, force: true });
+    }
   });
 });

diff --git a/src/resources/extensions/gsd/tests/parallel-worker-monitoring.test.ts b/src/resources/extensions/gsd/tests/parallel-worker-monitoring.test.ts
index abee1929d..ba7920645 100644
--- a/src/resources/extensions/gsd/tests/parallel-worker-monitoring.test.ts
+++ b/src/resources/extensions/gsd/tests/parallel-worker-monitoring.test.ts
@@ -10,8 +10,8 @@
  * 6. completedUnits counter increments on assistant message_end
  */
 
-import { describe, it, beforeEach, after } from "node:test";
-import { mkdtempSync, rmSync, existsSync, readFileSync } from "node:fs";
+import { describe, it, after } from "node:test";
+import { mkdtempSync, rmSync, writeFileSync, mkdirSync } from "node:fs";
 import { join } from "node:path";
 import { tmpdir } from "node:os";
 import { createTestContext } from "./test-helpers.ts";
@@ -19,14 +19,12 @@ import { createTestContext } from "./test-helpers.ts";
 
 // We test processWorkerLine indirectly via the module's exported state.
 // To test the internal function, we use the exported accessors.
 import {
-  getOrchestratorState,
   getWorkerStatuses,
   getAggregateCost,
   isBudgetExceeded,
   isParallelActive,
   resetOrchestrator,
-  type OrchestratorState,
-  type WorkerInfo,
+  refreshWorkerStatuses,
 } from "../parallel-orchestrator.ts";
 
 const { assertEq, assertTrue, report } = createTestContext();
@@ -49,14 +47,6 @@ function makeMessageEndLine(cost: number, role = "assistant"): string {
   });
 }
 
-/** Create a tool_execution_start NDJSON line. */
-function makeToolStartLine(toolName: string): string {
-  return JSON.stringify({
-    type: "tool_execution_start",
-    toolName,
-  });
-}
-
 // ─── Tests ────────────────────────────────────────────────────────────────
 
 describe("parallel-worker-monitoring", () => {
@@ -154,18 +144,60 @@ describe("parallel-worker-monitoring", () => {
       "--mode comes before json");
   });
 
-  it("PID-based kill fallback pattern works", () => {
-    // Verify the pattern: try process handle first, fall back to process.kill
-    const worker = { process: null as null, pid: process.pid };
-    // With null process handle, PID-based kill should be used
-    assertTrue(worker.process === null, "process handle is null");
-    assertTrue(worker.pid > 0, "PID is valid");
-    // process.kill(pid, 0) checks if process exists without sending signal
-    let alive = false;
+  it("refreshWorkerStatuses restores persisted workers from disk", () => {
+    const base = mkdtempSync(join(tmpdir(), "gsd-parallel-monitoring-"));
     try {
-      process.kill(worker.pid, 0);
-      alive = true;
-    } catch { /* not alive */ }
-    assertTrue(alive, "PID-based liveness check works");
+      mkdirSync(join(base, ".gsd"), { recursive: true });
+      writeFileSync(join(base, ".gsd", "orchestrator.json"), JSON.stringify({
+        active: true,
+        workers: [
+          {
+            milestoneId: "M001",
+            title: "M001",
+            pid: process.pid,
+            worktreePath: "/tmp/wt-M001",
+            startedAt: Date.now(),
+            state: "running",
+            completedUnits: 1,
+            cost: 0.1,
+          },
+        ],
+        totalCost: 0.1,
+        startedAt: Date.now(),
+        configSnapshot: { max_workers: 2 },
+      }, null, 2));
+      refreshWorkerStatuses(base, { restoreIfNeeded: true });
+      const workers = getWorkerStatuses();
+      assertEq(workers.length, 1, "restored one worker");
+      assertEq(workers[0].milestoneId, "M001", "worker restored from persisted state");
+    } finally {
+      resetOrchestrator();
+      rmSync(base, { recursive: true, force: true });
+    }
+  });
+
+  it("refreshWorkerStatuses restores workers from live session status files", () => {
+    const base = mkdtempSync(join(tmpdir(), "gsd-parallel-stderr-"));
+    try {
+      mkdirSync(join(base, ".gsd", "parallel"), { recursive: true });
+      writeFileSync(join(base, ".gsd", "parallel", "M009.status.json"), JSON.stringify({
+        milestoneId: "M009",
+        pid: process.pid,
+        state: "running",
+        currentUnit: null,
+        completedUnits: 3,
+        cost: 0.42,
+        lastHeartbeat: Date.now(),
+        startedAt: Date.now() - 1000,
+        worktreePath: "/tmp/wt-M009",
+      }, null, 2));
+      refreshWorkerStatuses(base, { restoreIfNeeded: true });
+      const workers = getWorkerStatuses();
+      assertEq(workers[0].state, "running", "live session status restored");
+      assertEq(workers[0].completedUnits, 3, "completed units restored from status file");
+    } finally {
+      resetOrchestrator();
+      rmSync(base, { recursive: true, force: true });
+    }
   });
 });