singularity-forge/src/resources/extensions/sf/metrics-central.js

692 lines
18 KiB
JavaScript
Raw Normal View History

/**
* Centralized Metrics Collector — a unified metrics sink for all SF subsystems.
*
* Purpose: Replace scattered metrics emission (DB, Prometheus, stderr, JSONL)
* with a single collector that aggregates counters, gauges, and histograms,
* then exposes them in Prometheus text format AND persists to SQLite for
* queryable historical analysis.
*
* Consumer: /uok status, health widgets, external Prometheus scrapers,
* TUI cost/context overlay, and programmatic queries via sf-db.
*
* Design:
* - In-memory aggregation with configurable flush interval
* - Prometheus text format output (compatible with existing exposition)
* - SQLite persistence for historical queries (session-scoped)
* - Cost/token metrics alongside operational metrics
* - Retry with exponential backoff on flush failures
* - Zero external dependencies
*/
import {
  existsSync,
  mkdirSync,
  readFileSync,
  renameSync,
  writeFileSync,
} from "node:fs";
import { join } from "node:path";
import { sfRoot } from "./paths.js";
import { logWarning } from "./workflow-logger.js";
const FLUSH_INTERVAL_MS = 60_000; // 1 minute
// Upper bound on finite histogram bounds kept per metric, to keep the
// exposition small. It must be at least 11: the default latency ladder in
// Histogram (0.005 … 10) has eleven bounds, and a lower cap silently drops
// the largest of them.
const MAX_HISTOGRAM_BUCKETS = 16;
// A failed flush is retried until this many consecutive failures have
// accumulated; after that the failure is only counted as a metric.
const FLUSH_RETRY_MAX = 3;
// Delay before the first retry; each further retry doubles it.
const FLUSH_RETRY_BASE_MS = 1000;
// Prometheus metric-name grammar.
const METRIC_NAME_PATTERN = /^[a-zA-Z_:][a-zA-Z0-9_:]*$/;
// ─── Metric Types ───────────────────────────────────────────────────────────
/**
 * Monotonic counter keyed by label set.
 *
 * Each distinct label combination is stored under a canonical string key
 * produced by `_buildLabelKey`.
 */
class Counter {
  /**
   * @param {string} name metric family name
   * @param {string} help HELP text emitted in the exposition
   * @param {string[]} [labelNames] label names (stored, not enforced)
   */
  constructor(name, help, labelNames = []) {
    this.name = name;
    this.help = help;
    this.labelNames = labelNames;
    this.values = new Map(); // label key → running total
  }

  /**
   * Add `amount` to the series identified by `labels`.
   *
   * A counter only ever goes up. Negative, NaN, infinite and non-numeric
   * amounts are ignored: NaN would poison the total permanently, and a string
   * would turn the addition into concatenation.
   *
   * @param {object} [labels] label key-value pairs
   * @param {number} [amount] non-negative increment (default 1)
   */
  inc(labels = {}, amount = 1) {
    if (!Number.isFinite(amount) || amount < 0) return;
    const key = this._key(labels);
    this.values.set(key, (this.values.get(key) ?? 0) + amount);
  }

  /** Current total for `labels`, or 0 when that series was never incremented. */
  get(labels = {}) {
    return this.values.get(this._key(labels)) ?? 0;
  }

  _key(labels) {
    return _buildLabelKey(labels);
  }

  /**
   * Inverse of `_key`: turn a stored label key back into a labels object.
   * DB persistence calls this method on every counter.
   */
  _parseKey(key) {
    return _parseLabelKey(key);
  }

  /** Yield the HELP/TYPE header and one sample line per label set. */
  *lines() {
    yield `# HELP ${this.name} ${this.help}`;
    yield `# TYPE ${this.name} counter`;
    for (const [key, value] of this.values) {
      yield fmtLine(this.name, value, this._parseKey(key));
    }
  }
}
/**
 * Settable gauge keyed by label set; the last `set` for a label set wins.
 */
class Gauge {
  /**
   * @param {string} name metric family name
   * @param {string} help HELP text emitted in the exposition
   * @param {string[]} [labelNames] label names (stored, not enforced)
   */
  constructor(name, help, labelNames = []) {
    this.name = name;
    this.help = help;
    this.labelNames = labelNames;
    this.values = new Map(); // label key → last value
  }

