From a81701979f946a56bb9de9762caaefc139daaaea Mon Sep 17 00:00:00 2001 From: Tom Boucher Date: Sun, 5 Apr 2026 01:55:20 -0400 Subject: [PATCH] feat(parallel): slice-level parallelism with dependency-aware dispatch (#3315) * feat(parallel): add slice-level parallelism with dependency-aware dispatch Fixes #2340 Co-Authored-By: Claude Opus 4.6 (1M context) * fix(parallel): handle missing slice lock, add worktree cleanup, remove dead code - state.ts: When GSD_SLICE_LOCK is set but the locked slice ID is not found in activeMilestoneSlices, log a warning and return a blocked state with a clear error message instead of silently continuing with activeSlice=undefined. Applied in both DB-backed and legacy paths. - slice-parallel-orchestrator.ts: Add worktree cleanup via removeWorktree in stopSliceParallel (after killing workers) and in the catch block of startSliceParallel (for partially created worktrees). Store basePath in SliceOrchestratorState so stopSliceParallel can reference it. - status-guards.ts: isInactiveStatus does not exist on this branch (only isClosedStatus is defined), so no removal needed. Co-Authored-By: Claude Opus 4.6 * fix(state): remove duplicate logWarning import after rebase conflict resolution The rebase merge left two import lines for logWarning from workflow-logger. Consolidated into a single import including logError. Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Opus 4.6 (1M context) Co-authored-by: trek-e --- src/resources/extensions/gsd/auto.ts | 3 + src/resources/extensions/gsd/auto/phases.ts | 60 +++ .../extensions/gsd/preferences-types.ts | 3 + src/resources/extensions/gsd/preferences.ts | 3 + .../extensions/gsd/slice-parallel-conflict.ts | 86 ++++ .../gsd/slice-parallel-eligibility.ts | 73 +++ .../gsd/slice-parallel-orchestrator.ts | 477 ++++++++++++++++++ src/resources/extensions/gsd/state.ts | 82 ++- .../gsd/tests/slice-parallel-conflict.test.ts | 92 ++++ .../tests/slice-parallel-eligibility.test.ts | 95 ++++ .../tests/slice-parallel-orchestrator.test.ts | 83 +++ 11 files changed, 1041 insertions(+), 16 deletions(-) create mode 100644 src/resources/extensions/gsd/slice-parallel-conflict.ts create mode 100644 src/resources/extensions/gsd/slice-parallel-eligibility.ts create mode 100644 src/resources/extensions/gsd/slice-parallel-orchestrator.ts create mode 100644 src/resources/extensions/gsd/tests/slice-parallel-conflict.test.ts create mode 100644 src/resources/extensions/gsd/tests/slice-parallel-eligibility.test.ts create mode 100644 src/resources/extensions/gsd/tests/slice-parallel-orchestrator.test.ts diff --git a/src/resources/extensions/gsd/auto.ts b/src/resources/extensions/gsd/auto.ts index 3b73a535a..e7558e57c 100644 --- a/src/resources/extensions/gsd/auto.ts +++ b/src/resources/extensions/gsd/auto.ts @@ -188,6 +188,9 @@ import { } from "./auto-post-unit.js"; import { bootstrapAutoSession, openProjectDbIfPresent, type BootstrapDeps } from "./auto-start.js"; import { autoLoop, resolveAgentEnd, resolveAgentEndCancelled, _resetPendingResolve, isSessionSwitchInFlight, type LoopDeps, type ErrorContext } from "./auto-loop.js"; +// Slice-level parallelism (#2340) +import { getEligibleSlices } from "./slice-parallel-eligibility.js"; +import { startSliceParallel } from "./slice-parallel-orchestrator.js"; import { WorktreeResolver, type WorktreeResolverDeps, diff --git a/src/resources/extensions/gsd/auto/phases.ts b/src/resources/extensions/gsd/auto/phases.ts index e291cba64..fba025d22 100644 --- a/src/resources/extensions/gsd/auto/phases.ts +++ b/src/resources/extensions/gsd/auto/phases.ts @@ -34,6 +34,9 @@ import { atomicWriteSync } from "../atomic-write.js"; import { verifyExpectedArtifact, diagnoseExpectedArtifact, buildLoopRemediationSteps } from "../auto-recovery.js"; import { writeUnitRuntimeRecord } from "../unit-runtime.js"; import { withTimeout, FINALIZE_POST_TIMEOUT_MS } from "./finalize-timeout.js"; +import { getEligibleSlices } from "../slice-parallel-eligibility.js"; +import { startSliceParallel } from "../slice-parallel-orchestrator.js"; +import { isDbAvailable, getMilestoneSlices } from "../gsd-db.js"; // ─── generateMilestoneReport ────────────────────────────────────────────────── @@ -219,6 +222,63 @@ export async function runPreDispatch( statePhase: state.phase, }); + // ── Slice-level parallelism gate (#2340) ───────────────────────────── + // When slice_parallel is enabled, check if multiple slices are eligible + // for parallel execution. If so, dispatch them in parallel and stop the + // sequential loop. Workers are spawned via slice-parallel-orchestrator.ts. + if ( + prefs?.slice_parallel?.enabled && + mid && + !process.env.GSD_PARALLEL_WORKER && + isDbAvailable() + ) { + try { + const dbSlices = getMilestoneSlices(mid); + if (dbSlices.length > 0) { + const doneIds = new Set(dbSlices.filter(sl => sl.status === "complete" || sl.status === "done").map(sl => sl.id)); + const sliceInputs = dbSlices.map(sl => ({ + id: sl.id, + done: doneIds.has(sl.id), + depends: sl.depends ?? [], + })); + const eligible = getEligibleSlices(sliceInputs, doneIds); + if (eligible.length > 1) { + debugLog("autoLoop", { + phase: "slice-parallel-dispatch", + iteration: ic.iteration, + mid, + eligibleSlices: eligible.map(e => e.id), + }); + ctx.ui.notify( + `Slice-parallel: dispatching ${eligible.length} eligible slices for ${mid}.`, + "info", + ); + const result = await startSliceParallel( + s.basePath, + mid, + eligible, + { maxWorkers: prefs.slice_parallel.max_workers ?? 2 }, + ); + if (result.started.length > 0) { + ctx.ui.notify( + `Slice-parallel: started ${result.started.length} worker(s): ${result.started.join(", ")}.`, + "info", + ); + await deps.stopAuto(ctx, pi, `Slice-parallel dispatched for ${mid}`); + return { action: "break", reason: "slice-parallel-dispatched" }; + } + // Fall through to sequential if no workers started + } + } + } catch (err) { + debugLog("autoLoop", { + phase: "slice-parallel-check-error", + error: err instanceof Error ? err.message : String(err), + }); + // Non-fatal — fall through to sequential dispatch + } + } + // ── Milestone transition ──────────────────────────────────────────── if (mid && s.currentMilestoneId && mid !== s.currentMilestoneId) { deps.emitJournalEvent({ ts: new Date().toISOString(), flowId: ic.flowId, seq: ic.nextSeq(), eventType: "milestone-transition", data: { from: s.currentMilestoneId, to: mid } }); diff --git a/src/resources/extensions/gsd/preferences-types.ts b/src/resources/extensions/gsd/preferences-types.ts index dd87949d4..043bb4055 100644 --- a/src/resources/extensions/gsd/preferences-types.ts +++ b/src/resources/extensions/gsd/preferences-types.ts @@ -104,6 +104,7 @@ export const KNOWN_PREFERENCE_KEYS = new Set([ "context_management", "experimental", "codebase", + "slice_parallel", ]); /** Canonical list of all dispatch unit types. */ @@ -288,6 +289,8 @@ export interface GSDPreferences { experimental?: ExperimentalPreferences; /** Configuration for the codebase map generator (/gsd codebase). */ codebase?: CodebaseMapPreferences; + /** Slice-level parallelism within a milestone. Disabled by default. */ + slice_parallel?: { enabled?: boolean; max_workers?: number }; } export interface LoadedGSDPreferences { diff --git a/src/resources/extensions/gsd/preferences.ts b/src/resources/extensions/gsd/preferences.ts index 4d88f0271..11614104f 100644 --- a/src/resources/extensions/gsd/preferences.ts +++ b/src/resources/extensions/gsd/preferences.ts @@ -388,6 +388,9 @@ function mergePreferences(base: GSDPreferences, override: GSDPreferences): GSDPr ].filter(Boolean), } : undefined, + slice_parallel: (base.slice_parallel || override.slice_parallel) + ? { ...(base.slice_parallel ?? {}), ...(override.slice_parallel ?? {}) } + : undefined, }; } diff --git a/src/resources/extensions/gsd/slice-parallel-conflict.ts b/src/resources/extensions/gsd/slice-parallel-conflict.ts new file mode 100644 index 000000000..dd540a627 --- /dev/null +++ b/src/resources/extensions/gsd/slice-parallel-conflict.ts @@ -0,0 +1,86 @@ +/** + * GSD Slice Parallel Conflict Detection — File overlap analysis between slices. + * + * Reads PLAN.md for each slice and extracts file paths mentioned in task + * descriptions. If two slices share more than 5 file paths, they are considered + * conflicting and should not run in parallel. + * + * Conservative by default: missing PLAN = block parallel execution. + */ + +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; + +// ─── File Path Extraction ───────────────────────────────────────────────────── + +/** + * Extract file paths from a PLAN.md content string. + * Matches common patterns like `src/...`, `lib/...`, paths with extensions. + */ +function extractFilePaths(content: string): Set { + const paths = new Set(); + + // Match file-like patterns: word/word paths with extensions, or src/lib/etc prefixed paths + const patterns = [ + // Paths like src/foo/bar.ts, lib/utils.js, etc. + /(?:src|lib|test|tests|app|pkg|cmd|internal|components|pages|api|utils|config|scripts|dist|build)\/[\w./-]+\.\w+/g, + // Generic path with at least one slash and extension + /(?5 shared files or missing plan). + */ +export function hasFileConflict( + basePath: string, + mid: string, + sliceA: string, + sliceB: string, +): boolean { + const planPathA = join(basePath, ".gsd", "milestones", mid, sliceA, "PLAN.md"); + const planPathB = join(basePath, ".gsd", "milestones", mid, sliceB, "PLAN.md"); + + // Conservative: missing PLAN = block + if (!existsSync(planPathA) || !existsSync(planPathB)) { + return true; + } + + const contentA = readFileSync(planPathA, "utf-8"); + const contentB = readFileSync(planPathB, "utf-8"); + + const filesA = extractFilePaths(contentA); + const filesB = extractFilePaths(contentB); + + // If either has no files extracted, no conflict detectable → allow + if (filesA.size === 0 || filesB.size === 0) { + return false; + } + + // Count shared files + let sharedCount = 0; + for (const file of filesA) { + if (filesB.has(file)) { + sharedCount++; + } + } + + return sharedCount > 5; +} diff --git a/src/resources/extensions/gsd/slice-parallel-eligibility.ts b/src/resources/extensions/gsd/slice-parallel-eligibility.ts new file mode 100644 index 000000000..f00fa0f43 --- /dev/null +++ b/src/resources/extensions/gsd/slice-parallel-eligibility.ts @@ -0,0 +1,73 @@ +/** + * GSD Slice Parallel Eligibility — Pure function to determine which slices + * within a milestone can run in parallel based on dependency satisfaction. + * + * This is the slice-level equivalent of parallel-eligibility.ts (which operates + * at milestone scope). The key difference is the positional fallback: slices + * without explicit dependencies use sequential ordering as an implicit constraint. + */ + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export interface SliceInput { + id: string; + done: boolean; + depends: string[]; +} + +export interface EligibleSlice { + id: string; +} + +// ─── Core Logic ─────────────────────────────────────────────────────────────── + +/** + * Determine which slices are eligible for parallel execution. + * + * Rules: + * 1. Done slices are never eligible (nothing to do). + * 2. A slice with explicit `depends` entries is eligible when ALL deps + * appear in `completedSliceIds`. + * 3. A slice with NO `depends` entries uses positional fallback: it is + * eligible only when every positionally-earlier slice is done. + * This preserves backward compatibility with roadmaps that don't + * declare inter-slice dependencies. + * + * @param slices All slices in the milestone (ordered by position). + * @param completedSliceIds Set of slice IDs that are already complete. + * @returns Array of eligible slice descriptors. + */ +export function getEligibleSlices( + slices: SliceInput[], + completedSliceIds: Set, +): EligibleSlice[] { + const eligible: EligibleSlice[] = []; + + for (let i = 0; i < slices.length; i++) { + const slice = slices[i]; + + // Rule 1: skip done slices + if (slice.done) continue; + + const hasExplicitDeps = slice.depends.length > 0; + + if (hasExplicitDeps) { + // Rule 2: explicit dependencies — all must be satisfied + const allDepsSatisfied = slice.depends.every(dep => completedSliceIds.has(dep)); + if (allDepsSatisfied) { + eligible.push({ id: slice.id }); + } + } else { + // Rule 3: no deps declared — positional fallback + // Eligible only if all positionally-earlier slices are done + const allEarlierDone = slices.slice(0, i).every( + earlier => earlier.done || completedSliceIds.has(earlier.id), + ); + if (allEarlierDone) { + eligible.push({ id: slice.id }); + } + } + } + + return eligible; +} diff --git a/src/resources/extensions/gsd/slice-parallel-orchestrator.ts b/src/resources/extensions/gsd/slice-parallel-orchestrator.ts new file mode 100644 index 000000000..346237651 --- /dev/null +++ b/src/resources/extensions/gsd/slice-parallel-orchestrator.ts @@ -0,0 +1,477 @@ +/** + * GSD Slice Parallel Orchestrator — Engine for parallel slice execution + * within a single milestone. + * + * Mirrors the existing parallel-orchestrator.ts pattern at slice scope + * instead of milestone scope. Workers are separate processes spawned via + * child_process, each running in its own git worktree with GSD_SLICE_LOCK + * + GSD_MILESTONE_LOCK env vars set. + * + * Key differences from milestone-level parallelism: + * - Scope: slices within one milestone, not milestones within a project + * - Lock env: GSD_SLICE_LOCK (in addition to GSD_MILESTONE_LOCK) + * - Conflict check: file overlap between slice plans (slice-parallel-conflict.ts) + */ + +import { spawn, type ChildProcess } from "node:child_process"; +import { + appendFileSync, + existsSync, + writeFileSync, + readFileSync, + mkdirSync, +} from "node:fs"; +import { join, dirname } from "node:path"; +import { fileURLToPath } from "node:url"; +import { gsdRoot } from "./paths.js"; +import { createWorktree, worktreePath, removeWorktree } from "./worktree-manager.js"; +import { autoWorktreeBranch, runWorktreePostCreateHook } from "./auto-worktree.js"; +import { + writeSessionStatus, + removeSessionStatus, +} from "./session-status-io.js"; +import { hasFileConflict } from "./slice-parallel-conflict.js"; +import { getErrorMessage } from "./error-utils.js"; + +// ─── Types ───────────────────────────────────────────────────────────────── + +export interface SliceWorkerInfo { + milestoneId: string; + sliceId: string; + pid: number; + process: ChildProcess | null; + worktreePath: string; + startedAt: number; + state: "running" | "stopped" | "error"; + completedUnits: number; + cost: number; + cleanup?: () => void; +} + +export interface SliceOrchestratorState { + active: boolean; + workers: Map; + totalCost: number; + budgetCeiling?: number; + maxWorkers: number; + startedAt: number; + basePath: string; +} + +export interface StartSliceParallelOpts { + maxWorkers?: number; + budgetCeiling?: number; +} + +// ─── Module State ────────────────────────────────────────────────────────── + +let sliceState: SliceOrchestratorState | null = null; + +// ─── Public API ──────────────────────────────────────────────────────────── + +/** + * Check whether slice-level parallel is currently active. + */ +export function isSliceParallelActive(): boolean { + return sliceState?.active === true; +} + +/** + * Get current slice orchestrator state (read-only snapshot). + */ +export function getSliceOrchestratorState(): SliceOrchestratorState | null { + return sliceState; +} + +/** + * Start parallel execution for eligible slices within a milestone. + * + * For each eligible slice: create a worktree, spawn `gsd --mode json --print "/gsd auto"` + * with env GSD_SLICE_LOCK= + GSD_MILESTONE_LOCK= + GSD_PARALLEL_WORKER=1. + */ +export async function startSliceParallel( + basePath: string, + milestoneId: string, + eligibleSlices: Array<{ id: string }>, + opts: StartSliceParallelOpts = {}, +): Promise<{ started: string[]; errors: Array<{ sid: string; error: string }> }> { + // Prevent nesting: if already a parallel worker, refuse + if (process.env.GSD_PARALLEL_WORKER) { + return { started: [], errors: [{ sid: "all", error: "Cannot start slice-parallel from within a parallel worker" }] }; + } + + const maxWorkers = opts.maxWorkers ?? 2; + const budgetCeiling = opts.budgetCeiling; + + // Initialize orchestrator state + sliceState = { + active: true, + workers: new Map(), + totalCost: 0, + budgetCeiling, + maxWorkers, + startedAt: Date.now(), + basePath, + }; + + const started: string[] = []; + const errors: Array<{ sid: string; error: string }> = []; + + // Filter out conflicting slices (conservative: check all pairs) + const safeSlices = filterConflictingSlices(basePath, milestoneId, eligibleSlices); + + // Limit to maxWorkers + const toSpawn = safeSlices.slice(0, maxWorkers); + + for (const slice of toSpawn) { + try { + // Create worktree for this slice + const wtBranch = `slice/${milestoneId}/${slice.id}`; + const wtName = `${milestoneId}-${slice.id}`; + const wtPath = worktreePath(basePath, wtName); + + if (!existsSync(wtPath)) { + createWorktree(basePath, wtName, { branch: wtBranch }); + } + + // Create worker info + const worker: SliceWorkerInfo = { + milestoneId, + sliceId: slice.id, + pid: 0, + process: null, + worktreePath: wtPath, + startedAt: Date.now(), + state: "running", + completedUnits: 0, + cost: 0, + }; + + sliceState.workers.set(slice.id, worker); + + // Spawn worker + const spawned = spawnSliceWorker(basePath, milestoneId, slice.id); + if (spawned) { + started.push(slice.id); + } else { + errors.push({ sid: slice.id, error: "Failed to spawn worker process" }); + worker.state = "error"; + } + } catch (err) { + errors.push({ sid: slice.id, error: getErrorMessage(err) }); + // Best-effort cleanup of partially created worktree + const wtName = `${milestoneId}-${slice.id}`; + try { + removeWorktree(basePath, wtName, { deleteBranch: true, force: true }); + } catch { /* ignore cleanup failures */ } + } + } + + // If nothing started, deactivate + if (started.length === 0) { + sliceState.active = false; + } + + return { started, errors }; +} + +/** + * Stop all slice-parallel workers and deactivate. + */ +export function stopSliceParallel(): void { + if (!sliceState) return; + + for (const worker of sliceState.workers.values()) { + if (worker.process) { + try { + worker.process.kill("SIGTERM"); + } catch { /* already dead */ } + } + worker.cleanup?.(); + worker.cleanup = undefined; + worker.process = null; + worker.state = "stopped"; + + // Clean up worktree created for this worker + const wtName = `${worker.milestoneId}-${worker.sliceId}`; + try { + removeWorktree(sliceState.basePath, wtName, { deleteBranch: true, force: true }); + } catch { /* best-effort cleanup */ } + } + + sliceState.active = false; +} + +/** + * Get aggregate cost across all slice workers. + */ +export function getSliceAggregateCost(): number { + if (!sliceState) return 0; + let total = 0; + for (const w of sliceState.workers.values()) { + total += w.cost; + } + return total; +} + +/** + * Check if budget ceiling has been exceeded. + */ +export function isSliceBudgetExceeded(): boolean { + if (!sliceState?.budgetCeiling) return false; + return getSliceAggregateCost() >= sliceState.budgetCeiling; +} + +/** + * Reset module state (for testing). + */ +export function resetSliceOrchestrator(): void { + if (sliceState) { + for (const w of sliceState.workers.values()) { + w.cleanup?.(); + } + } + sliceState = null; +} + +// ─── Internal: Conflict Filtering ────────────────────────────────────────── + +/** + * Remove slices that have file conflicts with each other. + * Greedy: add slices to the safe set in order; skip any that conflict + * with an already-included slice. + */ +function filterConflictingSlices( + basePath: string, + milestoneId: string, + slices: Array<{ id: string }>, +): Array<{ id: string }> { + const safe: Array<{ id: string }> = []; + + for (const candidate of slices) { + let conflictsWithSafe = false; + for (const existing of safe) { + if (hasFileConflict(basePath, milestoneId, candidate.id, existing.id)) { + conflictsWithSafe = true; + break; + } + } + if (!conflictsWithSafe) { + safe.push(candidate); + } + } + + return safe; +} + +// ─── Internal: Worker Spawning ───────────────────────────────────────────── + +/** + * Resolve the GSD CLI binary path. + * Same logic as parallel-orchestrator.ts resolveGsdBin(). + */ +function resolveGsdBin(): string | null { + if (process.env.GSD_BIN_PATH && existsSync(process.env.GSD_BIN_PATH)) { + return process.env.GSD_BIN_PATH; + } + + let thisDir: string; + try { + thisDir = dirname(fileURLToPath(import.meta.url)); + } catch { + thisDir = process.cwd(); + } + const candidates = [ + join(thisDir, "..", "..", "..", "loader.js"), + join(thisDir, "..", "..", "..", "..", "dist", "loader.js"), + ]; + for (const candidate of candidates) { + if (existsSync(candidate)) return candidate; + } + + return null; +} + +/** + * Spawn a worker process for a slice. + * The worker runs `gsd --mode json --print "/gsd auto"` in the slice's worktree + * with GSD_SLICE_LOCK, GSD_MILESTONE_LOCK, and GSD_PARALLEL_WORKER set. + */ +function spawnSliceWorker( + basePath: string, + milestoneId: string, + sliceId: string, +): boolean { + if (!sliceState) return false; + const worker = sliceState.workers.get(sliceId); + if (!worker) return false; + if (worker.process) return true; + + const binPath = resolveGsdBin(); + if (!binPath) return false; + + let child: ChildProcess; + try { + child = spawn(process.execPath, [binPath, "--mode", "json", "--print", "/gsd auto"], { + cwd: worker.worktreePath, + env: { + ...process.env, + GSD_SLICE_LOCK: sliceId, + GSD_MILESTONE_LOCK: milestoneId, + GSD_PROJECT_ROOT: basePath, + GSD_PARALLEL_WORKER: "1", + }, + stdio: ["ignore", "pipe", "pipe"], + detached: false, + }); + } catch { + return false; + } + + child.on("error", () => { + if (!sliceState) return; + const w = sliceState.workers.get(sliceId); + if (w) { + w.process = null; + } + }); + + worker.process = child; + worker.pid = child.pid ?? 0; + + if (!child.pid) { + worker.process = null; + return false; + } + + // ── NDJSON stdout monitoring ──────────────────────────────────────── + if (child.stdout) { + let stdoutBuffer = ""; + child.stdout.on("data", (data: Buffer) => { + stdoutBuffer += data.toString(); + const lines = stdoutBuffer.split("\n"); + stdoutBuffer = lines.pop() || ""; + for (const line of lines) { + processSliceWorkerLine(basePath, milestoneId, sliceId, line); + } + }); + child.stdout.on("close", () => { + if (stdoutBuffer.trim()) { + processSliceWorkerLine(basePath, milestoneId, sliceId, stdoutBuffer); + } + }); + } + + if (child.stderr) { + child.stderr.on("data", (data: Buffer) => { + appendSliceWorkerLog(basePath, milestoneId, sliceId, data.toString()); + }); + } + + // Update session status + writeSessionStatus(basePath, { + milestoneId: `${milestoneId}/${sliceId}`, + pid: worker.pid, + state: "running", + currentUnit: null, + completedUnits: worker.completedUnits, + cost: worker.cost, + lastHeartbeat: Date.now(), + startedAt: worker.startedAt, + worktreePath: worker.worktreePath, + }); + + // Store cleanup function + worker.cleanup = () => { + child.stdout?.removeAllListeners(); + child.stderr?.removeAllListeners(); + child.removeAllListeners(); + }; + + // Handle worker exit + child.on("exit", (code) => { + if (!sliceState) return; + const w = sliceState.workers.get(sliceId); + if (!w) return; + + w.cleanup?.(); + w.cleanup = undefined; + w.process = null; + + if (w.state === "stopped") return; + + if (code === 0) { + w.state = "stopped"; + } else { + w.state = "error"; + appendSliceWorkerLog(basePath, milestoneId, sliceId, + `\n[slice-orchestrator] worker exited with code ${code ?? "null"}\n`); + } + + writeSessionStatus(basePath, { + milestoneId: `${milestoneId}/${sliceId}`, + pid: w.pid, + state: w.state, + currentUnit: null, + completedUnits: w.completedUnits, + cost: w.cost, + lastHeartbeat: Date.now(), + startedAt: w.startedAt, + worktreePath: w.worktreePath, + }); + }); + + return true; +} + +// ─── NDJSON Processing ────────────────────────────────────────────────────── + +/** + * Process a single NDJSON line from a slice worker's stdout. + * Extracts cost from message_end events. + */ +function processSliceWorkerLine( + _basePath: string, + _milestoneId: string, + sliceId: string, + line: string, +): void { + if (!line.trim() || !sliceState) return; + + let event: Record; + try { + event = JSON.parse(line); + } catch { + return; + } + + const type = String(event.type ?? ""); + if (type === "message_end") { + const worker = sliceState.workers.get(sliceId); + if (worker) { + const usage = event.usage as Record | undefined; + if (usage?.cost && typeof usage.cost === "number") { + worker.cost += usage.cost; + sliceState.totalCost += usage.cost; + } + worker.completedUnits++; + } + } +} + +// ─── Logging ──────────────────────────────────────────────────────────────── + +function sliceLogDir(basePath: string): string { + return join(gsdRoot(basePath), "parallel", "slice-logs"); +} + +function appendSliceWorkerLog( + basePath: string, + milestoneId: string, + sliceId: string, + text: string, +): void { + const dir = sliceLogDir(basePath); + mkdirSync(dir, { recursive: true }); + appendFileSync(join(dir, `${milestoneId}-${sliceId}.log`), text); +} diff --git a/src/resources/extensions/gsd/state.ts b/src/resources/extensions/gsd/state.ts index 7f2c4067e..bc41ab6ed 100644 --- a/src/resources/extensions/gsd/state.ts +++ b/src/resources/extensions/gsd/state.ts @@ -42,8 +42,8 @@ import { nativeBatchParseGsdFiles, type BatchParsedFile } from './native-parser- import { join, resolve } from 'path'; import { existsSync, readdirSync, readFileSync } from 'node:fs'; import { debugCount, debugTime } from './debug-logger.js'; -import { extractVerdict } from './verdict-parser.js'; import { logWarning, logError } from './workflow-logger.js'; +import { extractVerdict } from './verdict-parser.js'; import { isDbAvailable, @@ -675,16 +675,39 @@ export async function deriveStateFromDb(basePath: string): Promise { let activeSlice: ActiveRef | null = null; let activeSliceRow: SliceRow | null = null; - for (const s of activeMilestoneSlices) { - if (isStatusDone(s.status)) continue; - // #2661: Skip deferred slices — a decision explicitly deferred this work. - // Without this guard the dispatcher would keep dispatching deferred slices - // because DECISIONS.md is only contextual, not authoritative for dispatch. - if (isDeferredStatus(s.status)) continue; - if (s.depends.every(dep => doneSliceIds.has(dep))) { - activeSlice = { id: s.id, title: s.title }; - activeSliceRow = s; - break; + // ── Slice-level parallel worker isolation ───────────────────────────── + // When GSD_SLICE_LOCK is set, this process is a parallel worker scoped + // to a single slice. Override activeSlice to only the locked slice ID. + const sliceLock = process.env.GSD_SLICE_LOCK; + if (sliceLock) { + const lockedSlice = activeMilestoneSlices.find(s => s.id === sliceLock); + if (lockedSlice) { + activeSlice = { id: lockedSlice.id, title: lockedSlice.title }; + activeSliceRow = lockedSlice; + } else { + logWarning("state", `GSD_SLICE_LOCK=${sliceLock} not found in active slices — worker has no assigned work`); + // Don't silently continue — this is a dispatch error + return { + activeMilestone, activeSlice: null, activeTask: null, + phase: 'blocked', + recentDecisions: [], blockers: [`GSD_SLICE_LOCK=${sliceLock} not found in active milestone slices`], + nextAction: 'Slice lock references a non-existent slice — check orchestrator dispatch.', + registry, requirements, + progress: { milestones: milestoneProgress, slices: sliceProgress }, + }; + } + } else { + for (const s of activeMilestoneSlices) { + if (isStatusDone(s.status)) continue; + // #2661: Skip deferred slices — a decision explicitly deferred this work. + // Without this guard the dispatcher would keep dispatching deferred slices + // because DECISIONS.md is only contextual, not authoritative for dispatch. + if (isDeferredStatus(s.status)) continue; + if (s.depends.every(dep => doneSliceIds.has(dep))) { + activeSlice = { id: s.id, title: s.title }; + activeSliceRow = s; + break; + } } } @@ -1325,11 +1348,38 @@ export async function _deriveStateImpl(basePath: string): Promise { const doneSliceIds = new Set(activeRoadmap.slices.filter(s => s.done).map(s => s.id)); let activeSlice: ActiveRef | null = null; - for (const s of activeRoadmap.slices) { - if (s.done) continue; - if (s.depends.every(dep => doneSliceIds.has(dep))) { - activeSlice = { id: s.id, title: s.title }; - break; + // ── Slice-level parallel worker isolation ───────────────────────────── + // When GSD_SLICE_LOCK is set, override activeSlice to only the locked slice. + const sliceLockLegacy = process.env.GSD_SLICE_LOCK; + if (sliceLockLegacy) { + const lockedSlice = activeRoadmap.slices.find(s => s.id === sliceLockLegacy); + if (lockedSlice) { + activeSlice = { id: lockedSlice.id, title: lockedSlice.title }; + } else { + logWarning("state", `GSD_SLICE_LOCK=${sliceLockLegacy} not found in active slices — worker has no assigned work`); + return { + activeMilestone, + activeSlice: null, + activeTask: null, + phase: 'blocked', + recentDecisions: [], + blockers: [`GSD_SLICE_LOCK=${sliceLockLegacy} not found in active milestone slices`], + nextAction: 'Slice lock references a non-existent slice — check orchestrator dispatch.', + registry, + requirements, + progress: { + milestones: milestoneProgress, + slices: sliceProgress, + }, + }; + } + } else { + for (const s of activeRoadmap.slices) { + if (s.done) continue; + if (s.depends.every(dep => doneSliceIds.has(dep))) { + activeSlice = { id: s.id, title: s.title }; + break; + } } } diff --git a/src/resources/extensions/gsd/tests/slice-parallel-conflict.test.ts b/src/resources/extensions/gsd/tests/slice-parallel-conflict.test.ts new file mode 100644 index 000000000..375774215 --- /dev/null +++ b/src/resources/extensions/gsd/tests/slice-parallel-conflict.test.ts @@ -0,0 +1,92 @@ +/** + * Tests for slice-level parallel conflict detection. + * Verifies hasFileConflict() correctly identifies when two slices + * touch too many of the same files to safely run in parallel. + */ + +import { describe, it, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync, mkdirSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { hasFileConflict } from "../slice-parallel-conflict.js"; + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function makeTmpBase(): string { + const base = mkdtempSync(join(tmpdir(), "gsd-slice-conflict-test-")); + mkdirSync(join(base, ".gsd"), { recursive: true }); + return base; +} + +function writeSlicePlan(base: string, mid: string, sid: string, content: string): void { + const dir = join(base, ".gsd", "milestones", mid, sid); + mkdirSync(dir, { recursive: true }); + writeFileSync(join(dir, "PLAN.md"), content, "utf-8"); +} + +describe("hasFileConflict", () => { + let base: string; + + beforeEach(() => { + base = makeTmpBase(); + }); + + afterEach(() => { + rmSync(base, { recursive: true, force: true }); + }); + + it("two slices with >5 overlapping file paths → blocked (true)", () => { + const planA = `# Plan S01 +## Tasks +- T01: Update src/auth/login.ts +- T02: Update src/auth/register.ts +- T03: Update src/auth/session.ts +- T04: Update src/auth/middleware.ts +- T05: Update src/auth/types.ts +- T06: Update src/auth/utils.ts +`; + const planB = `# Plan S02 +## Tasks +- T01: Refactor src/auth/login.ts +- T02: Refactor src/auth/register.ts +- T03: Refactor src/auth/session.ts +- T04: Refactor src/auth/middleware.ts +- T05: Refactor src/auth/types.ts +- T06: Refactor src/auth/utils.ts +`; + writeSlicePlan(base, "M001", "S01", planA); + writeSlicePlan(base, "M001", "S02", planB); + assert.equal(hasFileConflict(base, "M001", "S01", "S02"), true); + }); + + it("two slices with 0 overlapping paths → allowed (false)", () => { + const planA = `# Plan S01 +## Tasks +- T01: Create src/api/routes.ts +- T02: Create src/api/handlers.ts +`; + const planB = `# Plan S02 +## Tasks +- T01: Create src/ui/components.ts +- T02: Create src/ui/styles.ts +`; + writeSlicePlan(base, "M001", "S01", planA); + writeSlicePlan(base, "M001", "S02", planB); + assert.equal(hasFileConflict(base, "M001", "S01", "S02"), false); + }); + + it("missing PLAN.md → conservative block (true)", () => { + // Only create one slice's plan + writeSlicePlan(base, "M001", "S01", "# Plan\n- T01: src/foo.ts"); + // S02 has no plan at all + assert.equal(hasFileConflict(base, "M001", "S01", "S02"), true); + }); + + it("one slice empty plan → allowed (false)", () => { + writeSlicePlan(base, "M001", "S01", "# Plan S01\n## Tasks\n- T01: Create src/foo.ts"); + writeSlicePlan(base, "M001", "S02", "# Plan S02\n## Tasks\n(no tasks yet)"); + assert.equal(hasFileConflict(base, "M001", "S01", "S02"), false); + }); +}); diff --git a/src/resources/extensions/gsd/tests/slice-parallel-eligibility.test.ts b/src/resources/extensions/gsd/tests/slice-parallel-eligibility.test.ts new file mode 100644 index 000000000..9beded51e --- /dev/null +++ b/src/resources/extensions/gsd/tests/slice-parallel-eligibility.test.ts @@ -0,0 +1,95 @@ +/** + * Tests for slice-level parallel eligibility. + * Verifies getEligibleSlices() correctly determines which slices + * can run in parallel based on dependency satisfaction. + */ + +import { describe, it } from "node:test"; +import assert from "node:assert/strict"; + +import { getEligibleSlices } from "../slice-parallel-eligibility.js"; + +describe("getEligibleSlices", () => { + it("diamond DAG: S01 done, S02 depends:[S01], S03 depends:[S01] → both eligible", () => { + const slices = [ + { id: "S01", done: true, depends: [] }, + { id: "S02", done: false, depends: ["S01"] }, + { id: "S03", done: false, depends: ["S01"] }, + ]; + const completed = new Set(["S01"]); + const result = getEligibleSlices(slices, completed); + const ids = result.map(s => s.id); + assert.deepStrictEqual(ids.sort(), ["S02", "S03"]); + }); + + it("linear chain: S01→S02→S03, only S01 done → only S02 eligible", () => { + const slices = [ + { id: "S01", done: true, depends: [] }, + { id: "S02", done: false, depends: ["S01"] }, + { id: "S03", done: false, depends: ["S02"] }, + ]; + const completed = new Set(["S01"]); + const result = getEligibleSlices(slices, completed); + assert.equal(result.length, 1); + assert.equal(result[0].id, "S02"); + }); + + it("no deps declared: S01 done, S02 no deps, S03 no deps → only S02 eligible (positional fallback)", () => { + const slices = [ + { id: "S01", done: true, depends: [] }, + { id: "S02", done: false, depends: [] }, + { id: "S03", done: false, depends: [] }, + ]; + const completed = new Set(["S01"]); + const result = getEligibleSlices(slices, completed); + // Positional fallback: when no deps declared, only the first non-done slice + // after all positionally-earlier slices are done is eligible + assert.equal(result.length, 1); + assert.equal(result[0].id, "S02"); + }); + + it("all done: empty result", () => { + const slices = [ + { id: "S01", done: true, depends: [] }, + { id: "S02", done: true, depends: ["S01"] }, + { id: "S03", done: true, depends: ["S02"] }, + ]; + const completed = new Set(["S01", "S02", "S03"]); + const result = getEligibleSlices(slices, completed); + assert.equal(result.length, 0); + }); + + it("empty input: empty result", () => { + const result = getEligibleSlices([], new Set()); + assert.equal(result.length, 0); + }); + + it("mixed deps and no-deps: only dep-satisfied slices with explicit deps are eligible alongside positional", () => { + const slices = [ + { id: "S01", done: true, depends: [] }, + { id: "S02", done: false, depends: ["S01"] }, // explicit dep satisfied + { id: "S03", done: false, depends: [] }, // no deps, positional fallback + { id: "S04", done: false, depends: ["S01"] }, // explicit dep satisfied + ]; + const completed = new Set(["S01"]); + const result = getEligibleSlices(slices, completed); + const ids = result.map(s => s.id); + // S02 and S04 have explicit deps satisfied; S03 has no deps but + // positionally S02 (before it) is not done, so S03 is blocked by positional rule + assert.ok(ids.includes("S02"), "S02 should be eligible (dep on S01 satisfied)"); + assert.ok(ids.includes("S04"), "S04 should be eligible (dep on S01 satisfied)"); + }); + + it("unsatisfied dependency blocks slice", () => { + const slices = [ + { id: "S01", done: false, depends: [] }, + { id: "S02", done: false, depends: ["S01"] }, + ]; + const completed = new Set(); + const result = getEligibleSlices(slices, completed); + // S01 has no deps and is first → eligible by positional + // S02 depends on S01 which is not completed → blocked + assert.equal(result.length, 1); + assert.equal(result[0].id, "S01"); + }); +}); diff --git a/src/resources/extensions/gsd/tests/slice-parallel-orchestrator.test.ts b/src/resources/extensions/gsd/tests/slice-parallel-orchestrator.test.ts new file mode 100644 index 000000000..8592f2c39 --- /dev/null +++ b/src/resources/extensions/gsd/tests/slice-parallel-orchestrator.test.ts @@ -0,0 +1,83 @@ +/** + * Structural tests for slice-level parallel orchestrator. + * Verifies the orchestrator module exists and has the correct shape, + * env var usage, and preference gating. + */ + +import { describe, it } from "node:test"; +import assert from "node:assert/strict"; +import { readFileSync } from "node:fs"; +import { join, dirname } from "node:path"; +import { fileURLToPath } from "node:url"; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const gsdDir = join(__dirname, ".."); + +describe("slice-parallel-orchestrator structural tests", () => { + it("orchestrator uses GSD_SLICE_LOCK env var", () => { + const source = readFileSync(join(gsdDir, "slice-parallel-orchestrator.ts"), "utf-8"); + assert.ok( + source.includes("GSD_SLICE_LOCK"), + "Orchestrator must use GSD_SLICE_LOCK env var to isolate slice workers", + ); + }); + + it("orchestrator sets GSD_PARALLEL_WORKER=1 to prevent nesting", () => { + const source = readFileSync(join(gsdDir, "slice-parallel-orchestrator.ts"), "utf-8"); + assert.ok( + source.includes("GSD_PARALLEL_WORKER"), + "Orchestrator must set GSD_PARALLEL_WORKER to prevent nested parallel", + ); + }); + + it("maxWorkers default is 2", () => { + const source = readFileSync(join(gsdDir, "slice-parallel-orchestrator.ts"), "utf-8"); + // Check that default max workers is 2 (in opts.maxWorkers ?? 2 or similar) + assert.ok( + source.includes("maxWorkers") && source.includes("2"), + "Default maxWorkers should be 2", + ); + }); + + it("orchestrator imports GSD_MILESTONE_LOCK for milestone isolation", () => { + const source = readFileSync(join(gsdDir, "slice-parallel-orchestrator.ts"), "utf-8"); + assert.ok( + source.includes("GSD_MILESTONE_LOCK"), + "Orchestrator must also pass GSD_MILESTONE_LOCK for milestone context", + ); + }); +}); + +describe("slice_parallel preference gating", () => { + it("preferences-types.ts includes slice_parallel in interface", () => { + const source = readFileSync(join(gsdDir, "preferences-types.ts"), "utf-8"); + assert.ok( + source.includes("slice_parallel"), + "GSDPreferences should have slice_parallel field", + ); + }); + + it("slice_parallel is in KNOWN_PREFERENCE_KEYS", () => { + const source = readFileSync(join(gsdDir, "preferences-types.ts"), "utf-8"); + assert.ok( + source.includes('"slice_parallel"'), + 'KNOWN_PREFERENCE_KEYS should include "slice_parallel"', + ); + }); + + it("state.ts checks GSD_SLICE_LOCK for slice isolation", () => { + const source = readFileSync(join(gsdDir, "state.ts"), "utf-8"); + assert.ok( + source.includes("GSD_SLICE_LOCK"), + "State derivation should check GSD_SLICE_LOCK for slice-level parallel isolation", + ); + }); + + it("auto.ts imports slice parallel orchestrator when enabled", () => { + const source = readFileSync(join(gsdDir, "auto.ts"), "utf-8"); + assert.ok( + source.includes("slice_parallel") || source.includes("slice-parallel"), + "auto.ts should reference slice_parallel for dispatch gating", + ); + }); +});