fix(ai): resolve WebSocket listener leaks and bound session cache

Fix two memory leaks in the OpenAI Codex Responses WebSocket code:

1. parseWebSocket() onMessage handler: The fire-and-forget async IIFE
   could error after the await on decodeWebSocketData(), swallowing the
   error and leaving all four event listeners attached to the socket
   indefinitely. Wrap the entire handler body in try/catch, signal the
   error to the generator loop via `failed`/`done`, and call cleanup()
   to remove listeners immediately. JSON SyntaxErrors are treated as
   non-fatal (malformed message skipped).

2. websocketSessionCache: The Map grows without bound when many distinct
   session IDs are used over the lifetime of a process. Add a
   MAX_WEBSOCKET_CACHE_SIZE (10) constant and evict the oldest entry
   (first key in insertion order) before inserting a new one, closing
   the evicted socket and clearing its idle timer.

Also extract the duplicated removeEventListener calls in parseWebSocket
into a shared cleanup() helper used by both the onMessage error path
and the finally block.
This commit is contained in:
frizynn 2026-03-22 22:30:11 -03:00
parent f196309295
commit 498a4b5310

View file

@ -451,6 +451,7 @@ async function* parseSSE(response: Response): AsyncGenerator<Record<string, unkn
const OPENAI_BETA_RESPONSES_WEBSOCKETS = "responses_websockets=2026-02-06";
const SESSION_WEBSOCKET_CACHE_TTL_MS = 5 * 60 * 1000;
const MAX_WEBSOCKET_CACHE_SIZE = 10;
type WebSocketEventType = "open" | "message" | "error" | "close";
type WebSocketListener = (event: unknown) => void;
@ -635,6 +636,20 @@ async function acquireWebSocket(
const socket = await connectWebSocket(url, headers, signal);
const entry: CachedWebSocketConnection = { socket, busy: true };
// Evict the oldest entry if the cache is at capacity (LRU eviction).
if (websocketSessionCache.size >= MAX_WEBSOCKET_CACHE_SIZE) {
const oldestKey = websocketSessionCache.keys().next().value;
if (oldestKey) {
const oldEntry = websocketSessionCache.get(oldestKey);
websocketSessionCache.delete(oldestKey);
if (oldEntry) {
if (oldEntry.idleTimer) clearTimeout(oldEntry.idleTimer);
closeWebSocketSilently(oldEntry.socket);
}
}
}
websocketSessionCache.set(sessionId, entry);
return {
socket,
@ -705,12 +720,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy
resolve();
};
const cleanup = () => {
socket.removeEventListener("message", onMessage);
socket.removeEventListener("error", onError);
socket.removeEventListener("close", onClose);
signal?.removeEventListener("abort", onAbort);
};
const onMessage: WebSocketListener = (event) => {
void (async () => {
if (!event || typeof event !== "object" || !("data" in event)) return;
const text = await decodeWebSocketData((event as { data?: unknown }).data);
if (!text) return;
try {
if (!event || typeof event !== "object" || !("data" in event)) return;
const text = await decodeWebSocketData((event as { data?: unknown }).data);
if (!text) return;
const parsed = JSON.parse(text) as Record<string, unknown>;
const type = typeof parsed.type === "string" ? parsed.type : "";
if (type === "response.completed" || type === "response.done") {
@ -719,7 +741,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy
}
queue.push(parsed);
wake();
} catch {}
} catch (err) {
// Ensure listeners are cleaned up if the async handler errors.
// Without this, the fire-and-forget promise would swallow the
// error while leaving listeners attached to the socket.
if (err instanceof SyntaxError) {
// JSON parse failure — skip the malformed message.
return;
}
failed = err instanceof Error ? err : new Error(String(err));
done = true;
cleanup();
wake();
}
})();
};
@ -775,10 +809,7 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy
throw new Error("WebSocket stream closed before response.completed");
}
} finally {
socket.removeEventListener("message", onMessage);
socket.removeEventListener("error", onError);
socket.removeEventListener("close", onClose);
signal?.removeEventListener("abort", onAbort);
cleanup();
}
}