singularity-forge/src/headless-query.ts

401 lines
11 KiB
TypeScript

/**
* Headless Query — `sf headless query`
*
* Single read-only command that returns the full project snapshot as JSON
* to stdout, without spawning an LLM session. Instant (~50ms).
*
* Output: { schemaVersion, state, next, cost }
* schemaVersion — output contract version
* state — deriveState() output (phase, milestones, progress, blockers)
* next — dry-run dispatch preview (what autonomous mode would do next)
* cost — aggregated parallel worker costs
*
* Note: Extension modules are .ts files loaded via jiti (not compiled to .js).
* We use createJiti() here because this module is imported directly from cli.ts,
* bypassing the extension loader's jiti setup (#1137).
*/
import { existsSync, readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { createJiti } from "@mariozechner/jiti";
import { resolveBundledSourceResource } from "./bundled-resource-path.js";
import { getSfEnv } from "./env.js";
import type { SFState } from "./resources/extensions/sf/types.js";
const jiti = createJiti(import.meta.filename, {
interopDefault: true,
debug: false,
});
// Resolve extensions from the synced agent directory so headless-query
// loads the same extension copy as interactive/autonomous modes (#3471).
// The synced runtime is compiled .js; source-tree fallback is .ts.
const agentExtensionsDir = join(getSfEnv().agentDir, "extensions", "sf");
const useAgentDir = existsSync(join(agentExtensionsDir, "state.js"));
const sfExtensionPath = (moduleName: string) => {
if (useAgentDir) return join(agentExtensionsDir, `${moduleName}.js`);
const tsPath = resolveBundledSourceResource(
import.meta.url,
"extensions",
"sf",
`${moduleName}.ts`,
);
if (existsSync(tsPath)) return tsPath;
return resolveBundledSourceResource(
import.meta.url,
"extensions",
"sf",
`${moduleName}.js`,
);
};
async function loadExtensionModules() {
const stateModule = (await jiti.import(sfExtensionPath("state"), {})) as any;
const dispatchModule = (await jiti.import(
sfExtensionPath("auto-dispatch"),
{},
)) as any;
const sessionModule = (await jiti.import(
sfExtensionPath("session-status-io"),
{},
)) as any;
const prefsModule = (await jiti.import(
sfExtensionPath("preferences"),
{},
)) as any;
const autoStartModule = (await jiti.import(
sfExtensionPath("auto-start"),
{},
)) as any;
const uokRuntimeModule = (await jiti.import(
sfExtensionPath("uok/unit-runtime"),
{},
)) as any;
const uokDiagnosticsModule = (await jiti.import(
sfExtensionPath("uok/diagnostic-synthesis"),
{},
)) as any;
return {
openProjectDbIfPresent: autoStartModule.openProjectDbIfPresent as (
basePath: string,
) => Promise<void>,
deriveState: stateModule.deriveState as (
basePath: string,
) => Promise<SFState>,
resolveDispatch: dispatchModule.resolveDispatch as (
opts: any,
) => Promise<any>,
readAllSessionStatuses: sessionModule.readAllSessionStatuses as (
basePath: string,
) => any[],
loadEffectiveSFPreferences:
prefsModule.loadEffectiveSFPreferences as () => any,
listUnitRuntimeRecords: uokRuntimeModule.listUnitRuntimeRecords as (
basePath: string,
) => any[],
getUnitRuntimeState: uokRuntimeModule.getUnitRuntimeState as (
record: any,
) => any,
decideUnitRuntimeDispatch: uokRuntimeModule.decideUnitRuntimeDispatch as (
record: any,
opts?: any,
) => any,
isTerminalUnitRuntimeStatus:
uokRuntimeModule.isTerminalUnitRuntimeStatus as (
status: string,
) => boolean,
writeUokDiagnostics: uokDiagnosticsModule.writeUokDiagnostics as (
basePath: string,
opts?: any,
) => any,
};
}
// ─── Types ──────────────────────────────────────────────────────────────────
type RuntimeDispatchDecisionSummary = {
action: "dispatch" | "retry" | "notify" | "block" | "skip";
reasonCode:
| "no-runtime-record"
| "queued"
| "retry-budget-available"
| "terminal-ready-to-notify"
| "retry-budget-exhausted"
| "synthetic-reset-required"
| "already-notified"
| "active-or-claimed"
| "notified"
| "terminal-nonretryable";
retryCount: number;
maxRetries: number;
retryBudgetRemaining: number;
};
type RuntimeUnitSummary = {
unitType: string;
unitId: string;
phase: string;
status: string;
startedAt: number | null;
updatedAt: number | null;
retryCount: number;
maxRetries: number;
retryBudgetRemaining: number;
lastHeartbeatAt: number | null;
lastProgressAt: number | null;
lastOutputAt: number | null;
outputPath: string | null;
watchdogReason: string | null;
notifiedAt: number | null;
dispatchDecision: RuntimeDispatchDecisionSummary;
};
export interface QuerySnapshot {
schemaVersion: 1;
state: SFState;
next: {
action: "dispatch" | "stop" | "skip";
unitType?: string;
unitId?: string;
reason?: string;
};
cost: {
workers: Array<{
milestoneId: string;
pid: number;
state: string;
cost: number;
lastHeartbeat: number;
}>;
total: number;
};
runtime: {
units: RuntimeUnitSummary[];
};
uokDiagnostics?: any;
schedule?: {
pending_count: number;
overdue_count: number;
due: Array<{
id: string;
kind: string;
status: string;
due_at: string;
payload: unknown;
}>;
upcoming: Array<{
id: string;
kind: string;
status: string;
due_at: string;
payload: unknown;
}>;
};
}
export interface QueryResult {
exitCode: number;
data?: QuerySnapshot;
}
// ─── Implementation ─────────────────────────────────────────────────────────
function resolveSfRootForQuery(basePath: string): string {
let current = basePath;
while (true) {
const candidate = join(current, ".sf");
if (existsSync(candidate)) return candidate;
const parent = dirname(current);
if (parent === current) return join(basePath, ".sf");
current = parent;
}
}
function pidIsAlive(pid: unknown): boolean {
if (!Number.isInteger(pid) || Number(pid) <= 0) return false;
if (pid === process.pid) return true;
try {
process.kill(Number(pid), 0);
return true;
} catch (err) {
return (err as NodeJS.ErrnoException).code === "EPERM";
}
}
function queryHasLiveAutoLock(basePath: string): boolean {
const lockPath = join(resolveSfRootForQuery(basePath), "auto.lock");
if (!existsSync(lockPath)) return false;
try {
const lock = JSON.parse(readFileSync(lockPath, "utf-8")) as Record<
string,
unknown
>;
return pidIsAlive(lock.pid);
} catch {
return false;
}
}
function readRuntimeUnitSummaries(
basePath: string,
uokRuntime: {
listUnitRuntimeRecords: (basePath: string) => any[];
getUnitRuntimeState: (record: any) => any;
decideUnitRuntimeDispatch: (record: any, opts?: any) => any;
isTerminalUnitRuntimeStatus: (status: string) => boolean;
},
): RuntimeUnitSummary[] {
const hasLiveAutoLock = queryHasLiveAutoLock(basePath);
const records = uokRuntime.listUnitRuntimeRecords(basePath);
const results: RuntimeUnitSummary[] = [];
for (const record of records) {
const unitType = String(record.unitType ?? "");
const unitId = String(record.unitId ?? "");
if (!unitType || !unitId) continue;
const state = uokRuntime.getUnitRuntimeState(record);
let status = state.status;
if (!hasLiveAutoLock && !uokRuntime.isTerminalUnitRuntimeStatus(status)) {
status = "stale";
}
const decision = uokRuntime.decideUnitRuntimeDispatch(record);
results.push({
unitType,
unitId,
phase: String(record.phase ?? "dispatched"),
status,
startedAt: typeof record.startedAt === "number" ? record.startedAt : null,
updatedAt: typeof record.updatedAt === "number" ? record.updatedAt : null,
retryCount: state.retryCount,
maxRetries: state.maxRetries,
retryBudgetRemaining: decision.retryBudgetRemaining,
lastHeartbeatAt: state.lastHeartbeatAt,
lastProgressAt: state.lastProgressAt ?? null,
lastOutputAt: state.lastOutputAt,
outputPath: state.outputPath,
watchdogReason: state.watchdogReason,
notifiedAt: state.notifiedAt,
dispatchDecision: decision,
});
}
return results;
}
export async function buildQuerySnapshot(
basePath: string,
): Promise<QuerySnapshot> {
const {
openProjectDbIfPresent,
deriveState,
resolveDispatch,
readAllSessionStatuses,
loadEffectiveSFPreferences,
listUnitRuntimeRecords,
getUnitRuntimeState,
decideUnitRuntimeDispatch,
isTerminalUnitRuntimeStatus,
writeUokDiagnostics,
} = await loadExtensionModules();
await openProjectDbIfPresent(basePath);
const state = await deriveState(basePath);
// Derive next dispatch action
let next: QuerySnapshot["next"];
if (!state.activeMilestone?.id) {
next = {
action: "stop",
reason:
state.phase === "complete"
? "All milestones complete."
: state.nextAction,
};
} else {
const loaded = loadEffectiveSFPreferences();
const dispatch = await resolveDispatch({
basePath,
mid: state.activeMilestone.id,
midTitle: state.activeMilestone.title,
state,
prefs: loaded?.preferences,
});
next = {
action: dispatch.action,
unitType: dispatch.action === "dispatch" ? dispatch.unitType : undefined,
unitId: dispatch.action === "dispatch" ? dispatch.unitId : undefined,
reason: dispatch.action === "stop" ? dispatch.reason : undefined,
};
}
// Aggregate parallel worker costs
const statuses = readAllSessionStatuses(basePath);
const workers = statuses.map((s) => ({
milestoneId: s.milestoneId,
pid: s.pid,
state: s.state,
cost: s.cost,
lastHeartbeat: s.lastHeartbeat,
}));
// Load schedule entries
let scheduleEntries: QuerySnapshot["schedule"] = {
pending_count: 0,
overdue_count: 0,
due: [],
upcoming: [],
};
try {
const { createScheduleStore } = await import(
"./resources/extensions/sf/schedule/schedule-store.js"
);
const store = createScheduleStore(basePath);
const now = new Date();
const all = store.readEntries("project");
const pending = all.filter((e) => e.status === "pending");
const nowMs = now.getTime();
const overdue = pending.filter((e) => Date.parse(e.due_at) <= nowMs);
const mapEntry = (e: {
id: string;
kind: string;
status: string;
due_at: string;
payload: unknown;
}) => ({
id: e.id,
kind: e.kind,
status: e.status,
due_at: e.due_at,
payload: e.payload,
});
scheduleEntries = {
pending_count: pending.length,
overdue_count: overdue.length,
due: store.findDue("project", now).map(mapEntry),
upcoming: store.findUpcoming("project", now, 7).map(mapEntry),
};
} catch {
// Non-fatal — schedule data is best-effort in query output.
}
const snapshot: QuerySnapshot = {
schemaVersion: 1,
state,
next,
cost: { workers, total: workers.reduce((sum, w) => sum + w.cost, 0) },
runtime: {
units: readRuntimeUnitSummaries(basePath, {
listUnitRuntimeRecords,
getUnitRuntimeState,
decideUnitRuntimeDispatch,
isTerminalUnitRuntimeStatus,
}),
},
uokDiagnostics: writeUokDiagnostics(basePath, { expectedNext: next }),
schedule: scheduleEntries,
};
return snapshot;
}
export async function handleQuery(basePath: string): Promise<QueryResult> {
const snapshot = await buildQuerySnapshot(basePath);
process.stdout.write(JSON.stringify(snapshot) + "\n");
return { exitCode: 0, data: snapshot };
}