fix(parallel): restore orchestrator state from session files and add worker stderr logging (#1748)

When the coordinator process restarts after a crash, the in-memory
orchestrator state is lost even though workers may still be running.
restoreState() only reads orchestrator.json, which can be missing or
corrupt. This adds restoreRuntimeState() as a fallback that rebuilds
coordinator state from live session status files under .gsd/parallel/.

Also adds:
- Worker stderr logging to per-milestone .stderr.log files for
  post-mortem diagnostics
- refreshWorkerStatuses(restoreIfNeeded) option for lazy state recovery
  from the /gsd parallel status command path
- getWorkerStatuses(basePath) auto-refreshes before returning
- Dead workers with no session file are marked stopped/error instead
  of staying permanently 'running'

Builds on #873 (crash recovery) and #932 (PID tracking).
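
Recovery is lazy: nothing is rebuilt until someone asks for worker state. A minimal sketch of the sequence the status subcommand now runs (names as imported in the command handler below):

```ts
const root = projectRoot();
// Try orchestrator.json first; if it is missing or corrupt, fall back to
// scanning live session status files under .gsd/parallel/.
refreshWorkerStatuses(root, { restoreIfNeeded: true });
// Snapshot after the (possible) restore; empty means nothing was recoverable.
const workers = getWorkerStatuses(root);
```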
mastertyko 2026-03-21 16:28:11 +01:00 committed by GitHub
parent d1b6a8a6b1
commit 27ef4fcc40
4 changed files with 259 additions and 62 deletions


@@ -6,6 +6,7 @@ import {
isParallelActive,
pauseWorker,
prepareParallelStart,
refreshWorkerStatuses,
resumeWorker,
startParallel,
stopParallel,
@@ -14,6 +15,9 @@ import { formatEligibilityReport } from "../../parallel-eligibility.js";
import { formatMergeResults, mergeAllCompleted, mergeCompletedMilestone } from "../../parallel-merge.js";
import { loadEffectiveGSDPreferences, resolveParallelConfig } from "../../preferences.js";
import { projectRoot } from "../context.js";
function emitParallelMessage(pi: ExtensionAPI, content: string): void {
pi.sendMessage({ customType: "gsd-parallel", content, display: true });
}
export async function handleParallelCommand(trimmed: string, _ctx: ExtensionCommandContext, pi: ExtensionAPI): Promise<boolean> {
if (!trimmed.startsWith("parallel")) return false;
@@ -23,24 +27,21 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
const rest = restParts.join(" ");
if (subcommand === "start" || subcommand === "") {
const root = projectRoot();
const loaded = loadEffectiveGSDPreferences();
const config = resolveParallelConfig(loaded?.preferences);
if (!config.enabled) {
pi.sendMessage({
customType: "gsd-parallel",
content: "Parallel mode is not enabled. Set `parallel.enabled: true` in your preferences.",
display: false,
});
emitParallelMessage(pi, "Parallel mode is not enabled. Set `parallel.enabled: true` in your preferences.");
return true;
}
const candidates = await prepareParallelStart(projectRoot(), loaded?.preferences);
const candidates = await prepareParallelStart(root, loaded?.preferences);
const report = formatEligibilityReport(candidates);
if (candidates.eligible.length === 0) {
pi.sendMessage({ customType: "gsd-parallel", content: `${report}\n\nNo milestones are eligible for parallel execution.`, display: false });
emitParallelMessage(pi, `${report}\n\nNo milestones are eligible for parallel execution.`);
return true;
}
const result = await startParallel(
projectRoot(),
root,
candidates.eligible.map((candidate) => candidate.milestoneId),
loaded?.preferences,
);
@@ -48,16 +49,18 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
if (result.errors.length > 0) {
lines.push(`Errors: ${result.errors.map((entry) => `${entry.mid}: ${entry.error}`).join("; ")}`);
}
pi.sendMessage({ customType: "gsd-parallel", content: `${report}\n\n${lines.join("\n")}`, display: false });
emitParallelMessage(pi, `${report}\n\n${lines.join("\n")}`);
return true;
}
if (subcommand === "status") {
if (!isParallelActive()) {
pi.sendMessage({ customType: "gsd-parallel", content: "No parallel orchestration is currently active.", display: false });
const root = projectRoot();
refreshWorkerStatuses(root, { restoreIfNeeded: true });
const workers = getWorkerStatuses(root);
if (workers.length === 0 || !isParallelActive()) {
emitParallelMessage(pi, "No parallel orchestration is currently active.");
return true;
}
const workers = getWorkerStatuses();
const lines = ["# Parallel Workers\n"];
for (const worker of workers) {
lines.push(`- **${worker.milestoneId}** (${worker.title}) — ${worker.state} — ${worker.completedUnits} units — $${worker.cost.toFixed(2)}`);
@@ -66,28 +69,28 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
if (state) {
lines.push(`\nTotal cost: $${state.totalCost.toFixed(2)}`);
}
pi.sendMessage({ customType: "gsd-parallel", content: lines.join("\n"), display: false });
emitParallelMessage(pi, lines.join("\n"));
return true;
}
if (subcommand === "stop") {
const milestoneId = rest.trim() || undefined;
await stopParallel(projectRoot(), milestoneId);
pi.sendMessage({ customType: "gsd-parallel", content: milestoneId ? `Stopped worker for ${milestoneId}.` : "All parallel workers stopped.", display: false });
emitParallelMessage(pi, milestoneId ? `Stopped worker for ${milestoneId}.` : "All parallel workers stopped.");
return true;
}
if (subcommand === "pause") {
const milestoneId = rest.trim() || undefined;
pauseWorker(projectRoot(), milestoneId);
pi.sendMessage({ customType: "gsd-parallel", content: milestoneId ? `Paused worker for ${milestoneId}.` : "All parallel workers paused.", display: false });
emitParallelMessage(pi, milestoneId ? `Paused worker for ${milestoneId}.` : "All parallel workers paused.");
return true;
}
if (subcommand === "resume") {
const milestoneId = rest.trim() || undefined;
resumeWorker(projectRoot(), milestoneId);
pi.sendMessage({ customType: "gsd-parallel", content: milestoneId ? `Resumed worker for ${milestoneId}.` : "All parallel workers resumed.", display: false });
emitParallelMessage(pi, milestoneId ? `Resumed worker for ${milestoneId}.` : "All parallel workers resumed.");
return true;
}
@@ -95,24 +98,20 @@ export async function handleParallelCommand(trimmed: string, _ctx: ExtensionComm
const milestoneId = rest.trim() || undefined;
if (milestoneId) {
const result = await mergeCompletedMilestone(projectRoot(), milestoneId);
pi.sendMessage({ customType: "gsd-parallel", content: formatMergeResults([result]), display: false });
emitParallelMessage(pi, formatMergeResults([result]));
return true;
}
const workers = getWorkerStatuses();
const workers = getWorkerStatuses(projectRoot());
if (workers.length === 0) {
pi.sendMessage({ customType: "gsd-parallel", content: "No parallel workers to merge.", display: false });
emitParallelMessage(pi, "No parallel workers to merge.");
return true;
}
const results = await mergeAllCompleted(projectRoot(), workers);
pi.sendMessage({ customType: "gsd-parallel", content: formatMergeResults(results), display: false });
emitParallelMessage(pi, formatMergeResults(results));
return true;
}
pi.sendMessage({
customType: "gsd-parallel",
content: `Unknown parallel subcommand "${subcommand}". Usage: /gsd parallel [start|status|stop|pause|resume|merge]`,
display: false,
});
emitParallelMessage(pi, `Unknown parallel subcommand "${subcommand}". Usage: /gsd parallel [start|status|stop|pause|resume|merge]`);
return true;
}
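
Since every reply now funnels through emitParallelMessage, the handler is straightforward to exercise with a stub. A hypothetical test double; the stub, the empty context, and the casts are illustrative and not part of this commit:

```ts
// Capture messages instead of rendering them; sendMessage is the only
// ExtensionAPI method the handler touches.
const sent: Array<{ customType: string; content: string; display: boolean }> = [];
const fakePi = {
  sendMessage(msg: { customType: string; content: string; display: boolean }) {
    sent.push(msg);
  },
} as unknown as ExtensionAPI;

await handleParallelCommand("parallel status", {} as ExtensionCommandContext, fakePi);
// sent[0].customType === "gsd-parallel"
```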


@@ -9,6 +9,7 @@
import { spawn, type ChildProcess } from "node:child_process";
import {
appendFileSync,
existsSync,
writeFileSync,
readFileSync,
@@ -29,6 +30,7 @@ import type { ParallelConfig } from "./types.js";
import {
writeSessionStatus,
readAllSessionStatuses,
readSessionStatus,
removeSessionStatus,
sendSignal,
cleanupStaleSessions,
@@ -181,6 +183,92 @@ export function restoreState(basePath: string): PersistedState | null {
}
}
function workerLogPath(basePath: string, milestoneId: string): string {
return join(gsdRoot(basePath), "parallel", `${milestoneId}.stderr.log`);
}
function appendWorkerLog(basePath: string, milestoneId: string, chunk: string): void {
try {
const dir = join(gsdRoot(basePath), "parallel");
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
appendFileSync(workerLogPath(basePath, milestoneId), chunk, "utf-8");
} catch {
// Non-fatal — diagnostics should never break orchestration.
}
}
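
Reading the log back for post-mortems is symmetric. A sketch; readWorkerLog is hypothetical, while workerLogPath is the helper above:

```ts
import { existsSync, readFileSync } from "node:fs";

// Hypothetical post-mortem reader. The log file is created lazily on the
// first stderr chunk, so a missing file means the worker never wrote to stderr.
function readWorkerLog(basePath: string, milestoneId: string): string | null {
  const path = workerLogPath(basePath, milestoneId);
  return existsSync(path) ? readFileSync(path, "utf-8") : null;
}
```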
function restoreRuntimeState(basePath: string): boolean {
if (state?.active) return true;
const restored = restoreState(basePath);
if (restored && restored.workers.length > 0) {
const config = resolveParallelConfig(undefined);
state = {
active: restored.active,
workers: new Map(),
config: {
...config,
max_workers: restored.configSnapshot.max_workers,
budget_ceiling: restored.configSnapshot.budget_ceiling,
},
totalCost: restored.totalCost,
startedAt: restored.startedAt,
};
for (const w of restored.workers) {
const diskStatus = readSessionStatus(basePath, w.milestoneId);
state.workers.set(w.milestoneId, {
milestoneId: w.milestoneId,
title: w.title,
pid: diskStatus?.pid ?? w.pid,
process: null,
worktreePath: diskStatus?.worktreePath ?? w.worktreePath,
startedAt: w.startedAt,
state: diskStatus?.state ?? w.state,
completedUnits: diskStatus?.completedUnits ?? w.completedUnits,
cost: diskStatus?.cost ?? w.cost,
});
}
return true;
}
// Fallback: rebuild coordinator state from live session status files.
// This covers cases where orchestrator.json is missing/corrupt but workers are
// still running and writing heartbeats under .gsd/parallel/.
cleanupStaleSessions(basePath);
const statuses = readAllSessionStatuses(basePath);
if (statuses.length === 0) {
return false;
}
const config = resolveParallelConfig(undefined);
state = {
active: true,
workers: new Map(),
config,
totalCost: 0,
startedAt: Math.min(...statuses.map((status) => status.startedAt)),
};
for (const status of statuses) {
state.workers.set(status.milestoneId, {
milestoneId: status.milestoneId,
title: status.milestoneId, // session status files carry no title; fall back to the id
pid: status.pid,
process: null,
worktreePath: status.worktreePath,
startedAt: status.startedAt,
state: status.state,
completedUnits: status.completedUnits,
cost: status.cost,
});
state.totalCost += status.cost;
}
return true;
}
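
For reference, the session status record the fallback consumes, reconstructed from the fields read above and from the writeSessionStatus fixtures in the tests below. This is not the module's actual declaration, and the state union is an inference ("paused" is implied by pauseWorker/resumeWorker):

```ts
// Reconstructed shape; every field appears in the test fixtures, but the
// exact types and the state union are assumptions.
interface SessionStatus {
  milestoneId: string;
  pid: number;
  state: "running" | "paused" | "stopped" | "error";
  currentUnit: string | null;
  completedUnits: number;
  cost: number;
  lastHeartbeat: number; // epoch ms
  startedAt: number; // epoch ms
  worktreePath: string;
}
```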
async function waitForWorkerExit(worker: WorkerInfo, timeoutMs: number): Promise<boolean> {
if (worker.process) {
await new Promise<void>((resolve) => {
@@ -202,6 +290,7 @@ async function waitForWorkerExit(worker: WorkerInfo, timeoutMs: number): Promise
return !isPidAlive(worker.pid);
}
// ─── Accessors ─────────────────────────────────────────────────────────────
/** Returns true if the orchestrator is active and has been initialized. */
@@ -215,7 +304,10 @@ export function getOrchestratorState(): OrchestratorState | null {
}
/** Returns a snapshot of all tracked workers as an array. */
export function getWorkerStatuses(): WorkerInfo[] {
export function getWorkerStatuses(basePath?: string): WorkerInfo[] {
if (basePath) {
refreshWorkerStatuses(basePath, { restoreIfNeeded: true });
}
if (!state) return [];
return [...state.workers.values()];
}
@@ -487,6 +579,12 @@ export function spawnWorker(
});
}
if (child.stderr) {
child.stderr.on("data", (data: Buffer) => {
appendWorkerLog(basePath, milestoneId, data.toString());
});
}
// Update session status with real PID
writeSessionStatus(basePath, {
milestoneId,
@@ -513,6 +611,7 @@
w.state = "stopped";
} else {
w.state = "error";
appendWorkerLog(basePath, milestoneId, `\n[orchestrator] worker exited with code ${code ?? "null"}\n`);
}
// Update session status and persist orchestrator state for crash recovery
@@ -767,7 +866,13 @@ export function resumeWorker(
* Poll worker statuses from disk and update orchestrator state.
* Call this periodically from the dashboard refresh cycle.
*/
export function refreshWorkerStatuses(basePath: string): void {
export function refreshWorkerStatuses(
basePath: string,
options: { restoreIfNeeded?: boolean } = {},
): void {
if (!state && options.restoreIfNeeded) {
restoreRuntimeState(basePath);
}
if (!state) return;
// Clean up stale sessions first
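
Existing pollers keep their call shape because the options bag defaults to empty. A sketch of the periodic path the doc comment describes; the interval is illustrative:

```ts
// Dashboard refresh cycle: restoreIfNeeded only matters on the first tick
// after a coordinator restart; afterwards state is already in memory.
setInterval(() => {
  refreshWorkerStatuses(projectRoot(), { restoreIfNeeded: true });
}, 5_000);
```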
@@ -790,7 +895,13 @@ export function refreshWorkerStatuses(basePath: string): void {
// Update in-memory worker state from disk data
for (const [mid, worker] of state.workers) {
const diskStatus = statusMap.get(mid);
if (!diskStatus) continue;
if (!diskStatus) {
if (!isPidAlive(worker.pid)) {
worker.state = worker.completedUnits > 0 ? "stopped" : "error";
worker.process = null;
}
continue;
}
worker.state = diskStatus.state;
worker.completedUnits = diskStatus.completedUnits;
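
The probe behind this dead-worker handling is the standard signal-0 liveness check. Presumably isPidAlive is implemented along these lines; a sketch, not the module's actual body:

```ts
// kill(pid, 0) delivers no signal; it only checks existence and permissions.
// ESRCH means no such process; EPERM means it exists but is not ours to signal.
function isPidAlive(pid: number): boolean {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    return (err as NodeJS.ErrnoException).code === "EPERM";
  }
}
```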


@@ -8,7 +8,15 @@
import { describe, it, beforeEach, afterEach } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, mkdirSync, rmSync } from "node:fs";
import {
mkdtempSync,
mkdirSync,
rmSync,
writeFileSync,
existsSync,
readFileSync,
lstatSync,
} from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
@@ -41,6 +49,7 @@ import {
getAggregateCost,
isBudgetExceeded,
resetOrchestrator,
refreshWorkerStatuses,
} from "../parallel-orchestrator.js";
import { validatePreferences, resolveParallelConfig } from "../preferences.js";
@@ -275,8 +284,37 @@ describe("parallel-orchestrator: lifecycle", () => {
assert.equal(isParallelActive(), false);
});
it("getOrchestratorState returns null initially", () => {
assert.equal(getOrchestratorState(), null);
it("getWorkerStatuses restores persisted workers from disk", async () => {
const base = makeTmpBase();
try {
const persisted = {
active: true,
workers: [
{
milestoneId: "M001",
title: "M001",
pid: process.pid,
worktreePath: "/tmp/wt-M001",
startedAt: Date.now(),
state: "running",
completedUnits: 2,
cost: 0.25,
},
],
totalCost: 0.25,
startedAt: Date.now(),
configSnapshot: { max_workers: 2 },
};
writeFileSync(join(base, ".gsd", "orchestrator.json"), JSON.stringify(persisted, null, 2), "utf-8");
const workers = getWorkerStatuses(base);
assert.equal(workers.length, 1);
assert.equal(workers[0].milestoneId, "M001");
assert.equal(workers[0].completedUnits, 2);
assert.equal(isParallelActive(), true);
} finally {
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
}
});
it("startParallel initializes orchestrator state", async () => {
@@ -360,12 +398,29 @@
}
});
it("shutdownParallel deactivates the orchestrator state", async () => {
await startParallel(base, ["M001"], undefined);
assert.equal(isParallelActive(), true);
await shutdownParallel(base);
assert.equal(isParallelActive(), false);
assert.equal(getOrchestratorState(), null);
it("refreshWorkerStatuses restores live workers from session status files when orchestrator state is absent", async () => {
const base = makeTmpBase();
try {
writeSessionStatus(base, {
milestoneId: "M001",
pid: process.pid,
state: "running",
currentUnit: null,
completedUnits: 4,
cost: 0.33,
lastHeartbeat: Date.now(),
startedAt: Date.now() - 1000,
worktreePath: "/tmp/wt-M001",
});
refreshWorkerStatuses(base, { restoreIfNeeded: true });
const workers = getWorkerStatuses();
assert.equal(workers.length, 1);
assert.equal(workers[0].state, "running");
assert.equal(workers[0].completedUnits, 4);
} finally {
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
}
});
});


@@ -10,8 +10,8 @@
* 6. completedUnits counter increments on assistant message_end
*/
import { describe, it, beforeEach, after } from "node:test";
import { mkdtempSync, rmSync, existsSync, readFileSync } from "node:fs";
import { describe, it, after } from "node:test";
import { mkdtempSync, rmSync, writeFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { createTestContext } from "./test-helpers.ts";
@@ -19,14 +19,12 @@ import { createTestContext } from "./test-helpers.ts";
// We test processWorkerLine indirectly via the module's exported state.
// To test the internal function, we use the exported accessors.
import {
getOrchestratorState,
getWorkerStatuses,
getAggregateCost,
isBudgetExceeded,
isParallelActive,
resetOrchestrator,
type OrchestratorState,
type WorkerInfo,
refreshWorkerStatuses,
} from "../parallel-orchestrator.ts";
const { assertEq, assertTrue, report } = createTestContext();
@@ -49,14 +47,6 @@ function makeMessageEndLine(cost: number, role = "assistant"): string {
});
}
/** Create a tool_execution_start NDJSON line. */
function makeToolStartLine(toolName: string): string {
return JSON.stringify({
type: "tool_execution_start",
toolName,
});
}
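
For context, the elided makeMessageEndLine presumably serializes something like the line below. Only the event type, the role, and the presence of a cost figure are confirmed by these tests; the exact field layout is an assumption:

```ts
// Assumed NDJSON payload for an assistant message_end event.
JSON.stringify({ type: "message_end", role: "assistant", cost: 0.01 });
```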
// ─── Tests ────────────────────────────────────────────────────────────────
describe("parallel-worker-monitoring", () => {
@@ -154,18 +144,60 @@ describe("parallel-worker-monitoring", () => {
"--mode comes before json");
});
it("PID-based kill fallback pattern works", () => {
// Verify the pattern: try process handle first, fall back to process.kill
const worker = { process: null as null, pid: process.pid };
// With null process handle, PID-based kill should be used
assertTrue(worker.process === null, "process handle is null");
assertTrue(worker.pid > 0, "PID is valid");
// process.kill(pid, 0) checks if process exists without sending signal
let alive = false;
it("refreshWorkerStatuses restores persisted workers from disk", () => {
const base = mkdtempSync(join(tmpdir(), "gsd-parallel-monitoring-"));
try {
process.kill(worker.pid, 0);
alive = true;
} catch { /* not alive */ }
assertTrue(alive, "PID-based liveness check works");
mkdirSync(join(base, ".gsd"), { recursive: true });
writeFileSync(join(base, ".gsd", "orchestrator.json"), JSON.stringify({
active: true,
workers: [
{
milestoneId: "M001",
title: "M001",
pid: process.pid,
worktreePath: "/tmp/wt-M001",
startedAt: Date.now(),
state: "running",
completedUnits: 1,
cost: 0.1,
},
],
totalCost: 0.1,
startedAt: Date.now(),
configSnapshot: { max_workers: 2 },
}, null, 2));
refreshWorkerStatuses(base, { restoreIfNeeded: true });
const workers = getWorkerStatuses();
assertEq(workers.length, 1, "restored one worker");
assertEq(workers[0].milestoneId, "M001", "worker restored from persisted state");
} finally {
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
}
});
it("refreshWorkerStatuses restores persisted workers from live session status files", () => {
const base = mkdtempSync(join(tmpdir(), "gsd-parallel-stderr-"));
try {
mkdirSync(join(base, ".gsd", "parallel"), { recursive: true });
writeFileSync(join(base, ".gsd", "parallel", "M009.status.json"), JSON.stringify({
milestoneId: "M009",
pid: process.pid,
state: "running",
currentUnit: null,
completedUnits: 3,
cost: 0.42,
lastHeartbeat: Date.now(),
startedAt: Date.now() - 1000,
worktreePath: "/tmp/wt-M009",
}, null, 2));
refreshWorkerStatuses(base, { restoreIfNeeded: true });
const workers = getWorkerStatuses();
assertEq(workers[0].state, "running", "live session status restored");
assertEq(workers[0].completedUnits, 3, "completed units restored from status file");
} finally {
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
}
});
});