  /**
   * Set the series identified by `labels` to `value`.
   *
   * Non-numeric values (for example `undefined` from a missing argument) are
   * not valid sample values in the Prometheus text format, so they are
   * skipped instead of being rendered verbatim.
   *
   * @param {object} [labels] label key-value pairs
   * @param {number} value new gauge value
   */
  set(labels = {}, value) {
    if (typeof value !== "number") return;
    this.values.set(this._key(labels), value);
  }

  /** Current value for `labels`, or 0 when that series was never set. */
  get(labels = {}) {
    return this.values.get(this._key(labels)) ?? 0;
  }

  _key(labels) {
    return _buildLabelKey(labels);
  }

  /**
   * Inverse of `_key`: turn a stored label key back into a labels object.
   * DB persistence calls this method on every gauge.
   */
  _parseKey(key) {
    return _parseLabelKey(key);
  }

  /** Yield the HELP/TYPE header and one sample line per label set. */
  *lines() {
    yield `# HELP ${this.name} ${this.help}`;
    yield `# TYPE ${this.name} gauge`;
    for (const [key, value] of this.values) {
      yield fmtLine(this.name, value, this._parseKey(key));
    }
  }
}
/**
 * Unlabelled cumulative histogram in the Prometheus style.
 *
 * `counts` holds, for each finite upper bound, how many observations were
 * `<=` that bound. The `+Inf` bucket equals `count`.
 */
class Histogram {
  /**
   * @param {string} name metric family name
   * @param {string} help HELP text emitted in the exposition
   * @param {number[]} [buckets] finite upper bounds; `+Inf` is always added
   */
  constructor(
    name,
    help,
    buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
  ) {
    this.name = name;
    this.help = help;
    // Dedupe (a repeated bound would emit two identical `le` series), drop
    // non-finite bounds (+Inf is emitted separately by lines()), sort
    // numerically, then keep at most MAX_HISTOGRAM_BUCKETS of the smallest.
    this.buckets = [...new Set(buckets)]
      .filter((bound) => Number.isFinite(bound))
      .sort((a, b) => a - b)
      .slice(0, MAX_HISTOGRAM_BUCKETS);
    this.counts = new Map(); // upper bound → cumulative count
    this.sum = 0;
    this.count = 0;
  }

  /**
   * Record one observation.
   *
   * Non-numbers and NaN are ignored. A string would turn `sum` into a string
   * through `+=`, and NaN would poison `sum` for the life of the process.
   *
   * @param {number} value observed value
   */
  observe(value) {
    if (typeof value !== "number" || Number.isNaN(value)) return;
    this.sum += value;
    this.count++;
    for (const bound of this.buckets) {
      if (value <= bound) {
        this.counts.set(bound, (this.counts.get(bound) ?? 0) + 1);
      }
    }
  }

  /** Yield HELP/TYPE, one `_bucket` line per bound plus `+Inf`, `_sum` and `_count`. */
  *lines() {
    yield `# HELP ${this.name} ${this.help}`;
    yield `# TYPE ${this.name} histogram`;
    for (const bound of this.buckets) {
      yield fmtLine(`${this.name}_bucket`, this.counts.get(bound) ?? 0, {
        le: String(bound),
      });
    }
    yield fmtLine(`${this.name}_bucket`, this.count, { le: "+Inf" });
    yield fmtLine(`${this.name}_sum`, this.sum);
    yield fmtLine(`${this.name}_count`, this.count);
  }
}
// ─── Label Escaping ─────────────────────────────────────────────────────────
/**
 * Escape a label value for use inside an internal label key.
 *
 * Backslash, `=` and `,` are the key's metacharacters; each is prefixed
 * with a backslash in a single pass.
 *
 * @param {*} v label value (stringified)
 * @returns {string} escaped value
 */
function _escapeLabel(v) {
  const meta = /[\\=,]/g;
  return String(v).replace(meta, (ch) => `\\${ch}`);
}
/**
 * Reverse `_escapeLabel`: drop the backslash in front of each escaped
 * metacharacter (`\\`, `\=`, `\,`).
 *
 * @param {string} v escaped value
 * @returns {string} original value
 */
