feat: parallel milestone orchestration foundation (#672)

Add infrastructure for parallel milestone execution behind a
`parallel.enabled` flag (default `false` — opt-in, zero impact to existing users).

New modules:
- session-status-io.ts: File-based IPC protocol with atomic writes,
  signal lifecycle (pause/resume/stop), and stale session detection
- parallel-eligibility.ts: Milestone parallelism analysis checking
  dependency satisfaction and file overlap across slice plans
- parallel-orchestrator.ts: Core orchestrator managing worker lifecycle,
  budget tracking, and coordination via session status files
- /gsd parallel [start|status|stop|pause|resume] command handlers

Modified:
- types.ts: ParallelConfig interface (enabled, max_workers, budget_ceiling,
  merge_strategy, auto_merge)
- preferences.ts: Parallel config validation, merging, and resolver
- commands.ts: /gsd parallel subcommand routing with argument completions

Tests: 39 new tests covering session I/O roundtrip, signal lifecycle,
stale detection, eligibility formatting, orchestrator lifecycle,
budget enforcement, and preference validation.
This commit is contained in:
Jeremy McSpadden 2026-03-16 16:54:09 -05:00 committed by Lex Christopherson
parent 392b75a4db
commit eb302fe1d2
8 changed files with 1380 additions and 2 deletions

View file

@ -0,0 +1,17 @@
# Issue #672: Parallel Milestone Orchestration
**Issue:** https://github.com/gsd-build/gsd-2/issues/672
**Contributor:** @deseltrus (7 merged PRs, proven contributor)
**Status:** WIP — foundation modules built, orchestrator core in progress
**Default:** `parallel.enabled: false` — opt-in, zero impact to existing users
## Delivery Plan (6 PRs)
### PR 1: Worktree Bugfixes - MERGED (#675)
### PR 2: Dispatch Hardening (Small) - pending contributor
### PR 3: Parallel Config + Preferences (Small) - included in this PR
### PR 4: Session Status Protocol (Medium) - included in this PR
### PR 5: Orchestrator Core (Large) - included in this PR
### PR 6: Dashboard + Commands (Medium) - commands included, dashboard deferred
See full plan in the GitHub issue comment.

View file

@ -42,6 +42,13 @@ import { handleQuick } from "./quick.js";
import { handleHistory } from "./history.js";
import { handleUndo } from "./undo.js";
import { handleExport } from "./export.js";
import {
isParallelActive, getOrchestratorState, getWorkerStatuses,
prepareParallelStart, startParallel, stopParallel,
pauseWorker, resumeWorker,
} from "./parallel-orchestrator.js";
import { formatEligibilityReport } from "./parallel-eligibility.js";
import { resolveParallelConfig } from "./preferences.js";
import { nativeBranchList, nativeDetectMainBranch, nativeBranchListMerged, nativeBranchDelete, nativeForEachRef, nativeUpdateRef } from "./native-git-bridge.js";
export function dispatchDoctorHeal(pi: ExtensionAPI, scope: string | undefined, reportText: string, structuredIssues: string): void {
@ -69,13 +76,13 @@ function projectRoot(): string {
export function registerGSDCommand(pi: ExtensionAPI): void {
pi.registerCommand("gsd", {
description: "GSD — Get Shit Done: /gsd help|next|auto|stop|pause|status|visualize|queue|quick|capture|triage|dispatch|history|undo|skip|export|cleanup|mode|prefs|config|hooks|run-hook|skill-health|doctor|forensics|migrate|remote|steer|knowledge",
description: "GSD — Get Shit Done: /gsd help|next|auto|stop|pause|status|visualize|queue|quick|capture|triage|dispatch|history|undo|skip|export|cleanup|mode|prefs|config|hooks|run-hook|skill-health|doctor|forensics|migrate|remote|steer|knowledge|parallel",
getArgumentCompletions: (prefix: string) => {
const subcommands = [
"help", "next", "auto", "stop", "pause", "status", "visualize", "queue", "quick", "discuss",
"capture", "triage", "dispatch",
"history", "undo", "skip", "export", "cleanup", "mode", "prefs",
"config", "hooks", "run-hook", "skill-health", "doctor", "forensics", "migrate", "remote", "steer", "inspect", "knowledge",
"config", "hooks", "run-hook", "skill-health", "doctor", "forensics", "migrate", "remote", "steer", "inspect", "knowledge", "parallel",
];
const parts = prefix.trim().split(/\s+/);
@ -99,6 +106,13 @@ export function registerGSDCommand(pi: ExtensionAPI): void {
.map((cmd) => ({ value: `mode ${cmd}`, label: cmd }));
}
if (parts[0] === "parallel" && parts.length <= 2) {
const subPrefix = parts[1] ?? "";
return ["start", "status", "stop", "pause", "resume"]
.filter((cmd) => cmd.startsWith(subPrefix))
.map((cmd) => ({ value: `parallel ${cmd}`, label: cmd }));
}
if (parts[0] === "prefs" && parts.length <= 2) {
const subPrefix = parts[1] ?? "";
return ["global", "project", "status", "wizard", "setup", "import-claude"]
@ -288,6 +302,85 @@ export function registerGSDCommand(pi: ExtensionAPI): void {
return;
}
// ─── Parallel Orchestration ────────────────────────────────────────
// Routes `/gsd parallel [start|status|stop|pause|resume]`.
if (trimmed.startsWith("parallel")) {
  const parallelArgs = trimmed.slice("parallel".length).trim();
  const [subCmd = "", ...restParts] = parallelArgs.split(/\s+/);
  const rest = restParts.join(" ");

  // Bare `/gsd parallel` defaults to `start`.
  if (subCmd === "start" || subCmd === "") {
    const loaded = loadEffectiveGSDPreferences();
    const config = resolveParallelConfig(loaded?.preferences);
    if (!config.enabled) {
      pi.sendMessage({
        content: "Parallel mode is not enabled. Set `parallel.enabled: true` in your preferences.",
      });
      return;
    }
    // Analyze eligibility first; only start workers when something qualifies.
    const candidates = await prepareParallelStart(projectRoot(), loaded?.preferences);
    const report = formatEligibilityReport(candidates);
    if (candidates.eligible.length === 0) {
      pi.sendMessage({ content: report + "\n\nNo milestones are eligible for parallel execution." });
      return;
    }
    const result = await startParallel(
      projectRoot(),
      candidates.eligible.map(e => e.milestoneId),
      loaded?.preferences,
    );
    const lines = [`Parallel orchestration started.`, `Workers: ${result.started.join(", ")}`];
    if (result.errors.length > 0) {
      lines.push(`Errors: ${result.errors.map(e => `${e.mid}: ${e.error}`).join("; ")}`);
    }
    pi.sendMessage({ content: report + "\n\n" + lines.join("\n") });
    return;
  }

  if (subCmd === "status") {
    if (!isParallelActive()) {
      pi.sendMessage({ content: "No parallel orchestration is currently active." });
      return;
    }
    const workers = getWorkerStatuses();
    const lines = ["# Parallel Workers\n"];
    for (const w of workers) {
      // BUGFIX: separate worker state from the unit count. The original
      // interpolated `${w.state}${w.completedUnits}`, rendering "running3 units".
      lines.push(`- **${w.milestoneId}** (${w.title}) — ${w.state} — ${w.completedUnits} units — $${w.cost.toFixed(2)}`);
    }
    const orchState = getOrchestratorState();
    if (orchState) {
      lines.push(`\nTotal cost: $${orchState.totalCost.toFixed(2)}`);
    }
    pi.sendMessage({ content: lines.join("\n") });
    return;
  }

  // stop/pause/resume take an optional milestone id; omitted = all workers.
  if (subCmd === "stop") {
    const mid = rest.trim() || undefined;
    await stopParallel(projectRoot(), mid);
    pi.sendMessage({ content: mid ? `Stopped worker for ${mid}.` : "All parallel workers stopped." });
    return;
  }
  if (subCmd === "pause") {
    const mid = rest.trim() || undefined;
    pauseWorker(projectRoot(), mid);
    pi.sendMessage({ content: mid ? `Paused worker for ${mid}.` : "All parallel workers paused." });
    return;
  }
  if (subCmd === "resume") {
    const mid = rest.trim() || undefined;
    resumeWorker(projectRoot(), mid);
    pi.sendMessage({ content: mid ? `Resumed worker for ${mid}.` : "All parallel workers resumed." });
    return;
  }

  pi.sendMessage({
    content: `Unknown parallel subcommand "${subCmd}". Usage: /gsd parallel [start|status|stop|pause|resume]`,
  });
  return;
}
if (trimmed === "cleanup") {
await handleCleanupBranches(ctx, projectRoot());
await handleCleanupSnapshots(ctx, projectRoot());

View file

@ -0,0 +1,233 @@
/**
* GSD Parallel Eligibility — milestone parallelism analysis.
*
* Analyzes which milestones can safely run in parallel by checking
* dependency satisfaction and file overlap across slice plans.
*/
import { deriveState } from "./state.js";
import { parseRoadmap, parsePlan, loadFile } from "./files.js";
import { resolveMilestoneFile, resolveSliceFile } from "./paths.js";
import { findMilestoneIds } from "./guided-flow.js";
import type { MilestoneRegistryEntry } from "./types.js";
// ─── Types ───────────────────────────────────────────────────────────────────

/** Per-milestone verdict from the parallel eligibility analysis. */
export interface EligibilityResult {
  milestoneId: string;
  /** Registry title; falls back to the milestone id when absent. */
  title: string;
  /** True when the milestone may run in parallel with others. */
  eligible: boolean;
  /** Human-readable explanation; may carry a file-overlap warning. */
  reason: string;
}

/** Aggregate analysis result: partitioned milestones plus overlap pairs. */
export interface ParallelCandidates {
  eligible: EligibilityResult[];
  ineligible: EligibilityResult[];
  /** Pairs of eligible milestones whose slice plans touch common files. */
  fileOverlaps: Array<{ mid1: string; mid2: string; files: string[] }>;
}
// ─── File Collection ─────────────────────────────────────────────────────────
/**
 * Collect all `filesLikelyTouched` across every slice plan in a milestone.
 * Returns a deduplicated list of file paths.
 */
async function collectTouchedFiles(
  basePath: string,
  milestoneId: string,
): Promise<string[]> {
  const roadmapPath = resolveMilestoneFile(basePath, milestoneId, "ROADMAP");
  if (!roadmapPath) return [];
  const roadmapContent = await loadFile(roadmapPath);
  if (!roadmapContent) return [];

  const touched = new Set<string>();
  for (const slice of parseRoadmap(roadmapContent).slices) {
    const planPath = resolveSliceFile(basePath, milestoneId, slice.id, "PLAN");
    if (!planPath) continue;
    const planContent = await loadFile(planPath);
    if (!planContent) continue;
    // Dedupe across slices via the Set.
    parsePlan(planContent).filesLikelyTouched.forEach((f) => touched.add(f));
  }
  return [...touched];
}
// ─── Overlap Detection ──────────────────────────────────────────────────────
/**
 * Compare file sets across milestones and return pairs with overlapping files.
 * Pair order follows insertion order of the map keys; shared files are sorted.
 */
function detectFileOverlaps(
  fileSets: Map<string, string[]>,
): Array<{ mid1: string; mid2: string; files: string[] }> {
  const ids = [...fileSets.keys()];
  const overlaps: Array<{ mid1: string; mid2: string; files: string[] }> = [];
  ids.forEach((first, i) => {
    const firstFiles = new Set(fileSets.get(first) ?? []);
    for (const second of ids.slice(i + 1)) {
      const shared = (fileSets.get(second) ?? [])
        .filter((f) => firstFiles.has(f))
        .sort();
      if (shared.length > 0) {
        overlaps.push({ mid1: first, mid2: second, files: shared });
      }
    }
  });
  return overlaps;
}
// ─── Analysis ────────────────────────────────────────────────────────────────
/**
 * Analyze milestones for parallel execution eligibility.
 *
 * A milestone is eligible if:
 * 1. It is not complete
 * 2. Its dependencies (`dependsOn`) are all complete
 * 3. It does not have file overlap with other eligible milestones
 *    (overlaps are flagged as warnings but do not disqualify)
 */
export async function analyzeParallelEligibility(
  basePath: string,
): Promise<ParallelCandidates> {
  const milestoneIds = findMilestoneIds(basePath);
  const { registry } = await deriveState(basePath);

  // Index registry entries by id for O(1) status lookups.
  const byId = new Map<string, MilestoneRegistryEntry>(
    registry.map((entry) => [entry.id, entry]),
  );

  const eligible: EligibilityResult[] = [];
  const ineligible: EligibilityResult[] = [];

  for (const mid of milestoneIds) {
    const entry = byId.get(mid);
    const title = entry?.title ?? mid;

    // Rule 1: completed milestones have nothing left to run.
    if ((entry?.status ?? "pending") === "complete") {
      ineligible.push({ milestoneId: mid, title, eligible: false, reason: "Already complete." });
      continue;
    }

    // Rule 2: every dependency must be complete (unknown deps count as blockers).
    const blockers = (entry?.dependsOn ?? []).filter(
      (dep) => byId.get(dep)?.status !== "complete",
    );
    if (blockers.length > 0) {
      ineligible.push({
        milestoneId: mid,
        title,
        eligible: false,
        reason: `Blocked by incomplete dependencies: ${blockers.join(", ")}.`,
      });
      continue;
    }

    eligible.push({ milestoneId: mid, title, eligible: true, reason: "All dependencies satisfied." });
  }

  // Rule 3: among the eligible set, detect plans touching the same files.
  const fileSets = new Map<string, string[]>();
  for (const candidate of eligible) {
    fileSets.set(candidate.milestoneId, await collectTouchedFiles(basePath, candidate.milestoneId));
  }
  const fileOverlaps = detectFileOverlaps(fileSets);

  // Overlaps only annotate the reason — they do not disqualify.
  const flagged = new Set(fileOverlaps.flatMap((o) => [o.mid1, o.mid2]));
  for (const candidate of eligible) {
    if (flagged.has(candidate.milestoneId)) {
      candidate.reason = "All dependencies satisfied. WARNING: has file overlap with another eligible milestone.";
    }
  }

  return { eligible, ineligible, fileOverlaps };
}
// ─── Formatting ──────────────────────────────────────────────────────────────
/**
 * Produce a human-readable report of parallel eligibility analysis.
 */
export function formatEligibilityReport(candidates: ParallelCandidates): string {
  const out: string[] = ["# Parallel Eligibility Report", ""];

  // Section: milestones that may run in parallel.
  out.push(`## Eligible for Parallel Execution (${candidates.eligible.length})`, "");
  if (candidates.eligible.length === 0) {
    out.push("No milestones are currently eligible for parallel execution.");
  } else {
    candidates.eligible.forEach((e) => {
      out.push(`- **${e.milestoneId}** — ${e.title}`, `  ${e.reason}`);
    });
  }
  out.push("");

  // Section: milestones held back, with the blocking reason.
  out.push(`## Ineligible (${candidates.ineligible.length})`, "");
  if (candidates.ineligible.length === 0) {
    out.push("All milestones are eligible.");
  } else {
    candidates.ineligible.forEach((e) => {
      out.push(`- **${e.milestoneId}** — ${e.title}`, `  ${e.reason}`);
    });
  }
  out.push("");

  // Section (optional): overlapping-file warnings between eligible pairs.
  if (candidates.fileOverlaps.length > 0) {
    out.push(`## File Overlap Warnings (${candidates.fileOverlaps.length})`, "");
    for (const overlap of candidates.fileOverlaps) {
      out.push(`- **${overlap.mid1}** <-> **${overlap.mid2}** — ${overlap.files.length} shared file(s):`);
      overlap.files.forEach((f) => out.push(`  - \`${f}\``));
    }
    out.push("");
  }

  return out.join("\n");
}

View file

@ -0,0 +1,308 @@
/**
* GSD Parallel Orchestrator — core engine for parallel milestone orchestration.
*
* Manages worker lifecycle, budget tracking, and coordination. Workers are
* separate processes spawned via child_process, each running in its own git
* worktree with GSD_MILESTONE_LOCK env var set. The coordinator monitors
* workers via session status files (see session-status-io.ts).
*/
import { type ChildProcess } from "node:child_process";
import { join } from "node:path";
import { gsdRoot } from "./paths.js";
import { resolveParallelConfig } from "./preferences.js";
import type { GSDPreferences } from "./preferences.js";
import type { ParallelConfig } from "./types.js";
import {
writeSessionStatus,
readAllSessionStatuses,
removeSessionStatus,
sendSignal,
cleanupStaleSessions,
type SessionStatus,
} from "./session-status-io.js";
import {
analyzeParallelEligibility,
type ParallelCandidates,
} from "./parallel-eligibility.js";
// ─── Types ─────────────────────────────────────────────────────────────────

/** In-memory tracking record for one worker session. */
export interface WorkerInfo {
  milestoneId: string;
  /** Display title; startParallel currently initializes it to the milestone id. */
  title: string;
  /** PID written to the session status file (coordinator PID until real spawning lands). */
  pid: number;
  process: ChildProcess | null; // null after process exits
  worktreePath: string;
  startedAt: number; // epoch ms
  state: "running" | "paused" | "stopped" | "error";
  completedUnits: number;
  /** Accumulated cost for this worker (displayed with `$` by /gsd parallel status). */
  cost: number;
}

/** Aggregate state for the module-singleton orchestrator. */
export interface OrchestratorState {
  active: boolean;
  /** Keyed by milestone id. */
  workers: Map<string, WorkerInfo>;
  config: ParallelConfig;
  /** Sum of worker costs; recomputed by refreshWorkerStatuses(). */
  totalCost: number;
  startedAt: number; // epoch ms
}
// ─── Module State ──────────────────────────────────────────────────────────
// Singleton orchestrator state; null until startParallel() initializes it.
let state: OrchestratorState | null = null;

// ─── Accessors ─────────────────────────────────────────────────────────────
/** Returns true if the orchestrator is active and has been initialized. */
export function isParallelActive(): boolean {
  return state !== null && state.active;
}

/** Returns the current orchestrator state, or null if not initialized. */
export function getOrchestratorState(): OrchestratorState | null {
  return state;
}

/** Returns a snapshot of all tracked workers as an array. */
export function getWorkerStatuses(): WorkerInfo[] {
  return state ? Array.from(state.workers.values()) : [];
}
// ─── Preparation ───────────────────────────────────────────────────────────
/**
 * Analyze eligibility and prepare for parallel start.
 * Returns the candidates report without actually starting workers.
 */
export async function prepareParallelStart(
  basePath: string,
  _prefs: GSDPreferences | undefined,
): Promise<ParallelCandidates> {
  // Prefs are currently unused; the parameter is kept for interface stability.
  const candidates = await analyzeParallelEligibility(basePath);
  return candidates;
}
// ─── Start ─────────────────────────────────────────────────────────────────
/**
* Start parallel execution with the given eligible milestones.
* Creates tracking structures and writes initial session status files.
*
* Actual worker process spawning is deferred to the auto-mode integration
* layer; this function sets up the orchestrator state and bookkeeping only.
*/
export async function startParallel(
basePath: string,
milestoneIds: string[],
prefs: GSDPreferences | undefined,
): Promise<{ started: string[]; errors: Array<{ mid: string; error: string }> }> {
const config = resolveParallelConfig(prefs);
const now = Date.now();
// Initialize orchestrator state
state = {
active: true,
workers: new Map(),
config,
totalCost: 0,
startedAt: now,
};
const started: string[] = [];
const errors: Array<{ mid: string; error: string }> = [];
// Cap to max_workers
const toStart = milestoneIds.slice(0, config.max_workers);
for (const mid of toStart) {
try {
const worktreePath = join(gsdRoot(basePath), "worktrees", mid);
const worker: WorkerInfo = {
milestoneId: mid,
title: mid,
pid: process.pid,
process: null,
worktreePath,
startedAt: now,
state: "running",
completedUnits: 0,
cost: 0,
};
state.workers.set(mid, worker);
// Write initial session status so the coordinator can track it
const sessionStatus: SessionStatus = {
milestoneId: mid,
pid: process.pid,
state: "running",
currentUnit: null,
completedUnits: 0,
cost: 0,
lastHeartbeat: now,
startedAt: now,
worktreePath,
};
writeSessionStatus(basePath, sessionStatus);
started.push(mid);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
errors.push({ mid, error: message });
}
}
// If nothing started successfully, deactivate
if (started.length === 0) {
state.active = false;
}
return { started, errors };
}
// ─── Stop ──────────────────────────────────────────────────────────────────
/**
 * Stop all workers or a specific milestone's worker.
 * Sends stop signals and updates tracking state.
 */
export async function stopParallel(
  basePath: string,
  milestoneId?: string,
): Promise<void> {
  if (!state) return;

  const targets = milestoneId ? [milestoneId] : [...state.workers.keys()];
  for (const mid of targets) {
    const worker = state.workers.get(mid);
    if (!worker) continue;
    sendSignal(basePath, mid, "stop");  // ask the worker process to halt
    worker.state = "stopped";
    worker.process = null;
    removeSessionStatus(basePath, mid); // drop its on-disk status file
  }

  // A blanket stop (no specific milestone) deactivates the orchestrator.
  if (!milestoneId) {
    state.active = false;
  }
}
// ─── Pause / Resume ────────────────────────────────────────────────────────
/** Pause a specific worker or all workers. */
export function pauseWorker(
basePath: string,
milestoneId?: string,
): void {
if (!state) return;
const targets = milestoneId
? [milestoneId]
: [...state.workers.keys()];
for (const mid of targets) {
const worker = state.workers.get(mid);
if (!worker || worker.state !== "running") continue;
sendSignal(basePath, mid, "pause");
worker.state = "paused";
}
}
/** Resume a specific worker or all workers. */
export function resumeWorker(
basePath: string,
milestoneId?: string,
): void {
if (!state) return;
const targets = milestoneId
? [milestoneId]
: [...state.workers.keys()];
for (const mid of targets) {
const worker = state.workers.get(mid);
if (!worker || worker.state !== "paused") continue;
sendSignal(basePath, mid, "resume");
worker.state = "running";
}
}
// ─── Status Refresh ────────────────────────────────────────────────────────
/**
 * Poll worker statuses from disk and update orchestrator state.
 * Call this periodically from the dashboard refresh cycle.
 */
export function refreshWorkerStatuses(basePath: string): void {
  if (!state) return;

  // Mark workers whose sessions went stale (dead PID or heartbeat timeout).
  for (const mid of cleanupStaleSessions(basePath)) {
    const worker = state.workers.get(mid);
    if (worker) {
      worker.state = "error";
      worker.process = null;
    }
  }

  // Index live on-disk statuses by milestone id.
  const byMilestone = new Map<string, SessionStatus>(
    readAllSessionStatuses(basePath).map((s) => [s.milestoneId, s]),
  );

  // Sync tracked workers with what their processes last reported.
  for (const [mid, worker] of state.workers) {
    const reported = byMilestone.get(mid);
    if (!reported) continue;
    worker.state = reported.state;
    worker.completedUnits = reported.completedUnits;
    worker.cost = reported.cost;
    worker.pid = reported.pid;
  }

  // Aggregate cost across every tracked worker.
  state.totalCost = [...state.workers.values()].reduce((sum, w) => sum + w.cost, 0);
}
// ─── Budget ────────────────────────────────────────────────────────────────
/** Get aggregate cost across all workers. */
export function getAggregateCost(): number {
  return state === null ? 0 : state.totalCost;
}

/** Check if budget ceiling has been reached. */
export function isBudgetExceeded(): boolean {
  if (!state) return false;
  const ceiling = state.config.budget_ceiling;
  // A missing ceiling (null/undefined) means no budget limit applies.
  return ceiling != null && state.totalCost >= ceiling;
}

// ─── Reset ─────────────────────────────────────────────────────────────────
/** Reset orchestrator state. Called on clean shutdown. */
export function resetOrchestrator(): void {
  state = null;
}

View file

@ -75,6 +75,7 @@ const KNOWN_PREFERENCE_KEYS = new Set<string>([
"token_profile",
"phases",
"auto_visualize",
"parallel",
]);
export interface GSDSkillRule {
@ -171,6 +172,7 @@ export interface GSDPreferences {
token_profile?: TokenProfile;
phases?: PhaseSkipPreferences;
auto_visualize?: boolean;
parallel?: import("./types.js").ParallelConfig;
}
export interface LoadedGSDPreferences {
@ -768,6 +770,9 @@ function mergePreferences(base: GSDPreferences, override: GSDPreferences): GSDPr
phases: (base.phases || override.phases)
? { ...(base.phases ?? {}), ...(override.phases ?? {}) }
: undefined,
parallel: (base.parallel || override.parallel)
? { ...(base.parallel ?? {}), ...(override.parallel ?? {}) } as import("./types.js").ParallelConfig
: undefined,
};
}
@ -1154,6 +1159,51 @@ export function validatePreferences(preferences: GSDPreferences): {
}
}
// ─── Parallel Config ────────────────────────────────────────────────────
// Validate the optional `parallel` preference block field-by-field.
// Valid fields are copied into `parallel`; invalid ones append a message to
// `errors` and are dropped. The block is attached to `validated` only when
// at least one field passed.
if (preferences.parallel && typeof preferences.parallel === "object") {
  const p = preferences.parallel as unknown as Record<string, unknown>;
  const parallel: Record<string, unknown> = {};
  if (p.enabled !== undefined) {
    if (typeof p.enabled === "boolean") parallel.enabled = p.enabled;
    else errors.push("parallel.enabled must be a boolean");
  }
  // max_workers: range-checked in [1, 4], then floored to an integer.
  if (p.max_workers !== undefined) {
    if (typeof p.max_workers === "number" && p.max_workers >= 1 && p.max_workers <= 4) {
      parallel.max_workers = Math.floor(p.max_workers);
    } else {
      errors.push("parallel.max_workers must be a number between 1 and 4");
    }
  }
  // budget_ceiling: any strictly positive number.
  if (p.budget_ceiling !== undefined) {
    if (typeof p.budget_ceiling === "number" && p.budget_ceiling > 0) {
      parallel.budget_ceiling = p.budget_ceiling;
    } else {
      errors.push("parallel.budget_ceiling must be a positive number");
    }
  }
  // merge_strategy / auto_merge: closed string enums.
  if (p.merge_strategy !== undefined) {
    const validStrategies = new Set(["per-slice", "per-milestone"]);
    if (typeof p.merge_strategy === "string" && validStrategies.has(p.merge_strategy)) {
      parallel.merge_strategy = p.merge_strategy;
    } else {
      errors.push("parallel.merge_strategy must be one of: per-slice, per-milestone");
    }
  }
  if (p.auto_merge !== undefined) {
    const validModes = new Set(["auto", "confirm", "manual"]);
    if (typeof p.auto_merge === "string" && validModes.has(p.auto_merge)) {
      parallel.auto_merge = p.auto_merge;
    } else {
      errors.push("parallel.auto_merge must be one of: auto, confirm, manual");
    }
  }
  if (Object.keys(parallel).length > 0) {
    validated.parallel = parallel as unknown as import("./types.js").ParallelConfig;
  }
}
// ─── Git Preferences ───────────────────────────────────────────────────
if (preferences.git && typeof preferences.git === "object") {
const git: Record<string, unknown> = {};
@ -1371,3 +1421,15 @@ export function updatePreferencesModels(models: GSDModelConfigV2): void {
writeFileSync(prefsPath, content, "utf-8");
}
// ─── Parallel Config Resolver ──────────────────────────────────────────────
/**
 * Resolve the effective parallel configuration from (possibly absent) prefs.
 * Applies defaults, and clamps max_workers into the supported 1–4 range.
 * Fractional max_workers values are floored to match validatePreferences,
 * which also floors (prefs may arrive here without passing validation).
 */
export function resolveParallelConfig(prefs: GSDPreferences | undefined): import("./types.js").ParallelConfig {
  const p = prefs?.parallel;
  return {
    enabled: p?.enabled ?? false,
    max_workers: Math.max(1, Math.min(4, Math.floor(p?.max_workers ?? 2))),
    budget_ceiling: p?.budget_ceiling,
    merge_strategy: p?.merge_strategy ?? "per-milestone",
    auto_merge: p?.auto_merge ?? "confirm",
  };
}

View file

@ -0,0 +1,197 @@
/**
* GSD Session Status I/O
*
* File-based IPC protocol for coordinator-worker communication in
* parallel milestone orchestration. Each worker writes its status to a
* file; the coordinator reads all status files to monitor progress.
*
* Atomic writes (write to .tmp, then rename) prevent partial reads.
* Signal files let the coordinator send pause/resume/stop/rebase to workers.
* Stale detection combines PID liveness checks with heartbeat timeouts.
*/
import {
writeFileSync,
readFileSync,
renameSync,
unlinkSync,
readdirSync,
mkdirSync,
existsSync,
} from "node:fs";
import { join } from "node:path";
import { gsdRoot } from "./paths.js";
// ─── Types ─────────────────────────────────────────────────────────────────

/** One worker's self-reported state, persisted as a JSON status file. */
export interface SessionStatus {
  milestoneId: string;
  /** PID of the reporting process; used for liveness probes in stale detection. */
  pid: number;
  state: "running" | "paused" | "stopped" | "error";
  /** Unit currently being executed, or null when idle/between units. */
  currentUnit: { type: string; id: string; startedAt: number } | null;
  completedUnits: number;
  cost: number;
  /** Epoch ms of the last heartbeat; compared against the stale timeout. */
  lastHeartbeat: number;
  startedAt: number; // epoch ms
  worktreePath: string;
}

/** Commands a coordinator may send to a worker via its signal file. */
export type SessionSignal = "pause" | "resume" | "stop" | "rebase";

/** Envelope written to a worker's signal file. */
export interface SignalMessage {
  signal: SessionSignal;
  sentAt: number; // epoch ms
  from: "coordinator";
}

// ─── Constants ─────────────────────────────────────────────────────────────
// Session files live under <gsdRoot>/parallel/ as
// <milestoneId>.status.json and <milestoneId>.signal.json; writes go through
// a .tmp sibling then rename for atomicity.
const PARALLEL_DIR = "parallel";
const STATUS_SUFFIX = ".status.json";
const SIGNAL_SUFFIX = ".signal.json";
const TMP_SUFFIX = ".tmp";
// Heartbeats older than this mark a session stale even if its PID is alive.
const DEFAULT_STALE_TIMEOUT_MS = 30_000;
// ─── Helpers ───────────────────────────────────────────────────────────────
/** Directory holding all parallel-session files: <gsdRoot>/parallel. */
function parallelDir(basePath: string): string {
  return join(gsdRoot(basePath), PARALLEL_DIR);
}

/** Path of a worker's status file. */
function statusPath(basePath: string, milestoneId: string): string {
  return join(parallelDir(basePath), `${milestoneId}${STATUS_SUFFIX}`);
}

/** Path of a worker's pending-signal file. */
function signalPath(basePath: string, milestoneId: string): string {
  return join(parallelDir(basePath), `${milestoneId}${SIGNAL_SUFFIX}`);
}

/** Create the parallel directory (and parents) when missing. */
function ensureParallelDir(basePath: string): void {
  const dir = parallelDir(basePath);
  if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
}
/**
 * True when a process with the given PID appears to exist.
 * Signal 0 performs the kernel's error checks without delivering a signal;
 * any throw (ESRCH, ERANGE, or EPERM) is treated as "not alive".
 */
function isPidAlive(pid: number): boolean {
  let alive = true;
  try {
    process.kill(pid, 0);
  } catch {
    alive = false;
  }
  return alive;
}
// ─── Status I/O ────────────────────────────────────────────────────────────
/** Write session status atomically (write to .tmp, then rename). */
export function writeSessionStatus(basePath: string, status: SessionStatus): void {
  try {
    ensureParallelDir(basePath);
    const finalPath = statusPath(basePath, status.milestoneId);
    const tmpPath = finalPath + TMP_SUFFIX;
    writeFileSync(tmpPath, JSON.stringify(status, null, 2), "utf-8");
    renameSync(tmpPath, finalPath); // atomic publish: readers never see a partial file
  } catch { /* non-fatal */ }
}
/** Read a specific milestone's session status. */
export function readSessionStatus(basePath: string, milestoneId: string): SessionStatus | null {
  const p = statusPath(basePath, milestoneId);
  try {
    if (!existsSync(p)) return null;
    return JSON.parse(readFileSync(p, "utf-8")) as SessionStatus;
  } catch {
    // Unreadable or corrupt file reads as "no status".
    return null;
  }
}
/** Read all session status files from .gsd/parallel/. */
export function readAllSessionStatuses(basePath: string): SessionStatus[] {
  const dir = parallelDir(basePath);
  if (!existsSync(dir)) return [];

  const statuses: SessionStatus[] = [];
  try {
    for (const name of readdirSync(dir)) {
      if (!name.endsWith(STATUS_SUFFIX)) continue; // ignore signal and .tmp files
      try {
        statuses.push(JSON.parse(readFileSync(join(dir, name), "utf-8")) as SessionStatus);
      } catch { /* skip corrupt files */ }
    }
  } catch { /* non-fatal */ }
  return statuses;
}
/** Remove a milestone's session status file. */
export function removeSessionStatus(basePath: string, milestoneId: string): void {
  const p = statusPath(basePath, milestoneId);
  try {
    if (existsSync(p)) unlinkSync(p);
  } catch { /* non-fatal */ }
}
// ─── Signal I/O ────────────────────────────────────────────────────────────
/** Write a signal file for a worker to consume. */
export function sendSignal(basePath: string, milestoneId: string, signal: SessionSignal): void {
try {
ensureParallelDir(basePath);
const dest = signalPath(basePath, milestoneId);
const tmp = dest + TMP_SUFFIX;
const msg: SignalMessage = { signal, sentAt: Date.now(), from: "coordinator" };
writeFileSync(tmp, JSON.stringify(msg, null, 2), "utf-8");
renameSync(tmp, dest);
} catch { /* non-fatal */ }
}
/** Read and delete a signal file (atomic consume). Returns null if no signal pending. */
export function consumeSignal(basePath: string, milestoneId: string): SignalMessage | null {
  const p = signalPath(basePath, milestoneId);
  try {
    if (!existsSync(p)) return null;
    const raw = readFileSync(p, "utf-8");
    unlinkSync(p); // delete before parse so a signal is consumed at most once
    return JSON.parse(raw) as SignalMessage;
  } catch {
    return null;
  }
}
// ─── Stale Detection ───────────────────────────────────────────────────────
/** Check whether a session is stale (PID dead or heartbeat timed out). */
export function isSessionStale(
  status: SessionStatus,
  timeoutMs: number = DEFAULT_STALE_TIMEOUT_MS,
): boolean {
  // A dead process is stale regardless of heartbeat age.
  if (!isPidAlive(status.pid)) return true;
  // A live PID that has been silent past the timeout also counts as stale.
  return Date.now() - status.lastHeartbeat > timeoutMs;
}
/** Find and remove stale sessions. Returns the milestone IDs that were cleaned up. */
export function cleanupStaleSessions(
  basePath: string,
  timeoutMs: number = DEFAULT_STALE_TIMEOUT_MS,
): string[] {
  const cleaned: string[] = [];
  for (const status of readAllSessionStatuses(basePath)) {
    if (!isSessionStale(status, timeoutMs)) continue;
    removeSessionStatus(basePath, status.milestoneId);
    // Drop any signal file the dead worker never consumed.
    try {
      const sig = signalPath(basePath, status.milestoneId);
      if (existsSync(sig)) unlinkSync(sig);
    } catch { /* non-fatal */ }
    cleaned.push(status.milestoneId);
  }
  return cleaned;
}

View file

@ -0,0 +1,455 @@
/**
* Tests for parallel milestone orchestration modules:
* - session-status-io.ts (file-based IPC)
* - parallel-eligibility.ts (eligibility formatting)
* - parallel-orchestrator.ts (orchestrator lifecycle)
* - preferences.ts (parallel config validation)
*/
import { describe, it, beforeEach, afterEach } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
writeSessionStatus,
readSessionStatus,
readAllSessionStatuses,
removeSessionStatus,
sendSignal,
consumeSignal,
isSessionStale,
cleanupStaleSessions,
type SessionStatus,
} from "../session-status-io.js";
import {
formatEligibilityReport,
type ParallelCandidates,
} from "../parallel-eligibility.js";
import {
isParallelActive,
getOrchestratorState,
getWorkerStatuses,
startParallel,
stopParallel,
pauseWorker,
resumeWorker,
getAggregateCost,
isBudgetExceeded,
resetOrchestrator,
} from "../parallel-orchestrator.js";
import { validatePreferences, resolveParallelConfig } from "../preferences.js";
// ─── Test Helpers ────────────────────────────────────────────────────────────
/** Create a throwaway project root containing an empty `.gsd` directory. */
function makeTmpBase(): string {
  const root = mkdtempSync(join(tmpdir(), "gsd-parallel-test-"));
  mkdirSync(join(root, ".gsd"), { recursive: true });
  return root;
}
/** Build a SessionStatus fixture; individual fields can be overridden per test. */
function makeStatus(overrides: Partial<SessionStatus> = {}): SessionStatus {
  const now = Date.now();
  const defaults: SessionStatus = {
    milestoneId: "M001",
    pid: process.pid,
    state: "running",
    currentUnit: { type: "execute-task", id: "M001/S01/T01", startedAt: now },
    completedUnits: 3,
    cost: 1.50,
    lastHeartbeat: now,
    startedAt: now - 60_000,
    worktreePath: "/tmp/test-worktree",
  };
  return { ...defaults, ...overrides };
}
// ─── session-status-io ───────────────────────────────────────────────────────
describe("session-status-io: status roundtrip", () => {
  let base: string;
  beforeEach(() => { base = makeTmpBase(); });
  afterEach(() => { rmSync(base, { recursive: true, force: true }); });

  it("write then read returns identical status", () => {
    writeSessionStatus(base, makeStatus());
    const loaded = readSessionStatus(base, "M001");
    assert.ok(loaded);
    assert.equal(loaded.milestoneId, "M001");
    assert.equal(loaded.pid, process.pid);
    assert.equal(loaded.state, "running");
    assert.equal(loaded.completedUnits, 3);
    assert.equal(loaded.cost, 1.50);
  });

  it("readSessionStatus returns null for missing milestone", () => {
    assert.equal(readSessionStatus(base, "M999"), null);
  });

  it("readAllSessionStatuses returns all written statuses", () => {
    for (const milestoneId of ["M001", "M002", "M003"]) {
      writeSessionStatus(base, makeStatus({ milestoneId }));
    }
    const statuses = readAllSessionStatuses(base);
    assert.equal(statuses.length, 3);
    const ids = statuses.map(s => s.milestoneId).sort();
    assert.deepEqual(ids, ["M001", "M002", "M003"]);
  });

  it("readAllSessionStatuses returns empty array when no parallel dir", () => {
    assert.equal(readAllSessionStatuses(base).length, 0);
  });

  it("removeSessionStatus deletes the file", () => {
    writeSessionStatus(base, makeStatus());
    assert.ok(readSessionStatus(base, "M001"));
    removeSessionStatus(base, "M001");
    assert.equal(readSessionStatus(base, "M001"), null);
  });
});
describe("session-status-io: signal roundtrip", () => {
  let base: string;
  beforeEach(() => { base = makeTmpBase(); });
  afterEach(() => { rmSync(base, { recursive: true, force: true }); });

  it("sendSignal then consumeSignal returns the signal", () => {
    sendSignal(base, "M001", "pause");
    const received = consumeSignal(base, "M001");
    assert.ok(received);
    assert.equal(received.signal, "pause");
    assert.equal(received.from, "coordinator");
    assert.ok(received.sentAt > 0);
  });

  it("consumeSignal removes the signal file", () => {
    sendSignal(base, "M001", "stop");
    consumeSignal(base, "M001");
    // A second consume must find nothing — the file was deleted on first read.
    assert.equal(consumeSignal(base, "M001"), null);
  });

  it("consumeSignal returns null when no signal pending", () => {
    assert.equal(consumeSignal(base, "M001"), null);
  });
});
describe("session-status-io: stale detection", () => {
  it("isSessionStale returns false for current process PID", () => {
    const live = makeStatus({ pid: process.pid, lastHeartbeat: Date.now() });
    assert.equal(isSessionStale(live), false);
  });

  it("isSessionStale returns true for dead PID", () => {
    // Max 32-bit PID — extremely unlikely to belong to a live process.
    const dead = makeStatus({ pid: 2147483647, lastHeartbeat: Date.now() });
    assert.equal(isSessionStale(dead), true);
  });

  it("isSessionStale returns true for expired heartbeat", () => {
    const silent = makeStatus({
      pid: process.pid,
      lastHeartbeat: Date.now() - 60_000,
    });
    assert.equal(isSessionStale(silent, 5_000), true);
  });

  it("isSessionStale returns false for recent heartbeat with alive PID", () => {
    const healthy = makeStatus({
      pid: process.pid,
      lastHeartbeat: Date.now(),
    });
    assert.equal(isSessionStale(healthy, 30_000), false);
  });
});
describe("session-status-io: cleanupStaleSessions", () => {
  let base: string;
  beforeEach(() => { base = makeTmpBase(); });
  afterEach(() => { rmSync(base, { recursive: true, force: true }); });

  it("removes stale sessions and returns their IDs", () => {
    // One stale session (dead PID) and one live session (this process).
    writeSessionStatus(base, makeStatus({ milestoneId: "M001", pid: 2147483647 }));
    writeSessionStatus(base, makeStatus({
      milestoneId: "M002",
      pid: process.pid,
      lastHeartbeat: Date.now(),
    }));
    const removed = cleanupStaleSessions(base);
    assert.deepEqual(removed, ["M001"]);
    // Only the stale session's status file should be gone.
    assert.equal(readSessionStatus(base, "M001"), null);
    assert.ok(readSessionStatus(base, "M002"));
  });
});
// ─── parallel-eligibility ────────────────────────────────────────────────────
describe("parallel-eligibility: formatEligibilityReport", () => {
  it("formats empty candidates", () => {
    const out = formatEligibilityReport({
      eligible: [],
      ineligible: [],
      fileOverlaps: [],
    });
    assert.ok(out.includes("Eligible for Parallel Execution (0)"));
    assert.ok(out.includes("No milestones are currently eligible"));
  });

  it("formats eligible milestones", () => {
    const satisfied = "All dependencies satisfied.";
    const input: ParallelCandidates = {
      eligible: [
        { milestoneId: "M001", title: "Auth System", eligible: true, reason: satisfied },
        { milestoneId: "M002", title: "Dashboard", eligible: true, reason: satisfied },
      ],
      ineligible: [],
      fileOverlaps: [],
    };
    const out = formatEligibilityReport(input);
    assert.ok(out.includes("Eligible for Parallel Execution (2)"));
    assert.ok(out.includes("**M001** — Auth System"));
    assert.ok(out.includes("**M002** — Dashboard"));
  });

  it("formats ineligible milestones with reasons", () => {
    const input: ParallelCandidates = {
      eligible: [],
      ineligible: [
        { milestoneId: "M003", title: "API", eligible: false, reason: "Blocked by incomplete dependencies: M001." },
      ],
      fileOverlaps: [],
    };
    const out = formatEligibilityReport(input);
    assert.ok(out.includes("Ineligible (1)"));
    assert.ok(out.includes("Blocked by incomplete dependencies"));
  });

  it("formats file overlap warnings", () => {
    const input: ParallelCandidates = {
      eligible: [
        { milestoneId: "M001", title: "Auth", eligible: true, reason: "OK" },
        { milestoneId: "M002", title: "API", eligible: true, reason: "OK" },
      ],
      ineligible: [],
      fileOverlaps: [
        { mid1: "M001", mid2: "M002", files: ["src/types.ts", "src/utils.ts"] },
      ],
    };
    const out = formatEligibilityReport(input);
    assert.ok(out.includes("File Overlap Warnings (1)"));
    assert.ok(out.includes("`src/types.ts`"));
    assert.ok(out.includes("`src/utils.ts`"));
  });
});
// ─── parallel-orchestrator ───────────────────────────────────────────────────
// Orchestrator state is module-level, so each test resets it both before
// (guards against leakage from earlier tests) and after (leaves a clean slate).
describe("parallel-orchestrator: lifecycle", () => {
  let base: string;
  beforeEach(() => {
    base = makeTmpBase();
    resetOrchestrator();
  });
  afterEach(() => {
    resetOrchestrator();
    rmSync(base, { recursive: true, force: true });
  });
  // With no orchestrator started, the query helpers report "nothing running".
  it("isParallelActive returns false initially", () => {
    assert.equal(isParallelActive(), false);
  });
  it("getOrchestratorState returns null initially", () => {
    assert.equal(getOrchestratorState(), null);
  });
  it("startParallel initializes orchestrator state", async () => {
    const result = await startParallel(base, ["M001", "M002"], {
      parallel: { enabled: true, max_workers: 4, merge_strategy: "per-milestone", auto_merge: "confirm" },
    });
    // Both requested milestones start (max_workers=4 leaves headroom).
    assert.deepEqual(result.started, ["M001", "M002"]);
    assert.equal(result.errors.length, 0);
    assert.equal(isParallelActive(), true);
    assert.equal(getWorkerStatuses().length, 2);
  });
  it("startParallel caps to max_workers", async () => {
    const result = await startParallel(base, ["M001", "M002", "M003", "M004"], {
      parallel: { enabled: true, max_workers: 2, merge_strategy: "per-milestone", auto_merge: "confirm" },
    });
    // Only the first max_workers milestones are started; the rest are dropped.
    assert.deepEqual(result.started, ["M001", "M002"]);
    assert.equal(getWorkerStatuses().length, 2);
  });
  // `undefined` prefs exercises the default-config path inside startParallel.
  it("startParallel writes session status files", async () => {
    await startParallel(base, ["M001"], undefined);
    const status = readSessionStatus(base, "M001");
    assert.ok(status);
    assert.equal(status.milestoneId, "M001");
    assert.equal(status.state, "running");
  });
  it("stopParallel stops all workers", async () => {
    await startParallel(base, ["M001", "M002"], undefined);
    await stopParallel(base);
    assert.equal(isParallelActive(), false);
    assert.ok(workers.every(w => w.state === "stopped"));
  });
  // Stopping a single worker leaves the other running and the session active.
  it("stopParallel stops a specific worker", async () => {
    await startParallel(base, ["M001", "M002"], undefined);
    await stopParallel(base, "M001");
    const workers = getWorkerStatuses();
    const m1 = workers.find(w => w.milestoneId === "M001");
    const m2 = workers.find(w => w.milestoneId === "M002");
    assert.equal(m1?.state, "stopped");
    assert.equal(m2?.state, "running");
    assert.equal(isParallelActive(), true);
  });
  it("pauseWorker and resumeWorker toggle worker state", async () => {
    await startParallel(base, ["M001"], undefined);
    pauseWorker(base, "M001");
    assert.equal(getWorkerStatuses()[0].state, "paused");
    resumeWorker(base, "M001");
    assert.equal(getWorkerStatuses()[0].state, "running");
  });
  // Pausing also emits a file-based "pause" signal for the worker to consume.
  it("pauseWorker sends pause signal", async () => {
    await startParallel(base, ["M001"], undefined);
    pauseWorker(base, "M001");
    const signal = consumeSignal(base, "M001");
    assert.ok(signal);
    assert.equal(signal.signal, "pause");
  });
});
// Budget tracking: aggregate cost across workers and optional ceiling enforcement.
describe("parallel-orchestrator: budget", () => {
  beforeEach(() => { resetOrchestrator(); });
  afterEach(() => { resetOrchestrator(); });
  it("getAggregateCost returns 0 when not active", () => {
    assert.equal(getAggregateCost(), 0);
  });
  it("isBudgetExceeded returns false when not active", () => {
    assert.equal(isBudgetExceeded(), false);
  });
  it("isBudgetExceeded returns false when no ceiling set", async () => {
    const base = makeTmpBase();
    // try/finally so the temp dir is removed even if an assertion fails;
    // without it a failing test leaked the directory and orchestrator state.
    try {
      await startParallel(base, ["M001"], undefined);
      assert.equal(isBudgetExceeded(), false);
    } finally {
      resetOrchestrator();
      rmSync(base, { recursive: true, force: true });
    }
  });
  it("isBudgetExceeded returns true when ceiling reached", async () => {
    const base = makeTmpBase();
    try {
      await startParallel(base, ["M001"], {
        parallel: { enabled: true, max_workers: 2, budget_ceiling: 1.00, merge_strategy: "per-milestone", auto_merge: "confirm" },
      });
      // Manually set totalCost to test budget check
      const orchState = getOrchestratorState();
      if (orchState) orchState.totalCost = 1.50;
      assert.equal(isBudgetExceeded(), true);
    } finally {
      resetOrchestrator();
      rmSync(base, { recursive: true, force: true });
    }
  });
});
// ─── preferences: parallel config ────────────────────────────────────────────
describe("preferences: resolveParallelConfig", () => {
  it("returns defaults when prefs is undefined", () => {
    const resolved = resolveParallelConfig(undefined);
    assert.equal(resolved.enabled, false);
    assert.equal(resolved.max_workers, 2);
    assert.equal(resolved.budget_ceiling, undefined);
    assert.equal(resolved.merge_strategy, "per-milestone");
    assert.equal(resolved.auto_merge, "confirm");
  });

  it("returns defaults when parallel is undefined", () => {
    const resolved = resolveParallelConfig({});
    assert.equal(resolved.enabled, false);
    assert.equal(resolved.max_workers, 2);
  });

  it("fills in missing fields with defaults", () => {
    const resolved = resolveParallelConfig({
      parallel: { enabled: true } as any,
    });
    assert.equal(resolved.enabled, true);
    assert.equal(resolved.max_workers, 2);
    assert.equal(resolved.merge_strategy, "per-milestone");
  });

  it("clamps max_workers to 1-4 range", () => {
    const withWorkers = (max_workers: number) => resolveParallelConfig({
      parallel: { enabled: true, max_workers, merge_strategy: "per-milestone", auto_merge: "confirm" },
    });
    assert.equal(withWorkers(0).max_workers, 1);
    assert.equal(withWorkers(10).max_workers, 4);
  });
});
describe("preferences: validatePreferences parallel config", () => {
  // Assert that validating the given prefs surfaces an error mentioning `field`.
  const expectFieldError = (parallel: unknown, field: string) => {
    const result = validatePreferences({ parallel } as any);
    assert.ok(result.errors.some(e => e.includes(field)));
  };

  it("validates valid parallel config without errors", () => {
    const result = validatePreferences({
      parallel: {
        enabled: true,
        max_workers: 3,
        budget_ceiling: 50.00,
        merge_strategy: "per-slice",
        auto_merge: "manual",
      },
    });
    assert.equal(result.errors.length, 0);
    assert.ok(result.preferences.parallel);
    assert.equal(result.preferences.parallel?.enabled, true);
    assert.equal(result.preferences.parallel?.max_workers, 3);
  });

  it("rejects invalid max_workers", () => {
    expectFieldError({ max_workers: 10 }, "max_workers");
  });

  it("rejects negative budget_ceiling", () => {
    expectFieldError({ budget_ceiling: -5 }, "budget_ceiling");
  });

  it("rejects invalid merge_strategy", () => {
    expectFieldError({ merge_strategy: "invalid" }, "merge_strategy");
  });

  it("rejects invalid auto_merge", () => {
    expectFieldError({ auto_merge: "yolo" }, "auto_merge");
  });
});

View file

@ -364,3 +364,16 @@ export interface Requirement {
full_content: string; // full requirement text
superseded_by: string | null; // ID of superseding requirement, or null
}
// ─── Parallel Orchestration Types ────────────────────────────────────────
/** Granularity at which worker branches are merged back: after each slice, or once per milestone. */
export type MergeStrategy = "per-slice" | "per-milestone";
/** Merge confirmation policy: fully automatic, confirm each merge, or manual only. */
export type AutoMergeMode = "auto" | "confirm" | "manual";
/** User preferences for parallel milestone execution (the `parallel` prefs key). */
export interface ParallelConfig {
  /** Master switch; resolver defaults this to false, so parallel mode is strictly opt-in. */
  enabled: boolean;
  /** Concurrent worker count; the resolver clamps this to the 1-4 range. */
  max_workers: number;
  /** Optional aggregate cost ceiling; omitted means no budget limit is enforced. */
  budget_ceiling?: number;
  /** When worker branches are merged back (see MergeStrategy). */
  merge_strategy: MergeStrategy;
  /** How merges are confirmed (see AutoMergeMode). */
  auto_merge: AutoMergeMode;
}