feat: add dashboard parallel workers view, 80% budget alert, and E2E tests
Add three remaining features: 1. Dashboard multi-session view: New worker registry (subagent/worker-registry.ts) tracks active parallel subagent sessions with batch grouping and status lifecycle. Dashboard overlay now renders a "Parallel Workers" section showing per-batch worker status with agent names, task previews, and elapsed time. 2. Budget approach notification at 80%: Added 80% threshold to the existing 75/90/100 budget alert levels. Fires an "Approaching budget ceiling" notification with desktop alert at the 80% mark, giving users earlier warning before hitting enforcement thresholds. 3. End-to-end testing across milestones: New E2E test validates parallel worker lifecycle across M001/M002 milestones, metrics accumulation, full budget alert progression (0→75→80→90→100), cost prediction with multi-milestone data, and combined worker+budget scenarios. Worker registry unit tests cover registration, batch grouping, status updates, and edge cases.
This commit is contained in:
parent
01b0d530c8
commit
0ee7016bc7
7 changed files with 670 additions and 4 deletions
|
|
@ -362,11 +362,12 @@ let _sigtermHandler: (() => void) | null = null;
|
|||
*/
|
||||
const inFlightTools = new Map<string, number>();
|
||||
|
||||
/**
 * Budget alert buckets, expressed as whole percentages of the budget ceiling.
 * `0` means no threshold has been reached yet.
 */
type BudgetAlertLevel = 0 | 75 | 80 | 90 | 100;

/**
 * Map a budget usage ratio to the highest alert threshold it has reached.
 *
 * @param budgetPct - Spend divided by the budget ceiling, as a fraction
 *   (e.g. `0.80` for 80%). Values above `1.0` still map to `100`.
 * @returns The highest threshold that `budgetPct` meets or exceeds, or `0`
 *   when it is below 75% (this includes `NaN`, for which every comparison
 *   is false).
 */
