feat: publish uok diagnostics to observer inbox

This commit is contained in:
Mikael Hugo 2026-05-07 05:08:44 +02:00
parent c0973ac287
commit 268e7ac678
7 changed files with 128 additions and 5 deletions

View file

@ -15,7 +15,10 @@ import { getEnvApiKey } from "@singularity-forge/pi-ai";
import { AuthStorage } from "@singularity-forge/pi-coding-agent";
import { getAuthPath, PROVIDER_REGISTRY } from "./key-manager.js";
import { loadEffectiveSFPreferences } from "./preferences.js";
import { couldBeVaultUri, hasProviderCredentialEnvVar } from "./vault-credential-resolver.js";
import {
couldBeVaultUri,
hasProviderCredentialEnvVar,
} from "./vault-credential-resolver.js";
// ── Model → Provider ID mapping ───────────────────────────────────────────────
/**

View file

@ -21,6 +21,7 @@ import {
synthesizeUokDiagnostics,
writeUokDiagnostics,
} from "../uok/diagnostic-synthesis.js";
import { MessageBus } from "../uok/message-bus.js";
import { writeUnitRuntimeRecord } from "../uok/unit-runtime.js";
const NOW = Date.parse("2026-05-06T00:00:00.000Z");
@ -240,3 +241,26 @@ test("writeUokDiagnostics_persists_report_for_status_widget_and_doctor", () => {
);
assert.equal(diagnostics.reportPath, reportPath);
});
test("writeUokDiagnostics_when_attention_publishes_deduped_observer_message", () => {
const root = makeProject();
openDatabase(":memory:");
writeAutoLock(root, {
pid: 999_999_999,
startedAt: new Date(NOW - 60_000).toISOString(),
unitType: "execute-task",
unitId: "M010/S08/T08.1",
unitStartedAt: new Date(NOW - 60_000).toISOString(),
sessionFile: "/tmp/session.jsonl",
});
writeUokDiagnostics(root, { nowMs: NOW, processRows: [] });
writeUokDiagnostics(root, { nowMs: NOW, processRows: [] });
const inbox = new MessageBus(root).getInbox("uok-observer");
const messages = inbox.list();
assert.equal(messages.length, 1);
assert.equal(messages[0].from, "uok-diagnostics");
assert.match(messages[0].body, /stale-lock/);
assert.deepEqual(messages[0].metadata.issueCodes, ["stale-lock"]);
});

View file

@ -72,6 +72,32 @@ test("messageBus_send_when_inbox_already_hydrated_does_not_double_insert", () =>
assert.equal(bus2.getInbox("agent-b").list().length, 1);
});
test("messageBus_sendOnce_when_called_repeatedly_keeps_single_inbox_message", () => {
const root = makeProject();
const bus = new MessageBus(root);
const first = bus.sendOnce(
"uok-diagnostics",
"uok-observer",
"same health issue",
{ verdict: "attention" },
"diagnostic:stable",
);
const second = bus.sendOnce(
"uok-diagnostics",
"uok-observer",
"same health issue",
{ verdict: "attention" },
"diagnostic:stable",
);
assert.equal(second, first);
const messages = bus.getInbox("uok-observer").list();
assert.equal(messages.length, 1);
assert.equal(messages[0].body, "same health issue");
assert.equal(messages[0].metadata.dedupeKey, "diagnostic:stable");
});
test("messageBus_markRead_updates_state_and_persists", () => {
const root = makeProject();
const bus = new MessageBus(root);

View file

@ -165,7 +165,11 @@ describe("Vault Credential Resolver", () => {
describe("getCredentialValue", () => {
it("returns value from resolved result", () => {
const result = { resolved: true, value: "sk-ant-abc123", source: "plaintext" };
const result = {
resolved: true,
value: "sk-ant-abc123",
source: "plaintext",
};
expect(getCredentialValue(result)).toBe("sk-ant-abc123");
});
@ -198,7 +202,11 @@ describe("Vault Credential Resolver", () => {
describe("formatCredentialInfo", () => {
it("formats resolved plaintext credential", () => {
const result = { resolved: true, value: "sk-ant-abc123", source: "plaintext" };
const result = {
resolved: true,
value: "sk-ant-abc123",
source: "plaintext",
};
const formatted = formatCredentialInfo(result, "anthropic");
expect(formatted).toContain("anthropic");
expect(formatted).toContain("[13 chars]");

View file

@ -4,6 +4,7 @@ import { join } from "node:path";
import { isLockProcessAlive, readCrashLock } from "../crash-recovery.js";
import { sfRoot } from "../paths.js";
import { getUokRuns, isDbAvailable } from "../sf-db.js";
import { MessageBus } from "./message-bus.js";
import { summarizeParityHealth, writeParityReport } from "./parity-report.js";
import {
decideUnitRuntimeDispatch,
@ -349,9 +350,37 @@ export function writeUokDiagnostics(basePath, options = {}) {
const path = diagnosticsPath(basePath);
mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true });
writeFileSync(path, `${JSON.stringify(diagnostics, null, 2)}\n`, "utf-8");
publishDiagnosticsToObserver(basePath, diagnostics);
return diagnostics;
}
function publishDiagnosticsToObserver(basePath, diagnostics) {
if (!isDbAvailable() || diagnostics.verdict === "clear") return;
try {
const issueCodes = diagnostics.issues.map((issue) => issue.code).join(",");
const unit = diagnostics.currentUnit
? `${diagnostics.currentUnit.unitType ?? "unknown"}:${diagnostics.currentUnit.unitId ?? "unknown"}`
: "no-unit";
const bus = new MessageBus(basePath);
bus.sendOnce(
"uok-diagnostics",
"uok-observer",
`UOK diagnostics ${diagnostics.verdict}/${diagnostics.classification}: ${issueCodes || "attention"}`,
{
verdict: diagnostics.verdict,
classification: diagnostics.classification,
issueCodes: diagnostics.issues.map((issue) => issue.code),
reportPath: diagnostics.reportPath,
generatedAt: diagnostics.generatedAt,
currentUnit: diagnostics.currentUnit,
},
`uok-diagnostics:${diagnostics.verdict}:${diagnostics.classification}:${unit}:${issueCodes}`,
);
} catch {
/* observer messages are diagnostic-only; never mask status/doctor output */
}
}
export function readUokDiagnostics(basePath) {
const path = diagnosticsPath(basePath);
if (!existsSync(path)) return null;

View file

@ -9,7 +9,7 @@
* observer chains.
*/
import { randomUUID } from "node:crypto";
import { createHash, randomUUID } from "node:crypto";
import { mkdirSync } from "node:fs";
import { join } from "node:path";
import { sfRoot } from "../paths.js";
@ -27,6 +27,11 @@ import {
const DEFAULT_RETENTION_DAYS = 7;
const DEFAULT_MAX_INBOX_SIZE = 1000;
function deterministicMessageId(key) {
const digest = createHash("sha256").update(String(key)).digest("hex");
return `msg-${digest.slice(0, 32)}`;
}
function ensureDb(basePath) {
if (isDbAvailable()) return;
const dir = sfRoot(basePath);
@ -149,6 +154,32 @@ export class MessageBus {
return message.id;
}
/**
* Send an idempotent message keyed by a stable event identity.
*
* Purpose: let recurring diagnostics and observer chains publish durable
* notifications without flooding an inbox on every status/doctor poll.
*
* Consumer: UOK diagnostics when surfacing repeated runtime health issues.
*/
sendOnce(from, to, body, metadata = {}, dedupeKey) {
const key = dedupeKey ?? `${from}:${to}:${body}`;
const message = {
id: deterministicMessageId(key),
from,
to,
body,
metadata: { ...metadata, dedupeKey: key },
sentAt: new Date().toISOString(),
deliveredAt: new Date().toISOString(),
};
insertUokMessage(message);
const targetInbox = this._getOrCreateInbox(to);
targetInbox.refresh();
return message.id;
}
broadcast(from, recipients, body, metadata = {}) {
const ids = [];
for (const to of recipients) {

View file

@ -151,7 +151,9 @@ export function formatCredentialInfo(resolveResult, providerId) {
? `[${resolveResult.value.length} chars]`
: "[empty]";
const warning = resolveResult.warning ? ` (warning: ${resolveResult.warning})` : "";
const warning = resolveResult.warning
? ` (warning: ${resolveResult.warning})`
: "";
return `${providerId}: ${valuePreview} from ${sourceLabel}${warning}`;
}