singularity-forge/packages/daemon/src/session-manager.ts

682 lines
19 KiB
TypeScript

/**
* SessionManager — manages RpcClient lifecycle for daemon-driven SF execution.
*
* Extends EventEmitter to emit typed session lifecycle events.
* One active session per projectDir. Tracks events in a ring buffer,
* detects blockers, tracks terminal state, and accumulates cost using
* the cumulative-max pattern (K004).
*
* Purpose: provide daemon-owned session tracking without exposing SF workflows
* through an MCP server.
*/
import { execSync } from "node:child_process";
import { EventEmitter } from "node:events";
import { basename, resolve } from "node:path";
import type {
RpcCostUpdateEvent,
RpcExtensionUIRequest,
RpcInitResult,
SdkAgentEvent,
} from "@singularity-forge/rpc-client";
import { RpcClient } from "@singularity-forge/rpc-client";
import type { Logger } from "./logger.js";
import type {
ManagedSession,
PendingBlocker,
RuntimeHeartbeat,
StartSessionOptions,
} from "./types.js";
import { INIT_TIMEOUT_MS, MAX_EVENTS } from "./types.js";
// ---------------------------------------------------------------------------
// Inlined detection logic (from headless-events.ts — no internal package imports)
// ---------------------------------------------------------------------------
/**
 * Extension UI request methods treated as fire-and-forget: a request using
 * one of these never marks a session as blocked.
 */
const FIRE_AND_FORGET_METHODS = new Set([
  "notify",
  "setStatus",
  "setWidget",
  "setTitle",
  "set_editor_text",
]);
/**
 * Lower-cased `notify` message prefixes that mean autonomous/assisted mode
 * has stopped or paused (i.e. the run reached a terminal notification).
 */
const TERMINAL_PREFIXES = [
  "autonomous mode stopped",
  "autonomous mode paused",
  "assisted mode stopped",
  "assisted mode paused",
];
/** How long a reload waits for the child to settle after "/sf pause" before stopping it anyway. */
const RELOAD_PAUSE_TIMEOUT_MS = 5000;
/**
 * Return the lower-cased message text of a `notify` extension UI request,
 * or `null` when the event is not such a request.
 */
function notifyMessage(event: Record<string, unknown>): string | null {
  const isNotify =
    event.type === "extension_ui_request" && event.method === "notify";
  return isNotify ? String(event.message ?? "").toLowerCase() : null;
}
/** True when the event is a `notify` whose message starts with a terminal prefix. */
function isTerminalNotification(event: Record<string, unknown>): boolean {
  const text = notifyMessage(event);
  if (text === null) return false;
  for (const prefix of TERMINAL_PREFIXES) {
    if (text.startsWith(prefix)) return true;
  }
  return false;
}
/**
 * True when a `notify` message reports a blocker: it contains "blocked:" or
 * announces that autonomous/assisted mode paused.
 */
