fix(memory): fix memory and resource leaks across TUI, LSP, DB, and automation (#2314)

* fix(memory): fix memory and resource leaks across TUI, LSP, DB, and automation

Addresses all findings from a systematic memory leak audit across five
dimensions: event listeners, timers, file system handles, subscriptions/
closures, and GSD automation lifecycle.

Critical fixes:

rpc-client.ts: stderr .on("data") handler attached in start() was never
removed in stop(). Now stored as _stderrHandler and removed via
removeListener() on stop.

lsp/client.ts: Three process.on() handlers (beforeExit, SIGINT, SIGTERM)
registered at module load time with anonymous functions — impossible to
remove. Now stored as named references; new removeProcessHandlers() export
allows graceful teardown. stdout/stderr stream listeners in
startMessageReader/startStderrReader also stored per-client in
clientStreamHandlers map and removed in shutdownClient() and shutdownAll().

parallel-orchestrator.ts: spawnWorker() attached 5 listeners to child
process streams on every spawn with no removal on worker stop/respawn,
accumulating listeners indefinitely. Added cleanup() field to WorkerInfo;
called via removeAllListeners() on exit, graceful stop, stale detection,
and dead PID cleanup paths. Also: module-level state.workers Map was never
cleared between orchestration runs; startParallel() and resetOrchestrator()
now iterate and clean up all WorkerInfo entries before reassigning state.

scripts/watch-resources.js: fs.watch() return value was discarded (OS
watcher never closed) and the fallback setInterval handle was also
discarded (timer ran forever). Both now stored; process.on("exit") handler
closes/clears them.

gsd-db.ts: closeDatabase() did not checkpoint the WAL before closing —
.db-shm/.db-wal files accumulated on disk across crash-recovery cycles.
Now runs PRAGMA wal_checkpoint(TRUNCATE) before close. Also added a
one-time process.on("exit") handler in openDatabase() so the handle is
always closed even on unclean exits.

Medium fixes:

bg-shell/overlay.ts: 1-second refresh setInterval only cleared in
keyboard exit handler; abnormal teardown leaked the timer. Added dispose()
method that unconditionally clears it.

file-watcher.ts: pending debounce Map was scoped inside startFileWatcher()
making it inaccessible to stopFileWatcher(). Moved to module scope;
stopFileWatcher() now clears all pending timers and empties the map before
closing the watcher.

auto-supervisor.ts: registerSigtermHandler() could accumulate multiple
SIGTERM handlers if called without passing back the previous reference.
Added module-level _currentSigtermHandler; old handler is always removed
before registering the new one regardless of whether caller passes it.

Low-severity fixes:

print-mode.ts: session.subscribe() return value was discarded. Now stored
and called in a finally block to guarantee cleanup on both normal
completion and errors.

rpc-mode.ts: same — subscribe() unsubscribe now called in the shutdown
path before process.exit().

theme.ts: onThemeChangeCallback singleton silently overwrote any previous
subscriber. Converted to Set<() => void>; onThemeChange() now returns a
cleanup function. All four internal call sites updated to forEach().
Backward-compatible — existing callers that discard the return are unaffected.

* fix: ensure unsubscribe is called on error/abort in print-mode

The PR #2314 added unsubscribe storage but still called process.exit(1)
directly, bypassing the unsubscribe. Wrapped in try/finally to guarantee
cleanup runs before exit.
This commit is contained in:
Jeremy McSpadden 2026-03-24 08:23:36 -05:00 committed by GitHub
parent 30daeeb8f4
commit 867a4be297
11 changed files with 230 additions and 75 deletions

View file

@ -24,6 +24,17 @@ const clients = new Map<string, LspClient>();
const clientLocks = new Map<string, Promise<LspClient>>();
const fileOperationLocks = new Map<string, Promise<void>>();
/** Track stream listeners per client so they can be removed on shutdown. */
interface StreamHandlers {
stdoutData?: (chunk: Buffer) => void;
stdoutEnd?: () => void;
stdoutError?: () => void;
stderrData?: (chunk: Buffer) => void;
stderrEnd?: () => void;
stderrError?: () => void;
}
const clientStreamHandlers = new Map<string, StreamHandlers>();
// Idle timeout configuration (disabled by default)
let idleTimeoutMs: number | null = null;
let idleCheckInterval: ReturnType<typeof setInterval> | null = null;
@ -257,7 +268,9 @@ async function startMessageReader(client: LspClient): Promise<void> {
}
return new Promise<void>((resolve) => {
stdout.on("data", async (chunk: Buffer) => {
const handlers = clientStreamHandlers.get(client.name) ?? {};
handlers.stdoutData = async (chunk: Buffer) => {
const currentBuffer: Buffer = Buffer.concat([client.messageBuffer, chunk]);
if (currentBuffer.length > MAX_MESSAGE_BUFFER_SIZE) {
@ -307,17 +320,22 @@ async function startMessageReader(client: LspClient): Promise<void> {
}
client.messageBuffer = workingBuffer;
});
};
stdout.on("data", handlers.stdoutData);
stdout.on("end", () => {
handlers.stdoutEnd = () => {
client.isReading = false;
resolve();
});
};
stdout.on("end", handlers.stdoutEnd);
stdout.on("error", () => {
handlers.stdoutError = () => {
client.isReading = false;
resolve();
});
};
stdout.on("error", handlers.stdoutError);
clientStreamHandlers.set(client.name, handlers);
});
}
@ -402,21 +420,28 @@ async function startStderrReader(client: LspClient): Promise<void> {
if (!stderr) return;
return new Promise<void>((resolve) => {
stderr.on("data", (chunk: Buffer) => {
const handlers = clientStreamHandlers.get(client.name) ?? {};
handlers.stderrData = (chunk: Buffer) => {
const text = chunk.toString("utf-8");
client.stderrBuffer += text;
if (client.stderrBuffer.length > 4096) {
client.stderrBuffer = client.stderrBuffer.slice(-4096);
}
});
};
stderr.on("data", handlers.stderrData);
stderr.on("end", () => {
handlers.stderrEnd = () => {
resolve();
});
};
stderr.on("end", handlers.stderrEnd);
stderr.on("error", () => {
handlers.stderrError = () => {
resolve();
});
};
stderr.on("error", handlers.stderrError);
clientStreamHandlers.set(client.name, handlers);
});
}
@ -706,6 +731,23 @@ export function notifyFileChanged(filePath: string): void {
}
}
/**
* Remove stdout/stderr stream listeners for a client to prevent leaks.
*/
function removeStreamHandlers(client: LspClient): void {
const handlers = clientStreamHandlers.get(client.name);
if (!handlers) return;
if (handlers.stdoutData) client.proc.stdout?.removeListener("data", handlers.stdoutData);
if (handlers.stdoutEnd) client.proc.stdout?.removeListener("end", handlers.stdoutEnd);
if (handlers.stdoutError) client.proc.stdout?.removeListener("error", handlers.stdoutError);
if (handlers.stderrData) client.proc.stderr?.removeListener("data", handlers.stderrData);
if (handlers.stderrEnd) client.proc.stderr?.removeListener("end", handlers.stderrEnd);
if (handlers.stderrError) client.proc.stderr?.removeListener("error", handlers.stderrError);
clientStreamHandlers.delete(client.name);
}
/**
* Shutdown a specific client by key.
*/
@ -720,6 +762,9 @@ function shutdownClient(key: string): void {
sendRequest(client, "shutdown", null).catch(() => {});
// Remove stream listeners before killing the process
removeStreamHandlers(client);
try {
killProcessTree(client.proc.pid);
} catch {
@ -860,6 +905,9 @@ function shutdownAll(): void {
pending.reject(err);
}
// Remove stream listeners before killing the process
removeStreamHandlers(client);
void (async () => {
const timeout = new Promise<void>(resolve => setTimeout(resolve, 5_000));
const result = sendRequest(client, "shutdown", null).catch(() => {});
@ -893,14 +941,28 @@ export function getActiveClients(): LspServerStatus[] {
// Process Cleanup
// =============================================================================
const _beforeExitHandler = () => shutdownAll();
const _sigintHandler = () => {
shutdownAll();
process.exit(0);
};
const _sigtermHandler = () => {
shutdownAll();
process.exit(0);
};
if (typeof process !== "undefined") {
process.on("beforeExit", shutdownAll);
process.on("SIGINT", () => {
shutdownAll();
process.exit(0);
});
process.on("SIGTERM", () => {
shutdownAll();
process.exit(0);
});
process.on("beforeExit", _beforeExitHandler);
process.on("SIGINT", _sigintHandler);
process.on("SIGTERM", _sigtermHandler);
}
/**
* Remove process-level signal handlers registered at module load.
* Call this during graceful teardown to prevent leaked listeners.
*/
export function removeProcessHandlers(): void {
process.off("beforeExit", _beforeExitHandler);
process.off("SIGINT", _sigintHandler);
process.off("SIGTERM", _sigtermHandler);
}

View file

@ -663,7 +663,7 @@ function setGlobalTheme(t: Theme): void {
let currentThemeName: string | undefined;
let themeWatcher: fs.FSWatcher | undefined;
let onThemeChangeCallback: (() => void) | undefined;
const onThemeChangeCallbacks = new Set<() => void>();
const registeredThemes = new Map<string, Theme>();
export function setRegisteredThemes(themes: Theme[]): void {
@ -698,9 +698,7 @@ export function setTheme(name: string, enableWatcher: boolean = false): { succes
if (enableWatcher) {
startThemeWatcher();
}
if (onThemeChangeCallback) {
onThemeChangeCallback();
}
onThemeChangeCallbacks.forEach(cb => cb());
return { success: true };
} catch (error) {
// Theme is invalid - fall back to dark theme
@ -718,13 +716,12 @@ export function setThemeInstance(themeInstance: Theme): void {
setGlobalTheme(themeInstance);
currentThemeName = "<in-memory>";
stopThemeWatcher(); // Can't watch a direct instance
if (onThemeChangeCallback) {
onThemeChangeCallback();
}
onThemeChangeCallbacks.forEach(cb => cb());
}
export function onThemeChange(callback: () => void): void {
onThemeChangeCallback = callback;
export function onThemeChange(callback: () => void): () => void {
onThemeChangeCallbacks.add(callback);
return () => { onThemeChangeCallbacks.delete(callback); };
}
function startThemeWatcher(): void {
@ -755,10 +752,8 @@ function startThemeWatcher(): void {
try {
// Reload the theme
setGlobalTheme(loadTheme(currentThemeName!));
// Notify callback (to invalidate UI)
if (onThemeChangeCallback) {
onThemeChangeCallback();
}
// Notify callbacks (to invalidate UI)
onThemeChangeCallbacks.forEach(cb => cb());
} catch (_error) {
// Ignore errors (file might be in invalid state while being edited)
}
@ -773,9 +768,7 @@ function startThemeWatcher(): void {
themeWatcher.close();
themeWatcher = undefined;
}
if (onThemeChangeCallback) {
onThemeChangeCallback();
}
onThemeChangeCallbacks.forEach(cb => cb());
}
}, 100);
}

View file

@ -45,52 +45,62 @@ export async function runPrintMode(session: AgentSession, options: PrintModeOpti
});
// Always subscribe to enable session persistence via _handleAgentEvent
session.subscribe((event) => {
const unsubscribe = session.subscribe((event) => {
// In JSON mode, output all events
if (mode === "json") {
console.log(JSON.stringify(event));
}
});
// Send initial message with attachments
if (initialMessage) {
await session.prompt(initialMessage, { images: initialImages });
}
let exitCode = 0;
// Send remaining messages
for (const message of messages) {
await session.prompt(message);
}
try {
// Send initial message with attachments
if (initialMessage) {
await session.prompt(initialMessage, { images: initialImages });
}
// In text mode, output final response
if (mode === "text") {
const state = session.state;
const lastMessage = state.messages[state.messages.length - 1];
// Send remaining messages
for (const message of messages) {
await session.prompt(message);
}
if (lastMessage?.role === "assistant") {
const assistantMsg = lastMessage as AssistantMessage;
// In text mode, output final response
if (mode === "text") {
const state = session.state;
const lastMessage = state.messages[state.messages.length - 1];
// Check for error/aborted
if (assistantMsg.stopReason === "error" || assistantMsg.stopReason === "aborted") {
console.error(assistantMsg.errorMessage || `Request ${assistantMsg.stopReason}`);
process.exit(1);
}
if (lastMessage?.role === "assistant") {
const assistantMsg = lastMessage as AssistantMessage;
// Output text content
for (const content of assistantMsg.content) {
if (content.type === "text") {
console.log(content.text);
// Check for error/aborted
if (assistantMsg.stopReason === "error" || assistantMsg.stopReason === "aborted") {
console.error(assistantMsg.errorMessage || `Request ${assistantMsg.stopReason}`);
exitCode = 1;
} else {
// Output text content
for (const content of assistantMsg.content) {
if (content.type === "text") {
console.log(content.text);
}
}
}
}
}
// Ensure stdout is fully flushed before returning
// This prevents race conditions where the process exits before all output is written
await new Promise<void>((resolve, reject) => {
process.stdout.write("", (err) => {
if (err) reject(err);
else resolve();
});
});
} finally {
unsubscribe();
}
// Ensure stdout is fully flushed before returning
// This prevents race conditions where the process exits before all output is written
await new Promise<void>((resolve, reject) => {
process.stdout.write("", (err) => {
if (err) reject(err);
else resolve();
});
});
if (exitCode !== 0) {
process.exit(exitCode);
}
}

View file

@ -54,6 +54,7 @@ export type RpcEventListener = (event: AgentEvent) => void;
export class RpcClient {
private process: ChildProcess | null = null;
private stopReadingStdout: (() => void) | null = null;
private _stderrHandler?: (data: Buffer) => void;
private eventListeners: RpcEventListener[] = [];
private pendingRequests: Map<string, { resolve: (response: RpcResponse) => void; reject: (error: Error) => void }> =
new Map();
@ -90,9 +91,10 @@ export class RpcClient {
});
// Collect stderr for debugging
this.process.stderr?.on("data", (data) => {
this._stderrHandler = (data: Buffer) => {
this.stderr += data.toString();
});
};
this.process.stderr?.on("data", this._stderrHandler);
// Set up strict JSONL reader for stdout.
this.stopReadingStdout = attachJsonlLineReader(this.process.stdout!, (line) => {
@ -127,6 +129,10 @@ export class RpcClient {
this.stopReadingStdout?.();
this.stopReadingStdout = null;
if (this._stderrHandler) {
this.process.stderr?.removeListener("data", this._stderrHandler);
this._stderrHandler = undefined;
}
this.process.kill("SIGTERM");
// Wait for process to exit

View file

@ -424,7 +424,7 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
void extensionsReadyPromise;
// Output all agent events as JSON
session.subscribe((event) => {
const unsubscribe = session.subscribe((event) => {
output(event);
});
@ -730,6 +730,7 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
await currentRunner.emit({ type: "session_shutdown" });
}
unsubscribe();
embeddedInteractiveMode?.stop();
detachInput();
process.stdin.pause();

View file

@ -37,6 +37,9 @@ process.stderr.write(`[watch-resources] Initial sync done\n`)
// On Linux (Node <20.13) it throws ERR_FEATURE_UNAVAILABLE_ON_PLATFORM.
// Fall back to polling on unsupported platforms.
let timer = null
let fsWatcher = null
let pollInterval = null
const onChange = () => {
if (timer) clearTimeout(timer)
timer = setTimeout(() => {
@ -46,13 +49,19 @@ const onChange = () => {
}
try {
watch(src, { recursive: true }, onChange)
fsWatcher = watch(src, { recursive: true }, onChange)
} catch {
// Fallback: poll every 2s (Linux without recursive watch support)
process.stderr.write(`[watch-resources] fs.watch recursive not supported, falling back to polling\n`)
setInterval(() => {
pollInterval = setInterval(() => {
try { sync() } catch {}
}, 2000)
}
process.on('exit', () => {
if (timer) clearTimeout(timer)
if (fsWatcher) fsWatcher.close()
if (pollInterval) clearInterval(pollInterval)
})
process.stderr.write(`[watch-resources] Watching src/resources/ → dist/resources/\n`)

View file

@ -430,6 +430,10 @@ export class BgManagerOverlay {
return this.box(inner, width);
}
dispose(): void {
clearInterval(this.refreshTimer);
}
invalidate(): void {
this.cachedWidth = undefined;
this.cachedLines = undefined;

View file

@ -13,6 +13,10 @@ import { nativeHasChanges } from "./native-git-bridge.js";
/** Signals that should trigger lock cleanup on process termination. */
const CLEANUP_SIGNALS: NodeJS.Signals[] = ["SIGTERM", "SIGHUP", "SIGINT"];
/** Module-level reference to the last registered handler, used as a safety net
* to prevent handler accumulation if the caller neglects to pass previousHandler. */
let _currentSigtermHandler: (() => void) | null = null;
/**
* Register signal handlers that clear lock files and exit cleanly.
* Installs handlers on SIGTERM, SIGHUP, and SIGINT so that lock files
@ -29,15 +33,22 @@ export function registerSigtermHandler(
currentBasePath: string,
previousHandler: (() => void) | null,
): () => void {
// Remove the explicitly-passed previous handler
if (previousHandler) {
for (const sig of CLEANUP_SIGNALS) process.off(sig, previousHandler);
}
// Safety net: also remove the module-tracked handler in case the caller
// forgot to pass previousHandler (prevents handler accumulation)
if (_currentSigtermHandler && _currentSigtermHandler !== previousHandler) {
for (const sig of CLEANUP_SIGNALS) process.off(sig, _currentSigtermHandler);
}
const handler = () => {
clearLock(currentBasePath);
releaseSessionLock(currentBasePath);
process.exit(0);
};
for (const sig of CLEANUP_SIGNALS) process.on(sig, handler);
_currentSigtermHandler = handler;
return handler;
}
@ -46,6 +57,9 @@ export function deregisterSigtermHandler(handler: (() => void) | null): void {
if (handler) {
for (const sig of CLEANUP_SIGNALS) process.off(sig, handler);
}
if (_currentSigtermHandler === handler) {
_currentSigtermHandler = null;
}
}
// ─── Working Tree Activity Detection ──────────────────────────────────────────

View file

@ -3,6 +3,7 @@ import type { EventBus } from "@gsd/pi-coding-agent";
import { relative } from "node:path";
let watcher: FSWatcher | null = null;
let pending = new Map<string, ReturnType<typeof setTimeout>>();
const EVENT_MAP: Record<string, string> = {
"settings.json": "settings-changed",
@ -36,7 +37,7 @@ export async function startFileWatcher(
const { watch } = await import("chokidar");
const pending = new Map<string, ReturnType<typeof setTimeout>>();
pending = new Map<string, ReturnType<typeof setTimeout>>();
function debounceEmit(event: string): void {
const existing = pending.get(event);
@ -90,6 +91,8 @@ export async function startFileWatcher(
* Stop the file watcher and clean up resources.
*/
export async function stopFileWatcher(): Promise<void> {
for (const timer of pending.values()) clearTimeout(timer);
pending.clear();
if (watcher) {
await watcher.close();
watcher = null;

View file

@ -547,6 +547,7 @@ let currentDb: DbAdapter | null = null;
let currentPath: string | null = null;
/** PID that opened the current connection — used for diagnostic logging. */
let currentPid: number = 0;
let _exitHandlerRegistered = false;
// ─── Public API ────────────────────────────────────────────────────────────
@ -599,6 +600,12 @@ export function openDatabase(path: string): boolean {
currentDb = adapter;
currentPath = path;
currentPid = process.pid;
if (!_exitHandlerRegistered) {
_exitHandlerRegistered = true;
process.on("exit", () => { try { closeDatabase(); } catch {} });
}
return true;
}
@ -607,6 +614,9 @@ export function openDatabase(path: string): boolean {
*/
export function closeDatabase(): void {
if (currentDb) {
try {
currentDb.exec('PRAGMA wal_checkpoint(TRUNCATE)');
} catch { /* non-fatal — best effort before close */ }
try {
currentDb.close();
} catch {

View file

@ -54,6 +54,7 @@ export interface WorkerInfo {
state: "running" | "paused" | "stopped" | "error";
completedUnits: number;
cost: number;
cleanup?: () => void;
}
export interface OrchestratorState {
@ -357,6 +358,16 @@ export async function startParallel(
const config = resolveParallelConfig(prefs);
// Release any leftover state from a previous session before reassigning
if (state) {
for (const w of state.workers.values()) {
w.cleanup?.();
w.cleanup = undefined;
w.process = null;
}
state.workers.clear();
}
// Try to restore from a previous crash
const restored = restoreState(basePath);
if (restored && restored.workers.length > 0) {
@ -598,12 +609,26 @@ export function spawnWorker(
worktreePath: worker.worktreePath,
});
// Store cleanup function to remove all listeners from the child process.
// This prevents listener accumulation when workers are respawned, since
// handler closures capture milestoneId and other data that would otherwise
// be retained indefinitely.
worker.cleanup = () => {
child.stdout?.removeAllListeners();
child.stderr?.removeAllListeners();
child.removeAllListeners();
};
// Handle worker exit
child.on("exit", (code) => {
if (!state) return;
const w = state.workers.get(milestoneId);
if (!w) return;
// Remove all stream listeners to release closure references
w.cleanup?.();
w.cleanup = undefined;
w.process = null;
if (w.state === "stopped") return; // graceful stop, already handled
@ -795,6 +820,10 @@ export async function stopParallel(
await waitForWorkerExit(worker, 250);
}
// Remove stream listeners before releasing the process handle
worker.cleanup?.();
worker.cleanup = undefined;
// Update in-memory state
worker.state = "stopped";
worker.process = null;
@ -880,6 +909,8 @@ export function refreshWorkerStatuses(
for (const mid of staleIds) {
const worker = state.workers.get(mid);
if (worker) {
worker.cleanup?.();
worker.cleanup = undefined;
worker.state = "error";
worker.process = null;
}
@ -897,6 +928,8 @@ export function refreshWorkerStatuses(
const diskStatus = statusMap.get(mid);
if (!diskStatus) {
if (!isPidAlive(worker.pid)) {
worker.cleanup?.();
worker.cleanup = undefined;
worker.state = worker.completedUnits > 0 ? "stopped" : "error";
worker.process = null;
}
@ -938,5 +971,15 @@ export function isBudgetExceeded(): boolean {
/** Reset orchestrator state. Called on clean shutdown. */
export function resetOrchestrator(): void {
if (state) {
// Explicitly release all WorkerInfo references and run any pending
// cleanup callbacks so child process stream closures are freed.
for (const w of state.workers.values()) {
w.cleanup?.();
w.cleanup = undefined;
w.process = null;
}
state.workers.clear();
}
state = null;
}