singularity-forge/src/headless-query.ts
2026-05-02 22:07:10 +02:00

405 lines
11 KiB
TypeScript

/**
* Headless Query — `sf headless query`
*
* Single read-only command that returns the full project snapshot as JSON
* to stdout, without spawning an LLM session. Instant (~50ms).
*
* Output: { schemaVersion, state, next, cost, runtime }
* schemaVersion — output contract version
* state — deriveState() output (phase, milestones, progress, blockers)
* next — dry-run dispatch preview (what auto-mode would do next)
* cost — aggregated parallel worker costs
* runtime — per-unit runtime records, each with a computed dispatch decision
*
* Note: Extension modules are loaded via jiti. The synced agent copy is
* compiled .js; the source-tree fallback is uncompiled .ts.
* We use createJiti() here because this module is imported directly from cli.ts,
* bypassing the extension loader's jiti setup (#1137).
*/
import { existsSync, readdirSync, readFileSync } from "node:fs";
import { homedir } from "node:os";
import { dirname, join } from "node:path";
import { createJiti } from "@mariozechner/jiti";
import { resolveBundledSourceResource } from "./bundled-resource-path.js";
import type { SFState } from "./resources/extensions/sf/types.js";
// Module-level jiti instance created from this file's own path. It is the
// loader loadExtensionModules() uses to import the SF extension modules.
const jiti = createJiti(import.meta.filename, {
interopDefault: true,
debug: false,
});
// Extensions are resolved from the synced agent directory so that
// headless-query loads the same extension copy as interactive/auto modes (#3471).
// SF_AGENT_DIR overrides the default location of ~/.sf/agent.
const agentRootDir =
  process.env.SF_AGENT_DIR || join(homedir(), ".sf", "agent");
const agentExtensionsDir = join(agentRootDir, "extensions", "sf");
// The synced runtime is compiled .js; when its state.js is absent we fall
// back to the .ts sources in the source tree.
const agentStateModulePath = join(agentExtensionsDir, "state.js");
const useAgentDir = existsSync(agentStateModulePath);
/**
 * Maps an SF extension module name (without extension) to the file to load:
 * the compiled `.js` in the synced agent directory when that copy is present,
 * otherwise the bundled `.ts` source resolved relative to this module.
 */
const sfExtensionPath = (moduleName: string) => {
  if (useAgentDir) {
    return join(agentExtensionsDir, `${moduleName}.js`);
  }
  return resolveBundledSourceResource(
    import.meta.url,
    "extensions",
    "sf",
    `${moduleName}.ts`,
  );
};
/**
 * Imports the SF extension modules through jiti, one after another, and
 * returns the entry points the query command relies on.
 *
 * The imported modules are untyped at this boundary (`any`); each returned
 * function is given the signature this file expects of it.
 */
async function loadExtensionModules() {
  // Resolve a module name to its file and import it via the shared jiti instance.
  const importExtension = async (moduleName: string) =>
    (await jiti.import(sfExtensionPath(moduleName), {})) as any;

  // Loaded sequentially, in this order.
  const stateModule = await importExtension("state");
  const dispatchModule = await importExtension("auto-dispatch");
  const sessionModule = await importExtension("session-status-io");
  const prefsModule = await importExtension("preferences");
  const autoStartModule = await importExtension("auto-start");

  const openProjectDbIfPresent: (basePath: string) => Promise<void> =
    autoStartModule.openProjectDbIfPresent;
  const deriveState: (basePath: string) => Promise<SFState> =
    stateModule.deriveState;
  const resolveDispatch: (opts: any) => Promise<any> =
    dispatchModule.resolveDispatch;
  const readAllSessionStatuses: (basePath: string) => any[] =
    sessionModule.readAllSessionStatuses;
  const loadEffectiveSFPreferences: () => any =
    prefsModule.loadEffectiveSFPreferences;

  return {
    openProjectDbIfPresent,
    deriveState,
    resolveDispatch,
    readAllSessionStatuses,
    loadEffectiveSFPreferences,
  };
}
// ─── Types ──────────────────────────────────────────────────────────────────
/**
 * Dispatch decision computed by queryRuntimeDecision() for one runtime unit
 * record, together with the retry figures the decision was based on.
 */
