From 867a4be2976a9c3ff2e5157c977de113f8531d6d Mon Sep 17 00:00:00 2001 From: Jeremy McSpadden Date: Tue, 24 Mar 2026 08:23:36 -0500 Subject: [PATCH] fix(memory): fix memory and resource leaks across TUI, LSP, DB, and automation (#2314) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(memory): fix memory and resource leaks across TUI, LSP, DB, and automation Addresses all findings from a systematic memory leak audit across five dimensions: event listeners, timers, file system handles, subscriptions/closures, and GSD automation lifecycle. Critical fixes: rpc-client.ts: stderr .on("data") handler attached in start() was never removed in stop(). Now stored as _stderrHandler and removed via removeListener() on stop. lsp/client.ts: Three process.on() handlers (beforeExit, SIGINT, SIGTERM) registered at module load time — SIGINT and SIGTERM with anonymous functions, which are impossible to remove. All three are now stored as named references; new removeProcessHandlers() export allows graceful teardown. stdout/stderr stream listeners in startMessageReader/startStderrReader also stored per-client in clientStreamHandlers map and removed in shutdownClient() and shutdownAll(). parallel-orchestrator.ts: spawnWorker() attached 5 listeners to child process streams on every spawn with no removal on worker stop/respawn, accumulating listeners indefinitely. Added cleanup() field to WorkerInfo, which calls removeAllListeners() on the child and its streams; it is invoked on the exit, graceful stop, stale detection, and dead PID cleanup paths. Also: module-level state.workers Map was never cleared between orchestration runs; startParallel() and resetOrchestrator() now iterate and clean up all WorkerInfo entries before reassigning state. scripts/watch-resources.js: fs.watch() return value was discarded (OS watcher never closed) and the fallback setInterval handle was also discarded (timer ran forever). Both now stored; process.on("exit") handler closes/clears them. 
gsd-db.ts: closeDatabase() did not checkpoint the WAL before closing — .db-shm/.db-wal files accumulated on disk across crash-recovery cycles. Now runs PRAGMA wal_checkpoint(TRUNCATE) before close. Also added a one-time process.on("exit") handler in openDatabase() so the handle is closed even when the process exits without an explicit closeDatabase() call (note: Node emits "exit" on process.exit() or event-loop drain, not when the process is killed by a signal). Medium fixes: bg-shell/overlay.ts: 1-second refresh setInterval only cleared in keyboard exit handler; abnormal teardown leaked the timer. Added dispose() method that unconditionally clears it. file-watcher.ts: pending debounce Map was scoped inside startFileWatcher(), making it inaccessible to stopFileWatcher(). Moved to module scope; stopFileWatcher() now clears all pending timers and empties the map before closing the watcher. auto-supervisor.ts: registerSigtermHandler() could accumulate multiple SIGTERM handlers if called without passing back the previous reference. Added module-level _currentSigtermHandler; old handler is always removed before registering the new one regardless of whether the caller passes it. Low-severity fixes: print-mode.ts: session.subscribe() return value was discarded. Now stored and called in a finally block to guarantee cleanup on both normal completion and errors. rpc-mode.ts: same issue — the unsubscribe function returned by subscribe() is now called in the shutdown path before process.exit(). theme.ts: onThemeChangeCallback singleton silently overwrote any previous subscriber. Converted to Set<() => void>; onThemeChange() now returns a cleanup function. All four internal call sites updated to forEach(). Backward-compatible — existing callers that discard the return are unaffected. * fix: ensure unsubscribe is called on error/abort in print-mode PR #2314 added unsubscribe storage but still called process.exit(1) directly, bypassing the unsubscribe. Wrapped in try/finally to guarantee cleanup runs before exit. 
--- .../pi-coding-agent/src/core/lsp/client.ts | 104 ++++++++++++++---- .../src/modes/interactive/theme/theme.ts | 25 ++--- .../pi-coding-agent/src/modes/print-mode.ts | 74 +++++++------ .../src/modes/rpc/rpc-client.ts | 10 +- .../pi-coding-agent/src/modes/rpc/rpc-mode.ts | 3 +- scripts/watch-resources.js | 13 ++- src/resources/extensions/bg-shell/overlay.ts | 4 + .../extensions/gsd/auto-supervisor.ts | 14 +++ src/resources/extensions/gsd/file-watcher.ts | 5 +- src/resources/extensions/gsd/gsd-db.ts | 10 ++ .../extensions/gsd/parallel-orchestrator.ts | 43 ++++++++ 11 files changed, 230 insertions(+), 75 deletions(-) diff --git a/packages/pi-coding-agent/src/core/lsp/client.ts b/packages/pi-coding-agent/src/core/lsp/client.ts index 47e942cc4..400b2beb0 100644 --- a/packages/pi-coding-agent/src/core/lsp/client.ts +++ b/packages/pi-coding-agent/src/core/lsp/client.ts @@ -24,6 +24,17 @@ const clients = new Map(); const clientLocks = new Map>(); const fileOperationLocks = new Map>(); +/** Track stream listeners per client so they can be removed on shutdown. */ +interface StreamHandlers { + stdoutData?: (chunk: Buffer) => void; + stdoutEnd?: () => void; + stdoutError?: () => void; + stderrData?: (chunk: Buffer) => void; + stderrEnd?: () => void; + stderrError?: () => void; +} +const clientStreamHandlers = new Map(); + // Idle timeout configuration (disabled by default) let idleTimeoutMs: number | null = null; let idleCheckInterval: ReturnType | null = null; @@ -257,7 +268,9 @@ async function startMessageReader(client: LspClient): Promise { } return new Promise((resolve) => { - stdout.on("data", async (chunk: Buffer) => { + const handlers = clientStreamHandlers.get(client.name) ?? 
{}; + + handlers.stdoutData = async (chunk: Buffer) => { const currentBuffer: Buffer = Buffer.concat([client.messageBuffer, chunk]); if (currentBuffer.length > MAX_MESSAGE_BUFFER_SIZE) { @@ -307,17 +320,22 @@ async function startMessageReader(client: LspClient): Promise { } client.messageBuffer = workingBuffer; - }); + }; + stdout.on("data", handlers.stdoutData); - stdout.on("end", () => { + handlers.stdoutEnd = () => { client.isReading = false; resolve(); - }); + }; + stdout.on("end", handlers.stdoutEnd); - stdout.on("error", () => { + handlers.stdoutError = () => { client.isReading = false; resolve(); - }); + }; + stdout.on("error", handlers.stdoutError); + + clientStreamHandlers.set(client.name, handlers); }); } @@ -402,21 +420,28 @@ async function startStderrReader(client: LspClient): Promise { if (!stderr) return; return new Promise((resolve) => { - stderr.on("data", (chunk: Buffer) => { + const handlers = clientStreamHandlers.get(client.name) ?? {}; + + handlers.stderrData = (chunk: Buffer) => { const text = chunk.toString("utf-8"); client.stderrBuffer += text; if (client.stderrBuffer.length > 4096) { client.stderrBuffer = client.stderrBuffer.slice(-4096); } - }); + }; + stderr.on("data", handlers.stderrData); - stderr.on("end", () => { + handlers.stderrEnd = () => { resolve(); - }); + }; + stderr.on("end", handlers.stderrEnd); - stderr.on("error", () => { + handlers.stderrError = () => { resolve(); - }); + }; + stderr.on("error", handlers.stderrError); + + clientStreamHandlers.set(client.name, handlers); }); } @@ -706,6 +731,23 @@ export function notifyFileChanged(filePath: string): void { } } +/** + * Remove stdout/stderr stream listeners for a client to prevent leaks. 
+ */ +function removeStreamHandlers(client: LspClient): void { + const handlers = clientStreamHandlers.get(client.name); + if (!handlers) return; + + if (handlers.stdoutData) client.proc.stdout?.removeListener("data", handlers.stdoutData); + if (handlers.stdoutEnd) client.proc.stdout?.removeListener("end", handlers.stdoutEnd); + if (handlers.stdoutError) client.proc.stdout?.removeListener("error", handlers.stdoutError); + if (handlers.stderrData) client.proc.stderr?.removeListener("data", handlers.stderrData); + if (handlers.stderrEnd) client.proc.stderr?.removeListener("end", handlers.stderrEnd); + if (handlers.stderrError) client.proc.stderr?.removeListener("error", handlers.stderrError); + + clientStreamHandlers.delete(client.name); +} + /** * Shutdown a specific client by key. */ @@ -720,6 +762,9 @@ function shutdownClient(key: string): void { sendRequest(client, "shutdown", null).catch(() => {}); + // Remove stream listeners before killing the process + removeStreamHandlers(client); + try { killProcessTree(client.proc.pid); } catch { @@ -860,6 +905,9 @@ function shutdownAll(): void { pending.reject(err); } + // Remove stream listeners before killing the process + removeStreamHandlers(client); + void (async () => { const timeout = new Promise(resolve => setTimeout(resolve, 5_000)); const result = sendRequest(client, "shutdown", null).catch(() => {}); @@ -893,14 +941,28 @@ export function getActiveClients(): LspServerStatus[] { // Process Cleanup // ============================================================================= +const _beforeExitHandler = () => shutdownAll(); +const _sigintHandler = () => { + shutdownAll(); + process.exit(0); +}; +const _sigtermHandler = () => { + shutdownAll(); + process.exit(0); +}; + if (typeof process !== "undefined") { - process.on("beforeExit", shutdownAll); - process.on("SIGINT", () => { - shutdownAll(); - process.exit(0); - }); - process.on("SIGTERM", () => { - shutdownAll(); - process.exit(0); - }); + 
process.on("beforeExit", _beforeExitHandler); + process.on("SIGINT", _sigintHandler); + process.on("SIGTERM", _sigtermHandler); +} + +/** + * Remove process-level signal handlers registered at module load. + * Call this during graceful teardown to prevent leaked listeners. + */ +export function removeProcessHandlers(): void { + process.off("beforeExit", _beforeExitHandler); + process.off("SIGINT", _sigintHandler); + process.off("SIGTERM", _sigtermHandler); } diff --git a/packages/pi-coding-agent/src/modes/interactive/theme/theme.ts b/packages/pi-coding-agent/src/modes/interactive/theme/theme.ts index db1a524a0..763b22734 100644 --- a/packages/pi-coding-agent/src/modes/interactive/theme/theme.ts +++ b/packages/pi-coding-agent/src/modes/interactive/theme/theme.ts @@ -663,7 +663,7 @@ function setGlobalTheme(t: Theme): void { let currentThemeName: string | undefined; let themeWatcher: fs.FSWatcher | undefined; -let onThemeChangeCallback: (() => void) | undefined; +const onThemeChangeCallbacks = new Set<() => void>(); const registeredThemes = new Map(); export function setRegisteredThemes(themes: Theme[]): void { @@ -698,9 +698,7 @@ export function setTheme(name: string, enableWatcher: boolean = false): { succes if (enableWatcher) { startThemeWatcher(); } - if (onThemeChangeCallback) { - onThemeChangeCallback(); - } + onThemeChangeCallbacks.forEach(cb => cb()); return { success: true }; } catch (error) { // Theme is invalid - fall back to dark theme @@ -718,13 +716,12 @@ export function setThemeInstance(themeInstance: Theme): void { setGlobalTheme(themeInstance); currentThemeName = ""; stopThemeWatcher(); // Can't watch a direct instance - if (onThemeChangeCallback) { - onThemeChangeCallback(); - } + onThemeChangeCallbacks.forEach(cb => cb()); } -export function onThemeChange(callback: () => void): void { - onThemeChangeCallback = callback; +export function onThemeChange(callback: () => void): () => void { + onThemeChangeCallbacks.add(callback); + return () => { 
onThemeChangeCallbacks.delete(callback); }; } function startThemeWatcher(): void { @@ -755,10 +752,8 @@ function startThemeWatcher(): void { try { // Reload the theme setGlobalTheme(loadTheme(currentThemeName!)); - // Notify callback (to invalidate UI) - if (onThemeChangeCallback) { - onThemeChangeCallback(); - } + // Notify callbacks (to invalidate UI) + onThemeChangeCallbacks.forEach(cb => cb()); } catch (_error) { // Ignore errors (file might be in invalid state while being edited) } @@ -773,9 +768,7 @@ function startThemeWatcher(): void { themeWatcher.close(); themeWatcher = undefined; } - if (onThemeChangeCallback) { - onThemeChangeCallback(); - } + onThemeChangeCallbacks.forEach(cb => cb()); } }, 100); } diff --git a/packages/pi-coding-agent/src/modes/print-mode.ts b/packages/pi-coding-agent/src/modes/print-mode.ts index a2557f99b..a44266450 100644 --- a/packages/pi-coding-agent/src/modes/print-mode.ts +++ b/packages/pi-coding-agent/src/modes/print-mode.ts @@ -45,52 +45,62 @@ export async function runPrintMode(session: AgentSession, options: PrintModeOpti }); // Always subscribe to enable session persistence via _handleAgentEvent - session.subscribe((event) => { + const unsubscribe = session.subscribe((event) => { // In JSON mode, output all events if (mode === "json") { console.log(JSON.stringify(event)); } }); - // Send initial message with attachments - if (initialMessage) { - await session.prompt(initialMessage, { images: initialImages }); - } + let exitCode = 0; - // Send remaining messages - for (const message of messages) { - await session.prompt(message); - } + try { + // Send initial message with attachments + if (initialMessage) { + await session.prompt(initialMessage, { images: initialImages }); + } - // In text mode, output final response - if (mode === "text") { - const state = session.state; - const lastMessage = state.messages[state.messages.length - 1]; + // Send remaining messages + for (const message of messages) { + await 
session.prompt(message); + } - if (lastMessage?.role === "assistant") { - const assistantMsg = lastMessage as AssistantMessage; + // In text mode, output final response + if (mode === "text") { + const state = session.state; + const lastMessage = state.messages[state.messages.length - 1]; - // Check for error/aborted - if (assistantMsg.stopReason === "error" || assistantMsg.stopReason === "aborted") { - console.error(assistantMsg.errorMessage || `Request ${assistantMsg.stopReason}`); - process.exit(1); - } + if (lastMessage?.role === "assistant") { + const assistantMsg = lastMessage as AssistantMessage; - // Output text content - for (const content of assistantMsg.content) { - if (content.type === "text") { - console.log(content.text); + // Check for error/aborted + if (assistantMsg.stopReason === "error" || assistantMsg.stopReason === "aborted") { + console.error(assistantMsg.errorMessage || `Request ${assistantMsg.stopReason}`); + exitCode = 1; + } else { + // Output text content + for (const content of assistantMsg.content) { + if (content.type === "text") { + console.log(content.text); + } + } } } } + + // Ensure stdout is fully flushed before returning + // This prevents race conditions where the process exits before all output is written + await new Promise((resolve, reject) => { + process.stdout.write("", (err) => { + if (err) reject(err); + else resolve(); + }); + }); + } finally { + unsubscribe(); } - // Ensure stdout is fully flushed before returning - // This prevents race conditions where the process exits before all output is written - await new Promise((resolve, reject) => { - process.stdout.write("", (err) => { - if (err) reject(err); - else resolve(); - }); - }); + if (exitCode !== 0) { + process.exit(exitCode); + } } diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts index a3f91ecc4..c688a049f 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts +++ 
b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts @@ -54,6 +54,7 @@ export type RpcEventListener = (event: AgentEvent) => void; export class RpcClient { private process: ChildProcess | null = null; private stopReadingStdout: (() => void) | null = null; + private _stderrHandler?: (data: Buffer) => void; private eventListeners: RpcEventListener[] = []; private pendingRequests: Map void; reject: (error: Error) => void }> = new Map(); @@ -90,9 +91,10 @@ export class RpcClient { }); // Collect stderr for debugging - this.process.stderr?.on("data", (data) => { + this._stderrHandler = (data: Buffer) => { this.stderr += data.toString(); - }); + }; + this.process.stderr?.on("data", this._stderrHandler); // Set up strict JSONL reader for stdout. this.stopReadingStdout = attachJsonlLineReader(this.process.stdout!, (line) => { @@ -127,6 +129,10 @@ export class RpcClient { this.stopReadingStdout?.(); this.stopReadingStdout = null; + if (this._stderrHandler) { + this.process.stderr?.removeListener("data", this._stderrHandler); + this._stderrHandler = undefined; + } this.process.kill("SIGTERM"); // Wait for process to exit diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts index e15c81ae3..fc80a9d3e 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts +++ b/packages/pi-coding-agent/src/modes/rpc/rpc-mode.ts @@ -424,7 +424,7 @@ export async function runRpcMode(session: AgentSession): Promise { void extensionsReadyPromise; // Output all agent events as JSON - session.subscribe((event) => { + const unsubscribe = session.subscribe((event) => { output(event); }); @@ -730,6 +730,7 @@ export async function runRpcMode(session: AgentSession): Promise { await currentRunner.emit({ type: "session_shutdown" }); } + unsubscribe(); embeddedInteractiveMode?.stop(); detachInput(); process.stdin.pause(); diff --git a/scripts/watch-resources.js b/scripts/watch-resources.js index 900afae51..d0a160e26 100644 --- 
a/scripts/watch-resources.js +++ b/scripts/watch-resources.js @@ -37,6 +37,9 @@ process.stderr.write(`[watch-resources] Initial sync done\n`) // On Linux (Node <20.13) it throws ERR_FEATURE_UNAVAILABLE_ON_PLATFORM. // Fall back to polling on unsupported platforms. let timer = null +let fsWatcher = null +let pollInterval = null + const onChange = () => { if (timer) clearTimeout(timer) timer = setTimeout(() => { @@ -46,13 +49,19 @@ const onChange = () => { } try { - watch(src, { recursive: true }, onChange) + fsWatcher = watch(src, { recursive: true }, onChange) } catch { // Fallback: poll every 2s (Linux without recursive watch support) process.stderr.write(`[watch-resources] fs.watch recursive not supported, falling back to polling\n`) - setInterval(() => { + pollInterval = setInterval(() => { try { sync() } catch {} }, 2000) } +process.on('exit', () => { + if (timer) clearTimeout(timer) + if (fsWatcher) fsWatcher.close() + if (pollInterval) clearInterval(pollInterval) +}) + process.stderr.write(`[watch-resources] Watching src/resources/ → dist/resources/\n`) diff --git a/src/resources/extensions/bg-shell/overlay.ts b/src/resources/extensions/bg-shell/overlay.ts index ddaf744bb..5dd6a3872 100644 --- a/src/resources/extensions/bg-shell/overlay.ts +++ b/src/resources/extensions/bg-shell/overlay.ts @@ -430,6 +430,10 @@ export class BgManagerOverlay { return this.box(inner, width); } + dispose(): void { + clearInterval(this.refreshTimer); + } + invalidate(): void { this.cachedWidth = undefined; this.cachedLines = undefined; diff --git a/src/resources/extensions/gsd/auto-supervisor.ts b/src/resources/extensions/gsd/auto-supervisor.ts index 4777f68e2..49bfbeca0 100644 --- a/src/resources/extensions/gsd/auto-supervisor.ts +++ b/src/resources/extensions/gsd/auto-supervisor.ts @@ -13,6 +13,10 @@ import { nativeHasChanges } from "./native-git-bridge.js"; /** Signals that should trigger lock cleanup on process termination. 
*/ const CLEANUP_SIGNALS: NodeJS.Signals[] = ["SIGTERM", "SIGHUP", "SIGINT"]; +/** Module-level reference to the last registered handler, used as a safety net + * to prevent handler accumulation if the caller neglects to pass previousHandler. */ +let _currentSigtermHandler: (() => void) | null = null; + /** * Register signal handlers that clear lock files and exit cleanly. * Installs handlers on SIGTERM, SIGHUP, and SIGINT so that lock files @@ -29,15 +33,22 @@ export function registerSigtermHandler( currentBasePath: string, previousHandler: (() => void) | null, ): () => void { + // Remove the explicitly-passed previous handler if (previousHandler) { for (const sig of CLEANUP_SIGNALS) process.off(sig, previousHandler); } + // Safety net: also remove the module-tracked handler in case the caller + // forgot to pass previousHandler (prevents handler accumulation) + if (_currentSigtermHandler && _currentSigtermHandler !== previousHandler) { + for (const sig of CLEANUP_SIGNALS) process.off(sig, _currentSigtermHandler); + } const handler = () => { clearLock(currentBasePath); releaseSessionLock(currentBasePath); process.exit(0); }; for (const sig of CLEANUP_SIGNALS) process.on(sig, handler); + _currentSigtermHandler = handler; return handler; } @@ -46,6 +57,9 @@ export function deregisterSigtermHandler(handler: (() => void) | null): void { if (handler) { for (const sig of CLEANUP_SIGNALS) process.off(sig, handler); } + if (_currentSigtermHandler === handler) { + _currentSigtermHandler = null; + } } // ─── Working Tree Activity Detection ────────────────────────────────────────── diff --git a/src/resources/extensions/gsd/file-watcher.ts b/src/resources/extensions/gsd/file-watcher.ts index 98928ed62..a8b0be19c 100644 --- a/src/resources/extensions/gsd/file-watcher.ts +++ b/src/resources/extensions/gsd/file-watcher.ts @@ -3,6 +3,7 @@ import type { EventBus } from "@gsd/pi-coding-agent"; import { relative } from "node:path"; let watcher: FSWatcher | null = null; +let pending 
= new Map>(); const EVENT_MAP: Record = { "settings.json": "settings-changed", @@ -36,7 +37,7 @@ export async function startFileWatcher( const { watch } = await import("chokidar"); - const pending = new Map>(); + pending = new Map>(); function debounceEmit(event: string): void { const existing = pending.get(event); @@ -90,6 +91,8 @@ export async function startFileWatcher( * Stop the file watcher and clean up resources. */ export async function stopFileWatcher(): Promise { + for (const timer of pending.values()) clearTimeout(timer); + pending.clear(); if (watcher) { await watcher.close(); watcher = null; diff --git a/src/resources/extensions/gsd/gsd-db.ts b/src/resources/extensions/gsd/gsd-db.ts index bc6acae7d..8df0a095f 100644 --- a/src/resources/extensions/gsd/gsd-db.ts +++ b/src/resources/extensions/gsd/gsd-db.ts @@ -547,6 +547,7 @@ let currentDb: DbAdapter | null = null; let currentPath: string | null = null; /** PID that opened the current connection — used for diagnostic logging. */ let currentPid: number = 0; +let _exitHandlerRegistered = false; // ─── Public API ──────────────────────────────────────────────────────────── @@ -599,6 +600,12 @@ export function openDatabase(path: string): boolean { currentDb = adapter; currentPath = path; currentPid = process.pid; + + if (!_exitHandlerRegistered) { + _exitHandlerRegistered = true; + process.on("exit", () => { try { closeDatabase(); } catch {} }); + } + return true; } @@ -607,6 +614,9 @@ export function openDatabase(path: string): boolean { */ export function closeDatabase(): void { if (currentDb) { + try { + currentDb.exec('PRAGMA wal_checkpoint(TRUNCATE)'); + } catch { /* non-fatal — best effort before close */ } try { currentDb.close(); } catch { diff --git a/src/resources/extensions/gsd/parallel-orchestrator.ts b/src/resources/extensions/gsd/parallel-orchestrator.ts index 86aa480f7..d2b71be22 100644 --- a/src/resources/extensions/gsd/parallel-orchestrator.ts +++ 
b/src/resources/extensions/gsd/parallel-orchestrator.ts @@ -54,6 +54,7 @@ export interface WorkerInfo { state: "running" | "paused" | "stopped" | "error"; completedUnits: number; cost: number; + cleanup?: () => void; } export interface OrchestratorState { @@ -357,6 +358,16 @@ export async function startParallel( const config = resolveParallelConfig(prefs); + // Release any leftover state from a previous session before reassigning + if (state) { + for (const w of state.workers.values()) { + w.cleanup?.(); + w.cleanup = undefined; + w.process = null; + } + state.workers.clear(); + } + // Try to restore from a previous crash const restored = restoreState(basePath); if (restored && restored.workers.length > 0) { @@ -598,12 +609,26 @@ export function spawnWorker( worktreePath: worker.worktreePath, }); + // Store cleanup function to remove all listeners from the child process. + // This prevents listener accumulation when workers are respawned, since + // handler closures capture milestoneId and other data that would otherwise + // be retained indefinitely. 
+ worker.cleanup = () => { + child.stdout?.removeAllListeners(); + child.stderr?.removeAllListeners(); + child.removeAllListeners(); + }; + // Handle worker exit child.on("exit", (code) => { if (!state) return; const w = state.workers.get(milestoneId); if (!w) return; + // Remove all stream listeners to release closure references + w.cleanup?.(); + w.cleanup = undefined; + w.process = null; if (w.state === "stopped") return; // graceful stop, already handled @@ -795,6 +820,10 @@ export async function stopParallel( await waitForWorkerExit(worker, 250); } + // Remove stream listeners before releasing the process handle + worker.cleanup?.(); + worker.cleanup = undefined; + // Update in-memory state worker.state = "stopped"; worker.process = null; @@ -880,6 +909,8 @@ export function refreshWorkerStatuses( for (const mid of staleIds) { const worker = state.workers.get(mid); if (worker) { + worker.cleanup?.(); + worker.cleanup = undefined; worker.state = "error"; worker.process = null; } @@ -897,6 +928,8 @@ export function refreshWorkerStatuses( const diskStatus = statusMap.get(mid); if (!diskStatus) { if (!isPidAlive(worker.pid)) { + worker.cleanup?.(); + worker.cleanup = undefined; worker.state = worker.completedUnits > 0 ? "stopped" : "error"; worker.process = null; } @@ -938,5 +971,15 @@ export function isBudgetExceeded(): boolean { /** Reset orchestrator state. Called on clean shutdown. */ export function resetOrchestrator(): void { + if (state) { + // Explicitly release all WorkerInfo references and run any pending + // cleanup callbacks so child process stream closures are freed. + for (const w of state.workers.values()) { + w.cleanup?.(); + w.cleanup = undefined; + w.process = null; + } + state.workers.clear(); + } state = null; }