feat(subagent): add background job manager and tests
SubagentBackgroundJobManager tracks long-running subagent jobs with status, abort support, and TTL-based eviction of completed results. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
efd5e14e0a
commit
0606983d97
3 changed files with 269 additions and 0 deletions
182
src/resources/extensions/subagent/background-jobs.ts
Normal file
182
src/resources/extensions/subagent/background-jobs.ts
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
import { randomUUID } from "node:crypto";
|
||||
import type { AgentToolResult } from "@singularity-forge/pi-agent-core";
|
||||
|
||||
interface ErrorAwareToolResult<TDetails> extends AgentToolResult<TDetails> {
|
||||
isError?: boolean;
|
||||
}
|
||||
|
||||
export type SubagentJobStatus = "running" | "completed" | "failed" | "cancelled";
|
||||
|
||||
export interface SubagentBackgroundJob<TDetails = unknown> {
|
||||
id: string;
|
||||
status: SubagentJobStatus;
|
||||
startTime: number;
|
||||
label: string;
|
||||
abortController: AbortController;
|
||||
promise: Promise<void>;
|
||||
result?: ErrorAwareToolResult<TDetails>;
|
||||
errorText?: string;
|
||||
awaited?: boolean;
|
||||
deliveryTimer?: ReturnType<typeof setTimeout>;
|
||||
}
|
||||
|
||||
export interface SubagentBackgroundJobManagerOptions<TDetails = unknown> {
|
||||
maxRunning?: number;
|
||||
maxTotal?: number;
|
||||
evictionMs?: number;
|
||||
onJobComplete?: (job: SubagentBackgroundJob<TDetails>) => void;
|
||||
}
|
||||
|
||||
export class SubagentBackgroundJobManager<TDetails = unknown> {
|
||||
private jobs = new Map<string, SubagentBackgroundJob<TDetails>>();
|
||||
private evictionTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
private maxRunning: number;
|
||||
private maxTotal: number;
|
||||
private evictionMs: number;
|
||||
private onJobComplete?: (job: SubagentBackgroundJob<TDetails>) => void;
|
||||
|
||||
constructor(options: SubagentBackgroundJobManagerOptions<TDetails> = {}) {
|
||||
this.maxRunning = options.maxRunning ?? 8;
|
||||
this.maxTotal = options.maxTotal ?? 50;
|
||||
this.evictionMs = options.evictionMs ?? 10 * 60 * 1000;
|
||||
this.onJobComplete = options.onJobComplete;
|
||||
}
|
||||
|
||||
register(
|
||||
label: string,
|
||||
runFn: (signal: AbortSignal) => Promise<ErrorAwareToolResult<TDetails>>,
|
||||
): string {
|
||||
const running = this.getRunningJobs();
|
||||
if (running.length >= this.maxRunning) {
|
||||
throw new Error(
|
||||
`Maximum concurrent background subagent jobs reached (${this.maxRunning}). ` +
|
||||
`Use await_subagent or cancel_subagent to free a slot.`,
|
||||
);
|
||||
}
|
||||
if (this.jobs.size >= this.maxTotal) {
|
||||
this.evictOldest();
|
||||
if (this.jobs.size >= this.maxTotal) {
|
||||
throw new Error(
|
||||
`Maximum total background subagent jobs reached (${this.maxTotal}). ` +
|
||||
`Use cancel_subagent to remove jobs.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const id = `sub_${randomUUID().slice(0, 8)}`;
|
||||
const abortController = new AbortController();
|
||||
const job: SubagentBackgroundJob<TDetails> = {
|
||||
id,
|
||||
status: "running",
|
||||
startTime: Date.now(),
|
||||
label,
|
||||
abortController,
|
||||
promise: undefined as unknown as Promise<void>,
|
||||
};
|
||||
|
||||
job.promise = runFn(abortController.signal)
|
||||
.then((result) => {
|
||||
job.result = result;
|
||||
job.status = result.isError ? "failed" : "completed";
|
||||
this.scheduleEviction(id);
|
||||
this.deliverResult(job);
|
||||
})
|
||||
.catch((err) => {
|
||||
if (job.status === "cancelled") {
|
||||
this.scheduleEviction(id);
|
||||
return;
|
||||
}
|
||||
job.status = "failed";
|
||||
job.errorText = err instanceof Error ? err.message : String(err);
|
||||
this.scheduleEviction(id);
|
||||
this.deliverResult(job);
|
||||
});
|
||||
|
||||
this.jobs.set(id, job);
|
||||
return id;
|
||||
}
|
||||
|
||||
cancel(id: string): "cancelled" | "not_found" | "already_completed" {
|
||||
const job = this.jobs.get(id);
|
||||
if (!job) return "not_found";
|
||||
if (job.status !== "running") return "already_completed";
|
||||
|
||||
job.status = "cancelled";
|
||||
job.errorText = "Cancelled by user";
|
||||
job.abortController.abort();
|
||||
this.scheduleEviction(id);
|
||||
return "cancelled";
|
||||
}
|
||||
|
||||
getJob(id: string): SubagentBackgroundJob<TDetails> | undefined {
|
||||
return this.jobs.get(id);
|
||||
}
|
||||
|
||||
getRunningJobs(): SubagentBackgroundJob<TDetails>[] {
|
||||
return [...this.jobs.values()].filter((job) => job.status === "running");
|
||||
}
|
||||
|
||||
getRecentJobs(limit = 10): SubagentBackgroundJob<TDetails>[] {
|
||||
return [...this.jobs.values()]
|
||||
.sort((a, b) => b.startTime - a.startTime)
|
||||
.slice(0, limit);
|
||||
}
|
||||
|
||||
suppressFollowUp(id: string): void {
|
||||
const job = this.jobs.get(id);
|
||||
if (!job) return;
|
||||
job.awaited = true;
|
||||
if (job.deliveryTimer !== undefined) {
|
||||
clearTimeout(job.deliveryTimer);
|
||||
job.deliveryTimer = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
shutdown(): void {
|
||||
for (const timer of this.evictionTimers.values()) clearTimeout(timer);
|
||||
this.evictionTimers.clear();
|
||||
for (const job of this.jobs.values()) {
|
||||
if (job.status === "running") {
|
||||
job.status = "cancelled";
|
||||
job.abortController.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private deliverResult(job: SubagentBackgroundJob<TDetails>): void {
|
||||
if (!this.onJobComplete) return;
|
||||
const cb = this.onJobComplete;
|
||||
job.deliveryTimer = setTimeout(() => {
|
||||
job.deliveryTimer = undefined;
|
||||
if (!job.awaited) cb(job);
|
||||
}, 0);
|
||||
if (typeof job.deliveryTimer === "object" && "unref" in job.deliveryTimer) {
|
||||
(job.deliveryTimer as NodeJS.Timeout).unref();
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleEviction(id: string): void {
|
||||
const existing = this.evictionTimers.get(id);
|
||||
if (existing) clearTimeout(existing);
|
||||
const timer = setTimeout(() => {
|
||||
this.evictionTimers.delete(id);
|
||||
this.jobs.delete(id);
|
||||
}, this.evictionMs);
|
||||
this.evictionTimers.set(id, timer);
|
||||
}
|
||||
|
||||
private evictOldest(): void {
|
||||
let oldest: SubagentBackgroundJob<TDetails> | undefined;
|
||||
for (const job of this.jobs.values()) {
|
||||
if (job.status !== "running") {
|
||||
if (!oldest || job.startTime < oldest.startTime) oldest = job;
|
||||
}
|
||||
}
|
||||
if (!oldest) return;
|
||||
const timer = this.evictionTimers.get(oldest.id);
|
||||
if (timer) clearTimeout(timer);
|
||||
this.evictionTimers.delete(oldest.id);
|
||||
this.jobs.delete(oldest.id);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
import test from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { SubagentBackgroundJobManager } from "../background-jobs.ts";
|
||||
|
||||
test("background job manager stores completed results", async () => {
|
||||
const manager = new SubagentBackgroundJobManager<{ mode: "single"; results: [] }>({
|
||||
evictionMs: 60_000,
|
||||
});
|
||||
|
||||
const id = manager.register("demo", async () => ({
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
details: { mode: "single", results: [] },
|
||||
}));
|
||||
|
||||
await manager.getJob(id)?.promise;
|
||||
const job = manager.getJob(id);
|
||||
assert.equal(job?.status, "completed");
|
||||
assert.equal(job?.result?.content[0]?.type, "text");
|
||||
if (job?.result?.content[0]?.type === "text") {
|
||||
assert.equal(job.result.content[0].text, "ok");
|
||||
}
|
||||
manager.shutdown();
|
||||
});
|
||||
|
||||
test("background job manager cancels running jobs", async () => {
|
||||
const manager = new SubagentBackgroundJobManager({
|
||||
evictionMs: 60_000,
|
||||
});
|
||||
|
||||
const id = manager.register("long", async (signal) => {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(resolve, 10_000);
|
||||
signal.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
clearTimeout(timer);
|
||||
reject(new Error("aborted"));
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
});
|
||||
return { content: [{ type: "text", text: "done" }], details: undefined };
|
||||
});
|
||||
|
||||
assert.equal(manager.cancel(id), "cancelled");
|
||||
await manager.getJob(id)?.promise;
|
||||
assert.equal(manager.getJob(id)?.status, "cancelled");
|
||||
manager.shutdown();
|
||||
});
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
import test from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const subagentSrc = readFileSync(join(__dirname, "..", "index.ts"), "utf-8");
|
||||
const manifestSrc = readFileSync(join(__dirname, "..", "extension-manifest.json"), "utf-8");
|
||||
|
||||
test("SubagentParams declares optional background field", () => {
|
||||
const paramsStart = subagentSrc.indexOf("const SubagentParams = Type.Object({");
|
||||
const paramsEnd = subagentSrc.indexOf("});", paramsStart);
|
||||
const paramsBlock = subagentSrc.slice(paramsStart, paramsEnd);
|
||||
assert.match(paramsBlock, /background:\s*Type\.Optional\(\s*Type\.Boolean/);
|
||||
});
|
||||
|
||||
test("subagent execute registers background jobs and disables nested background flag", () => {
|
||||
assert.match(subagentSrc, /if \(params\.background\)/, "subagent tool should branch on params.background");
|
||||
assert.match(subagentSrc, /manager\.register\(summarizeBackgroundInvocation\(params\)/, "background path should register a job");
|
||||
assert.match(
|
||||
subagentSrc,
|
||||
/params:\s*\{\s*\.\.\.params,\s*confirmProjectAgents:\s*false,\s*background:\s*false\s*\}/,
|
||||
"background execution should clear background on the nested invocation",
|
||||
);
|
||||
});
|
||||
|
||||
test("subagent extension registers await_subagent and cancel_subagent tools", () => {
|
||||
assert.match(subagentSrc, /name:\s*"await_subagent"/);
|
||||
assert.match(subagentSrc, /name:\s*"cancel_subagent"/);
|
||||
});
|
||||
|
||||
test("extension manifest advertises background subagent tools and lifecycle hooks", () => {
|
||||
assert.match(manifestSrc, /"await_subagent"/);
|
||||
assert.match(manifestSrc, /"cancel_subagent"/);
|
||||
assert.match(manifestSrc, /"session_start"/);
|
||||
assert.match(manifestSrc, /"session_before_switch"/);
|
||||
});
|
||||
Loading…
Add table
Reference in a new issue