405 lines
11 KiB
TypeScript
405 lines
11 KiB
TypeScript
/**
 * Headless Query — `sf headless query`
 *
 * Single read-only command that returns the full project snapshot as JSON
 * to stdout, without spawning an LLM session. Instant (~50ms).
 *
 * Output: { schemaVersion, state, next, cost, runtime }
 *   schemaVersion — output contract version
 *   state         — deriveState() output (phase, milestones, progress, blockers)
 *   next          — dry-run dispatch preview (what auto-mode would do next)
 *   cost          — aggregated parallel worker costs
 *   runtime       — per-unit runtime records, each with a dispatch decision
 *
 * Note: Extension modules are loaded via jiti: the compiled .js copy from the
 * synced agent directory when it exists, otherwise the bundled .ts sources.
 * We use createJiti() here because this module is imported directly from cli.ts,
 * bypassing the extension loader's jiti setup (#1137).
 */
|
|
|
|
import { existsSync, readdirSync, readFileSync } from "node:fs";
|
|
import { homedir } from "node:os";
|
|
import { dirname, join } from "node:path";
|
|
import { createJiti } from "@mariozechner/jiti";
|
|
import { resolveBundledSourceResource } from "./bundled-resource-path.js";
|
|
import type { SFState } from "./resources/extensions/sf/types.js";
|
|
|
|
// jiti instance used below to import the SF extension modules at runtime.
// Default-export interop is enabled and jiti's debug output is turned off.
const jiti = createJiti(import.meta.filename, {
  debug: false,
  interopDefault: true,
});
|
|
// Extension modules are resolved from the synced agent directory so that
// headless-query loads the same extension copy as interactive/auto modes (#3471).
// The synced runtime is compiled .js; the source-tree fallback is .ts.
const agentExtensionsDir = join(
  // SF_AGENT_DIR overrides the default location; an empty value falls back too.
  process.env.SF_AGENT_DIR || join(homedir(), ".sf", "agent"),
  "extensions",
  "sf",
);

// The synced copy is used only when its compiled state module is present.
const useAgentDir = existsSync(join(agentExtensionsDir, "state.js"));

/**
 * Maps an extension module name (without extension) to the file to import:
 * `<agentExtensionsDir>/<name>.js` when the synced copy is in use, otherwise
 * the bundled `extensions/sf/<name>.ts` source resource.
 */
const sfExtensionPath = (moduleName: string) => {
  if (useAgentDir) {
    return join(agentExtensionsDir, `${moduleName}.js`);
  }
  return resolveBundledSourceResource(
    import.meta.url,
    "extensions",
    "sf",
    `${moduleName}.ts`,
  );
};
|
|
|
|
/**
 * Imports the SF extension modules through jiti and returns the functions the
 * query command uses, each cast to the signature this file relies on.
 *
 * Modules are imported one after another, in this order: state,
 * auto-dispatch, session-status-io, preferences, auto-start.
 */
async function loadExtensionModules() {
  // Extension modules are resolved at runtime, so their exports are untyped here.
  const importExtension = async (moduleName: string) =>
    (await jiti.import(sfExtensionPath(moduleName), {})) as any;

  const stateExports = await importExtension("state");
  const dispatchExports = await importExtension("auto-dispatch");
  const sessionExports = await importExtension("session-status-io");
  const prefsExports = await importExtension("preferences");
  const autoStartExports = await importExtension("auto-start");

  return {
    openProjectDbIfPresent: autoStartExports.openProjectDbIfPresent as (
      basePath: string,
    ) => Promise<void>,
    deriveState: stateExports.deriveState as (
      basePath: string,
    ) => Promise<SFState>,
    resolveDispatch: dispatchExports.resolveDispatch as (
      opts: any,
    ) => Promise<any>,
    readAllSessionStatuses: sessionExports.readAllSessionStatuses as (
      basePath: string,
    ) => any[],
    loadEffectiveSFPreferences:
      prefsExports.loadEffectiveSFPreferences as () => any,
  };
}
|
|
|
|
// ─── Types ──────────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Dispatch decision reported for one runtime unit: what would be done with
 * the unit, a machine-readable reason, and the retry accounting behind it.
 */