type RuntimeDispatchDecisionSummary = {
// What the decision recommends for the unit.
action: "dispatch" | "retry" | "notify" | "block" | "skip";
// Machine-readable reason for `action`. Note: "no-runtime-record" is part of
// the union but is not produced by queryRuntimeDecision() in this file.
reasonCode:
| "no-runtime-record"
| "queued"
| "retry-budget-available"
| "terminal-ready-to-notify"
| "retry-budget-exhausted"
| "synthetic-reset-required"
| "already-notified"
| "active-or-claimed"
| "notified"
| "terminal-nonretryable";
// Retry figures echoed from the decision input.
retryCount: number;
maxRetries: number;
// `maxRetries - retryCount`, clamped so it never goes below zero.
retryBudgetRemaining: number;
};
/**
 * Normalized view of one runtime unit JSON record, as produced by
 * readRuntimeUnitSummaries(). Fields typed `number | null` are null when the
 * record's value is missing or is not a finite number.
 */
type RuntimeUnitSummary = {
// Both identifiers are required; records lacking either one are skipped.
unitType: string;
unitId: string;
// Record's `phase`; defaults to "dispatched" when absent.
phase: string;
// Record's `status` when it is a string, otherwise inferred from `phase`.
status: string;
// NOTE(review): time unit of the *At fields is not visible here — presumably
// epoch timestamps; confirm against the writer of these records.
startedAt: number | null;
updatedAt: number | null;
// Record's `retryCount`, falling back to `recoveryAttempts`, then 0.
retryCount: number;
// Record's `maxRetries`, falling back to DEFAULT_QUERY_MAX_RETRIES.
maxRetries: number;
// Copied from `dispatchDecision.retryBudgetRemaining`.
retryBudgetRemaining: number;
lastHeartbeatAt: number | null;
lastProgressAt: number | null;
lastOutputAt: number | null;
// String values are passed through; anything else becomes null.
outputPath: string | null;
watchdogReason: string | null;
notifiedAt: number | null;
// Decision computed from the fields above by queryRuntimeDecision().
dispatchDecision: RuntimeDispatchDecisionSummary;
};
/**
 * JSON document produced by buildQuerySnapshot() and written to stdout by
 * handleQuery().
 */
export interface QuerySnapshot {
// Output contract version; currently the literal 1.
schemaVersion: 1;
// Result of deriveState() for the project.
state: SFState;
// Preview of the next step. With no active milestone the action is "stop";
// otherwise the action comes from resolveDispatch().
next: {
action: "dispatch" | "stop" | "skip";
// unitType / unitId are populated only when action is "dispatch".
unitType?: string;
unitId?: string;
// Populated for "stop" actions.
reason?: string;
};
cost: {
// One entry per session status returned by readAllSessionStatuses().
workers: Array<{
milestoneId: string;
pid: number;
state: string;
cost: number;
lastHeartbeat: number;
}>;
// Sum of `cost` over all workers.
total: number;
};
runtime: {
// Summaries built by readRuntimeUnitSummaries().
units: RuntimeUnitSummary[];
};
}
/**
 * Return value of handleQuery(): an exit code plus, when available, the
 * snapshot that was printed.
 */
