feat: async background jobs extension (#260)

This commit is contained in:
TÂCHES 2026-03-13 16:01:30 -06:00 committed by GitHub
parent 7b0b71da79
commit fa9477f638
8 changed files with 741 additions and 6 deletions

View file

@ -49,6 +49,11 @@ export interface MarkdownSettings {
codeBlockIndent?: string; // default: " "
}
export interface AsyncSettings {
enabled?: boolean; // default: false
maxJobs?: number; // default: 100
}
export type TransportSetting = Transport;
/**
@ -99,7 +104,7 @@ export interface Settings {
autocompleteMaxVisible?: number; // Max visible items in autocomplete dropdown (default: 5)
showHardwareCursor?: boolean; // Show terminal cursor while still positioning it for IME
markdown?: MarkdownSettings;
bashInterceptor?: BashInterceptorSettings;
async?: AsyncSettings;
}
/** Deep merge settings: project/overrides take precedence, nested objects merge recursively */
@ -947,11 +952,11 @@ export class SettingsManager {
return this.settings.markdown?.codeBlockIndent ?? " ";
}
getBashInterceptorEnabled(): boolean {
return this.settings.bashInterceptor?.enabled ?? true;
getAsyncEnabled(): boolean {
return this.settings.async?.enabled ?? false;
}
getBashInterceptorRules(): BashInterceptorRule[] | undefined {
return this.settings.bashInterceptor?.rules;
getAsyncMaxJobs(): number {
return this.settings.async?.maxJobs ?? 100;
}
}

View file

@ -202,7 +202,7 @@ export {
export { BlobStore, isBlobRef, parseBlobRef, externalizeImageData, resolveImageData } from "./core/blob-store.js";
export { ArtifactManager } from "./core/artifact-manager.js";
export {
type BashInterceptorSettings,
type AsyncSettings,
type CompactionSettings,
type ImageSettings,
type PackageSource,

View file

@ -90,6 +90,7 @@ process.env.GSD_BUNDLED_EXTENSION_PATHS = [
join(agentDir, 'extensions', 'slash-commands', 'index.ts'),
join(agentDir, 'extensions', 'subagent', 'index.ts'),
join(agentDir, 'extensions', 'mac-tools', 'index.ts'),
join(agentDir, 'extensions', 'async-jobs', 'index.ts'),
join(agentDir, 'extensions', 'ask-user-questions.ts'),
join(agentDir, 'extensions', 'get-secrets-from-user.ts'),
].join(':')

View file

@ -0,0 +1,211 @@
/**
* async_bash tool run a bash command in the background.
*
* Registers the command with the AsyncJobManager and returns a job ID
* immediately. The LLM can continue working and check results later
* with await_job.
*/
import type { ToolDefinition } from "@gsd/pi-coding-agent";
import {
getShellConfig,
sanitizeCommand,
DEFAULT_MAX_BYTES,
DEFAULT_MAX_LINES,
} from "@gsd/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import { spawn } from "node:child_process";
import { createWriteStream } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { randomBytes } from "node:crypto";
import type { AsyncJobManager } from "./job-manager.js";
const schema = Type.Object({
command: Type.String({ description: "Bash command to execute in the background" }),
timeout: Type.Optional(
Type.Number({ description: "Timeout in seconds (optional)" }),
),
label: Type.Optional(
Type.String({ description: "Short label for the job (shown in /jobs). Defaults to a truncated version of the command." }),
),
});
function getTempFilePath(): string {
const id = randomBytes(8).toString("hex");
return join(tmpdir(), `pi-async-bash-${id}.log`);
}
/**
* Kill a process and its children. Uses process group kill on Unix.
*/
function killTree(pid: number): void {
try {
// Kill the process group (negative PID)
process.kill(-pid, "SIGTERM");
} catch {
try {
process.kill(pid, "SIGTERM");
} catch {
// Already exited
}
}
}
export function createAsyncBashTool(
getManager: () => AsyncJobManager,
getCwd: () => string,
): ToolDefinition<typeof schema> {
return {
name: "async_bash",
label: "Background Bash",
description:
`Run a bash command in the background. Returns a job ID immediately so you can continue working. ` +
`Use await_job to get results or cancel_job to stop. Ideal for long-running builds, tests, or installs. ` +
`Output is truncated to the last ${DEFAULT_MAX_LINES} lines or ${DEFAULT_MAX_BYTES / 1024}KB.`,
promptSnippet: "Run a bash command in the background, returning a job ID immediately.",
promptGuidelines: [
"Use async_bash for commands that take more than a few seconds (builds, tests, installs, large git operations).",
"After starting async jobs, continue with other work and use await_job when you need the results.",
"Use cancel_job to stop a running background job.",
"Check /jobs to see all running and recent background jobs.",
],
parameters: schema,
async execute(_toolCallId, params) {
const manager = getManager();
const cwd = getCwd();
const { command, timeout, label } = params;
const shortCmd = label ?? (command.length > 60 ? command.slice(0, 57) + "..." : command);
const jobId = manager.register("bash", shortCmd, (signal) => {
return executeBashInBackground(command, cwd, signal, timeout);
});
return {
content: [{
type: "text",
text: [
`Background job started: **${jobId}**`,
`Command: \`${shortCmd}\``,
"",
"Use `await_job` to get results when ready, or `cancel_job` to stop.",
].join("\n"),
}],
};
},
};
}
/**
* Execute a bash command, collecting output. Returns the text result.
*/
function executeBashInBackground(
command: string,
cwd: string,
signal: AbortSignal,
timeout?: number,
): Promise<string> {
return new Promise<string>((resolve, reject) => {
const { shell, args } = getShellConfig();
const resolvedCommand = sanitizeCommand(command);
const child = spawn(shell, [...args, resolvedCommand], {
cwd,
detached: true,
env: { ...process.env },
stdio: ["ignore", "pipe", "pipe"],
});
let timedOut = false;
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
if (timeout !== undefined && timeout > 0) {
timeoutHandle = setTimeout(() => {
timedOut = true;
if (child.pid) killTree(child.pid);
}, timeout * 1000);
}
const chunks: Buffer[] = [];
let totalBytes = 0;
let spillFilePath: string | undefined;
let spillStream: ReturnType<typeof createWriteStream> | undefined;
const MAX_BUFFER = DEFAULT_MAX_BYTES * 2;
const onData = (data: Buffer) => {
totalBytes += data.length;
if (totalBytes > DEFAULT_MAX_BYTES && !spillFilePath) {
spillFilePath = getTempFilePath();
spillStream = createWriteStream(spillFilePath);
for (const chunk of chunks) spillStream.write(chunk);
}
if (spillStream) spillStream.write(data);
chunks.push(data);
let chunksBytes = chunks.reduce((s, c) => s + c.length, 0);
while (chunksBytes > MAX_BUFFER && chunks.length > 1) {
const removed = chunks.shift()!;
chunksBytes -= removed.length;
}
};
if (child.stdout) child.stdout.on("data", onData);
if (child.stderr) child.stderr.on("data", onData);
const onAbort = () => {
if (child.pid) killTree(child.pid);
};
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
child.on("error", (err) => {
if (timeoutHandle) clearTimeout(timeoutHandle);
signal.removeEventListener("abort", onAbort);
reject(err);
});
child.on("close", (code) => {
if (timeoutHandle) clearTimeout(timeoutHandle);
signal.removeEventListener("abort", onAbort);
if (spillStream) spillStream.end();
if (signal.aborted) {
const output = Buffer.concat(chunks).toString("utf-8");
resolve(output ? `${output}\n\nCommand aborted` : "Command aborted");
return;
}
if (timedOut) {
const output = Buffer.concat(chunks).toString("utf-8");
resolve(output ? `${output}\n\nCommand timed out after ${timeout} seconds` : `Command timed out after ${timeout} seconds`);
return;
}
const fullOutput = Buffer.concat(chunks).toString("utf-8");
const lines = fullOutput.split("\n");
let text: string;
if (lines.length > DEFAULT_MAX_LINES) {
text = lines.slice(-DEFAULT_MAX_LINES).join("\n");
if (spillFilePath) {
text += `\n\n[Showing last ${DEFAULT_MAX_LINES} of ${lines.length} lines. Full output: ${spillFilePath}]`;
} else {
text += `\n\n[Showing last ${DEFAULT_MAX_LINES} of ${lines.length} lines]`;
}
} else {
text = fullOutput || "(no output)";
}
if (code !== 0 && code !== null) {
text += `\n\nCommand exited with code ${code}`;
}
resolve(text);
});
});
}

View file

@ -0,0 +1,101 @@
/**
* await_job tool wait for one or more background jobs to complete.
*
* If specific job IDs are provided, waits for those jobs.
* If omitted, waits for any running job to complete.
*/
import type { ToolDefinition } from "@gsd/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import type { AsyncJobManager, Job } from "./job-manager.js";
const schema = Type.Object({
jobs: Type.Optional(
Type.Array(Type.String(), {
description: "Job IDs to wait for. Omit to wait for any running job.",
}),
),
});
export function createAwaitTool(getManager: () => AsyncJobManager): ToolDefinition<typeof schema> {
return {
name: "await_job",
label: "Await Background Job",
description:
"Wait for background jobs to complete. Provide specific job IDs or omit to wait for the next job that finishes. Returns results of completed jobs.",
parameters: schema,
async execute(_toolCallId, params) {
const manager = getManager();
const { jobs: jobIds } = params;
let watched: Job[];
if (jobIds && jobIds.length > 0) {
watched = [];
const notFound: string[] = [];
for (const id of jobIds) {
const job = manager.getJob(id);
if (job) {
watched.push(job);
} else {
notFound.push(id);
}
}
if (notFound.length > 0 && watched.length === 0) {
return {
content: [{ type: "text", text: `No jobs found: ${notFound.join(", ")}` }],
};
}
} else {
watched = manager.getRunningJobs();
if (watched.length === 0) {
return {
content: [{ type: "text", text: "No running background jobs." }],
};
}
}
// If all watched jobs are already done, return immediately
const running = watched.filter((j) => j.status === "running");
if (running.length === 0) {
const result = formatResults(watched);
manager.acknowledgeDeliveries(watched.map((j) => j.id));
return { content: [{ type: "text", text: result }] };
}
// Wait for at least one to complete
await Promise.race(running.map((j) => j.promise));
// Collect all completed results (more may have finished while waiting)
const completed = watched.filter((j) => j.status !== "running");
manager.acknowledgeDeliveries(completed.map((j) => j.id));
const stillRunning = watched.filter((j) => j.status === "running");
let result = formatResults(completed);
if (stillRunning.length > 0) {
result += `\n\n**Still running:** ${stillRunning.map((j) => `${j.id} (${j.label})`).join(", ")}`;
}
return { content: [{ type: "text", text: result }] };
},
};
}
function formatResults(jobs: Job[]): string {
if (jobs.length === 0) return "No completed jobs.";
const parts: string[] = [];
for (const job of jobs) {
const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1);
const header = `### ${job.id}${job.label} (${job.status}, ${elapsed}s)`;
if (job.status === "completed") {
parts.push(`${header}\n\n${job.resultText ?? "(no output)"}`);
} else if (job.status === "failed") {
parts.push(`${header}\n\nError: ${job.errorText ?? "unknown error"}`);
} else if (job.status === "cancelled") {
parts.push(`${header}\n\nCancelled.`);
}
}
return parts.join("\n\n---\n\n");
}

View file

@ -0,0 +1,34 @@
/**
* cancel_job tool cancel a running background job.
*/
import type { ToolDefinition } from "@gsd/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import type { AsyncJobManager } from "./job-manager.js";
const schema = Type.Object({
job_id: Type.String({ description: "The background job ID to cancel (e.g. bg_a1b2c3d4)" }),
});
export function createCancelJobTool(getManager: () => AsyncJobManager): ToolDefinition<typeof schema> {
return {
name: "cancel_job",
label: "Cancel Background Job",
description: "Cancel a running background job by its ID.",
parameters: schema,
async execute(_toolCallId, params) {
const manager = getManager();
const result = manager.cancel(params.job_id);
const messages: Record<string, string> = {
cancelled: `Job ${params.job_id} has been cancelled.`,
not_found: `Job ${params.job_id} not found.`,
already_completed: `Job ${params.job_id} has already completed (or failed/cancelled).`,
};
return {
content: [{ type: "text", text: messages[result] ?? `Unknown result: ${result}` }],
};
},
};
}

View file

@ -0,0 +1,133 @@
/**
* Async Jobs Extension
*
* Allows bash commands to run in the background. The agent gets a job ID
* immediately and can continue working. Results are delivered via follow-up
* messages when jobs complete.
*
* Tools:
* async_bash run a command in the background, get a job ID
* await_job wait for background jobs to complete, get results
* cancel_job cancel a running background job
*
* Commands:
* /jobs show running and recent background jobs
*/
import type { ExtensionAPI, ExtensionCommandContext } from "@gsd/pi-coding-agent";
import { AsyncJobManager, type Job } from "./job-manager.js";
import { createAsyncBashTool } from "./async-bash-tool.js";
import { createAwaitTool } from "./await-tool.js";
import { createCancelJobTool } from "./cancel-job-tool.js";
export default function AsyncJobs(pi: ExtensionAPI) {
let manager: AsyncJobManager | null = null;
let latestCwd: string = process.cwd();
function getManager(): AsyncJobManager {
if (!manager) {
throw new Error("AsyncJobManager not initialized. Wait for session_start.");
}
return manager;
}
function getCwd(): string {
return latestCwd;
}
// ── Session lifecycle ──────────────────────────────────────────────────
pi.on("session_start", async (_event, ctx) => {
latestCwd = ctx.cwd;
manager = new AsyncJobManager({
onJobComplete: (job) => {
const statusEmoji = job.status === "completed" ? "done" : "error";
const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1);
const output = job.status === "completed"
? job.resultText ?? "(no output)"
: `Error: ${job.errorText ?? "unknown error"}`;
// Truncate output for the follow-up message
const maxLen = 2000;
const truncatedOutput = output.length > maxLen
? output.slice(0, maxLen) + "\n\n[... truncated, use await_job for full output]"
: output;
pi.sendMessage(
{
customType: "async_job_result",
content: [
`**Background job ${statusEmoji}: ${job.id}** (${job.label}, ${elapsed}s)`,
"",
truncatedOutput,
].join("\n"),
display: `Background job ${job.id} ${job.status}`,
},
{ deliverAs: "followUp", triggerTurn: true },
);
},
});
});
pi.on("session_shutdown", async () => {
if (manager) {
manager.shutdown();
manager = null;
}
});
// ── Tools ──────────────────────────────────────────────────────────────
pi.registerTool(createAsyncBashTool(getManager, getCwd));
pi.registerTool(createAwaitTool(getManager));
pi.registerTool(createCancelJobTool(getManager));
// ── /jobs command ──────────────────────────────────────────────────────
pi.registerCommand("jobs", {
description: "Show running and recent background jobs",
handler: async (_args: string, _ctx: ExtensionCommandContext) => {
if (!manager) {
pi.sendMessage({
customType: "async_jobs_list",
content: "No async job manager active.",
display: "No jobs",
});
return;
}
const running = manager.getRunningJobs();
const recent = manager.getRecentJobs(10);
const completed = recent.filter((j) => j.status !== "running");
const lines: string[] = ["## Background Jobs"];
if (running.length === 0 && completed.length === 0) {
lines.push("", "No background jobs.");
} else {
if (running.length > 0) {
lines.push("", "### Running");
for (const job of running) {
const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(0);
lines.push(`- **${job.id}** — ${job.label} (${elapsed}s)`);
}
}
if (completed.length > 0) {
lines.push("", "### Recent");
for (const job of completed) {
const elapsed = ((Date.now() - job.startTime) / 1000).toFixed(1);
lines.push(`- **${job.id}** — ${job.label} (${job.status}, ${elapsed}s)`);
}
}
}
pi.sendMessage({
customType: "async_jobs_list",
content: lines.join("\n"),
display: `${running.length} running, ${completed.length} recent`,
});
},
});
}

View file

@ -0,0 +1,250 @@
/**
* AsyncJobManager manages background tool call jobs.
*
* Each job runs asynchronously and delivers its result via a callback
* when complete. Jobs are evicted after a configurable TTL.
*/
import { randomUUID } from "node:crypto";
// ── Types ──────────────────────────────────────────────────────────────────
export type JobStatus = "running" | "completed" | "failed" | "cancelled";
export type JobType = "bash";
export interface Job {
id: string;
type: JobType;
status: JobStatus;
startTime: number;
label: string;
abortController: AbortController;
promise: Promise<void>;
resultText?: string;
errorText?: string;
}
export interface JobManagerOptions {
maxRunning?: number; // default 15
maxTotal?: number; // default 100
evictionMs?: number; // default 5 minutes
onJobComplete?: (job: Job) => void;
}
// ── Delivery Retry ─────────────────────────────────────────────────────────
const DELIVERY_BASE_MS = 500;
const DELIVERY_MAX_MS = 30_000;
const DELIVERY_JITTER_MS = 200;
// ── Manager ────────────────────────────────────────────────────────────────
export class AsyncJobManager {
private jobs = new Map<string, Job>();
private deliveryTimers = new Map<string, ReturnType<typeof setTimeout>>();
private acknowledgedJobs = new Set<string>();
private evictionTimers = new Map<string, ReturnType<typeof setTimeout>>();
private maxRunning: number;
private maxTotal: number;
private evictionMs: number;
private onJobComplete?: (job: Job) => void;
constructor(options: JobManagerOptions = {}) {
this.maxRunning = options.maxRunning ?? 15;
this.maxTotal = options.maxTotal ?? 100;
this.evictionMs = options.evictionMs ?? 5 * 60 * 1000;
this.onJobComplete = options.onJobComplete;
}
/**
* Register a new background job.
* @returns job ID (prefixed with `bg_`)
*/
register(
type: JobType,
label: string,
runFn: (signal: AbortSignal) => Promise<string>,
): string {
// Enforce limits
const running = this.getRunningJobs();
if (running.length >= this.maxRunning) {
throw new Error(
`Maximum concurrent background jobs reached (${this.maxRunning}). ` +
`Use await_job or cancel_job to free a slot.`,
);
}
if (this.jobs.size >= this.maxTotal) {
// Evict oldest completed job
this.evictOldest();
if (this.jobs.size >= this.maxTotal) {
throw new Error(
`Maximum total background jobs reached (${this.maxTotal}). ` +
`Use cancel_job to remove jobs.`,
);
}
}
const id = `bg_${randomUUID().slice(0, 8)}`;
const abortController = new AbortController();
// Declare job first so the promise callbacks can close over it safely.
const job: Job = {
id,
type,
status: "running",
startTime: Date.now(),
label,
abortController,
// promise assigned below
promise: undefined as unknown as Promise<void>,
};
job.promise = runFn(abortController.signal)
.then((resultText) => {
job.status = "completed";
job.resultText = resultText;
this.scheduleEviction(id);
this.deliverResult(job);
})
.catch((err) => {
if (job.status === "cancelled") {
// Already cancelled — don't overwrite
this.scheduleEviction(id);
return;
}
job.status = "failed";
job.errorText = err instanceof Error ? err.message : String(err);
this.scheduleEviction(id);
this.deliverResult(job);
});
this.jobs.set(id, job);
return id;
}
/**
* Cancel a running job.
*/
cancel(id: string): "cancelled" | "not_found" | "already_completed" {
const job = this.jobs.get(id);
if (!job) return "not_found";
if (job.status !== "running") return "already_completed";
job.status = "cancelled";
job.errorText = "Cancelled by user";
job.abortController.abort();
this.scheduleEviction(id);
return "cancelled";
}
getJob(id: string): Job | undefined {
return this.jobs.get(id);
}
getRunningJobs(): Job[] {
return [...this.jobs.values()].filter((j) => j.status === "running");
}
getRecentJobs(limit = 10): Job[] {
return [...this.jobs.values()]
.sort((a, b) => b.startTime - a.startTime)
.slice(0, limit);
}
getAllJobs(): Job[] {
return [...this.jobs.values()];
}
/**
* Mark jobs as acknowledged so delivery retries stop.
*/
acknowledgeDeliveries(jobIds: string[]): void {
for (const id of jobIds) {
this.acknowledgedJobs.add(id);
const timer = this.deliveryTimers.get(id);
if (timer) {
clearTimeout(timer);
this.deliveryTimers.delete(id);
}
}
}
/**
* Cleanup all timers and resources.
*/
shutdown(): void {
for (const timer of this.deliveryTimers.values()) {
clearTimeout(timer);
}
this.deliveryTimers.clear();
for (const timer of this.evictionTimers.values()) {
clearTimeout(timer);
}
this.evictionTimers.clear();
// Abort all running jobs
for (const job of this.jobs.values()) {
if (job.status === "running") {
job.status = "cancelled";
job.abortController.abort();
}
}
}
// ── Private ────────────────────────────────────────────────────────────
private deliverResult(job: Job, attempt = 0): void {
if (this.acknowledgedJobs.has(job.id)) return;
if (!this.onJobComplete) return;
this.onJobComplete(job);
// Schedule retry with exponential backoff + jitter
const delay = Math.min(
DELIVERY_BASE_MS * Math.pow(2, attempt) + Math.random() * DELIVERY_JITTER_MS,
DELIVERY_MAX_MS,
);
const timer = setTimeout(() => {
this.deliveryTimers.delete(job.id);
if (!this.acknowledgedJobs.has(job.id)) {
this.deliverResult(job, attempt + 1);
}
}, delay);
this.deliveryTimers.set(job.id, timer);
}
private scheduleEviction(id: string): void {
const existing = this.evictionTimers.get(id);
if (existing) clearTimeout(existing);
const timer = setTimeout(() => {
this.evictionTimers.delete(id);
this.jobs.delete(id);
this.acknowledgedJobs.delete(id);
}, this.evictionMs);
this.evictionTimers.set(id, timer);
}
private evictOldest(): void {
let oldest: Job | undefined;
for (const job of this.jobs.values()) {
if (job.status !== "running") {
if (!oldest || job.startTime < oldest.startTime) {
oldest = job;
}
}
}
if (oldest) {
const timer = this.evictionTimers.get(oldest.id);
if (timer) clearTimeout(timer);
this.evictionTimers.delete(oldest.id);
this.jobs.delete(oldest.id);
this.acknowledgedJobs.delete(oldest.id);
}
}
}