type RuntimeDispatchDecisionSummary = {
  /** What would happen to the unit. */
  action: "dispatch" | "retry" | "notify" | "block" | "skip";
  /** Machine-readable explanation for `action`. */
  reasonCode:
    | "no-runtime-record"
    | "queued"
    | "retry-budget-available"
    | "terminal-ready-to-notify"
    | "retry-budget-exhausted"
    | "synthetic-reset-required"
    | "already-notified"
    | "active-or-claimed"
    | "notified"
    | "terminal-nonretryable";
  /** Retries already consumed by the unit. */
  retryCount: number;
  /** Retry ceiling applied to the unit. */
  maxRetries: number;
  /** Retries still available. */
  retryBudgetRemaining: number;
};
|
|
|
|
/**
 * Summary of one runtime unit record as exposed in the query snapshot.
 * Nullable fields are `null` when the record does not carry a usable value.
 */
type RuntimeUnitSummary = {
  // Identity
  unitType: string;
  unitId: string;

  // Lifecycle
  /** Phase string taken from the record. */
  phase: string;
  /** Status from the record, or one inferred from `phase` when absent. */
  status: string;
  startedAt: number | null;
  updatedAt: number | null;

  // Retry accounting
  retryCount: number;
  maxRetries: number;
  retryBudgetRemaining: number;

  // Liveness and output
  lastHeartbeatAt: number | null;
  lastProgressAt: number | null;
  lastOutputAt: number | null;
  outputPath: string | null;
  watchdogReason: string | null;
  notifiedAt: number | null;

  /** Dispatch decision derived for this unit. */
  dispatchDecision: RuntimeDispatchDecisionSummary;
};
|
|
|
|
/** JSON document produced by `sf headless query`. */
export interface QuerySnapshot {
  /** Output contract version. */
  schemaVersion: 1;
  /** Project state as returned by deriveState(). */
  state: SFState;
  /** Dry-run preview of the next dispatch step. */
  next: {
    action: "dispatch" | "stop" | "skip";
    unitType?: string;
    unitId?: string;
    reason?: string;
  };
  /** Per-worker costs and their sum. */
  cost: {
    workers: {
      milestoneId: string;
      pid: number;
      state: string;
      cost: number;
      lastHeartbeat: number;
    }[];
    total: number;
  };
  /** Runtime unit records found on disk. */
  runtime: {
    units: RuntimeUnitSummary[];
  };
}
|
|
|
|
/** Result handed back to the CLI after running the query command. */
export interface QueryResult {
  /** Process exit code for the command. */
  exitCode: number;
  /** The snapshot that was produced, when there is one. */
  data?: QuerySnapshot;
}
|
|
|
|
// ─── Implementation ─────────────────────────────────────────────────────────
|
|
|
|
/** Statuses treated as terminal when deriving a dispatch decision. */
const QUERY_TERMINAL_STATUSES = new Set<string>([
  "completed",
  "failed",
  "blocked",
  "cancelled",
  "stale",
  "runaway-recovered",
]);

/** Terminal statuses that may be retried while retry budget remains. */
const QUERY_RETRYABLE_TERMINAL_STATUSES = new Set<string>([
  "failed",
  "stale",
  "runaway-recovered",
]);

/** Retry ceiling used when a unit record does not specify `maxRetries`. */
const DEFAULT_QUERY_MAX_RETRIES = 1;
|
|
|
|
/**
 * Locates the `.sf` directory for a project by checking `basePath` and then
 * each ancestor directory in turn.
 *
 * NOTE(review): any existing `.sf` entry in an ancestor matches, which would
 * include one under the user's home directory — confirm that is intended.
 *
 * @param basePath Directory to start searching from.
 * @returns The first existing `<dir>/.sf` found while walking upward, or
 *   `<basePath>/.sf` when none exists up to the filesystem root.
 */