export interface QueryResult {
// handleQuery() returns 0 after the snapshot has been written.
exitCode: number;
// The same snapshot object that was serialized to stdout.
data?: QuerySnapshot;
}
// ─── Implementation ─────────────────────────────────────────────────────────
// Statuses queryRuntimeDecision() treats as terminal. "notified" and "queued"
// are handled before this set is consulted; every other status outside the
// set is reported as "active-or-claimed".
const QUERY_TERMINAL_STATUSES = new Set([
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
]);
// Subset of terminal statuses that may be retried while retry budget remains.
const QUERY_RETRYABLE_TERMINAL_STATUSES = new Set([
"failed",
"stale",
"runaway-recovered",
]);
// Retry limit used when a unit record has no finite numeric `maxRetries`.
const DEFAULT_QUERY_MAX_RETRIES = 1;
/**
 * Finds the `.sf` directory for a project by checking `basePath` and then each
 * ancestor directory in turn.
 *
 * @param basePath Directory to start searching from.
 * @returns The first existing `<dir>/.sf` found on the way up, or
 *   `<basePath>/.sf` (which may not exist) when none is found.
 */
function resolveSfRootForQuery(basePath: string): string {
  for (let dir = basePath; ; ) {
    const sfDir = join(dir, ".sf");
    if (existsSync(sfDir)) return sfDir;

    const parentDir = dirname(dir);
    // dirname() returns its input unchanged once the top of the path is reached.
    if (parentDir === dir) break;
    dir = parentDir;
  }
  return join(basePath, ".sf");
}
/**
 * Returns `value` when it is a string (including the empty string);
 * otherwise returns `fallback`, which defaults to "".
 */
function stringField(value: unknown, fallback = ""): string {
  if (typeof value !== "string") return fallback;
  return value;
}
/**
 * Returns `value` when it is a finite number; returns null for anything else,
 * including NaN, ±Infinity and numeric strings.
 */
function numberField(value: unknown): number | null {
  if (typeof value !== "number") return null;
  return Number.isFinite(value) ? value : null;
}
// Phase → status table used by inferQueryStatus(). Phases that already are
// valid statuses map to themselves; "paused" is handled separately because
// its status depends on the record.
const QUERY_STATUS_BY_PHASE: ReadonlyMap<string, string> = new Map([
  ["queued", "queued"],
  ["claimed", "claimed"],
  ["running", "running"],
  ["progress", "progress"],
  ["completed", "completed"],
  ["failed", "failed"],
  ["blocked", "blocked"],
  ["cancelled", "cancelled"],
  ["stale", "stale"],
  ["runaway-recovered", "runaway-recovered"],
  ["notified", "notified"],
  ["dispatched", "running"],
  ["wrapup-warning-sent", "progress"],
  ["runaway-warning-sent", "progress"],
  ["runaway-final-warning-sent", "progress"],
  ["recovered", "progress"],
  ["timeout", "stale"],
  ["finalized", "completed"],
  ["skipped", "blocked"],
]);

/**
 * Derives a status for a unit record that carries no explicit `status`.
 *
 * @param phase The record's phase.
 * @param record The raw record; only `runawayGuardPause` is consulted, and
 *   only for the "paused" phase.
 * @returns The mapped status; unrecognized phases are treated as "running".
 */
function inferQueryStatus(
  phase: string,
  record: Record<string, unknown>,
): string {
  if (phase === "paused") {
    // A pause caused by the runaway guard counts as a recovered runaway.
    return record.runawayGuardPause ? "runaway-recovered" : "blocked";
  }
  return QUERY_STATUS_BY_PHASE.get(phase) ?? "running";
}
/**
 * Decides what should happen to a runtime unit given its status and retry
 * figures. Rules are evaluated in order; the first match wins:
 *
 * 1. `notifiedAt` set                 → skip  / already-notified
 * 2. status "notified"                → skip  / notified
 * 3. status "queued"                  → dispatch / queued
 * 4. status not terminal              → skip  / active-or-claimed
 * 5. synthetic unit, not "completed"  → block / synthetic-reset-required
 * 6. retryable terminal status        → retry or block, depending on budget
 * 7. completed / blocked / cancelled  → notify / terminal-ready-to-notify
 * 8. anything else                    → skip  / terminal-nonretryable
 *
 * @returns The decision together with `retryCount`, `maxRetries` and the
 *   remaining retry budget (`maxRetries - retryCount`, never below zero).
 */
