diff --git a/src/resources/extensions/subagent/background-jobs.ts b/src/resources/extensions/subagent/background-jobs.ts new file mode 100644 index 000000000..c480fa2ed --- /dev/null +++ b/src/resources/extensions/subagent/background-jobs.ts @@ -0,0 +1,182 @@ +import { randomUUID } from "node:crypto"; +import type { AgentToolResult } from "@singularity-forge/pi-agent-core"; + +interface ErrorAwareToolResult extends AgentToolResult { + isError?: boolean; +} + +export type SubagentJobStatus = "running" | "completed" | "failed" | "cancelled"; + +export interface SubagentBackgroundJob { + id: string; + status: SubagentJobStatus; + startTime: number; + label: string; + abortController: AbortController; + promise: Promise; + result?: ErrorAwareToolResult; + errorText?: string; + awaited?: boolean; + deliveryTimer?: ReturnType; +} + +export interface SubagentBackgroundJobManagerOptions { + maxRunning?: number; + maxTotal?: number; + evictionMs?: number; + onJobComplete?: (job: SubagentBackgroundJob) => void; +} + +export class SubagentBackgroundJobManager { + private jobs = new Map>(); + private evictionTimers = new Map>(); + + private maxRunning: number; + private maxTotal: number; + private evictionMs: number; + private onJobComplete?: (job: SubagentBackgroundJob) => void; + + constructor(options: SubagentBackgroundJobManagerOptions = {}) { + this.maxRunning = options.maxRunning ?? 8; + this.maxTotal = options.maxTotal ?? 50; + this.evictionMs = options.evictionMs ?? 10 * 60 * 1000; + this.onJobComplete = options.onJobComplete; + } + + register( + label: string, + runFn: (signal: AbortSignal) => Promise>, + ): string { + const running = this.getRunningJobs(); + if (running.length >= this.maxRunning) { + throw new Error( + `Maximum concurrent background subagent jobs reached (${this.maxRunning}). ` + + `Use await_subagent or cancel_subagent to free a slot.`, + ); + } + if (this.jobs.size >= this.maxTotal) { + this.evictOldest(); + if (this.jobs.size >= this.maxTotal) { + throw new Error( + `Maximum total background subagent jobs reached (${this.maxTotal}). ` + + `Use cancel_subagent to remove jobs.`, + ); + } + } + + const id = `sub_${randomUUID().slice(0, 8)}`; + const abortController = new AbortController(); + const job: SubagentBackgroundJob = { + id, + status: "running", + startTime: Date.now(), + label, + abortController, + promise: undefined as unknown as Promise, + }; + + job.promise = runFn(abortController.signal) + .then((result) => { + job.result = result; + job.status = result.isError ? "failed" : "completed"; + this.scheduleEviction(id); + this.deliverResult(job); + }) + .catch((err) => { + if (job.status === "cancelled") { + this.scheduleEviction(id); + return; + } + job.status = "failed"; + job.errorText = err instanceof Error ? err.message : String(err); + this.scheduleEviction(id); + this.deliverResult(job); + }); + + this.jobs.set(id, job); + return id; + } + + cancel(id: string): "cancelled" | "not_found" | "already_completed" { + const job = this.jobs.get(id); + if (!job) return "not_found"; + if (job.status !== "running") return "already_completed"; + + job.status = "cancelled"; + job.errorText = "Cancelled by user"; + job.abortController.abort(); + this.scheduleEviction(id); + return "cancelled"; + } + + getJob(id: string): SubagentBackgroundJob | undefined { + return this.jobs.get(id); + } + + getRunningJobs(): SubagentBackgroundJob[] { + return [...this.jobs.values()].filter((job) => job.status === "running"); + } + + getRecentJobs(limit = 10): SubagentBackgroundJob[] { + return [...this.jobs.values()] + .sort((a, b) => b.startTime - a.startTime) + .slice(0, limit); + } + + suppressFollowUp(id: string): void { + const job = this.jobs.get(id); + if (!job) return; + job.awaited = true; + if (job.deliveryTimer !== undefined) { + clearTimeout(job.deliveryTimer); + job.deliveryTimer = undefined; + } + } + + shutdown(): void { + for (const timer of this.evictionTimers.values()) clearTimeout(timer); + this.evictionTimers.clear(); + for (const job of this.jobs.values()) { + if (job.status === "running") { + job.status = "cancelled"; + job.abortController.abort(); + } + } + } + + private deliverResult(job: SubagentBackgroundJob): void { + if (!this.onJobComplete) return; + const cb = this.onJobComplete; + job.deliveryTimer = setTimeout(() => { + job.deliveryTimer = undefined; + if (!job.awaited) cb(job); + }, 0); + if (typeof job.deliveryTimer === "object" && "unref" in job.deliveryTimer) { + (job.deliveryTimer as NodeJS.Timeout).unref(); + } + } + + private scheduleEviction(id: string): void { + const existing = this.evictionTimers.get(id); + if (existing) clearTimeout(existing); + const timer = setTimeout(() => { + this.evictionTimers.delete(id); + this.jobs.delete(id); + }, this.evictionMs); + this.evictionTimers.set(id, timer); + } + + private evictOldest(): void { + let oldest: SubagentBackgroundJob | undefined; + for (const job of this.jobs.values()) { + if (job.status !== "running") { + if (!oldest || job.startTime < oldest.startTime) oldest = job; + } + } + if (!oldest) return; + const timer = this.evictionTimers.get(oldest.id); + if (timer) clearTimeout(timer); + this.evictionTimers.delete(oldest.id); + this.jobs.delete(oldest.id); + } +} diff --git a/src/resources/extensions/subagent/tests/background-jobs.test.ts b/src/resources/extensions/subagent/tests/background-jobs.test.ts new file mode 100644 index 000000000..05cab21fa --- /dev/null +++ b/src/resources/extensions/subagent/tests/background-jobs.test.ts @@ -0,0 +1,49 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { SubagentBackgroundJobManager } from "../background-jobs.ts"; + +test("background job manager stores completed results", async () => { + const manager = new SubagentBackgroundJobManager<{ mode: "single"; results: [] }>({ + evictionMs: 60_000, + }); + + const id = manager.register("demo", async () => ({ + content: [{ type: "text", text: "ok" }], + details: { mode: "single", results: [] }, + })); + + await manager.getJob(id)?.promise; + const job = manager.getJob(id); + assert.equal(job?.status, "completed"); + assert.equal(job?.result?.content[0]?.type, "text"); + if (job?.result?.content[0]?.type === "text") { + assert.equal(job.result.content[0].text, "ok"); + } + manager.shutdown(); +}); + +test("background job manager cancels running jobs", async () => { + const manager = new SubagentBackgroundJobManager({ + evictionMs: 60_000, + }); + + const id = manager.register("long", async (signal) => { + await new Promise((resolve, reject) => { + const timer = setTimeout(resolve, 10_000); + signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + reject(new Error("aborted")); + }, + { once: true }, + ); + }); + return { content: [{ type: "text", text: "done" }], details: undefined }; + }); + + assert.equal(manager.cancel(id), "cancelled"); + await manager.getJob(id)?.promise; + assert.equal(manager.getJob(id)?.status, "cancelled"); + manager.shutdown(); +}); diff --git a/src/resources/extensions/subagent/tests/background-mode.test.ts b/src/resources/extensions/subagent/tests/background-mode.test.ts new file mode 100644 index 000000000..7e135e465 --- /dev/null +++ b/src/resources/extensions/subagent/tests/background-mode.test.ts @@ -0,0 +1,38 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const subagentSrc = readFileSync(join(__dirname, "..", "index.ts"), "utf-8"); +const manifestSrc = readFileSync(join(__dirname, "..", "extension-manifest.json"), "utf-8"); + +test("SubagentParams declares optional background field", () => { + const paramsStart = subagentSrc.indexOf("const SubagentParams = Type.Object({"); + const paramsEnd = subagentSrc.indexOf("});", paramsStart); + const paramsBlock = subagentSrc.slice(paramsStart, paramsEnd); + assert.match(paramsBlock, /background:\s*Type\.Optional\(\s*Type\.Boolean/); +}); + +test("subagent execute registers background jobs and disables nested background flag", () => { + assert.match(subagentSrc, /if \(params\.background\)/, "subagent tool should branch on params.background"); + assert.match(subagentSrc, /manager\.register\(summarizeBackgroundInvocation\(params\)/, "background path should register a job"); + assert.match( + subagentSrc, + /params:\s*\{\s*\.\.\.params,\s*confirmProjectAgents:\s*false,\s*background:\s*false\s*\}/, + "background execution should clear background on the nested invocation", + ); +}); + +test("subagent extension registers await_subagent and cancel_subagent tools", () => { + assert.match(subagentSrc, /name:\s*"await_subagent"/); + assert.match(subagentSrc, /name:\s*"cancel_subagent"/); +}); + +test("extension manifest advertises background subagent tools and lifecycle hooks", () => { + assert.match(manifestSrc, /"await_subagent"/); + assert.match(manifestSrc, /"cancel_subagent"/); + assert.match(manifestSrc, /"session_start"/); + assert.match(manifestSrc, /"session_before_switch"/); +});