From 498a4b5310ee35cf2389c45a412df4883cc3177c Mon Sep 17 00:00:00 2001 From: frizynn Date: Sun, 22 Mar 2026 22:30:11 -0300 Subject: [PATCH] fix(ai): resolve WebSocket listener leaks and bound session cache Fix two memory leaks in the OpenAI Codex Responses WebSocket code: 1. parseWebSocket() onMessage handler: The fire-and-forget async IIFE could error after the await on decodeWebSocketData(), swallowing the error and leaving all four event listeners attached to the socket indefinitely. Wrap the entire handler body in try/catch, signal the error to the generator loop via `failed`/`done`, and call cleanup() to remove listeners immediately. JSON SyntaxErrors are treated as non-fatal (malformed message skipped). 2. websocketSessionCache: The Map grows without bound when many distinct session IDs are used over the lifetime of a process. Add a MAX_WEBSOCKET_CACHE_SIZE (10) constant and evict the oldest entry (first key in insertion order) before inserting a new one, closing the evicted socket and clearing its idle timer. Also extract the duplicated removeEventListener calls in parseWebSocket into a shared cleanup() helper used by both the onMessage error path and the finally block. --- .../src/providers/openai-codex-responses.ts | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/packages/pi-ai/src/providers/openai-codex-responses.ts b/packages/pi-ai/src/providers/openai-codex-responses.ts index 3a93e9fa0..294290188 100644 --- a/packages/pi-ai/src/providers/openai-codex-responses.ts +++ b/packages/pi-ai/src/providers/openai-codex-responses.ts @@ -451,6 +451,7 @@ async function* parseSSE(response: Response): AsyncGenerator void; @@ -635,6 +636,20 @@ async function acquireWebSocket( const socket = await connectWebSocket(url, headers, signal); const entry: CachedWebSocketConnection = { socket, busy: true }; + + // Evict the oldest entry if the cache is at capacity (LRU eviction). + if (websocketSessionCache.size >= MAX_WEBSOCKET_CACHE_SIZE) { + const oldestKey = websocketSessionCache.keys().next().value; + if (oldestKey) { + const oldEntry = websocketSessionCache.get(oldestKey); + websocketSessionCache.delete(oldestKey); + if (oldEntry) { + if (oldEntry.idleTimer) clearTimeout(oldEntry.idleTimer); + closeWebSocketSilently(oldEntry.socket); + } + } + } + websocketSessionCache.set(sessionId, entry); return { socket, @@ -705,12 +720,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy resolve(); }; + const cleanup = () => { + socket.removeEventListener("message", onMessage); + socket.removeEventListener("error", onError); + socket.removeEventListener("close", onClose); + signal?.removeEventListener("abort", onAbort); + }; + const onMessage: WebSocketListener = (event) => { void (async () => { - if (!event || typeof event !== "object" || !("data" in event)) return; - const text = await decodeWebSocketData((event as { data?: unknown }).data); - if (!text) return; try { + if (!event || typeof event !== "object" || !("data" in event)) return; + const text = await decodeWebSocketData((event as { data?: unknown }).data); + if (!text) return; const parsed = JSON.parse(text) as Record; const type = typeof parsed.type === "string" ? parsed.type : ""; if (type === "response.completed" || type === "response.done") { @@ -719,7 +741,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy } queue.push(parsed); wake(); - } catch {} + } catch (err) { + // Ensure listeners are cleaned up if the async handler errors. + // Without this, the fire-and-forget promise would swallow the + // error while leaving listeners attached to the socket. + if (err instanceof SyntaxError) { + // JSON parse failure — skip the malformed message. + return; + } + failed = err instanceof Error ? err : new Error(String(err)); + done = true; + cleanup(); + wake(); + } })(); }; @@ -775,10 +809,7 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy throw new Error("WebSocket stream closed before response.completed"); } } finally { - socket.removeEventListener("message", onMessage); - socket.removeEventListener("error", onError); - socket.removeEventListener("close", onClose); - signal?.removeEventListener("abort", onAbort); + cleanup(); } }