function queryRuntimeDecision(input: {
  unitType: string;
  unitId: string;
  status: string;
  retryCount: number;
  maxRetries: number;
  notifiedAt: number | null;
}): RuntimeDispatchDecisionSummary {
  const { unitType, unitId, status, retryCount, maxRetries, notifiedAt } =
    input;
  const retryBudgetRemaining = Math.max(0, maxRetries - retryCount);

  // Builds a decision carrying the shared retry figures.
  const decide = (
    action: RuntimeDispatchDecisionSummary["action"],
    reasonCode: RuntimeDispatchDecisionSummary["reasonCode"],
  ): RuntimeDispatchDecisionSummary => ({
    action,
    reasonCode,
    retryCount,
    maxRetries,
    retryBudgetRemaining,
  });

  if (notifiedAt !== null) return decide("skip", "already-notified");
  if (status === "notified") return decide("skip", "notified");
  if (status === "queued") return decide("dispatch", "queued");
  if (!QUERY_TERMINAL_STATUSES.has(status)) {
    return decide("skip", "active-or-claimed");
  }

  // Synthetic units are recognised by type or by a "parallel-research" id.
  const isSynthetic =
    unitType === "synthetic" || unitId.includes("parallel-research");
  if (isSynthetic && status !== "completed") {
    return decide("block", "synthetic-reset-required");
  }

  if (QUERY_RETRYABLE_TERMINAL_STATUSES.has(status)) {
    if (retryBudgetRemaining > 0) {
      return decide("retry", "retry-budget-available");
    }
    return decide("block", "retry-budget-exhausted");
  }

  switch (status) {
    case "completed":
    case "blocked":
    case "cancelled":
      return decide("notify", "terminal-ready-to-notify");
    default:
      return decide("skip", "terminal-nonretryable");
  }
}
/**
 * Reads every `*.json` file under `<sf root>/runtime/units` and turns each
 * usable record into a RuntimeUnitSummary with a computed dispatch decision.
 *
 * The read is best-effort: a missing or unlistable units directory yields an
 * empty array, and any file that cannot be read, parsed, or that lacks a
 * string `unitType`/`unitId` is skipped without failing the query.
 *
 * @param basePath Project directory used to locate the `.sf` root.
 * @returns Summaries in the order the directory listing returned the files.
 */
function readRuntimeUnitSummaries(basePath: string): RuntimeUnitSummary[] {
  const unitsDir = join(resolveSfRootForQuery(basePath), "runtime", "units");

  // List the directory inside a try/catch rather than pre-checking existence:
  // the path may be missing, not a directory, unreadable, or removed between
  // a check and the read, and none of those should abort the whole query.
  let files: string[];
  try {
    files = readdirSync(unitsDir);
  } catch {
    return [];
  }

  const results: RuntimeUnitSummary[] = [];
  for (const file of files) {
    if (!file.endsWith(".json")) continue;
    try {
      const parsed: unknown = JSON.parse(
        readFileSync(join(unitsDir, file), "utf-8"),
      );
      // JSON such as `null`, `42` or `"text"` is not a unit record.
      if (parsed === null || typeof parsed !== "object") continue;
      const record = parsed as Record<string, unknown>;

      const unitType = stringField(record.unitType);
      const unitId = stringField(record.unitId);
      if (!unitType || !unitId) continue;

      const phase = stringField(record.phase, "dispatched");
      // An explicit status wins; otherwise derive one from the phase.
      const status = stringField(
        record.status,
        inferQueryStatus(phase, record),
      );
      // `recoveryAttempts` stands in for the retry count when the record has
      // no `retryCount` of its own.
      const recoveryAttempts = numberField(record.recoveryAttempts) ?? 0;
      const retryCount = numberField(record.retryCount) ?? recoveryAttempts;
      const maxRetries =
        numberField(record.maxRetries) ?? DEFAULT_QUERY_MAX_RETRIES;
      const notifiedAt = numberField(record.notifiedAt);
      const dispatchDecision = queryRuntimeDecision({
        unitType,
        unitId,
        status,
        retryCount,
        maxRetries,
        notifiedAt,
      });
      results.push({
        unitType,
        unitId,
        phase,
        status,
        startedAt: numberField(record.startedAt),
        updatedAt: numberField(record.updatedAt),
        retryCount,
        maxRetries,
        retryBudgetRemaining: dispatchDecision.retryBudgetRemaining,
        lastHeartbeatAt: numberField(record.lastHeartbeatAt),
        lastProgressAt: numberField(record.lastProgressAt),
        lastOutputAt: numberField(record.lastOutputAt),
        outputPath:
          typeof record.outputPath === "string" ? record.outputPath : null,
        watchdogReason:
          typeof record.watchdogReason === "string"
            ? record.watchdogReason
            : null,
        notifiedAt,
        dispatchDecision,
      });
    } catch {
      // Runtime query must stay best-effort; malformed unit files are ignored.
    }
  }
  return results;
}
/**
 * Builds the full read-only project snapshot for `sf headless query`.
 *
 * Steps, in order: load the extension modules, open the project DB if one is
 * present, derive the project state, preview the next dispatch action,
 * collect parallel worker costs, and read the runtime unit summaries.
 *
 * @param basePath Project directory to inspect.
 * @returns The assembled snapshot (schema version 1).
 */
