Merge pull request #2168 from frizynn/fix/ai-providers-memory-leaks

fix(ai): resolve WebSocket listener leaks and bound session cache
This commit is contained in:
TÂCHES 2026-03-25 22:15:32 -06:00 committed by GitHub
commit 19addd2aa5

View file

@ -451,6 +451,7 @@ async function* parseSSE(response: Response): AsyncGenerator<Record<string, unkn
const OPENAI_BETA_RESPONSES_WEBSOCKETS = "responses_websockets=2026-02-06";
const SESSION_WEBSOCKET_CACHE_TTL_MS = 5 * 60 * 1000;
const MAX_WEBSOCKET_CACHE_SIZE = 10;
type WebSocketEventType = "open" | "message" | "error" | "close";
type WebSocketListener = (event: unknown) => void;
@ -635,6 +636,20 @@ async function acquireWebSocket(
const socket = await connectWebSocket(url, headers, signal);
const entry: CachedWebSocketConnection = { socket, busy: true };
// Evict the oldest entry if the cache is at capacity (LRU eviction).
if (websocketSessionCache.size >= MAX_WEBSOCKET_CACHE_SIZE) {
const oldestKey = websocketSessionCache.keys().next().value;
if (oldestKey) {
const oldEntry = websocketSessionCache.get(oldestKey);
websocketSessionCache.delete(oldestKey);
if (oldEntry) {
if (oldEntry.idleTimer) clearTimeout(oldEntry.idleTimer);
closeWebSocketSilently(oldEntry.socket);
}
}
}
websocketSessionCache.set(sessionId, entry);
return {
socket,
@ -705,12 +720,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy
resolve();
};
const cleanup = () => {
socket.removeEventListener("message", onMessage);
socket.removeEventListener("error", onError);
socket.removeEventListener("close", onClose);
signal?.removeEventListener("abort", onAbort);
};
const onMessage: WebSocketListener = (event) => {
void (async () => {
if (!event || typeof event !== "object" || !("data" in event)) return;
const text = await decodeWebSocketData((event as { data?: unknown }).data);
if (!text) return;
try {
if (!event || typeof event !== "object" || !("data" in event)) return;
const text = await decodeWebSocketData((event as { data?: unknown }).data);
if (!text) return;
const parsed = JSON.parse(text) as Record<string, unknown>;
const type = typeof parsed.type === "string" ? parsed.type : "";
if (type === "response.completed" || type === "response.done") {
@ -719,7 +741,19 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy
}
queue.push(parsed);
wake();
} catch {}
} catch (err) {
// Ensure listeners are cleaned up if the async handler errors.
// Without this, the fire-and-forget promise would swallow the
// error while leaving listeners attached to the socket.
if (err instanceof SyntaxError) {
// JSON parse failure — skip the malformed message.
return;
}
failed = err instanceof Error ? err : new Error(String(err));
done = true;
cleanup();
wake();
}
})();
};
@ -775,10 +809,7 @@ async function* parseWebSocket(socket: WebSocketLike, signal?: AbortSignal): Asy
throw new Error("WebSocket stream closed before response.completed");
}
} finally {
socket.removeEventListener("message", onMessage);
socket.removeEventListener("error", onError);
socket.removeEventListener("close", onClose);
signal?.removeEventListener("abort", onAbort);
cleanup();
}
}