function _unescapeLabel(v) {
  const escaped = /\\([\\=,])/g;
  return v.replace(escaped, "$1");
}
// ─── Label Key Builder (escapes values, stable ordering) ────────────────────
/**
 * Build the canonical map key for a label set.
 *
 * Label names are sorted so that `{a, b}` and `{b, a}` map to the same key.
 * Values are escaped with `_escapeLabel`, and null/undefined become "".
 *
 * @param {object} labels label key-value pairs
 * @returns {string} e.g. "a=x,b=y"
 */
function _buildLabelKey(labels) {
  const parts = [];
  for (const labelName of Object.keys(labels).sort()) {
    const raw = labels[labelName] ?? "";
    parts.push(`${labelName}=${_escapeLabel(raw)}`);
  }
  return parts.join(",");
}
/**
 * Parse a key produced by `_buildLabelKey` back into a labels object.
 *
 * Each entry is `name=value`, and entries are separated by unescaped commas.
 * Inside a value, `\\`, `\=` and `\,` decode to the bare character. A
 * backslash before any other character is kept as is.
 *
 * @param {string} key canonical label key
 * @returns {object} label name → decoded value
 */
function _parseLabelKey(key) {
  const result = {};
  let pos = 0;
  while (pos < key.length) {
    const sep = key.indexOf("=", pos);
    if (sep === -1) break;
    const labelName = key.slice(pos, sep);
    let value = "";
    let cursor = sep + 1;
    for (; cursor < key.length; cursor++) {
      const c = key[cursor];
      const following = key[cursor + 1];
      const isEscape =
        c === "\\" &&
        (following === "\\" || following === "=" || following === ",");
      if (isEscape) {
        value += following;
        cursor++; // consume the escaped character as well
        continue;
      }
      if (c === ",") break; // end of this entry
      value += c;
    }
    result[labelName] = value;
    pos = cursor + 1; // step past the separating comma
  }
  return result;
}
// ─── Formatter ──────────────────────────────────────────────────────────────
/**
 * Escape a label value for the Prometheus text exposition format.
 * Backslash must go first so the escapes added afterwards are not doubled.
 */