export async function buildQuerySnapshot(
  basePath: string,
): Promise<QuerySnapshot> {
  const {
    openProjectDbIfPresent,
    deriveState,
    resolveDispatch,
    readAllSessionStatuses,
    loadEffectiveSFPreferences,
  } = await loadExtensionModules();

  await openProjectDbIfPresent(basePath);
  const state = await deriveState(basePath);

  // Preview what auto-mode would do next.
  let next: QuerySnapshot["next"];
  const activeMilestone = state.activeMilestone;
  if (activeMilestone?.id) {
    const loaded = loadEffectiveSFPreferences();
    const dispatch = await resolveDispatch({
      basePath,
      mid: activeMilestone.id,
      midTitle: activeMilestone.title,
      state,
      prefs: loaded?.preferences,
    });
    const isDispatch = dispatch.action === "dispatch";
    const isStop = dispatch.action === "stop";
    next = {
      action: dispatch.action,
      unitType: isDispatch ? dispatch.unitType : undefined,
      unitId: isDispatch ? dispatch.unitId : undefined,
      reason: isStop ? dispatch.reason : undefined,
    };
  } else {
    // Without an active milestone there is nothing to dispatch.
    const allDone = state.phase === "complete";
    next = {
      action: "stop",
      reason: allDone ? "All milestones complete." : state.nextAction,
    };
  }

  // Collect per-worker cost entries and their running total.
  const workers: QuerySnapshot["cost"]["workers"] = [];
  let total = 0;
  for (const session of readAllSessionStatuses(basePath)) {
    const worker = {
      milestoneId: session.milestoneId,
      pid: session.pid,
      state: session.state,
      cost: session.cost,
      lastHeartbeat: session.lastHeartbeat,
    };
    workers.push(worker);
    total = total + worker.cost;
  }

  const units = readRuntimeUnitSummaries(basePath);
  return {
    schemaVersion: 1,
    state,
    next,
    cost: { workers, total },
    runtime: { units },
  };
}
/**
 * Entry point for `sf headless query`: builds the snapshot, prints it to
 * stdout as a single line of JSON, and reports success.
 *
 * @param basePath Project directory to inspect.
 * @returns Exit code 0 together with the snapshot that was printed.
 */
export async function handleQuery(basePath: string): Promise<QueryResult> {
  const data = await buildQuerySnapshot(basePath);
  const line = `${JSON.stringify(data)}\n`;
  process.stdout.write(line);
  return { exitCode: 0, data };
}