function resolveSfRootForQuery(basePath: string): string {
  let dir = basePath;
  for (;;) {
    const sfDir = join(dir, ".sf");
    if (existsSync(sfDir)) return sfDir;

    const up = dirname(dir);
    // dirname() returns its input once the top of the path is reached.
    if (up === dir) break;
    dir = up;
  }
  return join(basePath, ".sf");
}
|
|
|
|
/**
 * Returns `value` when it is a string (including the empty string),
 * otherwise `fallback`.
 */
function stringField(value: unknown, fallback = ""): string {
  if (typeof value === "string") {
    return value;
  }
  return fallback;
}
|
|
|
|
/**
 * Returns `value` when it is a finite number; `null` for anything else,
 * including NaN and ±Infinity.
 */
function numberField(value: unknown): number | null {
  if (typeof value !== "number") {
    return null;
  }
  return Number.isFinite(value) ? value : null;
}
|
|
|
|
// Phase → status table used by inferQueryStatus. "paused" is absent because
// its status depends on the record; phases not listed resolve to "running".
const QUERY_STATUS_BY_PHASE = new Map<string, string>([
  // Phases that are already status names map to themselves.
  ["queued", "queued"],
  ["claimed", "claimed"],
  ["running", "running"],
  ["progress", "progress"],
  ["completed", "completed"],
  ["failed", "failed"],
  ["blocked", "blocked"],
  ["cancelled", "cancelled"],
  ["stale", "stale"],
  ["runaway-recovered", "runaway-recovered"],
  ["notified", "notified"],
  // Remaining phases collapse onto one of the statuses above.
  ["dispatched", "running"],
  ["wrapup-warning-sent", "progress"],
  ["runaway-warning-sent", "progress"],
  ["runaway-final-warning-sent", "progress"],
  ["recovered", "progress"],
  ["timeout", "stale"],
  ["finalized", "completed"],
  ["skipped", "blocked"],
]);

/**
 * Derives a status string from a unit's phase.
 *
 * @param phase Phase string from the unit record.
 * @param record The unit record; only `runawayGuardPause` is consulted, and
 *   only for the "paused" phase.
 * @returns The mapped status; "running" for any phase that is not recognised.
 */
function inferQueryStatus(
  phase: string,
  record: Record<string, unknown>,
): string {
  if (phase === "paused") {
    // A pause caused by the runaway guard is reported as recovered, any other as blocked.
    return record.runawayGuardPause ? "runaway-recovered" : "blocked";
  }
  return QUERY_STATUS_BY_PHASE.get(phase) ?? "running";
}
|
|
|
|
/**
 * Derives the dispatch decision for one runtime unit from its status and
 * retry accounting. Checks run in order and the first match wins:
 *
 *  1. `notifiedAt` set                          → skip   / already-notified
 *  2. status "notified"                         → skip   / notified
 *  3. status "queued"                           → dispatch / queued
 *  4. status not terminal                       → skip   / active-or-claimed
 *  5. synthetic unit that is not "completed"    → block  / synthetic-reset-required
 *  6. retryable terminal status                 → retry  / retry-budget-available,
 *                                                 or block / retry-budget-exhausted
 *  7. "completed", "blocked" or "cancelled"     → notify / terminal-ready-to-notify
 *  8. anything else                             → skip   / terminal-nonretryable
 *
 * A unit counts as synthetic when its type is "synthetic" or its id contains
 * "parallel-research".
 *
 * @returns The decision together with `retryCount`, `maxRetries` and the
 *   remaining retry budget (never negative).
 */
