From 268e7ac6783a5374baba3c60a7c1b8a881494ca8 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Thu, 7 May 2026 05:08:44 +0200 Subject: [PATCH] feat: publish uok diagnostics to observer inbox --- .../extensions/sf/doctor-providers.js | 5 ++- .../tests/uok-diagnostic-synthesis.test.mjs | 24 ++++++++++++++ .../sf/tests/uok-message-bus.test.mjs | 26 +++++++++++++++ .../tests/vault-credential-resolver.test.ts | 12 +++++-- .../extensions/sf/uok/diagnostic-synthesis.js | 29 ++++++++++++++++ .../extensions/sf/uok/message-bus.js | 33 ++++++++++++++++++- .../sf/vault-credential-resolver.js | 4 ++- 7 files changed, 128 insertions(+), 5 deletions(-) diff --git a/src/resources/extensions/sf/doctor-providers.js b/src/resources/extensions/sf/doctor-providers.js index 10b5bd774..531de37db 100644 --- a/src/resources/extensions/sf/doctor-providers.js +++ b/src/resources/extensions/sf/doctor-providers.js @@ -15,7 +15,10 @@ import { getEnvApiKey } from "@singularity-forge/pi-ai"; import { AuthStorage } from "@singularity-forge/pi-coding-agent"; import { getAuthPath, PROVIDER_REGISTRY } from "./key-manager.js"; import { loadEffectiveSFPreferences } from "./preferences.js"; -import { couldBeVaultUri, hasProviderCredentialEnvVar } from "./vault-credential-resolver.js"; +import { + couldBeVaultUri, + hasProviderCredentialEnvVar, +} from "./vault-credential-resolver.js"; // ── Model → Provider ID mapping ─────────────────────────────────────────────── /** diff --git a/src/resources/extensions/sf/tests/uok-diagnostic-synthesis.test.mjs b/src/resources/extensions/sf/tests/uok-diagnostic-synthesis.test.mjs index ed42d4e14..ccd12e744 100644 --- a/src/resources/extensions/sf/tests/uok-diagnostic-synthesis.test.mjs +++ b/src/resources/extensions/sf/tests/uok-diagnostic-synthesis.test.mjs @@ -21,6 +21,7 @@ import { synthesizeUokDiagnostics, writeUokDiagnostics, } from "../uok/diagnostic-synthesis.js"; +import { MessageBus } from "../uok/message-bus.js"; import { writeUnitRuntimeRecord } from "../uok/unit-runtime.js"; const NOW = Date.parse("2026-05-06T00:00:00.000Z"); @@ -240,3 +241,26 @@ test("writeUokDiagnostics_persists_report_for_status_widget_and_doctor", () => { ); assert.equal(diagnostics.reportPath, reportPath); }); + +test("writeUokDiagnostics_when_attention_publishes_deduped_observer_message", () => { + const root = makeProject(); + openDatabase(":memory:"); + writeAutoLock(root, { + pid: 999_999_999, + startedAt: new Date(NOW - 60_000).toISOString(), + unitType: "execute-task", + unitId: "M010/S08/T08.1", + unitStartedAt: new Date(NOW - 60_000).toISOString(), + sessionFile: "/tmp/session.jsonl", + }); + + writeUokDiagnostics(root, { nowMs: NOW, processRows: [] }); + writeUokDiagnostics(root, { nowMs: NOW, processRows: [] }); + + const inbox = new MessageBus(root).getInbox("uok-observer"); + const messages = inbox.list(); + assert.equal(messages.length, 1); + assert.equal(messages[0].from, "uok-diagnostics"); + assert.match(messages[0].body, /stale-lock/); + assert.deepEqual(messages[0].metadata.issueCodes, ["stale-lock"]); +}); diff --git a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs index 1e97361fe..362b599ef 100644 --- a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs +++ b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs @@ -72,6 +72,32 @@ test("messageBus_send_when_inbox_already_hydrated_does_not_double_insert", () => assert.equal(bus2.getInbox("agent-b").list().length, 1); }); +test("messageBus_sendOnce_when_called_repeatedly_keeps_single_inbox_message", () => { + const root = makeProject(); + const bus = new MessageBus(root); + + const first = bus.sendOnce( + "uok-diagnostics", + "uok-observer", + "same health issue", + { verdict: "attention" }, + "diagnostic:stable", + ); + const second = bus.sendOnce( + "uok-diagnostics", + "uok-observer", + "same health issue", + { verdict: "attention" }, + "diagnostic:stable", + ); + + assert.equal(second, first); + const messages = bus.getInbox("uok-observer").list(); + assert.equal(messages.length, 1); + assert.equal(messages[0].body, "same health issue"); + assert.equal(messages[0].metadata.dedupeKey, "diagnostic:stable"); +}); + test("messageBus_markRead_updates_state_and_persists", () => { const root = makeProject(); const bus = new MessageBus(root); diff --git a/src/resources/extensions/sf/tests/vault-credential-resolver.test.ts b/src/resources/extensions/sf/tests/vault-credential-resolver.test.ts index 0af39e519..4c97a6dbd 100644 --- a/src/resources/extensions/sf/tests/vault-credential-resolver.test.ts +++ b/src/resources/extensions/sf/tests/vault-credential-resolver.test.ts @@ -165,7 +165,11 @@ describe("Vault Credential Resolver", () => { describe("getCredentialValue", () => { it("returns value from resolved result", () => { - const result = { resolved: true, value: "sk-ant-abc123", source: "plaintext" }; + const result = { + resolved: true, + value: "sk-ant-abc123", + source: "plaintext", + }; expect(getCredentialValue(result)).toBe("sk-ant-abc123"); }); @@ -198,7 +202,11 @@ describe("Vault Credential Resolver", () => { describe("formatCredentialInfo", () => { it("formats resolved plaintext credential", () => { - const result = { resolved: true, value: "sk-ant-abc123", source: "plaintext" }; + const result = { + resolved: true, + value: "sk-ant-abc123", + source: "plaintext", + }; const formatted = formatCredentialInfo(result, "anthropic"); expect(formatted).toContain("anthropic"); expect(formatted).toContain("[13 chars]"); diff --git a/src/resources/extensions/sf/uok/diagnostic-synthesis.js b/src/resources/extensions/sf/uok/diagnostic-synthesis.js index a0b8d5e11..686478aa9 100644 --- a/src/resources/extensions/sf/uok/diagnostic-synthesis.js +++ b/src/resources/extensions/sf/uok/diagnostic-synthesis.js @@ -4,6 +4,7 @@ import { join } from "node:path"; import { isLockProcessAlive, readCrashLock } from "../crash-recovery.js"; import { sfRoot } from "../paths.js"; import { getUokRuns, isDbAvailable } from "../sf-db.js"; +import { MessageBus } from "./message-bus.js"; import { summarizeParityHealth, writeParityReport } from "./parity-report.js"; import { decideUnitRuntimeDispatch, @@ -349,9 +350,37 @@ export function writeUokDiagnostics(basePath, options = {}) { const path = diagnosticsPath(basePath); mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true }); writeFileSync(path, `${JSON.stringify(diagnostics, null, 2)}\n`, "utf-8"); + publishDiagnosticsToObserver(basePath, diagnostics); return diagnostics; } +function publishDiagnosticsToObserver(basePath, diagnostics) { + if (!isDbAvailable() || diagnostics.verdict === "clear") return; + try { + const issueCodes = diagnostics.issues.map((issue) => issue.code).join(","); + const unit = diagnostics.currentUnit + ? `${diagnostics.currentUnit.unitType ?? "unknown"}:${diagnostics.currentUnit.unitId ?? "unknown"}` + : "no-unit"; + const bus = new MessageBus(basePath); + bus.sendOnce( + "uok-diagnostics", + "uok-observer", + `UOK diagnostics ${diagnostics.verdict}/${diagnostics.classification}: ${issueCodes || "attention"}`, + { + verdict: diagnostics.verdict, + classification: diagnostics.classification, + issueCodes: diagnostics.issues.map((issue) => issue.code), + reportPath: diagnostics.reportPath, + generatedAt: diagnostics.generatedAt, + currentUnit: diagnostics.currentUnit, + }, + `uok-diagnostics:${diagnostics.verdict}:${diagnostics.classification}:${unit}:${issueCodes}`, + ); + } catch { + /* observer messages are diagnostic-only; never mask status/doctor output */ + } +} + export function readUokDiagnostics(basePath) { const path = diagnosticsPath(basePath); if (!existsSync(path)) return null; diff --git a/src/resources/extensions/sf/uok/message-bus.js b/src/resources/extensions/sf/uok/message-bus.js index 04de0a3ae..804c11eb5 100644 --- a/src/resources/extensions/sf/uok/message-bus.js +++ b/src/resources/extensions/sf/uok/message-bus.js @@ -9,7 +9,7 @@ * observer chains. */ -import { randomUUID } from "node:crypto"; +import { createHash, randomUUID } from "node:crypto"; import { mkdirSync } from "node:fs"; import { join } from "node:path"; import { sfRoot } from "../paths.js"; @@ -27,6 +27,11 @@ import { const DEFAULT_RETENTION_DAYS = 7; const DEFAULT_MAX_INBOX_SIZE = 1000; +function deterministicMessageId(key) { + const digest = createHash("sha256").update(String(key)).digest("hex"); + return `msg-${digest.slice(0, 32)}`; +} + function ensureDb(basePath) { if (isDbAvailable()) return; const dir = sfRoot(basePath); @@ -149,6 +154,32 @@ export class MessageBus { return message.id; } + /** + * Send an idempotent message keyed by a stable event identity. + * + * Purpose: let recurring diagnostics and observer chains publish durable + * notifications without flooding an inbox on every status/doctor poll. + * + * Consumer: UOK diagnostics when surfacing repeated runtime health issues. + */ + sendOnce(from, to, body, metadata = {}, dedupeKey) { + const key = dedupeKey ?? `${from}:${to}:${body}`; + const message = { + id: deterministicMessageId(key), + from, + to, + body, + metadata: { ...metadata, dedupeKey: key }, + sentAt: new Date().toISOString(), + deliveredAt: new Date().toISOString(), + }; + + insertUokMessage(message); + const targetInbox = this._getOrCreateInbox(to); + targetInbox.refresh(); + return message.id; + } + broadcast(from, recipients, body, metadata = {}) { const ids = []; for (const to of recipients) { diff --git a/src/resources/extensions/sf/vault-credential-resolver.js b/src/resources/extensions/sf/vault-credential-resolver.js index 23691ae3e..f6cd9e394 100644 --- a/src/resources/extensions/sf/vault-credential-resolver.js +++ b/src/resources/extensions/sf/vault-credential-resolver.js @@ -151,7 +151,9 @@ export function formatCredentialInfo(resolveResult, providerId) { ? `[${resolveResult.value.length} chars]` : "[empty]"; - const warning = resolveResult.warning ? ` (warning: ${resolveResult.warning})` : ""; + const warning = resolveResult.warning + ? ` (warning: ${resolveResult.warning})` + : ""; return `${providerId}: ${valuePreview} from ${sourceLabel}${warning}`; }