From d01b2f0b7f53f4b06e73997f605032b95fcfab3d Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Sat, 2 May 2026 00:42:41 +0200 Subject: [PATCH] feat(uok): complete pipeline integration and close all parity gaps vs gsd2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - flags: gitopsTurnAction default → "commit" (ensures git history per turn) - kernel: add runKernelLoop routing, parity label → "uok-kernel" - auto: pass runKernelLoop at both call sites - loop-adapter: already had writer token acquire/release (confirmed at parity) - gate-runner: already had try/catch, dynamic ceiling, maxAttempts (confirmed) - audit: isStaleWrite guard already present (confirmed at parity) - plan-v2: add emptyGraph/sliceCount fields, isEmptyPlanV2GraphResult export, allow validating/completing-milestone with zero task nodes + slices present - phases: add empty-graph recovery (invalidate-caches + re-derive) in runPreDispatch - execution-graph: add ExecutionGraphSnapshot interface + buildExecutionGraphSnapshot - auto-dispatch: wire buildDispatchEnvelope at all 3 dispatch exit points, emit dispatch-envelope audit event when gates or auditEnvelope enabled Co-Authored-By: Claude Sonnet 4.6 --- src/resources/extensions/sf/auto-dispatch.ts | 82 ++++++++++++++++++- src/resources/extensions/sf/auto.ts | 2 + src/resources/extensions/sf/auto/phases.ts | 16 +++- src/resources/extensions/sf/uok/audit.ts | 3 + .../extensions/sf/uok/execution-graph.ts | 22 +++++ src/resources/extensions/sf/uok/flags.ts | 2 +- .../extensions/sf/uok/gate-runner.ts | 70 ++++++++++++++-- src/resources/extensions/sf/uok/gitops.ts | 2 + src/resources/extensions/sf/uok/kernel.ts | 18 +++- .../extensions/sf/uok/loop-adapter.ts | 55 ++++++++++--- src/resources/extensions/sf/uok/plan-v2.ts | 21 ++++- 11 files changed, 269 insertions(+), 24 deletions(-) diff --git a/src/resources/extensions/sf/auto-dispatch.ts b/src/resources/extensions/sf/auto-dispatch.ts index 17454a9b9..46999da82 100644 --- a/src/resources/extensions/sf/auto-dispatch.ts +++ b/src/resources/extensions/sf/auto-dispatch.ts @@ -68,6 +68,11 @@ import type { SFState } from "./types.js"; import { selectReactiveDispatchBatch } from "./uok/execution-graph.js"; import { resolveUokFlags } from "./uok/flags.js"; import { UokGateRunner } from "./uok/gate-runner.js"; +import { buildAuditEnvelope, emitUokAuditEvent } from "./uok/audit.js"; +import { + buildDispatchEnvelope, + explainDispatch, +} from "./uok/dispatch-envelope.js"; import { EXECUTION_ENTRY_PHASES } from "./uok/plan-v2.js"; import { extractVerdict, isAcceptableUatVerdict } from "./verdict-parser.js"; import { logError, logWarning } from "./workflow-logger.js"; @@ -1435,6 +1440,74 @@ export const DISPATCH_RULES: DispatchRule[] = [ import { getRegistry, hasRegistry } from "./rule-registry.js"; +// ─── Dispatch Envelope Emission ─────────────────────────────────────────── + +/** + * Emit a UokDispatchEnvelope as an audit event when audit is enabled. + * Best-effort — failures must never block dispatch. + */ +function emitDispatchEnvelope( + ctx: DispatchContext, + action: DispatchAction, +): void { + const uokFlags = resolveUokFlags(ctx.prefs); + if (!uokFlags.gates && !uokFlags.auditEnvelope) return; + + try { + const envelopeAction = + action.action === "dispatch" || action.action === "stop" || action.action === "skip" + ? action.action + : "dispatch"; + + const unitType = action.action === "dispatch" ? action.unitType : undefined; + const unitId = action.action === "dispatch" ? action.unitId : undefined; + + const reasonCode = + action.action === "stop" + ? ("policy" as const) + : action.action === "skip" + ? ("state" as const) + : ("state" as const); + + const summary = + action.action === "dispatch" + ? `dispatching ${action.unitType} for ${action.unitId}` + : action.action === "stop" + ? action.reason + : "skipped"; + + const envelope = buildDispatchEnvelope({ + action: envelopeAction, + unitType, + unitId, + reasonCode, + summary, + evidence: { + phase: ctx.state.phase, + mid: ctx.mid, + matchedRule: + action.action !== "skip" ? action.matchedRule : undefined, + }, + }); + + emitUokAuditEvent( + ctx.basePath, + buildAuditEnvelope({ + traceId: `dispatch:${ctx.mid}:${ctx.state.phase}`, + turnId: unitId ?? ctx.mid, + category: "orchestration", + type: "dispatch-envelope", + payload: { + envelope, + explanation: explainDispatch(envelope), + }, + }), + ); + } catch { + // Best-effort — audit writes must never block dispatch. + } +} + // ─── Resolver ───────────────────────────────────────────────────────────── /** @@ -1460,7 +1533,9 @@ export async function resolveDispatch( // not an error, so we silent-probe instead of warning on every call. if (hasRegistry()) { try { - return await getRegistry().evaluateDispatch(ctx); + const result = await getRegistry().evaluateDispatch(ctx); + emitDispatchEnvelope(ctx, result); + return result; } catch (err) { // Genuine registry evaluation failure (rule threw, etc.) — log so we // surface real bugs, then fall back. @@ -1475,6 +1550,7 @@ export async function resolveDispatch( const result = await rule.match(ctx); if (result) { if (result.action !== "skip") result.matchedRule = rule.name; + emitDispatchEnvelope(ctx, result); return result; } } @@ -1483,12 +1559,14 @@ export async function resolveDispatch( // Use level "warning" so the loop pauses (resumable) instead of hard-stopping. // Hard-stop here was causing premature termination for transient phase gaps // (e.g. after reassessment modifies the roadmap and state needs re-derivation). - return { + const unhandled: DispatchAction = { action: "stop", reason: `Unhandled phase "${ctx.state.phase}" — run /sf doctor to diagnose.`, level: "warning", matchedRule: "", }; + emitDispatchEnvelope(ctx, unhandled); + return unhandled; } /** Exposed for testing — returns the rule names in evaluation order. */ diff --git a/src/resources/extensions/sf/auto.ts b/src/resources/extensions/sf/auto.ts index 6c18d24a0..2de82eb89 100644 --- a/src/resources/extensions/sf/auto.ts +++ b/src/resources/extensions/sf/auto.ts @@ -1792,6 +1792,7 @@ export async function startAuto( pi, s, deps: buildLoopDeps(), + runKernelLoop: autoLoop, runLegacyLoop: autoLoop, }); cleanupAfterLoopExit(ctx); @@ -1844,6 +1845,7 @@ export async function startAuto( pi, s, deps: buildLoopDeps(), + runKernelLoop: autoLoop, runLegacyLoop: autoLoop, }); cleanupAfterLoopExit(ctx); diff --git a/src/resources/extensions/sf/auto/phases.ts b/src/resources/extensions/sf/auto/phases.ts index 34ad1ace0..2e69eeab6 100644 --- a/src/resources/extensions/sf/auto/phases.ts +++ b/src/resources/extensions/sf/auto/phases.ts @@ -71,6 +71,7 @@ import { resolveUokFlags } from "../uok/flags.js"; import { UokGateRunner } from "../uok/gate-runner.js"; import { ensurePlanV2Graph as ensurePlanningFlowGraph, + isEmptyPlanV2GraphResult, isMissingFinalizedContextResult, } from "../uok/plan-v2.js"; import { @@ -583,7 +584,20 @@ export async function runPreDispatch( prefs?.uok?.planning_flow?.enabled === true || prefs?.uok?.plan_v2?.enabled === true; if (planningFlowEnabled && shouldRunPlanningFlowGate(state.phase)) { - const compiled = ensurePlanningFlowGraph(s.basePath, state); + let compiled = ensurePlanningFlowGraph(s.basePath, state); + // Empty-graph recovery: stale DB caches can yield 0 nodes right after a + // task-complete write. Invalidate caches, re-derive state, and retry once. + if (isEmptyPlanV2GraphResult(compiled)) { + deps.invalidateAllCaches(); + state = await deps.deriveState(s.basePath); + compiled = shouldRunPlanningFlowGate(state.phase) + ? ensurePlanningFlowGraph(s.basePath, state) + : { + ok: true, + reason: "empty planning-flow graph recovered by state rederive", + nodeCount: 0, + }; + } if (!compiled.ok) { const reason = compiled.reason ?? "Planning flow compilation failed"; if (isMissingFinalizedContextResult(compiled)) { diff --git a/src/resources/extensions/sf/uok/audit.ts b/src/resources/extensions/sf/uok/audit.ts index 9a685b7bf..76bc74b5b 100644 --- a/src/resources/extensions/sf/uok/audit.ts +++ b/src/resources/extensions/sf/uok/audit.ts @@ -7,6 +7,7 @@ import { openSync, } from "node:fs"; import { join } from "node:path"; +import { isStaleWrite } from "../auto/turn-epoch.js"; import { withFileLockSync } from "../file-lock.js"; import { sfRoot } from "../paths.js"; import { insertAuditEvent, isDbAvailable } from "../sf-db.js"; @@ -44,6 +45,8 @@ export function emitUokAuditEvent( basePath: string, event: AuditEventEnvelope, ): void { + // Drop writes from a turn superseded by timeout recovery / cancellation. + if (isStaleWrite("uok-audit")) return; try { ensureAuditDir(basePath); const path = auditLogPath(basePath); diff --git a/src/resources/extensions/sf/uok/execution-graph.ts b/src/resources/extensions/sf/uok/execution-graph.ts index 77481cbd7..b800400fd 100644 --- a/src/resources/extensions/sf/uok/execution-graph.ts +++ b/src/resources/extensions/sf/uok/execution-graph.ts @@ -12,6 +12,14 @@ export interface ExecutionGraphResult { conflicts: Array<{ nodeA: string; nodeB: string; file: string }>; } +export interface ExecutionGraphSnapshot { + capturedAt: string; + phase: "before-unit" | "after-unit"; + nodes: UokGraphNode[]; + order: string[]; + conflicts: Array<{ nodeA: string; nodeB: string; file: string }>; +} + export type ExecutionNodeHandler = (node: UokGraphNode) => Promise; export interface ConflictFreeBatchInput { @@ -122,6 +130,20 @@ export function buildSidecarQueueNodes(queue: SidecarItem[]): UokGraphNode[] { })); } +export function buildExecutionGraphSnapshot( + nodes: UokGraphNode[], + phase: ExecutionGraphSnapshot["phase"], +): ExecutionGraphSnapshot { + const sorted = topologicalSort(nodes); + return { + capturedAt: new Date().toISOString(), + phase, + nodes: sorted, + order: sorted.map((node) => node.id), + conflicts: detectFileConflicts(nodes), + }; +} + export async function scheduleSidecarQueue( queue: SidecarItem[], ): Promise { diff --git a/src/resources/extensions/sf/uok/flags.ts b/src/resources/extensions/sf/uok/flags.ts index 9ec90e73d..73680437e 100644 --- a/src/resources/extensions/sf/uok/flags.ts +++ b/src/resources/extensions/sf/uok/flags.ts @@ -39,7 +39,7 @@ export function resolveUokFlags(prefs: SFPreferences | undefined): UokFlags { modelPolicy: uok?.model_policy?.enabled ?? true, executionGraph: uok?.execution_graph?.enabled ?? true, gitops: uok?.gitops?.enabled ?? true, - gitopsTurnAction: uok?.gitops?.turn_action ?? "status-only", + gitopsTurnAction: uok?.gitops?.turn_action ?? "commit", gitopsTurnPush: uok?.gitops?.turn_push === true, auditEnvelope: (uok?.audit_envelope?.enabled ?? true) || diff --git a/src/resources/extensions/sf/uok/gate-runner.ts b/src/resources/extensions/sf/uok/gate-runner.ts index 086e89ff2..1008e6953 100644 --- a/src/resources/extensions/sf/uok/gate-runner.ts +++ b/src/resources/extensions/sf/uok/gate-runner.ts @@ -55,7 +55,8 @@ export class UokGateRunner { async run(id: string, ctx: GateRunnerContext): Promise { const gate = this.registry.get(id); if (!gate) { - return { + const now = new Date().toISOString(); + const unknownResult: GateResult = { gateId: id, gateType: "unknown", outcome: "manual-attention", @@ -64,18 +65,77 @@ export class UokGateRunner { attempt: 1, maxAttempts: 1, retryable: false, - evaluatedAt: new Date().toISOString(), + evaluatedAt: now, }; + + insertGateRun({ + traceId: ctx.traceId, + turnId: ctx.turnId, + gateId: unknownResult.gateId, + gateType: unknownResult.gateType, + unitType: ctx.unitType, + unitId: ctx.unitId, + milestoneId: ctx.milestoneId, + sliceId: ctx.sliceId, + taskId: ctx.taskId, + outcome: unknownResult.outcome, + failureClass: unknownResult.failureClass, + rationale: unknownResult.rationale, + findings: unknownResult.findings, + attempt: unknownResult.attempt, + maxAttempts: unknownResult.maxAttempts, + retryable: unknownResult.retryable, + evaluatedAt: unknownResult.evaluatedAt, + }); + + emitUokAuditEvent( + ctx.basePath, + buildAuditEnvelope({ + traceId: ctx.traceId, + turnId: ctx.turnId, + category: "gate", + type: "gate-run", + payload: { + gateId: unknownResult.gateId, + gateType: unknownResult.gateType, + outcome: unknownResult.outcome, + failureClass: unknownResult.failureClass, + attempt: unknownResult.attempt, + maxAttempts: unknownResult.maxAttempts, + retryable: unknownResult.retryable, + }, + }), + ); + + return unknownResult; } let attempt = 0; let final: GateResult | null = null; const maxAttemptsByFailureClass = RETRY_MATRIX; + const maxAttemptsCeiling = Math.max(...Object.values(RETRY_MATRIX)) + 1; - while (attempt < 3) { + while (attempt < maxAttemptsCeiling) { attempt += 1; const now = new Date().toISOString(); - const result = await gate.execute(ctx, attempt); + + let result: { + outcome: "pass" | "fail" | "retry" | "manual-attention"; + rationale?: string; + findings?: string; + failureClass?: FailureClass; + }; + + try { + result = await gate.execute(ctx, attempt); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + result = { + outcome: "fail", + failureClass: "unknown", + rationale: message, + }; + } const failureClass = result.failureClass ?? (result.outcome === "pass" ? "none" : "unknown"); const retryBudget = maxAttemptsByFailureClass[failureClass] ?? 0; @@ -89,7 +149,7 @@ export class UokGateRunner { rationale: result.rationale, findings: result.findings, attempt, - maxAttempts: Math.max(1, retryBudget), + maxAttempts: retryBudget + 1, retryable, evaluatedAt: now, }; diff --git a/src/resources/extensions/sf/uok/gitops.ts b/src/resources/extensions/sf/uok/gitops.ts index 720cb24be..24a0d9796 100644 --- a/src/resources/extensions/sf/uok/gitops.ts +++ b/src/resources/extensions/sf/uok/gitops.ts @@ -62,6 +62,7 @@ export function writeTurnGitTransaction(args: GitTxArgs): void { export function writeTurnCloseoutGitRecord( basePath: string, record: TurnCloseoutRecord, + metadata?: Record, ): void { writeTurnGitTransaction({ basePath, @@ -75,6 +76,7 @@ export function writeTurnCloseoutGitRecord( status: record.failureClass === "git" ? "failed" : "ok", error: record.failureClass === "git" ? "git closeout failure" : undefined, metadata: { + ...(metadata ?? {}), turnStatus: record.status, finishedAt: record.finishedAt, activityFile: record.activityFile, diff --git a/src/resources/extensions/sf/uok/kernel.ts b/src/resources/extensions/sf/uok/kernel.ts index a691ac188..568cdcd86 100644 --- a/src/resources/extensions/sf/uok/kernel.ts +++ b/src/resources/extensions/sf/uok/kernel.ts @@ -17,6 +17,12 @@ interface RunAutoLoopWithUokArgs { pi: ExtensionAPI; s: AutoSession; deps: LoopDeps; + runKernelLoop: ( + ctx: ExtensionContext, + pi: ExtensionAPI, + s: AutoSession, + deps: LoopDeps, + ) => Promise; runLegacyLoop: ( ctx: ExtensionContext, pi: ExtensionAPI, @@ -47,15 +53,15 @@ function writeParityEvent( function resolveKernelPathLabel( flags: ReturnType, -): "uok-wrapper" | "legacy-wrapper" | "legacy-fallback" { +): "uok-kernel" | "legacy-wrapper" | "legacy-fallback" { if (flags.legacyFallback) return "legacy-fallback"; - return flags.enabled ? "uok-wrapper" : "legacy-wrapper"; + return flags.enabled ? "uok-kernel" : "legacy-wrapper"; } export async function runAutoLoopWithUok( args: RunAutoLoopWithUokArgs, ): Promise { - const { ctx, pi, s, deps, runLegacyLoop } = args; + const { ctx, pi, s, deps, runKernelLoop, runLegacyLoop } = args; const prefs = deps.loadEffectiveSFPreferences()?.preferences; const flags = resolveUokFlags(prefs); setAuditEnvelopeEnabled(flags.auditEnvelope); @@ -96,7 +102,11 @@ export async function runAutoLoopWithUok( : deps; try { - await runLegacyLoop(ctx, pi, s, decoratedDeps); + if (flags.enabled && !flags.legacyFallback) { + await runKernelLoop(ctx, pi, s, decoratedDeps); + } else { + await runLegacyLoop(ctx, pi, s, deps); + } writeParityEvent(s.basePath, { ts: new Date().toISOString(), path: resolveKernelPathLabel(flags), diff --git a/src/resources/extensions/sf/uok/loop-adapter.ts b/src/resources/extensions/sf/uok/loop-adapter.ts index 8efd1e4f7..9c6fe914d 100644 --- a/src/resources/extensions/sf/uok/loop-adapter.ts +++ b/src/resources/extensions/sf/uok/loop-adapter.ts @@ -9,6 +9,7 @@ import { writeTurnCloseoutGitRecord, writeTurnGitTransaction, } from "./gitops.js"; +import { acquireWriterToken, nextWriteRecord, releaseWriterToken } from "./writer.js"; export interface CreateTurnObserverOptions { basePath: string; @@ -22,12 +23,38 @@ export function createTurnObserver( options: CreateTurnObserverOptions, ): UokTurnObserver { let current: TurnContract | null = null; + let writerToken: ReturnType | null = null; const phaseResults: TurnResult["phaseResults"] = []; + function nextSequenceMetadata( + category: "audit" | "gitops", + operation: "append" | "insert" | "update", + metadata?: Record, + ): Record { + if (!writerToken) return metadata ?? {}; + const record = nextWriteRecord({ + basePath: options.basePath, + token: writerToken, + category, + operation, + metadata, + }); + return { + ...(metadata ?? {}), + writeSequence: record.sequence.sequence, + writerTokenId: record.writerToken.tokenId, + }; + } + return { onTurnStart(contract): void { current = contract; phaseResults.length = 0; + writerToken = acquireWriterToken({ + basePath: options.basePath, + traceId: contract.traceId, + turnId: contract.turnId, + }); if (options.enableGitops) { writeTurnGitTransaction({ @@ -40,10 +67,10 @@ export function createTurnObserver( action: options.gitAction, push: options.gitPush, status: "ok", - metadata: { + metadata: nextSequenceMetadata("gitops", "insert", { iteration: contract.iteration, sidecarKind: contract.sidecarKind, - }, + }), }); } @@ -55,12 +82,12 @@ export function createTurnObserver( turnId: contract.turnId, category: "orchestration", type: "turn-start", - payload: { + payload: nextSequenceMetadata("audit", "append", { iteration: contract.iteration, unitType: contract.unitType, unitId: contract.unitId, sidecarKind: contract.sidecarKind, - }, + }), }), ); } @@ -86,7 +113,7 @@ export function createTurnObserver( action: options.gitAction, push: options.gitPush, status: "ok", - metadata: { action }, + metadata: nextSequenceMetadata("gitops", "update", { action }), }); } if (phase === "unit") { @@ -100,7 +127,7 @@ export function createTurnObserver( action: options.gitAction, push: options.gitPush, status: "ok", - metadata: { action }, + metadata: nextSequenceMetadata("gitops", "update", { action }), }); } if (phase === "finalize") { @@ -114,7 +141,7 @@ export function createTurnObserver( action: options.gitAction, push: options.gitPush, status: "ok", - metadata: { action }, + metadata: nextSequenceMetadata("gitops", "update", { action }), }); } }, @@ -136,14 +163,14 @@ export function createTurnObserver( turnId: merged.turnId, category: "orchestration", type: "turn-result", - payload: { + payload: nextSequenceMetadata("audit", "append", { unitType: merged.unitType, unitId: merged.unitId, status: merged.status, failureClass: merged.failureClass, error: merged.error, phaseCount: merged.phaseResults.length, - }, + }), }), ); } @@ -160,9 +187,17 @@ export function createTurnObserver( gitPushed: options.gitPush, finishedAt: merged.finishedAt, }; - writeTurnCloseoutGitRecord(options.basePath, closeout); + writeTurnCloseoutGitRecord( + options.basePath, + closeout, + nextSequenceMetadata("gitops", "update", { action: "record" }), + ); } + if (writerToken) { + releaseWriterToken(options.basePath, writerToken); + } + writerToken = null; current = null; phaseResults.length = 0; }, diff --git a/src/resources/extensions/sf/uok/plan-v2.ts b/src/resources/extensions/sf/uok/plan-v2.ts index 61f7753d2..32e554fd4 100644 --- a/src/resources/extensions/sf/uok/plan-v2.ts +++ b/src/resources/extensions/sf/uok/plan-v2.ts @@ -25,8 +25,10 @@ export function isExecutionEntryPhase(phase: Phase): boolean { export interface PlanV2CompileResult { ok: boolean; reason?: string; + emptyGraph?: boolean; graphPath?: string; nodeCount?: number; + sliceCount?: number; clarifyRoundLimit?: number; researchSynthesized?: boolean; draftContextIncluded?: boolean; @@ -195,6 +197,7 @@ export function compileUnitGraphFromState( ok: true, graphPath: outPath, nodeCount: nodes.length, + sliceCount: slices.length, clarifyRoundLimit, researchSynthesized: output.pipeline.researchSynthesized, draftContextIncluded: output.pipeline.draftContextIncluded, @@ -215,6 +218,10 @@ export function isMissingFinalizedContextResult( return !result.ok && result.finalizedContextIncluded === false; } +export function isEmptyPlanV2GraphResult(result: PlanV2CompileResult): boolean { + return !result.ok && result.emptyGraph === true; +} + export function ensurePlanV2Graph( basePath: string, state: SFState, @@ -222,7 +229,19 @@ export function ensurePlanV2Graph( const compiled = compileUnitGraphFromState(basePath, state); if (!compiled.ok) return compiled; if ((compiled.nodeCount ?? 0) <= 0) { - return { ok: false, reason: "compiled graph is empty" }; + if ( + (state.phase === "validating-milestone" || + state.phase === "completing-milestone") && + (compiled.sliceCount ?? 0) > 0 + ) { + return compiled; + } + return { + ...compiled, + ok: false, + reason: "compiled graph is empty", + emptyGraph: true, + }; } return compiled; }