function isBlockedNotification(event: Record<string, unknown>): boolean {
  const text = notifyMessage(event);
  if (text === null) return false;
  if (text.includes("blocked:")) return true;
  return (
    text.startsWith("autonomous mode paused") ||
    text.startsWith("assisted mode paused")
  );
}
/** True for any extension UI request whose method is not fire-and-forget. */
function isBlockingUIRequest(event: Record<string, unknown>): boolean {
  return (
    event.type === "extension_ui_request" &&
    !FIRE_AND_FORGET_METHODS.has(String(event.method ?? ""))
  );
}
// ---------------------------------------------------------------------------
// SessionManager
// ---------------------------------------------------------------------------
export class SessionManager extends EventEmitter {
  /** Sessions keyed by resolved projectDir for duplicate-start prevention */
  private readonly sessions = new Map<string, ManagedSession>();
  /** Statuses in which a session still owns a (possibly starting) child process. */
  private static readonly ACTIVE_STATUSES: ReadonlySet<string> = new Set([
    "starting",
    "running",
    "blocked",
  ]);
  constructor(private readonly logger: Logger) {
    super();
  }
  /**
   * Start a new SF autonomous mode session for the given project directory.
   *
   * Rejects if a live session (starting, running, blocked, or mid-reload)
   * already exists for this projectDir. A previous session that ended as
   * completed, cancelled, or error is replaced, so one failed start or a
   * cancel does not lock the directory for the lifetime of the daemon.
   *
   * Creates an RpcClient, starts the process, performs the v2 init handshake,
   * wires event tracking, and sends `options.command` (default
   * '/sf autonomous') to begin execution.
   *
   * @returns the sessionId reported by the init handshake.
   * @throws Error when projectDir is empty, a live session already exists, or
   *   the child fails to start/init/accept the command. A failed session stays
   *   in the map with status "error" so callers can inspect it.
   */
  async startSession(options: StartSessionOptions): Promise<string> {
    const { projectDir } = options;
    if (!projectDir || projectDir.trim() === "") {
      throw new Error("projectDir is required and cannot be empty");
    }
    const resolvedDir = resolve(projectDir);
    const projectName = basename(resolvedDir);
    const existing = this.sessions.get(resolvedDir);
    if (existing && SessionManager.isLive(existing)) {
      throw new Error(
        `Session already active for ${resolvedDir} (sessionId: ${existing.sessionId}, status: ${existing.status})`,
      );
    }
    const cliPath = options.cliPath ?? SessionManager.resolveCLIPath();
    const client = this.createRpcClient(
      cliPath,
      resolvedDir,
      SessionManager.buildArgs(options),
    );
    // Build the session shell before async operations so we can track state
    const session: ManagedSession = {
      sessionId: "", // filled after init
      projectDir: resolvedDir,
      projectName,
      status: "starting",
      reloadState: "running",
      client,
      events: [],
      pendingBlocker: null,
      cost: {
        totalCost: 0,
        tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
      },
      startTime: Date.now(),
      startOptions: { ...options, projectDir: resolvedDir },
    };
    // Insert into map early (keyed by dir) so concurrent starts are rejected;
    // this also replaces any finished session previously stored for the dir.
    this.sessions.set(resolvedDir, session);
    try {
      const initResult = await SessionManager.startAndInit(client);
      session.sessionId = initResult.sessionId;
      session.status = "running";
      // Wire event tracking
      session.unsubscribe = client.onEvent((event: SdkAgentEvent) => {
        this.handleEvent(session, event);
      });
      // Kick off autonomous mode
      const command = options.command ?? "/sf autonomous";
      await client.prompt(command);
      this.logger.info("session started", {
        sessionId: session.sessionId,
        projectDir: resolvedDir,
      });
      this.emit("session:started", {
        sessionId: session.sessionId,
        projectDir: resolvedDir,
        projectName,
      });
      return session.sessionId;
    } catch (err) {
      session.status = "error";
      session.error = err instanceof Error ? err.message : String(err);
      // Detach before tearing the child down so late events cannot flip this
      // failed session to "blocked"/"completed" or trigger a reload.
      session.unsubscribe?.();
      // Attempt cleanup
      try {
        await client.stop();
      } catch {
        /* swallow cleanup errors */
      }
      this.logger.error("session error", {
        sessionId: session.sessionId,
        projectDir: resolvedDir,
        error: session.error,
      });
      this.emit("session:error", {
        sessionId: session.sessionId,
        projectDir: resolvedDir,
        projectName,
        error: session.error,
      });
      // Keep session in map so callers can inspect the error
      throw new Error(
        `Failed to start session for ${resolvedDir}: ${session.error}`,
      );
    }
  }
  /**
   * Look up a session by sessionId.
   * Linear scan is fine — we expect <10 concurrent sessions.
   */
  getSession(sessionId: string): ManagedSession | undefined {
    for (const session of this.sessions.values()) {
      if (session.sessionId === sessionId) return session;
    }
    return undefined;
  }
  /**
   * Look up a session by project directory (direct map lookup).
   */
  getSessionByDir(projectDir: string): ManagedSession | undefined {
    return this.sessions.get(resolve(projectDir));
  }
  /**
   * Return all tracked sessions (R035 — cross-project status).
   */
  getAllSessions(): ManagedSession[] {
    return Array.from(this.sessions.values());
  }
  /**
   * Resolve a pending blocker by sending a UI response.
   *
   * @throws Error when the session is unknown or has no pending blocker.
   */
  async resolveBlocker(sessionId: string, response: string): Promise<void> {
    const session = this.getSession(sessionId);
    if (!session) throw new Error(`Session not found: ${sessionId}`);
    if (!session.pendingBlocker)
      throw new Error(`No pending blocker for session ${sessionId}`);
    const blocker = session.pendingBlocker;
    session.client.sendUIResponse(blocker.id, { value: response });
    session.pendingBlocker = null;
    if (session.status === "blocked") {
      session.status = "running";
    }
    this.logger.info("blocker resolved", {
      sessionId,
      projectDir: session.projectDir,
      blockerId: blocker.id,
      blockerMethod: blocker.method,
    });
  }
  /**
   * Cancel a running session — abort current operation then stop the process.
   *
   * Event tracking is detached first so that events emitted while the child
   * shuts down (e.g. a runtime heartbeat with an epoch mismatch) cannot start
   * a reload or change the status of a session that is being cancelled.
   *
   * @throws Error when the session is unknown.
   */
  async cancelSession(sessionId: string): Promise<void> {
    const session = this.getSession(sessionId);
    if (!session) throw new Error(`Session not found: ${sessionId}`);
    session.unsubscribe?.();
    try {
      await session.client.abort();
    } catch {
      /* may already be stopped */
    }
    try {
      await session.client.stop();
    } catch {
      /* swallow */
    }
    session.status = "cancelled";
    // Nothing can answer a blocker once the child is gone.
    session.pendingBlocker = null;
    this.logger.info("session cancelled", {
      sessionId,
      projectDir: session.projectDir,
    });
  }
  /**
   * Restart a managed RPC child and resume the same persisted session when possible.
   *
   * Purpose: make daemon-managed auto sessions pick up changed runtime/source
   * files at process boundaries instead of trying unsafe in-process hot reload.
   *
   * @throws Error when the session is unknown or the replacement child fails
   *   (the session is then left with status "error" / reloadState "reload_failed").
   */
  async reloadSession(
    sessionId: string,
    reason = "runtime epoch changed",
  ): Promise<void> {
    const session = this.getSession(sessionId);
    if (!session) throw new Error(`Session not found: ${sessionId}`);
    await this.restartSession(session, reason);
  }
  /**
   * Build a HeadlessJsonResult-shaped object from accumulated session state.
   */
  getResult(sessionId: string): Record<string, unknown> {
    const session = this.getSession(sessionId);
    if (!session) throw new Error(`Session not found: ${sessionId}`);
    const durationMs = Date.now() - session.startTime;
    return {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      projectName: session.projectName,
      status: session.status,
      reloadState: session.reloadState ?? "running",
      durationMs,
      cost: session.cost,
      recentEvents: session.events.slice(-10),
      lastHeartbeat: session.lastHeartbeat ?? null,
      pendingBlocker: session.pendingBlocker
        ? {
            id: session.pendingBlocker.id,
            method: session.pendingBlocker.method,
            message: session.pendingBlocker.message,
          }
        : null,
      error: session.error ?? null,
    };
  }
  /**
   * Stop all active sessions and clean up resources.
   */
  async cleanup(): Promise<void> {
    const stopPromises: Promise<void>[] = [];
    for (const session of this.sessions.values()) {
      session.unsubscribe?.();
      if (SessionManager.ACTIVE_STATUSES.has(session.status)) {
        stopPromises.push(
          session.client.stop().catch(() => {
            /* swallow */
          }),
        );
        session.status = "cancelled";
      }
    }
    await Promise.allSettled(stopPromises);
  }
  /**
   * Resolve the SF CLI path.
   *
   * 1. SF_CLI_PATH env var (highest priority)
   * 2. `which sf` — the first `sf` found on PATH, made absolute with
   *    `path.resolve` (symlinks are not followed)
   *
   * @throws Error when neither source yields a path.
   */
  static resolveCLIPath(): string {
    const envPath = process.env["SF_CLI_PATH"];
    if (envPath) return resolve(envPath);
    try {
      // Discard `which`'s own stderr: a miss is reported by the Error below
      // instead of leaking "no sf in (...)" noise into the daemon's output.
      const sfBin = execSync("which sf", {
        encoding: "utf-8",
        stdio: ["ignore", "pipe", "ignore"],
      }).trim();
      if (sfBin) return resolve(sfBin);
    } catch {
      // which failed
    }
    throw new Error(
      "Cannot find SF CLI. Set SF_CLI_PATH environment variable or ensure `sf` is in PATH.",
    );
  }
  // ---------------------------------------------------------------------------
  // Private: Helpers
  // ---------------------------------------------------------------------------
  /** True while the session still owns its directory (running or mid-reload). */
  private static isLive(session: ManagedSession): boolean {
    return (
      SessionManager.ACTIVE_STATUSES.has(session.status) ||
      session.reloadState === "reloading"
    );
  }
  /** Build the RPC child argv from the session start options. */
  private static buildArgs(options: StartSessionOptions): string[] {
    const args: string[] = ["--mode", "rpc"];
    if (options.model) args.push("--model", options.model);
    if (options.bare) args.push("--bare");
    return args;
  }
  /** Start the child and run the v2 init handshake, each step bounded by INIT_TIMEOUT_MS. */
  private static async startAndInit(client: RpcClient): Promise<RpcInitResult> {
    await Promise.race([
      client.start(),
      timeout(
        INIT_TIMEOUT_MS,
        `RpcClient.start() timed out after ${INIT_TIMEOUT_MS}ms`,
      ),
    ]);
    return (await Promise.race([
      client.init(),
      timeout(
        INIT_TIMEOUT_MS,
        `RpcClient.init() timed out after ${INIT_TIMEOUT_MS}ms`,
      ),
    ])) as RpcInitResult;
  }
  // ---------------------------------------------------------------------------
  // Private: Event Handling
  // ---------------------------------------------------------------------------
  private handleEvent(session: ManagedSession, event: SdkAgentEvent): void {
    const record = event as Record<string, unknown>;
    // Ring buffer: push and trim
    session.events.push(event);
    if (session.events.length > MAX_EVENTS) {
      session.events.splice(0, session.events.length - MAX_EVENTS);
    }
    // Forward event to listeners
    this.logger.debug("session event", {
      sessionId: session.sessionId,
      type: record.type as string,
    });
    this.emit("session:event", {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      event,
    });
    if (record.type === "runtime_heartbeat") {
      this.handleRuntimeHeartbeat(
        session,
        event as unknown as RuntimeHeartbeat,
      );
    }
    if (record.type === "cost_update") {
      this.applyCostUpdate(session, event as unknown as RpcCostUpdateEvent);
    }
    // Terminal detection — autonomous mode/assisted mode stopped
    if (isTerminalNotification(record)) {
      if (isBlockedNotification(record)) {
        this.markBlocked(session, event);
      } else {
        session.status = "completed";
        session.unsubscribe?.();
        this.logger.info("session completed", {
          sessionId: session.sessionId,
          projectDir: session.projectDir,
        });
        this.emit("session:completed", {
          sessionId: session.sessionId,
          projectDir: session.projectDir,
          projectName: session.projectName,
        });
      }
      return;
    }
    // Blocker detection — non-fire-and-forget extension_ui_request
    if (isBlockingUIRequest(record)) {
      this.markBlocked(session, event);
    }
  }
  /** Fold a cost_update into the session totals (K004 — cumulative-max). */
  private applyCostUpdate(
    session: ManagedSession,
    update: RpcCostUpdateEvent,
  ): void {
    session.cost.totalCost = Math.max(
      session.cost.totalCost,
      update.cumulativeCost ?? 0,
    );
    const tokens = update.tokens;
    if (!tokens) return;
    const totals = session.cost.tokens;
    totals.input = Math.max(totals.input, tokens.input ?? 0);
    totals.output = Math.max(totals.output, tokens.output ?? 0);
    totals.cacheRead = Math.max(totals.cacheRead, tokens.cacheRead ?? 0);
    totals.cacheWrite = Math.max(totals.cacheWrite, tokens.cacheWrite ?? 0);
  }
  /** Record the event as the session's pending blocker, then log and emit "session:blocked". */
  private markBlocked(session: ManagedSession, event: SdkAgentEvent): void {
    const blocker = extractBlocker(event);
    session.status = "blocked";
    session.pendingBlocker = blocker;
    this.logger.info("session blocked", {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      blockerId: blocker.id,
      blockerMethod: blocker.method,
    });
    this.emit("session:blocked", {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      projectName: session.projectName,
      blocker,
    });
  }
  /**
   * Store the heartbeat and, when its runtime and source epochs differ, start
   * a background reload of a running/blocked session that is not already reloading.
   */
  private handleRuntimeHeartbeat(
    session: ManagedSession,
    heartbeat: RuntimeHeartbeat,
  ): void {
    session.lastHeartbeat = heartbeat;
    if (heartbeat.runtimeEpoch === heartbeat.sourceEpoch) return;
    if (session.reloadState === "reloading") return;
    if (session.status !== "running" && session.status !== "blocked") return;
    this.logger.info("runtime epoch mismatch detected", {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      unitType: heartbeat.unitType,
      unitId: heartbeat.unitId,
      runtimeEpoch: heartbeat.runtimeEpoch,
      sourceEpoch: heartbeat.sourceEpoch,
    });
    void this.restartSession(session, "runtime epoch changed").catch((err) => {
      session.reloadState = "reload_failed";
      session.status = "error";
      session.error = err instanceof Error ? err.message : String(err);
      this.logger.error("session reload failed", {
        sessionId: session.sessionId,
        projectDir: session.projectDir,
        error: session.error,
      });
      this.emit("session:error", {
        sessionId: session.sessionId,
        projectDir: session.projectDir,
        projectName: session.projectName,
        error: session.error,
      });
    });
  }
  /**
   * Pause and stop the current child, spawn a replacement with the same start
   * options, switch it to the previous session file when known, and re-send
   * the start command.
   *
   * On failure after the old child is stopped, the replacement child (if any)
   * is stopped, the session is marked status "error" / reloadState
   * "reload_failed", and the error is rethrown. This keeps a failed reload
   * from leaking a child process or leaving reloadState stuck at "reloading",
   * which would suppress every later reload.
   */
  private async restartSession(
    session: ManagedSession,
    reason: string,
  ): Promise<void> {
    if (session.reloadState === "reloading") return;
    session.reloadState = "reloading";
    let sessionFile = session.lastHeartbeat?.sessionFile;
    try {
      const state = await session.client.getState();
      sessionFile = state.sessionFile ?? sessionFile;
    } catch {
      // Best effort: a wedged child may not answer state requests.
    }
    try {
      await session.client.prompt("/sf pause");
      await waitFor(
        () =>
          session.status === "blocked" ||
          session.status === "completed" ||
          session.status === "cancelled",
        RELOAD_PAUSE_TIMEOUT_MS,
      );
    } catch {
      // Timeout or prompt failure: stop() escalates SIGTERM to SIGKILL.
    }
    session.unsubscribe?.();
    try {
      await session.client.stop();
    } catch {
      // stop() is best-effort; subsequent start creates a new child.
    }
    const opts: StartSessionOptions = session.startOptions ?? {
      projectDir: session.projectDir,
    };
    let fresh: RpcClient | undefined;
    let subscribed = false;
    try {
      const cliPath = opts.cliPath ?? SessionManager.resolveCLIPath();
      fresh = this.createRpcClient(
        cliPath,
        session.projectDir,
        SessionManager.buildArgs(opts),
      );
      const initResult = await SessionManager.startAndInit(fresh);
      session.client = fresh;
      session.sessionId = initResult.sessionId;
      session.status = "running";
      session.pendingBlocker = null;
      session.reloadState = "restarted";
      session.error = undefined;
      session.startOptions = { ...opts, projectDir: session.projectDir };
      session.unsubscribe = fresh.onEvent((event: SdkAgentEvent) => {
        this.handleEvent(session, event);
      });
      subscribed = true;
      if (sessionFile) {
        try {
          await fresh.switchSession(sessionFile);
        } catch (err) {
          this.logger.warn(
            "session reload could not switch to previous session file",
            {
              sessionId: session.sessionId,
              projectDir: session.projectDir,
              sessionFile,
              error: err instanceof Error ? err.message : String(err),
            },
          );
        }
      }
      await fresh.prompt(opts.command ?? "/sf autonomous");
    } catch (err) {
      if (subscribed) session.unsubscribe?.();
      if (fresh) {
        try {
          await fresh.stop();
        } catch {
          // best effort: do not mask the original failure
        }
      }
      session.reloadState = "reload_failed";
      session.status = "error";
      session.error = err instanceof Error ? err.message : String(err);
      throw err;
    }
    session.reloadState = "running";
    this.logger.info("session reloaded", {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      reason,
      resumedSessionFile: sessionFile,
    });
    this.emit("session:restarted", {
      sessionId: session.sessionId,
      projectDir: session.projectDir,
      projectName: session.projectName,
      reason,
      sessionFile,
    });
  }
  /** Factory for the RPC child client; protected so tests can substitute a fake. */
  protected createRpcClient(
    cliPath: string,
    cwd: string,
    args: string[],
  ): RpcClient {
    return new RpcClient({
      cliPath,
      cwd,
      args,
    });
  }
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Return a promise that rejects with `new Error(message)` after `ms` milliseconds.
 *
 * Used as the losing side of `Promise.race` guards. The timer is never
 * cleared when the raced operation wins, so it is `unref()`'d: a stale guard
 * must not keep the Node.js event loop (and the daemon process) alive on its
 * own for up to `ms` after the real work has already finished.
 */
function timeout(ms: number, message: string): Promise<never> {
  return new Promise<never>((_, reject) => {
    const timer = setTimeout(() => reject(new Error(message)), ms);
    timer.unref();
  });
}
/**
 * Poll `predicate` every 100 ms until it returns true.
 *
 * Resolves immediately when the predicate already holds. Otherwise it rejects
 * with "Timed out after <timeoutMs>ms" on the first poll at or past the deadline.
 */
function waitFor(predicate: () => boolean, timeoutMs: number): Promise<void> {
  if (predicate()) {
    return Promise.resolve();
  }
  const deadline = Date.now() + timeoutMs;
  return new Promise<void>((done, fail) => {
    const poller = setInterval(() => {
      const satisfied = predicate();
      if (!satisfied && Date.now() < deadline) return;
      clearInterval(poller);
      if (satisfied) {
        done();
      } else {
        fail(new Error(`Timed out after ${timeoutMs}ms`));
      }
    }, 100);
  });
}
/**
 * Build a PendingBlocker from an extension UI request event.
 *
 * id and method are stringified, defaulting to "". The message is taken from
 * `title`, then `message`, then "". The raw request is kept as `event`.
 */
function extractBlocker(event: SdkAgentEvent): PendingBlocker {
  const request = event as unknown as RpcExtensionUIRequest;
  const fields = request as Record<string, unknown>;
  const headline = fields.title ?? fields.message ?? "";
  return {
    id: String(request.id ?? ""),
    method: String(request.method ?? ""),
    message: String(headline),
    event: request,
  };
}