diff --git a/packages/pi-agent-core/src/agent-loop.test.ts b/packages/pi-agent-core/src/agent-loop.test.ts index 54e62679b..050fbf1d1 100644 --- a/packages/pi-agent-core/src/agent-loop.test.ts +++ b/packages/pi-agent-core/src/agent-loop.test.ts @@ -256,7 +256,7 @@ describe("agent-loop — steering during tool batches", () => { toolExecution: "sequential", getSteeringMessages: async () => { steeringPolls += 1; - return steeringPolls === 1 ? [steering] : []; + return steeringPolls === 2 ? [steering] : []; }, }; @@ -293,7 +293,7 @@ describe("agent-loop — steering during tool batches", () => { ); }); - it("defers queued steering until after the current tool batch when configured", async () => { + it("defers queued user steering until after the current tool batch by default", async () => { const calls: string[] = []; const tool = { name: "record", @@ -354,7 +354,6 @@ describe("agent-loop — steering during tool batches", () => { model: TEST_MODEL, convertToLlm: (msgs) => msgs.filter((m): m is any => m.role !== "custom"), toolExecution: "sequential", - interruptToolExecutionOnSteering: false, getSteeringMessages: async () => { steeringPolls += 1; return steeringPolls === 1 ? [steering] : []; @@ -393,6 +392,107 @@ describe("agent-loop — steering during tool batches", () => { "queued steering should still be delivered after the tool batch", ); }); + + it("skips remaining tool calls only when steering interruption is explicit", async () => { + const calls: string[] = []; + const tool = { + name: "record", + label: "Record", + description: "Record a value", + parameters: Type.Object({ value: Type.String() }), + execute: async (_id: string, args: { value: string }) => { + calls.push(args.value); + return { + content: [{ type: "text" as const, text: `recorded ${args.value}` }], + details: {}, + }; + }, + } satisfies AgentTool<{ value: string }>; + + const first = makeAssistantMessage({ + content: [ + { + type: "toolCall", + id: "tc-1", + name: "record", + arguments: { value: "one" }, + }, + { + type: "toolCall", + id: "tc-2", + name: "record", + arguments: { value: "two" }, + }, + ], + stopReason: "toolUse", + }); + const second = makeAssistantMessage({ + content: [{ type: "text", text: "saw steering" }], + stopReason: "stop", + }); + const mockStream = createMockStreamFn([first, second]); + let steeringPolls = 0; + const steering: AgentMessage = { + role: "user", + content: [{ type: "text", text: "stop and listen" }], + timestamp: Date.now(), + }; + + const context: AgentContext = { + systemPrompt: "You are a test agent.", + messages: [ + { + role: "user", + content: [{ type: "text", text: "record values" }], + timestamp: Date.now(), + }, + ], + tools: [tool], + }; + + const config: AgentLoopConfig = { + model: TEST_MODEL, + convertToLlm: (msgs) => msgs.filter((m): m is any => m.role !== "custom"), + toolExecution: "sequential", + interruptToolExecutionOnSteering: true, + getSteeringMessages: async () => { + steeringPolls += 1; + return steeringPolls === 2 ? [steering] : []; + }, + }; + + const stream = agentLoop( + [ + { + role: "user", + content: [{ type: "text", text: "record values" }], + timestamp: Date.now(), + }, + ], + context, + config, + undefined, + mockStream as any, + ); + + const events = await collectEvents(stream); + const skipped = events.filter( + (event) => + event.type === "tool_execution_end" && + JSON.stringify(event.result.content).includes( + "Skipped due to queued user message", + ), + ); + + assert.deepEqual(calls, ["one"]); + assert.equal(skipped.length, 1); + assert.ok( + events.some( + (event) => event.type === "message_start" && event.message === steering, + ), + "explicit interrupt steering should still be delivered", + ); + }); }); /** diff --git a/packages/pi-agent-core/src/agent-loop.ts b/packages/pi-agent-core/src/agent-loop.ts index 86cf7dd8c..4b51c1ba7 100644 --- a/packages/pi-agent-core/src/agent-loop.ts +++ b/packages/pi-agent-core/src/agent-loop.ts @@ -583,7 +583,7 @@ async function executeToolCallsSequential( const results: ToolResultMessage[] = []; let steeringMessages: AgentMessage[] | undefined; let preparationErrorCount = 0; - const interruptOnSteering = config.interruptToolExecutionOnSteering !== false; + const interruptOnSteering = config.interruptToolExecutionOnSteering === true; for (let index = 0; index < toolCalls.length; index++) { const toolCall = toolCalls[index]; @@ -662,7 +662,7 @@ async function executeToolCallsParallel( const runnableCalls: PreparedToolCall[] = []; let steeringMessages: AgentMessage[] | undefined; let preparationErrorCount = 0; - const interruptOnSteering = config.interruptToolExecutionOnSteering !== false; + const interruptOnSteering = config.interruptToolExecutionOnSteering === true; for (let index = 0; index < toolCalls.length; index++) { const toolCall = toolCalls[index]; diff --git a/packages/pi-agent-core/src/agent.ts b/packages/pi-agent-core/src/agent.ts index a262d5be9..c6b998b6a 100644 --- a/packages/pi-agent-core/src/agent.ts +++ b/packages/pi-agent-core/src/agent.ts @@ -61,7 +61,7 @@ export interface AgentOptions { /** * Whether steering messages interrupt the current assistant tool batch. - * Defaults to true for interactive sessions. + * Defaults to false so user comments are absorbed at the next safe boundary. */ interruptToolExecutionOnSteering?: boolean; @@ -201,7 +201,7 @@ export class Agent { this._externalToolExecution = opts.externalToolExecution; this._getProviderOptions = opts.getProviderOptions; this._interruptToolExecutionOnSteering = - opts.interruptToolExecutionOnSteering ?? true; + opts.interruptToolExecutionOnSteering ?? false; } /** @@ -329,8 +329,8 @@ export class Agent { } /** - * Queue a steering message to interrupt the agent mid-run. - * Delivered after current tool execution, skips remaining tools. + * Queue a steering message for the agent mid-run. + * Delivered after the current tool batch unless interrupt behavior is explicitly enabled. */ steer(m: AgentMessage, origin: "user" | "system" = "system") { this.steeringQueue.push({ message: m, origin }); diff --git a/packages/pi-agent-core/src/types.ts b/packages/pi-agent-core/src/types.ts index 781f63bd2..6719d24e8 100644 --- a/packages/pi-agent-core/src/types.ts +++ b/packages/pi-agent-core/src/types.ts @@ -153,9 +153,9 @@ export interface AgentLoopConfig extends SimpleStreamOptions { /** * Returns steering messages to inject into the conversation mid-run. * - * Called after each tool execution to check for user interruptions. - * If messages are returned, remaining tool calls are skipped and - * these messages are added to the context before the next LLM call. + * Called after tool execution boundaries to check for user steering. + * By default, returned messages are added to the context after the current + * tool batch finishes and before the next LLM call. * * Use this for "steering" the agent while it's working. */ @@ -164,10 +164,10 @@ export interface AgentLoopConfig extends SimpleStreamOptions { /** * Whether steering messages interrupt the current assistant tool batch. * - * Default true preserves interactive behavior: a user message typed while - * tools are running skips remaining tool calls and lets the model react - * immediately. Headless/auto runners can set this false so incidental - * queued messages are deferred until the current tool batch finishes. + * Default false preserves active tool execution: a user message typed while + * tools are running is absorbed after the current tool batch finishes. Set + * this true only for explicit stop-now workflows that should skip remaining + * tool calls. */ interruptToolExecutionOnSteering?: boolean; diff --git a/packages/pi-coding-agent/src/core/agent-session.ts b/packages/pi-coding-agent/src/core/agent-session.ts index b6e14bc17..19424e35c 100644 --- a/packages/pi-coding-agent/src/core/agent-session.ts +++ b/packages/pi-coding-agent/src/core/agent-session.ts @@ -237,7 +237,7 @@ export interface PromptOptions { expandPromptTemplates?: boolean; /** Image attachments */ images?: ImageContent[]; - /** When streaming, how to queue the message: "steer" (interrupt) or "followUp" (wait). Required if streaming. */ + /** When streaming, how to queue the message: "steer" (next safe turn) or "followUp" (after current run). Required if streaming. */ streamingBehavior?: "steer" | "followUp"; /** Source of input for extension input event handlers. Defaults to "interactive". */ source?: InputSource; @@ -1461,8 +1461,8 @@ export class AgentSession { } /** - * Queue a steering message to interrupt the agent mid-run. - * Delivered after current tool execution, skips remaining tools. + * Queue a steering message for the agent mid-run. + * Delivered after the current tool batch at the next safe LLM turn. * Expands skill commands and prompt templates. Errors on extension commands. * @param images Optional image attachments to include with the message * @throws Error if text is an extension command diff --git a/packages/pi-coding-agent/src/core/sdk.ts b/packages/pi-coding-agent/src/core/sdk.ts index ca85567c7..922276fc0 100644 --- a/packages/pi-coding-agent/src/core/sdk.ts +++ b/packages/pi-coding-agent/src/core/sdk.ts @@ -447,7 +447,7 @@ export async function createAgentSession( }, steeringMode: settingsManager.getSteeringMode(), followUpMode: settingsManager.getFollowUpMode(), - interruptToolExecutionOnSteering: process.env.SF_HEADLESS !== "1", + interruptToolExecutionOnSteering: false, transport: settingsManager.getTransport(), thinkingBudgets: settingsManager.getThinkingBudgets(), maxRetryDelayMs: settingsManager.getRetrySettings().maxDelayMs, diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts index 17d05033f..59a0cbefb 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts +++ b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts @@ -278,7 +278,7 @@ export class RpcClient { } /** - * Queue a steering message to interrupt the agent mid-run. + * Queue a steering message for the agent at the next safe turn. */ async steer(message: string, images?: ImageContent[]): Promise { await this.send({ type: "steer", message, images }); diff --git a/packages/rpc-client/src/rpc-client.ts b/packages/rpc-client/src/rpc-client.ts index 10566ff44..9ff3eac37 100644 --- a/packages/rpc-client/src/rpc-client.ts +++ b/packages/rpc-client/src/rpc-client.ts @@ -360,7 +360,7 @@ export class RpcClient { } /** - * Queue a steering message to interrupt the agent mid-run. + * Queue a steering message for the agent at the next safe turn. */ async steer(message: string, images?: ImageContent[]): Promise { await this.send({ type: "steer", message, images }); diff --git a/src/resources/extensions/sf/doctor-engine-checks.js b/src/resources/extensions/sf/doctor-engine-checks.js index 44e1070b8..9a73ed11a 100644 --- a/src/resources/extensions/sf/doctor-engine-checks.js +++ b/src/resources/extensions/sf/doctor-engine-checks.js @@ -8,6 +8,10 @@ import { openDatabase, } from "./sf-db.js"; import { deriveState } from "./state.js"; +import { + summarizeParityHealth, + writeParityReport, +} from "./uok/parity-report.js"; import { readEvents } from "./workflow-events.js"; import { renderAllProjections } from "./workflow-projections.js"; @@ -139,6 +143,45 @@ export async function checkEngineHealth( } catch { // Non-fatal — legacy directory normalization must never block doctor. } + try { + const parityReport = writeParityReport(basePath); + const parityHealth = summarizeParityHealth(parityReport); + if (!parityHealth.ok) { + issues.push({ + severity: "warning", + code: "uok_parity_current_issue", + scope: "project", + unitId: "project", + message: + `UOK parity has current issues: ${parityHealth.current.criticalMismatches} critical mismatch(es), ` + + `${parityHealth.current.missingExitEvents} missing exit(s), ${parityHealth.current.errorEvents} error event(s).`, + file: ".sf/runtime/uok-parity-report.json", + fixable: false, + }); + } else if ( + parityHealth.historical.criticalMismatches > 0 || + parityHealth.historical.legacyMissingExitEvents > 0 || + parityHealth.historical.errorEvents > 0 || + parityHealth.historical.unmatchedRuns > 0 + ) { + issues.push({ + severity: "info", + code: "uok_parity_historical_drift", + scope: "project", + unitId: "project", + message: + `UOK parity is clean now; historical drift remains: ` + + `${parityHealth.historical.criticalMismatches} old mismatch(es), ` + + `${parityHealth.historical.legacyMissingExitEvents} legacy missing exit(s), ` + + `${parityHealth.historical.errorEvents} old error event(s), ` + + `${parityHealth.historical.unmatchedRuns} stale unmatched run(s).`, + file: ".sf/runtime/uok-parity-report.json", + fixable: false, + }); + } + } catch { + // Non-fatal — UOK parity diagnostics must never block doctor. + } // ── DB constraint violation detection (full doctor only, not pre-dispatch per D-10) ── try { if (isDbAvailable()) { diff --git a/src/resources/extensions/sf/sf-db.js b/src/resources/extensions/sf/sf-db.js index b3bfb5531..3fd1f277e 100644 --- a/src/resources/extensions/sf/sf-db.js +++ b/src/resources/extensions/sf/sf-db.js @@ -78,7 +78,7 @@ function openRawDb(path) { loadProvider(); return new DatabaseSync(path); } -const SCHEMA_VERSION = 25; +const SCHEMA_VERSION = 26; function indexExists(db, name) { return !!db .prepare( @@ -503,6 +503,19 @@ function initSchema(db, fileBacked) { cost_usd REAL DEFAULT NULL, recorded_at INTEGER NOT NULL ) + `); + db.exec(` + CREATE TABLE IF NOT EXISTS uok_runs ( + run_id TEXT PRIMARY KEY, + session_id TEXT DEFAULT NULL, + path TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'started', + started_at TEXT NOT NULL, + ended_at TEXT DEFAULT NULL, + error TEXT DEFAULT NULL, + flags_json TEXT NOT NULL DEFAULT '{}', + updated_at TEXT NOT NULL + ) `); db.exec( "CREATE INDEX IF NOT EXISTS idx_memories_active ON memories(superseded_by)", @@ -558,6 +571,12 @@ function initSchema(db, fileBacked) { db.exec( "CREATE INDEX IF NOT EXISTS idx_llm_task_outcomes_provider ON llm_task_outcomes(provider, recorded_at DESC)", ); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_runs_status_started ON uok_runs(status, started_at DESC)", + ); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_runs_session ON uok_runs(session_id, started_at DESC)", + ); ensureRepoProfileTables(db); db.exec( `CREATE VIEW IF NOT EXISTS active_decisions AS SELECT * FROM decisions WHERE superseded_by IS NULL`, @@ -1429,6 +1448,33 @@ function migrateSchema(db) { ":applied_at": new Date().toISOString(), }); } + if (currentVersion < 26) { + db.exec(` + CREATE TABLE IF NOT EXISTS uok_runs ( + run_id TEXT PRIMARY KEY, + session_id TEXT DEFAULT NULL, + path TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'started', + started_at TEXT NOT NULL, + ended_at TEXT DEFAULT NULL, + error TEXT DEFAULT NULL, + flags_json TEXT NOT NULL DEFAULT '{}', + updated_at TEXT NOT NULL + ) + `); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_runs_status_started ON uok_runs(status, started_at DESC)", + ); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_runs_session ON uok_runs(session_id, started_at DESC)", + ); + db.prepare( + "INSERT INTO schema_version (version, applied_at) VALUES (:version, :applied_at)", + ).run({ + ":version": 26, + ":applied_at": new Date().toISOString(), + }); + } db.exec("COMMIT"); } catch (err) { db.exec("ROLLBACK"); @@ -3363,6 +3409,90 @@ export function upsertTurnGitTransaction(entry) { ":updated_at": entry.updatedAt, }); } +export function recordUokRunStart(entry) { + if (!currentDb) return; + const now = entry.startedAt ?? new Date().toISOString(); + currentDb + .prepare(`INSERT INTO uok_runs ( + run_id, session_id, path, status, started_at, ended_at, error, flags_json, updated_at + ) VALUES ( + :run_id, :session_id, :path, 'started', :started_at, NULL, NULL, :flags_json, :updated_at + ) + ON CONFLICT(run_id) DO UPDATE SET + session_id = excluded.session_id, + path = excluded.path, + status = 'started', + started_at = excluded.started_at, + ended_at = NULL, + error = NULL, + flags_json = excluded.flags_json, + updated_at = excluded.updated_at`) + .run({ + ":run_id": entry.runId, + ":session_id": entry.sessionId ?? null, + ":path": entry.path ?? "", + ":started_at": now, + ":flags_json": JSON.stringify(entry.flags ?? {}), + ":updated_at": now, + }); +} +export function recordUokRunExit(entry) { + if (!currentDb) return; + const now = entry.endedAt ?? new Date().toISOString(); + currentDb + .prepare(`INSERT INTO uok_runs ( + run_id, session_id, path, status, started_at, ended_at, error, flags_json, updated_at + ) VALUES ( + :run_id, :session_id, :path, :status, :started_at, :ended_at, :error, :flags_json, :updated_at + ) + ON CONFLICT(run_id) DO UPDATE SET + session_id = COALESCE(excluded.session_id, uok_runs.session_id), + path = CASE WHEN excluded.path = '' THEN uok_runs.path ELSE excluded.path END, + status = excluded.status, + ended_at = excluded.ended_at, + error = excluded.error, + flags_json = CASE WHEN excluded.flags_json = '{}' THEN uok_runs.flags_json ELSE excluded.flags_json END, + updated_at = excluded.updated_at`) + .run({ + ":run_id": entry.runId, + ":session_id": entry.sessionId ?? null, + ":path": entry.path ?? "", + ":status": entry.status ?? "ok", + ":started_at": entry.startedAt ?? now, + ":ended_at": now, + ":error": entry.error ?? null, + ":flags_json": JSON.stringify(entry.flags ?? {}), + ":updated_at": now, + }); +} +export function getUokRuns(limit = 500) { + if (!currentDb) return []; + return currentDb + .prepare( + `SELECT run_id, session_id, path, status, started_at, ended_at, error, flags_json, updated_at + FROM uok_runs + ORDER BY started_at DESC + LIMIT :limit`, + ) + .all({ ":limit": limit }) + .map((row) => ({ + runId: row.run_id, + sessionId: row.session_id, + path: row.path, + status: row.status, + startedAt: row.started_at, + endedAt: row.ended_at, + error: row.error, + flags: (() => { + try { + return JSON.parse(row.flags_json || "{}"); + } catch { + return {}; + } + })(), + updatedAt: row.updated_at, + })); +} export function insertAuditEvent(entry) { if (!currentDb) return; transaction(() => { diff --git a/src/resources/extensions/sf/tests/uok-parity-report.test.mjs b/src/resources/extensions/sf/tests/uok-parity-report.test.mjs index 8ad2e7629..ce6b553d2 100644 --- a/src/resources/extensions/sf/tests/uok-parity-report.test.mjs +++ b/src/resources/extensions/sf/tests/uok-parity-report.test.mjs @@ -9,6 +9,13 @@ import { import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, test } from "vitest"; +import { + closeDatabase, + getUokRuns, + openDatabase, + recordUokRunExit, + recordUokRunStart, +} from "../sf-db.js"; import { runAutoLoopWithUok } from "../uok/kernel.js"; import { buildParityReport, @@ -21,6 +28,7 @@ const NOW = Date.parse("2026-05-06T00:00:00.000Z"); const tmpRoots = []; afterEach(() => { + closeDatabase(); for (const dir of tmpRoots.splice(0)) { rmSync(dir, { recursive: true, force: true }); } @@ -154,6 +162,102 @@ test("buildParityReport_run_exit_balances_enter", () => { assert.equal(hasCurrentParityWarning(report), false); }); +test("uok_run_ledger_records_lifecycle_as_primary_state", () => { + openDatabase(":memory:"); + recordUokRunStart({ + runId: "uok-ledger-1", + sessionId: "session-ledger", + path: "uok-kernel", + flags: { enabled: true }, + startedAt: new Date(NOW - 10_000).toISOString(), + }); + recordUokRunExit({ + runId: "uok-ledger-1", + sessionId: "session-ledger", + path: "uok-kernel", + flags: { enabled: true }, + status: "ok", + endedAt: new Date(NOW - 5_000).toISOString(), + }); + + const runs = getUokRuns(); + assert.equal(runs.length, 1); + assert.equal(runs[0].runId, "uok-ledger-1"); + assert.equal(runs[0].status, "ok"); + assert.equal(runs[0].sessionId, "session-ledger"); + assert.equal(runs[0].flags.enabled, true); + + const report = buildParityReport( + [], + "/tmp/uok-parity.jsonl", + NOW, + 30_000, + runs, + ); + assert.equal(report.ledgerRunCount, 1); + assert.equal(report.missingExitEvents, 0); + assert.equal(hasCurrentParityWarning(report), false); +}); + +test("buildParityReport_ledger_overrides_jsonl_lifecycle_counts", () => { + const report = buildParityReport( + [ + { + ts: new Date(NOW - 5_000).toISOString(), + runId: "uok-ledger-clean", + path: "uok-kernel", + phase: "enter", + status: "unknown", + }, + { + ts: new Date(NOW - 5_000).toISOString(), + path: "uok-kernel", + phase: "enter", + status: "unknown", + }, + ], + "/tmp/uok-parity.jsonl", + NOW, + 30_000, + [ + { + runId: "uok-ledger-clean", + sessionId: "session-ledger", + path: "uok-kernel", + status: "ok", + startedAt: new Date(NOW - 10_000).toISOString(), + endedAt: new Date(NOW - 5_000).toISOString(), + flags: { enabled: true }, + }, + ], + ); + + assert.equal(report.ledgerRunCount, 1); + assert.equal(report.legacyMissingExitEvents, 1); + assert.equal(report.missingExitEvents, 0); + assert.deepEqual(report.criticalMismatches, []); + assert.equal(hasCurrentParityWarning(report), false); +}); + +test("buildParityReport_ledger_error_is_current_warning", () => { + const report = buildParityReport([], "/tmp/uok-parity.jsonl", NOW, 30_000, [ + { + runId: "uok-ledger-error", + sessionId: "session-ledger", + path: "uok-kernel", + status: "error", + error: "ledger failure", + startedAt: new Date(NOW - 10_000).toISOString(), + endedAt: new Date(NOW - 5_000).toISOString(), + flags: { enabled: true }, + }, + ]); + + assert.equal(report.currentErrorEvents, 1); + assert.deepEqual(report.criticalMismatches, ["ledger failure"]); + assert.equal(hasCurrentParityWarning(report), true); +}); + test("buildParityReport_fresh_error_is_current_but_stale_error_is_historical", () => { const report = buildParityReport( [ diff --git a/src/resources/extensions/sf/uok/kernel.js b/src/resources/extensions/sf/uok/kernel.js index d48a97649..4d511d308 100644 --- a/src/resources/extensions/sf/uok/kernel.js +++ b/src/resources/extensions/sf/uok/kernel.js @@ -1,5 +1,10 @@ import { randomUUID } from "node:crypto"; import { debugLog } from "../debug-logger.js"; +import { + isDbAvailable, + recordUokRunExit, + recordUokRunStart, +} from "../sf-db.js"; import { buildAuditEnvelope, emitUokAuditEvent } from "./audit.js"; import { setAuditEnvelopeEnabled } from "./audit-toggle.js"; import { resolveUokFlags } from "./flags.js"; @@ -48,8 +53,18 @@ export async function runAutoLoopWithUok(args) { } setAuditEnvelopeEnabled(flags.auditEnvelope); signalKernelEnter(); + const startedAt = new Date().toISOString(); + if (isDbAvailable()) { + recordUokRunStart({ + runId, + sessionId: ctx.sessionManager?.getSessionId?.(), + path: resolveKernelPathLabel(flags), + flags: { ...flags }, + startedAt, + }); + } writeParityHeartbeat(s.basePath, { - ts: new Date().toISOString(), + ts: startedAt, runId, sessionId: ctx.sessionManager?.getSessionId?.(), path: resolveKernelPathLabel(flags), @@ -95,8 +110,20 @@ export async function runAutoLoopWithUok(args) { error = err instanceof Error ? err.message : String(err); throw err; } finally { + const endedAt = new Date().toISOString(); + if (isDbAvailable()) { + recordUokRunExit({ + runId, + sessionId: ctx.sessionManager?.getSessionId?.(), + path: resolveKernelPathLabel(flags), + flags: { ...flags }, + status, + endedAt, + ...(error ? { error } : {}), + }); + } writeParityHeartbeat(s.basePath, { - ts: new Date().toISOString(), + ts: endedAt, runId, sessionId: ctx.sessionManager?.getSessionId?.(), path: resolveKernelPathLabel(flags), diff --git a/src/resources/extensions/sf/uok/parity-report.js b/src/resources/extensions/sf/uok/parity-report.js index 8de6d2a7e..68b67e455 100644 --- a/src/resources/extensions/sf/uok/parity-report.js +++ b/src/resources/extensions/sf/uok/parity-report.js @@ -8,6 +8,7 @@ import { } from "node:fs"; import { join } from "node:path"; import { sfRoot } from "../paths.js"; +import { getUokRuns, isDbAvailable } from "../sf-db.js"; export const UNMATCHED_RUN_STALE_MS = 30 * 60 * 1000; @@ -59,12 +60,28 @@ export function buildParityReport( sourcePath, nowMs = Date.now(), staleMs = UNMATCHED_RUN_STALE_MS, + ledgerRuns = [], ) { const paths = {}; const statuses = {}; const criticalMismatches = []; const historicalCriticalMismatches = []; const runs = new Map(); + for (const run of ledgerRuns) { + runs.set(run.runId, { + runId: run.runId, + path: run.path, + status: run.status, + error: run.error, + sessionId: run.sessionId, + enterEvents: 1, + exitEvents: run.endedAt ? 1 : 0, + enteredAt: run.startedAt, + exitedAt: run.endedAt, + source: "ledger", + }); + } + const hasLedgerRuns = ledgerRuns.length > 0; let currentErrorEvents = 0; let historicalErrorEvents = 0; let enterEvents = 0; @@ -106,7 +123,7 @@ export function buildParityReport( : undefined; if (heartbeat.phase === "enter") { enterEvents += 1; - if (runId) { + if (runId && !hasLedgerRuns) { const current = runs.get(runId) ?? { runId, path: heartbeat.path, @@ -119,13 +136,13 @@ export function buildParityReport( current.path = current.path ?? heartbeat.path; current.enteredAt = current.enteredAt ?? heartbeat.ts; runs.set(runId, current); - } else { + } else if (!runId) { legacyEnterEvents += 1; } } if (heartbeat.phase === "exit") { exitEvents += 1; - if (runId) { + if (runId && !hasLedgerRuns) { const current = runs.get(runId) ?? { runId, path: heartbeat.path, @@ -138,11 +155,12 @@ export function buildParityReport( current.path = current.path ?? heartbeat.path; current.exitedAt = heartbeat.ts; runs.set(runId, current); - } else { + } else if (!runId) { legacyExitEvents += 1; } } if (heartbeat.status === "error") { + if (hasLedgerRuns) continue; const message = heartbeat.error ?? "parity event reported error"; if (isFreshTimestamp(heartbeat.ts, nowMs, staleMs)) { currentErrorEvents += 1; @@ -166,6 +184,20 @@ export function buildParityReport( 0, legacyEnterEvents - legacyExitEvents, ); + if (hasLedgerRuns) { + for (const run of ledgerRuns) { + if (run.status !== "error") continue; + if (isFreshTimestamp(run.endedAt ?? run.updatedAt, nowMs, staleMs)) { + currentErrorEvents += 1; + criticalMismatches.push(run.error ?? "uok run reported error"); + } else { + historicalErrorEvents += 1; + historicalCriticalMismatches.push( + run.error ?? "uok run reported historical error", + ); + } + } + } const missingExitEvents = freshUnmatchedRuns.length; if (freshUnmatchedRuns.length > 0) { const exampleIds = freshUnmatchedRuns @@ -190,6 +222,7 @@ export function buildParityReport( exitEvents, missingExitEvents, legacyMissingExitEvents, + ledgerRunCount: ledgerRuns.length, unmatchedRuns, freshUnmatchedRuns, historicalUnmatchedRuns, @@ -205,10 +238,53 @@ export function hasCurrentParityWarning(report) { return criticalMismatches.length > 0 || currentErrors > 0; } +export function summarizeParityHealth(report) { + const currentCriticalMismatches = report?.criticalMismatches ?? []; + const historicalCriticalMismatches = + report?.historicalCriticalMismatches ?? []; + const currentMissingExitEvents = report?.missingExitEvents ?? 0; + const currentErrorEvents = + report?.currentErrorEvents ?? report?.statuses?.error ?? 0; + const legacyMissingExitEvents = report?.legacyMissingExitEvents ?? 0; + const historicalErrorEvents = report?.historicalErrorEvents ?? 0; + const historicalUnmatchedRuns = report?.historicalUnmatchedRuns?.length ?? 0; + const ok = + currentCriticalMismatches.length === 0 && + currentMissingExitEvents === 0 && + currentErrorEvents === 0; + return { + ok, + status: ok ? "ok" : "degraded", + current: { + criticalMismatches: currentCriticalMismatches.length, + missingExitEvents: currentMissingExitEvents, + errorEvents: currentErrorEvents, + }, + historical: { + criticalMismatches: historicalCriticalMismatches.length, + legacyMissingExitEvents, + errorEvents: historicalErrorEvents, + unmatchedRuns: historicalUnmatchedRuns, + }, + }; +} + export function writeParityReport(basePath) { const sourcePath = parityLogPath(basePath); const raw = existsSync(sourcePath) ? readFileSync(sourcePath, "utf-8") : ""; - const report = buildParityReport(parseParityEvents(raw), sourcePath); + let ledgerRuns = []; + try { + ledgerRuns = isDbAvailable() ? getUokRuns() : []; + } catch { + ledgerRuns = []; + } + const report = buildParityReport( + parseParityEvents(raw), + sourcePath, + Date.now(), + UNMATCHED_RUN_STALE_MS, + ledgerRuns, + ); mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true }); const finalPath = reportPath(basePath); const tmpPath = `${finalPath}.tmp`;