feat: Added runId generation on prompt/steer/follow_up commands, event…

- "packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts"
- "packages/pi-coding-agent/src/modes/rpc/rpc-client.ts"
- "packages/pi-coding-agent/src/modes/rpc/rpc-types.ts"

GSD-Task: S01/T02
This commit is contained in:
Lex Christopherson 2026-03-26 11:05:32 -06:00
parent 01e37670e1
commit c5bc9208c4
3 changed files with 120 additions and 5 deletions

View file

@ -11,7 +11,7 @@ import type { SessionStats } from "../../core/agent-session.js";
import type { BashResult } from "../../core/bash-executor.js";
import type { CompactionResult } from "../../core/compaction/index.js";
import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js";
import type { RpcCommand, RpcResponse, RpcSessionState, RpcSlashCommand } from "./rpc-types.js";
import type { RpcCommand, RpcInitResult, RpcResponse, RpcSessionState, RpcSlashCommand } from "./rpc-types.js";
// ============================================================================
// Types
@ -413,6 +413,44 @@ export class RpcClient {
}));
}
/**
* Initialize a v2 protocol session. Must be sent as the first command.
* Returns the negotiated protocol version, session ID, and server capabilities.
*/
async init(options?: { clientId?: string }): Promise<RpcInitResult> {
const response = await this.send({ type: "init", protocolVersion: 2, clientId: options?.clientId });
return this.getData<RpcInitResult>(response);
}
/**
* Request a graceful shutdown of the agent process.
* Waits for the response before the process exits.
*/
async shutdown(): Promise<void> {
await this.send({ type: "shutdown" });
// Wait for process to exit after shutdown acknowledgment
if (this.process) {
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
this.process?.kill("SIGKILL");
resolve();
}, 5000);
this.process?.on("exit", () => {
clearTimeout(timeout);
resolve();
});
});
}
}
/**
* Subscribe to specific event types (v2 only).
* Pass ["*"] to receive all events, or a list of event type strings to filter.
*/
async subscribe(events: string[]): Promise<void> {
await this.send({ type: "subscribe", events });
}
// =========================================================================
// Helpers
// =========================================================================

View file

@ -82,6 +82,12 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
let protocolVersion: 1 | 2 = 1;
let protocolLocked = false;
// v2 runId threading: tracks the current execution run
let currentRunId: string | null = null;
// v2 event filtering: null = no filter (all events); Set = only listed event types
let eventFilter: Set<string> | null = null;
const embeddedTerminalEnabled = process.env.GSD_WEB_BRIDGE_TUI === "1";
const remoteTerminal = embeddedTerminalEnabled
? new RemoteTerminal({
@ -433,7 +439,55 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
// Output all agent events as JSON
const unsubscribe = session.subscribe((event) => {
output(event);
// v2: emit synthesized events before the regular event
if (protocolVersion === 2) {
// cost_update on assistant message_end
if (event.type === "message_end" && event.message.role === "assistant" && currentRunId) {
const stats = session.getSessionStats();
const costUpdate = {
type: "cost_update" as const,
runId: currentRunId,
turnCost: session.getLastTurnCost(),
cumulativeCost: stats.cost,
tokens: {
input: stats.tokens.input,
output: stats.tokens.output,
cacheRead: stats.tokens.cacheRead,
cacheWrite: stats.tokens.cacheWrite,
},
};
if (!eventFilter || eventFilter.has("cost_update")) {
output(costUpdate);
}
}
// execution_complete on agent_end
if (event.type === "agent_end" && currentRunId) {
const stats = session.getSessionStats();
const completionEvent = {
type: "execution_complete" as const,
runId: currentRunId,
status: "completed" as const,
stats,
};
if (!eventFilter || eventFilter.has("execution_complete")) {
output(completionEvent);
}
currentRunId = null;
}
}
// Apply event filter (v2 only, applies to agent session events only)
if (protocolVersion === 2 && eventFilter && !eventFilter.has(event.type)) {
return;
}
// Emit the regular event, with runId injection in v2 mode
if (protocolVersion === 2 && currentRunId) {
output({ ...event, runId: currentRunId });
} else {
output(event);
}
});
// Handle a single command
@ -446,6 +500,9 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
// =================================================================
case "prompt": {
// v2: generate runId for execution tracking
const runId = protocolVersion === 2 ? crypto.randomUUID() : undefined;
if (runId) currentRunId = runId;
// Don't await - events will stream
// Extension commands are executed immediately, file prompt templates are expanded
// If streaming and streamingBehavior specified, queues via steer/followUp
@ -456,17 +513,23 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
source: "rpc",
})
.catch((e) => output(error(id, "prompt", e.message)));
return success(id, "prompt");
return { id, type: "response", command: "prompt", success: true, ...(runId && { runId }) } as RpcResponse;
}
case "steer": {
// v2: generate runId for execution tracking
const runId = protocolVersion === 2 ? crypto.randomUUID() : undefined;
if (runId) currentRunId = runId;
await session.steer(command.message, command.images);
return success(id, "steer");
return { id, type: "response", command: "steer", success: true, ...(runId && { runId }) } as RpcResponse;
}
case "follow_up": {
// v2: generate runId for execution tracking
const runId = protocolVersion === 2 ? crypto.randomUUID() : undefined;
if (runId) currentRunId = runId;
await session.followUp(command.message, command.images);
return success(id, "follow_up");
return { id, type: "response", command: "follow_up", success: true, ...(runId && { runId }) } as RpcResponse;
}
case "abort": {
@ -717,6 +780,19 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
return success(id, "terminal_redraw");
}
// =================================================================
// v2 Protocol: subscribe
// =================================================================
case "subscribe": {
if (command.events.includes("*")) {
eventFilter = null; // wildcard = all events
} else {
eventFilter = new Set(command.events);
}
return success(id, "subscribe");
}
// =================================================================
// v2 Protocol: shutdown
// =================================================================

View file

@ -231,6 +231,7 @@ export type RpcResponse =
// v2 Protocol
| { id?: string; type: "response"; command: "init"; success: true; data: RpcInitResult }
| { id?: string; type: "response"; command: "shutdown"; success: true }
| { id?: string; type: "response"; command: "subscribe"; success: true }
// Error response (any command can fail)
| { id?: string; type: "response"; command: string; success: false; error: string };