From 5bc3895586d9b6a76ab63f38d1abd0779e8193d2 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Thu, 7 May 2026 05:19:41 +0200 Subject: [PATCH] feat: expose uok message bus metrics --- src/resources/extensions/sf/commands-uok.js | 61 +++++++++++++++- src/resources/extensions/sf/sf-db.js | 46 ++++++++++++ .../sf/tests/uok-message-bus.test.mjs | 45 +++++++++++- .../sf/tests/uok-metrics-exposition.test.mjs | 25 ++++++- .../sf/tests/uok-status-command.test.mjs | 70 ++++++++++++++++++- .../extensions/sf/uok/metrics-exposition.js | 22 ++++++ 6 files changed, 264 insertions(+), 5 deletions(-) diff --git a/src/resources/extensions/sf/commands-uok.js b/src/resources/extensions/sf/commands-uok.js index 0540a1e3e..50a8ced79 100644 --- a/src/resources/extensions/sf/commands-uok.js +++ b/src/resources/extensions/sf/commands-uok.js @@ -5,11 +5,12 @@ import { sfRoot } from "./paths.js"; import { getDistinctGateIds, getGateCircuitBreaker, + getUokMessageBusMetrics, getUokRuns, isDbAvailable, } from "./sf-db.js"; import { writeUokDiagnostics } from "./uok/diagnostic-synthesis.js"; -import { UokGateRunner } from "./uok/gate-runner.js"; +import { MessageBus } from "./uok/message-bus.js"; import { readUokMetrics, writeUokMetrics } from "./uok/metrics-exposition.js"; import { summarizeParityHealth, @@ -208,7 +209,7 @@ export async function handleUok(args, ctx) { const trimmed = args.trim(); if (trimmed === "help" || trimmed === "--help") { ctx.ui.notify( - "Usage: /sf uok [status|metrics|circuit-breakers|--json]\n\n status — UOK ledger health, last run, last error, historical drift, startup gate, and gate health\n metrics — Render Prometheus-format metrics to .sf/runtime/uok-metrics.prom and display\n circuit-breakers — List all circuit breaker states and failure streaks\n --json — Same as status but outputs JSON", + "Usage: /sf uok [status|metrics|circuit-breakers|gates|messages|--json]\n\n status — UOK ledger health, last run, last error, historical drift, startup gate, and gate health\n metrics — Render Prometheus-format metrics to .sf/runtime/uok-metrics.prom and display\n circuit-breakers — List all circuit breaker states and failure streaks\n gates — List observed gate runs and circuit breaker state\n messages — Show message bus status\n --json — Same as status but outputs JSON", "info", ); return; @@ -253,6 +254,62 @@ export async function handleUok(args, ctx) { ctx.ui.notify(lines.join("\n"), "info"); return; } + if (trimmed === "gates") { + const basePath = process.cwd(); + if (!isDbAvailable()) { + await ensureDbOpen(basePath); + } + if (!isDbAvailable()) { + ctx.ui.notify("Database unavailable", "error"); + return; + } + const gateIds = getDistinctGateIds(); + if (gateIds.length === 0) { + ctx.ui.notify("No UOK gate runs recorded yet.", "info"); + return; + } + const lines = ["Observed gates", ""]; + for (const id of gateIds) { + const cb = getGateCircuitBreaker(id); + lines.push( + ` ${id} cb=${cb.state} streak=${cb.failureStreak} lastFail=${cb.lastFailureAt ?? "never"}`, + ); + } + ctx.ui.notify(lines.join("\n"), "info"); + return; + } + if (trimmed === "messages" || trimmed.startsWith("messages ")) { + const basePath = process.cwd(); + if (!isDbAvailable()) { + await ensureDbOpen(basePath); + } + if (!isDbAvailable()) { + ctx.ui.notify("Database unavailable", "error"); + return; + } + const sub = trimmed.slice("messages".length).trim(); + if (sub === "compact") { + const bus = new MessageBus(basePath); + const result = bus.compact(); + ctx.ui.notify( + `Message bus compacted: ${result.before} → ${result.after} messages`, + "info", + ); + return; + } + const m = getUokMessageBusMetrics(); + const lines = ["Message bus", ""]; + lines.push(`Total messages: ${m.totalMessages}`); + lines.push(`Unread messages: ${m.totalUnread}`); + lines.push(`Unique agents: ${m.uniqueAgents}`); + lines.push(`Unique conversations: ${m.uniqueConversations}`); + lines.push(""); + lines.push( + "Tip: /sf uok messages compact — remove messages older than retention period", + ); + ctx.ui.notify(lines.join("\n"), "info"); + return; + } const status = await collectUokStatus(process.cwd()); if (trimmed === "--json" || trimmed === "json") { ctx.ui.notify(JSON.stringify(status, null, 2), "info"); diff --git a/src/resources/extensions/sf/sf-db.js b/src/resources/extensions/sf/sf-db.js index e2fb69026..72cea9762 100644 --- a/src/resources/extensions/sf/sf-db.js +++ b/src/resources/extensions/sf/sf-db.js @@ -5131,6 +5131,52 @@ export function getUokMessageReadIds(agentId) { return []; } } +export function getUokMessageBusMetrics() { + if (!currentDb) { + return { + totalMessages: 0, + totalUnread: 0, + uniqueAgents: 0, + uniqueConversations: 0, + }; + } + try { + const totalRow = currentDb + .prepare("SELECT COUNT(*) AS cnt FROM uok_messages") + .get(); + const unreadRow = currentDb + .prepare( + `SELECT COUNT(*) AS cnt FROM uok_messages m + WHERE NOT EXISTS ( + SELECT 1 FROM uok_message_reads r + WHERE r.message_id = m.id + AND r.agent_id = m.to_agent + )`, + ) + .get(); + const agentsRow = currentDb + .prepare(`SELECT COUNT(DISTINCT to_agent) AS cnt FROM uok_messages`) + .get(); + const convRow = currentDb + .prepare( + `SELECT COUNT(DISTINCT from_agent || ':' || to_agent) AS cnt FROM uok_messages`, + ) + .get(); + return { + totalMessages: totalRow?.cnt ?? 0, + totalUnread: unreadRow?.cnt ?? 0, + uniqueAgents: agentsRow?.cnt ?? 0, + uniqueConversations: convRow?.cnt ?? 0, + }; + } catch { + return { + totalMessages: 0, + totalUnread: 0, + uniqueAgents: 0, + uniqueConversations: 0, + }; + } +} function asStringOrNull(value) { return typeof value === "string" && value.length > 0 ? value : null; } diff --git a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs index 362b599ef..a44dc1906 100644 --- a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs +++ b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs @@ -3,7 +3,11 @@ import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, test } from "vitest"; -import { closeDatabase, insertUokMessage } from "../sf-db.js"; +import { + closeDatabase, + getUokMessageBusMetrics, + insertUokMessage, +} from "../sf-db.js"; import { AgentInbox, MessageBus } from "../uok/message-bus.js"; const tmpRoots = []; @@ -181,3 +185,42 @@ test("agentInbox_markRead_returns_false_for_unknown_id", () => { const inbox = new AgentInbox("agent-b", root); assert.equal(inbox.markRead("nope"), false); }); + +test("getUokMessageBusMetrics_returns_zero_when_empty", () => { + makeProject(); + const m = getUokMessageBusMetrics(); + assert.equal(m.totalMessages, 0); + assert.equal(m.totalUnread, 0); + assert.equal(m.uniqueAgents, 0); + assert.equal(m.uniqueConversations, 0); +}); + +test("getUokMessageBusMetrics_counts_messages_and_unread", () => { + const root = makeProject(); + const bus = new MessageBus(root); + bus.send("agent-a", "agent-b", "hello"); + bus.send("agent-a", "agent-c", "world"); + bus.send("agent-c", "agent-b", "unread"); + + const m = getUokMessageBusMetrics(); + assert.equal(m.totalMessages, 3); + assert.equal(m.totalUnread, 3); + assert.equal(m.uniqueAgents, 2); // agent-b, agent-c + assert.equal(m.uniqueConversations, 3); + + // Mark one read + bus.getInbox("agent-b").markRead(bus.getInbox("agent-b").list()[0].id); + const m2 = getUokMessageBusMetrics(); + assert.equal(m2.totalUnread, 2); +}); + +test("getUokMessageBusMetrics_ignores_reads_by_non_recipient_for_unread_count", () => { + const root = makeProject(); + const bus = new MessageBus(root); + const id = bus.send("agent-a", "agent-b", "hello"); + + bus.getInbox("agent-c").markRead(id); + const m = getUokMessageBusMetrics(); + assert.equal(m.totalMessages, 1); + assert.equal(m.totalUnread, 1); +}); diff --git a/src/resources/extensions/sf/tests/uok-metrics-exposition.test.mjs b/src/resources/extensions/sf/tests/uok-metrics-exposition.test.mjs index 77a5ed2b4..021c71fed 100644 --- a/src/resources/extensions/sf/tests/uok-metrics-exposition.test.mjs +++ b/src/resources/extensions/sf/tests/uok-metrics-exposition.test.mjs @@ -10,7 +10,12 @@ import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, test } from "vitest"; -import { closeDatabase, insertGateRun, openDatabase } from "../sf-db.js"; +import { + closeDatabase, + insertGateRun, + insertUokMessage, + openDatabase, +} from "../sf-db.js"; import { invalidateMetricsCache, readUokMetrics, @@ -80,3 +85,21 @@ test("writeUokMetrics_when_cache_invalidated_refreshes_db_snapshot", () => { /uok_gate_runs_failed_total\{gate_id="cache-gate"\} 1/, ); }); + +test("writeUokMetrics_includes_message_bus_metrics", () => { + const project = makeProject(); + insertUokMessage({ + id: "msg-1", + from: "agent-a", + to: "agent-b", + body: "hello", + sentAt: new Date().toISOString(), + metadata: {}, + }); + writeUokMetrics(project); + const text = readUokMetrics(project); + assert.match(text, /uok_message_bus_total_messages 1/); + assert.match(text, /uok_message_bus_unread_messages 1/); + assert.match(text, /uok_message_bus_unique_agents 1/); + assert.match(text, /uok_message_bus_unique_conversations 1/); +}); diff --git a/src/resources/extensions/sf/tests/uok-status-command.test.mjs b/src/resources/extensions/sf/tests/uok-status-command.test.mjs index f5b440392..48de0ffeb 100644 --- a/src/resources/extensions/sf/tests/uok-status-command.test.mjs +++ b/src/resources/extensions/sf/tests/uok-status-command.test.mjs @@ -3,9 +3,14 @@ import { mkdirSync, mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, test } from "vitest"; -import { collectUokStatus, formatUokStatus } from "../commands-uok.js"; +import { + collectUokStatus, + formatUokStatus, + handleUok, +} from "../commands-uok.js"; import { closeDatabase, + insertGateRun, openDatabase, recordUokRunExit, recordUokRunStart, @@ -135,3 +140,66 @@ test("formatUokStatus_shows_operator_fields_without_raw_json", () => { assert.match(rendered, /ledger boom/); assert.match(rendered, /legacy missing exits: 3/); }); + +test("handleUok_gates_when_empty_shows_no_recorded_gate_runs", async () => { + const notifications = []; + const ctx = { + ui: { + notify: (msg, level) => notifications.push({ msg, level }), + }, + }; + await handleUok("gates", ctx); + assert.ok(notifications.length >= 1); + assert.match(notifications[0].msg, /No UOK gate runs recorded yet/); +}); + +test("handleUok_gates_lists_observed_gate_runs", async () => { + openDatabase(":memory:"); + insertGateRun({ + traceId: "trace-gates", + turnId: "turn-gates", + gateId: "observed-gate", + gateType: "verification", + unitType: "execute-task", + unitId: "M001/S01/T01", + milestoneId: "M001", + sliceId: "S01", + taskId: "T01", + outcome: "pass", + failureClass: "none", + rationale: "ok", + findings: "", + attempt: 1, + maxAttempts: 1, + retryable: false, + evaluatedAt: new Date(NOW).toISOString(), + durationMs: 10, + }); + const notifications = []; + const ctx = { + ui: { + notify: (msg, level) => notifications.push({ msg, level }), + }, + }; + + await handleUok("gates", ctx); + + assert.ok(notifications.length >= 1); + assert.match(notifications[0].msg, /Observed gates/); + assert.match(notifications[0].msg, /observed-gate/); +}); + +test("handleUok_messages_shows_bus_metrics", async () => { + makeProject(); + openDatabase(":memory:"); + const notifications = []; + const ctx = { + ui: { + notify: (msg, level) => notifications.push({ msg, level }), + }, + }; + await handleUok("messages", ctx); + assert.ok(notifications.length >= 1); + assert.match(notifications[0].msg, /Message bus/); + assert.match(notifications[0].msg, /Total messages: 0/); +}); diff --git a/src/resources/extensions/sf/uok/metrics-exposition.js b/src/resources/extensions/sf/uok/metrics-exposition.js index aeb7ecf96..97c2712d3 100644 --- a/src/resources/extensions/sf/uok/metrics-exposition.js +++ b/src/resources/extensions/sf/uok/metrics-exposition.js @@ -16,6 +16,7 @@ import { getGateCircuitBreaker, getGateLatencyStats, getGateRunStats, + getUokMessageBusMetrics, isDbAvailable, } from "../sf-db.js"; @@ -99,6 +100,18 @@ function collectGateMetrics(gateIds) { return lines; } +function collectMessageBusMetrics() { + const m = getUokMessageBusMetrics(); + const lines = []; + lines.push(fmtGauge("uok_message_bus_total_messages", m.totalMessages)); + lines.push(fmtGauge("uok_message_bus_unread_messages", m.totalUnread)); + lines.push(fmtGauge("uok_message_bus_unique_agents", m.uniqueAgents)); + lines.push( + fmtGauge("uok_message_bus_unique_conversations", m.uniqueConversations), + ); + return lines; +} + export function buildMetricsText(gateIds) { const cacheKey = gateIds ? gateIds.join(",") : ""; const now = Date.now(); @@ -130,6 +143,14 @@ export function buildMetricsText(gateIds) { "# TYPE uok_gate_circuit_breaker_state gauge", "# HELP uok_gate_circuit_breaker_failure_streak Consecutive failures since last pass", "# TYPE uok_gate_circuit_breaker_failure_streak gauge", + "# HELP uok_message_bus_total_messages Total messages in the bus", + "# TYPE uok_message_bus_total_messages gauge", + "# HELP uok_message_bus_unread_messages Unread messages in the bus", + "# TYPE uok_message_bus_unread_messages gauge", + "# HELP uok_message_bus_unique_agents Unique agent recipients", + "# TYPE uok_message_bus_unique_agents gauge", + "# HELP uok_message_bus_unique_conversations Unique agent conversation pairs", + "# TYPE uok_message_bus_unique_conversations gauge", ]; if (isDbAvailable()) { const ids = @@ -139,6 +160,7 @@ export function buildMetricsText(gateIds) { ? getDistinctGateIds() : DEFAULT_GATE_NAMES; lines.push(...collectGateMetrics(ids)); + lines.push(...collectMessageBusMetrics()); } const text = lines.join("\n") + "\n"; _metricsCacheText = text;