function queryRuntimeDecision(input: {
  unitType: string;
  unitId: string;
  status: string;
  retryCount: number;
  maxRetries: number;
  notifiedAt: number | null;
}): RuntimeDispatchDecisionSummary {
  const { unitType, unitId, status, retryCount, maxRetries, notifiedAt } =
    input;
  const retryBudgetRemaining = Math.max(0, maxRetries - retryCount);

  // Every outcome carries the same retry accounting.
  const decide = (
    action: RuntimeDispatchDecisionSummary["action"],
    reasonCode: RuntimeDispatchDecisionSummary["reasonCode"],
  ): RuntimeDispatchDecisionSummary => ({
    action,
    reasonCode,
    retryCount,
    maxRetries,
    retryBudgetRemaining,
  });

  if (notifiedAt !== null) {
    return decide("skip", "already-notified");
  }

  switch (status) {
    case "notified":
      return decide("skip", "notified");
    case "queued":
      return decide("dispatch", "queued");
    default:
      break;
  }

  if (!QUERY_TERMINAL_STATUSES.has(status)) {
    return decide("skip", "active-or-claimed");
  }

  const isSynthetic =
    unitType === "synthetic" || unitId.includes("parallel-research");
  if (isSynthetic && status !== "completed") {
    return decide("block", "synthetic-reset-required");
  }

  if (QUERY_RETRYABLE_TERMINAL_STATUSES.has(status)) {
    if (retryBudgetRemaining > 0) {
      return decide("retry", "retry-budget-available");
    }
    return decide("block", "retry-budget-exhausted");
  }

  const readyToNotify =
    status === "completed" || status === "blocked" || status === "cancelled";
  return readyToNotify
    ? decide("notify", "terminal-ready-to-notify")
    : decide("skip", "terminal-nonretryable");
}
|
|
|
|
/**
 * Builds the summary for one parsed runtime unit record.
 *
 * @param parsed Result of JSON.parse on a unit file.
 * @returns The summary, or `null` when the value is not an object or lacks a
 *   non-empty string `unitType` / `unitId`.
 */
function summarizeRuntimeUnitRecord(
  parsed: unknown,
): RuntimeUnitSummary | null {
  if (typeof parsed !== "object" || parsed === null) return null;
  const record = parsed as Record<string, unknown>;

  const unitType = stringField(record.unitType);
  const unitId = stringField(record.unitId);
  if (!unitType || !unitId) return null;

  const phase = stringField(record.phase, "dispatched");
  // An explicit string status wins; otherwise it is inferred from the phase.
  const status = stringField(record.status, inferQueryStatus(phase, record));

  // retryCount falls back to recoveryAttempts, then to 0.
  const recoveryAttempts = numberField(record.recoveryAttempts) ?? 0;
  const retryCount = numberField(record.retryCount) ?? recoveryAttempts;
  const maxRetries =
    numberField(record.maxRetries) ?? DEFAULT_QUERY_MAX_RETRIES;
  const notifiedAt = numberField(record.notifiedAt);

  const dispatchDecision = queryRuntimeDecision({
    unitType,
    unitId,
    status,
    retryCount,
    maxRetries,
    notifiedAt,
  });

  return {
    unitType,
    unitId,
    phase,
    status,
    startedAt: numberField(record.startedAt),
    updatedAt: numberField(record.updatedAt),
    retryCount,
    maxRetries,
    retryBudgetRemaining: dispatchDecision.retryBudgetRemaining,
    lastHeartbeatAt: numberField(record.lastHeartbeatAt),
    lastProgressAt: numberField(record.lastProgressAt),
    lastOutputAt: numberField(record.lastOutputAt),
    outputPath:
      typeof record.outputPath === "string" ? record.outputPath : null,
    watchdogReason:
      typeof record.watchdogReason === "string"
        ? record.watchdogReason
        : null,
    notifiedAt,
    dispatchDecision,
  };
}

