From a2cc151bc9b60cb9fc80d12071d64cae231e0615 Mon Sep 17 00:00:00 2001 From: Jeremy McSpadden Date: Tue, 14 Apr 2026 20:41:15 -0500 Subject: [PATCH] feat(gsd-uok): unify reactive and parallel scheduling via execution graph --- src/resources/extensions/gsd/auto-dispatch.ts | 22 +++- src/resources/extensions/gsd/auto/loop.ts | 10 ++ src/resources/extensions/gsd/auto/phases.ts | 5 +- .../extensions/gsd/parallel-orchestrator.ts | 41 +++++- .../gsd/slice-parallel-orchestrator.ts | 21 ++- .../gsd/tests/uok-execution-graph.test.ts | 69 ++++++++++ .../extensions/gsd/uok/execution-graph.ts | 120 ++++++++++++++++++ 7 files changed, 279 insertions(+), 9 deletions(-) create mode 100644 src/resources/extensions/gsd/tests/uok-execution-graph.test.ts diff --git a/src/resources/extensions/gsd/auto-dispatch.ts b/src/resources/extensions/gsd/auto-dispatch.ts index 16e437dc2..4ce22e36e 100644 --- a/src/resources/extensions/gsd/auto-dispatch.ts +++ b/src/resources/extensions/gsd/auto-dispatch.ts @@ -53,6 +53,8 @@ import { checkNeedsRunUat, } from "./auto-prompts.js"; import { resolveModelWithFallbacksForUnit } from "./preferences-models.js"; +import { resolveUokFlags } from "./uok/flags.js"; +import { selectReactiveDispatchBatch } from "./uok/execution-graph.js"; // ─── Types ──────────────────────────────────────────────────────────────── @@ -584,12 +586,20 @@ export const DISPATCH_RULES: DispatchRule[] = [ // Only activate reactive dispatch when >1 task is ready if (readyIds.length <= 1) return null; - const selected = chooseNonConflictingSubset( - readyIds, - graph, - maxParallel, - new Set(), - ); + const uokFlags = resolveUokFlags(prefs); + const selected = uokFlags.executionGraph + ? selectReactiveDispatchBatch({ + graph, + readyIds, + maxParallel, + inFlightOutputs: new Set(), + }).selected + : chooseNonConflictingSubset( + readyIds, + graph, + maxParallel, + new Set(), + ); if (selected.length <= 1) return null; // Log graph metrics for observability diff --git a/src/resources/extensions/gsd/auto/loop.ts b/src/resources/extensions/gsd/auto/loop.ts index be8128bb1..b2c273254 100644 --- a/src/resources/extensions/gsd/auto/loop.ts +++ b/src/resources/extensions/gsd/auto/loop.ts @@ -31,6 +31,8 @@ import { isInfrastructureError, isTransientCooldownError, getCooldownRetryAfterM import { resolveEngine } from "../engine-resolver.js"; import { logWarning } from "../workflow-logger.js"; import { gsdRoot } from "../paths.js"; +import { resolveUokFlags } from "../uok/flags.js"; +import { scheduleSidecarQueue } from "../uok/execution-graph.js"; import { readFileSync, writeFileSync, mkdirSync } from "node:fs"; import { join } from "node:path"; @@ -205,10 +207,18 @@ export async function autoLoop( try { // ── Blanket try/catch: one bad iteration must not kill the session const prefs = deps.loadEffectiveGSDPreferences()?.preferences; + const uokFlags = resolveUokFlags(prefs); // ── Check sidecar queue before deriveState ── let sidecarItem: SidecarItem | undefined; if (s.sidecarQueue.length > 0) { + if (uokFlags.executionGraph && s.sidecarQueue.length > 1) { + try { + s.sidecarQueue = await scheduleSidecarQueue(s.sidecarQueue); + } catch (err) { + logWarning("dispatch", `sidecar queue scheduling failed: ${err instanceof Error ? err.message : String(err)}`); + } + } sidecarItem = s.sidecarQueue.shift()!; debugLog("autoLoop", { phase: "sidecar-dequeue", diff --git a/src/resources/extensions/gsd/auto/phases.ts b/src/resources/extensions/gsd/auto/phases.ts index 97c485c44..647cafa90 100644 --- a/src/resources/extensions/gsd/auto/phases.ts +++ b/src/resources/extensions/gsd/auto/phases.ts @@ -396,7 +396,10 @@ export async function runPreDispatch( s.basePath, mid, eligible, - { maxWorkers: prefs.slice_parallel.max_workers ?? 2 }, + { + maxWorkers: prefs.slice_parallel.max_workers ?? 2, + useExecutionGraph: uokFlags.executionGraph, + }, ); if (result.started.length > 0) { ctx.ui.notify( diff --git a/src/resources/extensions/gsd/parallel-orchestrator.ts b/src/resources/extensions/gsd/parallel-orchestrator.ts index 689de6ce2..f6e68a020 100644 --- a/src/resources/extensions/gsd/parallel-orchestrator.ts +++ b/src/resources/extensions/gsd/parallel-orchestrator.ts @@ -42,6 +42,8 @@ import { } from "./parallel-eligibility.js"; import { getErrorMessage } from "./error-utils.js"; import { logWarning } from "./workflow-logger.js"; +import { resolveUokFlags } from "./uok/flags.js"; +import { selectConflictFreeBatch } from "./uok/execution-graph.js"; // ─── Types ───────────────────────────────────────────────────────────────── @@ -69,6 +71,10 @@ export interface OrchestratorState { let state: OrchestratorState | null = null; +function overlapKey(a: string, b: string): string { + return a < b ? `${a}::${b}` : `${b}::${a}`; +} + // ─── Persistence ────────────────────────────────────────────────────────── const ORCHESTRATOR_STATE_FILE = "orchestrator.json"; @@ -365,6 +371,7 @@ export async function startParallel( } const config = resolveParallelConfig(prefs); + const uokFlags = resolveUokFlags(prefs); // Release any leftover state from a previous session before reassigning if (state) { @@ -418,8 +425,40 @@ export async function startParallel( const started: string[] = []; const errors: Array<{ mid: string; error: string }> = []; + let filteredMilestoneIds = milestoneIds; + if (uokFlags.executionGraph && milestoneIds.length > 1) { + try { + const requestedIds = new Set(milestoneIds); + const candidates = await analyzeParallelEligibility(basePath); + const overlapPairs = new Set(); + for (const overlap of candidates.fileOverlaps) { + if (!requestedIds.has(overlap.mid1) || !requestedIds.has(overlap.mid2)) continue; + overlapPairs.add(overlapKey(overlap.mid1, overlap.mid2)); + } + filteredMilestoneIds = selectConflictFreeBatch({ + orderedIds: milestoneIds, + maxParallel: milestoneIds.length, + hasConflict: (candidate, existing) => + overlapPairs.has(overlapKey(candidate, existing)), + }); + if (filteredMilestoneIds.length < milestoneIds.length) { + const skipped = milestoneIds.filter((mid) => !filteredMilestoneIds.includes(mid)); + logWarning( + "parallel", + `uok execution graph filtered ${skipped.length} conflicting milestone(s): ${skipped.join(", ")}`, + ); + } + } catch (e) { + logWarning( + "parallel", + `uok execution graph overlap analysis failed; using legacy milestone selection: ${(e as Error).message}`, + ); + filteredMilestoneIds = milestoneIds; + } + } + // Cap to max_workers - const toStart = milestoneIds.slice(0, config.max_workers); + const toStart = filteredMilestoneIds.slice(0, config.max_workers); for (const mid of toStart) { // Check budget ceiling before each spawn diff --git a/src/resources/extensions/gsd/slice-parallel-orchestrator.ts b/src/resources/extensions/gsd/slice-parallel-orchestrator.ts index 346237651..974107ac9 100644 --- a/src/resources/extensions/gsd/slice-parallel-orchestrator.ts +++ b/src/resources/extensions/gsd/slice-parallel-orchestrator.ts @@ -32,6 +32,7 @@ import { } from "./session-status-io.js"; import { hasFileConflict } from "./slice-parallel-conflict.js"; import { getErrorMessage } from "./error-utils.js"; +import { selectConflictFreeBatch } from "./uok/execution-graph.js"; // ─── Types ───────────────────────────────────────────────────────────────── @@ -61,6 +62,7 @@ export interface SliceOrchestratorState { export interface StartSliceParallelOpts { maxWorkers?: number; budgetCeiling?: number; + useExecutionGraph?: boolean; } // ─── Module State ────────────────────────────────────────────────────────── @@ -118,7 +120,12 @@ export async function startSliceParallel( const errors: Array<{ sid: string; error: string }> = []; // Filter out conflicting slices (conservative: check all pairs) - const safeSlices = filterConflictingSlices(basePath, milestoneId, eligibleSlices); + const safeSlices = filterConflictingSlices( + basePath, + milestoneId, + eligibleSlices, + opts.useExecutionGraph === true, + ); // Limit to maxWorkers const toSpawn = safeSlices.slice(0, maxWorkers); @@ -245,7 +252,19 @@ function filterConflictingSlices( basePath: string, milestoneId: string, slices: Array<{ id: string }>, + useExecutionGraph: boolean, ): Array<{ id: string }> { + if (useExecutionGraph) { + const selectedIds = selectConflictFreeBatch({ + orderedIds: slices.map((slice) => slice.id), + maxParallel: slices.length, + hasConflict: (candidate, existing) => + hasFileConflict(basePath, milestoneId, candidate, existing), + }); + const selected = new Set(selectedIds); + return slices.filter((slice) => selected.has(slice.id)); + } + const safe: Array<{ id: string }> = []; for (const candidate of slices) { diff --git a/src/resources/extensions/gsd/tests/uok-execution-graph.test.ts b/src/resources/extensions/gsd/tests/uok-execution-graph.test.ts new file mode 100644 index 000000000..448a7249a --- /dev/null +++ b/src/resources/extensions/gsd/tests/uok-execution-graph.test.ts @@ -0,0 +1,69 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import type { SidecarItem } from "../auto/session.ts"; +import { + selectConflictFreeBatch, + selectReactiveDispatchBatch, + buildSidecarQueueNodes, + scheduleSidecarQueue, +} from "../uok/execution-graph.ts"; + +test("uok execution graph selects deterministic conflict-free IDs", () => { + const selected = selectConflictFreeBatch({ + orderedIds: ["S01", "S02", "S03", "S04"], + maxParallel: 4, + hasConflict: (candidate, existing) => + (candidate === "S02" && existing === "S01") || + (candidate === "S01" && existing === "S02"), + }); + + assert.deepEqual(selected, ["S01", "S03", "S04"]); +}); + +test("uok execution graph reactive batch honors file conflicts and in-flight writes", () => { + const result = selectReactiveDispatchBatch({ + graph: [ + { id: "T01", dependsOn: [], outputFiles: ["src/a.ts"] }, + { id: "T02", dependsOn: [], outputFiles: ["src/a.ts"] }, + { id: "T03", dependsOn: [], outputFiles: ["src/b.ts"] }, + { id: "T04", dependsOn: ["T03"], outputFiles: ["src/c.ts"] }, + ], + readyIds: ["T01", "T02", "T03", "T04"], + maxParallel: 3, + inFlightOutputs: new Set(["src/c.ts"]), + }); + + assert.deepEqual(result.selected, ["T01", "T03"]); + assert.ok( + result.conflicts.some((c) => c.nodeA === "T01" && c.nodeB === "T02" && c.file === "src/a.ts"), + "conflict list should include overlapping outputs", + ); +}); + +test("uok execution graph sidecar nodes map queue kinds to supported DAG kinds", () => { + const queue: SidecarItem[] = [ + { kind: "hook", unitType: "execute-task", unitId: "M001/S01/T01", prompt: "hook" }, + { kind: "triage", unitType: "triage", unitId: "M001/S01", prompt: "triage" }, + { kind: "quick-task", unitType: "quick-task", unitId: "M001/S01/Q01", prompt: "quick" }, + ]; + + const nodes = buildSidecarQueueNodes(queue); + assert.equal(nodes[0]?.kind, "hook"); + assert.equal(nodes[1]?.kind, "verification"); + assert.equal(nodes[2]?.kind, "team-worker"); + assert.equal(nodes[1]?.dependsOn.length, 1); +}); + +test("uok execution graph sidecar scheduler preserves deterministic queue order", async () => { + const queue: SidecarItem[] = [ + { kind: "quick-task", unitType: "quick-task", unitId: "M001/S01/Q01", prompt: "q1" }, + { kind: "hook", unitType: "hook", unitId: "M001/S01/H01", prompt: "h1" }, + { kind: "triage", unitType: "triage", unitId: "M001/S01/TR1", prompt: "t1" }, + ]; + + const scheduled = await scheduleSidecarQueue(queue); + assert.deepEqual( + scheduled.map((item) => item.unitId), + queue.map((item) => item.unitId), + ); +}); diff --git a/src/resources/extensions/gsd/uok/execution-graph.ts b/src/resources/extensions/gsd/uok/execution-graph.ts index 243ac4093..445b1f145 100644 --- a/src/resources/extensions/gsd/uok/execution-graph.ts +++ b/src/resources/extensions/gsd/uok/execution-graph.ts @@ -1,4 +1,6 @@ import type { UokGraphNode } from "./contracts.js"; +import type { DerivedTaskNode } from "../types.js"; +import type { SidecarItem } from "../auto/session.js"; export interface ExecutionGraphRunOptions { parallel?: boolean; @@ -12,6 +14,123 @@ export interface ExecutionGraphResult { export type ExecutionNodeHandler = (node: UokGraphNode) => Promise; +export interface ConflictFreeBatchInput { + orderedIds: string[]; + maxParallel: number; + hasConflict: (leftId: string, rightId: string) => boolean; +} + +export interface ReactiveDispatchSelectionInput { + graph: Array>; + readyIds: string[]; + maxParallel: number; + inFlightOutputs?: Set; +} + +export interface ReactiveDispatchSelectionResult { + selected: string[]; + conflicts: Array<{ nodeA: string; nodeB: string; file: string }>; +} + +export function selectConflictFreeBatch({ + orderedIds, + maxParallel, + hasConflict, +}: ConflictFreeBatchInput): string[] { + if (maxParallel <= 0 || orderedIds.length === 0) return []; + const selected: string[] = []; + for (const candidate of orderedIds) { + if (selected.length >= maxParallel) break; + const conflictsExisting = selected.some((existing) => hasConflict(candidate, existing)); + if (conflictsExisting) continue; + selected.push(candidate); + } + return selected; +} + +function buildReactiveNodes( + graph: Array>, +): UokGraphNode[] { + return graph.map((node) => ({ + id: node.id, + kind: "unit", + dependsOn: [...node.dependsOn], + writes: [...node.outputFiles], + })); +} + +export function selectReactiveDispatchBatch( + input: ReactiveDispatchSelectionInput, +): ReactiveDispatchSelectionResult { + const nodeMap = new Map(buildReactiveNodes(input.graph).map((n) => [n.id, n])); + const readyNodes = input.readyIds + .map((id) => nodeMap.get(id)) + .filter((node): node is UokGraphNode => !!node); + const conflicts = detectFileConflicts(readyNodes); + if (readyNodes.length === 0 || input.maxParallel <= 0) { + return { selected: [], conflicts }; + } + + const claimed = new Set(input.inFlightOutputs ?? []); + const selected: string[] = []; + const selectedSet = new Set(); + const readySet = new Set(input.readyIds); + + for (const id of input.readyIds) { + if (selected.length >= input.maxParallel) break; + const node = nodeMap.get(id); + if (!node) continue; + + const hasUnmetReadyDependency = node.dependsOn.some( + (dep) => readySet.has(dep) && !selectedSet.has(dep), + ); + if (hasUnmetReadyDependency) continue; + + const writes = node.writes ?? []; + const conflictsWithClaimed = writes.some((file) => claimed.has(file)); + if (conflictsWithClaimed) continue; + + selected.push(node.id); + selectedSet.add(node.id); + for (const file of writes) claimed.add(file); + } + + return { selected, conflicts }; +} + +function sidecarToNodeKind(kind: SidecarItem["kind"]): UokGraphNode["kind"] { + if (kind === "hook") return "hook"; + if (kind === "triage") return "verification"; + return "team-worker"; +} + +export function buildSidecarQueueNodes(queue: SidecarItem[]): UokGraphNode[] { + return queue.map((item, index) => ({ + id: `sidecar-${String(index).padStart(4, "0")}:${item.kind}:${item.unitType}:${item.unitId}`, + kind: sidecarToNodeKind(item.kind), + dependsOn: index > 0 ? [`sidecar-${String(index - 1).padStart(4, "0")}:${queue[index - 1].kind}:${queue[index - 1].unitType}:${queue[index - 1].unitId}`] : [], + metadata: { index }, + })); +} + +export async function scheduleSidecarQueue(queue: SidecarItem[]): Promise { + if (queue.length <= 1) return [...queue]; + const nodes = buildSidecarQueueNodes(queue); + const scheduler = new ExecutionGraphScheduler(); + const orderedIndexes: number[] = []; + const seenKinds = new Set(nodes.map((n) => n.kind)); + + for (const kind of seenKinds) { + scheduler.registerHandler(kind, async (node) => { + const idx = Number(node.metadata?.index); + if (Number.isInteger(idx) && idx >= 0) orderedIndexes.push(idx); + }); + } + + await scheduler.run(nodes, { parallel: false }); + return orderedIndexes.map((idx) => queue[idx]).filter((item): item is SidecarItem => !!item); +} + export class ExecutionGraphScheduler { private readonly handlers = new Map(); @@ -42,6 +161,7 @@ export class ExecutionGraphScheduler { const ready = Array.from(remaining.values()).filter((node) => node.dependsOn.every((dep) => done.has(dep)), ); + ready.sort((a, b) => a.id.localeCompare(b.id)); if (ready.length === 0) { throw new Error("Execution graph deadlock detected: no ready nodes and graph not complete"); }