Merge pull request #719 from jeremymcs/feat/672-parallel-milestone-orchestration

feat: parallel milestone orchestration foundation (#672)
This commit is contained in:
Lex Christopherson 2026-03-16 20:32:54 -06:00
commit 28c48e0a40
25 changed files with 3046 additions and 6 deletions


@ -0,0 +1,17 @@
# Issue #672: Parallel Milestone Orchestration
**Issue:** https://github.com/gsd-build/gsd-2/issues/672
**Contributor:** @deseltrus (7 merged PRs, proven contributor)
**Status:** WIP — foundation modules built, orchestrator core in progress
**Default:** `parallel.enabled: false` — opt-in, zero impact on existing users
## Delivery Plan (6 PRs)
### PR 1: Worktree Bugfixes - MERGED (#675)
### PR 2: Dispatch Hardening (Small) - pending contributor
### PR 3: Parallel Config + Preferences (Small) - included in this PR
### PR 4: Session Status Protocol (Medium) - included in this PR
### PR 5: Orchestrator Core (Large) - included in this PR
### PR 6: Dashboard + Commands (Medium) - commands included, dashboard deferred
See full plan in the GitHub issue comment.


@ -17,6 +17,7 @@ Welcome to the GSD documentation. This covers everything from getting started to
| [Workflow Visualizer](./visualizer.md) | Interactive TUI overlay for progress, dependencies, metrics, and timeline (v2.19) |
| [Cost Management](./cost-management.md) | Budget ceilings, cost tracking, projections, and enforcement modes |
| [Git Strategy](./git-strategy.md) | Worktree isolation, branching model, and merge behavior |
| [Parallel Orchestration](./parallel-orchestration.md) | Run multiple milestones simultaneously with worker isolation and coordination |
| [Working in Teams](./working-in-teams.md) | Unique milestone IDs, `.gitignore` setup, and shared planning artifacts |
| [Skills](./skills.md) | Bundled skills, skill discovery, and custom skill authoring |
| [Migration from v1](./migration.md) | Migrating `.planning` directories from the original GSD |


@ -51,6 +51,10 @@ GSD isolates milestone work using one of three modes (configured via `git.isolat
See [Git Strategy](./git-strategy.md) for details.
### Parallel Execution
When your project has independent milestones, you can run them simultaneously. Each milestone gets its own worker process and worktree. See [Parallel Orchestration](./parallel-orchestration.md) for setup and usage.
### Crash Recovery
A lock file tracks the current unit. If the session dies, the next `/gsd auto` reads the surviving session file, synthesizes a recovery briefing from every tool call that made it to disk, and resumes with full context.


@ -34,6 +34,19 @@
| `/gsd run-hook` | Manually trigger a specific hook |
| `/gsd migrate` | Migrate a v1 `.planning` directory to `.gsd` format |
## Parallel Orchestration
| Command | Description |
|---------|-------------|
| `/gsd parallel start` | Analyze eligibility, confirm, and start workers |
| `/gsd parallel status` | Show all workers with state, progress, and cost |
| `/gsd parallel stop [MID]` | Stop all workers or a specific milestone's worker |
| `/gsd parallel pause [MID]` | Pause all workers or a specific one |
| `/gsd parallel resume [MID]` | Resume paused workers |
| `/gsd parallel merge [MID]` | Merge completed milestones back to main |
See [Parallel Orchestration](./parallel-orchestration.md) for full documentation.
## Git Commands
| Command | Description |


@ -389,6 +389,21 @@ auto_visualize: true
See [Workflow Visualizer](./visualizer.md).
### `parallel`
Run multiple milestones simultaneously. Disabled by default.
```yaml
parallel:
enabled: false # Master toggle
max_workers: 2 # Concurrent workers (1-4)
budget_ceiling: 50.00 # Aggregate cost limit in USD
merge_strategy: "per-milestone" # "per-slice" or "per-milestone"
auto_merge: "confirm" # "auto", "confirm", or "manual"
```
See [Parallel Orchestration](./parallel-orchestration.md) for full documentation.
## Full Example
```yaml


@ -48,6 +48,26 @@ In **branch mode**, the flow is the same except work happens in the project root
In **none mode**, commits land directly on the current branch — no milestone branch is created, and no merge step is needed.
### Parallel Worktrees
With [parallel orchestration](./parallel-orchestration.md) enabled, multiple milestones run in separate worktrees simultaneously:
```
main ──────────────────────────────────────────────────────────
│ ↑ ↑
├── milestone/M002 (worktree) ─────────┘ │
│ commit: feat(S01/T01): auth types │
│ commit: feat(S01/T02): JWT middleware │
│ → squash-merged first │
│ │
└── milestone/M003 (worktree) ────────────────────────┘
commit: feat(S01/T01): dashboard layout
commit: feat(S01/T02): chart components
→ squash-merged second
```
Each worktree operates on its own branch with its own commit history. Merges happen sequentially to avoid conflicts.
### Key Properties
- **Sequential commits on one branch** — no per-slice branches, no merge conflicts within a milestone


@ -0,0 +1,307 @@
# Parallel Milestone Orchestration
Run multiple milestones simultaneously in isolated git worktrees. Each milestone gets its own worker process, its own branch, and its own context window — while a coordinator tracks progress, enforces budgets, and keeps everything in sync.
> **Status:** Disabled by default (`parallel.enabled: false`). Opt-in only — zero impact on existing users.
## Quick Start
1. Enable parallel mode in your preferences:
```yaml
---
parallel:
enabled: true
max_workers: 2
---
```
2. Start parallel execution:
```
/gsd parallel start
```
GSD scans your milestones, checks dependencies and file overlap, shows an eligibility report, and spawns workers for eligible milestones.
3. Monitor progress:
```
/gsd parallel status
```
4. Stop when done:
```
/gsd parallel stop
```
## How It Works
### Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Coordinator (your GSD session) │
│ │
│ Responsibilities: │
│ - Eligibility analysis (deps + file overlap) │
│ - Worker spawning and lifecycle │
│ - Budget tracking across all workers │
│ - Signal dispatch (pause/resume/stop) │
│ - Session status monitoring │
│ - Merge reconciliation │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ ... │
│ │ M001 │ │ M003 │ │ M005 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ .gsd/worktrees/ .gsd/worktrees/ .gsd/worktrees/ │
│ M001/ M003/ M005/ │
│ (milestone/ (milestone/ (milestone/ │
│ M001 branch) M003 branch) M005 branch) │
└─────────────────────────────────────────────────────────┘
```
### Worker Isolation
Each worker is a separate `gsd` process with complete isolation:
| Resource | Isolation Method |
|----------|-----------------|
| **Filesystem** | Git worktree — each worker has its own checkout |
| **Git branch** | `milestone/<MID>` — one branch per milestone |
| **State derivation** | `GSD_MILESTONE_LOCK` env var — `deriveState()` only sees the assigned milestone |
| **Context window** | Separate process — each worker has its own agent sessions |
| **Metrics** | Each worktree has its own `.gsd/metrics.json` |
| **Crash recovery** | Each worktree has its own `.gsd/auto.lock` |
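The isolation layers in this table come together when a worker is launched. The sketch below is illustrative, not the actual implementation: the helper name and exact spawn flags are assumptions, while the env var names (`GSD_MILESTONE_LOCK`, `GSD_PARALLEL_WORKER`) and worktree layout come from this document.

```typescript
import { spawn } from "node:child_process";
import { join } from "node:path";

// Illustrative sketch: build the spawn options that give a worker its isolation.
function workerSpawnOptions(projectRoot: string, milestoneId: string) {
  return {
    // Filesystem isolation: the worker runs inside its own worktree checkout.
    cwd: join(projectRoot, ".gsd", "worktrees", milestoneId),
    env: {
      ...process.env,
      GSD_MILESTONE_LOCK: milestoneId, // state derivation sees only this milestone
      GSD_PARALLEL_WORKER: "1",        // blocks nested parallel sessions
    },
    detached: true,
    stdio: "ignore" as const,
  };
}

// A coordinator would then launch one process per eligible milestone, e.g.:
// spawn("gsd", ["auto"], workerSpawnOptions(root, "M002"));
```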
### Coordination
Workers and the coordinator communicate through file-based IPC:
- **Session status files** (`.gsd/parallel/<MID>.status.json`) — workers write heartbeats, the coordinator reads them
- **Signal files** (`.gsd/parallel/<MID>.signal.json`) — coordinator writes signals, workers consume them
- **Atomic writes** — write-to-temp + rename prevents partial reads
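The atomic-write pattern in the last bullet fits in a few lines. This is a minimal sketch with an assumed function name, not the module's actual API:

```typescript
import { writeFileSync, renameSync } from "node:fs";
import { join } from "node:path";

// Write-to-temp + rename: rename() atomically replaces the destination on
// POSIX filesystems, so a concurrent reader sees either the old file or the
// new one, never a half-written file.
function writeStatusAtomic(dir: string, milestoneId: string, status: unknown): void {
  const finalPath = join(dir, `${milestoneId}.status.json`);
  const tmpPath = `${finalPath}.tmp`;
  writeFileSync(tmpPath, JSON.stringify(status, null, 2));
  renameSync(tmpPath, finalPath); // atomic swap into place
}
```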
## Eligibility Analysis
Before starting parallel execution, GSD checks which milestones can safely run concurrently.
### Rules
1. **Not complete** — Finished milestones are skipped
2. **Dependencies satisfied** — All `dependsOn` entries must have status `complete`
3. **File overlap check** — Milestones touching the same files get a warning (but are still eligible)
### Example Report
```
# Parallel Eligibility Report
## Eligible for Parallel Execution (2)
- **M002** — Auth System
All dependencies satisfied.
- **M003** — Dashboard UI
All dependencies satisfied.
## Ineligible (2)
- **M001** — Core Types
Already complete.
- **M004** — API Integration
Blocked by incomplete dependencies: M002.
## File Overlap Warnings (1)
- **M002** <-> **M003** — 2 shared file(s):
- `src/types.ts`
- `src/middleware.ts`
```
File overlaps are warnings, not blockers. Both milestones work in separate worktrees, so they won't interfere at the filesystem level. Conflicts are detected and resolved during merge.
## Configuration
Add to `~/.gsd/preferences.md` or `.gsd/preferences.md`:
```yaml
---
parallel:
enabled: false # Master toggle (default: false)
max_workers: 2 # Concurrent workers (1-4, default: 2)
  budget_ceiling: 50.00           # Aggregate cost limit in USD (optional)
merge_strategy: "per-milestone" # When to merge: "per-slice" or "per-milestone"
auto_merge: "confirm" # "auto", "confirm", or "manual"
---
```
### Configuration Reference
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `enabled` | boolean | `false` | Master toggle. Must be `true` for `/gsd parallel` commands to work. |
| `max_workers` | number (1-4) | `2` | Maximum concurrent worker processes. Higher values use more memory and API budget. |
| `budget_ceiling` | number | none | Aggregate cost ceiling in USD across all workers. When reached, no new units are dispatched. |
| `merge_strategy` | `"per-slice"` or `"per-milestone"` | `"per-milestone"` | When worktree changes merge back to main. Per-milestone waits for the full milestone to complete. |
| `auto_merge` | `"auto"`, `"confirm"`, `"manual"` | `"confirm"` | How merge-back is handled. `confirm` prompts before merging. `manual` requires explicit `/gsd parallel merge`. |
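A resolver matching the defaults in this table might look like the following sketch. The function name is hypothetical, and clamping `max_workers` into the documented 1-4 range is an assumption about how out-of-range values are handled:

```typescript
interface ParallelConfig {
  enabled: boolean;
  max_workers: number;
  budget_ceiling?: number; // undefined means no aggregate ceiling
  merge_strategy: "per-slice" | "per-milestone";
  auto_merge: "auto" | "confirm" | "manual";
}

// Fill in the documented defaults; user preferences override them.
function resolveParallelConfigSketch(prefs?: Partial<ParallelConfig>): ParallelConfig {
  return {
    enabled: prefs?.enabled ?? false,
    // Clamp into the documented 1-4 range (assumption: silently clamp).
    max_workers: Math.min(4, Math.max(1, prefs?.max_workers ?? 2)),
    budget_ceiling: prefs?.budget_ceiling,
    merge_strategy: prefs?.merge_strategy ?? "per-milestone",
    auto_merge: prefs?.auto_merge ?? "confirm",
  };
}
```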
## Commands
| Command | Description |
|---------|-------------|
| `/gsd parallel start` | Analyze eligibility, confirm, and start workers |
| `/gsd parallel status` | Show all workers with state, units completed, and cost |
| `/gsd parallel stop` | Stop all workers (sends SIGTERM) |
| `/gsd parallel stop M002` | Stop a specific milestone's worker |
| `/gsd parallel pause` | Pause all workers (finish current unit, then wait) |
| `/gsd parallel pause M002` | Pause a specific worker |
| `/gsd parallel resume` | Resume all paused workers |
| `/gsd parallel resume M002` | Resume a specific worker |
| `/gsd parallel merge` | Merge all completed milestones back to main |
| `/gsd parallel merge M002` | Merge a specific milestone back to main |
## Signal Lifecycle
The coordinator communicates with workers through signals:
```
Coordinator Worker
│ │
├── sendSignal("pause") ──→ │
│ ├── consumeSignal()
│ ├── pauseAuto()
│ │ (finish current unit, wait)
│ │
├── sendSignal("resume") ─→ │
│ ├── consumeSignal()
│ ├── resume dispatch loop
│ │
├── sendSignal("stop") ───→ │
│ + SIGTERM ────────────→ │
│ ├── consumeSignal() or SIGTERM handler
│ ├── stopAuto()
│ └── process exits
```
Workers check for signals between units (in `handleAgentEnd`). The coordinator also sends `SIGTERM` for immediate response on stop.
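The one-shot signal consumption amounts to a read-then-delete on the signal file. A simplified sketch (assumed function name, error handling omitted), with paths following the IPC layout above:

```typescript
import { readFileSync, unlinkSync, existsSync } from "node:fs";
import { join } from "node:path";

type ParallelSignal = { signal: "pause" | "resume" | "stop" };

// Read-and-delete so each signal fires exactly once. If no signal file
// exists, return null and let the worker keep dispatching.
function consumeSignalSketch(basePath: string, milestoneId: string): ParallelSignal | null {
  const path = join(basePath, ".gsd", "parallel", `${milestoneId}.signal.json`);
  if (!existsSync(path)) return null;
  const parsed = JSON.parse(readFileSync(path, "utf8")) as ParallelSignal;
  unlinkSync(path); // one-shot: consuming removes the file
  return parsed;
}
```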
## Merge Reconciliation
When milestones complete, their worktree changes need to merge back to main.
### Merge Order
- **Sequential** (default): Milestones merge in ID order (M001 before M002)
- **By-completion**: Milestones merge in the order they finish
### Conflict Handling
1. `.gsd/` state files (STATE.md, metrics.json, etc.) — **auto-resolved** by accepting the milestone branch version
2. Code conflicts — **stop and report**. The merge halts, showing which files conflict. Resolve manually and retry with `/gsd parallel merge <MID>`.
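The two-tier policy above amounts to partitioning conflicted paths by prefix. A minimal sketch with an assumed helper name:

```typescript
// .gsd/ state conflicts are auto-resolved by taking the milestone branch
// version; any other conflicted path halts the merge for manual resolution.
function partitionConflicts(paths: string[]): { autoResolve: string[]; manual: string[] } {
  return {
    autoResolve: paths.filter((p) => p.startsWith(".gsd/")),
    manual: paths.filter((p) => !p.startsWith(".gsd/")),
  };
}
```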
### Example
```
/gsd parallel merge
# Merge Results
- **M002** — merged successfully (pushed)
- **M003** — CONFLICT (2 file(s)):
- `src/types.ts`
- `src/middleware.ts`
Resolve conflicts manually and run `/gsd parallel merge M003` to retry.
```
## Budget Management
When `budget_ceiling` is set, the coordinator tracks aggregate cost across all workers:
- Cost is summed from each worker's session status
- When the ceiling is reached, the coordinator signals workers to stop
- Each worker also respects the project-level `budget_ceiling` preference independently
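The aggregate check reduces to summing per-worker costs against the ceiling. A sketch under the assumption that an unset ceiling disables the check entirely:

```typescript
interface WorkerCost { milestoneId: string; cost: number }

// Sum every worker's reported cost; when the ceiling is reached the
// coordinator stops dispatching new units.
function budgetCeilingReached(workers: WorkerCost[], ceiling?: number): boolean {
  if (ceiling === undefined) return false; // no ceiling configured
  const total = workers.reduce((sum, w) => sum + w.cost, 0);
  return total >= ceiling;
}
```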
## Health Monitoring
### Doctor Integration
`/gsd doctor` detects parallel session issues:
- **Stale parallel sessions** — Worker process died without cleanup. Doctor finds `.gsd/parallel/*.status.json` files with dead PIDs or expired heartbeats and removes them.
Run `/gsd doctor --fix` to clean up automatically.
### Stale Detection
Sessions are considered stale when:
- The worker PID is no longer running (checked via `process.kill(pid, 0)`)
- The last heartbeat is older than 30 seconds
The coordinator runs stale detection during `refreshWorkerStatuses()` and automatically removes dead sessions.
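Both staleness conditions fit in a few lines. Signal 0 checks for process existence without delivering anything; treating `EPERM` as "alive" is an assumption for processes owned by another user:

```typescript
const HEARTBEAT_TTL_MS = 30_000; // matches the 30-second rule above

function isPidAlive(pid: number): boolean {
  try {
    process.kill(pid, 0); // signal 0: existence check, nothing is sent
    return true;
  } catch (err) {
    // EPERM means the process exists but belongs to another user.
    return (err as NodeJS.ErrnoException).code === "EPERM";
  }
}

function isStale(pid: number, lastHeartbeat: number, now = Date.now()): boolean {
  return !isPidAlive(pid) || now - lastHeartbeat > HEARTBEAT_TTL_MS;
}
```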
## Safety Model
| Safety Layer | Protection |
|-------------|------------|
| **Feature flag** | `parallel.enabled: false` by default — existing users unaffected |
| **Eligibility analysis** | Dependency and file overlap checks before starting |
| **Worker isolation** | Separate processes, worktrees, branches, context windows |
| **`GSD_MILESTONE_LOCK`** | Each worker only sees its milestone in state derivation |
| **`GSD_PARALLEL_WORKER`** | Workers cannot spawn nested parallel sessions |
| **Budget ceiling** | Aggregate cost enforcement across all workers |
| **Signal-based shutdown** | Graceful stop via file signals + SIGTERM |
| **Doctor integration** | Detects and cleans up orphaned sessions |
| **Conflict-aware merge** | Stops on code conflicts, auto-resolves `.gsd/` state conflicts |
## File Layout
```
.gsd/
├── parallel/ # Coordinator ↔ worker IPC
│ ├── M002.status.json # Worker heartbeat + progress
│ ├── M002.signal.json # Coordinator → worker signals
│ ├── M003.status.json
│ └── M003.signal.json
├── worktrees/ # Git worktrees (one per milestone)
│ ├── M002/ # M002's isolated checkout
│ │ ├── .gsd/ # M002's own state files
│ │ │ ├── auto.lock
│ │ │ ├── metrics.json
│ │ │ └── milestones/
│ │ └── src/ # M002's working copy
│ └── M003/
│ └── ...
└── ...
```
Both `.gsd/parallel/` and `.gsd/worktrees/` are gitignored — they're runtime-only coordination files that never get committed.
## Troubleshooting
### "Parallel mode is not enabled"
Set `parallel.enabled: true` in your preferences file.
### "No milestones are eligible for parallel execution"
All milestones are either complete or blocked by dependencies. Check `/gsd queue` to see milestone status and dependency chains.
### Worker crashed — how to recover
1. Run `/gsd doctor --fix` to clean up stale sessions
2. Run `/gsd parallel status` to see current state
3. Re-run `/gsd parallel start` to spawn new workers for remaining milestones
### Merge conflicts after parallel completion
1. Run `/gsd parallel merge` to see which milestones have conflicts
2. Resolve conflicts in the worktree at `.gsd/worktrees/<MID>/`
3. Retry with `/gsd parallel merge <MID>`
### Workers seem stuck
Check if budget ceiling was reached: `/gsd parallel status` shows per-worker costs. Increase `parallel.budget_ceiling` or remove it to continue.


@ -108,6 +108,7 @@ import {
autoWorktreeBranch,
} from "./auto-worktree.js";
import { pruneQueueOrder } from "./queue-order.js";
import { consumeSignal } from "./session-status-io.js";
import { showNextAction } from "../shared/next-action-ui.js";
import { debugLog, debugTime, debugCount, debugPeak, enableDebug, isDebugEnabled, writeDebugSummary, getDebugLogPath } from "./debug-logger.js";
import {
@ -361,11 +362,12 @@ let _sigtermHandler: (() => void) | null = null;
*/
const inFlightTools = new Map<string, number>();
type BudgetAlertLevel = 0 | 75 | 90 | 100;
type BudgetAlertLevel = 0 | 75 | 80 | 90 | 100;
export function getBudgetAlertLevel(budgetPct: number): BudgetAlertLevel {
if (budgetPct >= 1.0) return 100;
if (budgetPct >= 0.90) return 90;
if (budgetPct >= 0.80) return 80;
if (budgetPct >= 0.75) return 75;
return 0;
}
@ -1252,6 +1254,27 @@ export async function handleAgentEnd(
// Unit completed — clear its timeout
clearUnitTimeout();
// ── Parallel worker signal check ─────────────────────────────────────
// When running as a parallel worker (GSD_MILESTONE_LOCK set), check for
// coordinator signals before dispatching the next unit.
const milestoneLock = process.env.GSD_MILESTONE_LOCK;
if (milestoneLock) {
const signal = consumeSignal(basePath, milestoneLock);
if (signal) {
if (signal.signal === "stop") {
_handlingAgentEnd = false;
await stopAuto(ctx, pi);
return;
}
if (signal.signal === "pause") {
_handlingAgentEnd = false;
await pauseAuto(ctx, pi);
return;
}
// "resume" and "rebase" signals are handled elsewhere or no-op here
}
}
// Invalidate all caches — the unit just completed and may have
// written planning files (task summaries, roadmap checkboxes, etc.)
invalidateAllCaches();
@ -2211,6 +2234,10 @@ async function dispatchNextUnit(
lastBudgetAlertLevel = newBudgetAlertLevel;
ctx.ui.notify(`Budget 90%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning");
sendDesktopNotification("GSD", `Budget 90%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning", "budget");
} else if (newBudgetAlertLevel === 80) {
lastBudgetAlertLevel = newBudgetAlertLevel;
ctx.ui.notify(`Approaching budget ceiling — 80%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning");
sendDesktopNotification("GSD", `Approaching budget ceiling — 80%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning", "budget");
} else if (newBudgetAlertLevel === 75) {
lastBudgetAlertLevel = newBudgetAlertLevel;
ctx.ui.notify(`Budget 75%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "info");


@ -42,6 +42,14 @@ import { handleQuick } from "./quick.js";
import { handleHistory } from "./history.js";
import { handleUndo } from "./undo.js";
import { handleExport } from "./export.js";
import {
isParallelActive, getOrchestratorState, getWorkerStatuses,
prepareParallelStart, startParallel, stopParallel,
pauseWorker, resumeWorker,
} from "./parallel-orchestrator.js";
import { formatEligibilityReport } from "./parallel-eligibility.js";
import { mergeAllCompleted, mergeCompletedMilestone, formatMergeResults } from "./parallel-merge.js";
import { resolveParallelConfig } from "./preferences.js";
import { nativeBranchList, nativeDetectMainBranch, nativeBranchListMerged, nativeBranchDelete, nativeForEachRef, nativeUpdateRef } from "./native-git-bridge.js";
export function dispatchDoctorHeal(pi: ExtensionAPI, scope: string | undefined, reportText: string, structuredIssues: string): void {
@ -69,13 +77,13 @@ function projectRoot(): string {
export function registerGSDCommand(pi: ExtensionAPI): void {
pi.registerCommand("gsd", {
description: "GSD — Get Shit Done: /gsd help|next|auto|stop|pause|status|visualize|queue|quick|capture|triage|dispatch|history|undo|skip|export|cleanup|mode|prefs|config|hooks|run-hook|skill-health|doctor|forensics|migrate|remote|steer|knowledge",
description: "GSD — Get Shit Done: /gsd help|next|auto|stop|pause|status|visualize|queue|quick|capture|triage|dispatch|history|undo|skip|export|cleanup|mode|prefs|config|hooks|run-hook|skill-health|doctor|forensics|migrate|remote|steer|knowledge|parallel",
getArgumentCompletions: (prefix: string) => {
const subcommands = [
"help", "next", "auto", "stop", "pause", "status", "visualize", "queue", "quick", "discuss",
"capture", "triage", "dispatch",
"history", "undo", "skip", "export", "cleanup", "mode", "prefs",
"config", "hooks", "run-hook", "skill-health", "doctor", "forensics", "migrate", "remote", "steer", "inspect", "knowledge",
"config", "hooks", "run-hook", "skill-health", "doctor", "forensics", "migrate", "remote", "steer", "inspect", "knowledge", "parallel",
];
const parts = prefix.trim().split(/\s+/);
@ -99,6 +107,13 @@ export function registerGSDCommand(pi: ExtensionAPI): void {
.map((cmd) => ({ value: `mode ${cmd}`, label: cmd }));
}
if (parts[0] === "parallel" && parts.length <= 2) {
const subPrefix = parts[1] ?? "";
return ["start", "status", "stop", "pause", "resume", "merge"]
.filter((cmd) => cmd.startsWith(subPrefix))
.map((cmd) => ({ value: `parallel ${cmd}`, label: cmd }));
}
if (parts[0] === "prefs" && parts.length <= 2) {
const subPrefix = parts[1] ?? "";
return ["global", "project", "status", "wizard", "setup", "import-claude"]
@ -288,6 +303,108 @@ export function registerGSDCommand(pi: ExtensionAPI): void {
return;
}
// ─── Parallel Orchestration ────────────────────────────────────────
if (trimmed.startsWith("parallel")) {
const parallelArgs = trimmed.slice("parallel".length).trim();
const [subCmd = "", ...restParts] = parallelArgs.split(/\s+/);
const rest = restParts.join(" ");
if (subCmd === "start" || subCmd === "") {
const loaded = loadEffectiveGSDPreferences();
const config = resolveParallelConfig(loaded?.preferences);
if (!config.enabled) {
pi.sendMessage({
customType: "gsd-parallel",
content: "Parallel mode is not enabled. Set `parallel.enabled: true` in your preferences.",
display: false,
});
return;
}
const candidates = await prepareParallelStart(projectRoot(), loaded?.preferences);
const report = formatEligibilityReport(candidates);
if (candidates.eligible.length === 0) {
pi.sendMessage({ customType: "gsd-parallel", content: report + "\n\nNo milestones are eligible for parallel execution.", display: false });
return;
}
const result = await startParallel(
projectRoot(),
candidates.eligible.map(e => e.milestoneId),
loaded?.preferences,
);
const lines = [`Parallel orchestration started.`, `Workers: ${result.started.join(", ")}`];
if (result.errors.length > 0) {
lines.push(`Errors: ${result.errors.map(e => `${e.mid}: ${e.error}`).join("; ")}`);
}
pi.sendMessage({ customType: "gsd-parallel", content: report + "\n\n" + lines.join("\n"), display: false });
return;
}
if (subCmd === "status") {
if (!isParallelActive()) {
pi.sendMessage({ customType: "gsd-parallel", content: "No parallel orchestration is currently active.", display: false });
return;
}
const workers = getWorkerStatuses();
const lines = ["# Parallel Workers\n"];
for (const w of workers) {
lines.push(`- **${w.milestoneId}** (${w.title}) — ${w.state} — ${w.completedUnits} units — $${w.cost.toFixed(2)}`);
}
const orchState = getOrchestratorState();
if (orchState) {
lines.push(`\nTotal cost: $${orchState.totalCost.toFixed(2)}`);
}
pi.sendMessage({ customType: "gsd-parallel", content: lines.join("\n"), display: false });
return;
}
if (subCmd === "stop") {
const mid = rest.trim() || undefined;
await stopParallel(projectRoot(), mid);
pi.sendMessage({ customType: "gsd-parallel", content: mid ? `Stopped worker for ${mid}.` : "All parallel workers stopped.", display: false });
return;
}
if (subCmd === "pause") {
const mid = rest.trim() || undefined;
pauseWorker(projectRoot(), mid);
pi.sendMessage({ customType: "gsd-parallel", content: mid ? `Paused worker for ${mid}.` : "All parallel workers paused.", display: false });
return;
}
if (subCmd === "resume") {
const mid = rest.trim() || undefined;
resumeWorker(projectRoot(), mid);
pi.sendMessage({ customType: "gsd-parallel", content: mid ? `Resumed worker for ${mid}.` : "All parallel workers resumed.", display: false });
return;
}
if (subCmd === "merge") {
const mid = rest.trim() || undefined;
if (mid) {
// Merge a specific milestone
const result = await mergeCompletedMilestone(projectRoot(), mid);
pi.sendMessage({ customType: "gsd-parallel", content: formatMergeResults([result]), display: false });
return;
}
// Merge all completed milestones
const workers = getWorkerStatuses();
if (workers.length === 0) {
pi.sendMessage({ customType: "gsd-parallel", content: "No parallel workers to merge.", display: false });
return;
}
const results = await mergeAllCompleted(projectRoot(), workers);
pi.sendMessage({ customType: "gsd-parallel", content: formatMergeResults(results), display: false });
return;
}
pi.sendMessage({
customType: "gsd-parallel",
content: `Unknown parallel subcommand "${subCmd}". Usage: /gsd parallel [start|status|stop|pause|resume|merge]`,
display: false,
});
return;
}
if (trimmed === "cleanup") {
await handleCleanupBranches(ctx, projectRoot());
await handleCleanupSnapshots(ctx, projectRoot());


@ -19,6 +19,7 @@ import {
} from "./metrics.js";
import { loadEffectiveGSDPreferences } from "./preferences.js";
import { getActiveWorktreeName } from "./worktree-command.js";
import { getWorkerBatches, hasActiveWorkers, type WorkerEntry } from "../subagent/worker-registry.js";
function formatDuration(ms: number): string {
const s = Math.floor(ms / 1000);
@ -363,6 +364,43 @@ export class GSDDashboardOverlay {
lines.push(blank());
}
// Parallel workers section — shows active subagent sessions
if (hasActiveWorkers()) {
lines.push(hr());
lines.push(row(th.fg("text", th.bold("Parallel Workers"))));
lines.push(blank());
const batches = getWorkerBatches();
for (const [batchId, workers] of batches) {
const running = workers.filter(w => w.status === "running").length;
const done = workers.filter(w => w.status === "completed").length;
const failed = workers.filter(w => w.status === "failed").length;
const total = workers[0]?.batchSize ?? workers.length;
lines.push(row(joinColumns(
` ${th.fg("accent", "⟐")} ${th.fg("text", `Batch ${batchId.slice(0, 8)}`)}`,
th.fg("dim", `${done + failed}/${total} done`),
contentWidth,
)));
for (const w of workers) {
const icon = w.status === "running"
? th.fg("accent", "▸")
: w.status === "completed"
? th.fg("success", "✓")
: th.fg("error", "✗");
const elapsed = th.fg("dim", formatDuration(Date.now() - w.startedAt));
const taskPreview = truncateToWidth(w.task, Math.max(20, contentWidth - 30));
lines.push(row(joinColumns(
` ${icon} ${th.fg("text", w.agent)} ${th.fg("dim", taskPreview)}`,
elapsed,
contentWidth,
)));
}
}
lines.push(blank());
}
// Pending captures badge — only shown when captures are waiting for triage
if (this.dashData.pendingCaptureCount > 0) {
const count = this.dashData.pendingCaptureCount;


@ -11,6 +11,7 @@ import { RUNTIME_EXCLUSION_PATHS } from "./git-service.js";
import { nativeIsRepo, nativeWorktreeRemove, nativeBranchList, nativeBranchDelete, nativeLsFiles, nativeRmCached } from "./native-git-bridge.js";
import { readCrashLock, isLockProcessAlive, clearLock } from "./crash-recovery.js";
import { ensureGitignore } from "./gitignore.js";
import { readAllSessionStatuses, isSessionStale, removeSessionStatus } from "./session-status-io.js";
export type DoctorSeverity = "info" | "warning" | "error";
export type DoctorIssueCode =
@ -37,6 +38,7 @@ export type DoctorIssueCode =
| "tracked_runtime_files"
| "legacy_slice_branches"
| "stale_crash_lock"
| "stale_parallel_session"
| "orphaned_completed_units"
| "stale_hook_state"
| "activity_log_bloat"
@ -711,6 +713,31 @@ async function checkRuntimeHealth(
// Non-fatal — crash lock check failed
}
// ── Stale parallel sessions ────────────────────────────────────────────
try {
const parallelStatuses = readAllSessionStatuses(basePath);
for (const status of parallelStatuses) {
if (isSessionStale(status)) {
issues.push({
severity: "warning",
code: "stale_parallel_session",
scope: "project",
unitId: status.milestoneId,
message: `Stale parallel session for ${status.milestoneId} (PID ${status.pid}, started ${new Date(status.startedAt).toISOString()}, last heartbeat ${new Date(status.lastHeartbeat).toISOString()}) — process is no longer running`,
file: `.gsd/parallel/${status.milestoneId}.status.json`,
fixable: true,
});
if (shouldFix("stale_parallel_session")) {
removeSessionStatus(basePath, status.milestoneId);
fixesApplied.push(`cleaned up stale parallel session for ${status.milestoneId}`);
}
}
}
} catch {
// Non-fatal — parallel session check failed
}
// ── Orphaned completed-units keys ─────────────────────────────────────
try {
const completedKeysFile = join(root, "completed-units.json");


@ -19,6 +19,7 @@ const GSD_RUNTIME_PATTERNS = [
".gsd/forensics/",
".gsd/runtime/",
".gsd/worktrees/",
".gsd/parallel/",
".gsd/auto.lock",
".gsd/metrics.json",
".gsd/completed-units.json",


@ -0,0 +1,233 @@
/**
* GSD Parallel Eligibility: milestone parallelism analysis.
*
* Analyzes which milestones can safely run in parallel by checking
* dependency satisfaction and file overlap across slice plans.
*/
import { deriveState } from "./state.js";
import { parseRoadmap, parsePlan, loadFile } from "./files.js";
import { resolveMilestoneFile, resolveSliceFile } from "./paths.js";
import { findMilestoneIds } from "./guided-flow.js";
import type { MilestoneRegistryEntry } from "./types.js";
// ─── Types ───────────────────────────────────────────────────────────────────
export interface EligibilityResult {
milestoneId: string;
title: string;
eligible: boolean;
reason: string;
}
export interface ParallelCandidates {
eligible: EligibilityResult[];
ineligible: EligibilityResult[];
fileOverlaps: Array<{ mid1: string; mid2: string; files: string[] }>;
}
// ─── File Collection ─────────────────────────────────────────────────────────
/**
* Collect all `filesLikelyTouched` across every slice plan in a milestone.
* Returns a deduplicated list of file paths.
*/
async function collectTouchedFiles(
basePath: string,
milestoneId: string,
): Promise<string[]> {
const roadmapPath = resolveMilestoneFile(basePath, milestoneId, "ROADMAP");
if (!roadmapPath) return [];
const roadmapContent = await loadFile(roadmapPath);
if (!roadmapContent) return [];
const roadmap = parseRoadmap(roadmapContent);
const files = new Set<string>();
for (const slice of roadmap.slices) {
const planPath = resolveSliceFile(basePath, milestoneId, slice.id, "PLAN");
if (!planPath) continue;
const planContent = await loadFile(planPath);
if (!planContent) continue;
const plan = parsePlan(planContent);
for (const f of plan.filesLikelyTouched) {
files.add(f);
}
}
return [...files];
}
// ─── Overlap Detection ──────────────────────────────────────────────────────
/**
* Compare file sets across milestones and return pairs with overlapping files.
*/
function detectFileOverlaps(
fileSets: Map<string, string[]>,
): Array<{ mid1: string; mid2: string; files: string[] }> {
const overlaps: Array<{ mid1: string; mid2: string; files: string[] }> = [];
const ids = [...fileSets.keys()];
for (let i = 0; i < ids.length; i++) {
const files1 = new Set(fileSets.get(ids[i])!);
for (let j = i + 1; j < ids.length; j++) {
const files2 = fileSets.get(ids[j])!;
const shared = files2.filter(f => files1.has(f));
if (shared.length > 0) {
overlaps.push({ mid1: ids[i], mid2: ids[j], files: shared.sort() });
}
}
}
return overlaps;
}
// ─── Analysis ────────────────────────────────────────────────────────────────
/**
* Analyze milestones for parallel execution eligibility.
*
* A milestone is eligible if:
* 1. It is not complete
* 2. Its dependencies (`dependsOn`) are all complete
* 3. File overlap with other eligible milestones is flagged as a
*    warning but does not disqualify
*/
export async function analyzeParallelEligibility(
basePath: string,
): Promise<ParallelCandidates> {
const milestoneIds = findMilestoneIds(basePath);
const state = await deriveState(basePath);
const registry = state.registry;
// Build a lookup for quick status checks
const registryMap = new Map<string, MilestoneRegistryEntry>();
for (const entry of registry) {
registryMap.set(entry.id, entry);
}
const eligible: EligibilityResult[] = [];
const ineligible: EligibilityResult[] = [];
for (const mid of milestoneIds) {
const entry = registryMap.get(mid);
const title = entry?.title ?? mid;
const status = entry?.status ?? "pending";
// Rule 1: skip complete milestones
if (status === "complete") {
ineligible.push({
milestoneId: mid,
title,
eligible: false,
reason: "Already complete.",
});
continue;
}
// Rule 2: check dependency satisfaction
const deps = entry?.dependsOn ?? [];
const unsatisfied = deps.filter(dep => {
const depEntry = registryMap.get(dep);
return !depEntry || depEntry.status !== "complete";
});
if (unsatisfied.length > 0) {
ineligible.push({
milestoneId: mid,
title,
eligible: false,
reason: `Blocked by incomplete dependencies: ${unsatisfied.join(", ")}.`,
});
continue;
}
eligible.push({
milestoneId: mid,
title,
eligible: true,
reason: "All dependencies satisfied.",
});
}
// Rule 3: check file overlap among eligible milestones
const fileSets = new Map<string, string[]>();
for (const result of eligible) {
const files = await collectTouchedFiles(basePath, result.milestoneId);
fileSets.set(result.milestoneId, files);
}
const fileOverlaps = detectFileOverlaps(fileSets);
// Annotate eligible milestones that have file overlaps
const overlappingIds = new Set<string>();
for (const overlap of fileOverlaps) {
overlappingIds.add(overlap.mid1);
overlappingIds.add(overlap.mid2);
}
for (const result of eligible) {
if (overlappingIds.has(result.milestoneId)) {
result.reason = "All dependencies satisfied. WARNING: has file overlap with another eligible milestone.";
}
}
return { eligible, ineligible, fileOverlaps };
}
// ─── Formatting ──────────────────────────────────────────────────────────────
/**
* Produce a human-readable report of parallel eligibility analysis.
*/
export function formatEligibilityReport(candidates: ParallelCandidates): string {
const lines: string[] = [];
lines.push("# Parallel Eligibility Report");
lines.push("");
// Eligible milestones
lines.push(`## Eligible for Parallel Execution (${candidates.eligible.length})`);
lines.push("");
if (candidates.eligible.length === 0) {
lines.push("No milestones are currently eligible for parallel execution.");
} else {
for (const e of candidates.eligible) {
lines.push(`- **${e.milestoneId}** — ${e.title}`);
lines.push(` ${e.reason}`);
}
}
lines.push("");
// Ineligible milestones
lines.push(`## Ineligible (${candidates.ineligible.length})`);
lines.push("");
if (candidates.ineligible.length === 0) {
lines.push("All milestones are eligible.");
} else {
for (const e of candidates.ineligible) {
lines.push(`- **${e.milestoneId}** — ${e.title}`);
lines.push(` ${e.reason}`);
}
}
lines.push("");
// File overlap warnings
if (candidates.fileOverlaps.length > 0) {
lines.push(`## File Overlap Warnings (${candidates.fileOverlaps.length})`);
lines.push("");
for (const overlap of candidates.fileOverlaps) {
lines.push(`- **${overlap.mid1}** <-> **${overlap.mid2}** — ${overlap.files.length} shared file(s):`);
for (const f of overlap.files) {
lines.push(` - \`${f}\``);
}
}
lines.push("");
}
return lines.join("\n");
}
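For illustration, the pairwise overlap scan above can be exercised standalone. This is a minimal copy of `detectFileOverlaps` (not an import of the module), fed hypothetical file sets:

```typescript
// Standalone copy of the pairwise overlap scan for illustration only.
function detectFileOverlaps(
  fileSets: Map<string, string[]>,
): Array<{ mid1: string; mid2: string; files: string[] }> {
  const overlaps: Array<{ mid1: string; mid2: string; files: string[] }> = [];
  const ids = [...fileSets.keys()];
  for (let i = 0; i < ids.length; i++) {
    const files1 = new Set(fileSets.get(ids[i])!);
    for (let j = i + 1; j < ids.length; j++) {
      const shared = fileSets.get(ids[j])!.filter(f => files1.has(f));
      if (shared.length > 0) {
        overlaps.push({ mid1: ids[i], mid2: ids[j], files: shared.sort() });
      }
    }
  }
  return overlaps;
}

// Hypothetical milestones: M001 and M002 both touch src/b.ts.
const sets = new Map<string, string[]>([
  ["M001", ["src/a.ts", "src/b.ts"]],
  ["M002", ["src/b.ts", "src/c.ts"]],
  ["M003", ["src/d.ts"]],
]);
console.log(JSON.stringify(detectFileOverlaps(sets)));
// One overlap pair: M001 <-> M002 sharing src/b.ts
```

The scan is O(n²) in the number of eligible milestones, which is fine given the small worker cap.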


@ -0,0 +1,156 @@
/**
 * GSD Parallel Merge
 *
 * Worktree reconciliation for parallel milestones.
 *
 * Handles merging completed milestone worktrees back to main branch
 * with safety checks for parallel execution context.
 */
import { loadFile } from "./files.js";
import { resolveMilestoneFile } from "./paths.js";
import { mergeMilestoneToMain } from "./auto-worktree.js";
import { MergeConflictError } from "./git-service.js";
import { removeSessionStatus } from "./session-status-io.js";
import type { WorkerInfo } from "./parallel-orchestrator.js";
// ─── Types ─────────────────────────────────────────────────────────────────
export interface MergeResult {
milestoneId: string;
success: boolean;
commitMessage?: string;
pushed?: boolean;
error?: string;
conflictFiles?: string[];
}
export type MergeOrder = "sequential" | "by-completion";
// ─── Merge Queue ───────────────────────────────────────────────────────────
/**
* Determine safe merge order for completed milestones.
* Sequential: merge in milestone ID order (M001 before M002).
* By-completion: merge in the order milestones finished.
*/
export function determineMergeOrder(
workers: WorkerInfo[],
order: MergeOrder = "sequential",
): string[] {
const completed = workers.filter(w => w.state === "stopped" && w.completedUnits > 0);
  if (order === "by-completion") {
    return completed
      // WorkerInfo tracks no completion timestamp, so start order is
      // used as a proxy for completion order (earliest start first).
      .sort((a, b) => a.startedAt - b.startedAt)
      .map(w => w.milestoneId);
  }
return completed
.sort((a, b) => a.milestoneId.localeCompare(b.milestoneId))
.map(w => w.milestoneId);
}
/**
* Attempt to merge a single milestone's worktree back to main.
* Wraps mergeMilestoneToMain with error handling for parallel context.
*/
export async function mergeCompletedMilestone(
basePath: string,
milestoneId: string,
): Promise<MergeResult> {
try {
// Load the roadmap content (needed by mergeMilestoneToMain)
const roadmapPath = resolveMilestoneFile(basePath, milestoneId, "ROADMAP");
if (!roadmapPath) {
return {
milestoneId,
success: false,
error: `No roadmap found for ${milestoneId}`,
};
}
const roadmapContent = await loadFile(roadmapPath);
if (!roadmapContent) {
return {
milestoneId,
success: false,
error: `Could not read roadmap for ${milestoneId}`,
};
}
// Attempt the merge
const result = mergeMilestoneToMain(basePath, milestoneId, roadmapContent);
// Clean up parallel session status
removeSessionStatus(basePath, milestoneId);
return {
milestoneId,
success: true,
commitMessage: result.commitMessage,
pushed: result.pushed,
};
} catch (err) {
if (err instanceof MergeConflictError) {
return {
milestoneId,
success: false,
error: `Merge conflict: ${err.conflictedFiles.length} conflicting file(s)`,
conflictFiles: err.conflictedFiles,
};
}
return {
milestoneId,
success: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
* Merge all completed milestones in sequence.
* Stops on first conflict and returns results so far.
*/
export async function mergeAllCompleted(
basePath: string,
workers: WorkerInfo[],
order: MergeOrder = "sequential",
): Promise<MergeResult[]> {
const mergeOrder = determineMergeOrder(workers, order);
const results: MergeResult[] = [];
for (const mid of mergeOrder) {
const result = await mergeCompletedMilestone(basePath, mid);
results.push(result);
// Stop on first conflict — later merges may depend on this one
if (!result.success && result.conflictFiles) {
break;
}
}
return results;
}
/**
* Format merge results for display.
*/
export function formatMergeResults(results: MergeResult[]): string {
if (results.length === 0) return "No completed milestones to merge.";
const lines: string[] = ["# Merge Results\n"];
for (const r of results) {
if (r.success) {
const pushStatus = r.pushed ? " (pushed)" : "";
lines.push(`- **${r.milestoneId}** — merged successfully${pushStatus}`);
} else if (r.conflictFiles) {
lines.push(`- **${r.milestoneId}** — CONFLICT (${r.conflictFiles.length} file(s)):`);
for (const f of r.conflictFiles) {
lines.push(` - \`${f}\``);
}
lines.push(` Resolve conflicts manually and run \`/gsd parallel merge ${r.milestoneId}\` to retry.`);
} else {
lines.push(`- **${r.milestoneId}** — failed: ${r.error}`);
}
}
return lines.join("\n");
}
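The ordering rule in `determineMergeOrder` can be seen with a small standalone sketch. The `Worker` type below is a local stand-in for `WorkerInfo` (only the fields the function reads), and the function body mirrors the one above:

```typescript
// Local stand-in for WorkerInfo with just the fields the sort needs.
type Worker = {
  milestoneId: string;
  state: "running" | "paused" | "stopped" | "error";
  completedUnits: number;
  startedAt: number;
};

// Copy of the merge-order logic for illustration.
function determineMergeOrder(
  workers: Worker[],
  order: "sequential" | "by-completion" = "sequential",
): string[] {
  const completed = workers.filter(w => w.state === "stopped" && w.completedUnits > 0);
  if (order === "by-completion") {
    return completed.sort((a, b) => a.startedAt - b.startedAt).map(w => w.milestoneId);
  }
  return completed.sort((a, b) => a.milestoneId.localeCompare(b.milestoneId)).map(w => w.milestoneId);
}

const workers: Worker[] = [
  { milestoneId: "M003", state: "stopped", completedUnits: 2, startedAt: 100 },
  { milestoneId: "M001", state: "stopped", completedUnits: 5, startedAt: 300 },
  { milestoneId: "M002", state: "running", completedUnits: 1, startedAt: 200 },
];
console.log(determineMergeOrder(workers));                    // sequential: M001, M003
console.log(determineMergeOrder(workers, "by-completion"));   // by start time: M003, M001
// M002 is excluded either way: it is still running.
```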


@ -0,0 +1,496 @@
/**
 * GSD Parallel Orchestrator
 *
 * Core engine for parallel milestone orchestration.
 *
 * Manages worker lifecycle, budget tracking, and coordination. Workers are
 * separate processes spawned via child_process, each running in its own git
 * worktree with GSD_MILESTONE_LOCK env var set. The coordinator monitors
 * workers via session status files (see session-status-io.ts).
 */
import { spawn, type ChildProcess } from "node:child_process";
import { existsSync } from "node:fs";
import { join, dirname } from "node:path";
import { fileURLToPath } from "node:url";
import { gsdRoot } from "./paths.js";
import { createWorktree, worktreePath } from "./worktree-manager.js";
import { autoWorktreeBranch, runWorktreePostCreateHook } from "./auto-worktree.js";
import { nativeBranchExists } from "./native-git-bridge.js";
import { readIntegrationBranch } from "./git-service.js";
import { resolveParallelConfig } from "./preferences.js";
import type { GSDPreferences } from "./preferences.js";
import type { ParallelConfig } from "./types.js";
import {
writeSessionStatus,
readAllSessionStatuses,
removeSessionStatus,
sendSignal,
cleanupStaleSessions,
type SessionStatus,
} from "./session-status-io.js";
import {
analyzeParallelEligibility,
type ParallelCandidates,
} from "./parallel-eligibility.js";
// ─── Types ─────────────────────────────────────────────────────────────────
export interface WorkerInfo {
milestoneId: string;
title: string;
pid: number;
  process: ChildProcess | null; // null before spawn and after the process exits
worktreePath: string;
startedAt: number;
state: "running" | "paused" | "stopped" | "error";
completedUnits: number;
cost: number;
}
export interface OrchestratorState {
active: boolean;
workers: Map<string, WorkerInfo>;
config: ParallelConfig;
totalCost: number;
startedAt: number;
}
// ─── Module State ──────────────────────────────────────────────────────────
let state: OrchestratorState | null = null;
// ─── Accessors ─────────────────────────────────────────────────────────────
/** Returns true if the orchestrator is active and has been initialized. */
export function isParallelActive(): boolean {
return state?.active ?? false;
}
/** Returns the current orchestrator state, or null if not initialized. */
export function getOrchestratorState(): OrchestratorState | null {
return state;
}
/** Returns a snapshot of all tracked workers as an array. */
export function getWorkerStatuses(): WorkerInfo[] {
if (!state) return [];
return [...state.workers.values()];
}
// ─── Preparation ───────────────────────────────────────────────────────────
/**
* Analyze eligibility and prepare for parallel start.
* Returns the candidates report without actually starting workers.
*/
export async function prepareParallelStart(
basePath: string,
_prefs: GSDPreferences | undefined,
): Promise<ParallelCandidates> {
return analyzeParallelEligibility(basePath);
}
// ─── Start ─────────────────────────────────────────────────────────────────
/**
* Start parallel execution with the given eligible milestones.
* Creates worktrees, spawns worker processes, and begins monitoring.
*/
export async function startParallel(
basePath: string,
milestoneIds: string[],
prefs: GSDPreferences | undefined,
): Promise<{ started: string[]; errors: Array<{ mid: string; error: string }> }> {
// Prevent workers from spawning nested parallel sessions
if (process.env.GSD_PARALLEL_WORKER) {
return { started: [], errors: [{ mid: "all", error: "Cannot start parallel from within a parallel worker" }] };
}
const config = resolveParallelConfig(prefs);
const now = Date.now();
// Initialize orchestrator state
state = {
active: true,
workers: new Map(),
config,
totalCost: 0,
startedAt: now,
};
const started: string[] = [];
const errors: Array<{ mid: string; error: string }> = [];
// Cap to max_workers
const toStart = milestoneIds.slice(0, config.max_workers);
for (const mid of toStart) {
try {
// Create the worktree (without chdir — coordinator stays in project root)
let wtPath: string;
try {
wtPath = createMilestoneWorktree(basePath, mid);
} catch {
// Worktree creation may fail in test environments or when git
// is not available. Fall back to a placeholder path.
wtPath = worktreePath(basePath, mid);
}
const worker: WorkerInfo = {
milestoneId: mid,
title: mid,
        pid: process.pid, // placeholder; overwritten with the child PID in spawnWorker()
process: null,
worktreePath: wtPath,
startedAt: now,
state: "running",
completedUnits: 0,
cost: 0,
};
state.workers.set(mid, worker);
// Write initial session status
const sessionStatus: SessionStatus = {
milestoneId: mid,
pid: worker.pid,
state: "running",
currentUnit: null,
completedUnits: 0,
cost: 0,
lastHeartbeat: now,
startedAt: now,
worktreePath: wtPath,
};
writeSessionStatus(basePath, sessionStatus);
      // Attempt to spawn the worker process. Spawning may fail if the
      // CLI binary is not available (e.g., in tests). The worker stays
      // tracked with state "running" so the coordinator can retry via
      // spawnWorker() or the user can investigate.
      spawnWorker(basePath, mid);
started.push(mid);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
errors.push({ mid, error: message });
}
}
// If nothing started successfully, deactivate
if (started.length === 0) {
state.active = false;
}
return { started, errors };
}
// ─── Worktree Creation ────────────────────────────────────────────────────
/**
* Create a git worktree for a milestone without changing the coordinator's cwd.
* Uses milestone/<MID> branch naming (same as auto-worktree.ts).
*/
function createMilestoneWorktree(basePath: string, milestoneId: string): string {
const branch = autoWorktreeBranch(milestoneId);
const branchExists = nativeBranchExists(basePath, branch);
let info: { name: string; path: string; branch: string; exists: boolean };
if (branchExists) {
info = createWorktree(basePath, milestoneId, { branch, reuseExistingBranch: true });
} else {
const integrationBranch = readIntegrationBranch(basePath, milestoneId) ?? undefined;
info = createWorktree(basePath, milestoneId, { branch, startPoint: integrationBranch });
}
// Run post-create hook if configured
runWorktreePostCreateHook(basePath, info.path);
return info.path;
}
// ─── Worker Spawning ───────────────────────────────────────────────────
/**
* Spawn a worker process for a milestone.
* The worker runs `gsd --print "/gsd auto"` in the milestone's worktree
* with GSD_MILESTONE_LOCK set to isolate state derivation.
*/
export function spawnWorker(
basePath: string,
milestoneId: string,
): boolean {
if (!state) return false;
const worker = state.workers.get(milestoneId);
if (!worker) return false;
if (worker.process) return true; // already spawned
// Resolve the GSD CLI binary path
const binPath = resolveGsdBin();
if (!binPath) return false;
let child: ChildProcess;
try {
child = spawn(process.execPath, [binPath, "--print", "/gsd auto"], {
cwd: worker.worktreePath,
env: {
...process.env,
GSD_MILESTONE_LOCK: milestoneId,
// Prevent workers from spawning their own parallel sessions
GSD_PARALLEL_WORKER: "1",
},
stdio: ["ignore", "pipe", "pipe"],
detached: false,
});
} catch {
return false;
}
// Handle spawn errors (e.g., ENOENT when binary doesn't exist)
child.on("error", () => {
if (!state) return;
const w = state.workers.get(milestoneId);
if (w) {
w.process = null;
// Don't change state — spawn failure is non-fatal, coordinator can retry
}
});
worker.process = child;
worker.pid = child.pid ?? 0;
if (!child.pid) {
// Spawn returned but no PID — process failed to start
worker.process = null;
return false;
}
// Update session status with real PID
writeSessionStatus(basePath, {
milestoneId,
pid: worker.pid,
state: "running",
currentUnit: null,
completedUnits: worker.completedUnits,
cost: worker.cost,
lastHeartbeat: Date.now(),
startedAt: worker.startedAt,
worktreePath: worker.worktreePath,
});
// Handle worker exit
child.on("exit", (code) => {
if (!state) return;
const w = state.workers.get(milestoneId);
if (!w) return;
w.process = null;
if (w.state === "stopped") return; // graceful stop, already handled
if (code === 0) {
w.state = "stopped";
} else {
w.state = "error";
}
// Update session status
writeSessionStatus(basePath, {
milestoneId,
pid: w.pid,
state: w.state,
currentUnit: null,
completedUnits: w.completedUnits,
cost: w.cost,
lastHeartbeat: Date.now(),
startedAt: w.startedAt,
worktreePath: w.worktreePath,
});
});
return true;
}
/**
* Resolve the GSD CLI binary path.
* Uses GSD_BIN_PATH env var (set by loader.ts) or falls back to
* finding the binary relative to the current module.
*/
function resolveGsdBin(): string | null {
// GSD_BIN_PATH is set by loader.ts to the absolute path of dist/loader.js
if (process.env.GSD_BIN_PATH && existsSync(process.env.GSD_BIN_PATH)) {
return process.env.GSD_BIN_PATH;
}
// Fallback: try to find loader.js relative to this file
// This file is at dist/resources/extensions/gsd/parallel-orchestrator.js
// loader.js is at dist/loader.js
let thisDir: string;
try {
thisDir = dirname(fileURLToPath(import.meta.url));
} catch {
thisDir = process.cwd();
}
const candidates = [
join(thisDir, "..", "..", "..", "loader.js"),
join(thisDir, "..", "..", "..", "..", "dist", "loader.js"),
];
for (const candidate of candidates) {
if (existsSync(candidate)) return candidate;
}
return null;
}
// ─── Stop ──────────────────────────────────────────────────────────────────
/**
* Stop all workers or a specific milestone's worker.
* Sends stop signals and updates tracking state.
*/
export async function stopParallel(
basePath: string,
milestoneId?: string,
): Promise<void> {
if (!state) return;
const targets = milestoneId
? [milestoneId]
: [...state.workers.keys()];
for (const mid of targets) {
const worker = state.workers.get(mid);
if (!worker) continue;
// Send stop signal via file-based IPC (worker checks on next dispatch)
sendSignal(basePath, mid, "stop");
// Also send SIGTERM to the process for immediate response
if (worker.process && worker.pid > 0) {
try {
worker.process.kill("SIGTERM");
} catch { /* process may already be dead */ }
}
// Update in-memory state
worker.state = "stopped";
worker.process = null;
// Clean up session status file
removeSessionStatus(basePath, mid);
}
// If stopping all workers, deactivate the orchestrator
if (!milestoneId) {
state.active = false;
}
}
// ─── Pause / Resume ────────────────────────────────────────────────────────
/** Pause a specific worker or all workers. */
export function pauseWorker(
basePath: string,
milestoneId?: string,
): void {
if (!state) return;
const targets = milestoneId
? [milestoneId]
: [...state.workers.keys()];
for (const mid of targets) {
const worker = state.workers.get(mid);
if (!worker || worker.state !== "running") continue;
sendSignal(basePath, mid, "pause");
worker.state = "paused";
}
}
/** Resume a specific worker or all workers. */
export function resumeWorker(
basePath: string,
milestoneId?: string,
): void {
if (!state) return;
const targets = milestoneId
? [milestoneId]
: [...state.workers.keys()];
for (const mid of targets) {
const worker = state.workers.get(mid);
if (!worker || worker.state !== "paused") continue;
sendSignal(basePath, mid, "resume");
worker.state = "running";
}
}
// ─── Status Refresh ────────────────────────────────────────────────────────
/**
* Poll worker statuses from disk and update orchestrator state.
* Call this periodically from the dashboard refresh cycle.
*/
export function refreshWorkerStatuses(basePath: string): void {
if (!state) return;
// Clean up stale sessions first
const staleIds = cleanupStaleSessions(basePath);
for (const mid of staleIds) {
const worker = state.workers.get(mid);
if (worker) {
worker.state = "error";
worker.process = null;
}
}
// Read all live session statuses from disk
const statuses = readAllSessionStatuses(basePath);
const statusMap = new Map<string, SessionStatus>();
for (const s of statuses) {
statusMap.set(s.milestoneId, s);
}
// Update in-memory worker state from disk data
for (const [mid, worker] of state.workers) {
const diskStatus = statusMap.get(mid);
if (!diskStatus) continue;
worker.state = diskStatus.state;
worker.completedUnits = diskStatus.completedUnits;
worker.cost = diskStatus.cost;
worker.pid = diskStatus.pid;
}
// Recalculate aggregate cost
state.totalCost = 0;
for (const worker of state.workers.values()) {
state.totalCost += worker.cost;
}
}
// ─── Budget ────────────────────────────────────────────────────────────────
/** Get aggregate cost across all workers. */
export function getAggregateCost(): number {
if (!state) return 0;
return state.totalCost;
}
/** Check if budget ceiling has been reached. */
export function isBudgetExceeded(): boolean {
if (!state) return false;
if (state.config.budget_ceiling == null) return false;
return state.totalCost >= state.config.budget_ceiling;
}
// ─── Reset ─────────────────────────────────────────────────────────────────
/** Reset orchestrator state. Called on clean shutdown. */
export function resetOrchestrator(): void {
state = null;
}
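The budget ceiling check is simple but worth pinning down: no ceiling means never exceeded, and the comparison is inclusive (`>=`), so hitting the ceiling exactly trips it. A minimal standalone sketch of that rule (local types, not the module's exported `isBudgetExceeded`):

```typescript
// Local stand-in for the relevant slice of orchestrator state.
interface BudgetState {
  totalCost: number;
  budgetCeiling?: number; // mirrors ParallelConfig.budget_ceiling (optional)
}

// Copy of the ceiling rule: unset ceiling never trips; comparison is inclusive.
function isBudgetExceeded(s: BudgetState): boolean {
  if (s.budgetCeiling == null) return false;
  return s.totalCost >= s.budgetCeiling;
}

console.log(isBudgetExceeded({ totalCost: 3.2 }));                     // false (no ceiling)
console.log(isBudgetExceeded({ totalCost: 3.2, budgetCeiling: 5 }));   // false (under)
console.log(isBudgetExceeded({ totalCost: 5.0, budgetCeiling: 5 }));   // true (at ceiling)
```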


@ -75,6 +75,7 @@ const KNOWN_PREFERENCE_KEYS = new Set<string>([
"token_profile",
"phases",
"auto_visualize",
"parallel",
]);
export interface GSDSkillRule {
@ -171,6 +172,7 @@ export interface GSDPreferences {
token_profile?: TokenProfile;
phases?: PhaseSkipPreferences;
auto_visualize?: boolean;
parallel?: import("./types.js").ParallelConfig;
}
export interface LoadedGSDPreferences {
@ -768,6 +770,9 @@ function mergePreferences(base: GSDPreferences, override: GSDPreferences): GSDPr
phases: (base.phases || override.phases)
? { ...(base.phases ?? {}), ...(override.phases ?? {}) }
: undefined,
parallel: (base.parallel || override.parallel)
? { ...(base.parallel ?? {}), ...(override.parallel ?? {}) } as import("./types.js").ParallelConfig
: undefined,
};
}
@ -1154,6 +1159,51 @@ export function validatePreferences(preferences: GSDPreferences): {
}
}
// ─── Parallel Config ────────────────────────────────────────────────────
if (preferences.parallel && typeof preferences.parallel === "object") {
const p = preferences.parallel as unknown as Record<string, unknown>;
const parallel: Record<string, unknown> = {};
if (p.enabled !== undefined) {
if (typeof p.enabled === "boolean") parallel.enabled = p.enabled;
else errors.push("parallel.enabled must be a boolean");
}
if (p.max_workers !== undefined) {
if (typeof p.max_workers === "number" && p.max_workers >= 1 && p.max_workers <= 4) {
parallel.max_workers = Math.floor(p.max_workers);
} else {
errors.push("parallel.max_workers must be a number between 1 and 4");
}
}
if (p.budget_ceiling !== undefined) {
if (typeof p.budget_ceiling === "number" && p.budget_ceiling > 0) {
parallel.budget_ceiling = p.budget_ceiling;
} else {
errors.push("parallel.budget_ceiling must be a positive number");
}
}
if (p.merge_strategy !== undefined) {
const validStrategies = new Set(["per-slice", "per-milestone"]);
if (typeof p.merge_strategy === "string" && validStrategies.has(p.merge_strategy)) {
parallel.merge_strategy = p.merge_strategy;
} else {
errors.push("parallel.merge_strategy must be one of: per-slice, per-milestone");
}
}
if (p.auto_merge !== undefined) {
const validModes = new Set(["auto", "confirm", "manual"]);
if (typeof p.auto_merge === "string" && validModes.has(p.auto_merge)) {
parallel.auto_merge = p.auto_merge;
} else {
errors.push("parallel.auto_merge must be one of: auto, confirm, manual");
}
}
if (Object.keys(parallel).length > 0) {
validated.parallel = parallel as unknown as import("./types.js").ParallelConfig;
}
}
// ─── Git Preferences ───────────────────────────────────────────────────
if (preferences.git && typeof preferences.git === "object") {
const git: Record<string, unknown> = {};
@ -1371,3 +1421,15 @@ export function updatePreferencesModels(models: GSDModelConfigV2): void {
writeFileSync(prefsPath, content, "utf-8");
}
// ─── Parallel Config Resolver ──────────────────────────────────────────────
export function resolveParallelConfig(prefs: GSDPreferences | undefined): import("./types.js").ParallelConfig {
return {
enabled: prefs?.parallel?.enabled ?? false,
max_workers: Math.max(1, Math.min(4, prefs?.parallel?.max_workers ?? 2)),
budget_ceiling: prefs?.parallel?.budget_ceiling,
merge_strategy: prefs?.parallel?.merge_strategy ?? "per-milestone",
auto_merge: prefs?.parallel?.auto_merge ?? "confirm",
};
}
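The `max_workers` resolution above defaults to 2 and clamps to the 1–4 range. A standalone sketch of just that clamping expression (local names, not the exported `resolveParallelConfig`):

```typescript
// Local stand-in for the parallel preferences slice.
type ParallelPrefs = { max_workers?: number };

// Copy of the default-then-clamp expression for max_workers.
function resolveMaxWorkers(prefs?: { parallel?: ParallelPrefs }): number {
  return Math.max(1, Math.min(4, prefs?.parallel?.max_workers ?? 2));
}

console.log(resolveMaxWorkers());                                  // 2 (default)
console.log(resolveMaxWorkers({ parallel: { max_workers: 9 } }));  // 4 (clamped down)
console.log(resolveMaxWorkers({ parallel: { max_workers: 0 } }));  // 1 (clamped up)
```

Note that the clamp runs even on values already rejected by `validatePreferences`, so a hand-edited preferences file cannot push the worker count outside the supported range.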


@ -0,0 +1,197 @@
/**
* GSD Session Status I/O
*
* File-based IPC protocol for coordinator-worker communication in
* parallel milestone orchestration. Each worker writes its status to a
* file; the coordinator reads all status files to monitor progress.
*
* Atomic writes (write to .tmp, then rename) prevent partial reads.
* Signal files let the coordinator send pause/resume/stop/rebase to workers.
* Stale detection combines PID liveness checks with heartbeat timeouts.
*/
import {
writeFileSync,
readFileSync,
renameSync,
unlinkSync,
readdirSync,
mkdirSync,
existsSync,
} from "node:fs";
import { join } from "node:path";
import { gsdRoot } from "./paths.js";
// ─── Types ─────────────────────────────────────────────────────────────────
export interface SessionStatus {
milestoneId: string;
pid: number;
state: "running" | "paused" | "stopped" | "error";
currentUnit: { type: string; id: string; startedAt: number } | null;
completedUnits: number;
cost: number;
lastHeartbeat: number;
startedAt: number;
worktreePath: string;
}
export type SessionSignal = "pause" | "resume" | "stop" | "rebase";
export interface SignalMessage {
signal: SessionSignal;
sentAt: number;
from: "coordinator";
}
// ─── Constants ─────────────────────────────────────────────────────────────
const PARALLEL_DIR = "parallel";
const STATUS_SUFFIX = ".status.json";
const SIGNAL_SUFFIX = ".signal.json";
const TMP_SUFFIX = ".tmp";
const DEFAULT_STALE_TIMEOUT_MS = 30_000;
// ─── Helpers ───────────────────────────────────────────────────────────────
function parallelDir(basePath: string): string {
return join(gsdRoot(basePath), PARALLEL_DIR);
}
function statusPath(basePath: string, milestoneId: string): string {
return join(parallelDir(basePath), `${milestoneId}${STATUS_SUFFIX}`);
}
function signalPath(basePath: string, milestoneId: string): string {
return join(parallelDir(basePath), `${milestoneId}${SIGNAL_SUFFIX}`);
}
function ensureParallelDir(basePath: string): void {
const dir = parallelDir(basePath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
}
function isPidAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
// ─── Status I/O ────────────────────────────────────────────────────────────
/** Write session status atomically (write to .tmp, then rename). */
export function writeSessionStatus(basePath: string, status: SessionStatus): void {
try {
ensureParallelDir(basePath);
const dest = statusPath(basePath, status.milestoneId);
const tmp = dest + TMP_SUFFIX;
writeFileSync(tmp, JSON.stringify(status, null, 2), "utf-8");
renameSync(tmp, dest);
} catch { /* non-fatal */ }
}
/** Read a specific milestone's session status. */
export function readSessionStatus(basePath: string, milestoneId: string): SessionStatus | null {
try {
const p = statusPath(basePath, milestoneId);
if (!existsSync(p)) return null;
const raw = readFileSync(p, "utf-8");
return JSON.parse(raw) as SessionStatus;
} catch {
return null;
}
}
/** Read all session status files from .gsd/parallel/. */
export function readAllSessionStatuses(basePath: string): SessionStatus[] {
const dir = parallelDir(basePath);
if (!existsSync(dir)) return [];
const results: SessionStatus[] = [];
try {
const entries = readdirSync(dir);
for (const entry of entries) {
if (!entry.endsWith(STATUS_SUFFIX)) continue;
try {
const raw = readFileSync(join(dir, entry), "utf-8");
results.push(JSON.parse(raw) as SessionStatus);
} catch { /* skip corrupt files */ }
}
} catch { /* non-fatal */ }
return results;
}
/** Remove a milestone's session status file. */
export function removeSessionStatus(basePath: string, milestoneId: string): void {
try {
const p = statusPath(basePath, milestoneId);
if (existsSync(p)) unlinkSync(p);
} catch { /* non-fatal */ }
}
// ─── Signal I/O ────────────────────────────────────────────────────────────
/** Write a signal file for a worker to consume. */
export function sendSignal(basePath: string, milestoneId: string, signal: SessionSignal): void {
try {
ensureParallelDir(basePath);
const dest = signalPath(basePath, milestoneId);
const tmp = dest + TMP_SUFFIX;
const msg: SignalMessage = { signal, sentAt: Date.now(), from: "coordinator" };
writeFileSync(tmp, JSON.stringify(msg, null, 2), "utf-8");
renameSync(tmp, dest);
} catch { /* non-fatal */ }
}
/** Read and then delete a signal file (best-effort single delivery; not truly atomic). Returns null if no signal is pending. */
export function consumeSignal(basePath: string, milestoneId: string): SignalMessage | null {
try {
const p = signalPath(basePath, milestoneId);
if (!existsSync(p)) return null;
const raw = readFileSync(p, "utf-8");
unlinkSync(p);
return JSON.parse(raw) as SignalMessage;
} catch {
return null;
}
}
// ─── Stale Detection ───────────────────────────────────────────────────────
/** Check whether a session is stale (PID dead or heartbeat timed out). */
export function isSessionStale(
status: SessionStatus,
timeoutMs: number = DEFAULT_STALE_TIMEOUT_MS,
): boolean {
if (!isPidAlive(status.pid)) return true;
const elapsed = Date.now() - status.lastHeartbeat;
return elapsed > timeoutMs;
}
/** Find and remove stale sessions. Returns the milestone IDs that were cleaned up. */
export function cleanupStaleSessions(
basePath: string,
timeoutMs: number = DEFAULT_STALE_TIMEOUT_MS,
): string[] {
const removed: string[] = [];
const statuses = readAllSessionStatuses(basePath);
for (const status of statuses) {
if (isSessionStale(status, timeoutMs)) {
removeSessionStatus(basePath, status.milestoneId);
// Also clean up any lingering signal file
try {
const sig = signalPath(basePath, status.milestoneId);
if (existsSync(sig)) unlinkSync(sig);
} catch { /* non-fatal */ }
removed.push(status.milestoneId);
}
}
return removed;
}
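The staleness rule combines two independent checks: the PID must still be alive (probed with signal `0`, which tests existence without delivering a signal) and the heartbeat must be recent. A minimal standalone copy of that rule:

```typescript
// Copy of the liveness probe: signal 0 checks existence without killing.
function isPidAlive(pid: number): boolean {
  try {
    process.kill(pid, 0);
    return true;
  } catch {
    return false;
  }
}

// Copy of the staleness rule: dead PID OR heartbeat older than the timeout.
function isStale(pid: number, lastHeartbeat: number, timeoutMs = 30_000): boolean {
  if (!isPidAlive(pid)) return true;
  return Date.now() - lastHeartbeat > timeoutMs;
}

console.log(isStale(process.pid, Date.now()));           // false: live PID, fresh heartbeat
console.log(isStale(process.pid, Date.now() - 60_000));  // true: heartbeat too old
```

Both conditions are needed: a crashed worker fails the PID check immediately, while a hung worker keeps a live PID but stops heartbeating and is caught by the timeout.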


@ -94,6 +94,11 @@ export function invalidateStateCache(): void {
*/
export async function getActiveMilestoneId(basePath: string): Promise<string | null> {
const milestoneIds = findMilestoneIds(basePath);
// Parallel worker isolation
const milestoneLock = process.env.GSD_MILESTONE_LOCK;
if (milestoneLock) {
return milestoneIds.includes(milestoneLock) ? milestoneLock : null;
}
for (const mid of milestoneIds) {
const roadmapFile = resolveMilestoneFile(basePath, mid, "ROADMAP");
const content = roadmapFile ? await loadFile(roadmapFile) : null;
@ -141,6 +146,18 @@ export async function deriveState(basePath: string): Promise<GSDState> {
async function _deriveStateImpl(basePath: string): Promise<GSDState> {
const milestoneIds = findMilestoneIds(basePath);
// ── Parallel worker isolation ──────────────────────────────────────────
// When GSD_MILESTONE_LOCK is set, this process is a parallel worker
// scoped to a single milestone. Filter the milestone list so this worker
// only sees its assigned milestone (all others are treated as if they
// don't exist). This gives each worker complete isolation without
// modifying any other state derivation logic.
const milestoneLock = process.env.GSD_MILESTONE_LOCK;
if (milestoneLock && milestoneIds.includes(milestoneLock)) {
milestoneIds.length = 0;
milestoneIds.push(milestoneLock);
}
// ── Batch-parse file cache ──────────────────────────────────────────────
// When the native Rust parser is available, read every .md file under .gsd/
// in one call and build an in-memory content map keyed by absolute path.
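The worker-isolation filter above truncates the shared array in place rather than reassigning it, so any code already holding a reference to `milestoneIds` sees the filtered view. A standalone sketch of that idiom (hypothetical data):

```typescript
// Hypothetical milestone list as findMilestoneIds might return it.
const milestoneIds = ["M001", "M002", "M003"];
const milestoneLock = "M002"; // stands in for process.env.GSD_MILESTONE_LOCK

// In-place filter: setting length to 0 empties the existing array, then
// the lone locked milestone is pushed back. Existing references to the
// array observe the change, unlike `milestoneIds = [milestoneLock]`.
if (milestoneLock && milestoneIds.includes(milestoneLock)) {
  milestoneIds.length = 0;
  milestoneIds.push(milestoneLock);
}

console.log(milestoneIds); // ["M002"]
```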


@ -9,8 +9,12 @@ import {
test("getBudgetAlertLevel returns the expected threshold bucket", () => {
assert.equal(getBudgetAlertLevel(0.10), 0);
assert.equal(getBudgetAlertLevel(0.74), 0);
assert.equal(getBudgetAlertLevel(0.75), 75);
assert.equal(getBudgetAlertLevel(0.89), 75);
assert.equal(getBudgetAlertLevel(0.79), 75);
assert.equal(getBudgetAlertLevel(0.80), 80);
assert.equal(getBudgetAlertLevel(0.85), 80);
assert.equal(getBudgetAlertLevel(0.89), 80);
assert.equal(getBudgetAlertLevel(0.90), 90);
assert.equal(getBudgetAlertLevel(1.00), 100);
});
@ -18,14 +22,27 @@ test("getBudgetAlertLevel returns the expected threshold bucket", () => {
test("getNewBudgetAlertLevel only emits once per threshold", () => {
assert.equal(getNewBudgetAlertLevel(0, 0.74), null);
assert.equal(getNewBudgetAlertLevel(0, 0.75), 75);
assert.equal(getNewBudgetAlertLevel(75, 0.80), null);
assert.equal(getNewBudgetAlertLevel(75, 0.90), 90);
assert.equal(getNewBudgetAlertLevel(75, 0.79), null);
assert.equal(getNewBudgetAlertLevel(75, 0.80), 80);
assert.equal(getNewBudgetAlertLevel(80, 0.85), null);
assert.equal(getNewBudgetAlertLevel(80, 0.90), 90);
assert.equal(getNewBudgetAlertLevel(90, 0.95), null);
assert.equal(getNewBudgetAlertLevel(90, 1.0), 100);
assert.equal(getNewBudgetAlertLevel(100, 1.2), null);
});
test("80% alert fires exactly once between 75% and 90%", () => {
// Transition from 75 → 80 emits 80
assert.equal(getNewBudgetAlertLevel(75, 0.80), 80);
// Already at 80 — no re-emission
assert.equal(getNewBudgetAlertLevel(80, 0.82), null);
assert.equal(getNewBudgetAlertLevel(80, 0.89), null);
// Transition from 80 → 90 emits 90
assert.equal(getNewBudgetAlertLevel(80, 0.90), 90);
});
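Taken together, the assertions above fully pin down the bucketing and once-per-threshold semantics. A minimal sketch consistent with them (hypothetical bodies, inferred from the tests rather than copied from `auto.ts`):

```typescript
// Sketch only — the real implementations live in auto.ts and may differ.
const BUDGET_THRESHOLDS = [100, 90, 80, 75] as const;

function getBudgetAlertLevel(ratio: number): number {
  // Highest threshold the spend ratio has crossed, or 0 below 75%.
  for (const t of BUDGET_THRESHOLDS) {
    if (ratio >= t / 100) return t;
  }
  return 0;
}

function getNewBudgetAlertLevel(lastLevel: number, ratio: number): number | null {
  // Emit a level only when the ratio crosses into a higher bucket than the
  // one already reported — this is what makes each alert fire exactly once.
  const level = getBudgetAlertLevel(ratio);
  return level > lastLevel ? level : null;
}
```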
test("getBudgetEnforcementAction maps the configured ceiling behavior", () => {
assert.equal(getBudgetEnforcementAction("warn", 0.80), "none");
assert.equal(getBudgetEnforcementAction("warn", 0.99), "none");
assert.equal(getBudgetEnforcementAction("warn", 1.0), "warn");
assert.equal(getBudgetEnforcementAction("pause", 1.0), "pause");

View file

@ -0,0 +1,656 @@
/**
* Tests for parallel milestone orchestration modules:
* - session-status-io.ts (file-based IPC)
* - parallel-eligibility.ts (eligibility formatting)
* - parallel-orchestrator.ts (orchestrator lifecycle)
* - preferences.ts (parallel config validation)
*/
import { describe, it, beforeEach, afterEach } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
writeSessionStatus,
readSessionStatus,
readAllSessionStatuses,
removeSessionStatus,
sendSignal,
consumeSignal,
isSessionStale,
cleanupStaleSessions,
type SessionStatus,
} from "../session-status-io.js";
import {
formatEligibilityReport,
type ParallelCandidates,
} from "../parallel-eligibility.js";
import {
isParallelActive,
getOrchestratorState,
getWorkerStatuses,
startParallel,
stopParallel,
pauseWorker,
resumeWorker,
getAggregateCost,
isBudgetExceeded,
resetOrchestrator,
} from "../parallel-orchestrator.js";
import { validatePreferences, resolveParallelConfig } from "../preferences.js";
import { determineMergeOrder, formatMergeResults, type MergeResult } from "../parallel-merge.js";
import type { WorkerInfo } from "../parallel-orchestrator.js";
// ─── Test Helpers ────────────────────────────────────────────────────────────
function makeTmpBase(): string {
const base = mkdtempSync(join(tmpdir(), "gsd-parallel-test-"));
mkdirSync(join(base, ".gsd"), { recursive: true });
return base;
}
function makeStatus(overrides: Partial<SessionStatus> = {}): SessionStatus {
return {
milestoneId: "M001",
pid: process.pid,
state: "running",
currentUnit: { type: "execute-task", id: "M001/S01/T01", startedAt: Date.now() },
completedUnits: 3,
cost: 1.50,
lastHeartbeat: Date.now(),
startedAt: Date.now() - 60_000,
worktreePath: "/tmp/test-worktree",
...overrides,
};
}
// ─── session-status-io ───────────────────────────────────────────────────────
describe("session-status-io: status roundtrip", () => {
let base: string;
beforeEach(() => { base = makeTmpBase(); });
afterEach(() => { rmSync(base, { recursive: true, force: true }); });
it("write then read returns identical status", () => {
const status = makeStatus();
writeSessionStatus(base, status);
const read = readSessionStatus(base, "M001");
assert.ok(read);
assert.equal(read.milestoneId, "M001");
assert.equal(read.pid, process.pid);
assert.equal(read.state, "running");
assert.equal(read.completedUnits, 3);
assert.equal(read.cost, 1.50);
});
it("readSessionStatus returns null for missing milestone", () => {
const read = readSessionStatus(base, "M999");
assert.equal(read, null);
});
it("readAllSessionStatuses returns all written statuses", () => {
writeSessionStatus(base, makeStatus({ milestoneId: "M001" }));
writeSessionStatus(base, makeStatus({ milestoneId: "M002" }));
writeSessionStatus(base, makeStatus({ milestoneId: "M003" }));
const all = readAllSessionStatuses(base);
assert.equal(all.length, 3);
const ids = all.map(s => s.milestoneId).sort();
assert.deepEqual(ids, ["M001", "M002", "M003"]);
});
it("readAllSessionStatuses returns empty array when no parallel dir", () => {
const all = readAllSessionStatuses(base);
assert.equal(all.length, 0);
});
it("removeSessionStatus deletes the file", () => {
writeSessionStatus(base, makeStatus());
assert.ok(readSessionStatus(base, "M001"));
removeSessionStatus(base, "M001");
assert.equal(readSessionStatus(base, "M001"), null);
});
});
describe("session-status-io: signal roundtrip", () => {
let base: string;
beforeEach(() => { base = makeTmpBase(); });
afterEach(() => { rmSync(base, { recursive: true, force: true }); });
it("sendSignal then consumeSignal returns the signal", () => {
sendSignal(base, "M001", "pause");
const signal = consumeSignal(base, "M001");
assert.ok(signal);
assert.equal(signal.signal, "pause");
assert.equal(signal.from, "coordinator");
assert.ok(signal.sentAt > 0);
});
it("consumeSignal removes the signal file", () => {
sendSignal(base, "M001", "stop");
consumeSignal(base, "M001");
const second = consumeSignal(base, "M001");
assert.equal(second, null);
});
it("consumeSignal returns null when no signal pending", () => {
assert.equal(consumeSignal(base, "M001"), null);
});
});
describe("session-status-io: stale detection", () => {
it("isSessionStale returns false for current process PID", () => {
const status = makeStatus({ pid: process.pid, lastHeartbeat: Date.now() });
assert.equal(isSessionStale(status), false);
});
it("isSessionStale returns true for dead PID", () => {
// PID 2147483647 is extremely unlikely to be alive
const status = makeStatus({ pid: 2147483647, lastHeartbeat: Date.now() });
assert.equal(isSessionStale(status), true);
});
it("isSessionStale returns true for expired heartbeat", () => {
const status = makeStatus({
pid: process.pid,
lastHeartbeat: Date.now() - 60_000,
});
assert.equal(isSessionStale(status, 5_000), true);
});
it("isSessionStale returns false for recent heartbeat with alive PID", () => {
const status = makeStatus({
pid: process.pid,
lastHeartbeat: Date.now(),
});
assert.equal(isSessionStale(status, 30_000), false);
});
});
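The four cases above suggest a two-part liveness check: process existence, then heartbeat age. A sketch under those assumptions (probing with signal 0 is one common way to test a PID; the real module may use another mechanism):

```typescript
interface StaleCheckFields { pid: number; lastHeartbeat: number }

function isSessionStale(status: StaleCheckFields, heartbeatTimeoutMs = 30_000): boolean {
  // 1. Is the owning process still alive? Signal 0 performs the existence
  //    check without actually delivering a signal.
  try {
    process.kill(status.pid, 0);
  } catch {
    return true; // ESRCH: no such process
  }
  // 2. Even a live PID counts as stale once its heartbeat goes quiet.
  return Date.now() - status.lastHeartbeat > heartbeatTimeoutMs;
}
```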
describe("session-status-io: cleanupStaleSessions", () => {
let base: string;
beforeEach(() => { base = makeTmpBase(); });
afterEach(() => { rmSync(base, { recursive: true, force: true }); });
it("removes stale sessions and returns their IDs", () => {
// Write a stale session (dead PID)
writeSessionStatus(base, makeStatus({
milestoneId: "M001",
pid: 2147483647,
}));
// Write a live session
writeSessionStatus(base, makeStatus({
milestoneId: "M002",
pid: process.pid,
lastHeartbeat: Date.now(),
}));
const removed = cleanupStaleSessions(base);
assert.deepEqual(removed, ["M001"]);
assert.equal(readSessionStatus(base, "M001"), null);
assert.ok(readSessionStatus(base, "M002"));
});
});
// ─── parallel-eligibility ────────────────────────────────────────────────────
describe("parallel-eligibility: formatEligibilityReport", () => {
it("formats empty candidates", () => {
const candidates: ParallelCandidates = {
eligible: [],
ineligible: [],
fileOverlaps: [],
};
const report = formatEligibilityReport(candidates);
assert.ok(report.includes("Eligible for Parallel Execution (0)"));
assert.ok(report.includes("No milestones are currently eligible"));
});
it("formats eligible milestones", () => {
const candidates: ParallelCandidates = {
eligible: [
{ milestoneId: "M001", title: "Auth System", eligible: true, reason: "All dependencies satisfied." },
{ milestoneId: "M002", title: "Dashboard", eligible: true, reason: "All dependencies satisfied." },
],
ineligible: [],
fileOverlaps: [],
};
const report = formatEligibilityReport(candidates);
assert.ok(report.includes("Eligible for Parallel Execution (2)"));
assert.ok(report.includes("**M001** — Auth System"));
assert.ok(report.includes("**M002** — Dashboard"));
});
it("formats ineligible milestones with reasons", () => {
const candidates: ParallelCandidates = {
eligible: [],
ineligible: [
{ milestoneId: "M003", title: "API", eligible: false, reason: "Blocked by incomplete dependencies: M001." },
],
fileOverlaps: [],
};
const report = formatEligibilityReport(candidates);
assert.ok(report.includes("Ineligible (1)"));
assert.ok(report.includes("Blocked by incomplete dependencies"));
});
it("formats file overlap warnings", () => {
const candidates: ParallelCandidates = {
eligible: [
{ milestoneId: "M001", title: "Auth", eligible: true, reason: "OK" },
{ milestoneId: "M002", title: "API", eligible: true, reason: "OK" },
],
ineligible: [],
fileOverlaps: [
{ mid1: "M001", mid2: "M002", files: ["src/types.ts", "src/utils.ts"] },
],
};
const report = formatEligibilityReport(candidates);
assert.ok(report.includes("File Overlap Warnings (1)"));
assert.ok(report.includes("`src/types.ts`"));
assert.ok(report.includes("`src/utils.ts`"));
});
});
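The headings and bullet shapes asserted above constrain the report format fairly tightly. A hypothetical rendering consistent with those assertions (section order and overlap-line shape are guesses):

```typescript
interface Candidate { milestoneId: string; title: string; eligible: boolean; reason: string }
interface Overlap { mid1: string; mid2: string; files: string[] }
interface ParallelCandidates { eligible: Candidate[]; ineligible: Candidate[]; fileOverlaps: Overlap[] }

function formatEligibilityReport(c: ParallelCandidates): string {
  const lines: string[] = [`## Eligible for Parallel Execution (${c.eligible.length})`];
  if (c.eligible.length === 0) {
    lines.push("No milestones are currently eligible.");
  } else {
    for (const m of c.eligible) lines.push(`- **${m.milestoneId}** — ${m.title}`);
  }
  if (c.ineligible.length > 0) {
    lines.push(`## Ineligible (${c.ineligible.length})`);
    for (const m of c.ineligible) lines.push(`- **${m.milestoneId}** — ${m.title}: ${m.reason}`);
  }
  if (c.fileOverlaps.length > 0) {
    lines.push(`## File Overlap Warnings (${c.fileOverlaps.length})`);
    for (const o of c.fileOverlaps) {
      lines.push(`- ${o.mid1} / ${o.mid2}: ${o.files.map(f => `\`${f}\``).join(", ")}`);
    }
  }
  return lines.join("\n");
}
```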
// ─── parallel-orchestrator ───────────────────────────────────────────────────
describe("parallel-orchestrator: lifecycle", () => {
let base: string;
beforeEach(() => {
base = makeTmpBase();
resetOrchestrator();
});
afterEach(() => {
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
});
it("isParallelActive returns false initially", () => {
assert.equal(isParallelActive(), false);
});
it("getOrchestratorState returns null initially", () => {
assert.equal(getOrchestratorState(), null);
});
it("startParallel initializes orchestrator state", async () => {
const result = await startParallel(base, ["M001", "M002"], {
parallel: { enabled: true, max_workers: 4, merge_strategy: "per-milestone", auto_merge: "confirm" },
});
assert.deepEqual(result.started, ["M001", "M002"]);
assert.equal(result.errors.length, 0);
assert.equal(isParallelActive(), true);
assert.equal(getWorkerStatuses().length, 2);
});
it("startParallel caps to max_workers", async () => {
const result = await startParallel(base, ["M001", "M002", "M003", "M004"], {
parallel: { enabled: true, max_workers: 2, merge_strategy: "per-milestone", auto_merge: "confirm" },
});
assert.deepEqual(result.started, ["M001", "M002"]);
assert.equal(getWorkerStatuses().length, 2);
});
it("startParallel writes session status files", async () => {
await startParallel(base, ["M001"], undefined);
const status = readSessionStatus(base, "M001");
assert.ok(status);
assert.equal(status.milestoneId, "M001");
assert.equal(status.state, "running");
});
it("stopParallel stops all workers", async () => {
await startParallel(base, ["M001", "M002"], undefined);
await stopParallel(base);
assert.equal(isParallelActive(), false);
const workers = getWorkerStatuses();
assert.ok(workers.every(w => w.state === "stopped"));
});
it("stopParallel stops a specific worker", async () => {
await startParallel(base, ["M001", "M002"], undefined);
await stopParallel(base, "M001");
const workers = getWorkerStatuses();
const m1 = workers.find(w => w.milestoneId === "M001");
const m2 = workers.find(w => w.milestoneId === "M002");
assert.equal(m1?.state, "stopped");
assert.equal(m2?.state, "running");
assert.equal(isParallelActive(), true);
});
it("pauseWorker and resumeWorker toggle worker state", async () => {
await startParallel(base, ["M001"], undefined);
pauseWorker(base, "M001");
assert.equal(getWorkerStatuses()[0].state, "paused");
resumeWorker(base, "M001");
assert.equal(getWorkerStatuses()[0].state, "running");
});
it("pauseWorker sends pause signal", async () => {
await startParallel(base, ["M001"], undefined);
pauseWorker(base, "M001");
const signal = consumeSignal(base, "M001");
assert.ok(signal);
assert.equal(signal.signal, "pause");
});
});
describe("parallel-orchestrator: budget", () => {
beforeEach(() => { resetOrchestrator(); });
afterEach(() => { resetOrchestrator(); });
it("getAggregateCost returns 0 when not active", () => {
assert.equal(getAggregateCost(), 0);
});
it("isBudgetExceeded returns false when not active", () => {
assert.equal(isBudgetExceeded(), false);
});
it("isBudgetExceeded returns false when no ceiling set", async () => {
const base = makeTmpBase();
await startParallel(base, ["M001"], undefined);
assert.equal(isBudgetExceeded(), false);
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
});
it("isBudgetExceeded returns true when ceiling reached", async () => {
const base = makeTmpBase();
await startParallel(base, ["M001"], {
parallel: { enabled: true, max_workers: 2, budget_ceiling: 1.00, merge_strategy: "per-milestone", auto_merge: "confirm" },
});
// Manually set totalCost to test budget check
const orchState = getOrchestratorState();
if (orchState) orchState.totalCost = 1.50;
assert.equal(isBudgetExceeded(), true);
resetOrchestrator();
rmSync(base, { recursive: true, force: true });
});
});
// ─── preferences: parallel config ────────────────────────────────────────────
describe("preferences: resolveParallelConfig", () => {
it("returns defaults when prefs is undefined", () => {
const config = resolveParallelConfig(undefined);
assert.equal(config.enabled, false);
assert.equal(config.max_workers, 2);
assert.equal(config.budget_ceiling, undefined);
assert.equal(config.merge_strategy, "per-milestone");
assert.equal(config.auto_merge, "confirm");
});
it("returns defaults when parallel is undefined", () => {
const config = resolveParallelConfig({});
assert.equal(config.enabled, false);
assert.equal(config.max_workers, 2);
});
it("fills in missing fields with defaults", () => {
const config = resolveParallelConfig({
parallel: { enabled: true } as any,
});
assert.equal(config.enabled, true);
assert.equal(config.max_workers, 2);
assert.equal(config.merge_strategy, "per-milestone");
});
it("clamps max_workers to 1-4 range", () => {
assert.equal(resolveParallelConfig({
parallel: { enabled: true, max_workers: 0, merge_strategy: "per-milestone", auto_merge: "confirm" },
}).max_workers, 1);
assert.equal(resolveParallelConfig({
parallel: { enabled: true, max_workers: 10, merge_strategy: "per-milestone", auto_merge: "confirm" },
}).max_workers, 4);
});
});
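The defaults and the 1–4 clamp asserted above can be realized with a merge-then-clamp resolver. A minimal sketch (field names match the tests; the body is inferred, not copied from `preferences.ts`):

```typescript
interface ParallelConfig {
  enabled: boolean;
  max_workers: number;
  budget_ceiling?: number;
  merge_strategy: "per-milestone" | "per-slice";
  auto_merge: "confirm" | "manual";
}

const PARALLEL_DEFAULTS: ParallelConfig = {
  enabled: false,
  max_workers: 2,
  merge_strategy: "per-milestone",
  auto_merge: "confirm",
};

// Merge user prefs over defaults, then clamp max_workers into 1–4.
function resolveParallelConfig(prefs?: { parallel?: Partial<ParallelConfig> }): ParallelConfig {
  const merged: ParallelConfig = { ...PARALLEL_DEFAULTS, ...(prefs?.parallel ?? {}) };
  merged.max_workers = Math.min(4, Math.max(1, merged.max_workers));
  return merged;
}
```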
describe("preferences: validatePreferences parallel config", () => {
it("validates valid parallel config without errors", () => {
const result = validatePreferences({
parallel: {
enabled: true,
max_workers: 3,
budget_ceiling: 50.00,
merge_strategy: "per-slice",
auto_merge: "manual",
},
});
assert.equal(result.errors.length, 0);
assert.ok(result.preferences.parallel);
assert.equal(result.preferences.parallel?.enabled, true);
assert.equal(result.preferences.parallel?.max_workers, 3);
});
it("rejects invalid max_workers", () => {
const result = validatePreferences({
parallel: { max_workers: 10 } as any,
});
assert.ok(result.errors.some(e => e.includes("max_workers")));
});
it("rejects negative budget_ceiling", () => {
const result = validatePreferences({
parallel: { budget_ceiling: -5 } as any,
});
assert.ok(result.errors.some(e => e.includes("budget_ceiling")));
});
it("rejects invalid merge_strategy", () => {
const result = validatePreferences({
parallel: { merge_strategy: "invalid" } as any,
});
assert.ok(result.errors.some(e => e.includes("merge_strategy")));
});
it("rejects invalid auto_merge", () => {
const result = validatePreferences({
parallel: { auto_merge: "yolo" } as any,
});
assert.ok(result.errors.some(e => e.includes("auto_merge")));
});
});
// ─── Test Helpers (parallel-merge) ───────────────────────────────────────────
function makeWorker(overrides: Partial<WorkerInfo> = {}): WorkerInfo {
return {
milestoneId: "M001",
title: "Test Milestone",
pid: process.pid,
process: null,
worktreePath: "/tmp/test-worktree",
startedAt: Date.now() - 60_000,
state: "stopped",
completedUnits: 5,
cost: 2.50,
...overrides,
};
}
// ─── parallel-merge: determineMergeOrder ─────────────────────────────────────
describe("parallel-merge: determineMergeOrder sequential", () => {
it("returns milestone IDs sorted alphabetically by default", () => {
const workers = [
makeWorker({ milestoneId: "M003", state: "stopped", completedUnits: 1 }),
makeWorker({ milestoneId: "M001", state: "stopped", completedUnits: 2 }),
makeWorker({ milestoneId: "M002", state: "stopped", completedUnits: 3 }),
];
const order = determineMergeOrder(workers, "sequential");
assert.deepEqual(order, ["M001", "M002", "M003"]);
});
it("excludes workers that are still running", () => {
const workers = [
makeWorker({ milestoneId: "M001", state: "stopped", completedUnits: 5 }),
makeWorker({ milestoneId: "M002", state: "running", completedUnits: 0 }),
makeWorker({ milestoneId: "M003", state: "stopped", completedUnits: 2 }),
];
const order = determineMergeOrder(workers, "sequential");
assert.deepEqual(order, ["M001", "M003"]);
});
it("excludes workers with zero completedUnits even if stopped", () => {
const workers = [
makeWorker({ milestoneId: "M001", state: "stopped", completedUnits: 0 }),
makeWorker({ milestoneId: "M002", state: "stopped", completedUnits: 3 }),
];
const order = determineMergeOrder(workers, "sequential");
assert.deepEqual(order, ["M002"]);
});
it("returns empty array when no workers are completed", () => {
const workers = [
makeWorker({ milestoneId: "M001", state: "running", completedUnits: 0 }),
makeWorker({ milestoneId: "M002", state: "paused", completedUnits: 0 }),
];
const order = determineMergeOrder(workers);
assert.deepEqual(order, []);
});
it("uses sequential order as the default when no order arg provided", () => {
const workers = [
makeWorker({ milestoneId: "M002", state: "stopped", completedUnits: 1 }),
makeWorker({ milestoneId: "M001", state: "stopped", completedUnits: 1 }),
];
// Call with no second argument — should default to "sequential"
const order = determineMergeOrder(workers);
assert.deepEqual(order, ["M001", "M002"]);
});
});
describe("parallel-merge: determineMergeOrder by-completion", () => {
it("returns milestones sorted by startedAt (earliest first)", () => {
const now = Date.now();
const workers = [
makeWorker({ milestoneId: "M003", state: "stopped", completedUnits: 1, startedAt: now - 30_000 }),
makeWorker({ milestoneId: "M001", state: "stopped", completedUnits: 1, startedAt: now - 90_000 }),
makeWorker({ milestoneId: "M002", state: "stopped", completedUnits: 1, startedAt: now - 60_000 }),
];
const order = determineMergeOrder(workers, "by-completion");
assert.deepEqual(order, ["M001", "M002", "M003"]);
});
it("excludes paused workers from by-completion order", () => {
const now = Date.now();
const workers = [
makeWorker({ milestoneId: "M001", state: "stopped", completedUnits: 2, startedAt: now - 90_000 }),
makeWorker({ milestoneId: "M002", state: "paused", completedUnits: 1, startedAt: now - 60_000 }),
makeWorker({ milestoneId: "M003", state: "stopped", completedUnits: 3, startedAt: now - 30_000 }),
];
const order = determineMergeOrder(workers, "by-completion");
assert.deepEqual(order, ["M001", "M003"]);
});
});
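The ordering rules these two describe blocks encode — only stopped workers with at least one completed unit are merge candidates, sorted alphabetically or by start time — can be sketched as (hypothetical body, inferred from the assertions):

```typescript
interface MergeWorker { milestoneId: string; state: string; completedUnits: number; startedAt: number }

function determineMergeOrder(
  workers: MergeWorker[],
  order: "sequential" | "by-completion" = "sequential",
): string[] {
  // Running or paused workers, and stopped workers that produced nothing,
  // are excluded from the merge queue.
  const done = workers.filter(w => w.state === "stopped" && w.completedUnits > 0);
  if (order === "by-completion") {
    done.sort((a, b) => a.startedAt - b.startedAt); // earliest start merges first
  } else {
    done.sort((a, b) => a.milestoneId.localeCompare(b.milestoneId)); // alphabetical
  }
  return done.map(w => w.milestoneId);
}
```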
// ─── parallel-merge: formatMergeResults ──────────────────────────────────────
describe("parallel-merge: formatMergeResults", () => {
it("returns a no-op message for an empty results array", () => {
const output = formatMergeResults([]);
assert.equal(output, "No completed milestones to merge.");
});
it("formats a single successful merge without push", () => {
const results: MergeResult[] = [
{ milestoneId: "M001", success: true, commitMessage: "feat: auth system", pushed: false },
];
const output = formatMergeResults(results);
assert.ok(output.includes("# Merge Results"));
assert.ok(output.includes("**M001**"));
assert.ok(output.includes("merged successfully"));
assert.ok(!output.includes("(pushed)"));
});
it("includes (pushed) suffix when result.pushed is true", () => {
const results: MergeResult[] = [
{ milestoneId: "M002", success: true, commitMessage: "feat: dashboard", pushed: true },
];
const output = formatMergeResults(results);
assert.ok(output.includes("(pushed)"));
});
it("formats a conflict result with file list and retry instructions", () => {
const results: MergeResult[] = [
{
milestoneId: "M003",
success: false,
conflictFiles: ["src/types.ts", "src/utils.ts"],
error: "Merge conflict: 2 conflicting file(s)",
},
];
const output = formatMergeResults(results);
assert.ok(output.includes("**M003**"));
assert.ok(output.includes("CONFLICT (2 file(s))"));
assert.ok(output.includes("`src/types.ts`"));
assert.ok(output.includes("`src/utils.ts`"));
assert.ok(output.includes("/gsd parallel merge M003"));
});
it("formats a generic error (no conflict files) with the error message", () => {
const results: MergeResult[] = [
{ milestoneId: "M004", success: false, error: "No roadmap found for M004" },
];
const output = formatMergeResults(results);
assert.ok(output.includes("**M004**"));
assert.ok(output.includes("failed: No roadmap found for M004"));
assert.ok(!output.includes("CONFLICT"));
});
it("formats multiple results in the order provided", () => {
const results: MergeResult[] = [
{ milestoneId: "M001", success: true, pushed: false },
{ milestoneId: "M002", success: false, error: "branch not found" },
{ milestoneId: "M003", success: true, pushed: true },
];
const output = formatMergeResults(results);
const m1Pos = output.indexOf("M001");
const m2Pos = output.indexOf("M002");
const m3Pos = output.indexOf("M003");
assert.ok(m1Pos < m2Pos, "M001 should appear before M002");
assert.ok(m2Pos < m3Pos, "M002 should appear before M003");
});
});
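The success, conflict, and generic-error branches asserted above suggest a renderer along these lines (hypothetical: exact bullet layout and retry wording are guesses constrained by the tests):

```typescript
interface MergeResult {
  milestoneId: string;
  success: boolean;
  commitMessage?: string;
  pushed?: boolean;
  conflictFiles?: string[];
  error?: string;
}

function formatMergeResults(results: MergeResult[]): string {
  if (results.length === 0) return "No completed milestones to merge.";
  const lines = ["# Merge Results"];
  for (const r of results) {
    if (r.success) {
      lines.push(`- **${r.milestoneId}** merged successfully${r.pushed ? " (pushed)" : ""}`);
    } else if (r.conflictFiles && r.conflictFiles.length > 0) {
      lines.push(`- **${r.milestoneId}** CONFLICT (${r.conflictFiles.length} file(s))`);
      for (const f of r.conflictFiles) lines.push(`  - \`${f}\``);
      lines.push(`  Resolve conflicts, then run \`/gsd parallel merge ${r.milestoneId}\``);
    } else {
      lines.push(`- **${r.milestoneId}** failed: ${r.error}`);
    }
  }
  return lines.join("\n");
}
```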
// ─── doctor: stale_parallel_session issue code ───────────────────────────────
describe("doctor: stale_parallel_session issue code exists", () => {
it("DoctorIssueCode union includes stale_parallel_session", async () => {
// Import doctor.ts and verify the type is real by constructing a DoctorIssue
// with code "stale_parallel_session" — TypeScript will reject it at compile
// time if the code is not in the union; the runtime assertion confirms the
// string value round-trips through the typed object correctly.
await import("../doctor.js");
// Construct a value that satisfies DoctorIssue using the code under test
const issue: import("../doctor.js").DoctorIssue = {
severity: "warning",
code: "stale_parallel_session",
scope: "project",
unitId: "M001",
message: "Stale parallel session detected",
fixable: true,
};
assert.equal(issue.code, "stale_parallel_session");
});
it("DoctorIssue with stale_parallel_session has warning severity", () => {
const issue: import("../doctor.js").DoctorIssue = {
severity: "warning",
code: "stale_parallel_session",
scope: "project",
unitId: "M002",
message: "Stale parallel session for M002",
fixable: true,
};
assert.equal(issue.severity, "warning");
assert.equal(issue.fixable, true);
assert.equal(issue.scope, "project");
});
});

View file

@ -0,0 +1,354 @@
/**
* E2E test: Parallel workers across multiple milestones.
*
* Validates the full lifecycle of the worker registry + metrics + budget
* alerting across multiple milestone contexts. Uses real filesystem fixtures
* and the actual metrics/worker-registry modules (no mocking).
*
* Covers:
* - Worker registry tracking across parallel batches
* - Metrics ledger accumulation across milestones
* - Budget alert level transitions including the 80% threshold
* - Dashboard data aggregation with parallel worker context
* - Cost projection with budget ceiling awareness
*/
import { mkdtempSync, mkdirSync, rmSync, writeFileSync, readFileSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { createTestContext } from './test-helpers.ts';
import {
registerWorker,
updateWorker,
getActiveWorkers,
getWorkerBatches,
hasActiveWorkers,
resetWorkerRegistry,
} from '../../subagent/worker-registry.ts';
import {
getBudgetAlertLevel,
getNewBudgetAlertLevel,
getBudgetEnforcementAction,
} from '../auto.ts';
import {
type UnitMetrics,
type MetricsLedger,
getProjectTotals,
aggregateByPhase,
aggregateBySlice,
formatCost,
formatCostProjection,
getAverageCostPerUnitType,
predictRemainingCost,
} from '../metrics.ts';
const { assertEq, assertTrue, assertMatch, report } = createTestContext();
// ─── Fixture helpers ──────────────────────────────────────────────────────────
function createFixtureBase(): string {
const base = mkdtempSync(join(tmpdir(), 'gsd-e2e-parallel-'));
mkdirSync(join(base, '.gsd', 'milestones'), { recursive: true });
return base;
}
function writeMetricsLedger(base: string, ledger: MetricsLedger): void {
writeFileSync(join(base, '.gsd', 'metrics.json'), JSON.stringify(ledger, null, 2));
}
function readMetricsLedger(base: string): MetricsLedger {
return JSON.parse(readFileSync(join(base, '.gsd', 'metrics.json'), 'utf-8'));
}
function makeUnit(overrides: Partial<UnitMetrics> = {}): UnitMetrics {
return {
type: "execute-task",
id: "M001/S01/T01",
model: "claude-sonnet-4-20250514",
startedAt: Date.now() - 5000,
finishedAt: Date.now(),
tokens: { input: 1000, output: 500, cacheRead: 200, cacheWrite: 100, total: 1800 },
cost: 0.05,
toolCalls: 3,
assistantMessages: 2,
userMessages: 1,
...overrides,
};
}
function cleanup(base: string): void {
rmSync(base, { recursive: true, force: true });
}
// ─── E2E: Parallel workers across M001 and M002 ──────────────────────────────
console.log("\n=== E2E: Parallel workers across milestones ===");
{
resetWorkerRegistry();
const base = createFixtureBase();
// Create milestone directories
mkdirSync(join(base, '.gsd', 'milestones', 'M001'), { recursive: true });
mkdirSync(join(base, '.gsd', 'milestones', 'M002'), { recursive: true });
// Simulate M001 parallel workers (batch 1)
const batch1Id = "batch-m001";
const w1 = registerWorker("scout", "Explore M001 codebase", 0, 3, batch1Id);
const w2 = registerWorker("researcher", "Research M001 APIs", 1, 3, batch1Id);
const w3 = registerWorker("worker", "Implement M001 feature", 2, 3, batch1Id);
assertEq(getActiveWorkers().length, 3, "M001: 3 parallel workers registered");
assertTrue(hasActiveWorkers(), "M001: has active workers");
const batches1 = getWorkerBatches();
assertEq(batches1.size, 1, "M001: single batch");
assertEq(batches1.get(batch1Id)!.length, 3, "M001: batch has 3 workers");
// Complete M001 workers
updateWorker(w1, "completed");
updateWorker(w2, "completed");
updateWorker(w3, "completed");
assertTrue(!hasActiveWorkers(), "M001: no active workers after completion");
// Simulate M002 parallel workers (batch 2) — overlapping with M001 cleanup
const batch2Id = "batch-m002";
const w4 = registerWorker("scout", "Explore M002 codebase", 0, 2, batch2Id);
const w5 = registerWorker("worker", "Implement M002 feature", 1, 2, batch2Id);
assertTrue(hasActiveWorkers(), "M002: has active workers");
const batches2 = getWorkerBatches();
// M001 workers may still be in cleanup window (5s timeout), M002 workers are active
assertTrue(batches2.has(batch2Id), "M002: batch exists");
assertEq(batches2.get(batch2Id)!.length, 2, "M002: batch has 2 workers");
// One worker fails in M002
updateWorker(w4, "completed");
updateWorker(w5, "failed");
assertTrue(!hasActiveWorkers(), "M002: no active workers after all finish");
// Verify the final worker statuses are reflected correctly (finished workers
// may linger briefly in the registry's cleanup window)
const allWorkers = getActiveWorkers();
const m002Workers = allWorkers.filter(w => w.batchId === batch2Id);
if (m002Workers.length > 0) {
const failedWorker = m002Workers.find(w => w.status === "failed");
assertTrue(failedWorker !== undefined, "M002: failed worker tracked");
assertEq(failedWorker?.agent, "worker", "M002: failed worker is 'worker'");
}
cleanup(base);
}
// ─── E2E: Metrics accumulation across milestones ──────────────────────────────
console.log("\n=== E2E: Metrics across milestones ===");
{
const base = createFixtureBase();
// Build a ledger spanning two milestones
const ledger: MetricsLedger = {
version: 1,
projectStartedAt: Date.now() - 60000,
units: [
// M001 units
makeUnit({ type: "research-milestone", id: "M001", cost: 0.10 }),
makeUnit({ type: "plan-milestone", id: "M001", cost: 0.08 }),
makeUnit({ type: "plan-slice", id: "M001/S01", cost: 0.05 }),
makeUnit({ type: "execute-task", id: "M001/S01/T01", cost: 0.12 }),
makeUnit({ type: "execute-task", id: "M001/S01/T02", cost: 0.15 }),
makeUnit({ type: "complete-slice", id: "M001/S01", cost: 0.03 }),
makeUnit({ type: "plan-slice", id: "M001/S02", cost: 0.06 }),
makeUnit({ type: "execute-task", id: "M001/S02/T01", cost: 0.20 }),
makeUnit({ type: "complete-slice", id: "M001/S02", cost: 0.04 }),
// M002 units
makeUnit({ type: "research-milestone", id: "M002", cost: 0.12 }),
makeUnit({ type: "plan-milestone", id: "M002", cost: 0.09 }),
makeUnit({ type: "plan-slice", id: "M002/S01", cost: 0.07 }),
makeUnit({ type: "execute-task", id: "M002/S01/T01", cost: 0.18 }),
],
};
writeMetricsLedger(base, ledger);
const loaded = readMetricsLedger(base);
// Verify totals
const totals = getProjectTotals(loaded.units);
assertEq(totals.units, 13, "metrics: 13 total units across M001+M002");
const totalCost = loaded.units.reduce((sum, u) => sum + u.cost, 0);
assertTrue(Math.abs(totals.cost - totalCost) < 0.001, "metrics: total cost matches sum");
// Verify phase aggregation
const phases = aggregateByPhase(loaded.units);
const research = phases.find(p => p.phase === "research");
assertTrue(research !== undefined, "metrics: research phase exists");
assertEq(research!.units, 2, "metrics: 2 research units (M001 + M002)");
const execution = phases.find(p => p.phase === "execution");
assertTrue(execution !== undefined, "metrics: execution phase exists");
assertEq(execution!.units, 4, "metrics: 4 execution units across both milestones");
// Verify slice aggregation
const slices = aggregateBySlice(loaded.units);
assertTrue(slices.length >= 4, "metrics: at least 4 slice aggregates (M001/S01, M001/S02, M002/S01, milestone-level)");
const m001s01 = slices.find(s => s.sliceId === "M001/S01");
assertTrue(m001s01 !== undefined, "metrics: M001/S01 slice aggregate exists");
// M001/S01 has: plan-slice + T01 + T02 + complete-slice = 4 units
assertEq(m001s01!.units, 4, "metrics: M001/S01 has 4 units");
// Cost projection
const projLines = formatCostProjection(slices, 3, 2.0);
assertTrue(projLines.length >= 1, "metrics: cost projection generated");
assertMatch(projLines[0], /Projected remaining/, "metrics: projection line text");
cleanup(base);
}
// ─── E2E: Budget alert progression through all thresholds ─────────────────────
console.log("\n=== E2E: Budget alert progression 0→75→80→90→100 ===");
{
// Simulate spending progression against a $10 budget ceiling
const ceiling = 10.0;
// Start: 50% spent
let lastLevel = getBudgetAlertLevel(5.0 / ceiling);
assertEq(lastLevel, 0, "budget: 50% → level 0");
assertEq(getNewBudgetAlertLevel(0, 5.0 / ceiling), null, "budget: no alert at 50%");
// Spend to 75%
let newLevel = getNewBudgetAlertLevel(lastLevel, 7.5 / ceiling);
assertEq(newLevel, 75, "budget: alert fires at 75%");
lastLevel = newLevel!;
// Spend to 78% — no alert (between 75 and 80)
assertEq(getNewBudgetAlertLevel(lastLevel, 7.8 / ceiling), null, "budget: no alert at 78%");
// Spend to 80% — 80% approach alert
newLevel = getNewBudgetAlertLevel(lastLevel, 8.0 / ceiling);
assertEq(newLevel, 80, "budget: approach alert fires at 80%");
lastLevel = newLevel!;
// Spend to 85% — no alert (still at 80 level)
assertEq(getNewBudgetAlertLevel(lastLevel, 8.5 / ceiling), null, "budget: no alert at 85%");
// Spend to 90%
newLevel = getNewBudgetAlertLevel(lastLevel, 9.0 / ceiling);
assertEq(newLevel, 90, "budget: alert fires at 90%");
lastLevel = newLevel!;
// Spend to 100%
newLevel = getNewBudgetAlertLevel(lastLevel, 10.0 / ceiling);
assertEq(newLevel, 100, "budget: alert fires at 100%");
lastLevel = newLevel!;
// Over budget — no re-emission
assertEq(getNewBudgetAlertLevel(lastLevel, 12.0 / ceiling), null, "budget: no re-alert over 100%");
// Enforcement at 80% — still "none" (enforcement only at 100%)
assertEq(getBudgetEnforcementAction("pause", 0.80), "none", "budget: no enforcement at 80%");
assertEq(getBudgetEnforcementAction("halt", 0.80), "none", "budget: no enforcement at 80%");
assertEq(getBudgetEnforcementAction("warn", 0.80), "none", "budget: no enforcement at 80%");
}
// ─── E2E: Budget prediction with multi-milestone cost data ────────────────────
console.log("\n=== E2E: Budget prediction across milestones ===");
{
const units: UnitMetrics[] = [
makeUnit({ type: "execute-task", id: "M001/S01/T01", cost: 0.10 }),
makeUnit({ type: "execute-task", id: "M001/S01/T02", cost: 0.15 }),
makeUnit({ type: "plan-slice", id: "M001/S01", cost: 0.05 }),
makeUnit({ type: "execute-task", id: "M002/S01/T01", cost: 0.20 }),
makeUnit({ type: "plan-slice", id: "M002/S01", cost: 0.08 }),
];
const avgCosts = getAverageCostPerUnitType(units);
assertTrue(avgCosts.has("execute-task"), "prediction: has execute-task average");
assertTrue(avgCosts.has("plan-slice"), "prediction: has plan-slice average");
// Average execute-task cost: (0.10 + 0.15 + 0.20) / 3 = 0.15
const execAvg = avgCosts.get("execute-task")!;
assertTrue(Math.abs(execAvg - 0.15) < 0.001, `prediction: execute-task avg is $0.15 (got ${execAvg})`);
// Average plan-slice cost: (0.05 + 0.08) / 2 = 0.065
const planAvg = avgCosts.get("plan-slice")!;
assertTrue(Math.abs(planAvg - 0.065) < 0.001, `prediction: plan-slice avg is $0.065 (got ${planAvg})`);
// Predict remaining cost for 3 more execute-tasks and 1 plan-slice
const remaining = predictRemainingCost(avgCosts, [
"execute-task", "execute-task", "execute-task", "plan-slice",
]);
// Expected: 3 * 0.15 + 1 * 0.065 = 0.515
assertTrue(Math.abs(remaining - 0.515) < 0.001, `prediction: remaining cost ~$0.515 (got ${remaining})`);
}
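The averaging and prediction steps exercised above can be sketched as below. `averageCostPerType` and `predictRemaining` are illustrative stand-ins for `getAverageCostPerUnitType` and `predictRemainingCost`, written against the same expectations:

```typescript
// Illustrative sketches -- not the implementations under test.
interface UnitCost { type: string; cost: number; }

// Mean cost per unit type, e.g. execute-task -> (0.10 + 0.15 + 0.20) / 3 = 0.15.
function averageCostPerType(units: UnitCost[]): Map<string, number> {
  const sums = new Map<string, { total: number; count: number }>();
  for (const u of units) {
    const s = sums.get(u.type) ?? { total: 0, count: 0 };
    s.total += u.cost;
    s.count += 1;
    sums.set(u.type, s);
  }
  const avgs = new Map<string, number>();
  for (const [type, { total, count }] of sums) avgs.set(type, total / count);
  return avgs;
}

// Predicted spend for a list of pending unit types; unknown types contribute 0.
function predictRemaining(avgs: Map<string, number>, pending: string[]): number {
  return pending.reduce((sum, type) => sum + (avgs.get(type) ?? 0), 0);
}
```

With the five units from the test data, `predictRemaining` over three `execute-task`s and one `plan-slice` yields 3 × 0.15 + 0.065 ≈ 0.515, matching the assertion above.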
// ─── E2E: Parallel workers + budget alerts combined scenario ──────────────────
console.log("\n=== E2E: Combined parallel workers + budget monitoring ===");
{
resetWorkerRegistry();
// Simulate a scenario: 3 parallel workers running while budget is at 78%
const batchId = "batch-combined";
const w1 = registerWorker("scout", "Research APIs", 0, 3, batchId);
const w2 = registerWorker("worker", "Implement feature", 1, 3, batchId);
const w3 = registerWorker("worker", "Write tests", 2, 3, batchId);
// Budget is at 78% — no alert yet (between 75 and 80)
const ceiling = 10.0;
let lastLevel: ReturnType<typeof getBudgetAlertLevel> = 75; // already got 75% alert
assertEq(getNewBudgetAlertLevel(lastLevel, 7.8 / ceiling), null, "combined: no alert at 78% with workers running");
assertTrue(hasActiveWorkers(), "combined: workers running during budget check");
// First worker completes, cost rises to 80%
updateWorker(w1, "completed");
const level80 = getNewBudgetAlertLevel(lastLevel, 8.0 / ceiling);
assertEq(level80, 80, "combined: 80% approach alert fires after worker completes");
lastLevel = level80!;
// Second worker completes, cost rises to 88%
updateWorker(w2, "completed");
assertEq(getNewBudgetAlertLevel(lastLevel, 8.8 / ceiling), null, "combined: no alert at 88%");
// Third worker completes, cost reaches 90%
updateWorker(w3, "completed");
const level90 = getNewBudgetAlertLevel(lastLevel, 9.0 / ceiling);
assertEq(level90, 90, "combined: 90% alert fires after all workers complete");
assertTrue(!hasActiveWorkers(), "combined: no active workers at end");
resetWorkerRegistry();
}
// ─── E2E: formatCostProjection with budget ceiling warnings ───────────────────
console.log("\n=== E2E: Cost projection ceiling warnings ===");
{
const slices = [
{ sliceId: "M001/S01", units: 4, tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, cost: 3.0, duration: 10000 },
{ sliceId: "M001/S02", units: 3, tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, cost: 4.0, duration: 8000 },
{ sliceId: "M002/S01", units: 3, tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, cost: 5.0, duration: 12000 },
];
// With ceiling NOT yet reached
const proj1 = formatCostProjection(slices, 2, 20.0);
assertTrue(proj1.length >= 1, "projection: has projection line");
assertMatch(proj1[0], /Projected remaining/, "projection: shows projection");
assertTrue(proj1.length === 1, "projection: no ceiling warning when under budget");
// With ceiling reached (spent 12.0 >= ceiling 10.0)
const proj2 = formatCostProjection(slices, 2, 10.0);
assertTrue(proj2.length >= 2, "projection: has ceiling warning when over budget");
assertMatch(proj2[1], /ceiling/, "projection: ceiling warning text");
}
// ─── Summary ──────────────────────────────────────────────────────────────────
report();


@ -0,0 +1,148 @@
/**
* Tests for the parallel worker registry used by the dashboard overlay.
*
 * Verifies worker lifecycle (register → update → cleanup), batch grouping,
* and the hasActiveWorkers() status check.
*/
import { createTestContext } from './test-helpers.ts';
import {
registerWorker,
updateWorker,
getActiveWorkers,
getWorkerBatches,
hasActiveWorkers,
resetWorkerRegistry,
} from '../../subagent/worker-registry.ts';
const { assertEq, assertTrue, report } = createTestContext();
// ─── Setup ────────────────────────────────────────────────────────────────────
resetWorkerRegistry();
// ─── Registration ─────────────────────────────────────────────────────────────
console.log("\n=== Worker Registration ===");
{
resetWorkerRegistry();
const id = registerWorker("scout", "Explore codebase", 0, 3, "batch-1");
assertTrue(id.startsWith("worker-"), "worker ID has correct prefix");
const workers = getActiveWorkers();
assertEq(workers.length, 1, "one worker registered");
assertEq(workers[0].agent, "scout", "worker agent name correct");
assertEq(workers[0].task, "Explore codebase", "worker task correct");
assertEq(workers[0].status, "running", "worker starts as running");
assertEq(workers[0].index, 0, "worker index correct");
assertEq(workers[0].batchSize, 3, "worker batch size correct");
assertEq(workers[0].batchId, "batch-1", "worker batch ID correct");
}
// ─── Multiple workers in a batch ──────────────────────────────────────────────
console.log("\n=== Multiple Workers in a Batch ===");
{
resetWorkerRegistry();
  registerWorker("scout", "Task A", 0, 3, "batch-2");
  registerWorker("researcher", "Task B", 1, 3, "batch-2");
  registerWorker("worker", "Task C", 2, 3, "batch-2");
const workers = getActiveWorkers();
assertEq(workers.length, 3, "three workers registered");
assertTrue(hasActiveWorkers(), "has active workers");
const batches = getWorkerBatches();
assertEq(batches.size, 1, "one batch");
const batch = batches.get("batch-2");
assertTrue(batch !== undefined, "batch-2 exists");
assertEq(batch!.length, 3, "batch has 3 workers");
}
// ─── Worker status updates ────────────────────────────────────────────────────
console.log("\n=== Worker Status Updates ===");
{
resetWorkerRegistry();
const id1 = registerWorker("scout", "Task A", 0, 2, "batch-3");
const id2 = registerWorker("worker", "Task B", 1, 2, "batch-3");
updateWorker(id1, "completed");
const workers = getActiveWorkers();
const w1 = workers.find(w => w.id === id1);
assertEq(w1?.status, "completed", "worker 1 marked completed");
const w2 = workers.find(w => w.id === id2);
assertEq(w2?.status, "running", "worker 2 still running");
assertTrue(hasActiveWorkers(), "still has active workers (worker 2 running)");
}
// ─── Failed worker ────────────────────────────────────────────────────────────
console.log("\n=== Failed Worker ===");
{
resetWorkerRegistry();
const id = registerWorker("scout", "Task A", 0, 1, "batch-4");
updateWorker(id, "failed");
const workers = getActiveWorkers();
assertEq(workers[0].status, "failed", "worker marked failed");
}
// ─── Multiple batches ─────────────────────────────────────────────────────────
console.log("\n=== Multiple Batches ===");
{
resetWorkerRegistry();
registerWorker("scout", "Task A", 0, 2, "batch-5");
registerWorker("worker", "Task B", 1, 2, "batch-5");
registerWorker("researcher", "Task C", 0, 1, "batch-6");
const batches = getWorkerBatches();
assertEq(batches.size, 2, "two batches");
assertEq(batches.get("batch-5")!.length, 2, "batch-5 has 2 workers");
assertEq(batches.get("batch-6")!.length, 1, "batch-6 has 1 worker");
}
// ─── hasActiveWorkers with all completed ──────────────────────────────────────
console.log("\n=== hasActiveWorkers — all completed ===");
{
resetWorkerRegistry();
const id1 = registerWorker("scout", "Task A", 0, 2, "batch-7");
const id2 = registerWorker("worker", "Task B", 1, 2, "batch-7");
updateWorker(id1, "completed");
updateWorker(id2, "completed");
assertTrue(!hasActiveWorkers(), "no active workers when all completed");
}
// ─── Reset clears everything ─────────────────────────────────────────────────
console.log("\n=== Reset ===");
{
registerWorker("scout", "Task", 0, 1, "batch-8");
assertTrue(getActiveWorkers().length > 0, "workers exist before reset");
resetWorkerRegistry();
assertEq(getActiveWorkers().length, 0, "no workers after reset");
assertTrue(!hasActiveWorkers(), "hasActiveWorkers false after reset");
}
// ─── Update non-existent worker is no-op ──────────────────────────────────────
console.log("\n=== Update non-existent worker ===");
{
resetWorkerRegistry();
// Should not throw
updateWorker("nonexistent-id", "completed");
assertEq(getActiveWorkers().length, 0, "no workers created by updating nonexistent");
}
// ─── Summary ──────────────────────────────────────────────────────────────────
report();


@ -364,3 +364,16 @@ export interface Requirement {
full_content: string; // full requirement text
superseded_by: string | null; // ID of superseding requirement, or null
}
// ─── Parallel Orchestration Types ────────────────────────────────────────
export type MergeStrategy = "per-slice" | "per-milestone";
export type AutoMergeMode = "auto" | "confirm" | "manual";
export interface ParallelConfig {
enabled: boolean;
max_workers: number;
budget_ceiling?: number;
merge_strategy: MergeStrategy;
auto_merge: AutoMergeMode;
}
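A populated `ParallelConfig` might look like the following. The field values here are illustrative assumptions, not shipped defaults (the actual default is `enabled: false`, per the issue notes):

```typescript
// Types repeated from types.ts above so this snippet stands alone.
type MergeStrategy = "per-slice" | "per-milestone";
type AutoMergeMode = "auto" | "confirm" | "manual";
interface ParallelConfig {
  enabled: boolean;
  max_workers: number;
  budget_ceiling?: number;
  merge_strategy: MergeStrategy;
  auto_merge: AutoMergeMode;
}

// Illustrative opt-in example -- values are assumptions for demonstration.
const exampleConfig: ParallelConfig = {
  enabled: true,
  max_workers: 3,
  budget_ceiling: 10.0,        // optional spend cap; omit for no ceiling
  merge_strategy: "per-slice",
  auto_merge: "confirm",       // prompt before merging worker branches
};
```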


@ -33,6 +33,7 @@ import {
mergeDeltaPatches,
readIsolationMode,
} from "./isolation.js";
import { registerWorker, updateWorker } from "./worker-registry.js";
const MAX_PARALLEL_TASKS = 8;
const MAX_CONCURRENCY = 4;
@ -626,7 +627,10 @@ export default function (pi: ExtensionAPI) {
};
const MAX_RETRIES = 1; // Retry failed tasks once
const batchId = crypto.randomUUID();
const batchSize = params.tasks.length;
const results = await mapWithConcurrencyLimit(params.tasks, MAX_CONCURRENCY, async (t, index) => {
const workerId = registerWorker(t.agent, t.task, index, batchSize, batchId);
let result = await runSingleAgent(
ctx.cwd,
agents,
@ -666,6 +670,7 @@ export default function (pi: ExtensionAPI) {
);
}
updateWorker(workerId, result.exitCode === 0 ? "completed" : "failed");
allResults[index] = result;
emitParallelUpdate();
return result;


@ -0,0 +1,99 @@
/**
 * Worker registry: tracks active subagent sessions for dashboard visibility.
*
* Provides a global registry of currently-running parallel workers so the
* GSD dashboard overlay can display real-time worker status.
*/
export interface WorkerEntry {
id: string;
agent: string;
task: string;
startedAt: number;
status: "running" | "completed" | "failed";
/** Index within a parallel batch (0-based) */
index: number;
/** Total workers in the parallel batch */
batchSize: number;
/** Unique batch identifier for grouping parallel runs */
batchId: string;
}
const activeWorkers = new Map<string, WorkerEntry>();
let workerIdCounter = 0;
/**
* Register a new worker. Returns the worker ID for later updates.
*/
export function registerWorker(
agent: string,
task: string,
index: number,
batchSize: number,
batchId: string,
): string {
const id = `worker-${++workerIdCounter}`;
activeWorkers.set(id, {
id,
agent,
task,
startedAt: Date.now(),
status: "running",
index,
batchSize,
batchId,
});
return id;
}
/**
* Update worker status when it completes or fails.
*/
export function updateWorker(id: string, status: "completed" | "failed"): void {
const entry = activeWorkers.get(id);
if (entry) {
entry.status = status;
// Remove after a brief display window (5 seconds)
setTimeout(() => {
activeWorkers.delete(id);
}, 5000);
}
}
/**
* Get all currently-tracked workers (running + recently completed).
*/
export function getActiveWorkers(): WorkerEntry[] {
return Array.from(activeWorkers.values());
}
/**
* Get workers grouped by batch.
*/
export function getWorkerBatches(): Map<string, WorkerEntry[]> {
const batches = new Map<string, WorkerEntry[]>();
for (const worker of activeWorkers.values()) {
const batch = batches.get(worker.batchId) ?? [];
batch.push(worker);
batches.set(worker.batchId, batch);
}
return batches;
}
/**
* Check if any parallel workers are currently running.
*/
export function hasActiveWorkers(): boolean {
for (const worker of activeWorkers.values()) {
if (worker.status === "running") return true;
}
return false;
}
/**
* Reset registry state. Used for testing.
*/
export function resetWorkerRegistry(): void {
activeWorkers.clear();
workerIdCounter = 0;
}