perf: add concurrency limits to unbounded Promise.all operations (#1029)
Cap parallel async operations to prevent memory spikes when processing large numbers of items: - session-manager.ts: limit file loading to 10 concurrent reads - pipeline.ts: limit job execution to 5 concurrent LLM calls - discovery.ts: limit tool scanning to 5 concurrent scanners Uses an inline pLimit utility in each file to avoid adding a dependency. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
df7a8e138c
commit
61f4693f16
3 changed files with 89 additions and 24 deletions
|
|
@ -27,6 +27,25 @@ import {
|
|||
} from "./messages.js";
|
||||
import { BlobStore, externalizeImageData, isBlobRef, resolveImageData } from "./blob-store.js";
|
||||
|
||||
/** Inline concurrency limiter to cap parallel async operations. */
|
||||
function pLimit(concurrency: number) {
|
||||
const queue: (() => void)[] = [];
|
||||
let active = 0;
|
||||
return <T>(fn: () => Promise<T>): Promise<T> => {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const run = () => {
|
||||
active++;
|
||||
fn().then(resolve, reject).finally(() => {
|
||||
active--;
|
||||
if (queue.length > 0) queue.shift()!();
|
||||
});
|
||||
};
|
||||
if (active < concurrency) run();
|
||||
else queue.push(run);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
const BLOB_EXTERNALIZE_THRESHOLD = 1024; // 1KB minimum to externalize
|
||||
const MAX_PERSIST_CHARS = 500_000;
|
||||
const TRUNCATION_NOTICE = "\n\n[Session persistence truncated large content]";
|
||||
|
|
@ -1624,13 +1643,17 @@ export class SessionManager {
|
|||
const sessions: SessionInfo[] = [];
|
||||
const allFiles = dirFiles.flat();
|
||||
|
||||
// Limit concurrency to avoid memory spikes with many session files
|
||||
const limit = pLimit(10);
|
||||
const results = await Promise.all(
|
||||
allFiles.map(async (file) => {
|
||||
const info = await buildSessionInfo(file);
|
||||
loaded++;
|
||||
onProgress?.(loaded, totalFiles);
|
||||
return info;
|
||||
}),
|
||||
allFiles.map((file) =>
|
||||
limit(async () => {
|
||||
const info = await buildSessionInfo(file);
|
||||
loaded++;
|
||||
onProgress?.(loaded, totalFiles);
|
||||
return info;
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
for (const info of results) {
|
||||
|
|
|
|||
|
|
@ -11,6 +11,25 @@ import { join } from "path";
|
|||
import { createInterface } from "readline";
|
||||
import type { MemoryStorage } from "./storage.js";
|
||||
|
||||
/** Inline concurrency limiter to cap parallel async operations. */
|
||||
function pLimit(concurrency: number) {
|
||||
const queue: (() => void)[] = [];
|
||||
let active = 0;
|
||||
return <T>(fn: () => Promise<T>): Promise<T> => {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const run = () => {
|
||||
active++;
|
||||
fn().then(resolve, reject).finally(() => {
|
||||
active--;
|
||||
if (queue.length > 0) queue.shift()!();
|
||||
});
|
||||
};
|
||||
if (active < concurrency) run();
|
||||
else queue.push(run);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/** Max session file size to process (50MB) — prevents OOM with concurrent workers */
|
||||
const MAX_SESSION_FILE_SIZE = 50 * 1024 * 1024;
|
||||
|
||||
|
|
@ -320,8 +339,9 @@ async function runPhase1(
|
|||
return { processed: 0, errors: 0 };
|
||||
}
|
||||
|
||||
// Process jobs concurrently
|
||||
const promises = jobs.map(async (job) => {
|
||||
// Process jobs with bounded concurrency to avoid memory spikes
|
||||
const limit = pLimit(5);
|
||||
const promises = jobs.map((job) => limit(async () => {
|
||||
try {
|
||||
const thread = storage.getThread(job.threadId);
|
||||
if (!thread) {
|
||||
|
|
@ -369,7 +389,7 @@ async function runPhase1(
|
|||
storage.failStage1Job(job.threadId, message);
|
||||
errors++;
|
||||
}
|
||||
});
|
||||
}));
|
||||
|
||||
await Promise.all(promises);
|
||||
return { processed, errors };
|
||||
|
|
|
|||
|
|
@ -10,6 +10,25 @@ import { TOOLS } from "./tools.js";
|
|||
import { SCANNERS } from "./scanners.js";
|
||||
import type { DiscoveryResult, DiscoveredItem, ToolDiscoveryResult } from "./types.js";
|
||||
|
||||
/** Inline concurrency limiter to cap parallel async operations. */
|
||||
function pLimit(concurrency: number) {
|
||||
const queue: (() => void)[] = [];
|
||||
let active = 0;
|
||||
return <T>(fn: () => Promise<T>): Promise<T> => {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const run = () => {
|
||||
active++;
|
||||
fn().then(resolve, reject).finally(() => {
|
||||
active--;
|
||||
if (queue.length > 0) queue.shift()!();
|
||||
});
|
||||
};
|
||||
if (active < concurrency) run();
|
||||
else queue.push(run);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Run universal config discovery across all supported AI coding tools.
|
||||
*
|
||||
|
|
@ -25,22 +44,25 @@ export async function discoverAllConfigs(
|
|||
const allWarnings: string[] = [];
|
||||
const toolResults: ToolDiscoveryResult[] = [];
|
||||
|
||||
// Run all scanners in parallel
|
||||
// Run scanners with bounded concurrency to avoid memory spikes
|
||||
const limit = pLimit(5);
|
||||
const results = await Promise.allSettled(
|
||||
TOOLS.map(async (tool) => {
|
||||
const scanner = SCANNERS[tool.id];
|
||||
if (!scanner) return { tool, items: [] as DiscoveredItem[], warnings: [`No scanner for ${tool.id}`] };
|
||||
try {
|
||||
const { items, warnings } = await scanner(projectRoot, home, tool);
|
||||
return { tool, items, warnings };
|
||||
} catch (err) {
|
||||
return {
|
||||
tool,
|
||||
items: [] as DiscoveredItem[],
|
||||
warnings: [`Scanner error for ${tool.name}: ${err instanceof Error ? err.message : String(err)}`],
|
||||
};
|
||||
}
|
||||
}),
|
||||
TOOLS.map((tool) =>
|
||||
limit(async () => {
|
||||
const scanner = SCANNERS[tool.id];
|
||||
if (!scanner) return { tool, items: [] as DiscoveredItem[], warnings: [`No scanner for ${tool.id}`] };
|
||||
try {
|
||||
const { items, warnings } = await scanner(projectRoot, home, tool);
|
||||
return { tool, items, warnings };
|
||||
} catch (err) {
|
||||
return {
|
||||
tool,
|
||||
items: [] as DiscoveredItem[],
|
||||
warnings: [`Scanner error for ${tool.name}: ${err instanceof Error ? err.message : String(err)}`],
|
||||
};
|
||||
}
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
for (const result of results) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue