diff --git a/packages/pi-coding-agent/src/core/session-manager.ts b/packages/pi-coding-agent/src/core/session-manager.ts index b61605b81..ef60d4120 100644 --- a/packages/pi-coding-agent/src/core/session-manager.ts +++ b/packages/pi-coding-agent/src/core/session-manager.ts @@ -27,6 +27,25 @@ import { } from "./messages.js"; import { BlobStore, externalizeImageData, isBlobRef, resolveImageData } from "./blob-store.js"; +/** Inline concurrency limiter to cap parallel async operations. */ +function pLimit(concurrency: number) { + const queue: (() => void)[] = []; + let active = 0; + return (fn: () => Promise): Promise => { + return new Promise((resolve, reject) => { + const run = () => { + active++; + fn().then(resolve, reject).finally(() => { + active--; + if (queue.length > 0) queue.shift()!(); + }); + }; + if (active < concurrency) run(); + else queue.push(run); + }); + }; +} + const BLOB_EXTERNALIZE_THRESHOLD = 1024; // 1KB minimum to externalize const MAX_PERSIST_CHARS = 500_000; const TRUNCATION_NOTICE = "\n\n[Session persistence truncated large content]"; @@ -1624,13 +1643,17 @@ export class SessionManager { const sessions: SessionInfo[] = []; const allFiles = dirFiles.flat(); + // Limit concurrency to avoid memory spikes with many session files + const limit = pLimit(10); const results = await Promise.all( - allFiles.map(async (file) => { - const info = await buildSessionInfo(file); - loaded++; - onProgress?.(loaded, totalFiles); - return info; - }), + allFiles.map((file) => + limit(async () => { + const info = await buildSessionInfo(file); + loaded++; + onProgress?.(loaded, totalFiles); + return info; + }), + ), ); for (const info of results) { diff --git a/packages/pi-coding-agent/src/resources/extensions/memory/pipeline.ts b/packages/pi-coding-agent/src/resources/extensions/memory/pipeline.ts index 78dea701f..ca2d4943c 100644 --- a/packages/pi-coding-agent/src/resources/extensions/memory/pipeline.ts +++ b/packages/pi-coding-agent/src/resources/extensions/memory/pipeline.ts @@ -11,6 +11,25 @@ import { join } from "path"; import { createInterface } from "readline"; import type { MemoryStorage } from "./storage.js"; +/** Inline concurrency limiter to cap parallel async operations. */ +function pLimit(concurrency: number) { + const queue: (() => void)[] = []; + let active = 0; + return (fn: () => Promise): Promise => { + return new Promise((resolve, reject) => { + const run = () => { + active++; + fn().then(resolve, reject).finally(() => { + active--; + if (queue.length > 0) queue.shift()!(); + }); + }; + if (active < concurrency) run(); + else queue.push(run); + }); + }; +} + /** Max session file size to process (50MB) — prevents OOM with concurrent workers */ const MAX_SESSION_FILE_SIZE = 50 * 1024 * 1024; @@ -320,8 +339,9 @@ async function runPhase1( return { processed: 0, errors: 0 }; } - // Process jobs concurrently - const promises = jobs.map(async (job) => { + // Process jobs with bounded concurrency to avoid memory spikes + const limit = pLimit(5); + const promises = jobs.map((job) => limit(async () => { try { const thread = storage.getThread(job.threadId); if (!thread) { @@ -369,7 +389,7 @@ async function runPhase1( storage.failStage1Job(job.threadId, message); errors++; } - }); + })); await Promise.all(promises); return { processed, errors }; diff --git a/src/resources/extensions/universal-config/discovery.ts b/src/resources/extensions/universal-config/discovery.ts index 39516ecd0..2d1a691a6 100644 --- a/src/resources/extensions/universal-config/discovery.ts +++ b/src/resources/extensions/universal-config/discovery.ts @@ -10,6 +10,25 @@ import { TOOLS } from "./tools.js"; import { SCANNERS } from "./scanners.js"; import type { DiscoveryResult, DiscoveredItem, ToolDiscoveryResult } from "./types.js"; +/** Inline concurrency limiter to cap parallel async operations. */ +function pLimit(concurrency: number) { + const queue: (() => void)[] = []; + let active = 0; + return (fn: () => Promise): Promise => { + return new Promise((resolve, reject) => { + const run = () => { + active++; + fn().then(resolve, reject).finally(() => { + active--; + if (queue.length > 0) queue.shift()!(); + }); + }; + if (active < concurrency) run(); + else queue.push(run); + }); + }; +} + /** * Run universal config discovery across all supported AI coding tools. * @@ -25,22 +44,25 @@ export async function discoverAllConfigs( const allWarnings: string[] = []; const toolResults: ToolDiscoveryResult[] = []; - // Run all scanners in parallel + // Run scanners with bounded concurrency to avoid memory spikes + const limit = pLimit(5); const results = await Promise.allSettled( - TOOLS.map(async (tool) => { - const scanner = SCANNERS[tool.id]; - if (!scanner) return { tool, items: [] as DiscoveredItem[], warnings: [`No scanner for ${tool.id}`] }; - try { - const { items, warnings } = await scanner(projectRoot, home, tool); - return { tool, items, warnings }; - } catch (err) { - return { - tool, - items: [] as DiscoveredItem[], - warnings: [`Scanner error for ${tool.name}: ${err instanceof Error ? err.message : String(err)}`], - }; - } - }), + TOOLS.map((tool) => + limit(async () => { + const scanner = SCANNERS[tool.id]; + if (!scanner) return { tool, items: [] as DiscoveredItem[], warnings: [`No scanner for ${tool.id}`] }; + try { + const { items, warnings } = await scanner(projectRoot, home, tool); + return { tool, items, warnings }; + } catch (err) { + return { + tool, + items: [] as DiscoveredItem[], + warnings: [`Scanner error for ${tool.name}: ${err instanceof Error ? err.message : String(err)}`], + }; + } + }), + ), ); for (const result of results) {