feat(uok): wire ExecutionGraphScheduler into kernel loop path

- loop.ts: add DispatchContract type, AutoLoopOptions, resolveDispatchNodeKind,
  runUnitPhaseViaContract — kernel path routes unit execution through
  ExecutionGraphScheduler; legacy path passes through directly
- loop.ts: export runUokKernelLoop (contract=uok-scheduler) and
  runLegacyAutoLoop (contract=legacy-direct)
- auto-loop.ts: re-export both new loop functions
- auto.ts: use runUokKernelLoop/runLegacyAutoLoop at both call sites
- phases.ts: use uokFlags.planningFlow for plan gate (was bypassing
  legacyFallback via raw pref read)
- auto-dispatch.ts: use hasFinalizedMilestoneContext for execution-entry
  context check (picks up SF_PROJECT_ROOT artifact fallback)
- tests: port uok-writer, uok-parity-report, uok-loop-adapter-writer,
  uok-kernel-path test files from gsd2 — all 8 tests pass

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-02 00:54:15 +02:00
parent ca5d2880ec
commit 9c20ebd76b
9 changed files with 452 additions and 18 deletions

View file

@ -73,7 +73,10 @@ import {
buildDispatchEnvelope,
explainDispatch,
} from "./uok/dispatch-envelope.js";
import { EXECUTION_ENTRY_PHASES } from "./uok/plan-v2.js";
import {
EXECUTION_ENTRY_PHASES,
hasFinalizedMilestoneContext,
} from "./uok/plan-v2.js";
import { extractVerdict, isAcceptableUatVerdict } from "./verdict-parser.js";
import { logError, logWarning } from "./workflow-logger.js";
@ -668,10 +671,7 @@ export const DISPATCH_RULES: DispatchRule[] = [
name: "execution-entry phase (no context) → discuss-milestone",
match: async ({ state, mid, midTitle, basePath }) => {
if (!EXECUTION_ENTRY_PHASES.has(state.phase)) return null;
const contextFile = resolveMilestoneFile(basePath, mid, "CONTEXT");
const contextContent = contextFile ? await loadFile(contextFile) : null;
const hasContext = !!(contextContent && contextContent.trim().length > 0);
if (hasContext) return null;
if (hasFinalizedMilestoneContext(basePath, mid)) return null;
return {
action: "dispatch",
unitType: "discuss-milestone",

View file

@ -12,7 +12,7 @@ export {
INFRA_ERROR_CODES,
isInfrastructureError,
} from "./auto/infra-errors.js";
export { autoLoop } from "./auto/loop.js";
export { autoLoop, runLegacyAutoLoop, runUokKernelLoop } from "./auto/loop.js";
export type { LoopDeps } from "./auto/loop-deps.js";
export {
_resetPendingResolve,

View file

@ -64,12 +64,13 @@ import {
import { DISPATCH_RULES, resolveDispatch } from "./auto-dispatch.js";
import {
_resetPendingResolve,
autoLoop,
type ErrorContext,
isSessionSwitchInFlight,
type LoopDeps,
resolveAgentEnd,
resolveAgentEndCancelled,
runLegacyAutoLoop,
runUokKernelLoop,
} from "./auto-loop.js";
import {
clearToolBaseline,
@ -1809,8 +1810,8 @@ export async function startAuto(
pi,
s,
deps: buildLoopDeps(),
runKernelLoop: autoLoop,
runLegacyLoop: autoLoop,
runKernelLoop: runUokKernelLoop,
runLegacyLoop: runLegacyAutoLoop,
});
cleanupAfterLoopExit(ctx);
return;
@ -1862,8 +1863,8 @@ export async function startAuto(
pi,
s,
deps: buildLoopDeps(),
runKernelLoop: autoLoop,
runLegacyLoop: autoLoop,
runKernelLoop: runUokKernelLoop,
runLegacyLoop: runLegacyAutoLoop,
});
cleanupAfterLoopExit(ctx);
}

View file

@ -18,8 +18,9 @@ import { ModelPolicyDispatchBlockedError } from "../auto-model-selection.js";
import { debugLog } from "../debug-logger.js";
import { resolveEngine } from "../engine-resolver.js";
import { sfRoot } from "../paths.js";
import { scheduleSidecarQueue } from "../uok/execution-graph.js";
import { ExecutionGraphScheduler, scheduleSidecarQueue } from "../uok/execution-graph.js";
import { resolveUokFlags } from "../uok/flags.js";
import type { UokGraphNode } from "../uok/contracts.js";
import { logWarning } from "../workflow-logger.js";
import {
COOLDOWN_FALLBACK_WAIT_MS,
@ -42,9 +43,17 @@ import {
type IterationContext,
type IterationData,
type LoopState,
type PhaseResult,
MAX_LOOP_ITERATIONS,
} from "./types.js";
// ── Dispatch contract types ───────────────────────────────────────────────
type DispatchContract = "legacy-direct" | "uok-scheduler";
interface AutoLoopOptions {
dispatchContract?: DispatchContract;
}
// ── Stuck detection persistence (#3704) ──────────────────────────────────
// Persist stuck detection state to disk so it survives session restarts.
// Without this, restarting auto-mode resets all counters, allowing the
@ -244,6 +253,63 @@ async function withPhaseTimeout<T>(
}
}
// ── Dispatch contract helpers ─────────────────────────────────────────────
function resolveDispatchNodeKind(
unitType: string,
sidecarItem?: SidecarItem,
): UokGraphNode["kind"] {
if (sidecarItem?.kind === "hook") return "hook";
if (sidecarItem?.kind === "triage") return "verification";
if (sidecarItem?.kind === "quick-task") return "team-worker";
if (unitType.startsWith("hook/")) return "hook";
if (unitType === "reactive-execute") return "subagent";
if (
unitType === "gate-evaluate" ||
unitType === "validate-milestone" ||
unitType === "run-uat" ||
unitType === "complete-slice"
) {
return "verification";
}
if (unitType === "replan-slice" || unitType === "reassess-roadmap") {
return "reprocess";
}
return "unit";
}
async function runUnitPhaseViaContract(
dispatchContract: DispatchContract,
ic: IterationContext,
iterData: IterationData,
loopState: LoopState,
sidecarItem?: SidecarItem,
): Promise<PhaseResult<{ unitStartedAt: number }>> {
if (dispatchContract === "legacy-direct") {
return runUnitPhase(ic, iterData, loopState, sidecarItem);
}
const scheduler = new ExecutionGraphScheduler();
let outcome: PhaseResult<{ unitStartedAt: number }> | null = null;
const executeNode = async (): Promise<void> => {
outcome = await runUnitPhase(ic, iterData, loopState, sidecarItem);
};
const kinds: UokGraphNode["kind"][] = [
"unit", "hook", "subagent", "team-worker", "verification", "reprocess",
];
for (const kind of kinds) scheduler.registerHandler(kind, executeNode);
const nodeId = `dispatch:${ic.iteration}:${iterData.unitType}:${iterData.unitId}`;
await scheduler.run([{
id: nodeId,
kind: resolveDispatchNodeKind(iterData.unitType, sidecarItem),
dependsOn: [],
metadata: { unitType: iterData.unitType, unitId: iterData.unitId },
}], { parallel: false, maxWorkers: 1 });
return outcome ?? { action: "break", reason: "scheduler-dispatch-missing-result" };
}
/**
* Main auto-mode execution loop. Iterates: derive dispatch guards
* runUnit finalize repeat. Exits when s.active becomes false or a
@ -257,7 +323,9 @@ export async function autoLoop(
pi: ExtensionAPI,
s: AutoSession,
deps: LoopDeps,
options?: AutoLoopOptions,
): Promise<void> {
const dispatchContract = options?.dispatchContract ?? "legacy-direct";
debugLog("autoLoop", { phase: "enter" });
let iteration = 0;
// Load persisted stuck state so counters survive session restarts (#3704)
@ -552,7 +620,7 @@ export async function autoLoop(
}
// ── Unit execution (shared with dev path) ──
const unitPhaseResult = await runUnitPhase(ic, iterData, loopState);
const unitPhaseResult = await runUnitPhaseViaContract(dispatchContract, ic, iterData, loopState);
deps.uokObserver?.onPhaseResult("unit", unitPhaseResult.action, {
unitType: iterData.unitType,
unitId: iterData.unitId,
@ -848,7 +916,8 @@ export async function autoLoop(
});
}
const unitPhaseResult = await runUnitPhase(
const unitPhaseResult = await runUnitPhaseViaContract(
dispatchContract,
ic,
iterData,
loopState,
@ -1089,3 +1158,23 @@ export async function autoLoop(
_clearCurrentResolve();
debugLog("autoLoop", { phase: "exit", totalIterations: iteration });
}
// ── Dispatch-contract entry points ───────────────────────────────────────
export async function runUokKernelLoop(
ctx: ExtensionContext,
pi: ExtensionAPI,
s: AutoSession,
deps: LoopDeps,
): Promise<void> {
return autoLoop(ctx, pi, s, deps, { dispatchContract: "uok-scheduler" });
}
export async function runLegacyAutoLoop(
ctx: ExtensionContext,
pi: ExtensionAPI,
s: AutoSession,
deps: LoopDeps,
): Promise<void> {
return autoLoop(ctx, pi, s, deps, { dispatchContract: "legacy-direct" });
}

View file

@ -580,10 +580,7 @@ export async function runPreDispatch(
// Derive state
let state = await deps.deriveState(s.basePath);
const planningFlowEnabled =
prefs?.uok?.planning_flow?.enabled === true ||
prefs?.uok?.plan_v2?.enabled === true;
if (planningFlowEnabled && shouldRunPlanningFlowGate(state.phase)) {
if (uokFlags.planningFlow && shouldRunPlanningFlowGate(state.phase)) {
let compiled = ensurePlanningFlowGraph(s.basePath, state);
// Empty-graph recovery: stale DB caches can yield 0 nodes right after a
// task-complete write. Invalidate caches, re-derive state, and retry once.

View file

@ -0,0 +1,165 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, readFileSync, rmSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import type { ExtensionAPI, ExtensionContext } from "@singularity-forge/pi-coding-agent";
import { runAutoLoopWithUok } from "../uok/kernel.ts";
import type { AutoSession } from "../auto/session.ts";
import type { LoopDeps } from "../auto/loop-deps.ts";
import { sfRoot } from "../paths.ts";
import type { SFPreferences } from "../preferences.ts";
function makeBasePath(): string {
return mkdtempSync(join(tmpdir(), "sf-uok-kernel-"));
}
function makeArgs(
basePath: string,
preferences: SFPreferences | undefined,
): {
ctx: ExtensionContext;
pi: ExtensionAPI;
s: AutoSession;
deps: LoopDeps;
runKernelLoop: (
ctx: ExtensionContext,
pi: ExtensionAPI,
s: AutoSession,
deps: LoopDeps,
) => Promise<void>;
runLegacyLoop: (
ctx: ExtensionContext,
pi: ExtensionAPI,
s: AutoSession,
deps: LoopDeps,
) => Promise<void>;
calls: {
kernel: number;
legacy: number;
kernelDeps: LoopDeps | null;
legacyDeps: LoopDeps | null;
};
} {
const calls = {
kernel: 0,
legacy: 0,
kernelDeps: null as LoopDeps | null,
legacyDeps: null as LoopDeps | null,
};
return {
ctx: {
sessionManager: {
getSessionId: (): string => "session-test",
},
} as unknown as ExtensionContext,
pi: {} as unknown as ExtensionAPI,
s: {
basePath,
autoStartTime: 1,
} as unknown as AutoSession,
deps: {
loadEffectiveSFPreferences: () => ({ preferences }),
} as unknown as LoopDeps,
runKernelLoop: async (_ctx, _pi, _s, loopDeps): Promise<void> => {
calls.kernel += 1;
calls.kernelDeps = loopDeps;
},
runLegacyLoop: async (_ctx, _pi, _s, loopDeps): Promise<void> => {
calls.legacy += 1;
calls.legacyDeps = loopDeps;
},
calls,
};
}
function readParityEvents(basePath: string): Array<Record<string, unknown>> {
const file = join(sfRoot(basePath), "runtime", "uok-parity.jsonl");
const raw = readFileSync(file, "utf-8").trim();
if (raw.length === 0) return [];
return raw.split("\n").map(line => JSON.parse(line) as Record<string, unknown>);
}
test("runAutoLoopWithUok uses kernel path by default and records uok-kernel parity", async () => {
const basePath = makeBasePath();
try {
const args = makeArgs(basePath, {
uok: {
enabled: true,
audit_envelope: { enabled: false },
gitops: { enabled: false },
},
});
await runAutoLoopWithUok(args);
assert.equal(args.calls.kernel, 1);
assert.equal(args.calls.legacy, 0);
assert.ok(args.calls.kernelDeps);
assert.notEqual(args.calls.kernelDeps, args.deps);
assert.ok(args.calls.kernelDeps?.uokObserver);
const events = readParityEvents(basePath);
assert.equal(events.length, 2);
assert.equal(events[0]?.path, "uok-kernel");
assert.equal(events[0]?.phase, "enter");
assert.equal(events[1]?.path, "uok-kernel");
assert.equal(events[1]?.phase, "exit");
assert.equal(events[1]?.status, "ok");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
});
test("runAutoLoopWithUok uses legacy path when explicit legacy fallback is enabled", async () => {
const basePath = makeBasePath();
try {
const args = makeArgs(basePath, {
uok: {
enabled: true,
legacy_fallback: { enabled: true },
},
});
await runAutoLoopWithUok(args);
assert.equal(args.calls.kernel, 0);
assert.equal(args.calls.legacy, 1);
assert.equal(args.calls.legacyDeps, args.deps);
const events = readParityEvents(basePath);
assert.equal(events.length, 2);
assert.equal(events[0]?.path, "legacy-fallback");
assert.equal(events[1]?.path, "legacy-fallback");
assert.equal(events[1]?.status, "ok");
} finally {
rmSync(basePath, { recursive: true, force: true });
}
});
test("runAutoLoopWithUok respects SF_UOK_FORCE_LEGACY emergency switch", async () => {
const basePath = makeBasePath();
const previous = process.env.SF_UOK_FORCE_LEGACY;
process.env.SF_UOK_FORCE_LEGACY = "1";
try {
const args = makeArgs(basePath, {
uok: {
enabled: true,
},
});
await runAutoLoopWithUok(args);
assert.equal(args.calls.kernel, 0);
assert.equal(args.calls.legacy, 1);
const events = readParityEvents(basePath);
assert.equal(events.length, 2);
assert.equal(events[0]?.path, "legacy-fallback");
assert.equal(events[1]?.path, "legacy-fallback");
} finally {
if (previous === undefined) delete process.env.SF_UOK_FORCE_LEGACY;
else process.env.SF_UOK_FORCE_LEGACY = previous;
rmSync(basePath, { recursive: true, force: true });
}
});

View file

@ -0,0 +1,65 @@
import test from "node:test";
import assert from "node:assert/strict";
import { existsSync, mkdtempSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createTurnObserver } from "../uok/loop-adapter.ts";
import { hasActiveWriterToken, resetWriterTokensForTests } from "../uok/writer.ts";
function readAuditPayloads(basePath: string): Array<Record<string, unknown>> {
const path = join(basePath, ".sf", "audit", "events.jsonl");
if (!existsSync(path)) return [];
return readFileSync(path, "utf-8")
.split("\n")
.filter(Boolean)
.map((line) => JSON.parse(line) as { payload?: Record<string, unknown> })
.map((event) => event.payload ?? {});
}
test("uok turn observer adds writer sequence metadata to audit events", (t) => {
const basePath = mkdtempSync(join(tmpdir(), "sf-uok-loop-writer-"));
resetWriterTokensForTests();
t.after(() => {
resetWriterTokensForTests();
rmSync(basePath, { recursive: true, force: true });
});
const observer = createTurnObserver({
basePath,
gitAction: "status-only",
gitPush: false,
enableAudit: true,
enableGitops: false,
});
observer.onTurnStart({
basePath,
traceId: "trace-1",
turnId: "turn-1",
iteration: 1,
unitType: "execute-task",
unitId: "M001/S01/T01",
startedAt: new Date().toISOString(),
});
assert.equal(hasActiveWriterToken(basePath, "turn-1"), true);
observer.onTurnResult({
traceId: "trace-1",
turnId: "turn-1",
iteration: 1,
unitType: "execute-task",
unitId: "M001/S01/T01",
status: "completed",
failureClass: "none",
phaseResults: [],
startedAt: new Date().toISOString(),
finishedAt: new Date().toISOString(),
});
assert.equal(hasActiveWriterToken(basePath, "turn-1"), false);
const payloads = readAuditPayloads(basePath);
assert.equal(payloads[0]?.writeSequence, 1);
assert.equal(payloads[1]?.writeSequence, 2);
assert.equal(typeof payloads[0]?.writerTokenId, "string");
});

View file

@ -0,0 +1,42 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, readFileSync, rmSync, appendFileSync, mkdirSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { buildParityReport, parseParityEvents, writeParityReport } from "../uok/parity-report.ts";
test("uok parity report summarizes paths, statuses, and fallback use", () => {
const events = parseParityEvents([
JSON.stringify({ path: "uok-kernel", phase: "enter" }),
JSON.stringify({ path: "uok-kernel", phase: "exit", status: "ok" }),
JSON.stringify({ path: "legacy-fallback", phase: "enter" }),
JSON.stringify({ path: "legacy-fallback", phase: "exit", status: "error", error: "boom" }),
].join("\n"));
const report = buildParityReport(events, "/tmp/uok-parity.jsonl");
assert.equal(report.totalEvents, 4);
assert.equal(report.paths["uok-kernel"], 2);
assert.equal(report.fallbackInvocations, 2);
assert.deepEqual(report.criticalMismatches, ["boom"]);
});
test("uok parity report writes runtime report artifact", (t) => {
const basePath = mkdtempSync(join(tmpdir(), "sf-uok-parity-"));
t.after(() => {
rmSync(basePath, { recursive: true, force: true });
});
const runtime = join(basePath, ".sf", "runtime");
mkdirSync(runtime, { recursive: true });
appendFileSync(
join(runtime, "uok-parity.jsonl"),
`${JSON.stringify({ path: "uok-kernel", phase: "exit", status: "ok" })}\n`,
"utf-8",
);
const report = writeParityReport(basePath);
assert.equal(report.totalEvents, 1);
const saved = JSON.parse(readFileSync(join(runtime, "uok-parity-report.json"), "utf-8"));
assert.equal(saved.statuses.ok, 1);
});

View file

@ -0,0 +1,75 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
acquireWriterToken,
hasActiveWriterToken,
nextWriteRecord,
releaseWriterToken,
resetWriterTokensForTests,
} from "../uok/writer.ts";
test("uok writer enforces one active token per turn", (t) => {
const basePath = mkdtempSync(join(tmpdir(), "sf-uok-writer-"));
resetWriterTokensForTests();
t.after(() => {
resetWriterTokensForTests();
rmSync(basePath, { recursive: true, force: true });
});
const token = acquireWriterToken({
basePath,
traceId: "trace-1",
turnId: "turn-1",
});
assert.equal(hasActiveWriterToken(basePath, "turn-1"), true);
assert.throws(
() => acquireWriterToken({ basePath, traceId: "trace-1", turnId: "turn-1" }),
/already active/,
);
releaseWriterToken(basePath, token);
assert.equal(hasActiveWriterToken(basePath, "turn-1"), false);
});
test("uok writer produces monotonic sequence records across turns", (t) => {
const basePath = mkdtempSync(join(tmpdir(), "sf-uok-writer-seq-"));
resetWriterTokensForTests();
t.after(() => {
resetWriterTokensForTests();
rmSync(basePath, { recursive: true, force: true });
});
const token1 = acquireWriterToken({
basePath,
traceId: "trace-1",
turnId: "turn-1",
});
const first = nextWriteRecord({
basePath,
token: token1,
category: "audit",
operation: "append",
path: ".sf/audit/events.jsonl",
});
releaseWriterToken(basePath, token1);
const token2 = acquireWriterToken({
basePath,
traceId: "trace-2",
turnId: "turn-2",
});
const second = nextWriteRecord({
basePath,
token: token2,
category: "gitops",
operation: "insert",
});
assert.equal(first.sequence.sequence, 1);
assert.equal(second.sequence.sequence, 2);
assert.equal(second.sequence.turnId, "turn-2");
});