diff --git a/packages/pi-ai/src/providers/openai-codex-responses.ts b/packages/pi-ai/src/providers/openai-codex-responses.ts index 3a93e9fa0..294290188 100644 --- a/packages/pi-ai/src/providers/openai-codex-responses.ts +++ b/packages/pi-ai/src/providers/openai-codex-responses.ts @@ -451,6 +451,7 @@ async function* parseSSE(response: Response): AsyncGenerator void; @@ -635,6 +636,20 @@ async function acquireWebSocket( const socket = await connectWebSocket(url, headers, signal); const entry: CachedWebSocketConnection = { socket, busy: true }; + + // Evict the oldest entry if the cache is at capacity (LRU eviction). + if (websocketSessionCache.size >= MAX_WEBSOCKET_CACHE_SIZE) { + const oldestKey = websocketSessionCache.keys().next().value; + if (oldestKey) { + const oldEntry = websocketSessionCache.get(oldestKey); + websocketSessionCache.delete(oldestKey); + if (oldEntry) { + if (oldEntry.idleTimer) clearTimeout(oldEntry.idleTimer); + closeWebSocketSilently(oldEntry.socket); + } + } + } + websocketSessionCache.set(sessionId, entry); return { socket, @@ -705,12 +720,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy resolve(); }; + const cleanup = () => { + socket.removeEventListener("message", onMessage); + socket.removeEventListener("error", onError); + socket.removeEventListener("close", onClose); + signal?.removeEventListener("abort", onAbort); + }; + const onMessage: WebSocketListener = (event) => { void (async () => { - if (!event || typeof event !== "object" || !("data" in event)) return; - const text = await decodeWebSocketData((event as { data?: unknown }).data); - if (!text) return; try { + if (!event || typeof event !== "object" || !("data" in event)) return; + const text = await decodeWebSocketData((event as { data?: unknown }).data); + if (!text) return; const parsed = JSON.parse(text) as Record; const type = typeof parsed.type === "string" ? parsed.type : ""; if (type === "response.completed" || type === "response.done") { @@ -719,7 +741,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy } queue.push(parsed); wake(); - } catch {} + } catch (err) { + // Ensure listeners are cleaned up if the async handler errors. + // Without this, the fire-and-forget promise would swallow the + // error while leaving listeners attached to the socket. + if (err instanceof SyntaxError) { + // JSON parse failure — skip the malformed message. + return; + } + failed = err instanceof Error ? err : new Error(String(err)); + done = true; + cleanup(); + wake(); + } })(); }; @@ -775,10 +809,7 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy throw new Error("WebSocket stream closed before response.completed"); } } finally { - socket.removeEventListener("message", onMessage); - socket.removeEventListener("error", onError); - socket.removeEventListener("close", onClose); - signal?.removeEventListener("abort", onAbort); + cleanup(); } }