/**
 * Reads every `*.json` file under `<sfRoot>/runtime/units` and summarises it.
 *
 * The read is best-effort: a missing or unlistable units directory yields an
 * empty list, and files that cannot be read or parsed, or that lack a
 * `unitType` / `unitId`, are skipped. Nothing is thrown to the caller for
 * filesystem or JSON errors.
 *
 * @param basePath Project directory used to locate the `.sf` root.
 * @returns One summary per usable unit file, in directory-listing order.
 */
function readRuntimeUnitSummaries(basePath: string): RuntimeUnitSummary[] {
  const unitsDir = join(resolveSfRootForQuery(basePath), "runtime", "units");

  let files: string[];
  try {
    files = readdirSync(unitsDir);
  } catch {
    // Missing directory, a non-directory at that path, no permission, or a
    // removal racing with the query: none of these may abort the snapshot.
    return [];
  }

  const results: RuntimeUnitSummary[] = [];
  for (const file of files) {
    if (!file.endsWith(".json")) continue;
    try {
      const parsed: unknown = JSON.parse(
        readFileSync(join(unitsDir, file), "utf-8"),
      );
      const summary = summarizeRuntimeUnitRecord(parsed);
      if (summary) results.push(summary);
    } catch {
      // Runtime query must stay best-effort; malformed unit files are ignored.
    }
  }
  return results;
}
|
|
|
|
/**
 * Assembles the query snapshot for a project.
 *
 * Steps, in order: load the extension modules, open the project DB if
 * present, derive state, preview the next dispatch, collect worker costs,
 * and read the runtime unit records.
 *
 * @param basePath Project directory to inspect.
 * @returns The snapshot with `schemaVersion` 1.
 */
export async function buildQuerySnapshot(
  basePath: string,
): Promise<QuerySnapshot> {
  const {
    openProjectDbIfPresent,
    deriveState,
    resolveDispatch,
    readAllSessionStatuses,
    loadEffectiveSFPreferences,
  } = await loadExtensionModules();

  // The DB is opened before state is derived.
  await openProjectDbIfPresent(basePath);
  const state = await deriveState(basePath);

  // Preview of the next dispatch action.
  const milestone = state.activeMilestone;
  let next: QuerySnapshot["next"];
  if (milestone?.id) {
    const loaded = loadEffectiveSFPreferences();
    const dispatch = await resolveDispatch({
      basePath,
      mid: milestone.id,
      midTitle: milestone.title,
      state,
      prefs: loaded?.preferences,
    });
    const isDispatch = dispatch.action === "dispatch";
    const isStop = dispatch.action === "stop";
    next = {
      action: dispatch.action,
      unitType: isDispatch ? dispatch.unitType : undefined,
      unitId: isDispatch ? dispatch.unitId : undefined,
      reason: isStop ? dispatch.reason : undefined,
    };
  } else {
    // Without an active milestone there is nothing to dispatch.
    const reason =
      state.phase === "complete"
        ? "All milestones complete."
        : state.nextAction;
    next = { action: "stop", reason };
  }

  // Per-worker costs and their running total.
  const workers: QuerySnapshot["cost"]["workers"] = [];
  let total = 0;
  for (const s of readAllSessionStatuses(basePath)) {
    workers.push({
      milestoneId: s.milestoneId,
      pid: s.pid,
      state: s.state,
      cost: s.cost,
      lastHeartbeat: s.lastHeartbeat,
    });
    total = total + s.cost;
  }

  const units = readRuntimeUnitSummaries(basePath);

  return {
    schemaVersion: 1,
    state,
    next,
    cost: { workers, total },
    runtime: { units },
  };
}
|
|
|
|
/**
 * Runs the query command: builds the snapshot, writes it to stdout as a
 * single line of JSON followed by a newline, and reports exit code 0.
 *
 * @param basePath Project directory to inspect.
 * @returns Exit code 0 together with the snapshot that was written.
 */
export async function handleQuery(basePath: string): Promise<QueryResult> {
  const data = await buildQuerySnapshot(basePath);
  process.stdout.write(`${JSON.stringify(data)}\n`);
  return { exitCode: 0, data };
}
|