From 4cefa6de2ac5c9b7af4b8638534a6ff952055ada Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Thu, 7 May 2026 03:07:51 +0200 Subject: [PATCH] feat: persist SF runtime signals --- .../pi-coding-agent/src/core/auth-storage.ts | 5 +- .../extensions/sf/autonomous-solver.js | 16 +- src/resources/extensions/sf/commands-todo.js | 29 +- src/resources/extensions/sf/doctor-checks.js | 8 +- .../extensions/sf/doctor-config-checks.js | 75 +++- src/resources/extensions/sf/doctor.js | 7 +- src/resources/extensions/sf/journal.js | 22 +- src/resources/extensions/sf/judgment-log.js | 23 +- src/resources/extensions/sf/model-learner.js | 17 +- .../extensions/sf/preferences-types.js | 14 +- .../extensions/sf/preferences-validation.js | 38 +- src/resources/extensions/sf/preferences.js | 6 +- .../extensions/sf/schedule/schedule-store.js | 22 +- .../extensions/sf/schedule/schedule-types.js | 1 + src/resources/extensions/sf/sf-db.js | 187 ++++++++- src/resources/extensions/sf/sm-client.js | 16 +- src/resources/extensions/sf/sync-scheduler.js | 4 +- .../sf/tests/extension-models-gap5.test.mjs | 59 +-- .../sf/tests/jsonl-schema-versioning.test.mjs | 361 ++++++++++++++++++ .../sf/tests/lifecycle-hooks.test.ts | 8 +- .../extensions/sf/tests/sm-client.test.ts | 2 +- .../sf/tests/sync-scheduler.test.ts | 10 +- .../sf/tests/turn-status-parser.test.ts | 68 +++- .../sf/tests/uok-message-bus.test.mjs | 25 +- .../sf/tests/vault-resolver.test.ts | 26 +- .../extensions/sf/turn-status-parser.js | 9 +- src/resources/extensions/sf/uok/audit.js | 3 + .../extensions/sf/uok/message-bus.js | 145 +++---- .../extensions/sf/uok/parity-report.js | 25 +- src/resources/extensions/sf/vault-resolver.js | 11 +- .../extensions/sf/workflow-events.js | 16 +- .../extensions/sf/workflow-logger.js | 14 +- 32 files changed, 1022 insertions(+), 250 deletions(-) create mode 100644 src/resources/extensions/sf/tests/jsonl-schema-versioning.test.mjs diff --git a/packages/pi-coding-agent/src/core/auth-storage.ts b/packages/pi-coding-agent/src/core/auth-storage.ts index 9391d8743..98f9c0f13 100644 --- a/packages/pi-coding-agent/src/core/auth-storage.ts +++ b/packages/pi-coding-agent/src/core/auth-storage.ts @@ -31,10 +31,7 @@ import { import { getAgentDir } from "../config.js"; import { AUTH_LOCK_STALE_MS } from "./constants.js"; import { acquireLockAsync, acquireLockSyncWithRetry } from "./lock-utils.js"; -import { - resolveConfigValue, - resolveConfigValueAsync, -} from "./resolve-config-value.js"; +import { resolveConfigValueAsync } from "./resolve-config-value.js"; export type ApiKeyCredential = { type: "api_key"; diff --git a/src/resources/extensions/sf/autonomous-solver.js b/src/resources/extensions/sf/autonomous-solver.js index 2727d45f7..40361a9cc 100644 --- a/src/resources/extensions/sf/autonomous-solver.js +++ b/src/resources/extensions/sf/autonomous-solver.js @@ -29,6 +29,8 @@ const DEFAULT_SOLVER_MAX_ITERATIONS = 30000; const MIN_SOLVER_MAX_ITERATIONS = 1; const MAX_SOLVER_MAX_ITERATIONS = 100000; const DEFAULT_MISSING_CHECKPOINT_REPAIR_ATTEMPTS = 4; +const SOLVER_CHECKPOINT_SCHEMA_VERSION = 1; +const SOLVER_STEERING_SCHEMA_VERSION = 1; function solverDir(basePath) { return join(sfRoot(basePath), "runtime", "autonomous-solver"); @@ -259,6 +261,7 @@ export function appendAutonomousSolverCheckpoint(basePath, params) { readJson(statePath(basePath)) ?? beginAutonomousSolverIteration(basePath, params.unitType, params.unitId); const checkpoint = { + schemaVersion: SOLVER_CHECKPOINT_SCHEMA_VERSION, ts: nowIso(), unitType: params.unitType, unitId: params.unitId, @@ -532,6 +535,7 @@ export function appendAutonomousSolverSteering(basePath, text, metadata = {}) { const trimmed = String(text ?? "").trim(); if (!trimmed) return null; const entry = { + schemaVersion: SOLVER_STEERING_SCHEMA_VERSION, id: `${Date.now()}-${Math.random().toString(16).slice(2)}`, ts: nowIso(), text: trimmed, @@ -550,7 +554,7 @@ function readSteeringEntries(basePath) { .filter((line) => line.trim()) .map((line) => { try { - return JSON.parse(line); + return normalizeSteeringEntry(JSON.parse(line)); } catch { return null; } @@ -561,6 +565,16 @@ function readSteeringEntries(basePath) { } } +function normalizeSteeringEntry(entry) { + if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null; + const schemaVersion = entry.schemaVersion ?? SOLVER_STEERING_SCHEMA_VERSION; + if (schemaVersion !== SOLVER_STEERING_SCHEMA_VERSION) return null; + return { + ...entry, + schemaVersion, + }; +} + /** * Consume pending steering exactly once for prompt injection. * diff --git a/src/resources/extensions/sf/commands-todo.js b/src/resources/extensions/sf/commands-todo.js index 532f1abaf..004376f72 100644 --- a/src/resources/extensions/sf/commands-todo.js +++ b/src/resources/extensions/sf/commands-todo.js @@ -22,6 +22,7 @@ import { sfRoot } from "./paths.js"; const _EMPTY_TODO = "# TODO\n\nDump anything here.\n"; const MAX_DUMP_CHARS = 48_000; +const TODO_TRIAGE_SCHEMA_VERSION = 1; const PREFERRED_TRIAGE_MODEL_PATTERNS = [ /minimax.*m2\.7.*highspeed/i, /minimax.*m2\.5.*highspeed/i, @@ -167,7 +168,11 @@ function renderEvalJsonl(result) { return ( result.eval_candidates .map((item) => - JSON.stringify({ ...item, source: item.source ?? "TODO.md" }), + JSON.stringify({ + schemaVersion: TODO_TRIAGE_SCHEMA_VERSION, + ...item, + source: item.source ?? "TODO.md", + }), ) .join("\n") + (result.eval_candidates.length > 0 ? "\n" : "") ); @@ -260,7 +265,16 @@ function detectRecurringPatterns(result) { function renderSkillProposals(result) { const proposals = detectRecurringPatterns(result); if (proposals.length === 0) return "\n"; - return proposals.map((p) => JSON.stringify(p)).join("\n") + "\n"; + return ( + proposals + .map((p) => + JSON.stringify({ + schemaVersion: TODO_TRIAGE_SCHEMA_VERSION, + ...p, + }), + ) + .join("\n") + "\n" + ); } function backlogPath(basePath) { return join(sfRoot(basePath), "WORK-QUEUE.md"); @@ -278,6 +292,7 @@ function renderBacklogJsonl(items, triagedAt) { items .map((item) => JSON.stringify({ + schemaVersion: TODO_TRIAGE_SCHEMA_VERSION, id: item.id, title: item.title, source: "todo-triage", @@ -356,8 +371,14 @@ function normalizedItems(result, createdAt) { function renderNormalizedJsonl(result, createdAt) { const items = normalizedItems(result, createdAt); return ( - items.map((item) => JSON.stringify(item)).join("\n") + - (items.length ? "\n" : "") + items + .map((item) => + JSON.stringify({ + schemaVersion: TODO_TRIAGE_SCHEMA_VERSION, + ...item, + }), + ) + .join("\n") + (items.length ? "\n" : "") ); } function buildTriagePrompt(dump) { diff --git a/src/resources/extensions/sf/doctor-checks.js b/src/resources/extensions/sf/doctor-checks.js index c41d5c4d9..ca4204885 100644 --- a/src/resources/extensions/sf/doctor-checks.js +++ b/src/resources/extensions/sf/doctor-checks.js @@ -1,6 +1,12 @@ // Re-exports for backward compatibility + +export { + checkConfigHealth, + checkSmHealth, + checkTurnStatusHealth, + checkVaultHealth, +} from "./doctor-config-checks.js"; export { checkEngineHealth } from "./doctor-engine-checks.js"; export { checkGitHealth } from "./doctor-git-checks.js"; export { checkGlobalHealth } from "./doctor-global-checks.js"; export { checkRuntimeHealth } from "./doctor-runtime-checks.js"; -export { checkConfigHealth, checkVaultHealth, checkSmHealth } from "./doctor-config-checks.js"; diff --git a/src/resources/extensions/sf/doctor-config-checks.js b/src/resources/extensions/sf/doctor-config-checks.js index c36247151..c94db46fb 100644 --- a/src/resources/extensions/sf/doctor-config-checks.js +++ b/src/resources/extensions/sf/doctor-config-checks.js @@ -7,17 +7,15 @@ * Severity: varies (error for type mismatches, warning for out-of-range values). * Fixable: varies (some are auto-fixable, others are user-config). */ -import { loadEffectiveSFPreferences } from "./preferences.js"; import { getContextCompactThreshold, getContextHardLimit, - getUnitTimeout, - getMaxAgentsForPhase, - isTurnInputRequired, - getWorktreeMode, - getToolAbortGrace, - getMaxTurnsPerAttempt, getHotCacheTurns, + getMaxTurnsPerAttempt, + getToolAbortGrace, + getUnitTimeout, + getWorktreeMode, + loadEffectiveSFPreferences, } from "./preferences.js"; /** @@ -86,7 +84,10 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) { } // ─── Phase-Specific Timeouts ──────────────────────────────────────── - if (prefs.unit_timeout_by_phase && typeof prefs.unit_timeout_by_phase === "object") { + if ( + prefs.unit_timeout_by_phase && + typeof prefs.unit_timeout_by_phase === "object" + ) { const recognizedPhases = new Set([ "research", "planning", @@ -95,7 +96,9 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) { "discussion", "replan", ]); - for (const [phase, timeout] of Object.entries(prefs.unit_timeout_by_phase)) { + for (const [phase, timeout] of Object.entries( + prefs.unit_timeout_by_phase, + )) { if (!recognizedPhases.has(phase)) { issues.push({ severity: "warning", @@ -122,7 +125,10 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) { } // ─── Max Agents Per Phase ──────────────────────────────────────────── - if (prefs.max_agents_by_phase && typeof prefs.max_agents_by_phase === "object") { + if ( + prefs.max_agents_by_phase && + typeof prefs.max_agents_by_phase === "object" + ) { const recognizedPhases = new Set([ "research", "planning", @@ -262,7 +268,7 @@ export async function checkConfigHealth(issues, fixesApplied, shouldFix) { * - Invalid vault URIs * - Vault connectivity issues */ -export function checkVaultHealth(issues, shouldFix) { +export function checkVaultHealth(issues, _shouldFix) { try { // Check if any environment variables reference vault URIs const vaultUris = []; @@ -381,7 +387,7 @@ export function checkVaultHealth(issues, shouldFix) { * - SM unavailable or unhealthy * - Optional (not required — SF works fine locally) */ -export async function checkSmHealth(issues, shouldFix) { +export async function checkSmHealth(issues, _shouldFix) { try { // Check if explicitly disabled if (process.env.SM_ENABLED === "false") { @@ -420,8 +426,51 @@ export async function checkSmHealth(issues, shouldFix) { file: "Singularity Memory server", fixable: false, }); - } catch (err) { + } catch (_err) { // Non-fatal; SM is optional // Don't report errors in the check itself } } + +/** + * Check turn_status marker coverage in prompts (Tier 2.5). + * + * Issues detected: + * - Executive prompts (execute-task, complete-slice, etc.) missing markers + * - Malformed markers or inconsistent format + * + * Severity: warning (agents can function without markers; markers enable pausing/reassess) + */ +export async function checkTurnStatusHealth(issues, _shouldFix) { + try { + // Lazy import to avoid optional dependency + let checkTurnStatusPrompts; + try { + const parser = await import("./turn-status-parser.js"); + checkTurnStatusPrompts = parser.checkTurnStatusPrompts; + } catch { + // Parser not available; skip check + return; + } + + if (!checkTurnStatusPrompts) return; + + // Test marker coverage in SF root + const basePath = process.cwd(); + const result = checkTurnStatusPrompts(basePath); + + if (!result.allGood && result.issues.length > 0) { + issues.push({ + severity: "warning", + code: "turn_status_incomplete", + scope: "project", + unitId: "turn_status", + message: `Turn status markers incomplete: ${result.issues.join(", ")}. Agents won't be able to signal blocked/giving_up state.`, + file: "src/resources/extensions/sf/prompts/", + fixable: false, + }); + } + } catch (_err) { + // Non-fatal; turn_status is optional + } +} diff --git a/src/resources/extensions/sf/doctor.js b/src/resources/extensions/sf/doctor.js index 502809e28..bb14f1b22 100644 --- a/src/resources/extensions/sf/doctor.js +++ b/src/resources/extensions/sf/doctor.js @@ -12,13 +12,14 @@ import { dirname, extname, join } from "node:path"; import { parse as parseYaml } from "yaml"; import { invalidateAllCaches } from "./cache.js"; import { + checkConfigHealth, checkEngineHealth, checkGitHealth, checkGlobalHealth, checkRuntimeHealth, - checkConfigHealth, - checkVaultHealth, checkSmHealth, + checkTurnStatusHealth, + checkVaultHealth, } from "./doctor-checks.js"; import { checkEnvironmentHealth } from "./doctor-environment.js"; import { runProviderChecks } from "./doctor-providers.js"; @@ -1419,6 +1420,8 @@ export async function runSFDoctor(basePath, options) { checkVaultHealth(issues, shouldFix); // Singularity Memory checks — Tier 1.2 optional federation await checkSmHealth(issues, shouldFix); + // Turn status markers — Tier 2.5 agent semantic state signaling + await checkTurnStatusHealth(issues, shouldFix); const milestonesPath = milestonesDir(basePath); if (!existsSync(milestonesPath)) { const report = { diff --git a/src/resources/extensions/sf/journal.js b/src/resources/extensions/sf/journal.js index e8805d771..1cd5373c8 100644 --- a/src/resources/extensions/sf/journal.js +++ b/src/resources/extensions/sf/journal.js @@ -27,6 +27,8 @@ import { sfRuntimeRoot } from "./paths.js"; import { buildAuditEnvelope, emitUokAuditEvent } from "./uok/audit.js"; import { isAuditEnvelopeEnabled } from "./uok/audit-toggle.js"; +const JOURNAL_SCHEMA_VERSION = 1; + // Per-session dedup for journal write failures to prevent log flooding. let _journalWriteFailureNotified = false; // ─── Emit ───────────────────────────────────────────────────────────────────── @@ -43,6 +45,10 @@ export function emitJournalEvent(basePath, entry) { // See auto/turn-epoch.ts for the full rationale. if (isStaleWrite("journal")) return; let writeError; + const persistedEntry = { + schemaVersion: JOURNAL_SCHEMA_VERSION, + ...entry, + }; try { const journalDir = join(sfRuntimeRoot(basePath), "journal"); mkdirSync(journalDir, { recursive: true }); @@ -57,7 +63,7 @@ export function emitJournalEvent(basePath, entry) { withFileLockSync( filePath, () => { - appendFileSync(filePath, JSON.stringify(entry) + "\n"); + appendFileSync(filePath, JSON.stringify(persistedEntry) + "\n"); }, { onLocked: "skip" }, ); @@ -75,6 +81,7 @@ export function emitJournalEvent(basePath, entry) { ".write-failures.jsonl", ); const warningEntry = { + schemaVersion: JOURNAL_SCHEMA_VERSION, ts: new Date().toISOString(), errorClass: writeError.constructor.name, message: writeError.message, @@ -129,7 +136,8 @@ export function queryJournal(basePath, filters) { for (const line of raw.split("\n")) { if (!line.trim()) continue; try { - const entry = JSON.parse(line); + const entry = normalizeJournalEntry(JSON.parse(line)); + if (!entry) continue; entries.push(entry); } catch { // Skip malformed lines @@ -151,3 +159,13 @@ export function queryJournal(basePath, filters) { return []; } } + +function normalizeJournalEntry(entry) { + if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null; + const schemaVersion = entry.schemaVersion ?? JOURNAL_SCHEMA_VERSION; + if (schemaVersion !== JOURNAL_SCHEMA_VERSION) return null; + return { + ...entry, + schemaVersion, + }; +} diff --git a/src/resources/extensions/sf/judgment-log.js b/src/resources/extensions/sf/judgment-log.js index 41d6833b8..71f608d24 100644 --- a/src/resources/extensions/sf/judgment-log.js +++ b/src/resources/extensions/sf/judgment-log.js @@ -13,6 +13,9 @@ import { appendFileSync, existsSync, mkdirSync, readFileSync } from "node:fs"; import { join } from "node:path"; import { sfRuntimeRoot } from "./paths.js"; + +const JUDGMENT_LOG_SCHEMA_VERSION = 1; + /** * Append a single judgment entry to the judgment log JSONL file. * Creates the file and parent directories on first call. @@ -22,7 +25,11 @@ export function appendJudgment(basePath, entry) { try { const logPath = resolveJudgmentLogPath(basePath); mkdirSync(join(logPath, ".."), { recursive: true }); - const full = { ts: new Date().toISOString(), ...entry }; + const full = { + schemaVersion: JUDGMENT_LOG_SCHEMA_VERSION, + ts: new Date().toISOString(), + ...entry, + }; appendFileSync(logPath, JSON.stringify(full) + "\n", "utf-8"); } catch { // Non-fatal — judgment logging must never break the agent loop @@ -45,7 +52,8 @@ export function readJudgmentLog(basePath, unitId) { const trimmed = line.trim(); if (!trimmed) continue; try { - const parsed = JSON.parse(trimmed); + const parsed = normalizeJudgmentEntry(JSON.parse(trimmed)); + if (!parsed) continue; if (unitId && !parsed.unitId.startsWith(unitId)) continue; entries.push(parsed); } catch { @@ -57,6 +65,17 @@ export function readJudgmentLog(basePath, unitId) { return []; } } + +function normalizeJudgmentEntry(entry) { + if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null; + const schemaVersion = entry.schemaVersion ?? JUDGMENT_LOG_SCHEMA_VERSION; + if (schemaVersion !== JUDGMENT_LOG_SCHEMA_VERSION) return null; + return { + ...entry, + schemaVersion, + }; +} + /** * Resolve the absolute path to the judgment log file. */ diff --git a/src/resources/extensions/sf/model-learner.js b/src/resources/extensions/sf/model-learner.js index 2296fa2e8..d493d2920 100644 --- a/src/resources/extensions/sf/model-learner.js +++ b/src/resources/extensions/sf/model-learner.js @@ -19,6 +19,8 @@ import { } from "node:fs"; import { dirname, join } from "node:path"; +const MODEL_FAILURE_LOG_SCHEMA_VERSION = 1; + /** * Per-task-type model performance tracker. * @@ -282,6 +284,7 @@ class FailureAnalyzer { } = failure; const entry = { + schemaVersion: MODEL_FAILURE_LOG_SCHEMA_VERSION, timestamp, taskType, modelId, @@ -323,7 +326,9 @@ class FailureAnalyzer { const lines = content.trim() ? content.trim().split("\n") : []; const entries = [ ...this.entries, - ...lines.map((line) => JSON.parse(line)), + ...lines + .map((line) => normalizeModelFailureEntry(JSON.parse(line))) + .filter(Boolean), ]; return this._summarizeEntries(taskType, modelId, entries); } catch { @@ -367,6 +372,16 @@ class FailureAnalyzer { } } +function normalizeModelFailureEntry(entry) { + if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null; + const schemaVersion = entry.schemaVersion ?? MODEL_FAILURE_LOG_SCHEMA_VERSION; + if (schemaVersion !== MODEL_FAILURE_LOG_SCHEMA_VERSION) return null; + return { + ...entry, + schemaVersion, + }; +} + /** * Main API: Integrate model learning into dispatch workflow. * diff --git a/src/resources/extensions/sf/preferences-types.js b/src/resources/extensions/sf/preferences-types.js index aa565be67..ba84e4dd1 100644 --- a/src/resources/extensions/sf/preferences-types.js +++ b/src/resources/extensions/sf/preferences-types.js @@ -26,7 +26,12 @@ export const MODE_DEFAULTS = { context_compact_at: 20000, context_hard_limit: 35000, unit_timeout: 300, - max_agents_by_phase: { research: 1, planning: 1, execution: 1, validation: 1 }, + max_agents_by_phase: { + research: 1, + planning: 1, + execution: 1, + validation: 1, + }, turn_input_required: false, worktree_mode: "auto", tool_abort_grace: 5000, @@ -45,7 +50,12 @@ export const MODE_DEFAULTS = { context_compact_at: 25000, context_hard_limit: 40000, unit_timeout: 300, - max_agents_by_phase: { research: 2, planning: 1, execution: 4, validation: 2 }, + max_agents_by_phase: { + research: 2, + planning: 1, + execution: 4, + validation: 2, + }, turn_input_required: true, worktree_mode: "manual", tool_abort_grace: 8000, diff --git a/src/resources/extensions/sf/preferences-validation.js b/src/resources/extensions/sf/preferences-validation.js index 45155d8d8..5141ed349 100644 --- a/src/resources/extensions/sf/preferences-validation.js +++ b/src/resources/extensions/sf/preferences-validation.js @@ -1929,39 +1929,57 @@ export function validatePreferences(preferences) { } } if (preferences.unit_timeout_by_phase !== undefined) { - if (typeof preferences.unit_timeout_by_phase === "object" && preferences.unit_timeout_by_phase !== null) { + if ( + typeof preferences.unit_timeout_by_phase === "object" && + preferences.unit_timeout_by_phase !== null + ) { const validatedPhaseTimeouts = {}; - for (const [phase, timeout] of Object.entries(preferences.unit_timeout_by_phase)) { + for (const [phase, timeout] of Object.entries( + preferences.unit_timeout_by_phase, + )) { const seconds = Number(timeout); if (Number.isFinite(seconds) && seconds > 0) { validatedPhaseTimeouts[phase] = Math.floor(seconds); } else { - errors.push(`unit_timeout_by_phase.${phase} must be a positive number (seconds)`); + errors.push( + `unit_timeout_by_phase.${phase} must be a positive number (seconds)`, + ); } } if (Object.keys(validatedPhaseTimeouts).length > 0) { validated.unit_timeout_by_phase = validatedPhaseTimeouts; } } else { - errors.push("unit_timeout_by_phase must be an object mapping phases to seconds"); + errors.push( + "unit_timeout_by_phase must be an object mapping phases to seconds", + ); } } if (preferences.max_agents_by_phase !== undefined) { - if (typeof preferences.max_agents_by_phase === "object" && preferences.max_agents_by_phase !== null) { + if ( + typeof preferences.max_agents_by_phase === "object" && + preferences.max_agents_by_phase !== null + ) { const validatedAgents = {}; - for (const [phase, count] of Object.entries(preferences.max_agents_by_phase)) { + for (const [phase, count] of Object.entries( + preferences.max_agents_by_phase, + )) { const agents = Number(count); if (Number.isFinite(agents) && agents >= 1) { validatedAgents[phase] = Math.floor(agents); } else { - errors.push(`max_agents_by_phase.${phase} must be a positive integer`); + errors.push( + `max_agents_by_phase.${phase} must be a positive integer`, + ); } } if (Object.keys(validatedAgents).length > 0) { validated.max_agents_by_phase = validatedAgents; } } else { - errors.push("max_agents_by_phase must be an object mapping phases to agent counts"); + errors.push( + "max_agents_by_phase must be an object mapping phases to agent counts", + ); } } if (preferences.turn_input_required !== undefined) { @@ -1983,7 +2001,9 @@ export function validatePreferences(preferences) { if (Number.isFinite(ms) && ms >= 0) { validated.tool_abort_grace = Math.floor(ms); } else { - errors.push("tool_abort_grace must be a non-negative number (milliseconds)"); + errors.push( + "tool_abort_grace must be a non-negative number (milliseconds)", + ); } } if (preferences.max_turns_per_attempt !== undefined) { diff --git a/src/resources/extensions/sf/preferences.js b/src/resources/extensions/sf/preferences.js index 24c6d14e6..d649360f3 100644 --- a/src/resources/extensions/sf/preferences.js +++ b/src/resources/extensions/sf/preferences.js @@ -864,7 +864,8 @@ export function getMaxAgentsForPhase(phase) { */ export function isTurnInputRequired() { const prefs = loadEffectiveSFPreferences()?.preferences; - if (prefs?.turn_input_required !== undefined) return prefs.turn_input_required; + if (prefs?.turn_input_required !== undefined) + return prefs.turn_input_required; const mode = prefs?.mode ?? "solo"; return mode === "team"; } @@ -900,7 +901,8 @@ export function getToolAbortGrace() { */ export function getMaxTurnsPerAttempt() { const prefs = loadEffectiveSFPreferences()?.preferences; - if (prefs?.max_turns_per_attempt !== undefined) return prefs.max_turns_per_attempt; + if (prefs?.max_turns_per_attempt !== undefined) + return prefs.max_turns_per_attempt; const mode = prefs?.mode ?? "solo"; return mode === "team" ? 60 : 50; } diff --git a/src/resources/extensions/sf/schedule/schedule-store.js b/src/resources/extensions/sf/schedule/schedule-store.js index 304a5d421..52abb0680 100644 --- a/src/resources/extensions/sf/schedule/schedule-store.js +++ b/src/resources/extensions/sf/schedule/schedule-store.js @@ -22,6 +22,7 @@ import { sfRuntimeRoot } from "../paths.js"; // ─── Constants ────────────────────────────────────────────────────────────── const FILENAME = "schedule.jsonl"; +const SCHEDULE_SCHEMA_VERSION = 1; /** @type {string} */ const _sfHome = process.env.SF_HOME || join(homedir(), ".sf"); @@ -99,7 +100,14 @@ function _appendEntry(basePath, scope, entry) { } withFileLockSync(filePath, () => { - appendFileSync(filePath, JSON.stringify(entry) + "\n", "utf-8"); + appendFileSync( + filePath, + JSON.stringify({ + schemaVersion: SCHEDULE_SCHEMA_VERSION, + ...entry, + }) + "\n", + "utf-8", + ); }); } @@ -132,7 +140,7 @@ function _readEntries(basePath, scope) { if (!line.trim()) continue; try { /** @type {import("./schedule-types.js").ScheduleEntry} */ - const entry = JSON.parse(line); + const entry = normalizeScheduleEntry(JSON.parse(line)); if (!entry || typeof entry.id !== "string") continue; const existing = byId.get(entry.id); @@ -153,6 +161,16 @@ function _readEntries(basePath, scope) { return Array.from(byId.values()); } +function normalizeScheduleEntry(entry) { + if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null; + const schemaVersion = entry.schemaVersion ?? SCHEDULE_SCHEMA_VERSION; + if (schemaVersion !== SCHEDULE_SCHEMA_VERSION) return null; + return { + ...entry, + schemaVersion, + }; +} + /** * Return pending entries whose due_at is at or before `now`, sorted by due_at ASC. * diff --git a/src/resources/extensions/sf/schedule/schedule-types.js b/src/resources/extensions/sf/schedule/schedule-types.js index 43c95ba56..39dbda6e4 100644 --- a/src/resources/extensions/sf/schedule/schedule-types.js +++ b/src/resources/extensions/sf/schedule/schedule-types.js @@ -94,6 +94,7 @@ /** * @typedef {object} ScheduleEntry + * @property {number} [schemaVersion] Schema contract version for persisted JSONL rows * @property {string} id ULID — monotonic, sortable, 28 chars * @property {ScheduleKind} kind What kind of scheduled item this is * @property {ScheduleStatus} status Current lifecycle status diff --git a/src/resources/extensions/sf/sf-db.js b/src/resources/extensions/sf/sf-db.js index c48613dc4..a57a669c7 100644 --- a/src/resources/extensions/sf/sf-db.js +++ b/src/resources/extensions/sf/sf-db.js @@ -78,7 +78,7 @@ function openRawDb(path) { loadProvider(); return new DatabaseSync(path); } -const SCHEMA_VERSION = 30; +const SCHEMA_VERSION = 31; function indexExists(db, name) { return !!db .prepare( @@ -210,6 +210,37 @@ function ensureHeadlessRunTables(db) { "CREATE INDEX IF NOT EXISTS idx_headless_runs_status ON headless_runs(status, created_at DESC)", ); } +function ensureUokMessageTables(db) { + db.exec(` + CREATE TABLE IF NOT EXISTS uok_messages ( + id TEXT PRIMARY KEY, + from_agent TEXT NOT NULL, + to_agent TEXT NOT NULL, + body TEXT NOT NULL DEFAULT '', + metadata_json TEXT NOT NULL DEFAULT '{}', + sent_at TEXT NOT NULL DEFAULT '', + delivered_at TEXT DEFAULT NULL + ) + `); + db.exec(` + CREATE TABLE IF NOT EXISTS uok_message_reads ( + message_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + read_at TEXT NOT NULL DEFAULT '', + PRIMARY KEY (message_id, agent_id), + FOREIGN KEY (message_id) REFERENCES uok_messages(id) ON DELETE CASCADE + ) + `); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_messages_to ON uok_messages(to_agent, sent_at DESC)", + ); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_messages_conversation ON uok_messages(from_agent, to_agent, sent_at DESC)", + ); + db.exec( + "CREATE INDEX IF NOT EXISTS idx_uok_messages_sent ON uok_messages(sent_at DESC)", + ); +} function ensureSelfFeedbackTables(db) { db.exec(` CREATE TABLE IF NOT EXISTS self_feedback ( @@ -703,6 +734,7 @@ function initSchema(db, fileBacked) { ensureRepoProfileTables(db); ensureSolverEvalTables(db); ensureHeadlessRunTables(db); + ensureUokMessageTables(db); db.exec( `CREATE VIEW IF NOT EXISTS active_decisions AS SELECT * FROM decisions WHERE superseded_by IS NULL`, ); @@ -1655,6 +1687,15 @@ function migrateSchema(db) { ":applied_at": new Date().toISOString(), }); } + if (currentVersion < 31) { + ensureUokMessageTables(db); + db.prepare( + "INSERT INTO schema_version (version, applied_at) VALUES (:version, :applied_at)", + ).run({ + ":version": 31, + ":applied_at": new Date().toISOString(), + }); + } db.exec("COMMIT"); } catch (err) { db.exec("ROLLBACK"); @@ -4433,6 +4474,150 @@ export function getDistinctGateIds() { return []; } } +export function insertUokMessage(msg) { + if (!currentDb) return; + currentDb + .prepare( + `INSERT OR IGNORE INTO uok_messages (id, from_agent, to_agent, body, metadata_json, sent_at, delivered_at) + VALUES (:id, :from_agent, :to_agent, :body, :metadata_json, :sent_at, :delivered_at)`, + ) + .run({ + ":id": msg.id, + ":from_agent": msg.from, + ":to_agent": msg.to, + ":body": msg.body ?? "", + ":metadata_json": JSON.stringify(msg.metadata ?? {}), + ":sent_at": msg.sentAt, + ":delivered_at": msg.deliveredAt ?? null, + }); +} +export function getUokMessagesForAgent( + agentId, + limit = 1000, + unreadOnly = false, +) { + if (!currentDb) return []; + try { + let sql = `SELECT m.id, m.from_agent AS "from", m.to_agent AS "to", m.body, m.metadata_json AS metadataJson, m.sent_at AS sentAt, m.delivered_at AS deliveredAt, + CASE WHEN r.agent_id IS NOT NULL THEN 1 ELSE 0 END AS read + FROM uok_messages m + LEFT JOIN uok_message_reads r ON r.message_id = m.id AND r.agent_id = :agent_id + WHERE m.to_agent = :agent_id`; + if (unreadOnly) { + sql += " AND r.agent_id IS NULL"; + } + sql += " ORDER BY m.sent_at ASC LIMIT :limit"; + const rows = currentDb.prepare(sql).all({ + ":agent_id": agentId, + ":limit": Math.max(1, Math.min(10_000, Number(limit) || 1000)), + }); + return rows.map((r) => ({ + id: r.id, + from: r.from, + to: r.to, + body: r.body, + metadata: parseJsonObject(r.metadataJson, {}), + sentAt: r.sentAt, + deliveredAt: r.deliveredAt, + read: !!r.read, + })); + } catch { + return []; + } +} +export function getUokConversation(agentA, agentB, limit = 1000) { + if (!currentDb) return []; + try { + const rows = currentDb + .prepare( + `SELECT id, from_agent AS "from", to_agent AS "to", body, metadata_json AS metadataJson, sent_at AS sentAt, delivered_at AS deliveredAt + FROM uok_messages + WHERE (from_agent = :a AND to_agent = :b) OR (from_agent = :b AND to_agent = :a) + ORDER BY sent_at DESC + LIMIT :limit`, + ) + .all({ ":a": agentA, ":b": agentB, ":limit": limit }); + return rows.map((r) => ({ + id: r.id, + from: r.from, + to: r.to, + body: r.body, + metadata: parseJsonObject(r.metadataJson, {}), + sentAt: r.sentAt, + deliveredAt: r.deliveredAt, + })); + } catch { + return []; + } +} +export function markUokMessageRead(messageId, agentId) { + if (!currentDb) return false; + try { + currentDb + .prepare( + `INSERT OR IGNORE INTO uok_message_reads (message_id, agent_id, read_at) VALUES (:message_id, :agent_id, :read_at)`, + ) + .run({ + ":message_id": messageId, + ":agent_id": agentId, + ":read_at": new Date().toISOString(), + }); + return true; + } catch { + return false; + } +} +export function getUokMessageUnreadCount(agentId) { + if (!currentDb) return 0; + try { + const row = currentDb + .prepare( + `SELECT COUNT(*) AS cnt FROM uok_messages m + WHERE m.to_agent = :agent_id + AND NOT EXISTS ( + SELECT 1 FROM uok_message_reads r + WHERE r.message_id = m.id AND r.agent_id = :agent_id + )`, + ) + .get({ ":agent_id": agentId }); + return row?.cnt ?? 0; + } catch { + return 0; + } +} +export function compactUokMessages(retentionDays) { + if (!currentDb) return { before: 0, after: 0 }; + try { + const cutoff = new Date( + Date.now() - retentionDays * 24 * 60 * 60 * 1000, + ).toISOString(); + const beforeRow = currentDb + .prepare("SELECT COUNT(*) AS cnt FROM uok_messages") + .get(); + currentDb + .prepare("DELETE FROM uok_messages WHERE sent_at < :cutoff") + .run({ ":cutoff": cutoff }); + const afterRow = currentDb + .prepare("SELECT COUNT(*) AS cnt FROM uok_messages") + .get(); + return { before: beforeRow?.cnt ?? 0, after: afterRow?.cnt ?? 0 }; + } catch { + return { before: 0, after: 0 }; + } +} +export function getUokMessageReadIds(agentId) { + if (!currentDb) return []; + try { + const rows = currentDb + .prepare( + "SELECT message_id FROM uok_message_reads WHERE agent_id = :agent_id", + ) + .all({ ":agent_id": agentId }); + return rows.map((r) => r.message_id); + } catch { + return []; + } +} function asStringOrNull(value) { return typeof value === "string" && value.length > 0 ? value : null; } diff --git a/src/resources/extensions/sf/sm-client.js b/src/resources/extensions/sf/sm-client.js index 38edd2093..bf8bb7f77 100644 --- a/src/resources/extensions/sf/sm-client.js +++ b/src/resources/extensions/sf/sm-client.js @@ -23,7 +23,11 @@ export async function initializeSmClient() { // Check if explicitly disabled if (process.env.SM_ENABLED === "false") { - return { connected: false, version: null, reason: "disabled via SM_ENABLED=false" }; + return { + connected: false, + version: null, + reason: "disabled via SM_ENABLED=false", + }; } const addr = process.env.SINGULARITY_MEMORY_ADDR || "http://localhost:8080"; @@ -80,7 +84,10 @@ export async function syncMemoryToSm(memory, opts = {}) { return { queued: false, reason: "SM not connected" }; } - const addr = opts.smAddr || process.env.SINGULARITY_MEMORY_ADDR || "http://localhost:8080"; + const addr = + opts.smAddr || + process.env.SINGULARITY_MEMORY_ADDR || + "http://localhost:8080"; // Queue sync in background (fire-and-forget) setImmediate(async () => { @@ -122,7 +129,10 @@ export async function querySmMemories(query, opts = {}) { return []; } - const addr = opts.smAddr || process.env.SINGULARITY_MEMORY_ADDR || "http://localhost:8080"; + const addr = + opts.smAddr || + process.env.SINGULARITY_MEMORY_ADDR || + "http://localhost:8080"; const limit = opts.limit || 5; // Cross-project recall limit (smaller than local) try { diff --git a/src/resources/extensions/sf/sync-scheduler.js b/src/resources/extensions/sf/sync-scheduler.js index d91deec0a..d614c6c99 100644 --- a/src/resources/extensions/sf/sync-scheduler.js +++ b/src/resources/extensions/sf/sync-scheduler.js @@ -14,7 +14,7 @@ * - Session-end flush: before unit completes, pending syncs are flushed (best-effort) */ -import { syncMemoryToSm, querySmMemories } from "./sm-client.js"; +import { syncMemoryToSm } from "./sm-client.js"; /** * Global sync scheduler state. @@ -189,7 +189,7 @@ async function trySyncWithRetry(item, attempt = 0) { } catch (err) { if (attempt < MAX_RETRIES) { // Exponential backoff: 1s, 2s, 4s - const delayMs = BACKOFF_BASE_MS * Math.pow(2, attempt); + const delayMs = BACKOFF_BASE_MS * 2 ** attempt; await new Promise((resolve) => setTimeout(resolve, delayMs)); diff --git a/src/resources/extensions/sf/tests/extension-models-gap5.test.mjs b/src/resources/extensions/sf/tests/extension-models-gap5.test.mjs index 563e43c5d..af901326d 100644 --- a/src/resources/extensions/sf/tests/extension-models-gap5.test.mjs +++ b/src/resources/extensions/sf/tests/extension-models-gap5.test.mjs @@ -8,7 +8,7 @@ * auto-mode dispatch must be able to select them. */ -import { describe, expect, it, beforeEach, vi } from "vitest"; +import { describe, expect, it } from "vitest"; // Mock extension models const MOCK_EXTENSION_MODELS = [ @@ -58,7 +58,7 @@ describe("Extension-Provided Models (gap-5)", () => { it("extension_models_are_discoverable", () => { const discovered = MOCK_PROVIDER.models.filter( - (m) => m.id === "claude-sonnet-4-6" + (m) => m.id === "claude-sonnet-4-6", ); expect(discovered).toHaveLength(1); }); @@ -114,7 +114,7 @@ describe("Extension-Provided Models (gap-5)", () => { it("can_filter_extension_models_by_capability", () => { const needsReasoning = true; const candidates = MOCK_PROVIDER.models.filter( - (m) => m.reasoning === needsReasoning + (m) => m.reasoning === needsReasoning, ); expect(candidates.length).toBeGreaterThan(0); }); @@ -122,10 +122,10 @@ describe("Extension-Provided Models (gap-5)", () => { it("can_filter_extension_models_by_context_window", () => { const minContextWindow = 500_000; const suitable = MOCK_PROVIDER.models.filter( - (m) => m.contextWindow >= minContextWindow + (m) => m.contextWindow >= minContextWindow, ); expect(suitable).toContain( - MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6") + MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6"), ); }); }); @@ -145,7 +145,7 @@ describe("Extension-Provided Models (gap-5)", () => { const preferredModels = ["claude-sonnet-4-6", "claude-haiku-4-5"]; const fallback = (unavailable) => { return MOCK_PROVIDER.models.find( - (m) => preferredModels.includes(m.id) && !unavailable.includes(m.id) + (m) => preferredModels.includes(m.id) && !unavailable.includes(m.id), ); }; @@ -170,13 +170,20 @@ describe("Extension-Provided Models (gap-5)", () => { const provider2 = { name: "ollama", models: [ - { id: "llama-2", name: "Llama 2", contextWindow: 4096, maxTokens: 2048 }, + { + id: "llama-2", + name: "Llama 2", + contextWindow: 4096, + maxTokens: 2048, + }, ], }; const allModels = [...provider1.models, ...provider2.models]; expect(allModels).toHaveLength(3); - expect(allModels.filter((m) => m.id === "claude-sonnet-4-6")).toHaveLength(1); + expect( + allModels.filter((m) => m.id === "claude-sonnet-4-6"), + ).toHaveLength(1); expect(allModels.filter((m) => m.id === "llama-2")).toHaveLength(1); }); }); @@ -221,35 +228,31 @@ describe("Extension-Provided Models (gap-5)", () => { ]; const allowed = candidates.filter( - (m) => - !preferences.blocked.includes(`${MOCK_PROVIDER.name}/${m.id}`) + (m) => !preferences.blocked.includes(`${MOCK_PROVIDER.name}/${m.id}`), ); expect(allowed).toContain( - MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6") + MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6"), ); }); it("extension_models_can_be_blocked", () => { const blocklist = ["claude-code/claude-haiku-4-5"]; const filtered = MOCK_PROVIDER.models.filter( - (m) => - !blocklist.includes(`${MOCK_PROVIDER.name}/${m.id}`) + (m) => !blocklist.includes(`${MOCK_PROVIDER.name}/${m.id}`), ); expect(filtered.find((m) => m.id === "claude-sonnet-4-6")).toBeDefined(); - expect( - filtered.find((m) => m.id === "claude-haiku-4-5") - ).toBeUndefined(); + expect(filtered.find((m) => m.id === "claude-haiku-4-5")).toBeUndefined(); }); it("can_route_to_extension_model_by_capability", () => { - const needsImage = true; + const _needsImage = true; const suitable = MOCK_PROVIDER.models.filter((m) => - m.input.includes("image") + m.input.includes("image"), ); expect(suitable).toContain( - MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6") + MOCK_PROVIDER.models.find((m) => m.id === "claude-sonnet-4-6"), ); }); }); @@ -263,9 +266,7 @@ describe("Extension-Provided Models (gap-5)", () => { it("gracefully_handles_extension_unavailable", () => { const provider = { ...MOCK_PROVIDER, isReady: () => false }; - const fallback = provider.isReady() - ? provider.models - : []; + const fallback = provider.isReady() ? provider.models : []; expect(fallback).toHaveLength(0); }); @@ -274,10 +275,10 @@ describe("Extension-Provided Models (gap-5)", () => { const validate = (model) => { return Boolean( model.id && - model.name && - model.contextWindow && - model.maxTokens && - model.cost + model.name && + model.contextWindow && + model.maxTokens && + model.cost, ); }; @@ -294,18 +295,18 @@ describe("Extension-Provided Models (gap-5)", () => { ...MOCK_PROVIDER.models.map((m) => ({ ...m, provider: "claude-code", - score: 0.80, + score: 0.8, })), ]; expect( - candidates.find((m) => m.provider === "claude-code") + candidates.find((m) => m.provider === "claude-code"), ).toBeDefined(); expect(candidates).toHaveLength(3); }); it("extension_model_scoring_works", () => { - const model = MOCK_PROVIDER.models[0]; + const _model = MOCK_PROVIDER.models[0]; const score = 0.85; // hypothetical dispatch score expect(score).toBeGreaterThan(0); diff --git a/src/resources/extensions/sf/tests/jsonl-schema-versioning.test.mjs b/src/resources/extensions/sf/tests/jsonl-schema-versioning.test.mjs new file mode 100644 index 000000000..0fe01c5c5 --- /dev/null +++ b/src/resources/extensions/sf/tests/jsonl-schema-versioning.test.mjs @@ -0,0 +1,361 @@ +/** + * jsonl-schema-versioning.test.mjs - SF-owned JSONL contract markers. + * + * Purpose: prove append-only SF runtime logs carry explicit schema versions + * while legacy missing-version rows remain readable as version 1. + */ +import assert from "node:assert/strict"; +import { + mkdirSync, + mkdtempSync, + readdirSync, + readFileSync, + rmSync, + writeFileSync, +} from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, test } from "vitest"; +import { + appendAutonomousSolverCheckpoint, + appendAutonomousSolverSteering, + beginAutonomousSolverIteration, + consumePendingAutonomousSolverSteering, +} from "../autonomous-solver.js"; +import { triageTodoDump } from "../commands-todo.js"; +import { emitJournalEvent, queryJournal } from "../journal.js"; +import { appendJudgment, readJudgmentLog } from "../judgment-log.js"; +import { ModelLearner } from "../model-learner.js"; +import { createScheduleStore } from "../schedule/schedule-store.js"; +import { buildAuditEnvelope, emitUokAuditEvent } from "../uok/audit.js"; +import { + parseParityEvents, + writeParityHeartbeat, +} from "../uok/parity-report.js"; +import { appendEvent, readEvents } from "../workflow-events.js"; +import { + _resetLogs, + logError, + readAuditLog, + setLogBasePath, +} from "../workflow-logger.js"; + +const tmpDirs = []; + +afterEach(() => { + setLogBasePath(null); + _resetLogs(); + while (tmpDirs.length > 0) { + const dir = tmpDirs.pop(); + if (dir) rmSync(dir, { recursive: true, force: true }); + } +}); + +function makeProject() { + const dir = mkdtempSync(join(tmpdir(), "sf-jsonl-versioning-")); + tmpDirs.push(dir); + mkdirSync(join(dir, ".sf"), { recursive: true }); + return dir; +} + +function readJsonl(path) { + return readFileSync(path, "utf-8") + .trim() + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line)); +} + +function makeScheduleEntry(overrides = {}) { + return { + id: "01HX0000000000000000000000", + kind: "reminder", + status: "pending", + due_at: "2026-05-07T00:00:00.000Z", + created_at: "2026-05-06T00:00:00.000Z", + payload: { message: "check" }, + created_by: "user", + ...overrides, + }; +} + +describe("SF JSONL schema versioning", () => { + test("schedule_store_writes_schema_version_and_reads_legacy_rows", () => { + const project = makeProject(); + const store = createScheduleStore(project); + store.appendEntry("project", makeScheduleEntry()); + + const path = store._filePathForScope("project"); + assert.equal(readJsonl(path)[0].schemaVersion, 1); + + writeFileSync( + path, + `${JSON.stringify(makeScheduleEntry({ id: "legacy" }))}\n`, + "utf-8", + ); + + const [entry] = store.readEntries("project"); + assert.equal(entry.id, "legacy"); + assert.equal(entry.schemaVersion, 1); + }); + + test("journal_writes_schema_version_and_reads_legacy_rows", () => { + const project = makeProject(); + emitJournalEvent(project, { + ts: "2026-05-07T00:00:00.000Z", + flowId: "flow-1", + seq: 1, + eventType: "iteration-start", + data: { unitId: "M001" }, + }); + + const path = join(project, ".sf", "journal", "2026-05-07.jsonl"); + assert.equal(readJsonl(path)[0].schemaVersion, 1); + + writeFileSync( + path, + `${JSON.stringify({ + ts: "2026-05-07T00:00:01.000Z", + flowId: "legacy-flow", + seq: 1, + eventType: "dispatch-match", + data: { unitId: "M002" }, + })}\n`, + "utf-8", + ); + + const [entry] = queryJournal(project); + assert.equal(entry.schemaVersion, 1); + assert.equal(entry.flowId, "legacy-flow"); + }); + + test("workflow_event_log_writes_schema_version_and_reads_legacy_rows", () => { + const project = makeProject(); + appendEvent(project, { cmd: "plan", params: { milestoneId: "M001" } }); + + const path = join(project, ".sf", "event-log.jsonl"); + assert.equal(readJsonl(path)[0].schemaVersion, 1); + + writeFileSync( + path, + `${JSON.stringify({ + v: 2, + cmd: "legacy", + params: { milestoneId: "M002" }, + hash: "abc", + })}\n`, + "utf-8", + ); + + const [event] = readEvents(path); + assert.equal(event.schemaVersion, 1); + assert.equal(event.cmd, "legacy"); + }); + + test("workflow_audit_log_writes_schema_version_and_reads_legacy_rows", () => { + const project = makeProject(); + setLogBasePath(project); + logError("versioning-test", "audit failure", { id: "audit-1" }); + + const path = join(project, ".sf", "audit-log.jsonl"); + assert.equal(readJsonl(path)[0].schemaVersion, 1); + + writeFileSync( + path, + `${JSON.stringify({ + ts: "2026-05-07T00:00:00.000Z", + severity: "error", + component: "legacy", + message: "legacy error", + })}\n`, + "utf-8", + ); + + const [entry] = readAuditLog(project); + assert.equal(entry.schemaVersion, 1); + assert.equal(entry.component, "legacy"); + }); + + test("uok_audit_and_parity_logs_write_schema_version_and_read_legacy_parity", () => { + const project = makeProject(); + emitUokAuditEvent( + project, + buildAuditEnvelope({ + traceId: "trace-1", + category: "orchestration", + type: "test-event", + }), + ); + assert.equal( + readJsonl(join(project, ".sf", "audit", "events.jsonl"))[0].schemaVersion, + 1, + ); + + writeParityHeartbeat(project, { + ts: "2026-05-07T00:00:00.000Z", + runId: "run-1", + path: "uok-kernel", + phase: "enter", + }); + assert.equal( + readJsonl(join(project, ".sf", "runtime", "uok-parity.jsonl"))[0] + .schemaVersion, + 1, + ); + + const [legacy] = parseParityEvents( + `${JSON.stringify({ + ts: "2026-05-07T00:00:01.000Z", + runId: "legacy-run", + path: "uok-kernel", + phase: "exit", + status: "ok", + })}\n`, + ); + assert.equal(legacy.schemaVersion, 1); + assert.equal(legacy.runId, "legacy-run"); + }); + + test("judgment_log_writes_schema_version_and_reads_legacy_rows", () => { + const project = makeProject(); + appendJudgment(project, { + unitId: "M001/S01/T01", + confidence: "high", + decision: "keep file-backed projection", + }); + + const path = join(project, ".sf", "judgment-log.jsonl"); + assert.equal(readJsonl(path)[0].schemaVersion, 1); + + writeFileSync( + path, + `${JSON.stringify({ + ts: "2026-05-07T00:00:00.000Z", + unitId: "M001/S01/T02", + confidence: "low", + decision: "legacy row", + })}\n`, + "utf-8", + ); + + const [entry] = readJudgmentLog(project, "M001"); + assert.equal(entry.schemaVersion, 1); + assert.equal(entry.decision, "legacy row"); + }); + + test("autonomous_solver_history_and_steering_write_schema_versions", () => { + const project = makeProject(); + beginAutonomousSolverIteration(project, "execute-task", "M001/S01/T01"); + appendAutonomousSolverCheckpoint(project, { + unitType: "execute-task", + unitId: "M001/S01/T01", + outcome: "continue", + summary: "versioned checkpoint", + completedItems: ["one"], + remainingItems: ["two"], + verificationEvidence: ["test"], + pdd: { + purpose: "prove schema marker", + consumer: "solver loop", + contract: "checkpoint row is versioned", + failureBoundary: "unversioned history drifts", + evidence: "jsonl row", + nonGoals: "no solver behavior change", + invariants: "legacy state remains readable", + assumptions: "local fs works", + }, + }); + + assert.equal( + readJsonl( + join( + project, + ".sf", + "runtime", + "autonomous-solver", + "iterations.jsonl", + ), + )[0].schemaVersion, + 1, + ); + + const steering = appendAutonomousSolverSteering(project, "prefer DB state"); + assert.equal(steering.schemaVersion, 1); + const [consumed] = consumePendingAutonomousSolverSteering(project); + assert.equal(consumed.schemaVersion, 1); + }); + + test("model_failure_log_writes_schema_version_and_reads_legacy_rows", () => { + const project = makeProject(); + const learner = new ModelLearner(project); + learner.logFailure("execute-task", "test-model", { + reason: "timeout", + timeout: true, + }); + + const path = join(project, ".sf", "model-failure-log.jsonl"); + assert.equal(readJsonl(path)[0].schemaVersion, 1); + + writeFileSync( + path, + `${JSON.stringify({ + timestamp: "2026-05-07T00:00:00.000Z", + taskType: "execute-task", + modelId: "legacy-model", + reason: "quality", + })}\n`, + "utf-8", + ); + + const summary = learner.getFailureSummary("execute-task", "legacy-model"); + assert.equal(summary.reasons.quality, 1); + }); + + test("todo_triage_jsonl_outputs_write_schema_versions", async () => { + const project = makeProject(); + writeFileSync(join(project, "TODO.md"), "# TODO\n\nmake evals real\n"); + const response = JSON.stringify({ + summary: "triaged", + eval_candidates: [ + { + id: "eval.one", + task_input: "run one", + expected_behavior: "passes", + evidence: "TODO.md", + }, + { + id: "eval.two", + task_input: "run one", + expected_behavior: "passes again", + evidence: "TODO.md", + }, + ], + implementation_tasks: ["add the task"], + memory_requirements: ["remember the rule"], + harness_suggestions: ["add harness"], + docs_or_tests: ["document it"], + unclear_notes: [], + }); + + const result = await triageTodoDump(project, async () => response, { + clear: false, + backlog: true, + date: new Date("2026-05-07T01:02:03.000Z"), + }); + const backlogDir = join(project, ".sf", "triage", "backlog"); + const [backlogFile] = readdirSync(backlogDir).filter((file) => + file.endsWith(".jsonl"), + ); + + for (const path of [ + result.evalJsonlPath, + result.normalizedJsonlPath, + result.skillJsonlPath, + join(backlogDir, backlogFile), + ]) { + const rows = readJsonl(path); + assert.ok(rows.length > 0, `${path} should contain rows`); + for (const row of rows) assert.equal(row.schemaVersion, 1); + } + }); +}); diff --git a/src/resources/extensions/sf/tests/lifecycle-hooks.test.ts b/src/resources/extensions/sf/tests/lifecycle-hooks.test.ts index c7f8ca42d..afd30a3c0 100644 --- a/src/resources/extensions/sf/tests/lifecycle-hooks.test.ts +++ b/src/resources/extensions/sf/tests/lifecycle-hooks.test.ts @@ -4,17 +4,17 @@ * Validates that lifecycle hooks properly flush memory syncs at unit/session completion. */ -import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { - flushProjectMemorySync, flushAllProjectsMemorySync, - onUnitTerminal, + flushProjectMemorySync, onSessionEnd, + onUnitTerminal, } from "../lifecycle-hooks.js"; import { + getSyncStatus, queueMemorySync, resetAllSchedulers, - getSyncStatus, } from "../sync-scheduler.js"; describe("Lifecycle Hooks (Tier 1.2 Phase 3b)", () => { diff --git a/src/resources/extensions/sf/tests/sm-client.test.ts b/src/resources/extensions/sf/tests/sm-client.test.ts index 5526ecd18..b8087fce6 100644 --- a/src/resources/extensions/sf/tests/sm-client.test.ts +++ b/src/resources/extensions/sf/tests/sm-client.test.ts @@ -9,7 +9,7 @@ * - Environment variable controls */ -import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; describe("SM Client", () => { let originalEnv: Record; diff --git a/src/resources/extensions/sf/tests/sync-scheduler.test.ts b/src/resources/extensions/sf/tests/sync-scheduler.test.ts index c6f817932..038f85754 100644 --- a/src/resources/extensions/sf/tests/sync-scheduler.test.ts +++ b/src/resources/extensions/sf/tests/sync-scheduler.test.ts @@ -9,14 +9,14 @@ * - Degrades gracefully when SM unavailable */ -import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { - queueMemorySync, + clearSyncQueue, flushSyncQueue, getSyncStatus, - clearSyncQueue, - resetScheduler, + queueMemorySync, resetAllSchedulers, + resetScheduler, } from "../sync-scheduler.js"; describe("Sync Scheduler (Tier 1.2 Phase 2)", () => { @@ -142,7 +142,7 @@ describe("Sync Scheduler (Tier 1.2 Phase 2)", () => { ]); // One flush should process items, other should see already_flushing - const totalProcessed = result1.synced + result2.synced; + const _totalProcessed = result1.synced + result2.synced; // First flush processes 10, second sees already_flushing expect(result1.synced + result1.failed).toBeGreaterThanOrEqual(0); diff --git a/src/resources/extensions/sf/tests/turn-status-parser.test.ts b/src/resources/extensions/sf/tests/turn-status-parser.test.ts index b7a274b9e..c58b1b84d 100644 --- a/src/resources/extensions/sf/tests/turn-status-parser.test.ts +++ b/src/resources/extensions/sf/tests/turn-status-parser.test.ts @@ -5,16 +5,49 @@ * from agent output. */ -import { describe, it, expect } from "vitest"; +import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; import { + checkTurnStatusPrompts, + describeTurnStatus, extractTurnStatus, isValidTurnStatus, - describeTurnStatus, - resolveSignalFromStatus, parseTurnStatusFull, - checkTurnStatusPrompts, + resolveSignalFromStatus, } from "../turn-status-parser.js"; +const tmpRoots: string[] = []; + +afterEach(() => { + for (const dir of tmpRoots.splice(0)) { + rmSync(dir, { recursive: true, force: true }); + } +}); + +function makePromptRoot() { + const root = mkdtempSync(join(tmpdir(), "sf-turn-status-")); + tmpRoots.push(root); + const prompts = join(root, "src/resources/extensions/sf/prompts"); + mkdirSync(prompts, { recursive: true }); + for (const name of [ + "execute-task.md", + "complete-slice.md", + "research-slice.md", + "plan-slice.md", + "research-milestone.md", + "plan-milestone.md", + ]) { + writeFileSync( + join(prompts, name), + "Finish with complete\n", + "utf-8", + ); + } + return root; +} + describe("Turn Status Parser (Tier 2.5)", () => { describe("extractTurnStatus", () => { it("when_complete_marker_present_extracts_status", () => { @@ -63,8 +96,7 @@ describe("Turn Status Parser (Tier 2.5)", () => { }); it("handles_whitespace_around_marker", () => { - const output = - "Done.\n\n\ncomplete \n\n"; + const output = "Done.\n\n\ncomplete \n\n"; const result = extractTurnStatus(output); @@ -97,14 +129,13 @@ describe("Turn Status Parser (Tier 2.5)", () => { expect(result.cleanOutput).toBe(123); }); - it("finds_marker_only_at_end", () => { + it("ignores_marker_not_at_end", () => { const output = "Found complete in middle\nmore text here"; const result = extractTurnStatus(output); - // Should still extract (regex doesn't require end-of-string) - expect(result.status).toBe("complete"); + expect(result.status).toBeNull(); }); }); @@ -193,8 +224,7 @@ describe("Turn Status Parser (Tier 2.5)", () => { }); it("extracts_and_resolves_blocked_marker", () => { - const output = - "Cannot find file.\n\nblocked"; + const output = "Cannot find file.\n\nblocked"; const result = parseTurnStatusFull(output); @@ -236,15 +266,15 @@ describe("Turn Status Parser (Tier 2.5)", () => { }); describe("checkTurnStatusPrompts", () => { - it("validates_marker_presence_in_prompts", () => { - // This test is informational; real validation requires file access - const result = checkTurnStatusPrompts( - "/home/mhugo/code/singularity-forge", - ); + it("validates_marker_presence_in_prompts_without_commonjs_require", () => { + const result = checkTurnStatusPrompts(makePromptRoot()); expect(result).toHaveProperty("issues"); expect(result).toHaveProperty("allGood"); expect(result).toHaveProperty("promptsChecked"); + expect(result.allGood).toBe(true); + expect(result.promptsChecked).toBe(6); + expect(result.issues).toEqual([]); }); }); @@ -282,13 +312,11 @@ describe("Turn Status Parser (Tier 2.5)", () => { expectedAction: "continue", }, { - output: - "Blocker found.\n\nblocked", + output: "Blocker found.\n\nblocked", expectedAction: "pause", }, { - output: - "Out of ideas.\n\ngiving_up", + output: "Out of ideas.\n\ngiving_up", expectedAction: "reassess", }, { diff --git a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs index a7eb58c93..1e97361fe 100644 --- a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs +++ b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs @@ -1,13 +1,15 @@ import assert from "node:assert/strict"; -import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, test } from "vitest"; +import { closeDatabase, insertUokMessage } from "../sf-db.js"; import { AgentInbox, MessageBus } from "../uok/message-bus.js"; const tmpRoots = []; afterEach(() => { + closeDatabase(); for (const dir of tmpRoots.splice(0)) { rmSync(dir, { recursive: true, force: true }); } @@ -56,6 +58,20 @@ test("messageBus_persists_across_reconstruction", () => { assert.equal(messages[0].body, "persistent"); }); +test("messageBus_send_when_inbox_already_hydrated_does_not_double_insert", () => { + const root = makeProject(); + const bus = new MessageBus(root); + const inbox = bus.getInbox("agent-b"); + + const id = bus.send("agent-a", "agent-b", "after-hydrate"); + + assert.equal(inbox.list().length, 1); + assert.equal(inbox.list()[0].id, id); + assert.equal(inbox.list()[0].body, "after-hydrate"); + const bus2 = new MessageBus(root); + assert.equal(bus2.getInbox("agent-b").list().length, 1); +}); + test("messageBus_markRead_updates_state_and_persists", () => { const root = makeProject(); const bus = new MessageBus(root); @@ -76,9 +92,7 @@ test("messageBus_markRead_updates_state_and_persists", () => { test("messageBus_compact_removes_old_messages", () => { const root = makeProject(); const bus = new MessageBus(root, { retentionDays: 1 }); - // Send an old message by manually appending to JSONL - const path = join(root, ".sf", "runtime", "uok-messages.jsonl"); - mkdirSync(join(root, ".sf", "runtime"), { recursive: true }); + // Insert an old message directly into the DB const oldMsg = { id: "msg-old", from: "agent-a", @@ -86,8 +100,9 @@ test("messageBus_compact_removes_old_messages", () => { body: "old", sentAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(), deliveredAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(), + metadata: {}, }; - writeFileSync(path, `${JSON.stringify(oldMsg)}\n`, "utf-8"); + insertUokMessage(oldMsg); const result = bus.compact(); assert.equal(result.after, 0); diff --git a/src/resources/extensions/sf/tests/vault-resolver.test.ts b/src/resources/extensions/sf/tests/vault-resolver.test.ts index 4c4cd79f6..1c012ba70 100644 --- a/src/resources/extensions/sf/tests/vault-resolver.test.ts +++ b/src/resources/extensions/sf/tests/vault-resolver.test.ts @@ -3,14 +3,12 @@ * * Tests URI parsing, auth chain, caching, and fallback behavior. */ -import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { - parseVaultUri, - resolveVaultToken, - resolveVaultSecret, - isVaultUri, clearVaultCache, getVaultCacheStats, + isVaultUri, + parseVaultUri, resolveSecret, } from "../vault-resolver.js"; @@ -28,9 +26,7 @@ describe("Vault Secret Resolver", () => { describe("parseVaultUri", () => { it("parses valid vault:// URIs", () => { - const result = parseVaultUri( - "vault://secret/anthropic/prod#api_key", - ); + const result = parseVaultUri("vault://secret/anthropic/prod#api_key"); expect(result).toHaveProperty("path", "secret/anthropic/prod"); expect(result).toHaveProperty("field", "api_key"); expect(result).toHaveProperty("vaultAddr"); @@ -39,7 +35,7 @@ describe("Vault Secret Resolver", () => { it("returns error for missing scheme", () => { const result = parseVaultUri("secret/anthropic/prod#api_key"); expect(result).toHaveProperty("error"); - expect(result.error).toContain('vault://'); + expect(result.error).toContain("vault://"); }); it("returns error for missing fragment", () => { @@ -92,9 +88,7 @@ describe("Vault Secret Resolver", () => { it("fails open with vault:// URIs when vault unavailable", async () => { // Mock fetch to reject (vault unavailable) - global.fetch = vi.fn().mockRejectedValue( - new Error("Connection refused"), - ); + global.fetch = vi.fn().mockRejectedValue(new Error("Connection refused")); const result = await resolveSecret("vault://secret/path#field", { failOpen: true, @@ -108,9 +102,7 @@ describe("Vault Secret Resolver", () => { it("throws in strict mode if vault unavailable", async () => { // Mock fetch to reject - global.fetch = vi.fn().mockRejectedValue( - new Error("Connection refused"), - ); + global.fetch = vi.fn().mockRejectedValue(new Error("Connection refused")); await expect( resolveSecret("vault://secret/path#field", { @@ -146,9 +138,7 @@ describe("Vault Secret Resolver", () => { }); it("handles paths with multiple segments", () => { - const result = parseVaultUri( - "vault://secret/deeply/nested/path#field", - ); + const result = parseVaultUri("vault://secret/deeply/nested/path#field"); expect(result.path).toBe("secret/deeply/nested/path"); expect(result.field).toBe("field"); }); diff --git a/src/resources/extensions/sf/turn-status-parser.js b/src/resources/extensions/sf/turn-status-parser.js index b34412865..acbd91185 100644 --- a/src/resources/extensions/sf/turn-status-parser.js +++ b/src/resources/extensions/sf/turn-status-parser.js @@ -25,8 +25,9 @@ export function extractTurnStatus(output) { return { status: null, cleanOutput: output }; } - // Look for marker at end of output (allow whitespace) - const markerRegex = /(complete|blocked|giving_up)<\/turn_status>/i; + // Look for marker at end of output (allow trailing whitespace only). + const markerRegex = + /(complete|blocked|giving_up)<\/turn_status>\s*$/i; const match = output.match(markerRegex); @@ -177,9 +178,7 @@ export function checkTurnStatusPrompts(sfRoot) { const content = readFileSync(promptPath, "utf8"); if (!content.includes("")) { - issues.push( - `Prompt ${prompt} missing turn_status marker template`, - ); + issues.push(`Prompt ${prompt} missing turn_status marker template`); } } diff --git a/src/resources/extensions/sf/uok/audit.js b/src/resources/extensions/sf/uok/audit.js index 36cb56d8d..ee4eeee37 100644 --- a/src/resources/extensions/sf/uok/audit.js +++ b/src/resources/extensions/sf/uok/audit.js @@ -12,6 +12,8 @@ import { withFileLockSync } from "../file-lock.js"; import { sfRuntimeRoot } from "../paths.js"; import { insertAuditEvent, isDbAvailable } from "../sf-db.js"; +const UOK_AUDIT_SCHEMA_VERSION = 1; + function auditLogPath(basePath) { return join(sfRuntimeRoot(basePath), "audit", "events.jsonl"); } @@ -20,6 +22,7 @@ function ensureAuditDir(basePath) { } export function buildAuditEnvelope(args) { return { + schemaVersion: UOK_AUDIT_SCHEMA_VERSION, eventId: randomUUID(), traceId: args.traceId, turnId: args.turnId, diff --git a/src/resources/extensions/sf/uok/message-bus.js b/src/resources/extensions/sf/uok/message-bus.js index 5d648781d..74b66cc17 100644 --- a/src/resources/extensions/sf/uok/message-bus.js +++ b/src/resources/extensions/sf/uok/message-bus.js @@ -2,81 +2,36 @@ * UOK Durable Message Bus & Agent Inbox * * Purpose: implement Letta-style inter-agent communication with at-least-once - * delivery guarantees via append-only JSONL. Messages survive process restarts - * and are retained with configurable TTL. + * delivery guarantees via SQLite. Messages survive process restarts and are + * retained with configurable TTL. * * Consumer: multi-agent orchestration, cross-turn coordination, and UOK kernel * observer chains. */ import { randomUUID } from "node:crypto"; -import { - appendFileSync, - existsSync, - mkdirSync, - readFileSync, - writeFileSync, -} from "node:fs"; +import { mkdirSync } from "node:fs"; import { join } from "node:path"; import { sfRoot } from "../paths.js"; +import { + compactUokMessages, + getUokConversation, + getUokMessageReadIds, + getUokMessagesForAgent, + insertUokMessage, + isDbAvailable, + markUokMessageRead, + openDatabase, +} from "../sf-db.js"; const DEFAULT_RETENTION_DAYS = 7; const DEFAULT_MAX_INBOX_SIZE = 1000; -function messagesPath(basePath) { - return join(sfRoot(basePath), "runtime", "uok-messages.jsonl"); -} - -function inboxStatePath(basePath, agentId) { - const sanitized = agentId.replace(/[^a-zA-Z0-9_-]/g, "_"); - return join(sfRoot(basePath), "runtime", `uok-inbox-${sanitized}.json`); -} - -function loadMessages(basePath) { - const path = messagesPath(basePath); - if (!existsSync(path)) return []; - const lines = readFileSync(path, "utf-8").split(/\r?\n/).filter(Boolean); - const results = []; - for (const line of lines) { - try { - results.push(JSON.parse(line)); - } catch { - // Skip malformed lines - } - } - return results; -} - -function appendMessage(basePath, message) { - const path = messagesPath(basePath); - const dir = join(sfRoot(basePath), "runtime"); +function ensureDb(basePath) { + if (isDbAvailable()) return; + const dir = sfRoot(basePath); mkdirSync(dir, { recursive: true }); - appendFileSync(path, `${JSON.stringify(message)}\n`, "utf-8"); -} - -function loadInboxState(basePath, agentId) { - const path = inboxStatePath(basePath, agentId); - if (!existsSync(path)) return { readIds: [] }; - try { - return JSON.parse(readFileSync(path, "utf-8")); - } catch { - return { readIds: [] }; - } -} - -function saveInboxState(basePath, agentId, state) { - const path = inboxStatePath(basePath, agentId); - const dir = join(sfRoot(basePath), "runtime"); - mkdirSync(dir, { recursive: true }); - writeFileSync(path, `${JSON.stringify(state)}\n`, "utf-8"); -} - -function pruneOldMessages(messages, retentionDays) { - const cutoff = Date.now() - retentionDays * 24 * 60 * 60 * 1000; - return messages.filter((m) => { - const ts = m.sentAt ? Date.parse(m.sentAt) : 0; - return ts >= cutoff; - }); + openDatabase(join(dir, "sf.db")); } export class AgentInbox { @@ -85,20 +40,23 @@ export class AgentInbox { this.basePath = basePath; this.maxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE; this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS; - this._state = loadInboxState(basePath, agentId); + ensureDb(basePath); this._messages = this._hydrate(); } _hydrate() { - const all = loadMessages(this.basePath); - const readIds = new Set(this._state.readIds ?? []); - const forMe = all - .filter((m) => m.to === this.agentId) - .map((m) => ({ - ...m, - read: readIds.has(m.id), - })); - return pruneOldMessages(forMe, this.retentionDays).slice(-this.maxSize); + const messages = getUokMessagesForAgent(this.agentId, this.maxSize, false); + const readIds = new Set(getUokMessageReadIds(this.agentId)); + const withRead = messages.map((m) => ({ + ...m, + read: readIds.has(m.id), + })); + const cutoff = Date.now() - this.retentionDays * 24 * 60 * 60 * 1000; + const recent = withRead.filter((m) => { + const ts = m.sentAt ? Date.parse(m.sentAt) : 0; + return ts >= cutoff; + }); + return recent.slice(-this.maxSize); } receive(message) { @@ -107,6 +65,15 @@ export class AgentInbox { receivedAt: new Date().toISOString(), read: false, }; + insertUokMessage({ + id: enriched.id, + from: enriched.from, + to: enriched.to ?? this.agentId, + body: enriched.body, + metadata: enriched.metadata, + sentAt: enriched.sentAt ?? enriched.receivedAt, + deliveredAt: enriched.deliveredAt ?? enriched.receivedAt, + }); this._messages.push(enriched); if (this._messages.length > this.maxSize) { this._messages = this._messages.slice(-this.maxSize); @@ -124,10 +91,7 @@ export class AgentInbox { const msg = this._messages.find((m) => m.id === messageId); if (msg) { msg.read = true; - if (!this._state.readIds.includes(messageId)) { - this._state.readIds.push(messageId); - saveInboxState(this.basePath, this.agentId, this._state); - } + markUokMessageRead(messageId, this.agentId); } return !!msg; } @@ -147,7 +111,7 @@ export class MessageBus { this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS; this.maxInboxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE; this.inboxes = new Map(); - this._messages = loadMessages(basePath); + ensureDb(basePath); } _getOrCreateInbox(agentId) { @@ -174,8 +138,7 @@ export class MessageBus { deliveredAt: new Date().toISOString(), }; - appendMessage(this.basePath, message); - this._messages.push(message); + insertUokMessage(message); const targetInbox = this._getOrCreateInbox(to); const alreadyHas = targetInbox.list().some((m) => m.id === message.id); @@ -199,30 +162,10 @@ export class MessageBus { } getConversation(agentA, agentB) { - const all = loadMessages(this.basePath); - return pruneOldMessages( - all.filter( - (m) => - (m.from === agentA && m.to === agentB) || - (m.from === agentB && m.to === agentA), - ), - this.retentionDays, - ); + return getUokConversation(agentA, agentB, this.maxInboxSize); } compact() { - const path = messagesPath(this.basePath); - const all = loadMessages(this.basePath); - const pruned = pruneOldMessages(all, this.retentionDays); - const dir = join(sfRoot(this.basePath), "runtime"); - mkdirSync(dir, { recursive: true }); - writeFileSync( - path, - pruned.map((m) => JSON.stringify(m)).join("\n") + - (pruned.length ? "\n" : ""), - "utf-8", - ); - this._messages = pruned; - return { before: all.length, after: pruned.length }; + return compactUokMessages(this.retentionDays); } } diff --git a/src/resources/extensions/sf/uok/parity-report.js b/src/resources/extensions/sf/uok/parity-report.js index d7a893031..a870a37b0 100644 --- a/src/resources/extensions/sf/uok/parity-report.js +++ b/src/resources/extensions/sf/uok/parity-report.js @@ -10,6 +10,7 @@ import { join } from "node:path"; import { sfRoot } from "../paths.js"; import { getUokRuns, isDbAvailable, recordUokRunExit } from "../sf-db.js"; +const UOK_PARITY_SCHEMA_VERSION = 1; export const UNMATCHED_RUN_STALE_MS = 30 * 60 * 1000; function parityLogPath(basePath) { @@ -75,7 +76,8 @@ export function parseParityEvents(raw) { .filter((line) => line.trim().length > 0) .map((line) => { try { - const parsed = JSON.parse(line); + const parsed = normalizeParityEvent(JSON.parse(line)); + if (!parsed) return null; if (isParityDiffEvent(parsed)) return parsed; return parsed; } catch { @@ -84,7 +86,17 @@ export function parseParityEvents(raw) { error: "invalid parity json line", }; } - }); + }) + .filter(Boolean); +} +function normalizeParityEvent(event) { + if (!event || typeof event !== "object" || Array.isArray(event)) return null; + const schemaVersion = event.schemaVersion ?? UOK_PARITY_SCHEMA_VERSION; + if (schemaVersion !== UOK_PARITY_SCHEMA_VERSION) return null; + return { + ...event, + schemaVersion, + }; } export function buildParityReport( events, @@ -240,6 +252,7 @@ export function buildParityReport( ); } return { + schemaVersion: UOK_PARITY_SCHEMA_VERSION, generatedAt: new Date(nowMs).toISOString(), sourcePath, totalEvents: events.length, @@ -340,7 +353,11 @@ export function writeParityDiff(basePath, event) { try { mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true }); const logPath = parityLogPath(basePath); - appendFileSync(logPath, `${JSON.stringify(event)}\n`, "utf-8"); + appendFileSync( + logPath, + `${JSON.stringify({ schemaVersion: UOK_PARITY_SCHEMA_VERSION, ...event })}\n`, + "utf-8", + ); } catch { // Best-effort: diff emission must never break orchestration. } @@ -359,7 +376,7 @@ export function writeParityHeartbeat(basePath, event) { mkdirSync(join(sfRoot(basePath), "runtime"), { recursive: true }); appendFileSync( parityLogPath(basePath), - `${JSON.stringify(event)}\n`, + `${JSON.stringify({ schemaVersion: UOK_PARITY_SCHEMA_VERSION, ...event })}\n`, "utf-8", ); } catch { diff --git a/src/resources/extensions/sf/vault-resolver.js b/src/resources/extensions/sf/vault-resolver.js index f60d997d1..e890a62f3 100644 --- a/src/resources/extensions/sf/vault-resolver.js +++ b/src/resources/extensions/sf/vault-resolver.js @@ -19,7 +19,6 @@ * No persistent cache (vault secrets can change). */ import { existsSync, readFileSync } from "node:fs"; -import { expand } from "node:path"; import { homedir } from "node:os"; /** @@ -50,7 +49,7 @@ export function parseVaultUri(uri) { if (!pathPart || !fieldPart) { return { error: - 'Invalid vault URI format. Expected: vault://path/to/secret#fieldname', + "Invalid vault URI format. Expected: vault://path/to/secret#fieldname", }; } @@ -61,8 +60,7 @@ export function parseVaultUri(uri) { return { error: "Path and field must be non-empty" }; } - const vaultAddr = - process.env.VAULT_ADDR || "http://127.0.0.1:8200"; + const vaultAddr = process.env.VAULT_ADDR || "http://127.0.0.1:8200"; const token = resolveVaultToken(); return { path, field, vaultAddr, token }; @@ -154,7 +152,10 @@ async function fetchVaultSecret(path, vaultAddr, token) { * * Caches results in memory for TTL (default 5 minutes). */ -export async function resolveVaultSecret(uri, cacheTtlMs = DEFAULT_CACHE_TTL_MS) { +export async function resolveVaultSecret( + uri, + cacheTtlMs = DEFAULT_CACHE_TTL_MS, +) { // Check memory cache first const cached = secretCache.get(uri); if (cached && Date.now() - cached.timestamp < cacheTtlMs) { diff --git a/src/resources/extensions/sf/workflow-events.js b/src/resources/extensions/sf/workflow-events.js index 67d604824..60b52467e 100644 --- a/src/resources/extensions/sf/workflow-events.js +++ b/src/resources/extensions/sf/workflow-events.js @@ -6,6 +6,8 @@ import { withFileLockSync } from "./file-lock.js"; import { sfRuntimeRoot } from "./paths.js"; import { logWarning } from "./workflow-logger.js"; +const WORKFLOW_EVENT_SCHEMA_VERSION = 1; + // ─── Session ID ─────────────────────────────────────────────────────────── /** * Engine-generated session ID — stable for the lifetime of this process. @@ -27,6 +29,7 @@ export function appendEvent(basePath, event) { .digest("hex") .slice(0, 16); const fullEvent = { + schemaVersion: WORKFLOW_EVENT_SCHEMA_VERSION, v: 2, ...event, hash, @@ -57,7 +60,8 @@ export function readEvents(logPath) { for (let i = 0; i < lines.length; i++) { const line = lines[i]; try { - events.push(JSON.parse(line)); + const event = normalizeWorkflowEvent(JSON.parse(line)); + if (event) events.push(event); } catch (_err) { corruptCount++; const snippet = line.slice(0, 80); @@ -86,6 +90,16 @@ export function readEvents(logPath) { } return events; } + +function normalizeWorkflowEvent(event) { + if (!event || typeof event !== "object" || Array.isArray(event)) return null; + const schemaVersion = event.schemaVersion ?? WORKFLOW_EVENT_SCHEMA_VERSION; + if (schemaVersion !== WORKFLOW_EVENT_SCHEMA_VERSION) return null; + return { + ...event, + schemaVersion, + }; +} // ─── findForkPoint ─────────────────────────────────────────────────────── /** * Find the index of the last common event between two logs by comparing hashes. diff --git a/src/resources/extensions/sf/workflow-logger.js b/src/resources/extensions/sf/workflow-logger.js index 227bec6b5..15899c34d 100644 --- a/src/resources/extensions/sf/workflow-logger.js +++ b/src/resources/extensions/sf/workflow-logger.js @@ -31,6 +31,7 @@ import { isAuditEnvelopeEnabled } from "./uok/audit-toggle.js"; // ─── Buffer & Persistent Audit ────────────────────────────────────────── const MAX_BUFFER = 100; +const AUDIT_LOG_SCHEMA_VERSION = 1; let _buffer = []; let _auditBasePath = null; let _stderrEnabled = true; @@ -173,7 +174,7 @@ export function readAuditLog(basePath) { .filter((l) => l.length > 0) .map((l) => { try { - return JSON.parse(l); + return normalizeAuditLogEntry(JSON.parse(l)); } catch { return null; } @@ -294,6 +295,7 @@ function _writeStderr(message) { */ function _sanitizeForAudit(entry) { const sanitized = { + schemaVersion: AUDIT_LOG_SCHEMA_VERSION, ts: entry.ts, severity: entry.severity, component: entry.component, @@ -328,3 +330,13 @@ function _sanitizeForAudit(entry) { } return sanitized; } + +function normalizeAuditLogEntry(entry) { + if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null; + const schemaVersion = entry.schemaVersion ?? AUDIT_LOG_SCHEMA_VERSION; + if (schemaVersion !== AUDIT_LOG_SCHEMA_VERSION) return null; + return { + ...entry, + schemaVersion, + }; +}