feat: persist SF runtime signals

This commit is contained in:
Mikael Hugo 2026-05-07 03:07:51 +02:00
parent f9334019cd
commit 4cefa6de2a
32 changed files with 1022 additions and 250 deletions

View file

@ -31,10 +31,7 @@ import {
import { getAgentDir } from "../config.js";
import { AUTH_LOCK_STALE_MS } from "./constants.js";
import { acquireLockAsync, acquireLockSyncWithRetry } from "./lock-utils.js";
import {
resolveConfigValue,
resolveConfigValueAsync,
} from "./resolve-config-value.js";
import { resolveConfigValueAsync } from "./resolve-config-value.js";
export type ApiKeyCredential = {
type: "api_key";

View file

@ -29,6 +29,8 @@ const DEFAULT_SOLVER_MAX_ITERATIONS = 30000;
const MIN_SOLVER_MAX_ITERATIONS = 1;
const MAX_SOLVER_MAX_ITERATIONS = 100000;
const DEFAULT_MISSING_CHECKPOINT_REPAIR_ATTEMPTS = 4;
const SOLVER_CHECKPOINT_SCHEMA_VERSION = 1;
const SOLVER_STEERING_SCHEMA_VERSION = 1;
function solverDir(basePath) {
return join(sfRoot(basePath), "runtime", "autonomous-solver");
@ -259,6 +261,7 @@ export function appendAutonomousSolverCheckpoint(basePath, params) {
readJson(statePath(basePath)) ??
beginAutonomousSolverIteration(basePath, params.unitType, params.unitId);
const checkpoint = {
schemaVersion: SOLVER_CHECKPOINT_SCHEMA_VERSION,
ts: nowIso(),
unitType: params.unitType,
unitId: params.unitId,
@ -532,6 +535,7 @@ export function appendAutonomousSolverSteering(basePath, text, metadata = {}) {
const trimmed = String(text ?? "").trim();
if (!trimmed) return null;
const entry = {
schemaVersion: SOLVER_STEERING_SCHEMA_VERSION,
id: `${Date.now()}-${Math.random().toString(16).slice(2)}`,
ts: nowIso(),
text: trimmed,
@ -550,7 +554,7 @@ function readSteeringEntries(basePath) {
.filter((line) => line.trim())
.map((line) => {
try {
return JSON.parse(line);
return normalizeSteeringEntry(JSON.parse(line));
} catch {
return null;
}
@ -561,6 +565,16 @@ function readSteeringEntries(basePath) {
}
}
/**
 * Validate a steering row parsed from JSONL.
 * Legacy rows with no schemaVersion are upgraded in-memory to the current
 * version; rows written under any other schema version are dropped (null).
 */
function normalizeSteeringEntry(entry) {
  const isRecord =
    entry !== null && typeof entry === "object" && !Array.isArray(entry);
  if (!isRecord) return null;
  const version = entry.schemaVersion ?? SOLVER_STEERING_SCHEMA_VERSION;
  return version === SOLVER_STEERING_SCHEMA_VERSION
    ? { ...entry, schemaVersion: version }
    : null;
}
/**
* Consume pending steering exactly once for prompt injection.
*

View file

@ -22,6 +22,7 @@ import { sfRoot } from "./paths.js";
const _EMPTY_TODO = "# TODO\n\nDump anything here.\n";
const MAX_DUMP_CHARS = 48_000;
const TODO_TRIAGE_SCHEMA_VERSION = 1;
const PREFERRED_TRIAGE_MODEL_PATTERNS = [
/minimax.*m2\.7.*highspeed/i,
/minimax.*m2\.5.*highspeed/i,
@ -167,7 +168,11 @@ function renderEvalJsonl(result) {
return (
result.eval_candidates
.map((item) =>
JSON.stringify({ ...item, source: item.source ?? "TODO.md" }),
JSON.stringify({
schemaVersion: TODO_TRIAGE_SCHEMA_VERSION,
...item,
source: item.source ?? "TODO.md",
}),
)
.join("\n") + (result.eval_candidates.length > 0 ? "\n" : "")
);
@ -260,7 +265,16 @@ function detectRecurringPatterns(result) {
function renderSkillProposals(result) {
const proposals = detectRecurringPatterns(result);
if (proposals.length === 0) return "\n";
return proposals.map((p) => JSON.stringify(p)).join("\n") + "\n";
return (
proposals
.map((p) =>
JSON.stringify({
schemaVersion: TODO_TRIAGE_SCHEMA_VERSION,
...p,
}),
)
.join("\n") + "\n"
);
}
function backlogPath(basePath) {
return join(sfRoot(basePath), "WORK-QUEUE.md");
@ -278,6 +292,7 @@ function renderBacklogJsonl(items, triagedAt) {
items
.map((item) =>
JSON.stringify({
schemaVersion: TODO_TRIAGE_SCHEMA_VERSION,
id: item.id,
title: item.title,
source: "todo-triage",
@ -356,8 +371,14 @@ function normalizedItems(result, createdAt) {
function renderNormalizedJsonl(result, createdAt) {
const items = normalizedItems(result, createdAt);
return (
items.map((item) => JSON.stringify(item)).join("\n") +
(items.length ? "\n" : "")
items
.map((item) =>
JSON.stringify({
schemaVersion: TODO_TRIAGE_SCHEMA_VERSION,
...item,
}),
)
.join("\n") + (items.length ? "\n" : "")
);
}
function buildTriagePrompt(dump) {

View file

@ -1,6 +1,12 @@
// Re-exports for backward compatibility
export {
checkConfigHealth,
checkSmHealth,
checkTurnStatusHealth,
checkVaultHealth,
} from "./doctor-config-checks.js";
export { checkEngineHealth } from "./doctor-engine-checks.js";
export { checkGitHealth } from "./doctor-git-checks.js";
export { checkGlobalHealth } from "./doctor-global-checks.js";
export { checkRuntimeHealth } from "./doctor-runtime-checks.js";
export { checkConfigHealth, checkVaultHealth, checkSmHealth } from "./doctor-config-checks.js";

View file

@ -7,17 +7,15 @@
* Severity: varies (error for type mismatches, warning for out-of-range values).
* Fixable: varies (some are auto-fixable, others are user-config).
*/
import { loadEffectiveSFPreferences } from "./preferences.js";
import {
getContextCompactThreshold,
getContextHardLimit,
getUnitTimeout,
getMaxAgentsForPhase,
isTurnInputRequired,
getWorktreeMode,
getToolAbortGrace,
getMaxTurnsPerAttempt,
getHotCacheTurns,
getMaxTurnsPerAttempt,
getToolAbortGrace,
getUnitTimeout,
getWorktreeMode,
loadEffectiveSFPreferences,
} from "./preferences.js";
/**
@ -86,7 +84,10 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) {
}
// ─── Phase-Specific Timeouts ────────────────────────────────────────
if (prefs.unit_timeout_by_phase && typeof prefs.unit_timeout_by_phase === "object") {
if (
prefs.unit_timeout_by_phase &&
typeof prefs.unit_timeout_by_phase === "object"
) {
const recognizedPhases = new Set([
"research",
"planning",
@ -95,7 +96,9 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) {
"discussion",
"replan",
]);
for (const [phase, timeout] of Object.entries(prefs.unit_timeout_by_phase)) {
for (const [phase, timeout] of Object.entries(
prefs.unit_timeout_by_phase,
)) {
if (!recognizedPhases.has(phase)) {
issues.push({
severity: "warning",
@ -122,7 +125,10 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) {
}
// ─── Max Agents Per Phase ────────────────────────────────────────────
if (prefs.max_agents_by_phase && typeof prefs.max_agents_by_phase === "object") {
if (
prefs.max_agents_by_phase &&
typeof prefs.max_agents_by_phase === "object"
) {
const recognizedPhases = new Set([
"research",
"planning",
@ -262,7 +268,7 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) {
* - Invalid vault URIs
* - Vault connectivity issues
*/
export function checkVaultHealth(issues, shouldFix) {
export function checkVaultHealth(issues, _shouldFix) {
try {
// Check if any environment variables reference vault URIs
const vaultUris = [];
@ -381,7 +387,7 @@ export function checkVaultHealth(issues, shouldFix) {
* - SM unavailable or unhealthy
* - Optional (not required SF works fine locally)
*/
export async function checkSmHealth(issues, shouldFix) {
export async function checkSmHealth(issues, _shouldFix) {
try {
// Check if explicitly disabled
if (process.env.SM_ENABLED === "false") {
@ -420,8 +426,51 @@ export async function checkSmHealth(issues, shouldFix) {
file: "Singularity Memory server",
fixable: false,
});
} catch (err) {
} catch (_err) {
// Non-fatal; SM is optional
// Don't report errors in the check itself
}
}
/**
 * Check turn_status marker coverage in prompts (Tier 2.5).
 *
 * Issues detected:
 * - Executive prompts (execute-task, complete-slice, etc.) missing markers
 * - Malformed markers or inconsistent format
 *
 * Severity: warning (agents can function without markers; markers enable pausing/reassess)
 *
 * @param {Array<object>} issues - Mutable issue list; a warning is appended on gaps.
 * @param {boolean} _shouldFix - Unused; marker coverage is not auto-fixable.
 */
export async function checkTurnStatusHealth(issues, _shouldFix) {
  try {
    // The parser is an optional dependency — resolve it lazily and bail out
    // quietly when it cannot be loaded.
    let coverageCheck;
    try {
      const mod = await import("./turn-status-parser.js");
      coverageCheck = mod.checkTurnStatusPrompts;
    } catch {
      return;
    }
    if (!coverageCheck) return;
    // Evaluate marker coverage against the current working directory.
    const report = coverageCheck(process.cwd());
    const hasGaps = !report.allGood && report.issues.length > 0;
    if (!hasGaps) return;
    issues.push({
      severity: "warning",
      code: "turn_status_incomplete",
      scope: "project",
      unitId: "turn_status",
      message: `Turn status markers incomplete: ${report.issues.join(", ")}. Agents won't be able to signal blocked/giving_up state.`,
      file: "src/resources/extensions/sf/prompts/",
      fixable: false,
    });
  } catch (_err) {
    // Non-fatal; turn_status is optional
  }
}

View file

@ -12,13 +12,14 @@ import { dirname, extname, join } from "node:path";
import { parse as parseYaml } from "yaml";
import { invalidateAllCaches } from "./cache.js";
import {
checkConfigHealth,
checkEngineHealth,
checkGitHealth,
checkGlobalHealth,
checkRuntimeHealth,
checkConfigHealth,
checkVaultHealth,
checkSmHealth,
checkTurnStatusHealth,
checkVaultHealth,
} from "./doctor-checks.js";
import { checkEnvironmentHealth } from "./doctor-environment.js";
import { runProviderChecks } from "./doctor-providers.js";
@ -1419,6 +1420,8 @@ export async function runSFDoctor(basePath, options) {
checkVaultHealth(issues, shouldFix);
// Singularity Memory checks — Tier 1.2 optional federation
await checkSmHealth(issues, shouldFix);
// Turn status markers — Tier 2.5 agent semantic state signaling
await checkTurnStatusHealth(issues, shouldFix);
const milestonesPath = milestonesDir(basePath);
if (!existsSync(milestonesPath)) {
const report = {

View file

@ -27,6 +27,8 @@ import { sfRuntimeRoot } from "./paths.js";
import { buildAuditEnvelope, emitUokAuditEvent } from "./uok/audit.js";
import { isAuditEnvelopeEnabled } from "./uok/audit-toggle.js";
const JOURNAL_SCHEMA_VERSION = 1;
// Per-session dedup for journal write failures to prevent log flooding.
let _journalWriteFailureNotified = false;
// ─── Emit ─────────────────────────────────────────────────────────────────────
@ -43,6 +45,10 @@ export function emitJournalEvent(basePath, entry) {
// See auto/turn-epoch.ts for the full rationale.
if (isStaleWrite("journal")) return;
let writeError;
const persistedEntry = {
schemaVersion: JOURNAL_SCHEMA_VERSION,
...entry,
};
try {
const journalDir = join(sfRuntimeRoot(basePath), "journal");
mkdirSync(journalDir, { recursive: true });
@ -57,7 +63,7 @@ export function emitJournalEvent(basePath, entry) {
withFileLockSync(
filePath,
() => {
appendFileSync(filePath, JSON.stringify(entry) + "\n");
appendFileSync(filePath, JSON.stringify(persistedEntry) + "\n");
},
{ onLocked: "skip" },
);
@ -75,6 +81,7 @@ export function emitJournalEvent(basePath, entry) {
".write-failures.jsonl",
);
const warningEntry = {
schemaVersion: JOURNAL_SCHEMA_VERSION,
ts: new Date().toISOString(),
errorClass: writeError.constructor.name,
message: writeError.message,
@ -129,7 +136,8 @@ export function queryJournal(basePath, filters) {
for (const line of raw.split("\n")) {
if (!line.trim()) continue;
try {
const entry = JSON.parse(line);
const entry = normalizeJournalEntry(JSON.parse(line));
if (!entry) continue;
entries.push(entry);
} catch {
// Skip malformed lines
@ -151,3 +159,13 @@ export function queryJournal(basePath, filters) {
return [];
}
}
/**
 * Validate a journal row parsed from JSONL against the schema contract.
 * Missing-version rows are treated as legacy version-1 rows; rows from a
 * different schema version are rejected (null).
 */
function normalizeJournalEntry(entry) {
  if (entry === null || typeof entry !== "object") return null;
  if (Array.isArray(entry)) return null;
  const version = entry.schemaVersion ?? JOURNAL_SCHEMA_VERSION;
  if (version !== JOURNAL_SCHEMA_VERSION) return null;
  return Object.assign({}, entry, { schemaVersion: version });
}

View file

@ -13,6 +13,9 @@
import { appendFileSync, existsSync, mkdirSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { sfRuntimeRoot } from "./paths.js";
const JUDGMENT_LOG_SCHEMA_VERSION = 1;
/**
* Append a single judgment entry to the judgment log JSONL file.
* Creates the file and parent directories on first call.
@ -22,7 +25,11 @@ export function appendJudgment(basePath, entry) {
try {
const logPath = resolveJudgmentLogPath(basePath);
mkdirSync(join(logPath, ".."), { recursive: true });
const full = { ts: new Date().toISOString(), ...entry };
const full = {
schemaVersion: JUDGMENT_LOG_SCHEMA_VERSION,
ts: new Date().toISOString(),
...entry,
};
appendFileSync(logPath, JSON.stringify(full) + "\n", "utf-8");
} catch {
// Non-fatal — judgment logging must never break the agent loop
@ -45,7 +52,8 @@ export function readJudgmentLog(basePath, unitId) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const parsed = JSON.parse(trimmed);
const parsed = normalizeJudgmentEntry(JSON.parse(trimmed));
if (!parsed) continue;
if (unitId && !parsed.unitId.startsWith(unitId)) continue;
entries.push(parsed);
} catch {
@ -57,6 +65,17 @@ export function readJudgmentLog(basePath, unitId) {
return [];
}
}
/**
 * Validate a judgment-log row parsed from JSONL.
 * Rows with no schemaVersion are readable as legacy version 1; rows from
 * any other schema version are dropped (null).
 */
function normalizeJudgmentEntry(entry) {
  const isRecord =
    entry !== null && typeof entry === "object" && !Array.isArray(entry);
  if (!isRecord) return null;
  const version = entry.schemaVersion ?? JUDGMENT_LOG_SCHEMA_VERSION;
  if (version !== JUDGMENT_LOG_SCHEMA_VERSION) return null;
  return { ...entry, schemaVersion: version };
}
/**
* Resolve the absolute path to the judgment log file.
*/

View file

@ -19,6 +19,8 @@ import {
} from "node:fs";
import { dirname, join } from "node:path";
const MODEL_FAILURE_LOG_SCHEMA_VERSION = 1;
/**
* Per-task-type model performance tracker.
*
@ -282,6 +284,7 @@ class FailureAnalyzer {
} = failure;
const entry = {
schemaVersion: MODEL_FAILURE_LOG_SCHEMA_VERSION,
timestamp,
taskType,
modelId,
@ -323,7 +326,9 @@ class FailureAnalyzer {
const lines = content.trim() ? content.trim().split("\n") : [];
const entries = [
...this.entries,
...lines.map((line) => JSON.parse(line)),
...lines
.map((line) => normalizeModelFailureEntry(JSON.parse(line)))
.filter(Boolean),
];
return this._summarizeEntries(taskType, modelId, entries);
} catch {
@ -367,6 +372,16 @@ class FailureAnalyzer {
}
}
/**
 * Validate a model-failure-log row parsed from JSONL.
 * Legacy rows without a schemaVersion pass as version 1; rows carrying any
 * other version are rejected (null) so future formats are never misread.
 */
function normalizeModelFailureEntry(entry) {
  if (entry === null || typeof entry !== "object") return null;
  if (Array.isArray(entry)) return null;
  const version = entry.schemaVersion ?? MODEL_FAILURE_LOG_SCHEMA_VERSION;
  return version === MODEL_FAILURE_LOG_SCHEMA_VERSION
    ? { ...entry, schemaVersion: version }
    : null;
}
/**
* Main API: Integrate model learning into dispatch workflow.
*

View file

@ -26,7 +26,12 @@ export const MODE_DEFAULTS = {
context_compact_at: 20000,
context_hard_limit: 35000,
unit_timeout: 300,
max_agents_by_phase: { research: 1, planning: 1, execution: 1, validation: 1 },
max_agents_by_phase: {
research: 1,
planning: 1,
execution: 1,
validation: 1,
},
turn_input_required: false,
worktree_mode: "auto",
tool_abort_grace: 5000,
@ -45,7 +50,12 @@ export const MODE_DEFAULTS = {
context_compact_at: 25000,
context_hard_limit: 40000,
unit_timeout: 300,
max_agents_by_phase: { research: 2, planning: 1, execution: 4, validation: 2 },
max_agents_by_phase: {
research: 2,
planning: 1,
execution: 4,
validation: 2,
},
turn_input_required: true,
worktree_mode: "manual",
tool_abort_grace: 8000,

View file

@ -1929,39 +1929,57 @@ export function validatePreferences(preferences) {
}
}
if (preferences.unit_timeout_by_phase !== undefined) {
if (typeof preferences.unit_timeout_by_phase === "object" && preferences.unit_timeout_by_phase !== null) {
if (
typeof preferences.unit_timeout_by_phase === "object" &&
preferences.unit_timeout_by_phase !== null
) {
const validatedPhaseTimeouts = {};
for (const [phase, timeout] of Object.entries(preferences.unit_timeout_by_phase)) {
for (const [phase, timeout] of Object.entries(
preferences.unit_timeout_by_phase,
)) {
const seconds = Number(timeout);
if (Number.isFinite(seconds) && seconds > 0) {
validatedPhaseTimeouts[phase] = Math.floor(seconds);
} else {
errors.push(`unit_timeout_by_phase.${phase} must be a positive number (seconds)`);
errors.push(
`unit_timeout_by_phase.${phase} must be a positive number (seconds)`,
);
}
}
if (Object.keys(validatedPhaseTimeouts).length > 0) {
validated.unit_timeout_by_phase = validatedPhaseTimeouts;
}
} else {
errors.push("unit_timeout_by_phase must be an object mapping phases to seconds");
errors.push(
"unit_timeout_by_phase must be an object mapping phases to seconds",
);
}
}
if (preferences.max_agents_by_phase !== undefined) {
if (typeof preferences.max_agents_by_phase === "object" && preferences.max_agents_by_phase !== null) {
if (
typeof preferences.max_agents_by_phase === "object" &&
preferences.max_agents_by_phase !== null
) {
const validatedAgents = {};
for (const [phase, count] of Object.entries(preferences.max_agents_by_phase)) {
for (const [phase, count] of Object.entries(
preferences.max_agents_by_phase,
)) {
const agents = Number(count);
if (Number.isFinite(agents) && agents >= 1) {
validatedAgents[phase] = Math.floor(agents);
} else {
errors.push(`max_agents_by_phase.${phase} must be a positive integer`);
errors.push(
`max_agents_by_phase.${phase} must be a positive integer`,
);
}
}
if (Object.keys(validatedAgents).length > 0) {
validated.max_agents_by_phase = validatedAgents;
}
} else {
errors.push("max_agents_by_phase must be an object mapping phases to agent counts");
errors.push(
"max_agents_by_phase must be an object mapping phases to agent counts",
);
}
}
if (preferences.turn_input_required !== undefined) {
@ -1983,7 +2001,9 @@ export function validatePreferences(preferences) {
if (Number.isFinite(ms) && ms >= 0) {
validated.tool_abort_grace = Math.floor(ms);
} else {
errors.push("tool_abort_grace must be a non-negative number (milliseconds)");
errors.push(
"tool_abort_grace must be a non-negative number (milliseconds)",
);
}
}
if (preferences.max_turns_per_attempt !== undefined) {

View file

@ -864,7 +864,8 @@ export function getMaxAgentsForPhase(phase) {
*/
export function isTurnInputRequired() {
const prefs = loadEffectiveSFPreferences()?.preferences;
if (prefs?.turn_input_required !== undefined) return prefs.turn_input_required;
if (prefs?.turn_input_required !== undefined)
return prefs.turn_input_required;
const mode = prefs?.mode ?? "solo";
return mode === "team";
}
@ -900,7 +901,8 @@ export function getToolAbortGrace() {
*/
export function getMaxTurnsPerAttempt() {
const prefs = loadEffectiveSFPreferences()?.preferences;
if (prefs?.max_turns_per_attempt !== undefined) return prefs.max_turns_per_attempt;
if (prefs?.max_turns_per_attempt !== undefined)
return prefs.max_turns_per_attempt;
const mode = prefs?.mode ?? "solo";
return mode === "team" ? 60 : 50;
}

View file

@ -22,6 +22,7 @@ import { sfRuntimeRoot } from "../paths.js";
// ─── Constants ──────────────────────────────────────────────────────────────
const FILENAME = "schedule.jsonl";
const SCHEDULE_SCHEMA_VERSION = 1;
/** @type {string} */
const _sfHome = process.env.SF_HOME || join(homedir(), ".sf");
@ -99,7 +100,14 @@ function _appendEntry(basePath, scope, entry) {
}
withFileLockSync(filePath, () => {
appendFileSync(filePath, JSON.stringify(entry) + "\n", "utf-8");
appendFileSync(
filePath,
JSON.stringify({
schemaVersion: SCHEDULE_SCHEMA_VERSION,
...entry,
}) + "\n",
"utf-8",
);
});
}
@ -132,7 +140,7 @@ function _readEntries(basePath, scope) {
if (!line.trim()) continue;
try {
/** @type {import("./schedule-types.js").ScheduleEntry} */
const entry = JSON.parse(line);
const entry = normalizeScheduleEntry(JSON.parse(line));
if (!entry || typeof entry.id !== "string") continue;
const existing = byId.get(entry.id);
@ -153,6 +161,16 @@ function _readEntries(basePath, scope) {
return Array.from(byId.values());
}
/**
 * Validate a schedule row parsed from JSONL.
 * Rows lacking a schemaVersion are accepted as legacy version 1; rows from
 * a different schema version are rejected (null) and skipped by the reader.
 */
function normalizeScheduleEntry(entry) {
  const isRecord =
    entry !== null && typeof entry === "object" && !Array.isArray(entry);
  if (!isRecord) return null;
  const version = entry.schemaVersion ?? SCHEDULE_SCHEMA_VERSION;
  if (version !== SCHEDULE_SCHEMA_VERSION) return null;
  return Object.assign({}, entry, { schemaVersion: version });
}
/**
* Return pending entries whose due_at is at or before `now`, sorted by due_at ASC.
*

View file

@ -94,6 +94,7 @@
/**
* @typedef {object} ScheduleEntry
* @property {number} [schemaVersion] Schema contract version for persisted JSONL rows
* @property {string} id ULID monotonic, sortable, 28 chars
* @property {ScheduleKind} kind What kind of scheduled item this is
* @property {ScheduleStatus} status Current lifecycle status

View file

@ -78,7 +78,7 @@ function openRawDb(path) {
loadProvider();
return new DatabaseSync(path);
}
const SCHEMA_VERSION = 30;
const SCHEMA_VERSION = 31;
function indexExists(db, name) {
return !!db
.prepare(
@ -210,6 +210,37 @@ function ensureHeadlessRunTables(db) {
"CREATE INDEX IF NOT EXISTS idx_headless_runs_status ON headless_runs(status, created_at DESC)",
);
}
/**
 * Create the tables and indexes backing UOK agent-to-agent messages.
 * Every statement uses IF NOT EXISTS, so this is idempotent and safe to run
 * on both fresh init and migration passes.
 *
 * @param {object} db - Open SQLite handle exposing exec().
 */
function ensureUokMessageTables(db) {
  const ddl = [
    `
    CREATE TABLE IF NOT EXISTS uok_messages (
      id TEXT PRIMARY KEY,
      from_agent TEXT NOT NULL,
      to_agent TEXT NOT NULL,
      body TEXT NOT NULL DEFAULT '',
      metadata_json TEXT NOT NULL DEFAULT '{}',
      sent_at TEXT NOT NULL DEFAULT '',
      delivered_at TEXT DEFAULT NULL
    )
  `,
    `
    CREATE TABLE IF NOT EXISTS uok_message_reads (
      message_id TEXT NOT NULL,
      agent_id TEXT NOT NULL,
      read_at TEXT NOT NULL DEFAULT '',
      PRIMARY KEY (message_id, agent_id),
      FOREIGN KEY (message_id) REFERENCES uok_messages(id) ON DELETE CASCADE
    )
  `,
    "CREATE INDEX IF NOT EXISTS idx_uok_messages_to ON uok_messages(to_agent, sent_at DESC)",
    "CREATE INDEX IF NOT EXISTS idx_uok_messages_conversation ON uok_messages(from_agent, to_agent, sent_at DESC)",
    "CREATE INDEX IF NOT EXISTS idx_uok_messages_sent ON uok_messages(sent_at DESC)",
  ];
  for (const statement of ddl) {
    db.exec(statement);
  }
}
function ensureSelfFeedbackTables(db) {
db.exec(`
CREATE TABLE IF NOT EXISTS self_feedback (
@ -703,6 +734,7 @@ function initSchema(db, fileBacked) {
ensureRepoProfileTables(db);
ensureSolverEvalTables(db);
ensureHeadlessRunTables(db);
ensureUokMessageTables(db);
db.exec(
`CREATE VIEW IF NOT EXISTS active_decisions AS SELECT * FROM decisions WHERE superseded_by IS NULL`,
);
@ -1655,6 +1687,15 @@ function migrateSchema(db) {
":applied_at": new Date().toISOString(),
});
}
if (currentVersion < 31) {
ensureUokMessageTables(db);
db.prepare(
"INSERT INTO schema_version (version, applied_at) VALUES (:version, :applied_at)",
).run({
":version": 31,
":applied_at": new Date().toISOString(),
});
}
db.exec("COMMIT");
} catch (err) {
db.exec("ROLLBACK");
@ -4433,6 +4474,150 @@ export function getDistinctGateIds() {
return [];
}
}
/**
 * Persist a single UOK direct message (idempotent on id via INSERT OR IGNORE).
 *
 * Best-effort: a storage failure must not propagate into the messaging path.
 * Previously an exec/prepare failure here would throw to the caller, unlike
 * every sibling uok_message helper in this section, which catches and returns
 * a safe default — this now matches that convention.
 *
 * @param {{id: string, from: string, to: string, body?: string,
 *          metadata?: object, sentAt: string, deliveredAt?: string|null}} msg
 */
export function insertUokMessage(msg) {
  if (!currentDb) return;
  try {
    currentDb
      .prepare(
        `INSERT OR IGNORE INTO uok_messages (id, from_agent, to_agent, body, metadata_json, sent_at, delivered_at)
     VALUES (:id, :from_agent, :to_agent, :body, :metadata_json, :sent_at, :delivered_at)`,
      )
      .run({
        ":id": msg.id,
        ":from_agent": msg.from,
        ":to_agent": msg.to,
        ":body": msg.body ?? "",
        ":metadata_json": JSON.stringify(msg.metadata ?? {}),
        ":sent_at": msg.sentAt,
        ":delivered_at": msg.deliveredAt ?? null,
      });
  } catch {
    // Non-fatal — consistent with markUokMessageRead/getUokMessagesForAgent.
  }
}
/**
 * Messages addressed to `agentId`, oldest first.
 * Each row carries a per-agent `read` flag derived from uok_message_reads.
 *
 * @param {string} agentId
 * @param {number} [limit=1000] - Clamped to [1, 10000].
 * @param {boolean} [unreadOnly=false] - When true, only rows this agent has not read.
 * @returns {Array<object>} Hydrated messages; [] when no DB or on failure.
 */
export function getUokMessagesForAgent(
  agentId,
  limit = 1000,
  unreadOnly = false,
) {
  if (!currentDb) return [];
  try {
    const baseQuery = `SELECT m.id, m.from_agent AS "from", m.to_agent AS "to", m.body, m.metadata_json AS metadataJson, m.sent_at AS sentAt, m.delivered_at AS deliveredAt,
      CASE WHEN r.agent_id IS NOT NULL THEN 1 ELSE 0 END AS read
      FROM uok_messages m
      LEFT JOIN uok_message_reads r ON r.message_id = m.id AND r.agent_id = :agent_id
      WHERE m.to_agent = :agent_id`;
    const unreadFilter = unreadOnly ? " AND r.agent_id IS NULL" : "";
    const sql = `${baseQuery}${unreadFilter} ORDER BY m.sent_at ASC LIMIT :limit`;
    // Defensive clamp: non-numeric/zero limits fall back to 1000.
    const boundedLimit = Math.max(1, Math.min(10_000, Number(limit) || 1000));
    const records = currentDb.prepare(sql).all({
      ":agent_id": agentId,
      ":limit": boundedLimit,
    });
    return records.map((row) => ({
      id: row.id,
      from: row.from,
      to: row.to,
      body: row.body,
      metadata: parseJsonObject(row.metadataJson, {}),
      sentAt: row.sentAt,
      deliveredAt: row.deliveredAt,
      read: !!row.read,
    }));
  } catch {
    return [];
  }
}
/**
 * Two-way conversation between two agents, newest first.
 * NOTE(review): unlike getUokMessagesForAgent, `limit` is passed through
 * unclamped — callers are expected to provide a sane value; confirm intent.
 *
 * @returns {Array<object>} Hydrated messages; [] when no DB or on failure.
 */
export function getUokConversation(agentA, agentB, limit = 1000) {
  if (!currentDb) return [];
  try {
    const stmt = currentDb.prepare(
      `SELECT id, from_agent AS "from", to_agent AS "to", body, metadata_json AS metadataJson, sent_at AS sentAt, delivered_at AS deliveredAt
       FROM uok_messages
       WHERE (from_agent = :a AND to_agent = :b) OR (from_agent = :b AND to_agent = :a)
       ORDER BY sent_at DESC
       LIMIT :limit`,
    );
    const records = stmt.all({ ":a": agentA, ":b": agentB, ":limit": limit });
    return records.map((row) => ({
      id: row.id,
      from: row.from,
      to: row.to,
      body: row.body,
      metadata: parseJsonObject(row.metadataJson, {}),
      sentAt: row.sentAt,
      deliveredAt: row.deliveredAt,
    }));
  } catch {
    return [];
  }
}
/**
 * Record that `agentId` has read `messageId` (idempotent via INSERT OR IGNORE).
 *
 * @returns {boolean} true when the write was attempted successfully; false
 *   when no DB is open or the insert failed.
 */
export function markUokMessageRead(messageId, agentId) {
  if (!currentDb) return false;
  try {
    const stmt = currentDb.prepare(
      `INSERT OR IGNORE INTO uok_message_reads (message_id, agent_id, read_at) VALUES (:message_id, :agent_id, :read_at)`,
    );
    stmt.run({
      ":message_id": messageId,
      ":agent_id": agentId,
      ":read_at": new Date().toISOString(),
    });
    return true;
  } catch {
    return false;
  }
}
/**
 * Count messages addressed to `agentId` with no matching read receipt.
 *
 * @returns {number} Unread count; 0 when no DB is open or on failure.
 */
export function getUokMessageUnreadCount(agentId) {
  if (!currentDb) return 0;
  try {
    const stmt = currentDb.prepare(
      `SELECT COUNT(*) AS cnt FROM uok_messages m
       WHERE m.to_agent = :agent_id
         AND NOT EXISTS (
           SELECT 1 FROM uok_message_reads r
           WHERE r.message_id = m.id AND r.agent_id = :agent_id
         )`,
    );
    const found = stmt.get({ ":agent_id": agentId });
    return found?.cnt ?? 0;
  } catch {
    return 0;
  }
}
/**
 * Purge uok_messages older than the retention window.
 * Read receipts are expected to follow via FK ON DELETE CASCADE — assumes
 * foreign-key enforcement is enabled on the connection; TODO confirm.
 *
 * @param {number} retentionDays - Age threshold in days.
 * @returns {{before: number, after: number}} Row counts around the purge;
 *   zeros when no DB is open or on any failure.
 */
export function compactUokMessages(retentionDays) {
  if (!currentDb) return { before: 0, after: 0 };
  try {
    const windowMs = retentionDays * 24 * 60 * 60 * 1000;
    const cutoff = new Date(Date.now() - windowMs).toISOString();
    const countAll = () =>
      currentDb.prepare("SELECT COUNT(*) AS cnt FROM uok_messages").get();
    const beforeRow = countAll();
    currentDb
      .prepare("DELETE FROM uok_messages WHERE sent_at < :cutoff")
      .run({ ":cutoff": cutoff });
    const afterRow = countAll();
    return { before: beforeRow?.cnt ?? 0, after: afterRow?.cnt ?? 0 };
  } catch {
    return { before: 0, after: 0 };
  }
}
/**
 * All message ids that `agentId` has marked read.
 *
 * @returns {string[]} Ids; [] when no DB is open or on failure.
 */
export function getUokMessageReadIds(agentId) {
  if (!currentDb) return [];
  try {
    const stmt = currentDb.prepare(
      "SELECT message_id FROM uok_message_reads WHERE agent_id = :agent_id",
    );
    return stmt.all({ ":agent_id": agentId }).map((row) => row.message_id);
  } catch {
    return [];
  }
}
/**
 * Coerce a value to a non-empty string, or null for anything else
 * (non-strings and the empty string both map to null).
 */
function asStringOrNull(value) {
  if (typeof value !== "string") return null;
  return value.length > 0 ? value : null;
}

View file

@ -23,7 +23,11 @@
export async function initializeSmClient() {
// Check if explicitly disabled
if (process.env.SM_ENABLED === "false") {
return { connected: false, version: null, reason: "disabled via SM_ENABLED=false" };
return {
connected: false,
version: null,
reason: "disabled via SM_ENABLED=false",
};
}
const addr = process.env.SINGULARITY_MEMORY_ADDR || "http://localhost:8080";
@ -80,7 +84,10 @@ export async function syncMemoryToSm(memory, opts = {}) {
return { queued: false, reason: "SM not connected" };
}
const addr = opts.smAddr || process.env.SINGULARITY_MEMORY_ADDR || "http://localhost:8080";
const addr =
opts.smAddr ||
process.env.SINGULARITY_MEMORY_ADDR ||
"http://localhost:8080";
// Queue sync in background (fire-and-forget)
setImmediate(async () => {
@ -122,7 +129,10 @@ export async function querySmMemories(query, opts = {}) {
return [];
}
const addr = opts.smAddr || process.env.SINGULARITY_MEMORY_ADDR || "http://localhost:8080";
const addr =
opts.smAddr ||
process.env.SINGULARITY_MEMORY_ADDR ||
"http://localhost:8080";
const limit = opts.limit || 5; // Cross-project recall limit (smaller than local)
try {

View file

@ -14,7 +14,7 @@
* - Session-end flush: before unit completes, pending syncs are flushed (best-effort)
*/
import { syncMemoryToSm, querySmMemories } from "./sm-client.js";
import { syncMemoryToSm } from "./sm-client.js";
/**
* Global sync scheduler state.
@ -189,7 +189,7 @@ async function trySyncWithRetry(item, attempt = 0) {
} catch (err) {
if (attempt < MAX_RETRIES) {
// Exponential backoff: 1s, 2s, 4s
const delayMs = BACKOFF_BASE_MS * Math.pow(2, attempt);
const delayMs = BACKOFF_BASE_MS * 2 ** attempt;
await new Promise((resolve) => setTimeout(resolve, delayMs));

View file

@ -8,7 +8,7 @@
* auto-mode dispatch must be able to select them.
*/
import { describe, expect, it, beforeEach, vi } from "vitest";
import { describe, expect, it } from "vitest";
// Mock extension models
const MOCK_EXTENSION_MODELS = [
@ -58,7 +58,7 @@ describe("Extension-Provided Models (gap-5)", () => {
it("extension_models_are_discoverable", () => {
const discovered = MOCK_PROVIDER.models.filter(
(m) => m.id === "claude-sonnet-4-6"
(m) => m.id === "claude-sonnet-4-6",
);
expect(discovered).toHaveLength(1);
});
@ -114,7 +114,7 @@ describe("Extension-Provided Models (gap-5)", () => {
it("can_filter_extension_models_by_capability", () => {
const needsReasoning = true;
const candidates = MOCK_PROVIDER.models.filter(
(m) => m.reasoning === needsReasoning
(m) => m.reasoning === needsReasoning,
);
expect(candidates.length).toBeGreaterThan(0);
});
@ -122,10 +122,10 @@ describe("Extension-Provided Models (gap-5)", () => {
it("can_filter_extension_models_by_context_window", () => {
const minContextWindow = 500_000;
const suitable = MOCK_PROVIDER.models.filter(
(m) => m.contextWindow >= minContextWindow
(m) => m.contextWindow >= minContextWindow,
);
expect(suitable).toContain(
MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6")
MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6"),
);
});
});
@ -145,7 +145,7 @@ describe("Extension-Provided Models (gap-5)", () => {
const preferredModels = ["claude-sonnet-4-6", "claude-haiku-4-5"];
const fallback = (unavailable) => {
return MOCK_PROVIDER.models.find(
(m) => preferredModels.includes(m.id) && !unavailable.includes(m.id)
(m) => preferredModels.includes(m.id) && !unavailable.includes(m.id),
);
};
@ -170,13 +170,20 @@ describe("Extension-Provided Models (gap-5)", () => {
const provider2 = {
name: "ollama",
models: [
{ id: "llama-2", name: "Llama 2", contextWindow: 4096, maxTokens: 2048 },
{
id: "llama-2",
name: "Llama 2",
contextWindow: 4096,
maxTokens: 2048,
},
],
};
const allModels = [...provider1.models, ...provider2.models];
expect(allModels).toHaveLength(3);
expect(allModels.filter((m) => m.id === "claude-sonnet-4-6")).toHaveLength(1);
expect(
allModels.filter((m) => m.id === "claude-sonnet-4-6"),
).toHaveLength(1);
expect(allModels.filter((m) => m.id === "llama-2")).toHaveLength(1);
});
});
@ -221,35 +228,31 @@ describe("Extension-Provided Models (gap-5)", () => {
];
const allowed = candidates.filter(
(m) =>
!preferences.blocked.includes(`${MOCK_PROVIDER.name}/${m.id}`)
(m) => !preferences.blocked.includes(`${MOCK_PROVIDER.name}/${m.id}`),
);
expect(allowed).toContain(
MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6")
MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6"),
);
});
it("extension_models_can_be_blocked", () => {
const blocklist = ["claude-code/claude-haiku-4-5"];
const filtered = MOCK_PROVIDER.models.filter(
(m) =>
!blocklist.includes(`${MOCK_PROVIDER.name}/${m.id}`)
(m) => !blocklist.includes(`${MOCK_PROVIDER.name}/${m.id}`),
);
expect(filtered.find((m) => m.id === "claude-sonnet-4-6")).toBeDefined();
expect(
filtered.find((m) => m.id === "claude-haiku-4-5")
).toBeUndefined();
expect(filtered.find((m) => m.id === "claude-haiku-4-5")).toBeUndefined();
});
it("can_route_to_extension_model_by_capability", () => {
const needsImage = true;
const _needsImage = true;
const suitable = MOCK_PROVIDER.models.filter((m) =>
m.input.includes("image")
m.input.includes("image"),
);
expect(suitable).toContain(
MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6")
MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6"),
);
});
});
@ -263,9 +266,7 @@ describe("Extension-Provided Models (gap-5)", () => {
it("gracefully_handles_extension_unavailable", () => {
const provider = { ...MOCK_PROVIDER, isReady: () => false };
const fallback = provider.isReady()
? provider.models
: [];
const fallback = provider.isReady() ? provider.models : [];
expect(fallback).toHaveLength(0);
});
@ -274,10 +275,10 @@ describe("Extension-Provided Models (gap-5)", () => {
const validate = (model) => {
return Boolean(
model.id &&
model.name &&
model.contextWindow &&
model.maxTokens &&
model.cost
model.name &&
model.contextWindow &&
model.maxTokens &&
model.cost,
);
};
@ -294,18 +295,18 @@ describe("Extension-Provided Models (gap-5)", () => {
...MOCK_PROVIDER.models.map((m) => ({
...m,
provider: "claude-code",
score: 0.80,
score: 0.8,
})),
];
expect(
candidates.find((m) => m.provider === "claude-code")
candidates.find((m) => m.provider === "claude-code"),
).toBeDefined();
expect(candidates).toHaveLength(3);
});
it("extension_model_scoring_works", () => {
const model = MOCK_PROVIDER.models[0];
const _model = MOCK_PROVIDER.models[0];
const score = 0.85; // hypothetical dispatch score
expect(score).toBeGreaterThan(0);

View file

@ -0,0 +1,361 @@
/**
* jsonl-schema-versioning.test.mjs - SF-owned JSONL contract markers.
*
* Purpose: prove append-only SF runtime logs carry explicit schema versions
* while legacy missing-version rows remain readable as version 1.
*/
import assert from "node:assert/strict";
import {
mkdirSync,
mkdtempSync,
readdirSync,
readFileSync,
rmSync,
writeFileSync,
} from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, test } from "vitest";
import {
appendAutonomousSolverCheckpoint,
appendAutonomousSolverSteering,
beginAutonomousSolverIteration,
consumePendingAutonomousSolverSteering,
} from "../autonomous-solver.js";
import { triageTodoDump } from "../commands-todo.js";
import { emitJournalEvent, queryJournal } from "../journal.js";
import { appendJudgment, readJudgmentLog } from "../judgment-log.js";
import { ModelLearner } from "../model-learner.js";
import { createScheduleStore } from "../schedule/schedule-store.js";
import { buildAuditEnvelope, emitUokAuditEvent } from "../uok/audit.js";
import {
parseParityEvents,
writeParityHeartbeat,
} from "../uok/parity-report.js";
import { appendEvent, readEvents } from "../workflow-events.js";
import {
_resetLogs,
logError,
readAuditLog,
setLogBasePath,
} from "../workflow-logger.js";
// Temp project roots created by makeProject(); drained after every test.
const tmpDirs = [];
afterEach(() => {
  // Reset workflow-logger module state so no test leaks a base path or
  // buffered log entries into the next one.
  setLogBasePath(null);
  _resetLogs();
  // Remove every registered temp dir; pop-until-empty keeps the shared
  // array accurate even if rmSync throws mid-way on a later entry.
  while (tmpDirs.length > 0) {
    const dir = tmpDirs.pop();
    if (dir) rmSync(dir, { recursive: true, force: true });
  }
});
/**
 * Create a disposable project root containing an empty `.sf` directory.
 * The root is registered in `tmpDirs` so the afterEach hook deletes it.
 */
function makeProject() {
  const root = mkdtempSync(join(tmpdir(), "sf-jsonl-versioning-"));
  tmpDirs.push(root);
  const sfDir = join(root, ".sf");
  mkdirSync(sfDir, { recursive: true });
  return root;
}
/**
 * Read a JSONL file and parse every non-blank line into an object.
 */
function readJsonl(path) {
  const raw = readFileSync(path, "utf-8").trim();
  const rows = [];
  for (const line of raw.split("\n")) {
    if (!line) continue;
    rows.push(JSON.parse(line));
  }
  return rows;
}
/**
 * Build a valid schedule-store entry fixture; fields in `overrides`
 * replace the defaults (later keys win, as with object spread).
 */
function makeScheduleEntry(overrides = {}) {
  const defaults = {
    id: "01HX0000000000000000000000",
    kind: "reminder",
    status: "pending",
    due_at: "2026-05-07T00:00:00.000Z",
    created_at: "2026-05-06T00:00:00.000Z",
    payload: { message: "check" },
    created_by: "user",
  };
  return Object.assign(defaults, overrides);
}
// Each test below follows the same contract check for one SF-owned JSONL log:
// (1) a freshly written row carries schemaVersion: 1, and (2) a hand-written
// legacy row with no schemaVersion field is normalized to version 1 on read.
describe("SF JSONL schema versioning", () => {
  test("schedule_store_writes_schema_version_and_reads_legacy_rows", () => {
    const project = makeProject();
    const store = createScheduleStore(project);
    store.appendEntry("project", makeScheduleEntry());
    const path = store._filePathForScope("project");
    assert.equal(readJsonl(path)[0].schemaVersion, 1);
    // Overwrite the file with a versionless "legacy" row and confirm the
    // store still reads it, stamping schemaVersion 1.
    writeFileSync(
      path,
      `${JSON.stringify(makeScheduleEntry({ id: "legacy" }))}\n`,
      "utf-8",
    );
    const [entry] = store.readEntries("project");
    assert.equal(entry.id, "legacy");
    assert.equal(entry.schemaVersion, 1);
  });
  test("journal_writes_schema_version_and_reads_legacy_rows", () => {
    const project = makeProject();
    emitJournalEvent(project, {
      ts: "2026-05-07T00:00:00.000Z",
      flowId: "flow-1",
      seq: 1,
      eventType: "iteration-start",
      data: { unitId: "M001" },
    });
    // Journal files are date-partitioned; the path must match the event ts.
    const path = join(project, ".sf", "journal", "2026-05-07.jsonl");
    assert.equal(readJsonl(path)[0].schemaVersion, 1);
    writeFileSync(
      path,
      `${JSON.stringify({
        ts: "2026-05-07T00:00:01.000Z",
        flowId: "legacy-flow",
        seq: 1,
        eventType: "dispatch-match",
        data: { unitId: "M002" },
      })}\n`,
      "utf-8",
    );
    const [entry] = queryJournal(project);
    assert.equal(entry.schemaVersion, 1);
    assert.equal(entry.flowId, "legacy-flow");
  });
  test("workflow_event_log_writes_schema_version_and_reads_legacy_rows", () => {
    const project = makeProject();
    appendEvent(project, { cmd: "plan", params: { milestoneId: "M001" } });
    const path = join(project, ".sf", "event-log.jsonl");
    assert.equal(readJsonl(path)[0].schemaVersion, 1);
    // Legacy rows have the older `v: 2` marker but no schemaVersion; the
    // hash field is present so readEvents treats the row as well-formed.
    writeFileSync(
      path,
      `${JSON.stringify({
        v: 2,
        cmd: "legacy",
        params: { milestoneId: "M002" },
        hash: "abc",
      })}\n`,
      "utf-8",
    );
    const [event] = readEvents(path);
    assert.equal(event.schemaVersion, 1);
    assert.equal(event.cmd, "legacy");
  });
  test("workflow_audit_log_writes_schema_version_and_reads_legacy_rows", () => {
    const project = makeProject();
    // Point the workflow logger at this project so logError persists here.
    setLogBasePath(project);
    logError("versioning-test", "audit failure", { id: "audit-1" });
    const path = join(project, ".sf", "audit-log.jsonl");
    assert.equal(readJsonl(path)[0].schemaVersion, 1);
    writeFileSync(
      path,
      `${JSON.stringify({
        ts: "2026-05-07T00:00:00.000Z",
        severity: "error",
        component: "legacy",
        message: "legacy error",
      })}\n`,
      "utf-8",
    );
    const [entry] = readAuditLog(project);
    assert.equal(entry.schemaVersion, 1);
    assert.equal(entry.component, "legacy");
  });
  test("uok_audit_and_parity_logs_write_schema_version_and_read_legacy_parity", () => {
    const project = makeProject();
    emitUokAuditEvent(
      project,
      buildAuditEnvelope({
        traceId: "trace-1",
        category: "orchestration",
        type: "test-event",
      }),
    );
    assert.equal(
      readJsonl(join(project, ".sf", "audit", "events.jsonl"))[0].schemaVersion,
      1,
    );
    writeParityHeartbeat(project, {
      ts: "2026-05-07T00:00:00.000Z",
      runId: "run-1",
      path: "uok-kernel",
      phase: "enter",
    });
    assert.equal(
      readJsonl(join(project, ".sf", "runtime", "uok-parity.jsonl"))[0]
        .schemaVersion,
      1,
    );
    // Legacy parity rows are fed straight to the parser (no file needed).
    const [legacy] = parseParityEvents(
      `${JSON.stringify({
        ts: "2026-05-07T00:00:01.000Z",
        runId: "legacy-run",
        path: "uok-kernel",
        phase: "exit",
        status: "ok",
      })}\n`,
    );
    assert.equal(legacy.schemaVersion, 1);
    assert.equal(legacy.runId, "legacy-run");
  });
  test("judgment_log_writes_schema_version_and_reads_legacy_rows", () => {
    const project = makeProject();
    appendJudgment(project, {
      unitId: "M001/S01/T01",
      confidence: "high",
      decision: "keep file-backed projection",
    });
    const path = join(project, ".sf", "judgment-log.jsonl");
    assert.equal(readJsonl(path)[0].schemaVersion, 1);
    writeFileSync(
      path,
      `${JSON.stringify({
        ts: "2026-05-07T00:00:00.000Z",
        unitId: "M001/S01/T02",
        confidence: "low",
        decision: "legacy row",
      })}\n`,
      "utf-8",
    );
    // readJudgmentLog filters by milestone prefix ("M001" matches both IDs).
    const [entry] = readJudgmentLog(project, "M001");
    assert.equal(entry.schemaVersion, 1);
    assert.equal(entry.decision, "legacy row");
  });
  test("autonomous_solver_history_and_steering_write_schema_versions", () => {
    const project = makeProject();
    beginAutonomousSolverIteration(project, "execute-task", "M001/S01/T01");
    appendAutonomousSolverCheckpoint(project, {
      unitType: "execute-task",
      unitId: "M001/S01/T01",
      outcome: "continue",
      summary: "versioned checkpoint",
      completedItems: ["one"],
      remainingItems: ["two"],
      verificationEvidence: ["test"],
      pdd: {
        purpose: "prove schema marker",
        consumer: "solver loop",
        contract: "checkpoint row is versioned",
        failureBoundary: "unversioned history drifts",
        evidence: "jsonl row",
        nonGoals: "no solver behavior change",
        invariants: "legacy state remains readable",
        assumptions: "local fs works",
      },
    });
    assert.equal(
      readJsonl(
        join(
          project,
          ".sf",
          "runtime",
          "autonomous-solver",
          "iterations.jsonl",
        ),
      )[0].schemaVersion,
      1,
    );
    // Steering entries are versioned both on append and when consumed.
    const steering = appendAutonomousSolverSteering(project, "prefer DB state");
    assert.equal(steering.schemaVersion, 1);
    const [consumed] = consumePendingAutonomousSolverSteering(project);
    assert.equal(consumed.schemaVersion, 1);
  });
  test("model_failure_log_writes_schema_version_and_reads_legacy_rows", () => {
    const project = makeProject();
    const learner = new ModelLearner(project);
    learner.logFailure("execute-task", "test-model", {
      reason: "timeout",
      timeout: true,
    });
    const path = join(project, ".sf", "model-failure-log.jsonl");
    assert.equal(readJsonl(path)[0].schemaVersion, 1);
    writeFileSync(
      path,
      `${JSON.stringify({
        timestamp: "2026-05-07T00:00:00.000Z",
        taskType: "execute-task",
        modelId: "legacy-model",
        reason: "quality",
      })}\n`,
      "utf-8",
    );
    // Legacy (versionless) rows must still be aggregated into the summary.
    const summary = learner.getFailureSummary("execute-task", "legacy-model");
    assert.equal(summary.reasons.quality, 1);
  });
  test("todo_triage_jsonl_outputs_write_schema_versions", async () => {
    const project = makeProject();
    writeFileSync(join(project, "TODO.md"), "# TODO\n\nmake evals real\n");
    // Canned model response so triageTodoDump runs without a live agent.
    const response = JSON.stringify({
      summary: "triaged",
      eval_candidates: [
        {
          id: "eval.one",
          task_input: "run one",
          expected_behavior: "passes",
          evidence: "TODO.md",
        },
        {
          id: "eval.two",
          task_input: "run one",
          expected_behavior: "passes again",
          evidence: "TODO.md",
        },
      ],
      implementation_tasks: ["add the task"],
      memory_requirements: ["remember the rule"],
      harness_suggestions: ["add harness"],
      docs_or_tests: ["document it"],
      unclear_notes: [],
    });
    const result = await triageTodoDump(project, async () => response, {
      clear: false,
      backlog: true,
      date: new Date("2026-05-07T01:02:03.000Z"),
    });
    const backlogDir = join(project, ".sf", "triage", "backlog");
    const [backlogFile] = readdirSync(backlogDir).filter((file) =>
      file.endsWith(".jsonl"),
    );
    // All four triage outputs (eval, normalized, skill, backlog) must stamp
    // schemaVersion 1 on every row they emit.
    for (const path of [
      result.evalJsonlPath,
      result.normalizedJsonlPath,
      result.skillJsonlPath,
      join(backlogDir, backlogFile),
    ]) {
      const rows = readJsonl(path);
      assert.ok(rows.length > 0, `${path} should contain rows`);
      for (const row of rows) assert.equal(row.schemaVersion, 1);
    }
  });
});

View file

@ -4,17 +4,17 @@
* Validates that lifecycle hooks properly flush memory syncs at unit/session completion.
*/
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
flushProjectMemorySync,
flushAllProjectsMemorySync,
onUnitTerminal,
flushProjectMemorySync,
onSessionEnd,
onUnitTerminal,
} from "../lifecycle-hooks.js";
import {
getSyncStatus,
queueMemorySync,
resetAllSchedulers,
getSyncStatus,
} from "../sync-scheduler.js";
describe("Lifecycle Hooks (Tier 1.2 Phase 3b)", () => {

View file

@ -9,7 +9,7 @@
* - Environment variable controls
*/
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
describe("SM Client", () => {
let originalEnv: Record<string, string | undefined>;

View file

@ -9,14 +9,14 @@
* - Degrades gracefully when SM unavailable
*/
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
queueMemorySync,
clearSyncQueue,
flushSyncQueue,
getSyncStatus,
clearSyncQueue,
resetScheduler,
queueMemorySync,
resetAllSchedulers,
resetScheduler,
} from "../sync-scheduler.js";
describe("Sync Scheduler (Tier 1.2 Phase 2)", () => {
@ -142,7 +142,7 @@ describe("Sync Scheduler (Tier 1.2 Phase 2)", () => {
]);
// One flush should process items, other should see already_flushing
const totalProcessed = result1.synced + result2.synced;
const _totalProcessed = result1.synced + result2.synced;
// First flush processes 10, second sees already_flushing
expect(result1.synced + result1.failed).toBeGreaterThanOrEqual(0);

View file

@ -5,16 +5,49 @@
* from agent output.
*/
import { describe, it, expect } from "vitest";
import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
checkTurnStatusPrompts,
describeTurnStatus,
extractTurnStatus,
isValidTurnStatus,
describeTurnStatus,
resolveSignalFromStatus,
parseTurnStatusFull,
checkTurnStatusPrompts,
resolveSignalFromStatus,
} from "../turn-status-parser.js";
const tmpRoots: string[] = [];
afterEach(() => {
for (const dir of tmpRoots.splice(0)) {
rmSync(dir, { recursive: true, force: true });
}
});
/**
 * Build a fake SF checkout containing the six prompt files that
 * checkTurnStatusPrompts inspects, each with a turn_status marker.
 * The root is registered in `tmpRoots` for afterEach cleanup.
 */
function makePromptRoot() {
  const root = mkdtempSync(join(tmpdir(), "sf-turn-status-"));
  tmpRoots.push(root);
  const promptDir = join(root, "src/resources/extensions/sf/prompts");
  mkdirSync(promptDir, { recursive: true });
  const promptNames = [
    "execute-task.md",
    "complete-slice.md",
    "research-slice.md",
    "plan-slice.md",
    "research-milestone.md",
    "plan-milestone.md",
  ];
  for (const promptName of promptNames) {
    writeFileSync(
      join(promptDir, promptName),
      "Finish with <turn_status>complete</turn_status>\n",
      "utf-8",
    );
  }
  return root;
}
describe("Turn Status Parser (Tier 2.5)", () => {
describe("extractTurnStatus", () => {
it("when_complete_marker_present_extracts_status", () => {
@ -63,8 +96,7 @@ describe("Turn Status Parser (Tier 2.5)", () => {
});
it("handles_whitespace_around_marker", () => {
const output =
"Done.\n\n\n<turn_status>complete</turn_status> \n\n";
const output = "Done.\n\n\n<turn_status>complete</turn_status> \n\n";
const result = extractTurnStatus(output);
@ -97,14 +129,13 @@ describe("Turn Status Parser (Tier 2.5)", () => {
expect(result.cleanOutput).toBe(123);
});
it("finds_marker_only_at_end", () => {
it("ignores_marker_not_at_end", () => {
const output =
"Found <turn_status>complete</turn_status> in middle\nmore text here";
const result = extractTurnStatus(output);
// Should still extract (regex doesn't require end-of-string)
expect(result.status).toBe("complete");
expect(result.status).toBeNull();
});
});
@ -193,8 +224,7 @@ describe("Turn Status Parser (Tier 2.5)", () => {
});
it("extracts_and_resolves_blocked_marker", () => {
const output =
"Cannot find file.\n\n<turn_status>blocked</turn_status>";
const output = "Cannot find file.\n\n<turn_status>blocked</turn_status>";
const result = parseTurnStatusFull(output);
@ -236,15 +266,15 @@ describe("Turn Status Parser (Tier 2.5)", () => {
});
describe("checkTurnStatusPrompts", () => {
it("validates_marker_presence_in_prompts", () => {
// This test is informational; real validation requires file access
const result = checkTurnStatusPrompts(
"/home/mhugo/code/singularity-forge",
);
it("validates_marker_presence_in_prompts_without_commonjs_require", () => {
const result = checkTurnStatusPrompts(makePromptRoot());
expect(result).toHaveProperty("issues");
expect(result).toHaveProperty("allGood");
expect(result).toHaveProperty("promptsChecked");
expect(result.allGood).toBe(true);
expect(result.promptsChecked).toBe(6);
expect(result.issues).toEqual([]);
});
});
@ -282,13 +312,11 @@ describe("Turn Status Parser (Tier 2.5)", () => {
expectedAction: "continue",
},
{
output:
"Blocker found.\n\n<turn_status>blocked</turn_status>",
output: "Blocker found.\n\n<turn_status>blocked</turn_status>",
expectedAction: "pause",
},
{
output:
"Out of ideas.\n\n<turn_status>giving_up</turn_status>",
output: "Out of ideas.\n\n<turn_status>giving_up</turn_status>",
expectedAction: "reassess",
},
{

View file

@ -1,13 +1,15 @@
import assert from "node:assert/strict";
import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, test } from "vitest";
import { closeDatabase, insertUokMessage } from "../sf-db.js";
import { AgentInbox, MessageBus } from "../uok/message-bus.js";
const tmpRoots = [];
afterEach(() => {
closeDatabase();
for (const dir of tmpRoots.splice(0)) {
rmSync(dir, { recursive: true, force: true });
}
@ -56,6 +58,20 @@ test("messageBus_persists_across_reconstruction", () => {
assert.equal(messages[0].body, "persistent");
});
test("messageBus_send_when_inbox_already_hydrated_does_not_double_insert", () => {
const root = makeProject();
const bus = new MessageBus(root);
const inbox = bus.getInbox("agent-b");
const id = bus.send("agent-a", "agent-b", "after-hydrate");
assert.equal(inbox.list().length, 1);
assert.equal(inbox.list()[0].id, id);
assert.equal(inbox.list()[0].body, "after-hydrate");
const bus2 = new MessageBus(root);
assert.equal(bus2.getInbox("agent-b").list().length, 1);
});
test("messageBus_markRead_updates_state_and_persists", () => {
const root = makeProject();
const bus = new MessageBus(root);
@ -76,9 +92,7 @@ test("messageBus_markRead_updates_state_and_persists", () => {
test("messageBus_compact_removes_old_messages", () => {
const root = makeProject();
const bus = new MessageBus(root, { retentionDays: 1 });
// Send an old message by manually appending to JSONL
const path = join(root, ".sf", "runtime", "uok-messages.jsonl");
mkdirSync(join(root, ".sf", "runtime"), { recursive: true });
// Insert an old message directly into the DB
const oldMsg = {
id: "msg-old",
from: "agent-a",
@ -86,8 +100,9 @@ test("messageBus_compact_removes_old_messages", () => {
body: "old",
sentAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(),
deliveredAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(),
metadata: {},
};
writeFileSync(path, `${JSON.stringify(oldMsg)}\n`, "utf-8");
insertUokMessage(oldMsg);
const result = bus.compact();
assert.equal(result.after, 0);

View file

@ -3,14 +3,12 @@
*
* Tests URI parsing, auth chain, caching, and fallback behavior.
*/
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
parseVaultUri,
resolveVaultToken,
resolveVaultSecret,
isVaultUri,
clearVaultCache,
getVaultCacheStats,
isVaultUri,
parseVaultUri,
resolveSecret,
} from "../vault-resolver.js";
@ -28,9 +26,7 @@ describe("Vault Secret Resolver", () => {
describe("parseVaultUri", () => {
it("parses valid vault:// URIs", () => {
const result = parseVaultUri(
"vault://secret/anthropic/prod#api_key",
);
const result = parseVaultUri("vault://secret/anthropic/prod#api_key");
expect(result).toHaveProperty("path", "secret/anthropic/prod");
expect(result).toHaveProperty("field", "api_key");
expect(result).toHaveProperty("vaultAddr");
@ -39,7 +35,7 @@ describe("Vault Secret Resolver", () => {
it("returns error for missing scheme", () => {
const result = parseVaultUri("secret/anthropic/prod#api_key");
expect(result).toHaveProperty("error");
expect(result.error).toContain('vault://');
expect(result.error).toContain("vault://");
});
it("returns error for missing fragment", () => {
@ -92,9 +88,7 @@ describe("Vault Secret Resolver", () => {
it("fails open with vault:// URIs when vault unavailable", async () => {
// Mock fetch to reject (vault unavailable)
global.fetch = vi.fn().mockRejectedValue(
new Error("Connection refused"),
);
global.fetch = vi.fn().mockRejectedValue(new Error("Connection refused"));
const result = await resolveSecret("vault://secret/path#field", {
failOpen: true,
@ -108,9 +102,7 @@ describe("Vault Secret Resolver", () => {
it("throws in strict mode if vault unavailable", async () => {
// Mock fetch to reject
global.fetch = vi.fn().mockRejectedValue(
new Error("Connection refused"),
);
global.fetch = vi.fn().mockRejectedValue(new Error("Connection refused"));
await expect(
resolveSecret("vault://secret/path#field", {
@ -146,9 +138,7 @@ describe("Vault Secret Resolver", () => {
});
it("handles paths with multiple segments", () => {
const result = parseVaultUri(
"vault://secret/deeply/nested/path#field",
);
const result = parseVaultUri("vault://secret/deeply/nested/path#field");
expect(result.path).toBe("secret/deeply/nested/path");
expect(result.field).toBe("field");
});

View file

@ -25,8 +25,9 @@ export function extractTurnStatus(output) {
return { status: null, cleanOutput: output };
}
// Look for marker at end of output (allow whitespace)
const markerRegex = /<turn_status>(complete|blocked|giving_up)<\/turn_status>/i;
// Look for marker at end of output (allow trailing whitespace only).
const markerRegex =
/<turn_status>(complete|blocked|giving_up)<\/turn_status>\s*$/i;
const match = output.match(markerRegex);
@ -177,9 +178,7 @@ export function checkTurnStatusPrompts(sfRoot) {
const content = readFileSync(promptPath, "utf8");
if (!content.includes("<turn_status>")) {
issues.push(
`Prompt ${prompt} missing turn_status marker template`,
);
issues.push(`Prompt ${prompt} missing turn_status marker template`);
}
}

View file

@ -12,6 +12,8 @@ import { withFileLockSync } from "../file-lock.js";
import { sfRuntimeRoot } from "../paths.js";
import { insertAuditEvent, isDbAvailable } from "../sf-db.js";
const UOK_AUDIT_SCHEMA_VERSION = 1;
function auditLogPath(basePath) {
return join(sfRuntimeRoot(basePath), "audit", "events.jsonl");
}
@ -20,6 +22,7 @@ function ensureAuditDir(basePath) {
}
export function buildAuditEnvelope(args) {
return {
schemaVersion: UOK_AUDIT_SCHEMA_VERSION,
eventId: randomUUID(),
traceId: args.traceId,
turnId: args.turnId,

View file

@ -2,81 +2,36 @@
* UOK Durable Message Bus & Agent Inbox
*
* Purpose: implement Letta-style inter-agent communication with at-least-once
* delivery guarantees via append-only JSONL. Messages survive process restarts
* and are retained with configurable TTL.
* delivery guarantees via SQLite. Messages survive process restarts and are
* retained with configurable TTL.
*
* Consumer: multi-agent orchestration, cross-turn coordination, and UOK kernel
* observer chains.
*/
import { randomUUID } from "node:crypto";
import {
appendFileSync,
existsSync,
mkdirSync,
readFileSync,
writeFileSync,
} from "node:fs";
import { mkdirSync } from "node:fs";
import { join } from "node:path";
import { sfRoot } from "../paths.js";
import {
compactUokMessages,
getUokConversation,
getUokMessageReadIds,
getUokMessagesForAgent,
insertUokMessage,
isDbAvailable,
markUokMessageRead,
openDatabase,
} from "../sf-db.js";
const DEFAULT_RETENTION_DAYS = 7;
const DEFAULT_MAX_INBOX_SIZE = 1000;
function messagesPath(basePath) {
return join(sfRoot(basePath), "runtime", "uok-messages.jsonl");
}
function inboxStatePath(basePath, agentId) {
const sanitized = agentId.replace(/[^a-zA-Z0-9_-]/g, "_");
return join(sfRoot(basePath), "runtime", `uok-inbox-${sanitized}.json`);
}
function loadMessages(basePath) {
const path = messagesPath(basePath);
if (!existsSync(path)) return [];
const lines = readFileSync(path, "utf-8").split(/\r?\n/).filter(Boolean);
const results = [];
for (const line of lines) {
try {
results.push(JSON.parse(line));
} catch {
// Skip malformed lines
}
}
return results;
}
function appendMessage(basePath, message) {
const path = messagesPath(basePath);
const dir = join(sfRoot(basePath), "runtime");
function ensureDb(basePath) {
if (isDbAvailable()) return;
const dir = sfRoot(basePath);
mkdirSync(dir, { recursive: true });
appendFileSync(path, `${JSON.stringify(message)}\n`, "utf-8");
}
function loadInboxState(basePath, agentId) {
const path = inboxStatePath(basePath, agentId);
if (!existsSync(path)) return { readIds: [] };
try {
return JSON.parse(readFileSync(path, "utf-8"));
} catch {
return { readIds: [] };
}
}
function saveInboxState(basePath, agentId, state) {
const path = inboxStatePath(basePath, agentId);
const dir = join(sfRoot(basePath), "runtime");
mkdirSync(dir, { recursive: true });
writeFileSync(path, `${JSON.stringify(state)}\n`, "utf-8");
}
function pruneOldMessages(messages, retentionDays) {
const cutoff = Date.now() - retentionDays * 24 * 60 * 60 * 1000;
return messages.filter((m) => {
const ts = m.sentAt ? Date.parse(m.sentAt) : 0;
return ts >= cutoff;
});
openDatabase(join(dir, "sf.db"));
}
export class AgentInbox {
@ -85,20 +40,23 @@ export class AgentInbox {
this.basePath = basePath;
this.maxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE;
this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS;
this._state = loadInboxState(basePath, agentId);
ensureDb(basePath);
this._messages = this._hydrate();
}
_hydrate() {
const all = loadMessages(this.basePath);
const readIds = new Set(this._state.readIds ?? []);
const forMe = all
.filter((m) => m.to === this.agentId)
.map((m) => ({
...m,
read: readIds.has(m.id),
}));
return pruneOldMessages(forMe, this.retentionDays).slice(-this.maxSize);
const messages = getUokMessagesForAgent(this.agentId, this.maxSize, false);
const readIds = new Set(getUokMessageReadIds(this.agentId));
const withRead = messages.map((m) => ({
...m,
read: readIds.has(m.id),
}));
const cutoff = Date.now() - this.retentionDays * 24 * 60 * 60 * 1000;
const recent = withRead.filter((m) => {
const ts = m.sentAt ? Date.parse(m.sentAt) : 0;
return ts >= cutoff;
});
return recent.slice(-this.maxSize);
}
receive(message) {
@ -107,6 +65,15 @@ export class AgentInbox {
receivedAt: new Date().toISOString(),
read: false,
};
insertUokMessage({
id: enriched.id,
from: enriched.from,
to: enriched.to ?? this.agentId,
body: enriched.body,
metadata: enriched.metadata,
sentAt: enriched.sentAt ?? enriched.receivedAt,
deliveredAt: enriched.deliveredAt ?? enriched.receivedAt,
});
this._messages.push(enriched);
if (this._messages.length > this.maxSize) {
this._messages = this._messages.slice(-this.maxSize);
@ -124,10 +91,7 @@ export class AgentInbox {
const msg = this._messages.find((m) => m.id === messageId);
if (msg) {
msg.read = true;
if (!this._state.readIds.includes(messageId)) {
this._state.readIds.push(messageId);
saveInboxState(this.basePath, this.agentId, this._state);
}
markUokMessageRead(messageId, this.agentId);
}
return !!msg;
}
@ -147,7 +111,7 @@ export class MessageBus {
this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS;
this.maxInboxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE;
this.inboxes = new Map();
this._messages = loadMessages(basePath);
ensureDb(basePath);
}
_getOrCreateInbox(agentId) {
@ -174,8 +138,7 @@ export class MessageBus {
deliveredAt: new Date().toISOString(),
};
appendMessage(this.basePath, message);
this._messages.push(message);
insertUokMessage(message);
const targetInbox = this._getOrCreateInbox(to);
const alreadyHas = targetInbox.list().some((m) => m.id === message.id);
@ -199,30 +162,10 @@ export class MessageBus {
}
getConversation(agentA, agentB) {
const all = loadMessages(this.basePath);
return pruneOldMessages(
all.filter(
(m) =>
(m.from === agentA && m.to === agentB) ||
(m.from === agentB && m.to === agentA),
),
this.retentionDays,
);
return getUokConversation(agentA, agentB, this.maxInboxSize);
}
compact() {
const path = messagesPath(this.basePath);
const all = loadMessages(this.basePath);
const pruned = pruneOldMessages(all, this.retentionDays);
const dir = join(sfRoot(this.basePath), "runtime");
mkdirSync(dir, { recursive: true });
writeFileSync(
path,
pruned.map((m) => JSON.stringify(m)).join("\n") +
(pruned.length ? "\n" : ""),
"utf-8",
);
this._messages = pruned;
return { before: all.length, after: pruned.length };
return compactUokMessages(this.retentionDays);
}
}

View file

@ -10,6 +10,7 @@ import { join } from "node:path";
import { sfRoot } from "../paths.js";
import { getUokRuns, isDbAvailable, recordUokRunExit } from "../sf-db.js";
const UOK_PARITY_SCHEMA_VERSION = 1;
export const UNMATCHED_RUN_STALE_MS = 30 * 60 * 1000;
function parityLogPath(basePath) {
@ -75,7 +76,8 @@ export function parseParityEvents(raw) {
.filter((line) => line.trim().length > 0)
.map((line) => {
try {
const parsed = JSON.parse(line);
const parsed = normalizeParityEvent(JSON.parse(line));
if (!parsed) return null;
if (isParityDiffEvent(parsed)) return parsed;
return parsed;
} catch {
@ -84,7 +86,17 @@ export function parseParityEvents(raw) {
error: "invalid parity json line",
};
}
});
})
.filter(Boolean);
}
/**
 * Coerce a parsed parity row to the current schema. Non-object rows are
 * dropped; rows without a schemaVersion are treated as version 1 (legacy);
 * rows from a different schema version are rejected.
 */
function normalizeParityEvent(event) {
  const isPlainObject =
    event !== null && typeof event === "object" && !Array.isArray(event);
  if (!isPlainObject) return null;
  const schemaVersion = event.schemaVersion ?? UOK_PARITY_SCHEMA_VERSION;
  return schemaVersion === UOK_PARITY_SCHEMA_VERSION
    ? { ...event, schemaVersion }
    : null;
}
export function buildParityReport(
events,
@ -240,6 +252,7 @@ export function buildParityReport(
);
}
return {
schemaVersion: UOK_PARITY_SCHEMA_VERSION,
generatedAt: new Date(nowMs).toISOString(),
sourcePath,
totalEvents: events.length,
@ -340,7 +353,11 @@ export function writeParityDiff(basePath, event) {
try {
mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true });
const logPath = parityLogPath(basePath);
appendFileSync(logPath, `${JSON.stringify(event)}\n`, "utf-8");
appendFileSync(
logPath,
`${JSON.stringify({ schemaVersion: UOK_PARITY_SCHEMA_VERSION, ...event })}\n`,
"utf-8",
);
} catch {
// Best-effort: diff emission must never break orchestration.
}
@ -359,7 +376,7 @@ export function writeParityHeartbeat(basePath, event) {
mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true });
appendFileSync(
parityLogPath(basePath),
`${JSON.stringify(event)}\n`,
`${JSON.stringify({ schemaVersion: UOK_PARITY_SCHEMA_VERSION, ...event })}\n`,
"utf-8",
);
} catch {

View file

@ -19,7 +19,6 @@
* No persistent cache (vault secrets can change).
*/
import { existsSync, readFileSync } from "node:fs";
import { expand } from "node:path";
import { homedir } from "node:os";
/**
@ -50,7 +49,7 @@ export function parseVaultUri(uri) {
if (!pathPart || !fieldPart) {
return {
error:
'Invalid vault URI format. Expected: vault://path/to/secret#fieldname',
"Invalid vault URI format. Expected: vault://path/to/secret#fieldname",
};
}
@ -61,8 +60,7 @@ export function parseVaultUri(uri) {
return { error: "Path and field must be non-empty" };
}
const vaultAddr =
process.env.VAULT_ADDR || "http://127.0.0.1:8200";
const vaultAddr = process.env.VAULT_ADDR || "http://127.0.0.1:8200";
const token = resolveVaultToken();
return { path, field, vaultAddr, token };
@ -154,7 +152,10 @@ async function fetchVaultSecret(path, vaultAddr, token) {
*
* Caches results in memory for TTL (default 5 minutes).
*/
export async function resolveVaultSecret(uri, cacheTtlMs = DEFAULT_CACHE_TTL_MS) {
export async function resolveVaultSecret(
uri,
cacheTtlMs = DEFAULT_CACHE_TTL_MS,
) {
// Check memory cache first
const cached = secretCache.get(uri);
if (cached && Date.now() - cached.timestamp < cacheTtlMs) {

View file

@ -6,6 +6,8 @@ import { withFileLockSync } from "./file-lock.js";
import { sfRuntimeRoot } from "./paths.js";
import { logWarning } from "./workflow-logger.js";
const WORKFLOW_EVENT_SCHEMA_VERSION = 1;
// ─── Session ID ───────────────────────────────────────────────────────────
/**
* Engine-generated session ID stable for the lifetime of this process.
@ -27,6 +29,7 @@ export function appendEvent(basePath, event) {
.digest("hex")
.slice(0, 16);
const fullEvent = {
schemaVersion: WORKFLOW_EVENT_SCHEMA_VERSION,
v: 2,
...event,
hash,
@ -57,7 +60,8 @@ export function readEvents(logPath) {
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
try {
events.push(JSON.parse(line));
const event = normalizeWorkflowEvent(JSON.parse(line));
if (event) events.push(event);
} catch (_err) {
corruptCount++;
const snippet = line.slice(0, 80);
@ -86,6 +90,16 @@ export function readEvents(logPath) {
}
return events;
}
/**
 * Coerce a parsed workflow-event row to the current schema. Non-object rows
 * are dropped; legacy rows without schemaVersion are adopted as version 1;
 * rows carrying any other version are rejected.
 */
function normalizeWorkflowEvent(event) {
  const isPlainObject =
    event !== null && typeof event === "object" && !Array.isArray(event);
  if (!isPlainObject) return null;
  const schemaVersion = event.schemaVersion ?? WORKFLOW_EVENT_SCHEMA_VERSION;
  return schemaVersion === WORKFLOW_EVENT_SCHEMA_VERSION
    ? { ...event, schemaVersion }
    : null;
}
// ─── findForkPoint ───────────────────────────────────────────────────────
/**
* Find the index of the last common event between two logs by comparing hashes.

View file

@ -31,6 +31,7 @@ import { isAuditEnvelopeEnabled } from "./uok/audit-toggle.js";
// ─── Buffer & Persistent Audit ──────────────────────────────────────────
const MAX_BUFFER = 100;
const AUDIT_LOG_SCHEMA_VERSION = 1;
let _buffer = [];
let _auditBasePath = null;
let _stderrEnabled = true;
@ -173,7 +174,7 @@ export function readAuditLog(basePath) {
.filter((l) => l.length > 0)
.map((l) => {
try {
return JSON.parse(l);
return normalizeAuditLogEntry(JSON.parse(l));
} catch {
return null;
}
@ -294,6 +295,7 @@ function _writeStderr(message) {
*/
function _sanitizeForAudit(entry) {
const sanitized = {
schemaVersion: AUDIT_LOG_SCHEMA_VERSION,
ts: entry.ts,
severity: entry.severity,
component: entry.component,
@ -328,3 +330,13 @@ function _sanitizeForAudit(entry) {
}
return sanitized;
}
/**
 * Coerce a parsed audit-log row to the current schema. Non-object rows are
 * dropped; legacy rows without schemaVersion are adopted as version 1; rows
 * from any other schema version are rejected.
 */
function normalizeAuditLogEntry(entry) {
  const isPlainObject =
    entry !== null && typeof entry === "object" && !Array.isArray(entry);
  if (!isPlainObject) return null;
  const schemaVersion = entry.schemaVersion ?? AUDIT_LOG_SCHEMA_VERSION;
  return schemaVersion === AUDIT_LOG_SCHEMA_VERSION
    ? { ...entry, schemaVersion }
    : null;
}