diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts index 197dee8a0..e776bd8ad 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts +++ b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts @@ -11,7 +11,7 @@ import type { SessionStats } from "../../core/agent-session.js"; import type { BashResult } from "../../core/bash-executor.js"; import type { CompactionResult } from "../../core/compaction/index.js"; import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js"; -import type { RpcCommand, RpcResponse, RpcSessionState, RpcSlashCommand } from "./rpc-types.js"; +import type { RpcCommand, RpcInitResult, RpcResponse, RpcSessionState, RpcSlashCommand } from "./rpc-types.js"; // ============================================================================ // Types @@ -413,6 +413,44 @@ export class RpcClient { })); } + /** + * Initialize a v2 protocol session. Must be sent as the first command. + * Returns the negotiated protocol version, session ID, and server capabilities. + */ + async init(options?: { clientId?: string }): Promise { + const response = await this.send({ type: "init", protocolVersion: 2, clientId: options?.clientId }); + return this.getData(response); + } + + /** + * Request a graceful shutdown of the agent process. + * Waits for the response before the process exits. + */ + async shutdown(): Promise { + await this.send({ type: "shutdown" }); + // Wait for process to exit after shutdown acknowledgment + if (this.process) { + await new Promise((resolve) => { + const timeout = setTimeout(() => { + this.process?.kill("SIGKILL"); + resolve(); + }, 5000); + this.process?.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + } + } + + /** + * Subscribe to specific event types (v2 only). + * Pass ["*"] to receive all events, or a list of event type strings to filter. + */ + async subscribe(events: string[]): Promise { + await this.send({ type: "subscribe", events }); + } + // ========================================================================= // Helpers // ========================================================================= diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts index 27a898765..f2f8fbe4c 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts +++ b/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts @@ -82,6 +82,12 @@ export async function runRpcMode(session: AgentSession): Promise { let protocolVersion: 1 | 2 = 1; let protocolLocked = false; + // v2 runId threading: tracks the current execution run + let currentRunId: string | null = null; + + // v2 event filtering: null = no filter (all events); Set = only listed event types + let eventFilter: Set | null = null; + const embeddedTerminalEnabled = process.env.GSD_WEB_BRIDGE_TUI === "1"; const remoteTerminal = embeddedTerminalEnabled ? new RemoteTerminal({ @@ -433,7 +439,55 @@ export async function runRpcMode(session: AgentSession): Promise { // Output all agent events as JSON const unsubscribe = session.subscribe((event) => { - output(event); + // v2: emit synthesized events before the regular event + if (protocolVersion === 2) { + // cost_update on assistant message_end + if (event.type === "message_end" && event.message.role === "assistant" && currentRunId) { + const stats = session.getSessionStats(); + const costUpdate = { + type: "cost_update" as const, + runId: currentRunId, + turnCost: session.getLastTurnCost(), + cumulativeCost: stats.cost, + tokens: { + input: stats.tokens.input, + output: stats.tokens.output, + cacheRead: stats.tokens.cacheRead, + cacheWrite: stats.tokens.cacheWrite, + }, + }; + if (!eventFilter || eventFilter.has("cost_update")) { + output(costUpdate); + } + } + + // execution_complete on agent_end + if (event.type === "agent_end" && currentRunId) { + const stats = session.getSessionStats(); + const completionEvent = { + type: "execution_complete" as const, + runId: currentRunId, + status: "completed" as const, + stats, + }; + if (!eventFilter || eventFilter.has("execution_complete")) { + output(completionEvent); + } + currentRunId = null; + } + } + + // Apply event filter (v2 only, applies to agent session events only) + if (protocolVersion === 2 && eventFilter && !eventFilter.has(event.type)) { + return; + } + + // Emit the regular event, with runId injection in v2 mode + if (protocolVersion === 2 && currentRunId) { + output({ ...event, runId: currentRunId }); + } else { + output(event); + } }); // Handle a single command @@ -446,6 +500,9 @@ export async function runRpcMode(session: AgentSession): Promise { // ================================================================= case "prompt": { + // v2: generate runId for execution tracking + const runId = protocolVersion === 2 ? crypto.randomUUID() : undefined; + if (runId) currentRunId = runId; // Don't await - events will stream // Extension commands are executed immediately, file prompt templates are expanded // If streaming and streamingBehavior specified, queues via steer/followUp @@ -456,17 +513,23 @@ export async function runRpcMode(session: AgentSession): Promise { source: "rpc", }) .catch((e) => output(error(id, "prompt", e.message))); - return success(id, "prompt"); + return { id, type: "response", command: "prompt", success: true, ...(runId && { runId }) } as RpcResponse; } case "steer": { + // v2: generate runId for execution tracking + const runId = protocolVersion === 2 ? crypto.randomUUID() : undefined; + if (runId) currentRunId = runId; await session.steer(command.message, command.images); - return success(id, "steer"); + return { id, type: "response", command: "steer", success: true, ...(runId && { runId }) } as RpcResponse; } case "follow_up": { + // v2: generate runId for execution tracking + const runId = protocolVersion === 2 ? crypto.randomUUID() : undefined; + if (runId) currentRunId = runId; await session.followUp(command.message, command.images); - return success(id, "follow_up"); + return { id, type: "response", command: "follow_up", success: true, ...(runId && { runId }) } as RpcResponse; } case "abort": { @@ -717,6 +780,19 @@ export async function runRpcMode(session: AgentSession): Promise { return success(id, "terminal_redraw"); } + // ================================================================= + // v2 Protocol: subscribe + // ================================================================= + + case "subscribe": { + if (command.events.includes("*")) { + eventFilter = null; // wildcard = all events + } else { + eventFilter = new Set(command.events); + } + return success(id, "subscribe"); + } + // ================================================================= // v2 Protocol: shutdown // ================================================================= diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-types.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-types.ts index 957e0f3ac..20d5c2c73 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-types.ts +++ b/packages/pi-coding-agent/src/modes/rpc/rpc-types.ts @@ -231,6 +231,7 @@ export type RpcResponse = // v2 Protocol | { id?: string; type: "response"; command: "init"; success: true; data: RpcInitResult } | { id?: string; type: "response"; command: "shutdown"; success: true } + | { id?: string; type: "response"; command: "subscribe"; success: true } // Error response (any command can fail) | { id?: string; type: "response"; command: string; success: false; error: string };