diff --git a/src/resources/extensions/sf/auto-dispatch.ts b/src/resources/extensions/sf/auto-dispatch.ts index 46999da82..7d2d5a543 100644 --- a/src/resources/extensions/sf/auto-dispatch.ts +++ b/src/resources/extensions/sf/auto-dispatch.ts @@ -73,7 +73,10 @@ import { buildDispatchEnvelope, explainDispatch, } from "./uok/dispatch-envelope.js"; -import { EXECUTION_ENTRY_PHASES } from "./uok/plan-v2.js"; +import { + EXECUTION_ENTRY_PHASES, + hasFinalizedMilestoneContext, +} from "./uok/plan-v2.js"; import { extractVerdict, isAcceptableUatVerdict } from "./verdict-parser.js"; import { logError, logWarning } from "./workflow-logger.js"; @@ -668,10 +671,7 @@ export const DISPATCH_RULES: DispatchRule[] = [ name: "execution-entry phase (no context) → discuss-milestone", match: async ({ state, mid, midTitle, basePath }) => { if (!EXECUTION_ENTRY_PHASES.has(state.phase)) return null; - const contextFile = resolveMilestoneFile(basePath, mid, "CONTEXT"); - const contextContent = contextFile ? await loadFile(contextFile) : null; - const hasContext = !!(contextContent && contextContent.trim().length > 0); - if (hasContext) return null; + if (hasFinalizedMilestoneContext(basePath, mid)) return null; return { action: "dispatch", unitType: "discuss-milestone", diff --git a/src/resources/extensions/sf/auto-loop.ts b/src/resources/extensions/sf/auto-loop.ts index 8d4be59a9..901367e12 100644 --- a/src/resources/extensions/sf/auto-loop.ts +++ b/src/resources/extensions/sf/auto-loop.ts @@ -12,7 +12,7 @@ export { INFRA_ERROR_CODES, isInfrastructureError, } from "./auto/infra-errors.js"; -export { autoLoop } from "./auto/loop.js"; +export { autoLoop, runLegacyAutoLoop, runUokKernelLoop } from "./auto/loop.js"; export type { LoopDeps } from "./auto/loop-deps.js"; export { _resetPendingResolve, diff --git a/src/resources/extensions/sf/auto.ts b/src/resources/extensions/sf/auto.ts index 02e520a4d..17a5427bc 100644 --- a/src/resources/extensions/sf/auto.ts +++ b/src/resources/extensions/sf/auto.ts @@ -64,12 +64,13 @@ import { import { DISPATCH_RULES, resolveDispatch } from "./auto-dispatch.js"; import { _resetPendingResolve, - autoLoop, type ErrorContext, isSessionSwitchInFlight, type LoopDeps, resolveAgentEnd, resolveAgentEndCancelled, + runLegacyAutoLoop, + runUokKernelLoop, } from "./auto-loop.js"; import { clearToolBaseline, @@ -1809,8 +1810,8 @@ export async function startAuto( pi, s, deps: buildLoopDeps(), - runKernelLoop: autoLoop, - runLegacyLoop: autoLoop, + runKernelLoop: runUokKernelLoop, + runLegacyLoop: runLegacyAutoLoop, }); cleanupAfterLoopExit(ctx); return; @@ -1862,8 +1863,8 @@ export async function startAuto( pi, s, deps: buildLoopDeps(), - runKernelLoop: autoLoop, - runLegacyLoop: autoLoop, + runKernelLoop: runUokKernelLoop, + runLegacyLoop: runLegacyAutoLoop, }); cleanupAfterLoopExit(ctx); } diff --git a/src/resources/extensions/sf/auto/loop.ts b/src/resources/extensions/sf/auto/loop.ts index 0491899aa..511bd8dea 100644 --- a/src/resources/extensions/sf/auto/loop.ts +++ b/src/resources/extensions/sf/auto/loop.ts @@ -18,8 +18,9 @@ import { ModelPolicyDispatchBlockedError } from "../auto-model-selection.js"; import { debugLog } from "../debug-logger.js"; import { resolveEngine } from "../engine-resolver.js"; import { sfRoot } from "../paths.js"; -import { scheduleSidecarQueue } from "../uok/execution-graph.js"; +import { ExecutionGraphScheduler, scheduleSidecarQueue } from "../uok/execution-graph.js"; import { resolveUokFlags } from "../uok/flags.js"; +import type { UokGraphNode } from "../uok/contracts.js"; import { logWarning } from "../workflow-logger.js"; import { COOLDOWN_FALLBACK_WAIT_MS, @@ -42,9 +43,17 @@ import { type IterationContext, type IterationData, type LoopState, + type PhaseResult, MAX_LOOP_ITERATIONS, } from "./types.js"; +// ── Dispatch contract types ─────────────────────────────────────────────── +type DispatchContract = "legacy-direct" | "uok-scheduler"; + +interface AutoLoopOptions { + dispatchContract?: DispatchContract; +} + // ── Stuck detection persistence (#3704) ────────────────────────────────── // Persist stuck detection state to disk so it survives session restarts. // Without this, restarting auto-mode resets all counters, allowing the @@ -244,6 +253,63 @@ async function withPhaseTimeout( } } +// ── Dispatch contract helpers ───────────────────────────────────────────── + +function resolveDispatchNodeKind( + unitType: string, + sidecarItem?: SidecarItem, +): UokGraphNode["kind"] { + if (sidecarItem?.kind === "hook") return "hook"; + if (sidecarItem?.kind === "triage") return "verification"; + if (sidecarItem?.kind === "quick-task") return "team-worker"; + if (unitType.startsWith("hook/")) return "hook"; + if (unitType === "reactive-execute") return "subagent"; + if ( + unitType === "gate-evaluate" || + unitType === "validate-milestone" || + unitType === "run-uat" || + unitType === "complete-slice" + ) { + return "verification"; + } + if (unitType === "replan-slice" || unitType === "reassess-roadmap") { + return "reprocess"; + } + return "unit"; +} + +async function runUnitPhaseViaContract( + dispatchContract: DispatchContract, + ic: IterationContext, + iterData: IterationData, + loopState: LoopState, + sidecarItem?: SidecarItem, +): Promise> { + if (dispatchContract === "legacy-direct") { + return runUnitPhase(ic, iterData, loopState, sidecarItem); + } + + const scheduler = new ExecutionGraphScheduler(); + let outcome: PhaseResult<{ unitStartedAt: number }> | null = null; + const executeNode = async (): Promise => { + outcome = await runUnitPhase(ic, iterData, loopState, sidecarItem); + }; + const kinds: UokGraphNode["kind"][] = [ + "unit", "hook", "subagent", "team-worker", "verification", "reprocess", + ]; + for (const kind of kinds) scheduler.registerHandler(kind, executeNode); + + const nodeId = `dispatch:${ic.iteration}:${iterData.unitType}:${iterData.unitId}`; + await scheduler.run([{ + id: nodeId, + kind: resolveDispatchNodeKind(iterData.unitType, sidecarItem), + dependsOn: [], + metadata: { unitType: iterData.unitType, unitId: iterData.unitId }, + }], { parallel: false, maxWorkers: 1 }); + + return outcome ?? { action: "break", reason: "scheduler-dispatch-missing-result" }; +} + /** * Main auto-mode execution loop. Iterates: derive → dispatch → guards → * runUnit → finalize → repeat. Exits when s.active becomes false or a @@ -257,7 +323,9 @@ export async function autoLoop( pi: ExtensionAPI, s: AutoSession, deps: LoopDeps, + options?: AutoLoopOptions, ): Promise { + const dispatchContract = options?.dispatchContract ?? "legacy-direct"; debugLog("autoLoop", { phase: "enter" }); let iteration = 0; // Load persisted stuck state so counters survive session restarts (#3704) @@ -552,7 +620,7 @@ export async function autoLoop( } // ── Unit execution (shared with dev path) ── - const unitPhaseResult = await runUnitPhase(ic, iterData, loopState); + const unitPhaseResult = await runUnitPhaseViaContract(dispatchContract, ic, iterData, loopState); deps.uokObserver?.onPhaseResult("unit", unitPhaseResult.action, { unitType: iterData.unitType, unitId: iterData.unitId, @@ -848,7 +916,8 @@ export async function autoLoop( }); } - const unitPhaseResult = await runUnitPhase( + const unitPhaseResult = await runUnitPhaseViaContract( + dispatchContract, ic, iterData, loopState, @@ -1089,3 +1158,23 @@ export async function autoLoop( _clearCurrentResolve(); debugLog("autoLoop", { phase: "exit", totalIterations: iteration }); } + +// ── Dispatch-contract entry points ─────────────────────────────────────── + +export async function runUokKernelLoop( + ctx: ExtensionContext, + pi: ExtensionAPI, + s: AutoSession, + deps: LoopDeps, +): Promise { + return autoLoop(ctx, pi, s, deps, { dispatchContract: "uok-scheduler" }); +} + +export async function runLegacyAutoLoop( + ctx: ExtensionContext, + pi: ExtensionAPI, + s: AutoSession, + deps: LoopDeps, +): Promise { + return autoLoop(ctx, pi, s, deps, { dispatchContract: "legacy-direct" }); +} diff --git a/src/resources/extensions/sf/auto/phases.ts b/src/resources/extensions/sf/auto/phases.ts index 2e69eeab6..81ad3b320 100644 --- a/src/resources/extensions/sf/auto/phases.ts +++ b/src/resources/extensions/sf/auto/phases.ts @@ -580,10 +580,7 @@ export async function runPreDispatch( // Derive state let state = await deps.deriveState(s.basePath); - const planningFlowEnabled = - prefs?.uok?.planning_flow?.enabled === true || - prefs?.uok?.plan_v2?.enabled === true; - if (planningFlowEnabled && shouldRunPlanningFlowGate(state.phase)) { + if (uokFlags.planningFlow && shouldRunPlanningFlowGate(state.phase)) { let compiled = ensurePlanningFlowGraph(s.basePath, state); // Empty-graph recovery: stale DB caches can yield 0 nodes right after a // task-complete write. Invalidate caches, re-derive state, and retry once. diff --git a/src/resources/extensions/sf/tests/uok-kernel-path.test.ts b/src/resources/extensions/sf/tests/uok-kernel-path.test.ts new file mode 100644 index 000000000..15b8893d2 --- /dev/null +++ b/src/resources/extensions/sf/tests/uok-kernel-path.test.ts @@ -0,0 +1,165 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; + +import type { ExtensionAPI, ExtensionContext } from "@singularity-forge/pi-coding-agent"; + +import { runAutoLoopWithUok } from "../uok/kernel.ts"; +import type { AutoSession } from "../auto/session.ts"; +import type { LoopDeps } from "../auto/loop-deps.ts"; +import { sfRoot } from "../paths.ts"; +import type { SFPreferences } from "../preferences.ts"; + +function makeBasePath(): string { + return mkdtempSync(join(tmpdir(), "sf-uok-kernel-")); +} + +function makeArgs( + basePath: string, + preferences: SFPreferences | undefined, +): { + ctx: ExtensionContext; + pi: ExtensionAPI; + s: AutoSession; + deps: LoopDeps; + runKernelLoop: ( + ctx: ExtensionContext, + pi: ExtensionAPI, + s: AutoSession, + deps: LoopDeps, + ) => Promise; + runLegacyLoop: ( + ctx: ExtensionContext, + pi: ExtensionAPI, + s: AutoSession, + deps: LoopDeps, + ) => Promise; + calls: { + kernel: number; + legacy: number; + kernelDeps: LoopDeps | null; + legacyDeps: LoopDeps | null; + }; +} { + const calls = { + kernel: 0, + legacy: 0, + kernelDeps: null as LoopDeps | null, + legacyDeps: null as LoopDeps | null, + }; + + return { + ctx: { + sessionManager: { + getSessionId: (): string => "session-test", + }, + } as unknown as ExtensionContext, + pi: {} as unknown as ExtensionAPI, + s: { + basePath, + autoStartTime: 1, + } as unknown as AutoSession, + deps: { + loadEffectiveSFPreferences: () => ({ preferences }), + } as unknown as LoopDeps, + runKernelLoop: async (_ctx, _pi, _s, loopDeps): Promise => { + calls.kernel += 1; + calls.kernelDeps = loopDeps; + }, + runLegacyLoop: async (_ctx, _pi, _s, loopDeps): Promise => { + calls.legacy += 1; + calls.legacyDeps = loopDeps; + }, + calls, + }; +} + +function readParityEvents(basePath: string): Array> { + const file = join(sfRoot(basePath), "runtime", "uok-parity.jsonl"); + const raw = readFileSync(file, "utf-8").trim(); + if (raw.length === 0) return []; + return raw.split("\n").map(line => JSON.parse(line) as Record); +} + +test("runAutoLoopWithUok uses kernel path by default and records uok-kernel parity", async () => { + const basePath = makeBasePath(); + try { + const args = makeArgs(basePath, { + uok: { + enabled: true, + audit_envelope: { enabled: false }, + gitops: { enabled: false }, + }, + }); + await runAutoLoopWithUok(args); + + assert.equal(args.calls.kernel, 1); + assert.equal(args.calls.legacy, 0); + assert.ok(args.calls.kernelDeps); + assert.notEqual(args.calls.kernelDeps, args.deps); + assert.ok(args.calls.kernelDeps?.uokObserver); + + const events = readParityEvents(basePath); + assert.equal(events.length, 2); + assert.equal(events[0]?.path, "uok-kernel"); + assert.equal(events[0]?.phase, "enter"); + assert.equal(events[1]?.path, "uok-kernel"); + assert.equal(events[1]?.phase, "exit"); + assert.equal(events[1]?.status, "ok"); + } finally { + rmSync(basePath, { recursive: true, force: true }); + } +}); + +test("runAutoLoopWithUok uses legacy path when explicit legacy fallback is enabled", async () => { + const basePath = makeBasePath(); + try { + const args = makeArgs(basePath, { + uok: { + enabled: true, + legacy_fallback: { enabled: true }, + }, + }); + await runAutoLoopWithUok(args); + + assert.equal(args.calls.kernel, 0); + assert.equal(args.calls.legacy, 1); + assert.equal(args.calls.legacyDeps, args.deps); + + const events = readParityEvents(basePath); + assert.equal(events.length, 2); + assert.equal(events[0]?.path, "legacy-fallback"); + assert.equal(events[1]?.path, "legacy-fallback"); + assert.equal(events[1]?.status, "ok"); + } finally { + rmSync(basePath, { recursive: true, force: true }); + } +}); + +test("runAutoLoopWithUok respects SF_UOK_FORCE_LEGACY emergency switch", async () => { + const basePath = makeBasePath(); + const previous = process.env.SF_UOK_FORCE_LEGACY; + process.env.SF_UOK_FORCE_LEGACY = "1"; + try { + const args = makeArgs(basePath, { + uok: { + enabled: true, + }, + }); + await runAutoLoopWithUok(args); + + assert.equal(args.calls.kernel, 0); + assert.equal(args.calls.legacy, 1); + + const events = readParityEvents(basePath); + assert.equal(events.length, 2); + assert.equal(events[0]?.path, "legacy-fallback"); + assert.equal(events[1]?.path, "legacy-fallback"); + } finally { + if (previous === undefined) delete process.env.SF_UOK_FORCE_LEGACY; + else process.env.SF_UOK_FORCE_LEGACY = previous; + rmSync(basePath, { recursive: true, force: true }); + } +}); diff --git a/src/resources/extensions/sf/tests/uok-loop-adapter-writer.test.ts b/src/resources/extensions/sf/tests/uok-loop-adapter-writer.test.ts new file mode 100644 index 000000000..21ecc4608 --- /dev/null +++ b/src/resources/extensions/sf/tests/uok-loop-adapter-writer.test.ts @@ -0,0 +1,65 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { existsSync, mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { createTurnObserver } from "../uok/loop-adapter.ts"; +import { hasActiveWriterToken, resetWriterTokensForTests } from "../uok/writer.ts"; + +function readAuditPayloads(basePath: string): Array> { + const path = join(basePath, ".sf", "audit", "events.jsonl"); + if (!existsSync(path)) return []; + return readFileSync(path, "utf-8") + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line) as { payload?: Record }) + .map((event) => event.payload ?? {}); +} + +test("uok turn observer adds writer sequence metadata to audit events", (t) => { + const basePath = mkdtempSync(join(tmpdir(), "sf-uok-loop-writer-")); + resetWriterTokensForTests(); + t.after(() => { + resetWriterTokensForTests(); + rmSync(basePath, { recursive: true, force: true }); + }); + + const observer = createTurnObserver({ + basePath, + gitAction: "status-only", + gitPush: false, + enableAudit: true, + enableGitops: false, + }); + + observer.onTurnStart({ + basePath, + traceId: "trace-1", + turnId: "turn-1", + iteration: 1, + unitType: "execute-task", + unitId: "M001/S01/T01", + startedAt: new Date().toISOString(), + }); + assert.equal(hasActiveWriterToken(basePath, "turn-1"), true); + + observer.onTurnResult({ + traceId: "trace-1", + turnId: "turn-1", + iteration: 1, + unitType: "execute-task", + unitId: "M001/S01/T01", + status: "completed", + failureClass: "none", + phaseResults: [], + startedAt: new Date().toISOString(), + finishedAt: new Date().toISOString(), + }); + + assert.equal(hasActiveWriterToken(basePath, "turn-1"), false); + const payloads = readAuditPayloads(basePath); + assert.equal(payloads[0]?.writeSequence, 1); + assert.equal(payloads[1]?.writeSequence, 2); + assert.equal(typeof payloads[0]?.writerTokenId, "string"); +}); diff --git a/src/resources/extensions/sf/tests/uok-parity-report.test.ts b/src/resources/extensions/sf/tests/uok-parity-report.test.ts new file mode 100644 index 000000000..63165b75f --- /dev/null +++ b/src/resources/extensions/sf/tests/uok-parity-report.test.ts @@ -0,0 +1,42 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync, readFileSync, rmSync, appendFileSync, mkdirSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { buildParityReport, parseParityEvents, writeParityReport } from "../uok/parity-report.ts"; + +test("uok parity report summarizes paths, statuses, and fallback use", () => { + const events = parseParityEvents([ + JSON.stringify({ path: "uok-kernel", phase: "enter" }), + JSON.stringify({ path: "uok-kernel", phase: "exit", status: "ok" }), + JSON.stringify({ path: "legacy-fallback", phase: "enter" }), + JSON.stringify({ path: "legacy-fallback", phase: "exit", status: "error", error: "boom" }), + ].join("\n")); + + const report = buildParityReport(events, "/tmp/uok-parity.jsonl"); + assert.equal(report.totalEvents, 4); + assert.equal(report.paths["uok-kernel"], 2); + assert.equal(report.fallbackInvocations, 2); + assert.deepEqual(report.criticalMismatches, ["boom"]); +}); + +test("uok parity report writes runtime report artifact", (t) => { + const basePath = mkdtempSync(join(tmpdir(), "sf-uok-parity-")); + t.after(() => { + rmSync(basePath, { recursive: true, force: true }); + }); + + const runtime = join(basePath, ".sf", "runtime"); + mkdirSync(runtime, { recursive: true }); + appendFileSync( + join(runtime, "uok-parity.jsonl"), + `${JSON.stringify({ path: "uok-kernel", phase: "exit", status: "ok" })}\n`, + "utf-8", + ); + + const report = writeParityReport(basePath); + assert.equal(report.totalEvents, 1); + const saved = JSON.parse(readFileSync(join(runtime, "uok-parity-report.json"), "utf-8")); + assert.equal(saved.statuses.ok, 1); +}); diff --git a/src/resources/extensions/sf/tests/uok-writer.test.ts b/src/resources/extensions/sf/tests/uok-writer.test.ts new file mode 100644 index 000000000..655b486e9 --- /dev/null +++ b/src/resources/extensions/sf/tests/uok-writer.test.ts @@ -0,0 +1,75 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { + acquireWriterToken, + hasActiveWriterToken, + nextWriteRecord, + releaseWriterToken, + resetWriterTokensForTests, +} from "../uok/writer.ts"; + +test("uok writer enforces one active token per turn", (t) => { + const basePath = mkdtempSync(join(tmpdir(), "sf-uok-writer-")); + resetWriterTokensForTests(); + t.after(() => { + resetWriterTokensForTests(); + rmSync(basePath, { recursive: true, force: true }); + }); + + const token = acquireWriterToken({ + basePath, + traceId: "trace-1", + turnId: "turn-1", + }); + assert.equal(hasActiveWriterToken(basePath, "turn-1"), true); + assert.throws( + () => acquireWriterToken({ basePath, traceId: "trace-1", turnId: "turn-1" }), + /already active/, + ); + + releaseWriterToken(basePath, token); + assert.equal(hasActiveWriterToken(basePath, "turn-1"), false); +}); + +test("uok writer produces monotonic sequence records across turns", (t) => { + const basePath = mkdtempSync(join(tmpdir(), "sf-uok-writer-seq-")); + resetWriterTokensForTests(); + t.after(() => { + resetWriterTokensForTests(); + rmSync(basePath, { recursive: true, force: true }); + }); + + const token1 = acquireWriterToken({ + basePath, + traceId: "trace-1", + turnId: "turn-1", + }); + const first = nextWriteRecord({ + basePath, + token: token1, + category: "audit", + operation: "append", + path: ".sf/audit/events.jsonl", + }); + releaseWriterToken(basePath, token1); + + const token2 = acquireWriterToken({ + basePath, + traceId: "trace-2", + turnId: "turn-2", + }); + const second = nextWriteRecord({ + basePath, + token: token2, + category: "gitops", + operation: "insert", + }); + + assert.equal(first.sequence.sequence, 1); + assert.equal(second.sequence.sequence, 2); + assert.equal(second.sequence.turnId, "turn-2"); +});