sf snapshot: uncommitted changes after 33m inactivity

This commit is contained in:
Mikael Hugo 2026-05-06 07:35:57 +02:00
parent 500a9d1c1d
commit 14d963cb51
8 changed files with 336 additions and 188 deletions

View file

@ -15,7 +15,7 @@
* bypassing the extension loader's jiti setup (#1137).
*/
import { existsSync, readdirSync, readFileSync } from "node:fs";
import { existsSync, readFileSync } from "node:fs";
import { homedir } from "node:os";
import { dirname, join } from "node:path";
import { createJiti } from "@mariozechner/jiti";
@ -63,6 +63,10 @@ async function loadExtensionModules() {
sfExtensionPath("auto-start"),
{},
)) as any;
const uokRuntimeModule = (await jiti.import(
sfExtensionPath("uok/unit-runtime"),
{},
)) as any;
return {
openProjectDbIfPresent: autoStartModule.openProjectDbIfPresent as (
basePath: string,
@ -78,6 +82,20 @@ async function loadExtensionModules() {
) => any[],
loadEffectiveSFPreferences:
prefsModule.loadEffectiveSFPreferences as () => any,
listUnitRuntimeRecords: uokRuntimeModule.listUnitRuntimeRecords as (
basePath: string,
) => any[],
getUnitRuntimeState: uokRuntimeModule.getUnitRuntimeState as (
record: any,
) => any,
decideUnitRuntimeDispatch: uokRuntimeModule.decideUnitRuntimeDispatch as (
record: any,
opts?: any,
) => any,
isTerminalUnitRuntimeStatus:
uokRuntimeModule.isTerminalUnitRuntimeStatus as (
status: string,
) => boolean,
};
}
@ -167,21 +185,6 @@ export interface QueryResult {
// ─── Implementation ─────────────────────────────────────────────────────────
const QUERY_TERMINAL_STATUSES = new Set([
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
]);
const QUERY_RETRYABLE_TERMINAL_STATUSES = new Set([
"failed",
"stale",
"runaway-recovered",
]);
const DEFAULT_QUERY_MAX_RETRIES = 1;
function resolveSfRootForQuery(basePath: string): string {
let current = basePath;
while (true) {
@ -193,14 +196,6 @@ function resolveSfRootForQuery(basePath: string): string {
}
}
function stringField(value: unknown, fallback = ""): string {
return typeof value === "string" ? value : fallback;
}
function numberField(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function pidIsAlive(pid: unknown): boolean {
if (!Number.isInteger(pid) || Number(pid) <= 0) return false;
if (pid === process.pid) return true;
@ -226,155 +221,46 @@ function queryHasLiveAutoLock(basePath: string): boolean {
}
}
function inferQueryStatus(
phase: string,
record: Record<string, unknown>,
): string {
switch (phase) {
case "queued":
case "claimed":
case "running":
case "progress":
case "completed":
case "failed":
case "blocked":
case "cancelled":
case "stale":
case "runaway-recovered":
case "notified":
return phase;
case "dispatched":
return "running";
case "wrapup-warning-sent":
case "runaway-warning-sent":
case "runaway-final-warning-sent":
case "recovered":
return "progress";
case "timeout":
return "stale";
case "finalized":
return "completed";
case "paused":
return record.runawayGuardPause ? "runaway-recovered" : "blocked";
case "skipped":
return "blocked";
default:
return "running";
}
}
function queryRuntimeDecision(input: {
unitType: string;
unitId: string;
status: string;
retryCount: number;
maxRetries: number;
notifiedAt: number | null;
}): RuntimeDispatchDecisionSummary {
const retryBudgetRemaining = Math.max(0, input.maxRetries - input.retryCount);
const common = {
retryCount: input.retryCount,
maxRetries: input.maxRetries,
retryBudgetRemaining,
};
if (input.notifiedAt !== null) {
return { action: "skip", reasonCode: "already-notified", ...common };
}
if (input.status === "notified") {
return { action: "skip", reasonCode: "notified", ...common };
}
if (input.status === "queued") {
return { action: "dispatch", reasonCode: "queued", ...common };
}
if (!QUERY_TERMINAL_STATUSES.has(input.status)) {
return { action: "skip", reasonCode: "active-or-claimed", ...common };
}
const synthetic =
input.unitType === "synthetic" ||
input.unitId.includes("parallel-research");
if (synthetic && input.status !== "completed") {
return {
action: "block",
reasonCode: "synthetic-reset-required",
...common,
};
}
if (QUERY_RETRYABLE_TERMINAL_STATUSES.has(input.status)) {
return retryBudgetRemaining > 0
? { action: "retry", reasonCode: "retry-budget-available", ...common }
: { action: "block", reasonCode: "retry-budget-exhausted", ...common };
}
if (
input.status === "completed" ||
input.status === "blocked" ||
input.status === "cancelled"
) {
return {
action: "notify",
reasonCode: "terminal-ready-to-notify",
...common,
};
}
return { action: "skip", reasonCode: "terminal-nonretryable", ...common };
}
function readRuntimeUnitSummaries(basePath: string): RuntimeUnitSummary[] {
const unitsDir = join(resolveSfRootForQuery(basePath), "runtime", "units");
if (!existsSync(unitsDir)) return [];
function readRuntimeUnitSummaries(
basePath: string,
uokRuntime: {
listUnitRuntimeRecords: (basePath: string) => any[];
getUnitRuntimeState: (record: any) => any;
decideUnitRuntimeDispatch: (record: any, opts?: any) => any;
isTerminalUnitRuntimeStatus: (status: string) => boolean;
},
): RuntimeUnitSummary[] {
const hasLiveAutoLock = queryHasLiveAutoLock(basePath);
const records = uokRuntime.listUnitRuntimeRecords(basePath);
const results: RuntimeUnitSummary[] = [];
for (const file of readdirSync(unitsDir)) {
if (!file.endsWith(".json")) continue;
try {
const record = JSON.parse(
readFileSync(join(unitsDir, file), "utf-8"),
) as Record<string, unknown>;
const unitType = stringField(record.unitType);
const unitId = stringField(record.unitId);
if (!unitType || !unitId) continue;
const phase = stringField(record.phase, "dispatched");
let status = stringField(record.status, inferQueryStatus(phase, record));
if (!hasLiveAutoLock && !QUERY_TERMINAL_STATUSES.has(status)) {
status = "stale";
}
const recoveryAttempts = numberField(record.recoveryAttempts) ?? 0;
const retryCount = numberField(record.retryCount) ?? recoveryAttempts;
const maxRetries =
numberField(record.maxRetries) ?? DEFAULT_QUERY_MAX_RETRIES;
const notifiedAt = numberField(record.notifiedAt);
const dispatchDecision = queryRuntimeDecision({
unitType,
unitId,
status,
retryCount,
maxRetries,
notifiedAt,
});
results.push({
unitType,
unitId,
phase,
status,
startedAt: numberField(record.startedAt),
updatedAt: numberField(record.updatedAt),
retryCount,
maxRetries,
retryBudgetRemaining: dispatchDecision.retryBudgetRemaining,
lastHeartbeatAt: numberField(record.lastHeartbeatAt),
lastProgressAt: numberField(record.lastProgressAt),
lastOutputAt: numberField(record.lastOutputAt),
outputPath:
typeof record.outputPath === "string" ? record.outputPath : null,
watchdogReason:
typeof record.watchdogReason === "string"
? record.watchdogReason
: null,
notifiedAt,
dispatchDecision,
});
} catch {
// Runtime query must stay best-effort; malformed unit files are ignored.
for (const record of records) {
const unitType = String(record.unitType ?? "");
const unitId = String(record.unitId ?? "");
if (!unitType || !unitId) continue;
const state = uokRuntime.getUnitRuntimeState(record);
let status = state.status;
if (!hasLiveAutoLock && !uokRuntime.isTerminalUnitRuntimeStatus(status)) {
status = "stale";
}
const decision = uokRuntime.decideUnitRuntimeDispatch(record);
results.push({
unitType,
unitId,
phase: String(record.phase ?? "dispatched"),
status,
startedAt: typeof record.startedAt === "number" ? record.startedAt : null,
updatedAt: typeof record.updatedAt === "number" ? record.updatedAt : null,
retryCount: state.retryCount,
maxRetries: state.maxRetries,
retryBudgetRemaining: decision.retryBudgetRemaining,
lastHeartbeatAt: state.lastHeartbeatAt,
lastProgressAt: state.lastProgressAt ?? null,
lastOutputAt: state.lastOutputAt,
outputPath: state.outputPath,
watchdogReason: state.watchdogReason,
notifiedAt: state.notifiedAt,
dispatchDecision: decision,
});
}
return results;
}
@ -388,6 +274,10 @@ export async function buildQuerySnapshot(
resolveDispatch,
readAllSessionStatuses,
loadEffectiveSFPreferences,
listUnitRuntimeRecords,
getUnitRuntimeState,
decideUnitRuntimeDispatch,
isTerminalUnitRuntimeStatus,
} = await loadExtensionModules();
await openProjectDbIfPresent(basePath);
const state = await deriveState(basePath);
@ -463,7 +353,14 @@ export async function buildQuerySnapshot(
state,
next,
cost: { workers, total: workers.reduce((sum, w) => sum + w.cost, 0) },
runtime: { units: readRuntimeUnitSummaries(basePath) },
runtime: {
units: readRuntimeUnitSummaries(basePath, {
listUnitRuntimeRecords,
getUnitRuntimeState,
decideUnitRuntimeDispatch,
isTerminalUnitRuntimeStatus,
}),
},
schedule: scheduleEntries,
};

View file

@ -82,6 +82,10 @@ import { selectReactiveDispatchBatch } from "./uok/execution-graph.js";
import { resolveUokFlags } from "./uok/flags.js";
import { UokGateRunner } from "./uok/gate-runner.js";
import { hasFinalizedMilestoneContext } from "./uok/plan-v2.js";
import {
decideUnitRuntimeDispatch,
readUnitRuntimeRecord,
} from "./uok/unit-runtime.js";
import { extractVerdict, isAcceptableUatVerdict } from "./verdict-parser.js";
import { logError, logWarning } from "./workflow-logger.js";
@ -1657,6 +1661,27 @@ function emitDispatchEnvelope(ctx, action) {
* loop over DISPATCH_RULES for backward compatibility (tests that import
* resolveDispatch directly without registry initialization).
*/
function applyUokRuntimeGuard(ctx, dispatchResult) {
if (dispatchResult.action !== "dispatch") return dispatchResult;
const { basePath } = ctx;
const { unitType, unitId } = dispatchResult;
if (!unitType || !unitId) return dispatchResult;
const record = readUnitRuntimeRecord(basePath, unitType, unitId);
const decision = decideUnitRuntimeDispatch(record);
if (decision.action === "dispatch" || decision.action === "retry") {
return dispatchResult;
}
if (decision.action === "block") {
return {
action: "stop",
reason: `UOK runtime guard blocked ${unitType} ${unitId}: ${decision.reasonCode} (retry ${decision.retryCount}/${decision.maxRetries})`,
level: "warning",
matchedRule: dispatchResult.matchedRule,
};
}
// skip or notify — treat as skip so the loop re-derives state
return { action: "skip", matchedRule: dispatchResult.matchedRule };
}
export async function resolveDispatch(ctx) {
// Fetch pipeline variant once per dispatch cycle so rules can read ctx.pipelineVariant
// without triggering redundant DB queries + heuristic evaluations.
@ -1670,8 +1695,9 @@ export async function resolveDispatch(ctx) {
if (hasRegistry()) {
try {
const result = await getRegistry().evaluateDispatch(ctx);
emitDispatchEnvelope(ctx, result);
return result;
const guarded = applyUokRuntimeGuard(ctx, result);
emitDispatchEnvelope(ctx, guarded);
return guarded;
} catch (err) {
// Genuine registry evaluation failure (rule threw, etc.) — log so we
// surface real bugs, then fall back.
@ -1685,8 +1711,9 @@ export async function resolveDispatch(ctx) {
const result = await rule.match(ctx);
if (result) {
if (result.action !== "skip") result.matchedRule = rule.name;
emitDispatchEnvelope(ctx, result);
return result;
const guarded = applyUokRuntimeGuard(ctx, result);
emitDispatchEnvelope(ctx, guarded);
return guarded;
}
}
// No rule matched — unhandled phase.

View file

@ -27,6 +27,7 @@ import { isClosedStatus } from "./status-guards.js";
import { parseUnitId } from "./unit-id.js";
import { resolveUokFlags } from "./uok/flags.js";
import { UokGateRunner } from "./uok/gate-runner.js";
import { SecurityGate } from "./uok/security-gate.js";
import { extractVerdict } from "./verdict-parser.js";
import { writeVerificationJSON } from "./verification-evidence.js";
import {
@ -271,6 +272,9 @@ export async function runPostUnitVerification(vctx, pauseAuto) {
findings: result.passed ? "" : formatFailureContext(result),
}),
});
if (uokFlags.securityGuard) {
gateRunner.register(new SecurityGate());
}
await gateRunner.run("verification-gate", {
basePath: s.basePath,
traceId: `verification:${s.currentUnit.id}`,
@ -281,6 +285,24 @@ export async function runPostUnitVerification(vctx, pauseAuto) {
unitType: s.currentUnit.type,
unitId: s.currentUnit.id,
});
if (uokFlags.securityGuard) {
const secResult = await gateRunner.run("security-guard", {
basePath: s.basePath,
traceId: `security-guard:${s.currentUnit.id}`,
turnId: s.currentUnit.id,
milestoneId: mid ?? undefined,
sliceId: sid ?? undefined,
taskId: tid ?? undefined,
unitType: s.currentUnit.type,
unitId: s.currentUnit.id,
});
if (secResult.outcome === "fail") {
result.passed = false;
result.securityFailure = true;
result.securityRationale = secResult.rationale;
result.securityFindings = secResult.findings;
}
}
}
// Auto-fix retry preferences
const autoFixEnabled = prefs?.verification_auto_fix !== false;
@ -326,6 +348,19 @@ export async function runPostUnitVerification(vctx, pauseAuto) {
);
}
}
// Log security failures
if (result.securityFailure) {
ctx.ui.notify(
`[verify] SECURITY FAIL — ${result.securityRationale}`,
"error",
);
process.stderr.write(
`verification-gate: security failure: ${result.securityRationale}\n`,
);
if (result.securityFindings) {
process.stderr.write(`${result.securityFindings}\n`);
}
}
// Write verification evidence JSON
const attempt = s.verificationRetryCount.get(s.currentUnit.id) ?? 0;
if (mid && sid && tid) {

View file

@ -34,6 +34,7 @@ import {
removeSessionStatus,
} from "./session-status-io.js";
import { deriveState } from "./state.js";
import { listUnitRuntimeRecords } from "./uok/unit-runtime.js";
import { getAuditEmitFailureCount } from "./workflow-logger.js";
const ACTIVE_UNIT_RUNTIME_STATUSES = new Set([
@ -96,19 +97,12 @@ export async function checkRuntimeHealth(
// or points at a dead PID, dispatched/runtime-active unit files are stale
// leftovers and make query/dispatch believe old units are still claimed.
try {
const unitsDir = join(root, "runtime", "units");
if (!crashLockAlive && existsSync(unitsDir)) {
if (!crashLockAlive) {
const records = listUnitRuntimeRecords(basePath);
const staleFiles = [];
for (const file of readdirSync(unitsDir)) {
if (!file.endsWith(".json")) continue;
const abs = join(unitsDir, file);
try {
const record = JSON.parse(readFileSync(abs, "utf8"));
if (isActiveUnitRuntimeRecord(record)) {
staleFiles.push({ file, abs, record });
}
} catch {
// Malformed runtime unit records are handled by other checks.
for (const record of records) {
if (isActiveUnitRuntimeRecord(record)) {
staleFiles.push(record);
}
}
if (staleFiles.length > 0) {
@ -123,7 +117,18 @@ export async function checkRuntimeHealth(
});
if (shouldFix("stale_active_unit_runtime")) {
for (const stale of staleFiles) {
rmSync(stale.abs, { force: true });
const sanitizedUnitType = String(stale.unitType).replace(
/[/]/g,
"-",
);
const sanitizedUnitId = String(stale.unitId).replace(/[/]/g, "-");
const abs = join(
root,
"runtime",
"units",
`${sanitizedUnitType}-${sanitizedUnitId}.json`,
);
rmSync(abs, { force: true });
}
fixesApplied.push(
`cleared ${staleFiles.length} stale active unit runtime record(s)`,

View file

@ -0,0 +1,175 @@
/**
* Schedule E2E full addlistdone round-trip.
*
* Purpose: verify the schedule system end-to-end through the store API:
* appendEntry with a past due_at, findDue returns it, list shows it,
* updateEntryStatus to done, findDue returns 0.
*
* Consumer: CI test runner (vitest).
*/
import assert from "node:assert/strict";
import { mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, it } from "vitest";
import { createScheduleStore } from "../schedule/schedule-store.js";
import { generateULID } from "../schedule/schedule-ulid.js";
describe("schedule-e2e round-trip", () => {
/** @type {string} */
let testDir;
/** @type {ReturnType<typeof createScheduleStore>} */
let store;
beforeEach(() => {
testDir = join(
tmpdir(),
`sf-schedule-e2e-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
);
mkdirSync(testDir, { recursive: true });
store = createScheduleStore(testDir);
});
afterEach(() => {
try {
rmSync(testDir, { recursive: true });
} catch {
// ignore
}
});
function makeEntry(overrides = {}) {
const now = new Date().toISOString();
return {
id: generateULID(),
kind: "reminder",
status: "pending",
due_at: now,
created_at: now,
payload: { message: "test" },
created_by: "user",
...overrides,
};
}
it("appends an entry with due_at in the past and findDue returns it", () => {
const entry = makeEntry({
due_at: "2020-01-01T00:00:00.000Z",
status: "pending",
});
store.appendEntry("project", entry);
const due = store.findDue("project", "2024-06-01T00:00:00.000Z");
assert.equal(due.length, 1);
assert.equal(due[0].id, entry.id);
});
it("list shows the pending entry", () => {
const entry = makeEntry({
due_at: "2020-01-01T00:00:00.000Z",
status: "pending",
payload: { message: "e2e test item" },
});
store.appendEntry("project", entry);
const entries = store
.readEntries("project")
.filter((e) => e.status === "pending" || e.status === "snoozed");
assert.equal(entries.length, 1);
assert.equal(entries[0].id, entry.id);
assert.equal(entries[0].payload.message, "e2e test item");
});
it("marking done via appendEntry update removes it from findDue", () => {
const entry = makeEntry({
due_at: "2020-01-01T00:00:00.000Z",
status: "pending",
});
store.appendEntry("project", entry);
// Verify it's due
let due = store.findDue("project", "2024-06-01T00:00:00.000Z");
assert.equal(due.length, 1);
// Append a "done" update for the same ID
const updated = {
...entry,
status: "done",
created_at: new Date().toISOString(),
};
store.appendEntry("project", updated);
// Now findDue should return 0
due = store.findDue("project", "2024-06-01T00:00:00.000Z");
assert.equal(due.length, 0);
});
it("full round-trip: add past → findDue → list → done → findDue empty", () => {
const entry = makeEntry({
due_at: "2020-01-01T00:00:00.000Z",
status: "pending",
payload: { message: "round-trip item" },
});
// 1) add
store.appendEntry("project", entry);
// 2) findDue returns it
let due = store.findDue("project", "2024-06-01T00:00:00.000Z");
assert.equal(due.length, 1);
assert.equal(due[0].id, entry.id);
// 3) list shows it
const list = store
.readEntries("project")
.filter((e) => e.status === "pending" || e.status === "snoozed");
assert.equal(list.length, 1);
assert.equal(list[0].payload.message, "round-trip item");
// 4) mark done
const updated = {
...entry,
status: "done",
created_at: new Date().toISOString(),
};
store.appendEntry("project", updated);
// 5) findDue returns 0
due = store.findDue("project", "2024-06-01T00:00:00.000Z");
assert.equal(due.length, 0);
});
it("isolates scopes: two project stores do not see each others entries", () => {
const testDir2 = join(tmpdir(), `sf-schedule-e2e-2-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`);
mkdirSync(testDir2, { recursive: true });
const store2 = createScheduleStore(testDir2);
const entry1 = makeEntry({
due_at: "2020-01-01T00:00:00.000Z",
status: "pending",
payload: { message: "store 1" },
});
const entry2 = makeEntry({
due_at: "2020-01-01T00:00:00.000Z",
status: "pending",
payload: { message: "store 2" },
});
store.appendEntry("project", entry1);
store2.appendEntry("project", entry2);
const due1 = store.findDue("project", "2024-06-01T00:00:00.000Z");
const due2 = store2.findDue("project", "2024-06-01T00:00:00.000Z");
assert.equal(due1.length, 1);
assert.equal(due1[0].payload.message, "store 1");
assert.equal(due2.length, 1);
assert.equal(due2[0].payload.message, "store 2");
try {
rmSync(testDir2, { recursive: true });
} catch {
// ignore
}
});
});

View file

@ -6,6 +6,9 @@ export function resolveUokFlags(prefs) {
return {
enabled: enabledByPreference,
gates: uok?.gates?.enabled ?? true,
securityGuard: uok?.security_guard?.enabled ?? true,
multiPackageHealing: uok?.multi_package_healing?.enabled ?? true,
chaosMonkey: uok?.chaos_monkey?.enabled ?? false,
modelPolicy: uok?.model_policy?.enabled ?? true,
executionGraph: uok?.execution_graph?.enabled ?? true,
gitops: uok?.gitops?.enabled ?? true,

View file

@ -134,6 +134,7 @@ export async function runAutoLoopWithUok(args) {
gitPush: flags.gitopsTurnPush,
enableAudit: flags.auditEnvelope,
enableGitops: flags.gitops,
enableChaosMonkey: flags.chaosMonkey,
}),
};
let status = "ok";

View file

@ -1,4 +1,5 @@
import { buildAuditEnvelope, emitUokAuditEvent } from "./audit.js";
import { ChaosMonkey } from "./chaos-monkey.js";
import {
writeTurnCloseoutGitRecord,
writeTurnGitTransaction,
@ -12,6 +13,8 @@ export function createTurnObserver(options) {
let current = null;
let writerToken = null;
const phaseResults = [];
const chaosMonkey = options.enableChaosMonkey ? new ChaosMonkey() : null;
function nextSequenceMetadata(category, operation, metadata) {
if (!writerToken) return metadata ?? {};
const record = nextWriteRecord({
@ -29,6 +32,7 @@ export function createTurnObserver(options) {
}
return {
onTurnStart(contract) {
if (chaosMonkey) chaosMonkey.strike("turn-start");
current = contract;
phaseResults.length = 0;
writerToken = acquireWriterToken({
@ -72,6 +76,7 @@ export function createTurnObserver(options) {
}
},
onPhaseResult(phase, action, data) {
if (chaosMonkey) chaosMonkey.strike(`after-${phase}`);
phaseResults.push({
phase,
action,