feat: expose uok message bus metrics

This commit is contained in:
Mikael Hugo 2026-05-07 05:19:41 +02:00
parent 268e7ac678
commit 5bc3895586
6 changed files with 264 additions and 5 deletions

View file

@ -5,11 +5,12 @@ import { sfRoot } from "./paths.js";
import {
getDistinctGateIds,
getGateCircuitBreaker,
getUokMessageBusMetrics,
getUokRuns,
isDbAvailable,
} from "./sf-db.js";
import { writeUokDiagnostics } from "./uok/diagnostic-synthesis.js";
import { UokGateRunner } from "./uok/gate-runner.js";
import { MessageBus } from "./uok/message-bus.js";
import { readUokMetrics, writeUokMetrics } from "./uok/metrics-exposition.js";
import {
summarizeParityHealth,
@ -208,7 +209,7 @@ export async function handleUok(args, ctx) {
const trimmed = args.trim();
if (trimmed === "help" || trimmed === "--help") {
ctx.ui.notify(
"Usage: /sf uok [status|metrics|circuit-breakers|--json]\n\n status — UOK ledger health, last run, last error, historical drift, startup gate, and gate health\n metrics — Render Prometheus-format metrics to .sf/runtime/uok-metrics.prom and display\n circuit-breakers — List all circuit breaker states and failure streaks\n --json — Same as status but outputs JSON",
"Usage: /sf uok [status|metrics|circuit-breakers|gates|messages|--json]\n\n status — UOK ledger health, last run, last error, historical drift, startup gate, and gate health\n metrics — Render Prometheus-format metrics to .sf/runtime/uok-metrics.prom and display\n circuit-breakers — List all circuit breaker states and failure streaks\n gates — List observed gate runs and circuit breaker state\n messages — Show message bus status\n --json — Same as status but outputs JSON",
"info",
);
return;
@ -253,6 +254,62 @@ export async function handleUok(args, ctx) {
ctx.ui.notify(lines.join("\n"), "info");
return;
}
if (trimmed === "gates") {
const basePath = process.cwd();
if (!isDbAvailable()) {
await ensureDbOpen(basePath);
}
if (!isDbAvailable()) {
ctx.ui.notify("Database unavailable", "error");
return;
}
const gateIds = getDistinctGateIds();
if (gateIds.length === 0) {
ctx.ui.notify("No UOK gate runs recorded yet.", "info");
return;
}
const lines = ["Observed gates", ""];
for (const id of gateIds) {
const cb = getGateCircuitBreaker(id);
lines.push(
` ${id} cb=${cb.state} streak=${cb.failureStreak} lastFail=${cb.lastFailureAt ?? "never"}`,
);
}
ctx.ui.notify(lines.join("\n"), "info");
return;
}
if (trimmed === "messages" || trimmed.startsWith("messages ")) {
const basePath = process.cwd();
if (!isDbAvailable()) {
await ensureDbOpen(basePath);
}
if (!isDbAvailable()) {
ctx.ui.notify("Database unavailable", "error");
return;
}
const sub = trimmed.slice("messages".length).trim();
if (sub === "compact") {
const bus = new MessageBus(basePath);
const result = bus.compact();
ctx.ui.notify(
`Message bus compacted: ${result.before}${result.after} messages`,
"info",
);
return;
}
const m = getUokMessageBusMetrics();
const lines = ["Message bus", ""];
lines.push(`Total messages: ${m.totalMessages}`);
lines.push(`Unread messages: ${m.totalUnread}`);
lines.push(`Unique agents: ${m.uniqueAgents}`);
lines.push(`Unique conversations: ${m.uniqueConversations}`);
lines.push("");
lines.push(
"Tip: /sf uok messages compact — remove messages older than retention period",
);
ctx.ui.notify(lines.join("\n"), "info");
return;
}
const status = await collectUokStatus(process.cwd());
if (trimmed === "--json" || trimmed === "json") {
ctx.ui.notify(JSON.stringify(status, null, 2), "info");

View file

@ -5131,6 +5131,52 @@ export function getUokMessageReadIds(agentId) {
return [];
}
}
export function getUokMessageBusMetrics() {
if (!currentDb) {
return {
totalMessages: 0,
totalUnread: 0,
uniqueAgents: 0,
uniqueConversations: 0,
};
}
try {
const totalRow = currentDb
.prepare("SELECT COUNT(*) AS cnt FROM uok_messages")
.get();
const unreadRow = currentDb
.prepare(
`SELECT COUNT(*) AS cnt FROM uok_messages m
WHERE NOT EXISTS (
SELECT 1 FROM uok_message_reads r
WHERE r.message_id = m.id
AND r.agent_id = m.to_agent
)`,
)
.get();
const agentsRow = currentDb
.prepare(`SELECT COUNT(DISTINCT to_agent) AS cnt FROM uok_messages`)
.get();
const convRow = currentDb
.prepare(
`SELECT COUNT(DISTINCT from_agent || ':' || to_agent) AS cnt FROM uok_messages`,
)
.get();
return {
totalMessages: totalRow?.cnt ?? 0,
totalUnread: unreadRow?.cnt ?? 0,
uniqueAgents: agentsRow?.cnt ?? 0,
uniqueConversations: convRow?.cnt ?? 0,
};
} catch {
return {
totalMessages: 0,
totalUnread: 0,
uniqueAgents: 0,
uniqueConversations: 0,
};
}
}
function asStringOrNull(value) {
return typeof value === "string" && value.length > 0 ? value : null;
}

View file

@ -3,7 +3,11 @@ import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, test } from "vitest";
import { closeDatabase, insertUokMessage } from "../sf-db.js";
import {
closeDatabase,
getUokMessageBusMetrics,
insertUokMessage,
} from "../sf-db.js";
import { AgentInbox, MessageBus } from "../uok/message-bus.js";
const tmpRoots = [];
@ -181,3 +185,42 @@ test("agentInbox_markRead_returns_false_for_unknown_id", () => {
const inbox = new AgentInbox("agent-b", root);
assert.equal(inbox.markRead("nope"), false);
});
test("getUokMessageBusMetrics_returns_zero_when_empty", () => {
makeProject();
const m = getUokMessageBusMetrics();
assert.equal(m.totalMessages, 0);
assert.equal(m.totalUnread, 0);
assert.equal(m.uniqueAgents, 0);
assert.equal(m.uniqueConversations, 0);
});
test("getUokMessageBusMetrics_counts_messages_and_unread", () => {
const root = makeProject();
const bus = new MessageBus(root);
bus.send("agent-a", "agent-b", "hello");
bus.send("agent-a", "agent-c", "world");
bus.send("agent-c", "agent-b", "unread");
const m = getUokMessageBusMetrics();
assert.equal(m.totalMessages, 3);
assert.equal(m.totalUnread, 3);
assert.equal(m.uniqueAgents, 2); // agent-b, agent-c
assert.equal(m.uniqueConversations, 3);
// Mark one read
bus.getInbox("agent-b").markRead(bus.getInbox("agent-b").list()[0].id);
const m2 = getUokMessageBusMetrics();
assert.equal(m2.totalUnread, 2);
});
test("getUokMessageBusMetrics_ignores_reads_by_non_recipient_for_unread_count", () => {
const root = makeProject();
const bus = new MessageBus(root);
const id = bus.send("agent-a", "agent-b", "hello");
bus.getInbox("agent-c").markRead(id);
const m = getUokMessageBusMetrics();
assert.equal(m.totalMessages, 1);
assert.equal(m.totalUnread, 1);
});

View file

@ -10,7 +10,12 @@ import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, test } from "vitest";
import { closeDatabase, insertGateRun, openDatabase } from "../sf-db.js";
import {
closeDatabase,
insertGateRun,
insertUokMessage,
openDatabase,
} from "../sf-db.js";
import {
invalidateMetricsCache,
readUokMetrics,
@ -80,3 +85,21 @@ test("writeUokMetrics_when_cache_invalidated_refreshes_db_snapshot", () => {
/uok_gate_runs_failed_total\{gate_id="cache-gate"\} 1/,
);
});
test("writeUokMetrics_includes_message_bus_metrics", () => {
const project = makeProject();
insertUokMessage({
id: "msg-1",
from: "agent-a",
to: "agent-b",
body: "hello",
sentAt: new Date().toISOString(),
metadata: {},
});
writeUokMetrics(project);
const text = readUokMetrics(project);
assert.match(text, /uok_message_bus_total_messages 1/);
assert.match(text, /uok_message_bus_unread_messages 1/);
assert.match(text, /uok_message_bus_unique_agents 1/);
assert.match(text, /uok_message_bus_unique_conversations 1/);
});

View file

@ -3,9 +3,14 @@ import { mkdirSync, mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, test } from "vitest";
import { collectUokStatus, formatUokStatus } from "../commands-uok.js";
import {
collectUokStatus,
formatUokStatus,
handleUok,
} from "../commands-uok.js";
import {
closeDatabase,
insertGateRun,
openDatabase,
recordUokRunExit,
recordUokRunStart,
@ -135,3 +140,66 @@ test("formatUokStatus_shows_operator_fields_without_raw_json", () => {
assert.match(rendered, /ledger boom/);
assert.match(rendered, /legacy missing exits: 3/);
});
test("handleUok_gates_when_empty_shows_no_recorded_gate_runs", async () => {
const notifications = [];
const ctx = {
ui: {
notify: (msg, level) => notifications.push({ msg, level }),
},
};
await handleUok("gates", ctx);
assert.ok(notifications.length >= 1);
assert.match(notifications[0].msg, /No UOK gate runs recorded yet/);
});
test("handleUok_gates_lists_observed_gate_runs", async () => {
openDatabase(":memory:");
insertGateRun({
traceId: "trace-gates",
turnId: "turn-gates",
gateId: "observed-gate",
gateType: "verification",
unitType: "execute-task",
unitId: "M001/S01/T01",
milestoneId: "M001",
sliceId: "S01",
taskId: "T01",
outcome: "pass",
failureClass: "none",
rationale: "ok",
findings: "",
attempt: 1,
maxAttempts: 1,
retryable: false,
evaluatedAt: new Date(NOW).toISOString(),
durationMs: 10,
});
const notifications = [];
const ctx = {
ui: {
notify: (msg, level) => notifications.push({ msg, level }),
},
};
await handleUok("gates", ctx);
assert.ok(notifications.length >= 1);
assert.match(notifications[0].msg, /Observed gates/);
assert.match(notifications[0].msg, /observed-gate/);
});
test("handleUok_messages_shows_bus_metrics", async () => {
makeProject();
openDatabase(":memory:");
const notifications = [];
const ctx = {
ui: {
notify: (msg, level) => notifications.push({ msg, level }),
},
};
await handleUok("messages", ctx);
assert.ok(notifications.length >= 1);
assert.match(notifications[0].msg, /Message bus/);
assert.match(notifications[0].msg, /Total messages: 0/);
});

View file

@ -16,6 +16,7 @@ import {
getGateCircuitBreaker,
getGateLatencyStats,
getGateRunStats,
getUokMessageBusMetrics,
isDbAvailable,
} from "../sf-db.js";
@ -99,6 +100,18 @@ function collectGateMetrics(gateIds) {
return lines;
}
function collectMessageBusMetrics() {
const m = getUokMessageBusMetrics();
const lines = [];
lines.push(fmtGauge("uok_message_bus_total_messages", m.totalMessages));
lines.push(fmtGauge("uok_message_bus_unread_messages", m.totalUnread));
lines.push(fmtGauge("uok_message_bus_unique_agents", m.uniqueAgents));
lines.push(
fmtGauge("uok_message_bus_unique_conversations", m.uniqueConversations),
);
return lines;
}
export function buildMetricsText(gateIds) {
const cacheKey = gateIds ? gateIds.join(",") : "";
const now = Date.now();
@ -130,6 +143,14 @@ export function buildMetricsText(gateIds) {
"# TYPE uok_gate_circuit_breaker_state gauge",
"# HELP uok_gate_circuit_breaker_failure_streak Consecutive failures since last pass",
"# TYPE uok_gate_circuit_breaker_failure_streak gauge",
"# HELP uok_message_bus_total_messages Total messages in the bus",
"# TYPE uok_message_bus_total_messages gauge",
"# HELP uok_message_bus_unread_messages Unread messages in the bus",
"# TYPE uok_message_bus_unread_messages gauge",
"# HELP uok_message_bus_unique_agents Unique agent recipients",
"# TYPE uok_message_bus_unique_agents gauge",
"# HELP uok_message_bus_unique_conversations Unique agent conversation pairs",
"# TYPE uok_message_bus_unique_conversations gauge",
];
if (isDbAvailable()) {
const ids =
@ -139,6 +160,7 @@ export function buildMetricsText(gateIds) {
? getDistinctGateIds()
: DEFAULT_GATE_NAMES;
lines.push(...collectGateMetrics(ids));
lines.push(...collectMessageBusMetrics());
}
const text = lines.join("\n") + "\n";
_metricsCacheText = text;