export function getBudgetAlertLevel(budgetPct: number): BudgetAlertLevel {
  // Checked from highest to lowest so the first match is the top bucket.
  if (budgetPct >= 1.0) return 100;
  if (budgetPct >= 0.90) return 90;
  if (budgetPct >= 0.80) return 80;
  if (budgetPct >= 0.75) return 75;
  return 0;
}
|
||||
|
|
@ -2233,6 +2234,10 @@ async function dispatchNextUnit(
|
|||
lastBudgetAlertLevel = newBudgetAlertLevel;
|
||||
ctx.ui.notify(`Budget 90%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning");
|
||||
sendDesktopNotification("GSD", `Budget 90%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning", "budget");
|
||||
} else if (newBudgetAlertLevel === 80) {
|
||||
lastBudgetAlertLevel = newBudgetAlertLevel;
|
||||
ctx.ui.notify(`Approaching budget ceiling — 80%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning");
|
||||
sendDesktopNotification("GSD", `Approaching budget ceiling — 80%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "warning", "budget");
|
||||
} else if (newBudgetAlertLevel === 75) {
|
||||
lastBudgetAlertLevel = newBudgetAlertLevel;
|
||||
ctx.ui.notify(`Budget 75%: ${formatCost(totalCost)} / ${formatCost(budgetCeiling)}`, "info");
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import {
|
|||
} from "./metrics.js";
|
||||
import { loadEffectiveGSDPreferences } from "./preferences.js";
|
||||
import { getActiveWorktreeName } from "./worktree-command.js";
|
||||
import { getWorkerBatches, hasActiveWorkers, type WorkerEntry } from "../subagent/worker-registry.js";
|
||||
|
||||
function formatDuration(ms: number): string {
|
||||
const s = Math.floor(ms / 1000);
|
||||
|
|
@ -363,6 +364,43 @@ export class GSDDashboardOverlay {
|
|||
lines.push(blank());
|
||||
}
|
||||
|
||||
// Parallel workers section — shows active subagent sessions
|
||||
if (hasActiveWorkers()) {
|
||||
lines.push(hr());
|
||||
lines.push(row(th.fg("text", th.bold("Parallel Workers"))));
|
||||
lines.push(blank());
|
||||
|
||||
const batches = getWorkerBatches();
|
||||
for (const [batchId, workers] of batches) {
|
||||
const running = workers.filter(w => w.status === "running").length;
|
||||
const done = workers.filter(w => w.status === "completed").length;
|
||||
const failed = workers.filter(w => w.status === "failed").length;
|
||||
const total = workers[0]?.batchSize ?? workers.length;
|
||||
|
||||
lines.push(row(joinColumns(
|
||||
` ${th.fg("accent", "⟐")} ${th.fg("text", `Batch ${batchId.slice(0, 8)}`)}`,
|
||||
th.fg("dim", `${done + failed}/${total} done`),
|
||||
contentWidth,
|
||||
)));
|
||||
|
||||
for (const w of workers) {
|
||||
const icon = w.status === "running"
|
||||
? th.fg("accent", "▸")
|
||||
: w.status === "completed"
|
||||
? th.fg("success", "✓")
|
||||
: th.fg("error", "✗");
|
||||
const elapsed = th.fg("dim", formatDuration(Date.now() - w.startedAt));
|
||||
const taskPreview = truncateToWidth(w.task, Math.max(20, contentWidth - 30));
|
||||
lines.push(row(joinColumns(
|
||||
` ${icon} ${th.fg("text", w.agent)} ${th.fg("dim", taskPreview)}`,
|
||||
elapsed,
|
||||
contentWidth,
|
||||
)));
|
||||
}
|
||||
}
|
||||
lines.push(blank());
|
||||
}
|
||||
|
||||
// Pending captures badge — only shown when captures are waiting for triage
|
||||
if (this.dashData.pendingCaptureCount > 0) {
|
||||
const count = this.dashData.pendingCaptureCount;
|
||||
|
|
|
|||
|
|
@ -9,8 +9,12 @@ import {
|
|||
|
||||
test("getBudgetAlertLevel returns the expected threshold bucket", () => {
  // Below the first threshold.
  assert.equal(getBudgetAlertLevel(0.10), 0);
  assert.equal(getBudgetAlertLevel(0.74), 0);
  // 75% bucket now ends just below 80%.
  assert.equal(getBudgetAlertLevel(0.75), 75);
  assert.equal(getBudgetAlertLevel(0.79), 75);
  // 80% bucket covers [0.80, 0.90).
  assert.equal(getBudgetAlertLevel(0.80), 80);
  assert.equal(getBudgetAlertLevel(0.85), 80);
  assert.equal(getBudgetAlertLevel(0.89), 80);
  // Upper buckets are unchanged.
  assert.equal(getBudgetAlertLevel(0.90), 90);
  assert.equal(getBudgetAlertLevel(1.00), 100);
});
|
||||
|
|
@ -18,14 +22,27 @@ test("getBudgetAlertLevel returns the expected threshold bucket", () => {
|
|||
test("getNewBudgetAlertLevel only emits once per threshold", () => {
  // Nothing fires below 75%.
  assert.equal(getNewBudgetAlertLevel(0, 0.74), null);
  assert.equal(getNewBudgetAlertLevel(0, 0.75), 75);
  // Jumping straight from the 75 bucket to 90% reports 90.
  assert.equal(getNewBudgetAlertLevel(75, 0.90), 90);
  // Inside the 75 bucket nothing new fires; reaching 80% emits 80.
  assert.equal(getNewBudgetAlertLevel(75, 0.79), null);
  assert.equal(getNewBudgetAlertLevel(75, 0.80), 80);
  // Inside the 80 bucket nothing new fires; reaching 90% emits 90.
  assert.equal(getNewBudgetAlertLevel(80, 0.85), null);
  assert.equal(getNewBudgetAlertLevel(80, 0.90), 90);
  assert.equal(getNewBudgetAlertLevel(90, 0.95), null);
  assert.equal(getNewBudgetAlertLevel(90, 1.0), 100);
  // Already at 100: going further over budget does not re-emit.
  assert.equal(getNewBudgetAlertLevel(100, 1.2), null);
});
|
||||
|
||||
test("80% alert fires exactly once between 75% and 90%", () => {
  // Each row: [last emitted level, current usage ratio, expected emission].
  const transitions = [
    [75, 0.80, 80],   // 75 → 80 emits 80
    [80, 0.82, null], // already at 80 — nothing new
    [80, 0.89, null], // still inside the 80 bucket
    [80, 0.90, 90],   // 80 → 90 emits 90
  ] as const;

  for (const [previous, ratio, expected] of transitions) {
    assert.equal(getNewBudgetAlertLevel(previous, ratio), expected);
  }
});
|
||||
|
||||
test("getBudgetEnforcementAction maps the configured ceiling behavior", () => {
|
||||
assert.equal(getBudgetEnforcementAction("warn", 0.80), "none");
|
||||
assert.equal(getBudgetEnforcementAction("warn", 0.99), "none");
|
||||
assert.equal(getBudgetEnforcementAction("warn", 1.0), "warn");
|
||||
assert.equal(getBudgetEnforcementAction("pause", 1.0), "pause");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,354 @@
|
|||
/**
|
||||
* E2E test: Parallel workers across multiple milestones.
|
||||
*
|
||||
* Validates the full lifecycle of the worker registry + metrics + budget
|
||||
* alerting across multiple milestone contexts. Uses real filesystem fixtures
|
||||
* and the actual metrics/worker-registry modules (no mocking).
|
||||
*
|
||||
* Covers:
|
||||
* - Worker registry tracking across parallel batches
|
||||
* - Metrics ledger accumulation across milestones
|
||||
* - Budget alert level transitions including the 80% threshold
|
||||
* - Dashboard data aggregation with parallel worker context
|
||||
* - Cost projection with budget ceiling awareness
|
||||
*/
|
||||
|
||||
import { mkdtempSync, mkdirSync, rmSync, writeFileSync, readFileSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import { tmpdir } from 'node:os';
|
||||
|
||||
import { createTestContext } from './test-helpers.ts';
|
||||
import {
|
||||
registerWorker,
|
||||
updateWorker,
|
||||
getActiveWorkers,
|
||||
getWorkerBatches,
|
||||
hasActiveWorkers,
|
||||
resetWorkerRegistry,
|
||||
} from '../../subagent/worker-registry.ts';
|
||||
import {
|
||||
getBudgetAlertLevel,
|
||||
getNewBudgetAlertLevel,
|
||||
getBudgetEnforcementAction,
|
||||
} from '../auto.ts';
|
||||
import {
|
||||
type UnitMetrics,
|
||||
type MetricsLedger,
|
||||
getProjectTotals,
|
||||
aggregateByPhase,
|
||||
aggregateBySlice,
|
||||
formatCost,
|
||||
formatCostProjection,
|
||||
getAverageCostPerUnitType,
|
||||
predictRemainingCost,
|
||||
} from '../metrics.ts';
|
||||
|
||||
const { assertEq, assertTrue, assertMatch, report } = createTestContext();
|
||||
|
||||
// ─── Fixture helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Create a throwaway project root in the OS temp directory containing an
 * empty `.gsd/milestones` tree.
 *
 * @returns Absolute path of the new temp directory.
 */
function createFixtureBase(): string {
  const root = mkdtempSync(join(tmpdir(), 'gsd-e2e-parallel-'));
  const milestonesDir = join(root, '.gsd', 'milestones');
  mkdirSync(milestonesDir, { recursive: true });
  return root;
}
|
||||
|
||||
/**
 * Serialize `ledger` as 2-space-indented JSON to `<base>/.gsd/metrics.json`.
 * The `.gsd` directory must already exist.
 */
function writeMetricsLedger(base: string, ledger: MetricsLedger): void {
  const ledgerPath = join(base, '.gsd', 'metrics.json');
  const serialized = JSON.stringify(ledger, null, 2);
  writeFileSync(ledgerPath, serialized);
}
|
||||
|
||||
/**
 * Load and parse `<base>/.gsd/metrics.json`.
 * The parsed JSON is returned as-is; no schema validation is performed.
 */
function readMetricsLedger(base: string): MetricsLedger {
  const ledgerPath = join(base, '.gsd', 'metrics.json');
  const contents = readFileSync(ledgerPath, 'utf-8');
  return JSON.parse(contents);
}
|
||||
|
||||
/**
 * Build a `UnitMetrics` fixture: an `execute-task` unit with fixed token and
 * message counts, started 5 seconds before "now". Any field can be replaced
 * through `overrides`.
 */
function makeUnit(overrides: Partial<UnitMetrics> = {}): UnitMetrics {
  const defaults: UnitMetrics = {
    type: "execute-task",
    id: "M001/S01/T01",
    model: "claude-sonnet-4-20250514",
    startedAt: Date.now() - 5000,
    finishedAt: Date.now(),
    tokens: { input: 1000, output: 500, cacheRead: 200, cacheWrite: 100, total: 1800 },
    cost: 0.05,
    toolCalls: 3,
    assistantMessages: 2,
    userMessages: 1,
  };
  // Caller-supplied fields win over the defaults.
  return { ...defaults, ...overrides };
}
|
||||
|
||||
/**
 * Recursively delete a fixture directory. `force` makes a missing path a
 * no-op instead of an error.
 */
function cleanup(base: string): void {
  const removeOptions = { recursive: true, force: true };
  rmSync(base, removeOptions);
}
|
||||
|
||||
// ─── E2E: Parallel workers across M001 and M002 ──────────────────────────────
|
||||
|
||||
console.log("\n=== E2E: Parallel workers across milestones ===");

{
  resetWorkerRegistry();
  const fixtureRoot = createFixtureBase();

  // One directory per milestone under .gsd/milestones.
  for (const milestone of ['M001', 'M002']) {
    mkdirSync(join(fixtureRoot, '.gsd', 'milestones', milestone), { recursive: true });
  }

  // Batch 1: three M001 workers registered side by side.
  const m001Batch = "batch-m001";
  const m001WorkerIds = [
    registerWorker("scout", "Explore M001 codebase", 0, 3, m001Batch),
    registerWorker("researcher", "Research M001 APIs", 1, 3, m001Batch),
    registerWorker("worker", "Implement M001 feature", 2, 3, m001Batch),
  ];

  assertEq(getActiveWorkers().length, 3, "M001: 3 parallel workers registered");
  assertTrue(hasActiveWorkers(), "M001: has active workers");

  const groupedAfterM001 = getWorkerBatches();
  assertEq(groupedAfterM001.size, 1, "M001: single batch");
  assertEq(groupedAfterM001.get(m001Batch)!.length, 3, "M001: batch has 3 workers");

  // Finish every M001 worker, in registration order.
  for (const workerId of m001WorkerIds) {
    updateWorker(workerId, "completed");
  }
  assertTrue(!hasActiveWorkers(), "M001: no active workers after completion");

  // Batch 2: two M002 workers, registered while finished M001 entries may
  // still be listed by the registry.
  const m002Batch = "batch-m002";
  const m002ScoutId = registerWorker("scout", "Explore M002 codebase", 0, 2, m002Batch);
  const m002ImplId = registerWorker("worker", "Implement M002 feature", 1, 2, m002Batch);

  assertTrue(hasActiveWorkers(), "M002: has active workers");
  const groupedAfterM002 = getWorkerBatches();
  assertTrue(groupedAfterM002.has(m002Batch), "M002: batch exists");
  assertEq(groupedAfterM002.get(m002Batch)!.length, 2, "M002: batch has 2 workers");

  // The scout succeeds, the implementer fails.
  updateWorker(m002ScoutId, "completed");
  updateWorker(m002ImplId, "failed");
  assertTrue(!hasActiveWorkers(), "M002: no active workers after all finish");

  // The failed entry should be visible for as long as the batch is listed.
  const m002Entries = getActiveWorkers().filter(entry => entry.batchId === m002Batch);
  if (m002Entries.length > 0) {
    const failedEntry = m002Entries.find(entry => entry.status === "failed");
    assertTrue(failedEntry !== undefined, "M002: failed worker tracked");
    assertEq(failedEntry?.agent, "worker", "M002: failed worker is 'worker'");
  }

  cleanup(fixtureRoot);
}
|
||||
|
||||
// ─── E2E: Metrics accumulation across milestones ──────────────────────────────
|
||||
|
||||
console.log("\n=== E2E: Metrics across milestones ===");

{
  const fixtureRoot = createFixtureBase();

  // Shorthand for a fixture unit that only varies in type, id and cost.
  const unit = (type: UnitMetrics["type"], id: string, cost: number): UnitMetrics =>
    makeUnit({ type, id, cost });

  // Ledger with nine M001 units followed by four M002 units.
  const ledger: MetricsLedger = {
    version: 1,
    projectStartedAt: Date.now() - 60000,
    units: [
      // M001
      unit("research-milestone", "M001", 0.10),
      unit("plan-milestone", "M001", 0.08),
      unit("plan-slice", "M001/S01", 0.05),
      unit("execute-task", "M001/S01/T01", 0.12),
      unit("execute-task", "M001/S01/T02", 0.15),
      unit("complete-slice", "M001/S01", 0.03),
      unit("plan-slice", "M001/S02", 0.06),
      unit("execute-task", "M001/S02/T01", 0.20),
      unit("complete-slice", "M001/S02", 0.04),
      // M002
      unit("research-milestone", "M002", 0.12),
      unit("plan-milestone", "M002", 0.09),
      unit("plan-slice", "M002/S01", 0.07),
      unit("execute-task", "M002/S01/T01", 0.18),
    ],
  };

  // Round-trip through the filesystem so the assertions run on parsed JSON.
  writeMetricsLedger(fixtureRoot, ledger);
  const roundTripped = readMetricsLedger(fixtureRoot);

  // Project totals
  const projectTotals = getProjectTotals(roundTripped.units);
  assertEq(projectTotals.units, 13, "metrics: 13 total units across M001+M002");
  const summedCost = roundTripped.units.reduce((acc, entry) => acc + entry.cost, 0);
  assertTrue(Math.abs(projectTotals.cost - summedCost) < 0.001, "metrics: total cost matches sum");

  // Per-phase aggregation
  const byPhase = aggregateByPhase(roundTripped.units);
  const researchPhase = byPhase.find(entry => entry.phase === "research");
  assertTrue(researchPhase !== undefined, "metrics: research phase exists");
  assertEq(researchPhase!.units, 2, "metrics: 2 research units (M001 + M002)");

  const executionPhase = byPhase.find(entry => entry.phase === "execution");
  assertTrue(executionPhase !== undefined, "metrics: execution phase exists");
  assertEq(executionPhase!.units, 4, "metrics: 4 execution units across both milestones");

  // Per-slice aggregation
  const bySlice = aggregateBySlice(roundTripped.units);
  assertTrue(bySlice.length >= 4, "metrics: at least 4 slice aggregates (M001/S01, M001/S02, M002/S01, milestone-level)");

  const firstSlice = bySlice.find(entry => entry.sliceId === "M001/S01");
  assertTrue(firstSlice !== undefined, "metrics: M001/S01 slice aggregate exists");
  // plan-slice + T01 + T02 + complete-slice
  assertEq(firstSlice!.units, 4, "metrics: M001/S01 has 4 units");

  // Projection for 3 remaining slices against a 2.0 ceiling
  const projection = formatCostProjection(bySlice, 3, 2.0);
  assertTrue(projection.length >= 1, "metrics: cost projection generated");
  assertMatch(projection[0], /Projected remaining/, "metrics: projection line text");

  cleanup(fixtureRoot);
}
|
||||
|
||||
// ─── E2E: Budget alert progression through all thresholds ─────────────────────
|
||||
|
||||
console.log("\n=== E2E: Budget alert progression 0→75→80→90→100 ===");

{
  // Walk spend upward against a $10 ceiling, carrying the last emitted level.
  const ceiling = 10.0;

  // 50% spent: below every threshold.
  let emittedLevel = getBudgetAlertLevel(5.0 / ceiling);
  assertEq(emittedLevel, 0, "budget: 50% → level 0");
  assertEq(getNewBudgetAlertLevel(0, 5.0 / ceiling), null, "budget: no alert at 50%");

  // 75%: first alert.
  let fired = getNewBudgetAlertLevel(emittedLevel, 7.5 / ceiling);
  assertEq(fired, 75, "budget: alert fires at 75%");
  emittedLevel = fired!;

  // 78%: still inside the 75 bucket.
  assertEq(getNewBudgetAlertLevel(emittedLevel, 7.8 / ceiling), null, "budget: no alert at 78%");

  // 80%: approach alert.
  fired = getNewBudgetAlertLevel(emittedLevel, 8.0 / ceiling);
  assertEq(fired, 80, "budget: approach alert fires at 80%");
  emittedLevel = fired!;

  // 85%: still inside the 80 bucket.
  assertEq(getNewBudgetAlertLevel(emittedLevel, 8.5 / ceiling), null, "budget: no alert at 85%");

  // 90%
  fired = getNewBudgetAlertLevel(emittedLevel, 9.0 / ceiling);
  assertEq(fired, 90, "budget: alert fires at 90%");
  emittedLevel = fired!;

  // 100%
  fired = getNewBudgetAlertLevel(emittedLevel, 10.0 / ceiling);
  assertEq(fired, 100, "budget: alert fires at 100%");
  emittedLevel = fired!;

  // 120%: already at the top level, so nothing is re-emitted.
  assertEq(getNewBudgetAlertLevel(emittedLevel, 12.0 / ceiling), null, "budget: no re-alert over 100%");

  // No enforcement mode acts at 80% usage.
  assertEq(getBudgetEnforcementAction("pause", 0.80), "none", "budget: no enforcement at 80%");
  assertEq(getBudgetEnforcementAction("halt", 0.80), "none", "budget: no enforcement at 80%");
  assertEq(getBudgetEnforcementAction("warn", 0.80), "none", "budget: no enforcement at 80%");
}
|
||||
|
||||
// ─── E2E: Budget prediction with multi-milestone cost data ────────────────────
|
||||
|
||||
console.log("\n=== E2E: Budget prediction across milestones ===");

{
  // Three execute-task units and two plan-slice units spread over M001/M002.
  const history: UnitMetrics[] = [
    makeUnit({ type: "execute-task", id: "M001/S01/T01", cost: 0.10 }),
    makeUnit({ type: "execute-task", id: "M001/S01/T02", cost: 0.15 }),
    makeUnit({ type: "plan-slice", id: "M001/S01", cost: 0.05 }),
    makeUnit({ type: "execute-task", id: "M002/S01/T01", cost: 0.20 }),
    makeUnit({ type: "plan-slice", id: "M002/S01", cost: 0.08 }),
  ];

  const averages = getAverageCostPerUnitType(history);
  assertTrue(averages.has("execute-task"), "prediction: has execute-task average");
  assertTrue(averages.has("plan-slice"), "prediction: has plan-slice average");

  // execute-task: (0.10 + 0.15 + 0.20) / 3 = 0.15
  const execAvg = averages.get("execute-task")!;
  assertTrue(Math.abs(execAvg - 0.15) < 0.001, `prediction: execute-task avg is $0.15 (got ${execAvg})`);

  // plan-slice: (0.05 + 0.08) / 2 = 0.065
  const planAvg = averages.get("plan-slice")!;
  assertTrue(Math.abs(planAvg - 0.065) < 0.001, `prediction: plan-slice avg is $0.065 (got ${planAvg})`);

  // Upcoming work: three execute-tasks plus one plan-slice
  // → 3 * 0.15 + 1 * 0.065 = 0.515
  const upcoming = ["execute-task", "execute-task", "execute-task", "plan-slice"];
  const remaining = predictRemainingCost(averages, upcoming);
  assertTrue(Math.abs(remaining - 0.515) < 0.001, `prediction: remaining cost ~$0.515 (got ${remaining})`);
}
|
||||
|
||||
// ─── E2E: Parallel workers + budget alerts combined scenario ──────────────────
|
||||
|
||||
console.log("\n=== E2E: Combined parallel workers + budget monitoring ===");

{
  resetWorkerRegistry();

  // Three workers in one batch run while spend climbs from 78% to 90%.
  const batchId = "batch-combined";
  const scoutId = registerWorker("scout", "Research APIs", 0, 3, batchId);
  const implId = registerWorker("worker", "Implement feature", 1, 3, batchId);
  const testsId = registerWorker("worker", "Write tests", 2, 3, batchId);

  const ceiling = 10.0;
  // The 75% alert is treated as already delivered.
  let emittedLevel: ReturnType<typeof getBudgetAlertLevel> = 75;

  // 78%: between thresholds, every worker still running.
  assertEq(getNewBudgetAlertLevel(emittedLevel, 7.8 / ceiling), null, "combined: no alert at 78% with workers running");
  assertTrue(hasActiveWorkers(), "combined: workers running during budget check");

  // Scout finishes; spend reaches 80%.
  updateWorker(scoutId, "completed");
  const at80 = getNewBudgetAlertLevel(emittedLevel, 8.0 / ceiling);
  assertEq(at80, 80, "combined: 80% approach alert fires after worker completes");
  emittedLevel = at80!;

  // Implementer finishes; spend reaches 88% (still in the 80 bucket).
  updateWorker(implId, "completed");
  assertEq(getNewBudgetAlertLevel(emittedLevel, 8.8 / ceiling), null, "combined: no alert at 88%");

  // Test writer finishes; spend reaches 90%.
  updateWorker(testsId, "completed");
  const at90 = getNewBudgetAlertLevel(emittedLevel, 9.0 / ceiling);
  assertEq(at90, 90, "combined: 90% alert fires after all workers complete");

  assertTrue(!hasActiveWorkers(), "combined: no active workers at end");

  resetWorkerRegistry();
}
|
||||
|
||||
// ─── E2E: formatCostProjection with budget ceiling warnings ───────────────────
|
||||
|
||||
console.log("\n=== E2E: Cost projection ceiling warnings ===");

{
  // Fresh all-zero token counters for each slice aggregate.
  const zeroTokens = () => ({ input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 });

  // Three finished slices costing 3.0 + 4.0 + 5.0 = 12.0 in total.
  const slices = [
    { sliceId: "M001/S01", units: 4, tokens: zeroTokens(), cost: 3.0, duration: 10000 },
    { sliceId: "M001/S02", units: 3, tokens: zeroTokens(), cost: 4.0, duration: 8000 },
    { sliceId: "M002/S01", units: 3, tokens: zeroTokens(), cost: 5.0, duration: 12000 },
  ];

  // Ceiling of 20.0 is above the 12.0 already spent.
  const underCeiling = formatCostProjection(slices, 2, 20.0);
  assertTrue(underCeiling.length >= 1, "projection: has projection line");
  assertMatch(underCeiling[0], /Projected remaining/, "projection: shows projection");
  assertTrue(underCeiling.length === 1, "projection: no ceiling warning when under budget");

  // Ceiling of 10.0 is already exceeded by the 12.0 spent.
  const overCeiling = formatCostProjection(slices, 2, 10.0);
  assertTrue(overCeiling.length >= 2, "projection: has ceiling warning when over budget");
  assertMatch(overCeiling[1], /ceiling/, "projection: ceiling warning text");
}

// ─── Summary ──────────────────────────────────────────────────────────────────

report();
|
||||
148
src/resources/extensions/gsd/tests/worker-registry.test.ts
Normal file
148
src/resources/extensions/gsd/tests/worker-registry.test.ts
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Tests for the parallel worker registry used by the dashboard overlay.
|
||||
*
|
||||
* Verifies worker lifecycle (register → update → cleanup), batch grouping,
|
||||
* and the hasActiveWorkers() status check.
|
||||
*/
|
||||
|
||||
import { createTestContext } from './test-helpers.ts';
|
||||
import {
|
||||
registerWorker,
|
||||
updateWorker,
|
||||
getActiveWorkers,
|
||||
getWorkerBatches,
|
||||
hasActiveWorkers,
|
||||
resetWorkerRegistry,
|
||||
} from '../../subagent/worker-registry.ts';
|
||||
|
||||
const { assertEq, assertTrue, report } = createTestContext();
|
||||
|
||||
// ─── Setup ────────────────────────────────────────────────────────────────────
|
||||
|
||||
resetWorkerRegistry();
|
||||
|
||||
// ─── Registration ─────────────────────────────────────────────────────────────
|
||||
|
||||
console.log("\n=== Worker Registration ===");

{
  resetWorkerRegistry();
  const workerId = registerWorker("scout", "Explore codebase", 0, 3, "batch-1");
  assertTrue(workerId.startsWith("worker-"), "worker ID has correct prefix");

  // The registry should hold exactly the entry just registered.
  const tracked = getActiveWorkers();
  assertEq(tracked.length, 1, "one worker registered");
  assertEq(tracked[0].agent, "scout", "worker agent name correct");
  assertEq(tracked[0].task, "Explore codebase", "worker task correct");
  assertEq(tracked[0].status, "running", "worker starts as running");
  assertEq(tracked[0].index, 0, "worker index correct");
  assertEq(tracked[0].batchSize, 3, "worker batch size correct");
  assertEq(tracked[0].batchId, "batch-1", "worker batch ID correct");
}
|
||||
|
||||
// ─── Multiple workers in a batch ──────────────────────────────────────────────
|
||||
|
||||
console.log("\n=== Multiple Workers in a Batch ===");

{
  resetWorkerRegistry();
  // Three workers sharing one batch ID; the returned IDs are not needed here.
  registerWorker("scout", "Task A", 0, 3, "batch-2");
  registerWorker("researcher", "Task B", 1, 3, "batch-2");
  registerWorker("worker", "Task C", 2, 3, "batch-2");

  const tracked = getActiveWorkers();
  assertEq(tracked.length, 3, "three workers registered");
  assertTrue(hasActiveWorkers(), "has active workers");

  // All three should be grouped under the single batch key.
  const grouped = getWorkerBatches();
  assertEq(grouped.size, 1, "one batch");
  const members = grouped.get("batch-2");
  assertTrue(members !== undefined, "batch-2 exists");
  assertEq(members!.length, 3, "batch has 3 workers");
}
|
||||
|
||||
// ─── Worker status updates ────────────────────────────────────────────────────
|
||||
|
||||
console.log("\n=== Worker Status Updates ===");

{
  resetWorkerRegistry();
  const finishedId = registerWorker("scout", "Task A", 0, 2, "batch-3");
  const runningId = registerWorker("worker", "Task B", 1, 2, "batch-3");

  // Only the first worker is completed; the second must be unaffected.
  updateWorker(finishedId, "completed");
  const tracked = getActiveWorkers();

  const finished = tracked.find(entry => entry.id === finishedId);
  assertEq(finished?.status, "completed", "worker 1 marked completed");

  const stillRunning = tracked.find(entry => entry.id === runningId);
  assertEq(stillRunning?.status, "running", "worker 2 still running");
  assertTrue(hasActiveWorkers(), "still has active workers (worker 2 running)");
}
|
||||
|
||||
// ─── Failed worker ────────────────────────────────────────────────────────────
|
||||
|
||||
console.log("\n=== Failed Worker ===");

{
  resetWorkerRegistry();
  const soleWorkerId = registerWorker("scout", "Task A", 0, 1, "batch-4");
  updateWorker(soleWorkerId, "failed");

  // The only tracked entry should now report the failed status.
  const tracked = getActiveWorkers();
  assertEq(tracked[0].status, "failed", "worker marked failed");
}
|
||||
|
||||
// ─── Multiple batches ─────────────────────────────────────────────────────────
|
||||
|
||||
console.log("\n=== Multiple Batches ===");

{
  resetWorkerRegistry();
  // Two workers in batch-5, one in batch-6.
  registerWorker("scout", "Task A", 0, 2, "batch-5");
  registerWorker("worker", "Task B", 1, 2, "batch-5");
  registerWorker("researcher", "Task C", 0, 1, "batch-6");

  const grouped = getWorkerBatches();
  assertEq(grouped.size, 2, "two batches");
  assertEq(grouped.get("batch-5")!.length, 2, "batch-5 has 2 workers");
  assertEq(grouped.get("batch-6")!.length, 1, "batch-6 has 1 worker");
}
|
||||
|
||||
// ─── hasActiveWorkers with all completed ──────────────────────────────────────
|
||||
|
||||
console.log("\n=== hasActiveWorkers — all completed ===");

{
  resetWorkerRegistry();
  const batchWorkerIds = [
    registerWorker("scout", "Task A", 0, 2, "batch-7"),
    registerWorker("worker", "Task B", 1, 2, "batch-7"),
  ];

  // Once every worker is completed none counts as active.
  for (const workerId of batchWorkerIds) {
    updateWorker(workerId, "completed");
  }
  assertTrue(!hasActiveWorkers(), "no active workers when all completed");
}
|
||||
|
||||
// ─── Reset clears everything ─────────────────────────────────────────────────
|
||||
|
||||
console.log("\n=== Reset ===");

{
  // Deliberately no reset first: this case checks that reset empties the registry.
  registerWorker("scout", "Task", 0, 1, "batch-8");
  const countBeforeReset = getActiveWorkers().length;
  assertTrue(countBeforeReset > 0, "workers exist before reset");

  resetWorkerRegistry();

  assertEq(getActiveWorkers().length, 0, "no workers after reset");
  assertTrue(!hasActiveWorkers(), "hasActiveWorkers false after reset");
}
|
||||
|
||||
// ─── Update non-existent worker is no-op ──────────────────────────────────────
|
||||
|
||||
console.log("\n=== Update non-existent worker ===");

{
  resetWorkerRegistry();
  // Updating an unknown ID must neither throw nor create an entry.
  updateWorker("nonexistent-id", "completed");
  const trackedCount = getActiveWorkers().length;
  assertEq(trackedCount, 0, "no workers created by updating nonexistent");
}

// ─── Summary ──────────────────────────────────────────────────────────────────

report();
|
||||
|
|
@ -33,6 +33,7 @@ import {
|
|||
mergeDeltaPatches,
|
||||
readIsolationMode,
|
||||
} from "./isolation.js";
|
||||
import { registerWorker, updateWorker } from "./worker-registry.js";
|
||||
|
||||
const MAX_PARALLEL_TASKS = 8;
|
||||
const MAX_CONCURRENCY = 4;
|
||||
|
|
@ -626,7 +627,10 @@ export default function (pi: ExtensionAPI) {
|
|||
};
|
||||
|
||||
const MAX_RETRIES = 1; // Retry failed tasks once
|
||||
const batchId = crypto.randomUUID();
|
||||
const batchSize = params.tasks.length;
|
||||
const results = await mapWithConcurrencyLimit(params.tasks, MAX_CONCURRENCY, async (t, index) => {
|
||||
const workerId = registerWorker(t.agent, t.task, index, batchSize, batchId);
|
||||
let result = await runSingleAgent(
|
||||
ctx.cwd,
|
||||
agents,
|
||||
|
|
@ -666,6 +670,7 @@ export default function (pi: ExtensionAPI) {
|
|||
);
|
||||
}
|
||||
|
||||
updateWorker(workerId, result.exitCode === 0 ? "completed" : "failed");
|
||||
allResults[index] = result;
|
||||
emitParallelUpdate();
|
||||
return result;
|
||||
|
|
|
|||
99
src/resources/extensions/subagent/worker-registry.ts
Normal file
99
src/resources/extensions/subagent/worker-registry.ts
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
/**
 * Worker Registry — Tracks active subagent sessions for dashboard visibility.
 *
 * Provides a global registry of currently-running parallel workers so the
 * GSD dashboard overlay can display real-time worker status.
 *
 * Finished workers stay listed for a short display window and are then
 * removed by a timer. Those timers are tracked so that a reset can cancel
 * them, and they are unref'd so they never keep the process alive.
 */

export interface WorkerEntry {
  id: string;
  agent: string;
  task: string;
  /** `Date.now()` timestamp (ms) taken when the worker was registered */
  startedAt: number;
  status: "running" | "completed" | "failed";
  /** Index within a parallel batch (0-based) */
  index: number;
  /** Total workers in the parallel batch */
  batchSize: number;
  /** Unique batch identifier for grouping parallel runs */
  batchId: string;
}

/** How long a finished worker remains listed before it is removed (ms). */
const FINISHED_DISPLAY_MS = 5000;

const activeWorkers = new Map<string, WorkerEntry>();

/** Pending removal timers, keyed by worker ID. */
const removalTimers = new Map<string, ReturnType<typeof setTimeout>>();

let workerIdCounter = 0;

/**
 * Detach a timer from the event loop when the runtime supports it, so a
 * pending removal never delays process exit. No-op for numeric handles.
 */
function detachTimer(timer: unknown): void {
  if (typeof timer !== "object" || timer === null) return;
  const candidate = timer as { unref?: unknown };
  if (typeof candidate.unref === "function") {
    (candidate as { unref: () => void }).unref();
  }
}

/**
 * Register a new worker. Returns the worker ID for later updates.
 *
 * @param agent - Agent name shown in the dashboard.
 * @param task - Task description shown in the dashboard.
 * @param index - Position of this worker within its batch (0-based).
 * @param batchSize - Total number of workers in the batch.
 * @param batchId - Identifier shared by all workers of one parallel run.
 * @returns A registry-unique ID of the form `worker-<n>`.
 */
export function registerWorker(
  agent: string,
  task: string,
  index: number,
  batchSize: number,
  batchId: string,
): string {
  const id = `worker-${++workerIdCounter}`;
  activeWorkers.set(id, {
    id,
    agent,
    task,
    startedAt: Date.now(),
    status: "running",
    index,
    batchSize,
    batchId,
  });
  return id;
}

/**
 * Update worker status when it completes or fails.
 *
 * The entry stays listed for a brief display window and is then removed.
 * Unknown IDs are ignored.
 */
export function updateWorker(id: string, status: "completed" | "failed"): void {
  const entry = activeWorkers.get(id);
  if (!entry) return;

  entry.status = status;

  // A repeated update restarts the display window instead of stacking timers.
  const pending = removalTimers.get(id);
  if (pending !== undefined) clearTimeout(pending);

  const timer = setTimeout(() => {
    removalTimers.delete(id);
    // IDs are reused after resetWorkerRegistry(); only remove the entry this
    // timer was created for, never a newer worker that reuses the same ID.
    if (activeWorkers.get(id) === entry) {
      activeWorkers.delete(id);
    }
  }, FINISHED_DISPLAY_MS);
  detachTimer(timer);
  removalTimers.set(id, timer);
}

/**
 * Get all currently-tracked workers (running + recently completed).
 */
export function getActiveWorkers(): WorkerEntry[] {
  return Array.from(activeWorkers.values());
}

/**
 * Get workers grouped by batch.
 *
 * @returns Map from batch ID to that batch's tracked workers, in
 *   registration order.
 */
export function getWorkerBatches(): Map<string, WorkerEntry[]> {
  const batches = new Map<string, WorkerEntry[]>();
  for (const worker of activeWorkers.values()) {
    const batch = batches.get(worker.batchId) ?? [];
    batch.push(worker);
    batches.set(worker.batchId, batch);
  }
  return batches;
}

/**
 * Check if any parallel workers are currently running.
 * Completed or failed entries still inside their display window do not count.
 */
export function hasActiveWorkers(): boolean {
  for (const worker of activeWorkers.values()) {
    if (worker.status === "running") return true;
  }
  return false;
}

/**
 * Reset registry state. Used for testing.
 *
 * Cancels pending removal timers as well, because the ID counter restarts
 * and a leftover timer would otherwise target a reused ID.
 */
export function resetWorkerRegistry(): void {
  for (const timer of removalTimers.values()) {
    clearTimeout(timer);
  }
  removalTimers.clear();
  activeWorkers.clear();
  workerIdCounter = 0;
}
|
||||
Loading…
Add table
Reference in a new issue