fix: stabilize uok ledger and steering
This commit is contained in:
parent
cfde65fdd5
commit
fbb61026fc
13 changed files with 510 additions and 30 deletions
|
|
@ -256,7 +256,7 @@ describe("agent-loop — steering during tool batches", () => {
|
|||
toolExecution: "sequential",
|
||||
getSteeringMessages: async () => {
|
||||
steeringPolls += 1;
|
||||
return steeringPolls === 1 ? [steering] : [];
|
||||
return steeringPolls === 2 ? [steering] : [];
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -293,7 +293,7 @@ describe("agent-loop — steering during tool batches", () => {
|
|||
);
|
||||
});
|
||||
|
||||
it("defers queued steering until after the current tool batch when configured", async () => {
|
||||
it("defers queued user steering until after the current tool batch by default", async () => {
|
||||
const calls: string[] = [];
|
||||
const tool = {
|
||||
name: "record",
|
||||
|
|
@ -354,7 +354,6 @@ describe("agent-loop — steering during tool batches", () => {
|
|||
model: TEST_MODEL,
|
||||
convertToLlm: (msgs) => msgs.filter((m): m is any => m.role !== "custom"),
|
||||
toolExecution: "sequential",
|
||||
interruptToolExecutionOnSteering: false,
|
||||
getSteeringMessages: async () => {
|
||||
steeringPolls += 1;
|
||||
return steeringPolls === 1 ? [steering] : [];
|
||||
|
|
@ -393,6 +392,107 @@ describe("agent-loop — steering during tool batches", () => {
|
|||
"queued steering should still be delivered after the tool batch",
|
||||
);
|
||||
});
|
||||
|
||||
it("skips remaining tool calls only when steering interruption is explicit", async () => {
|
||||
const calls: string[] = [];
|
||||
const tool = {
|
||||
name: "record",
|
||||
label: "Record",
|
||||
description: "Record a value",
|
||||
parameters: Type.Object({ value: Type.String() }),
|
||||
execute: async (_id: string, args: { value: string }) => {
|
||||
calls.push(args.value);
|
||||
return {
|
||||
content: [{ type: "text" as const, text: `recorded ${args.value}` }],
|
||||
details: {},
|
||||
};
|
||||
},
|
||||
} satisfies AgentTool<{ value: string }>;
|
||||
|
||||
const first = makeAssistantMessage({
|
||||
content: [
|
||||
{
|
||||
type: "toolCall",
|
||||
id: "tc-1",
|
||||
name: "record",
|
||||
arguments: { value: "one" },
|
||||
},
|
||||
{
|
||||
type: "toolCall",
|
||||
id: "tc-2",
|
||||
name: "record",
|
||||
arguments: { value: "two" },
|
||||
},
|
||||
],
|
||||
stopReason: "toolUse",
|
||||
});
|
||||
const second = makeAssistantMessage({
|
||||
content: [{ type: "text", text: "saw steering" }],
|
||||
stopReason: "stop",
|
||||
});
|
||||
const mockStream = createMockStreamFn([first, second]);
|
||||
let steeringPolls = 0;
|
||||
const steering: AgentMessage = {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "stop and listen" }],
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
const context: AgentContext = {
|
||||
systemPrompt: "You are a test agent.",
|
||||
messages: [
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "record values" }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
],
|
||||
tools: [tool],
|
||||
};
|
||||
|
||||
const config: AgentLoopConfig = {
|
||||
model: TEST_MODEL,
|
||||
convertToLlm: (msgs) => msgs.filter((m): m is any => m.role !== "custom"),
|
||||
toolExecution: "sequential",
|
||||
interruptToolExecutionOnSteering: true,
|
||||
getSteeringMessages: async () => {
|
||||
steeringPolls += 1;
|
||||
return steeringPolls === 2 ? [steering] : [];
|
||||
},
|
||||
};
|
||||
|
||||
const stream = agentLoop(
|
||||
[
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "record values" }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
],
|
||||
context,
|
||||
config,
|
||||
undefined,
|
||||
mockStream as any,
|
||||
);
|
||||
|
||||
const events = await collectEvents(stream);
|
||||
const skipped = events.filter(
|
||||
(event) =>
|
||||
event.type === "tool_execution_end" &&
|
||||
JSON.stringify(event.result.content).includes(
|
||||
"Skipped due to queued user message",
|
||||
),
|
||||
);
|
||||
|
||||
assert.deepEqual(calls, ["one"]);
|
||||
assert.equal(skipped.length, 1);
|
||||
assert.ok(
|
||||
events.some(
|
||||
(event) => event.type === "message_start" && event.message === steering,
|
||||
),
|
||||
"explicit interrupt steering should still be delivered",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -583,7 +583,7 @@ async function executeToolCallsSequential(
|
|||
const results: ToolResultMessage[] = [];
|
||||
let steeringMessages: AgentMessage[] | undefined;
|
||||
let preparationErrorCount = 0;
|
||||
const interruptOnSteering = config.interruptToolExecutionOnSteering !== false;
|
||||
const interruptOnSteering = config.interruptToolExecutionOnSteering === true;
|
||||
|
||||
for (let index = 0; index < toolCalls.length; index++) {
|
||||
const toolCall = toolCalls[index];
|
||||
|
|
@ -662,7 +662,7 @@ async function executeToolCallsParallel(
|
|||
const runnableCalls: PreparedToolCall[] = [];
|
||||
let steeringMessages: AgentMessage[] | undefined;
|
||||
let preparationErrorCount = 0;
|
||||
const interruptOnSteering = config.interruptToolExecutionOnSteering !== false;
|
||||
const interruptOnSteering = config.interruptToolExecutionOnSteering === true;
|
||||
|
||||
for (let index = 0; index < toolCalls.length; index++) {
|
||||
const toolCall = toolCalls[index];
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ export interface AgentOptions {
|
|||
|
||||
/**
|
||||
* Whether steering messages interrupt the current assistant tool batch.
|
||||
* Defaults to true for interactive sessions.
|
||||
* Defaults to false so user comments are absorbed at the next safe boundary.
|
||||
*/
|
||||
interruptToolExecutionOnSteering?: boolean;
|
||||
|
||||
|
|
@ -201,7 +201,7 @@ export class Agent {
|
|||
this._externalToolExecution = opts.externalToolExecution;
|
||||
this._getProviderOptions = opts.getProviderOptions;
|
||||
this._interruptToolExecutionOnSteering =
|
||||
opts.interruptToolExecutionOnSteering ?? true;
|
||||
opts.interruptToolExecutionOnSteering ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -329,8 +329,8 @@ export class Agent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Delivered after current tool execution, skips remaining tools.
|
||||
* Queue a steering message for the agent mid-run.
|
||||
* Delivered after the current tool batch unless interrupt behavior is explicitly enabled.
|
||||
*/
|
||||
steer(m: AgentMessage, origin: "user" | "system" = "system") {
|
||||
this.steeringQueue.push({ message: m, origin });
|
||||
|
|
|
|||
|
|
@ -153,9 +153,9 @@ export interface AgentLoopConfig extends SimpleStreamOptions {
|
|||
/**
|
||||
* Returns steering messages to inject into the conversation mid-run.
|
||||
*
|
||||
* Called after each tool execution to check for user interruptions.
|
||||
* If messages are returned, remaining tool calls are skipped and
|
||||
* these messages are added to the context before the next LLM call.
|
||||
* Called after tool execution boundaries to check for user steering.
|
||||
* By default, returned messages are added to the context after the current
|
||||
* tool batch finishes and before the next LLM call.
|
||||
*
|
||||
* Use this for "steering" the agent while it's working.
|
||||
*/
|
||||
|
|
@ -164,10 +164,10 @@ export interface AgentLoopConfig extends SimpleStreamOptions {
|
|||
/**
|
||||
* Whether steering messages interrupt the current assistant tool batch.
|
||||
*
|
||||
* Default true preserves interactive behavior: a user message typed while
|
||||
* tools are running skips remaining tool calls and lets the model react
|
||||
* immediately. Headless/auto runners can set this false so incidental
|
||||
* queued messages are deferred until the current tool batch finishes.
|
||||
* Default false preserves active tool execution: a user message typed while
|
||||
* tools are running is absorbed after the current tool batch finishes. Set
|
||||
* this true only for explicit stop-now workflows that should skip remaining
|
||||
* tool calls.
|
||||
*/
|
||||
interruptToolExecutionOnSteering?: boolean;
|
||||
|
||||
|
|
|
|||
|
|
@ -237,7 +237,7 @@ export interface PromptOptions {
|
|||
expandPromptTemplates?: boolean;
|
||||
/** Image attachments */
|
||||
images?: ImageContent[];
|
||||
/** When streaming, how to queue the message: "steer" (interrupt) or "followUp" (wait). Required if streaming. */
|
||||
/** When streaming, how to queue the message: "steer" (next safe turn) or "followUp" (after current run). Required if streaming. */
|
||||
streamingBehavior?: "steer" | "followUp";
|
||||
/** Source of input for extension input event handlers. Defaults to "interactive". */
|
||||
source?: InputSource;
|
||||
|
|
@ -1461,8 +1461,8 @@ export class AgentSession {
|
|||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Delivered after current tool execution, skips remaining tools.
|
||||
* Queue a steering message for the agent mid-run.
|
||||
* Delivered after the current tool batch at the next safe LLM turn.
|
||||
* Expands skill commands and prompt templates. Errors on extension commands.
|
||||
* @param images Optional image attachments to include with the message
|
||||
* @throws Error if text is an extension command
|
||||
|
|
|
|||
|
|
@ -447,7 +447,7 @@ export async function createAgentSession(
|
|||
},
|
||||
steeringMode: settingsManager.getSteeringMode(),
|
||||
followUpMode: settingsManager.getFollowUpMode(),
|
||||
interruptToolExecutionOnSteering: process.env.SF_HEADLESS !== "1",
|
||||
interruptToolExecutionOnSteering: false,
|
||||
transport: settingsManager.getTransport(),
|
||||
thinkingBudgets: settingsManager.getThinkingBudgets(),
|
||||
maxRetryDelayMs: settingsManager.getRetrySettings().maxDelayMs,
|
||||
|
|
|
|||
|
|
@ -278,7 +278,7 @@ export class RpcClient {
|
|||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Queue a steering message for the agent at the next safe turn.
|
||||
*/
|
||||
async steer(message: string, images?: ImageContent[]): Promise<void> {
|
||||
await this.send({ type: "steer", message, images });
|
||||
|
|
|
|||
|
|
@ -360,7 +360,7 @@ export class RpcClient {
|
|||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Queue a steering message for the agent at the next safe turn.
|
||||
*/
|
||||
async steer(message: string, images?: ImageContent[]): Promise<void> {
|
||||
await this.send({ type: "steer", message, images });
|
||||
|
|
|
|||
|
|
@ -8,6 +8,10 @@ import {
|
|||
openDatabase,
|
||||
} from "./sf-db.js";
|
||||
import { deriveState } from "./state.js";
|
||||
import {
|
||||
summarizeParityHealth,
|
||||
writeParityReport,
|
||||
} from "./uok/parity-report.js";
|
||||
import { readEvents } from "./workflow-events.js";
|
||||
import { renderAllProjections } from "./workflow-projections.js";
|
||||
|
||||
|
|
@ -139,6 +143,45 @@ export async function checkEngineHealth(
|
|||
} catch {
|
||||
// Non-fatal — legacy directory normalization must never block doctor.
|
||||
}
|
||||
try {
|
||||
const parityReport = writeParityReport(basePath);
|
||||
const parityHealth = summarizeParityHealth(parityReport);
|
||||
if (!parityHealth.ok) {
|
||||
issues.push({
|
||||
severity: "warning",
|
||||
code: "uok_parity_current_issue",
|
||||
scope: "project",
|
||||
unitId: "project",
|
||||
message:
|
||||
`UOK parity has current issues: ${parityHealth.current.criticalMismatches} critical mismatch(es), ` +
|
||||
`${parityHealth.current.missingExitEvents} missing exit(s), ${parityHealth.current.errorEvents} error event(s).`,
|
||||
file: ".sf/runtime/uok-parity-report.json",
|
||||
fixable: false,
|
||||
});
|
||||
} else if (
|
||||
parityHealth.historical.criticalMismatches > 0 ||
|
||||
parityHealth.historical.legacyMissingExitEvents > 0 ||
|
||||
parityHealth.historical.errorEvents > 0 ||
|
||||
parityHealth.historical.unmatchedRuns > 0
|
||||
) {
|
||||
issues.push({
|
||||
severity: "info",
|
||||
code: "uok_parity_historical_drift",
|
||||
scope: "project",
|
||||
unitId: "project",
|
||||
message:
|
||||
`UOK parity is clean now; historical drift remains: ` +
|
||||
`${parityHealth.historical.criticalMismatches} old mismatch(es), ` +
|
||||
`${parityHealth.historical.legacyMissingExitEvents} legacy missing exit(s), ` +
|
||||
`${parityHealth.historical.errorEvents} old error event(s), ` +
|
||||
`${parityHealth.historical.unmatchedRuns} stale unmatched run(s).`,
|
||||
file: ".sf/runtime/uok-parity-report.json",
|
||||
fixable: false,
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// Non-fatal — UOK parity diagnostics must never block doctor.
|
||||
}
|
||||
// ── DB constraint violation detection (full doctor only, not pre-dispatch per D-10) ──
|
||||
try {
|
||||
if (isDbAvailable()) {
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ function openRawDb(path) {
|
|||
loadProvider();
|
||||
return new DatabaseSync(path);
|
||||
}
|
||||
const SCHEMA_VERSION = 25;
|
||||
const SCHEMA_VERSION = 26;
|
||||
function indexExists(db, name) {
|
||||
return !!db
|
||||
.prepare(
|
||||
|
|
@ -503,6 +503,19 @@ function initSchema(db, fileBacked) {
|
|||
cost_usd REAL DEFAULT NULL,
|
||||
recorded_at INTEGER NOT NULL
|
||||
)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS uok_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
session_id TEXT DEFAULT NULL,
|
||||
path TEXT NOT NULL DEFAULT '',
|
||||
status TEXT NOT NULL DEFAULT 'started',
|
||||
started_at TEXT NOT NULL,
|
||||
ended_at TEXT DEFAULT NULL,
|
||||
error TEXT DEFAULT NULL,
|
||||
flags_json TEXT NOT NULL DEFAULT '{}',
|
||||
updated_at TEXT NOT NULL
|
||||
)
|
||||
`);
|
||||
db.exec(
|
||||
"CREATE INDEX IF NOT EXISTS idx_memories_active ON memories(superseded_by)",
|
||||
|
|
@ -558,6 +571,12 @@ function initSchema(db, fileBacked) {
|
|||
db.exec(
|
||||
"CREATE INDEX IF NOT EXISTS idx_llm_task_outcomes_provider ON llm_task_outcomes(provider, recorded_at DESC)",
|
||||
);
|
||||
db.exec(
|
||||
"CREATE INDEX IF NOT EXISTS idx_uok_runs_status_started ON uok_runs(status, started_at DESC)",
|
||||
);
|
||||
db.exec(
|
||||
"CREATE INDEX IF NOT EXISTS idx_uok_runs_session ON uok_runs(session_id, started_at DESC)",
|
||||
);
|
||||
ensureRepoProfileTables(db);
|
||||
db.exec(
|
||||
`CREATE VIEW IF NOT EXISTS active_decisions AS SELECT * FROM decisions WHERE superseded_by IS NULL`,
|
||||
|
|
@ -1429,6 +1448,33 @@ function migrateSchema(db) {
|
|||
":applied_at": new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
if (currentVersion < 26) {
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS uok_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
session_id TEXT DEFAULT NULL,
|
||||
path TEXT NOT NULL DEFAULT '',
|
||||
status TEXT NOT NULL DEFAULT 'started',
|
||||
started_at TEXT NOT NULL,
|
||||
ended_at TEXT DEFAULT NULL,
|
||||
error TEXT DEFAULT NULL,
|
||||
flags_json TEXT NOT NULL DEFAULT '{}',
|
||||
updated_at TEXT NOT NULL
|
||||
)
|
||||
`);
|
||||
db.exec(
|
||||
"CREATE INDEX IF NOT EXISTS idx_uok_runs_status_started ON uok_runs(status, started_at DESC)",
|
||||
);
|
||||
db.exec(
|
||||
"CREATE INDEX IF NOT EXISTS idx_uok_runs_session ON uok_runs(session_id, started_at DESC)",
|
||||
);
|
||||
db.prepare(
|
||||
"INSERT INTO schema_version (version, applied_at) VALUES (:version, :applied_at)",
|
||||
).run({
|
||||
":version": 26,
|
||||
":applied_at": new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
db.exec("COMMIT");
|
||||
} catch (err) {
|
||||
db.exec("ROLLBACK");
|
||||
|
|
@ -3363,6 +3409,90 @@ export function upsertTurnGitTransaction(entry) {
|
|||
":updated_at": entry.updatedAt,
|
||||
});
|
||||
}
|
||||
export function recordUokRunStart(entry) {
|
||||
if (!currentDb) return;
|
||||
const now = entry.startedAt ?? new Date().toISOString();
|
||||
currentDb
|
||||
.prepare(`INSERT INTO uok_runs (
|
||||
run_id, session_id, path, status, started_at, ended_at, error, flags_json, updated_at
|
||||
) VALUES (
|
||||
:run_id, :session_id, :path, 'started', :started_at, NULL, NULL, :flags_json, :updated_at
|
||||
)
|
||||
ON CONFLICT(run_id) DO UPDATE SET
|
||||
session_id = excluded.session_id,
|
||||
path = excluded.path,
|
||||
status = 'started',
|
||||
started_at = excluded.started_at,
|
||||
ended_at = NULL,
|
||||
error = NULL,
|
||||
flags_json = excluded.flags_json,
|
||||
updated_at = excluded.updated_at`)
|
||||
.run({
|
||||
":run_id": entry.runId,
|
||||
":session_id": entry.sessionId ?? null,
|
||||
":path": entry.path ?? "",
|
||||
":started_at": now,
|
||||
":flags_json": JSON.stringify(entry.flags ?? {}),
|
||||
":updated_at": now,
|
||||
});
|
||||
}
|
||||
export function recordUokRunExit(entry) {
|
||||
if (!currentDb) return;
|
||||
const now = entry.endedAt ?? new Date().toISOString();
|
||||
currentDb
|
||||
.prepare(`INSERT INTO uok_runs (
|
||||
run_id, session_id, path, status, started_at, ended_at, error, flags_json, updated_at
|
||||
) VALUES (
|
||||
:run_id, :session_id, :path, :status, :started_at, :ended_at, :error, :flags_json, :updated_at
|
||||
)
|
||||
ON CONFLICT(run_id) DO UPDATE SET
|
||||
session_id = COALESCE(excluded.session_id, uok_runs.session_id),
|
||||
path = CASE WHEN excluded.path = '' THEN uok_runs.path ELSE excluded.path END,
|
||||
status = excluded.status,
|
||||
ended_at = excluded.ended_at,
|
||||
error = excluded.error,
|
||||
flags_json = CASE WHEN excluded.flags_json = '{}' THEN uok_runs.flags_json ELSE excluded.flags_json END,
|
||||
updated_at = excluded.updated_at`)
|
||||
.run({
|
||||
":run_id": entry.runId,
|
||||
":session_id": entry.sessionId ?? null,
|
||||
":path": entry.path ?? "",
|
||||
":status": entry.status ?? "ok",
|
||||
":started_at": entry.startedAt ?? now,
|
||||
":ended_at": now,
|
||||
":error": entry.error ?? null,
|
||||
":flags_json": JSON.stringify(entry.flags ?? {}),
|
||||
":updated_at": now,
|
||||
});
|
||||
}
|
||||
export function getUokRuns(limit = 500) {
|
||||
if (!currentDb) return [];
|
||||
return currentDb
|
||||
.prepare(
|
||||
`SELECT run_id, session_id, path, status, started_at, ended_at, error, flags_json, updated_at
|
||||
FROM uok_runs
|
||||
ORDER BY started_at DESC
|
||||
LIMIT :limit`,
|
||||
)
|
||||
.all({ ":limit": limit })
|
||||
.map((row) => ({
|
||||
runId: row.run_id,
|
||||
sessionId: row.session_id,
|
||||
path: row.path,
|
||||
status: row.status,
|
||||
startedAt: row.started_at,
|
||||
endedAt: row.ended_at,
|
||||
error: row.error,
|
||||
flags: (() => {
|
||||
try {
|
||||
return JSON.parse(row.flags_json || "{}");
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
})(),
|
||||
updatedAt: row.updated_at,
|
||||
}));
|
||||
}
|
||||
export function insertAuditEvent(entry) {
|
||||
if (!currentDb) return;
|
||||
transaction(() => {
|
||||
|
|
|
|||
|
|
@ -9,6 +9,13 @@ import {
|
|||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, test } from "vitest";
|
||||
import {
|
||||
closeDatabase,
|
||||
getUokRuns,
|
||||
openDatabase,
|
||||
recordUokRunExit,
|
||||
recordUokRunStart,
|
||||
} from "../sf-db.js";
|
||||
import { runAutoLoopWithUok } from "../uok/kernel.js";
|
||||
import {
|
||||
buildParityReport,
|
||||
|
|
@ -21,6 +28,7 @@ const NOW = Date.parse("2026-05-06T00:00:00.000Z");
|
|||
const tmpRoots = [];
|
||||
|
||||
afterEach(() => {
|
||||
closeDatabase();
|
||||
for (const dir of tmpRoots.splice(0)) {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
|
|
@ -154,6 +162,102 @@ test("buildParityReport_run_exit_balances_enter", () => {
|
|||
assert.equal(hasCurrentParityWarning(report), false);
|
||||
});
|
||||
|
||||
test("uok_run_ledger_records_lifecycle_as_primary_state", () => {
|
||||
openDatabase(":memory:");
|
||||
recordUokRunStart({
|
||||
runId: "uok-ledger-1",
|
||||
sessionId: "session-ledger",
|
||||
path: "uok-kernel",
|
||||
flags: { enabled: true },
|
||||
startedAt: new Date(NOW - 10_000).toISOString(),
|
||||
});
|
||||
recordUokRunExit({
|
||||
runId: "uok-ledger-1",
|
||||
sessionId: "session-ledger",
|
||||
path: "uok-kernel",
|
||||
flags: { enabled: true },
|
||||
status: "ok",
|
||||
endedAt: new Date(NOW - 5_000).toISOString(),
|
||||
});
|
||||
|
||||
const runs = getUokRuns();
|
||||
assert.equal(runs.length, 1);
|
||||
assert.equal(runs[0].runId, "uok-ledger-1");
|
||||
assert.equal(runs[0].status, "ok");
|
||||
assert.equal(runs[0].sessionId, "session-ledger");
|
||||
assert.equal(runs[0].flags.enabled, true);
|
||||
|
||||
const report = buildParityReport(
|
||||
[],
|
||||
"/tmp/uok-parity.jsonl",
|
||||
NOW,
|
||||
30_000,
|
||||
runs,
|
||||
);
|
||||
assert.equal(report.ledgerRunCount, 1);
|
||||
assert.equal(report.missingExitEvents, 0);
|
||||
assert.equal(hasCurrentParityWarning(report), false);
|
||||
});
|
||||
|
||||
test("buildParityReport_ledger_overrides_jsonl_lifecycle_counts", () => {
|
||||
const report = buildParityReport(
|
||||
[
|
||||
{
|
||||
ts: new Date(NOW - 5_000).toISOString(),
|
||||
runId: "uok-ledger-clean",
|
||||
path: "uok-kernel",
|
||||
phase: "enter",
|
||||
status: "unknown",
|
||||
},
|
||||
{
|
||||
ts: new Date(NOW - 5_000).toISOString(),
|
||||
path: "uok-kernel",
|
||||
phase: "enter",
|
||||
status: "unknown",
|
||||
},
|
||||
],
|
||||
"/tmp/uok-parity.jsonl",
|
||||
NOW,
|
||||
30_000,
|
||||
[
|
||||
{
|
||||
runId: "uok-ledger-clean",
|
||||
sessionId: "session-ledger",
|
||||
path: "uok-kernel",
|
||||
status: "ok",
|
||||
startedAt: new Date(NOW - 10_000).toISOString(),
|
||||
endedAt: new Date(NOW - 5_000).toISOString(),
|
||||
flags: { enabled: true },
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
assert.equal(report.ledgerRunCount, 1);
|
||||
assert.equal(report.legacyMissingExitEvents, 1);
|
||||
assert.equal(report.missingExitEvents, 0);
|
||||
assert.deepEqual(report.criticalMismatches, []);
|
||||
assert.equal(hasCurrentParityWarning(report), false);
|
||||
});
|
||||
|
||||
test("buildParityReport_ledger_error_is_current_warning", () => {
|
||||
const report = buildParityReport([], "/tmp/uok-parity.jsonl", NOW, 30_000, [
|
||||
{
|
||||
runId: "uok-ledger-error",
|
||||
sessionId: "session-ledger",
|
||||
path: "uok-kernel",
|
||||
status: "error",
|
||||
error: "ledger failure",
|
||||
startedAt: new Date(NOW - 10_000).toISOString(),
|
||||
endedAt: new Date(NOW - 5_000).toISOString(),
|
||||
flags: { enabled: true },
|
||||
},
|
||||
]);
|
||||
|
||||
assert.equal(report.currentErrorEvents, 1);
|
||||
assert.deepEqual(report.criticalMismatches, ["ledger failure"]);
|
||||
assert.equal(hasCurrentParityWarning(report), true);
|
||||
});
|
||||
|
||||
test("buildParityReport_fresh_error_is_current_but_stale_error_is_historical", () => {
|
||||
const report = buildParityReport(
|
||||
[
|
||||
|
|
|
|||
|
|
@ -1,5 +1,10 @@
|
|||
import { randomUUID } from "node:crypto";
|
||||
import { debugLog } from "../debug-logger.js";
|
||||
import {
|
||||
isDbAvailable,
|
||||
recordUokRunExit,
|
||||
recordUokRunStart,
|
||||
} from "../sf-db.js";
|
||||
import { buildAuditEnvelope, emitUokAuditEvent } from "./audit.js";
|
||||
import { setAuditEnvelopeEnabled } from "./audit-toggle.js";
|
||||
import { resolveUokFlags } from "./flags.js";
|
||||
|
|
@ -48,8 +53,18 @@ export async function runAutoLoopWithUok(args) {
|
|||
}
|
||||
setAuditEnvelopeEnabled(flags.auditEnvelope);
|
||||
signalKernelEnter();
|
||||
const startedAt = new Date().toISOString();
|
||||
if (isDbAvailable()) {
|
||||
recordUokRunStart({
|
||||
runId,
|
||||
sessionId: ctx.sessionManager?.getSessionId?.(),
|
||||
path: resolveKernelPathLabel(flags),
|
||||
flags: { ...flags },
|
||||
startedAt,
|
||||
});
|
||||
}
|
||||
writeParityHeartbeat(s.basePath, {
|
||||
ts: new Date().toISOString(),
|
||||
ts: startedAt,
|
||||
runId,
|
||||
sessionId: ctx.sessionManager?.getSessionId?.(),
|
||||
path: resolveKernelPathLabel(flags),
|
||||
|
|
@ -95,8 +110,20 @@ export async function runAutoLoopWithUok(args) {
|
|||
error = err instanceof Error ? err.message : String(err);
|
||||
throw err;
|
||||
} finally {
|
||||
const endedAt = new Date().toISOString();
|
||||
if (isDbAvailable()) {
|
||||
recordUokRunExit({
|
||||
runId,
|
||||
sessionId: ctx.sessionManager?.getSessionId?.(),
|
||||
path: resolveKernelPathLabel(flags),
|
||||
flags: { ...flags },
|
||||
status,
|
||||
endedAt,
|
||||
...(error ? { error } : {}),
|
||||
});
|
||||
}
|
||||
writeParityHeartbeat(s.basePath, {
|
||||
ts: new Date().toISOString(),
|
||||
ts: endedAt,
|
||||
runId,
|
||||
sessionId: ctx.sessionManager?.getSessionId?.(),
|
||||
path: resolveKernelPathLabel(flags),
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import {
|
|||
} from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { sfRoot } from "../paths.js";
|
||||
import { getUokRuns, isDbAvailable } from "../sf-db.js";
|
||||
|
||||
export const UNMATCHED_RUN_STALE_MS = 30 * 60 * 1000;
|
||||
|
||||
|
|
@ -59,12 +60,28 @@ export function buildParityReport(
|
|||
sourcePath,
|
||||
nowMs = Date.now(),
|
||||
staleMs = UNMATCHED_RUN_STALE_MS,
|
||||
ledgerRuns = [],
|
||||
) {
|
||||
const paths = {};
|
||||
const statuses = {};
|
||||
const criticalMismatches = [];
|
||||
const historicalCriticalMismatches = [];
|
||||
const runs = new Map();
|
||||
for (const run of ledgerRuns) {
|
||||
runs.set(run.runId, {
|
||||
runId: run.runId,
|
||||
path: run.path,
|
||||
status: run.status,
|
||||
error: run.error,
|
||||
sessionId: run.sessionId,
|
||||
enterEvents: 1,
|
||||
exitEvents: run.endedAt ? 1 : 0,
|
||||
enteredAt: run.startedAt,
|
||||
exitedAt: run.endedAt,
|
||||
source: "ledger",
|
||||
});
|
||||
}
|
||||
const hasLedgerRuns = ledgerRuns.length > 0;
|
||||
let currentErrorEvents = 0;
|
||||
let historicalErrorEvents = 0;
|
||||
let enterEvents = 0;
|
||||
|
|
@ -106,7 +123,7 @@ export function buildParityReport(
|
|||
: undefined;
|
||||
if (heartbeat.phase === "enter") {
|
||||
enterEvents += 1;
|
||||
if (runId) {
|
||||
if (runId && !hasLedgerRuns) {
|
||||
const current = runs.get(runId) ?? {
|
||||
runId,
|
||||
path: heartbeat.path,
|
||||
|
|
@ -119,13 +136,13 @@ export function buildParityReport(
|
|||
current.path = current.path ?? heartbeat.path;
|
||||
current.enteredAt = current.enteredAt ?? heartbeat.ts;
|
||||
runs.set(runId, current);
|
||||
} else {
|
||||
} else if (!runId) {
|
||||
legacyEnterEvents += 1;
|
||||
}
|
||||
}
|
||||
if (heartbeat.phase === "exit") {
|
||||
exitEvents += 1;
|
||||
if (runId) {
|
||||
if (runId && !hasLedgerRuns) {
|
||||
const current = runs.get(runId) ?? {
|
||||
runId,
|
||||
path: heartbeat.path,
|
||||
|
|
@ -138,11 +155,12 @@ export function buildParityReport(
|
|||
current.path = current.path ?? heartbeat.path;
|
||||
current.exitedAt = heartbeat.ts;
|
||||
runs.set(runId, current);
|
||||
} else {
|
||||
} else if (!runId) {
|
||||
legacyExitEvents += 1;
|
||||
}
|
||||
}
|
||||
if (heartbeat.status === "error") {
|
||||
if (hasLedgerRuns) continue;
|
||||
const message = heartbeat.error ?? "parity event reported error";
|
||||
if (isFreshTimestamp(heartbeat.ts, nowMs, staleMs)) {
|
||||
currentErrorEvents += 1;
|
||||
|
|
@ -166,6 +184,20 @@ export function buildParityReport(
|
|||
0,
|
||||
legacyEnterEvents - legacyExitEvents,
|
||||
);
|
||||
if (hasLedgerRuns) {
|
||||
for (const run of ledgerRuns) {
|
||||
if (run.status !== "error") continue;
|
||||
if (isFreshTimestamp(run.endedAt ?? run.updatedAt, nowMs, staleMs)) {
|
||||
currentErrorEvents += 1;
|
||||
criticalMismatches.push(run.error ?? "uok run reported error");
|
||||
} else {
|
||||
historicalErrorEvents += 1;
|
||||
historicalCriticalMismatches.push(
|
||||
run.error ?? "uok run reported historical error",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
const missingExitEvents = freshUnmatchedRuns.length;
|
||||
if (freshUnmatchedRuns.length > 0) {
|
||||
const exampleIds = freshUnmatchedRuns
|
||||
|
|
@ -190,6 +222,7 @@ export function buildParityReport(
|
|||
exitEvents,
|
||||
missingExitEvents,
|
||||
legacyMissingExitEvents,
|
||||
ledgerRunCount: ledgerRuns.length,
|
||||
unmatchedRuns,
|
||||
freshUnmatchedRuns,
|
||||
historicalUnmatchedRuns,
|
||||
|
|
@ -205,10 +238,53 @@ export function hasCurrentParityWarning(report) {
|
|||
return criticalMismatches.length > 0 || currentErrors > 0;
|
||||
}
|
||||
|
||||
export function summarizeParityHealth(report) {
|
||||
const currentCriticalMismatches = report?.criticalMismatches ?? [];
|
||||
const historicalCriticalMismatches =
|
||||
report?.historicalCriticalMismatches ?? [];
|
||||
const currentMissingExitEvents = report?.missingExitEvents ?? 0;
|
||||
const currentErrorEvents =
|
||||
report?.currentErrorEvents ?? report?.statuses?.error ?? 0;
|
||||
const legacyMissingExitEvents = report?.legacyMissingExitEvents ?? 0;
|
||||
const historicalErrorEvents = report?.historicalErrorEvents ?? 0;
|
||||
const historicalUnmatchedRuns = report?.historicalUnmatchedRuns?.length ?? 0;
|
||||
const ok =
|
||||
currentCriticalMismatches.length === 0 &&
|
||||
currentMissingExitEvents === 0 &&
|
||||
currentErrorEvents === 0;
|
||||
return {
|
||||
ok,
|
||||
status: ok ? "ok" : "degraded",
|
||||
current: {
|
||||
criticalMismatches: currentCriticalMismatches.length,
|
||||
missingExitEvents: currentMissingExitEvents,
|
||||
errorEvents: currentErrorEvents,
|
||||
},
|
||||
historical: {
|
||||
criticalMismatches: historicalCriticalMismatches.length,
|
||||
legacyMissingExitEvents,
|
||||
errorEvents: historicalErrorEvents,
|
||||
unmatchedRuns: historicalUnmatchedRuns,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function writeParityReport(basePath) {
|
||||
const sourcePath = parityLogPath(basePath);
|
||||
const raw = existsSync(sourcePath) ? readFileSync(sourcePath, "utf-8") : "";
|
||||
const report = buildParityReport(parseParityEvents(raw), sourcePath);
|
||||
let ledgerRuns = [];
|
||||
try {
|
||||
ledgerRuns = isDbAvailable() ? getUokRuns() : [];
|
||||
} catch {
|
||||
ledgerRuns = [];
|
||||
}
|
||||
const report = buildParityReport(
|
||||
parseParityEvents(raw),
|
||||
sourcePath,
|
||||
Date.now(),
|
||||
UNMATCHED_RUN_STALE_MS,
|
||||
ledgerRuns,
|
||||
);
|
||||
mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true });
|
||||
const finalPath = reportPath(basePath);
|
||||
const tmpPath = `${finalPath}.tmp`;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue