feat(gsd-uok): unify reactive and parallel scheduling via execution graph

This commit is contained in:
Jeremy McSpadden 2026-04-14 20:41:15 -05:00
parent 414c2ee58c
commit a2cc151bc9
7 changed files with 279 additions and 9 deletions

View file

@ -53,6 +53,8 @@ import {
checkNeedsRunUat,
} from "./auto-prompts.js";
import { resolveModelWithFallbacksForUnit } from "./preferences-models.js";
import { resolveUokFlags } from "./uok/flags.js";
import { selectReactiveDispatchBatch } from "./uok/execution-graph.js";
// ─── Types ────────────────────────────────────────────────────────────────
@ -584,12 +586,20 @@ export const DISPATCH_RULES: DispatchRule[] = [
// Only activate reactive dispatch when >1 task is ready
if (readyIds.length <= 1) return null;
const selected = chooseNonConflictingSubset(
readyIds,
graph,
maxParallel,
new Set(),
);
const uokFlags = resolveUokFlags(prefs);
const selected = uokFlags.executionGraph
? selectReactiveDispatchBatch({
graph,
readyIds,
maxParallel,
inFlightOutputs: new Set(),
}).selected
: chooseNonConflictingSubset(
readyIds,
graph,
maxParallel,
new Set(),
);
if (selected.length <= 1) return null;
// Log graph metrics for observability

View file

@ -31,6 +31,8 @@ import { isInfrastructureError, isTransientCooldownError, getCooldownRetryAfterM
import { resolveEngine } from "../engine-resolver.js";
import { logWarning } from "../workflow-logger.js";
import { gsdRoot } from "../paths.js";
import { resolveUokFlags } from "../uok/flags.js";
import { scheduleSidecarQueue } from "../uok/execution-graph.js";
import { readFileSync, writeFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
@ -205,10 +207,18 @@ export async function autoLoop(
try {
// ── Blanket try/catch: one bad iteration must not kill the session
const prefs = deps.loadEffectiveGSDPreferences()?.preferences;
const uokFlags = resolveUokFlags(prefs);
// ── Check sidecar queue before deriveState ──
let sidecarItem: SidecarItem | undefined;
if (s.sidecarQueue.length > 0) {
if (uokFlags.executionGraph && s.sidecarQueue.length > 1) {
try {
s.sidecarQueue = await scheduleSidecarQueue(s.sidecarQueue);
} catch (err) {
logWarning("dispatch", `sidecar queue scheduling failed: ${err instanceof Error ? err.message : String(err)}`);
}
}
sidecarItem = s.sidecarQueue.shift()!;
debugLog("autoLoop", {
phase: "sidecar-dequeue",

View file

@ -396,7 +396,10 @@ export async function runPreDispatch(
s.basePath,
mid,
eligible,
{ maxWorkers: prefs.slice_parallel.max_workers ?? 2 },
{
maxWorkers: prefs.slice_parallel.max_workers ?? 2,
useExecutionGraph: uokFlags.executionGraph,
},
);
if (result.started.length > 0) {
ctx.ui.notify(

View file

@ -42,6 +42,8 @@ import {
} from "./parallel-eligibility.js";
import { getErrorMessage } from "./error-utils.js";
import { logWarning } from "./workflow-logger.js";
import { resolveUokFlags } from "./uok/flags.js";
import { selectConflictFreeBatch } from "./uok/execution-graph.js";
// ─── Types ─────────────────────────────────────────────────────────────────
@ -69,6 +71,10 @@ export interface OrchestratorState {
let state: OrchestratorState | null = null;
function overlapKey(a: string, b: string): string {
return a < b ? `${a}::${b}` : `${b}::${a}`;
}
// ─── Persistence ──────────────────────────────────────────────────────────
const ORCHESTRATOR_STATE_FILE = "orchestrator.json";
@ -365,6 +371,7 @@ export async function startParallel(
}
const config = resolveParallelConfig(prefs);
const uokFlags = resolveUokFlags(prefs);
// Release any leftover state from a previous session before reassigning
if (state) {
@ -418,8 +425,40 @@ export async function startParallel(
const started: string[] = [];
const errors: Array<{ mid: string; error: string }> = [];
let filteredMilestoneIds = milestoneIds;
if (uokFlags.executionGraph && milestoneIds.length > 1) {
try {
const requestedIds = new Set(milestoneIds);
const candidates = await analyzeParallelEligibility(basePath);
const overlapPairs = new Set<string>();
for (const overlap of candidates.fileOverlaps) {
if (!requestedIds.has(overlap.mid1) || !requestedIds.has(overlap.mid2)) continue;
overlapPairs.add(overlapKey(overlap.mid1, overlap.mid2));
}
filteredMilestoneIds = selectConflictFreeBatch({
orderedIds: milestoneIds,
maxParallel: milestoneIds.length,
hasConflict: (candidate, existing) =>
overlapPairs.has(overlapKey(candidate, existing)),
});
if (filteredMilestoneIds.length < milestoneIds.length) {
const skipped = milestoneIds.filter((mid) => !filteredMilestoneIds.includes(mid));
logWarning(
"parallel",
`uok execution graph filtered ${skipped.length} conflicting milestone(s): ${skipped.join(", ")}`,
);
}
} catch (e) {
logWarning(
"parallel",
`uok execution graph overlap analysis failed; using legacy milestone selection: ${(e as Error).message}`,
);
filteredMilestoneIds = milestoneIds;
}
}
// Cap to max_workers
const toStart = milestoneIds.slice(0, config.max_workers);
const toStart = filteredMilestoneIds.slice(0, config.max_workers);
for (const mid of toStart) {
// Check budget ceiling before each spawn

View file

@ -32,6 +32,7 @@ import {
} from "./session-status-io.js";
import { hasFileConflict } from "./slice-parallel-conflict.js";
import { getErrorMessage } from "./error-utils.js";
import { selectConflictFreeBatch } from "./uok/execution-graph.js";
// ─── Types ─────────────────────────────────────────────────────────────────
@ -61,6 +62,7 @@ export interface SliceOrchestratorState {
export interface StartSliceParallelOpts {
maxWorkers?: number;
budgetCeiling?: number;
useExecutionGraph?: boolean;
}
// ─── Module State ──────────────────────────────────────────────────────────
@ -118,7 +120,12 @@ export async function startSliceParallel(
const errors: Array<{ sid: string; error: string }> = [];
// Filter out conflicting slices (conservative: check all pairs)
const safeSlices = filterConflictingSlices(basePath, milestoneId, eligibleSlices);
const safeSlices = filterConflictingSlices(
basePath,
milestoneId,
eligibleSlices,
opts.useExecutionGraph === true,
);
// Limit to maxWorkers
const toSpawn = safeSlices.slice(0, maxWorkers);
@ -245,7 +252,19 @@ function filterConflictingSlices(
basePath: string,
milestoneId: string,
slices: Array<{ id: string }>,
useExecutionGraph: boolean,
): Array<{ id: string }> {
if (useExecutionGraph) {
const selectedIds = selectConflictFreeBatch({
orderedIds: slices.map((slice) => slice.id),
maxParallel: slices.length,
hasConflict: (candidate, existing) =>
hasFileConflict(basePath, milestoneId, candidate, existing),
});
const selected = new Set(selectedIds);
return slices.filter((slice) => selected.has(slice.id));
}
const safe: Array<{ id: string }> = [];
for (const candidate of slices) {

View file

@ -0,0 +1,69 @@
import test from "node:test";
import assert from "node:assert/strict";
import type { SidecarItem } from "../auto/session.ts";
import {
selectConflictFreeBatch,
selectReactiveDispatchBatch,
buildSidecarQueueNodes,
scheduleSidecarQueue,
} from "../uok/execution-graph.ts";
test("uok execution graph selects deterministic conflict-free IDs", () => {
const selected = selectConflictFreeBatch({
orderedIds: ["S01", "S02", "S03", "S04"],
maxParallel: 4,
hasConflict: (candidate, existing) =>
(candidate === "S02" && existing === "S01") ||
(candidate === "S01" && existing === "S02"),
});
assert.deepEqual(selected, ["S01", "S03", "S04"]);
});
test("uok execution graph reactive batch honors file conflicts and in-flight writes", () => {
const result = selectReactiveDispatchBatch({
graph: [
{ id: "T01", dependsOn: [], outputFiles: ["src/a.ts"] },
{ id: "T02", dependsOn: [], outputFiles: ["src/a.ts"] },
{ id: "T03", dependsOn: [], outputFiles: ["src/b.ts"] },
{ id: "T04", dependsOn: ["T03"], outputFiles: ["src/c.ts"] },
],
readyIds: ["T01", "T02", "T03", "T04"],
maxParallel: 3,
inFlightOutputs: new Set(["src/c.ts"]),
});
assert.deepEqual(result.selected, ["T01", "T03"]);
assert.ok(
result.conflicts.some((c) => c.nodeA === "T01" && c.nodeB === "T02" && c.file === "src/a.ts"),
"conflict list should include overlapping outputs",
);
});
test("uok execution graph sidecar nodes map queue kinds to supported DAG kinds", () => {
const queue: SidecarItem[] = [
{ kind: "hook", unitType: "execute-task", unitId: "M001/S01/T01", prompt: "hook" },
{ kind: "triage", unitType: "triage", unitId: "M001/S01", prompt: "triage" },
{ kind: "quick-task", unitType: "quick-task", unitId: "M001/S01/Q01", prompt: "quick" },
];
const nodes = buildSidecarQueueNodes(queue);
assert.equal(nodes[0]?.kind, "hook");
assert.equal(nodes[1]?.kind, "verification");
assert.equal(nodes[2]?.kind, "team-worker");
assert.equal(nodes[1]?.dependsOn.length, 1);
});
test("uok execution graph sidecar scheduler preserves deterministic queue order", async () => {
const queue: SidecarItem[] = [
{ kind: "quick-task", unitType: "quick-task", unitId: "M001/S01/Q01", prompt: "q1" },
{ kind: "hook", unitType: "hook", unitId: "M001/S01/H01", prompt: "h1" },
{ kind: "triage", unitType: "triage", unitId: "M001/S01/TR1", prompt: "t1" },
];
const scheduled = await scheduleSidecarQueue(queue);
assert.deepEqual(
scheduled.map((item) => item.unitId),
queue.map((item) => item.unitId),
);
});

View file

@ -1,4 +1,6 @@
import type { UokGraphNode } from "./contracts.js";
import type { DerivedTaskNode } from "../types.js";
import type { SidecarItem } from "../auto/session.js";
export interface ExecutionGraphRunOptions {
parallel?: boolean;
@ -12,6 +14,123 @@ export interface ExecutionGraphResult {
export type ExecutionNodeHandler = (node: UokGraphNode) => Promise<void>;
export interface ConflictFreeBatchInput {
orderedIds: string[];
maxParallel: number;
hasConflict: (leftId: string, rightId: string) => boolean;
}
export interface ReactiveDispatchSelectionInput {
graph: Array<Pick<DerivedTaskNode, "id" | "dependsOn" | "outputFiles">>;
readyIds: string[];
maxParallel: number;
inFlightOutputs?: Set<string>;
}
export interface ReactiveDispatchSelectionResult {
selected: string[];
conflicts: Array<{ nodeA: string; nodeB: string; file: string }>;
}
export function selectConflictFreeBatch({
orderedIds,
maxParallel,
hasConflict,
}: ConflictFreeBatchInput): string[] {
if (maxParallel <= 0 || orderedIds.length === 0) return [];
const selected: string[] = [];
for (const candidate of orderedIds) {
if (selected.length >= maxParallel) break;
const conflictsExisting = selected.some((existing) => hasConflict(candidate, existing));
if (conflictsExisting) continue;
selected.push(candidate);
}
return selected;
}
function buildReactiveNodes(
graph: Array<Pick<DerivedTaskNode, "id" | "dependsOn" | "outputFiles">>,
): UokGraphNode[] {
return graph.map((node) => ({
id: node.id,
kind: "unit",
dependsOn: [...node.dependsOn],
writes: [...node.outputFiles],
}));
}
export function selectReactiveDispatchBatch(
input: ReactiveDispatchSelectionInput,
): ReactiveDispatchSelectionResult {
const nodeMap = new Map(buildReactiveNodes(input.graph).map((n) => [n.id, n]));
const readyNodes = input.readyIds
.map((id) => nodeMap.get(id))
.filter((node): node is UokGraphNode => !!node);
const conflicts = detectFileConflicts(readyNodes);
if (readyNodes.length === 0 || input.maxParallel <= 0) {
return { selected: [], conflicts };
}
const claimed = new Set(input.inFlightOutputs ?? []);
const selected: string[] = [];
const selectedSet = new Set<string>();
const readySet = new Set(input.readyIds);
for (const id of input.readyIds) {
if (selected.length >= input.maxParallel) break;
const node = nodeMap.get(id);
if (!node) continue;
const hasUnmetReadyDependency = node.dependsOn.some(
(dep) => readySet.has(dep) && !selectedSet.has(dep),
);
if (hasUnmetReadyDependency) continue;
const writes = node.writes ?? [];
const conflictsWithClaimed = writes.some((file) => claimed.has(file));
if (conflictsWithClaimed) continue;
selected.push(node.id);
selectedSet.add(node.id);
for (const file of writes) claimed.add(file);
}
return { selected, conflicts };
}
function sidecarToNodeKind(kind: SidecarItem["kind"]): UokGraphNode["kind"] {
if (kind === "hook") return "hook";
if (kind === "triage") return "verification";
return "team-worker";
}
export function buildSidecarQueueNodes(queue: SidecarItem[]): UokGraphNode[] {
return queue.map((item, index) => ({
id: `sidecar-${String(index).padStart(4, "0")}:${item.kind}:${item.unitType}:${item.unitId}`,
kind: sidecarToNodeKind(item.kind),
dependsOn: index > 0 ? [`sidecar-${String(index - 1).padStart(4, "0")}:${queue[index - 1].kind}:${queue[index - 1].unitType}:${queue[index - 1].unitId}`] : [],
metadata: { index },
}));
}
export async function scheduleSidecarQueue(queue: SidecarItem[]): Promise<SidecarItem[]> {
if (queue.length <= 1) return [...queue];
const nodes = buildSidecarQueueNodes(queue);
const scheduler = new ExecutionGraphScheduler();
const orderedIndexes: number[] = [];
const seenKinds = new Set<UokGraphNode["kind"]>(nodes.map((n) => n.kind));
for (const kind of seenKinds) {
scheduler.registerHandler(kind, async (node) => {
const idx = Number(node.metadata?.index);
if (Number.isInteger(idx) && idx >= 0) orderedIndexes.push(idx);
});
}
await scheduler.run(nodes, { parallel: false });
return orderedIndexes.map((idx) => queue[idx]).filter((item): item is SidecarItem => !!item);
}
export class ExecutionGraphScheduler {
private readonly handlers = new Map<string, ExecutionNodeHandler>();
@ -42,6 +161,7 @@ export class ExecutionGraphScheduler {
const ready = Array.from(remaining.values()).filter((node) =>
node.dependsOn.every((dep) => done.has(dep)),
);
ready.sort((a, b) => a.id.localeCompare(b.id));
if (ready.length === 0) {
throw new Error("Execution graph deadlock detected: no ready nodes and graph not complete");
}