From fa9477f638e7e537399f7a1d758ba53f29a181f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?T=C3=82CHES?= Date: Fri, 13 Mar 2026 16:01:30 -0600 Subject: [PATCH] feat: async background jobs extension (#260) --- .../src/core/settings-manager.ts | 15 +- packages/pi-coding-agent/src/index.ts | 2 +- src/loader.ts | 1 + .../extensions/async-jobs/async-bash-tool.ts | 211 +++++++++++++++ .../extensions/async-jobs/await-tool.ts | 101 +++++++ .../extensions/async-jobs/cancel-job-tool.ts | 34 +++ src/resources/extensions/async-jobs/index.ts | 133 ++++++++++ .../extensions/async-jobs/job-manager.ts | 250 ++++++++++++++++++ 8 files changed, 741 insertions(+), 6 deletions(-) create mode 100644 src/resources/extensions/async-jobs/async-bash-tool.ts create mode 100644 src/resources/extensions/async-jobs/await-tool.ts create mode 100644 src/resources/extensions/async-jobs/cancel-job-tool.ts create mode 100644 src/resources/extensions/async-jobs/index.ts create mode 100644 src/resources/extensions/async-jobs/job-manager.ts diff --git a/packages/pi-coding-agent/src/core/settings-manager.ts b/packages/pi-coding-agent/src/core/settings-manager.ts index 3c1521c2e..d9420e8a5 100644 --- a/packages/pi-coding-agent/src/core/settings-manager.ts +++ b/packages/pi-coding-agent/src/core/settings-manager.ts @@ -49,6 +49,11 @@ export interface MarkdownSettings { codeBlockIndent?: string; // default: " " } +export interface AsyncSettings { + enabled?: boolean; // default: false + maxJobs?: number; // default: 100 +} + export type TransportSetting = Transport; /** @@ -99,7 +104,7 @@ export interface Settings { autocompleteMaxVisible?: number; // Max visible items in autocomplete dropdown (default: 5) showHardwareCursor?: boolean; // Show terminal cursor while still positioning it for IME markdown?: MarkdownSettings; - bashInterceptor?: BashInterceptorSettings; + async?: AsyncSettings; } /** Deep merge settings: project/overrides take precedence, nested objects merge recursively */ @@ -947,11 +952,11 @@ export class SettingsManager { return this.settings.markdown?.codeBlockIndent ?? " "; } - getBashInterceptorEnabled(): boolean { - return this.settings.bashInterceptor?.enabled ?? true; + getAsyncEnabled(): boolean { + return this.settings.async?.enabled ?? false; } - getBashInterceptorRules(): BashInterceptorRule[] | undefined { - return this.settings.bashInterceptor?.rules; + getAsyncMaxJobs(): number { + return this.settings.async?.maxJobs ?? 100; } } diff --git a/packages/pi-coding-agent/src/index.ts b/packages/pi-coding-agent/src/index.ts index 4d75e487e..40bc8dd53 100644 --- a/packages/pi-coding-agent/src/index.ts +++ b/packages/pi-coding-agent/src/index.ts @@ -202,7 +202,7 @@ export { export { BlobStore, isBlobRef, parseBlobRef, externalizeImageData, resolveImageData } from "./core/blob-store.js"; export { ArtifactManager } from "./core/artifact-manager.js"; export { - type BashInterceptorSettings, + type AsyncSettings, type CompactionSettings, type ImageSettings, type PackageSource, diff --git a/src/loader.ts b/src/loader.ts index b212c7731..4630c9758 100644 --- a/src/loader.ts +++ b/src/loader.ts @@ -90,6 +90,7 @@ process.env.GSD_BUNDLED_EXTENSION_PATHS = [ join(agentDir, 'extensions', 'slash-commands', 'index.ts'), join(agentDir, 'extensions', 'subagent', 'index.ts'), join(agentDir, 'extensions', 'mac-tools', 'index.ts'), + join(agentDir, 'extensions', 'async-jobs', 'index.ts'), join(agentDir, 'extensions', 'ask-user-questions.ts'), join(agentDir, 'extensions', 'get-secrets-from-user.ts'), ].join(':') diff --git a/src/resources/extensions/async-jobs/async-bash-tool.ts b/src/resources/extensions/async-jobs/async-bash-tool.ts new file mode 100644 index 000000000..328b0dcf2 --- /dev/null +++ b/src/resources/extensions/async-jobs/async-bash-tool.ts @@ -0,0 +1,211 @@ +/** + * async_bash tool — run a bash command in the background. + * + * Registers the command with the AsyncJobManager and returns a job ID + * immediately. The LLM can continue working and check results later + * with await_job. + */ + +import type { ToolDefinition } from "@gsd/pi-coding-agent"; +import { + getShellConfig, + sanitizeCommand, + DEFAULT_MAX_BYTES, + DEFAULT_MAX_LINES, +} from "@gsd/pi-coding-agent"; +import { Type } from "@sinclair/typebox"; +import { spawn } from "node:child_process"; +import { createWriteStream } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { randomBytes } from "node:crypto"; +import type { AsyncJobManager } from "./job-manager.js"; + +const schema = Type.Object({ + command: Type.String({ description: "Bash command to execute in the background" }), + timeout: Type.Optional( + Type.Number({ description: "Timeout in seconds (optional)" }), + ), + label: Type.Optional( + Type.String({ description: "Short label for the job (shown in /jobs). Defaults to a truncated version of the command." }), + ), +}); + +function getTempFilePath(): string { + const id = randomBytes(8).toString("hex"); + return join(tmpdir(), `pi-async-bash-${id}.log`); +} + +/** + * Kill a process and its children. Uses process group kill on Unix. + */ +function killTree(pid: number): void { + try { + // Kill the process group (negative PID) + process.kill(-pid, "SIGTERM"); + } catch { + try { + process.kill(pid, "SIGTERM"); + } catch { + // Already exited + } + } +} + +export function createAsyncBashTool( + getManager: () => AsyncJobManager, + getCwd: () => string, +): ToolDefinition { + return { + name: "async_bash", + label: "Background Bash", + description: + `Run a bash command in the background. Returns a job ID immediately so you can continue working. ` + + `Use await_job to get results or cancel_job to stop. Ideal for long-running builds, tests, or installs. ` + + `Output is truncated to the last ${DEFAULT_MAX_LINES} lines or ${DEFAULT_MAX_BYTES / 1024}KB.`, + promptSnippet: "Run a bash command in the background, returning a job ID immediately.", + promptGuidelines: [ + "Use async_bash for commands that take more than a few seconds (builds, tests, installs, large git operations).", + "After starting async jobs, continue with other work and use await_job when you need the results.", + "Use cancel_job to stop a running background job.", + "Check /jobs to see all running and recent background jobs.", + ], + parameters: schema, + async execute(_toolCallId, params) { + const manager = getManager(); + const cwd = getCwd(); + const { command, timeout, label } = params; + const shortCmd = label ?? (command.length > 60 ? command.slice(0, 57) + "..." : command); + + const jobId = manager.register("bash", shortCmd, (signal) => { + return executeBashInBackground(command, cwd, signal, timeout); + }); + + return { + content: [{ + type: "text", + text: [ + `Background job started: **${jobId}**`, + `Command: \`${shortCmd}\``, + "", + "Use `await_job` to get results when ready, or `cancel_job` to stop.", + ].join("\n"), + }], + }; + }, + }; +} + +/** + * Execute a bash command, collecting output. Returns the text result. + */ +function executeBashInBackground( + command: string, + cwd: string, + signal: AbortSignal, + timeout?: number, +): Promise { + return new Promise((resolve, reject) => { + const { shell, args } = getShellConfig(); + const resolvedCommand = sanitizeCommand(command); + + const child = spawn(shell, [...args, resolvedCommand], { + cwd, + detached: true, + env: { ...process.env }, + stdio: ["ignore", "pipe", "pipe"], + }); + + let timedOut = false; + let timeoutHandle: ReturnType | undefined; + + if (timeout !== undefined && timeout > 0) { + timeoutHandle = setTimeout(() => { + timedOut = true; + if (child.pid) killTree(child.pid); + }, timeout * 1000); + } + + const chunks: Buffer[] = []; + let totalBytes = 0; + let spillFilePath: string | undefined; + let spillStream: ReturnType | undefined; + const MAX_BUFFER = DEFAULT_MAX_BYTES * 2; + + const onData = (data: Buffer) => { + totalBytes += data.length; + + if (totalBytes > DEFAULT_MAX_BYTES && !spillFilePath) { + spillFilePath = getTempFilePath(); + spillStream = createWriteStream(spillFilePath); + for (const chunk of chunks) spillStream.write(chunk); + } + if (spillStream) spillStream.write(data); + + chunks.push(data); + let chunksBytes = chunks.reduce((s, c) => s + c.length, 0); + while (chunksBytes > MAX_BUFFER && chunks.length > 1) { + const removed = chunks.shift()!; + chunksBytes -= removed.length; + } + }; + + if (child.stdout) child.stdout.on("data", onData); + if (child.stderr) child.stderr.on("data", onData); + + const onAbort = () => { + if (child.pid) killTree(child.pid); + }; + + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + } + + child.on("error", (err) => { + if (timeoutHandle) clearTimeout(timeoutHandle); + signal.removeEventListener("abort", onAbort); + reject(err); + }); + + child.on("close", (code) => { + if (timeoutHandle) clearTimeout(timeoutHandle); + signal.removeEventListener("abort", onAbort); + if (spillStream) spillStream.end(); + + if (signal.aborted) { + const output = Buffer.concat(chunks).toString("utf-8"); + resolve(output ? `${output}\n\nCommand aborted` : "Command aborted"); + return; + } + + if (timedOut) { + const output = Buffer.concat(chunks).toString("utf-8"); + resolve(output ? `${output}\n\nCommand timed out after ${timeout} seconds` : `Command timed out after ${timeout} seconds`); + return; + } + + const fullOutput = Buffer.concat(chunks).toString("utf-8"); + + const lines = fullOutput.split("\n"); + let text: string; + if (lines.length > DEFAULT_MAX_LINES) { + text = lines.slice(-DEFAULT_MAX_LINES).join("\n"); + if (spillFilePath) { + text += `\n\n[Showing last ${DEFAULT_MAX_LINES} of ${lines.length} lines. Full output: ${spillFilePath}]`; + } else { + text += `\n\n[Showing last ${DEFAULT_MAX_LINES} of ${lines.length} lines]`; + } + } else { + text = fullOutput || "(no output)"; + } + + if (code !== 0 && code !== null) { + text += `\n\nCommand exited with code ${code}`; + } + + resolve(text); + }); + }); +} diff --git a/src/resources/extensions/async-jobs/await-tool.ts b/src/resources/extensions/async-jobs/await-tool.ts new file mode 100644 index 000000000..bab889e9a --- /dev/null +++ b/src/resources/extensions/async-jobs/await-tool.ts @@ -0,0 +1,101 @@ +/** + * await_job tool — wait for one or more background jobs to complete. + * + * If specific job IDs are provided, waits for those jobs. + * If omitted, waits for any running job to complete. + */ + +import type { ToolDefinition } from "@gsd/pi-coding-agent"; +import { Type } from "@sinclair/typebox"; +import type { AsyncJobManager, Job } from "./job-manager.js"; + +const schema = Type.Object({ + jobs: Type.Optional( + Type.Array(Type.String(), { + description: "Job IDs to wait for. Omit to wait for any running job.", + }), + ), +}); + +export function createAwaitTool(getManager: () => AsyncJobManager): ToolDefinition { + return { + name: "await_job", + label: "Await Background Job", + description: + "Wait for background jobs to complete. Provide specific job IDs or omit to wait for the next job that finishes. Returns results of completed jobs.", + parameters: schema, + async execute(_toolCallId, params) { + const manager = getManager(); + const { jobs: jobIds } = params; + + let watched: Job[]; + if (jobIds && jobIds.length > 0) { + watched = []; + const notFound: string[] = []; + for (const id of jobIds) { + const job = manager.getJob(id); + if (job) { + watched.push(job); + } else { + notFound.push(id); + } + } + if (notFound.length > 0 && watched.length === 0) { + return { + content: [{ type: "text", text: `No jobs found: ${notFound.join(", ")}` }], + }; + } + } else { + watched = manager.getRunningJobs(); + if (watched.length === 0) { + return { + content: [{ type: "text", text: "No running background jobs." }], + }; + } + } + + // If all watched jobs are already done, return immediately + const running = watched.filter((j) => j.status === "running"); + if (running.length === 0) { + const result = formatResults(watched); + manager.acknowledgeDeliveries(watched.map((j) => j.id)); + return { content: [{ type: "text", text: result }] }; + } + + // Wait for at least one to complete + await Promise.race(running.map((j) => j.promise)); + + // Collect all completed results (more may have finished while waiting) + const completed = watched.filter((j) => j.status !== "running"); + manager.acknowledgeDeliveries(completed.map((j) => j.id)); + + const stillRunning = watched.filter((j) => j.status === "running"); + let result = formatResults(completed); + if (stillRunning.length > 0) { + result += `\n\n**Still running:** ${stillRunning.map((j) => `${j.id} (${j.label})`).join(", ")}`; + } + + return { content: [{ type: "text", text: result }] }; + }, + }; +} + +function formatResults(jobs: Job[]): string { + if (jobs.length === 0) return "No completed jobs."; + + const parts: string[] = []; + for (const job of jobs) { + const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1); + const header = `### ${job.id} — ${job.label} (${job.status}, ${elapsed}s)`; + + if (job.status === "completed") { + parts.push(`${header}\n\n${job.resultText ?? "(no output)"}`); + } else if (job.status === "failed") { + parts.push(`${header}\n\nError: ${job.errorText ?? "unknown error"}`); + } else if (job.status === "cancelled") { + parts.push(`${header}\n\nCancelled.`); + } + } + + return parts.join("\n\n---\n\n"); +} diff --git a/src/resources/extensions/async-jobs/cancel-job-tool.ts b/src/resources/extensions/async-jobs/cancel-job-tool.ts new file mode 100644 index 000000000..99f450414 --- /dev/null +++ b/src/resources/extensions/async-jobs/cancel-job-tool.ts @@ -0,0 +1,34 @@ +/** + * cancel_job tool — cancel a running background job. + */ + +import type { ToolDefinition } from "@gsd/pi-coding-agent"; +import { Type } from "@sinclair/typebox"; +import type { AsyncJobManager } from "./job-manager.js"; + +const schema = Type.Object({ + job_id: Type.String({ description: "The background job ID to cancel (e.g. bg_a1b2c3d4)" }), +}); + +export function createCancelJobTool(getManager: () => AsyncJobManager): ToolDefinition { + return { + name: "cancel_job", + label: "Cancel Background Job", + description: "Cancel a running background job by its ID.", + parameters: schema, + async execute(_toolCallId, params) { + const manager = getManager(); + const result = manager.cancel(params.job_id); + + const messages: Record = { + cancelled: `Job ${params.job_id} has been cancelled.`, + not_found: `Job ${params.job_id} not found.`, + already_completed: `Job ${params.job_id} has already completed (or failed/cancelled).`, + }; + + return { + content: [{ type: "text", text: messages[result] ?? `Unknown result: ${result}` }], + }; + }, + }; +} diff --git a/src/resources/extensions/async-jobs/index.ts b/src/resources/extensions/async-jobs/index.ts new file mode 100644 index 000000000..b44d4f2c6 --- /dev/null +++ b/src/resources/extensions/async-jobs/index.ts @@ -0,0 +1,133 @@ +/** + * Async Jobs Extension + * + * Allows bash commands to run in the background. The agent gets a job ID + * immediately and can continue working. Results are delivered via follow-up + * messages when jobs complete. + * + * Tools: + * async_bash — run a command in the background, get a job ID + * await_job — wait for background jobs to complete, get results + * cancel_job — cancel a running background job + * + * Commands: + * /jobs — show running and recent background jobs + */ + +import type { ExtensionAPI, ExtensionCommandContext } from "@gsd/pi-coding-agent"; +import { AsyncJobManager, type Job } from "./job-manager.js"; +import { createAsyncBashTool } from "./async-bash-tool.js"; +import { createAwaitTool } from "./await-tool.js"; +import { createCancelJobTool } from "./cancel-job-tool.js"; + +export default function AsyncJobs(pi: ExtensionAPI) { + let manager: AsyncJobManager | null = null; + let latestCwd: string = process.cwd(); + + function getManager(): AsyncJobManager { + if (!manager) { + throw new Error("AsyncJobManager not initialized. Wait for session_start."); + } + return manager; + } + + function getCwd(): string { + return latestCwd; + } + + // ── Session lifecycle ────────────────────────────────────────────────── + + pi.on("session_start", async (_event, ctx) => { + latestCwd = ctx.cwd; + + manager = new AsyncJobManager({ + onJobComplete: (job) => { + const statusEmoji = job.status === "completed" ? "done" : "error"; + const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1); + const output = job.status === "completed" + ? job.resultText ?? "(no output)" + : `Error: ${job.errorText ?? "unknown error"}`; + + // Truncate output for the follow-up message + const maxLen = 2000; + const truncatedOutput = output.length > maxLen + ? output.slice(0, maxLen) + "\n\n[... truncated, use await_job for full output]" + : output; + + pi.sendMessage( + { + customType: "async_job_result", + content: [ + `**Background job ${statusEmoji}: ${job.id}** (${job.label}, ${elapsed}s)`, + "", + truncatedOutput, + ].join("\n"), + display: `Background job ${job.id} ${job.status}`, + }, + { deliverAs: "followUp", triggerTurn: true }, + ); + }, + }); + }); + + pi.on("session_shutdown", async () => { + if (manager) { + manager.shutdown(); + manager = null; + } + }); + + // ── Tools ────────────────────────────────────────────────────────────── + + pi.registerTool(createAsyncBashTool(getManager, getCwd)); + pi.registerTool(createAwaitTool(getManager)); + pi.registerTool(createCancelJobTool(getManager)); + + // ── /jobs command ────────────────────────────────────────────────────── + + pi.registerCommand("jobs", { + description: "Show running and recent background jobs", + handler: async (_args: string, _ctx: ExtensionCommandContext) => { + if (!manager) { + pi.sendMessage({ + customType: "async_jobs_list", + content: "No async job manager active.", + display: "No jobs", + }); + return; + } + + const running = manager.getRunningJobs(); + const recent = manager.getRecentJobs(10); + const completed = recent.filter((j) => j.status !== "running"); + + const lines: string[] = ["## Background Jobs"]; + + if (running.length === 0 && completed.length === 0) { + lines.push("", "No background jobs."); + } else { + if (running.length > 0) { + lines.push("", "### Running"); + for (const job of running) { + const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(0); + lines.push(`- **${job.id}** — ${job.label} (${elapsed}s)`); + } + } + + if (completed.length > 0) { + lines.push("", "### Recent"); + for (const job of completed) { + const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1); + lines.push(`- **${job.id}** — ${job.label} (${job.status}, ${elapsed}s)`); + } + } + } + + pi.sendMessage({ + customType: "async_jobs_list", + content: lines.join("\n"), + display: `${running.length} running, ${completed.length} recent`, + }); + }, + }); +} diff --git a/src/resources/extensions/async-jobs/job-manager.ts b/src/resources/extensions/async-jobs/job-manager.ts new file mode 100644 index 000000000..34a1b0527 --- /dev/null +++ b/src/resources/extensions/async-jobs/job-manager.ts @@ -0,0 +1,250 @@ +/** + * AsyncJobManager — manages background tool call jobs. + * + * Each job runs asynchronously and delivers its result via a callback + * when complete. Jobs are evicted after a configurable TTL. + */ + +import { randomUUID } from "node:crypto"; + +// ── Types ────────────────────────────────────────────────────────────────── + +export type JobStatus = "running" | "completed" | "failed" | "cancelled"; +export type JobType = "bash"; + +export interface Job { + id: string; + type: JobType; + status: JobStatus; + startTime: number; + label: string; + abortController: AbortController; + promise: Promise; + resultText?: string; + errorText?: string; +} + +export interface JobManagerOptions { + maxRunning?: number; // default 15 + maxTotal?: number; // default 100 + evictionMs?: number; // default 5 minutes + onJobComplete?: (job: Job) => void; +} + +// ── Delivery Retry ───────────────────────────────────────────────────────── + +const DELIVERY_BASE_MS = 500; +const DELIVERY_MAX_MS = 30_000; +const DELIVERY_JITTER_MS = 200; + +// ── Manager ──────────────────────────────────────────────────────────────── + +export class AsyncJobManager { + private jobs = new Map(); + private deliveryTimers = new Map>(); + private acknowledgedJobs = new Set(); + private evictionTimers = new Map>(); + + private maxRunning: number; + private maxTotal: number; + private evictionMs: number; + private onJobComplete?: (job: Job) => void; + + constructor(options: JobManagerOptions = {}) { + this.maxRunning = options.maxRunning ?? 15; + this.maxTotal = options.maxTotal ?? 100; + this.evictionMs = options.evictionMs ?? 5 * 60 * 1000; + this.onJobComplete = options.onJobComplete; + } + + /** + * Register a new background job. + * @returns job ID (prefixed with `bg_`) + */ + register( + type: JobType, + label: string, + runFn: (signal: AbortSignal) => Promise, + ): string { + // Enforce limits + const running = this.getRunningJobs(); + if (running.length >= this.maxRunning) { + throw new Error( + `Maximum concurrent background jobs reached (${this.maxRunning}). ` + + `Use await_job or cancel_job to free a slot.`, + ); + } + if (this.jobs.size >= this.maxTotal) { + // Evict oldest completed job + this.evictOldest(); + if (this.jobs.size >= this.maxTotal) { + throw new Error( + `Maximum total background jobs reached (${this.maxTotal}). ` + + `Use cancel_job to remove jobs.`, + ); + } + } + + const id = `bg_${randomUUID().slice(0, 8)}`; + const abortController = new AbortController(); + + // Declare job first so the promise callbacks can close over it safely. + const job: Job = { + id, + type, + status: "running", + startTime: Date.now(), + label, + abortController, + // promise assigned below + promise: undefined as unknown as Promise, + }; + + job.promise = runFn(abortController.signal) + .then((resultText) => { + job.status = "completed"; + job.resultText = resultText; + this.scheduleEviction(id); + this.deliverResult(job); + }) + .catch((err) => { + if (job.status === "cancelled") { + // Already cancelled — don't overwrite + this.scheduleEviction(id); + return; + } + job.status = "failed"; + job.errorText = err instanceof Error ? err.message : String(err); + this.scheduleEviction(id); + this.deliverResult(job); + }); + + this.jobs.set(id, job); + return id; + } + + /** + * Cancel a running job. + */ + cancel(id: string): "cancelled" | "not_found" | "already_completed" { + const job = this.jobs.get(id); + if (!job) return "not_found"; + if (job.status !== "running") return "already_completed"; + + job.status = "cancelled"; + job.errorText = "Cancelled by user"; + job.abortController.abort(); + this.scheduleEviction(id); + return "cancelled"; + } + + getJob(id: string): Job | undefined { + return this.jobs.get(id); + } + + getRunningJobs(): Job[] { + return [...this.jobs.values()].filter((j) => j.status === "running"); + } + + getRecentJobs(limit = 10): Job[] { + return [...this.jobs.values()] + .sort((a, b) => b.startTime - a.startTime) + .slice(0, limit); + } + + getAllJobs(): Job[] { + return [...this.jobs.values()]; + } + + /** + * Mark jobs as acknowledged so delivery retries stop. + */ + acknowledgeDeliveries(jobIds: string[]): void { + for (const id of jobIds) { + this.acknowledgedJobs.add(id); + const timer = this.deliveryTimers.get(id); + if (timer) { + clearTimeout(timer); + this.deliveryTimers.delete(id); + } + } + } + + /** + * Cleanup all timers and resources. + */ + shutdown(): void { + for (const timer of this.deliveryTimers.values()) { + clearTimeout(timer); + } + this.deliveryTimers.clear(); + + for (const timer of this.evictionTimers.values()) { + clearTimeout(timer); + } + this.evictionTimers.clear(); + + // Abort all running jobs + for (const job of this.jobs.values()) { + if (job.status === "running") { + job.status = "cancelled"; + job.abortController.abort(); + } + } + } + + // ── Private ──────────────────────────────────────────────────────────── + + private deliverResult(job: Job, attempt = 0): void { + if (this.acknowledgedJobs.has(job.id)) return; + if (!this.onJobComplete) return; + + this.onJobComplete(job); + + // Schedule retry with exponential backoff + jitter + const delay = Math.min( + DELIVERY_BASE_MS * Math.pow(2, attempt) + Math.random() * DELIVERY_JITTER_MS, + DELIVERY_MAX_MS, + ); + + const timer = setTimeout(() => { + this.deliveryTimers.delete(job.id); + if (!this.acknowledgedJobs.has(job.id)) { + this.deliverResult(job, attempt + 1); + } + }, delay); + + this.deliveryTimers.set(job.id, timer); + } + + private scheduleEviction(id: string): void { + const existing = this.evictionTimers.get(id); + if (existing) clearTimeout(existing); + + const timer = setTimeout(() => { + this.evictionTimers.delete(id); + this.jobs.delete(id); + this.acknowledgedJobs.delete(id); + }, this.evictionMs); + + this.evictionTimers.set(id, timer); + } + + private evictOldest(): void { + let oldest: Job | undefined; + for (const job of this.jobs.values()) { + if (job.status !== "running") { + if (!oldest || job.startTime < oldest.startTime) { + oldest = job; + } + } + } + if (oldest) { + const timer = this.evictionTimers.get(oldest.id); + if (timer) clearTimeout(timer); + this.evictionTimers.delete(oldest.id); + this.jobs.delete(oldest.id); + this.acknowledgedJobs.delete(oldest.id); + } + } +}