function _escapeExpositionLabelValue(v) {
  return String(v)
    .replace(/\\/g, "\\\\")
    .replace(/"/g, '\\"')
    .replace(/\n/g, "\\n");
}

/**
 * Render a sample value. The text format spells infinities as `+Inf`/`-Inf`;
 * JavaScript's default "Infinity" is not parseable by scrapers.
 */
function _formatSampleValue(value) {
  if (value === Infinity) return "+Inf";
  if (value === -Infinity) return "-Inf";
  return String(value);
}

/**
 * Format one exposition sample line: `name{k="v",...} value`.
 *
 * @param {string} name full sample name (including any _bucket/_sum suffix)
 * @param {*} value sample value
 * @param {object} [labels] label key-value pairs (raw, unescaped values)
 * @returns {string} exposition line without trailing newline
 */
function fmtLine(name, value, labels = {}) {
  const labelStr = Object.entries(labels)
    .map(([k, v]) => `${k}="${_escapeExpositionLabelValue(v)}"`)
    .join(",");
  const suffix = labelStr ? `{${labelStr}}` : "";
  return `${name}${suffix} ${_formatSampleValue(value)}`;
}
// ─── Validation ─────────────────────────────────────────────────────────────
/**
 * Reject metric names that are not valid Prometheus identifiers.
 *
 * @param {*} name candidate metric name
 * @throws {TypeError} when `name` is not a non-empty string
 * @throws {Error} when `name` does not match METRIC_NAME_PATTERN
 */
function validateMetricName(name) {
  const isUsableString = typeof name === "string" && name !== "";
  if (!isUsableString) {
    throw new TypeError(
      `Metric name must be a non-empty string, got: ${typeof name}`,
    );
  }
  if (METRIC_NAME_PATTERN.test(name)) return;
  throw new Error(
    `Invalid metric name "${name}". Must match Prometheus naming convention: ^[a-zA-Z_:][a-zA-Z0-9_:]*$`,
  );
}
// ─── Central Registry ───────────────────────────────────────────────────────
/**
 * Holds every metric family by type and renders the Prometheus exposition.
 *
 * A name may belong to only one type. Registering it as a second type would
 * emit two HELP/TYPE headers for the same family, which makes the whole
 * exposition invalid, so that case throws.
 */
class MetricsRegistry {
  counters = new Map();
  gauges = new Map();
  histograms = new Map();
  _metadata = new Map();

  /** Get or create a counter. `help`/`labelNames` only apply on creation. */
  counter(name, help, labelNames) {
    return this._getOrCreate(
      this.counters,
      name,
      () => new Counter(name, help, labelNames),
    );
  }

  /** Get or create a gauge. `help`/`labelNames` only apply on creation. */
  gauge(name, help, labelNames) {
    return this._getOrCreate(
      this.gauges,
      name,
      () => new Gauge(name, help, labelNames),
    );
  }

  /** Get or create a histogram. `help`/`buckets` only apply on creation. */
  histogram(name, help, buckets) {
    return this._getOrCreate(
      this.histograms,
      name,
      () => new Histogram(name, help, buckets),
    );
  }

  /**
   * Return `family.get(name)`, creating it with `create()` when absent.
   * @throws {Error} when `name` is already registered under another type
   */
  _getOrCreate(family, name, create) {
    let metric = family.get(name);
    if (metric === undefined) {
      const taken =
        this.counters.has(name) ||
        this.gauges.has(name) ||
        this.histograms.has(name);
      if (taken) {
        throw new Error(
          `Metric "${name}" is already registered with a different type`,
        );
      }
      metric = create();
      family.set(name, metric);
    }
    return metric;
  }

  /** Render all families (counters, then gauges, then histograms) as text. */
  buildText() {
    const lines = [];
    for (const family of [this.counters, this.gauges, this.histograms]) {
      for (const metric of family.values()) {
        lines.push(...metric.lines());
      }
    }
    return `${lines.join("\n")}\n`;
  }

  /** Drop every registered metric and its metadata. */
  clear() {
    this.counters.clear();
    this.gauges.clear();
    this.histograms.clear();
    this._metadata.clear();
  }
}
// ─── Singleton ──────────────────────────────────────────────────────────────
// Module-level collector state; configured by initMetricsCentral and reset
// by stopMetricsCentral.
let _registry = null; // lazily created MetricsRegistry singleton
let _flushTimer = null; // periodic flush interval handle
let _basePath = ""; // project root; empty means "not initialised"
let _sessionId = ""; // injected as the session_id label when non-empty
let _dbAdapter = null; // optional DB adapter used for persistence
let _flushFailures = 0; // consecutive flush failures

/** Return the process-wide registry, creating it on first use. */
function getRegistry() {
  _registry ??= new MetricsRegistry();
  return _registry;
}
/**
 * Path of the Prometheus text file written on each flush:
 * `<sfRoot(basePath)>/runtime/sf-metrics.prom`.
 */
function metricsFilePath(basePath) {
  const runtimeDir = join(sfRoot(basePath), "runtime");
  return join(runtimeDir, "sf-metrics.prom");
}
// ─── DB Persistence ─────────────────────────────────────────────────────────
/**
 * Create the `metrics` table and its indexes if they do not exist yet.
 *
 * Every statement uses IF NOT EXISTS, so calling this before each persist is
 * harmless. Statements run in order; the first failure stops the rest and is
 * logged, never thrown.
 *
 * @param {object} db DB adapter exposing `exec(sql)`; no-op when falsy
 */
function ensureMetricsTable(db) {
  if (!db) return;
  const ddl = [
    `
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK(type IN ('counter', 'gauge', 'histogram')),
labels TEXT,
value REAL NOT NULL,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
session_id TEXT
)
`,
    `CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name)`,
    `CREATE INDEX IF NOT EXISTS idx_metrics_session ON metrics(session_id)`,
    `CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp)`,
  ];
  try {
    for (const statement of ddl) {
      db.exec(statement);
    }
  } catch (err) {
    logWarning("metrics-central", `DB table creation failed: ${err.message}`);
  }
}
/**
 * Append a snapshot of every metric to the `metrics` table.
 *
 * Counters and gauges produce one row per label set, with `labels` stored as
 * JSON. Each histogram produces one row: `value` is the running sum, and
 * `labels` holds `{count, sum}`. All rows in a snapshot share one ISO-8601
 * timestamp. Errors are logged, not thrown; rows inserted before a failure
 * are kept.
 *
 * @param {MetricsRegistry} registry source of metric values
 * @param {string} sessionId value stored in the session_id column
 * @param {object} db DB adapter exposing `prepare(sql).run(...)`; no-op when falsy
 */
function persistMetricsToDb(registry, sessionId, db) {
  if (!db) return;
  ensureMetricsTable(db);
  const ts = new Date().toISOString();
  try {
    const insert = db.prepare(
      "INSERT INTO metrics (name, type, labels, value, timestamp, session_id) VALUES (?, ?, ?, ?, ?, ?)",
    );
    // Decode label keys with the shared parser. The metric classes originally
    // had no `_parseKey` method, so calling it threw and every counter/gauge
    // snapshot was lost.
    const labelledFamilies = [
      ["counter", registry.counters],
      ["gauge", registry.gauges],
    ];
    for (const [type, family] of labelledFamilies) {
      for (const metric of family.values()) {
        for (const [key, value] of metric.values) {
          insert.run(
            metric.name,
            type,
            JSON.stringify(_parseLabelKey(key)),
            value,
            ts,
            sessionId,
          );
        }
      }
    }
    for (const h of registry.histograms.values()) {
      insert.run(
        h.name,
        "histogram",
        JSON.stringify({ count: h.count, sum: h.sum }),
        h.sum,
        ts,
        sessionId,
      );
    }
  } catch (err) {
    logWarning("metrics-central", `DB persist failed: ${err.message}`);
  }
}
// ─── Flush with Retry ───────────────────────────────────────────────────────
// Handle of a pending flush retry. Prevents a failure that happens while a
// retry is already queued (e.g. a short flush interval) from stacking a
// second, parallel retry chain.
let _retryTimer = null;

/**
 * Write the exposition file and, when a DB adapter is set, persist a
 * snapshot. No-op while the collector is not initialised.
 *
 * The file is written to a temporary sibling and then renamed over the
 * target, so concurrent readers (e.g. scrapers) never see a partially written
 * file. On failure the flush is retried with exponential backoff until
 * FLUSH_RETRY_MAX consecutive failures have accumulated. From then on, each
 * failure increments `sf_metrics_flush_failed_total` instead of retrying.
 * A successful flush resets the failure count.
 */
function flushMetrics() {
  if (!_basePath) return;
  try {
    const registry = getRegistry();
    const text = registry.buildText();
    const path = metricsFilePath(_basePath);
    mkdirSync(join(sfRoot(_basePath), "runtime"), { recursive: true });
    // Per-process temp name; a leftover from a failed rename is simply
    // overwritten by the next attempt.
    const tmpPath = `${path}.${process.pid}.tmp`;
    writeFileSync(tmpPath, text, "utf-8");
    renameSync(tmpPath, path);
    if (_dbAdapter) {
      persistMetricsToDb(registry, _sessionId, _dbAdapter);
    }
    _flushFailures = 0;
  } catch (err) {
    _flushFailures++;
    logWarning(
      "metrics-central",
      `Flush failed (attempt ${_flushFailures}): ${err.message}`,
    );
    if (_flushFailures < FLUSH_RETRY_MAX) {
      _scheduleFlushRetry(FLUSH_RETRY_BASE_MS * 2 ** (_flushFailures - 1));
    } else {
      // Retries exhausted: surface the failure as a metric instead.
      try {
        getRegistry()
          .counter(
            "sf_metrics_flush_failed_total",
            "Total metrics flush failures",
            [],
          )
          .inc({}, 1);
      } catch {
        // Best effort: a metrics failure must never escape the flusher.
      }
    }
  }
}

/** Queue one retry of flushMetrics after `delayMs`, unless one is already pending. */
function _scheduleFlushRetry(delayMs) {
  if (_retryTimer) return;
  _retryTimer = setTimeout(() => {
    _retryTimer = null;
    flushMetrics();
  }, delayMs);
  // Like the flush interval, a pending retry must not keep the process alive.
  _retryTimer.unref?.();
}
// ─── Public API ─────────────────────────────────────────────────────────────
/**
 * Initialize the centralized metrics system.
 *
 * Calling it again replaces the previous configuration and flush timer, and
 * resets the consecutive-failure count. Without that reset, failures left
 * over from an earlier session would suppress flush retries in this one.
 *
 * @param {string} basePath project root
 * @param {object} [opts] { flushIntervalMs, sessionId, dbAdapter }; null is treated as {}
 */
export function initMetricsCentral(basePath, opts = {}) {
  const options = opts ?? {};
  _basePath = basePath;
  _sessionId = options.sessionId ?? "";
  _dbAdapter = options.dbAdapter ?? null;
  _flushFailures = 0;
  const interval = options.flushIntervalMs ?? FLUSH_INTERVAL_MS;
  if (_flushTimer) clearInterval(_flushTimer);
  _flushTimer = setInterval(flushMetrics, interval);
  // Ensure the timer doesn't keep the process alive.
  _flushTimer.unref?.();
  // Ensure the DB table exists before the first flush.
  if (_dbAdapter) {
    ensureMetricsTable(_dbAdapter);
  }
}
/**
 * Stop the metrics collector.
 *
 * Cancels the periodic flush, performs one last flush while the base path
 * and DB adapter are still configured, then clears the configuration.
 */
export function stopMetricsCentral() {
  const timer = _flushTimer;
  if (timer) {
    clearInterval(timer);
    _flushTimer = null;
  }
  flushMetrics();
  [_basePath, _sessionId, _dbAdapter] = ["", "", null];
}
/**
 * Record a counter increment.
 *
 * When a session is active and the caller did not supply `session_id`, the
 * active session id is added as that label (on a copy; `labels` is never
 * mutated).
 *
 * @param {string} name metric name (sf_ prefix recommended)
 * @param {object} [labels] label key-value pairs
 * @param {number} [amount] increment amount (default 1)
 * @throws when `name` fails validateMetricName
 */
export function recordCounter(name, labels = {}, amount = 1) {
  validateMetricName(name);
  const { help } = getMetricMeta(name);
  const effectiveLabels =
    _sessionId && !labels.session_id
      ? { ...labels, session_id: _sessionId }
      : labels;
  const counter = getRegistry().counter(
    name,
    help,
    Object.keys(effectiveLabels),
  );
  counter.inc(effectiveLabels, amount);
}
/**
 * Record a gauge value.
 *
 * Adds the active session id as `session_id` unless the caller supplied
 * one (on a copy; `labels` is never mutated).
 *
 * @param {string} name metric name
 * @param {number} value gauge value
 * @param {object} [labels] label key-value pairs
 * @throws when `name` fails validateMetricName
 */
export function recordGauge(name, value, labels = {}) {
  validateMetricName(name);
  const { help } = getMetricMeta(name);
  const needsSession = Boolean(_sessionId) && !labels.session_id;
  const effectiveLabels = needsSession
    ? { ...labels, session_id: _sessionId }
    : labels;
  const gauge = getRegistry().gauge(name, help, Object.keys(effectiveLabels));
  gauge.set(effectiveLabels, value);
}
/**
 * Record a histogram observation.
 *
 * Histograms are unlabelled. Bucket bounds come from the metric's metadata
 * and are only used when the histogram is first created.
 *
 * @param {string} name metric name
 * @param {number} value observed value
 * @throws when `name` fails validateMetricName
 */
export function recordHistogram(name, value) {
  validateMetricName(name);
  const { help, buckets } = getMetricMeta(name);
  const histogram = getRegistry().histogram(name, help, buckets);
  histogram.observe(value);
}
/**
 * Record cost and token usage for a unit.
 *
 * Emits `sf_cost_total` (per unit/model, plus work_mode when given), token
 * counters per model, and the `sf_cost_last` gauge.
 *
 * @param {string} unitId unit identifier
 * @param {string} modelId model identifier
 * @param {number} inputTokens input token count
 * @param {number} outputTokens output token count
 * @param {number} cost cost in USD
 * @param {string} [workMode] current work mode
 */
export function recordCost(
  unitId,
  modelId,
  inputTokens,
  outputTokens,
  cost,
  workMode = "",
) {
  const costLabels = {
    unit_id: unitId,
    model_id: modelId,
    ...(workMode ? { work_mode: workMode } : {}),
  };
  recordCounter("sf_cost_total", costLabels, cost);
  const tokenCounters = [
    ["sf_tokens_input_total", inputTokens],
    ["sf_tokens_output_total", outputTokens],
  ];
  for (const [metric, tokens] of tokenCounters) {
    recordCounter(metric, { model_id: modelId }, tokens);
  }
  recordGauge("sf_cost_last", cost, { unit_id: unitId, model_id: modelId });
}
/**
 * Get the current in-memory metrics as Prometheus exposition text.
 *
 * @returns {string} exposition text ending in a newline
 */
export function getMetricsText() {
  const registry = getRegistry();
  return registry.buildText();
}
/**
 * Read the last flushed exposition file for a project.
 *
 * @param {string} basePath project root
 * @returns {string|null} file contents, or null when missing or unreadable
 */
export function readMetricsFile(basePath) {
  const filePath = metricsFilePath(basePath);
  if (existsSync(filePath)) {
    try {
      return readFileSync(filePath, "utf-8");
    } catch {
      // Unreadable (permissions, raced deletion): treat it as absent.
    }
  }
  return null;
}
/**
 * Query persisted metric rows, newest first.
 *
 * Filters are bound as parameters, never interpolated. Rows written by one
 * flush share a timestamp, so `id DESC` breaks ties and makes the order
 * within such a batch deterministic (latest insert first).
 *
 * @param {object} db DB adapter exposing `prepare(sql).all(...)`
 * @param {string} [sessionId] session to filter by
 * @param {string} [name] metric name to filter by
 * @param {number} [limit] max rows to return
 * @returns {Array} metric rows; [] when `db` is falsy or the query fails
 */
export function queryMetrics(db, sessionId = null, name = null, limit = 1000) {
  if (!db) return [];
  try {
    const clauses = [];
    const params = [];
    if (sessionId) {
      clauses.push("session_id = ?");
      params.push(sessionId);
    }
    if (name) {
      clauses.push("name = ?");
      params.push(name);
    }
    const where = clauses.length > 0 ? ` WHERE ${clauses.join(" AND ")}` : "";
    const sql = `SELECT * FROM metrics${where} ORDER BY timestamp DESC, id DESC LIMIT ?`;
    params.push(limit);
    return db.prepare(sql).all(...params);
  } catch (err) {
    logWarning("metrics-central", `Query failed: ${err.message}`);
    return [];
  }
}
// ─── Metric Metadata Registry ───────────────────────────────────────────────
/**
 * Help text, label-name lists and histogram bucket bounds for known metrics,
 * keyed by metric name. Looked up through getMetricMeta().
 *
 * In the visible code, recordCounter/recordGauge read only `help`, and
 * recordHistogram reads `help` and `buckets`. The `labels` arrays are not
 * read by the record* helpers (they take label names from the labels object
 * passed at call time), so treat them as documentation of the intended label
 * set. registerMetricMeta() adds or overwrites entries at runtime.
 */
const METRIC_META = {
// Subagent inheritance
sf_subagent_dispatch_total: {
help: "Total subagent dispatch attempts",
labels: ["work_mode", "permission_profile"],
},
sf_subagent_dispatch_blocked: {
help: "Subagent dispatches blocked by inheritance policy",
labels: ["reason", "work_mode", "permission_profile"],
},
sf_subagent_dispatch_allowed: {
help: "Subagent dispatches allowed after inheritance check",
labels: ["work_mode", "permission_profile"],
},
// Mode transitions
sf_mode_transition_total: {
help: "Total mode transitions",
labels: ["axis", "from", "to", "reason"],
},
// Task frontmatter
sf_task_created_total: {
help: "Total tasks created with frontmatter",
labels: ["risk_level", "mutation_scope"],
},
sf_task_parallel_blocked: {
help: "Tasks blocked from parallel execution by frontmatter",
labels: ["reason"],
},
// Parallel intent
sf_parallel_intent_declared: {
help: "Parallel worker intents declared",
labels: ["milestone_id"],
},
sf_parallel_intent_conflict: {
help: "Parallel intent conflicts detected",
labels: ["milestone_id"],
},
// Remote steering
sf_remote_steering_applied: {
help: "Remote steering directives applied",
labels: ["directive_type", "source"],
},
sf_remote_steering_rejected: {
help: "Remote steering directives rejected (throttle/invalid)",
labels: ["reason"],
},
// Skill eval
sf_skill_eval_runs_total: {
help: "Total skill evaluation runs",
labels: ["skill_name", "passed"],
},
// `buckets` is passed to the Histogram constructor by recordHistogram.
sf_skill_eval_duration_ms: {
help: "Skill evaluation duration in milliseconds",
buckets: [100, 500, 1000, 5000, 10000, 30000],
},
// Cost guard
sf_cost_guard_blocked: {
help: "Units blocked by cost guard",
labels: ["reason", "model_id"],
},
sf_cost_guard_hourly_spend: {
help: "Current hourly spend in USD",
},
// Gate runner
sf_gate_runs_total: {
help: "Total gate executions",
labels: ["gate_id", "outcome"],
},
sf_gate_latency_ms: {
help: "Gate execution latency in milliseconds",
buckets: [10, 50, 100, 250, 500, 1000, 2500, 5000],
},
// Message bus
// NOTE(review): the `_total` suffix conventionally marks a counter; if
// these are recorded via recordGauge (an "unread" count can go down),
// the names may mislead — confirm against the callers.
sf_message_bus_messages_total: {
help: "Total messages in bus",
labels: ["agent_id"],
},
sf_message_bus_unread_total: {
help: "Unread messages in bus",
labels: ["agent_id"],
},
// Cost tracking (emitted by recordCost)
sf_cost_total: {
help: "Total cost in USD",
labels: ["unit_id", "model_id", "work_mode"],
},
sf_tokens_input_total: {
help: "Total input tokens",
labels: ["model_id"],
},
sf_tokens_output_total: {
help: "Total output tokens",
labels: ["model_id"],
},
sf_cost_last: {
help: "Last recorded cost in USD",
labels: ["unit_id", "model_id"],
},
// Internal (incremented by flushMetrics once flush retries are exhausted)
sf_metrics_flush_failed_total: {
help: "Total metrics flush failures",
},
};
/**
 * Look up metadata for a metric. Unknown names fall back to using the name
 * itself as help text, with no labels.
 *
 * Only own properties count. A bare `METRIC_META[name]` would resolve valid
 * metric names such as "constructor" or "toString" to Object.prototype
 * members, producing `undefined` help text.
 *
 * @param {string} name metric name
 * @returns {{help: string, labels?: string[], buckets?: number[]}} metadata
 */
function getMetricMeta(name) {
  const own = Object.hasOwn(METRIC_META, name) ? METRIC_META[name] : undefined;
  return own ?? { help: name, labels: [] };
}
/**
 * Register or replace metadata for a metric.
 *
 * The name is validated with the same rules as the record* functions.
 * Missing help text defaults to the metric name, matching the fallback for
 * unregistered metrics. The entry is defined as an own property, so a name
 * such as "__proto__" is stored as an ordinary entry instead of replacing
 * the table's prototype.
 *
 * @param {string} name metric name
 * @param {string} [help] HELP text
 * @param {string[]} [labels] intended label names
 * @param {number[]} [buckets] histogram bucket bounds
 * @throws when `name` fails validateMetricName
 */
export function registerMetricMeta(name, help, labels = [], buckets) {
  validateMetricName(name);
  Object.defineProperty(METRIC_META, name, {
    value: { help: help ?? name, labels, buckets },
    writable: true,
    enumerable: true,
    configurable: true,
  });
}