From d9a3aabf75d0b94a9c2ea1763736bced4d9be151 Mon Sep 17 00:00:00 2001 From: drkthng Date: Mon, 13 Apr 2026 14:10:09 +0200 Subject: [PATCH] fix(async-jobs): suppress stale follow-up for jobs consumed by await_job (#3787) (#3788) The queueMicrotask() deferral in deliverResult() only prevented duplicate follow-ups when a job completed *while* await_job was blocked in Promise.race(). For jobs that completed before await_job was called (common in multi-turn interactive sessions), the microtask had already fired and queued the follow-up message before suppressFollowUp could run. Fix: replace queueMicrotask with setTimeout(0), storing the timer handle on the job object. suppressFollowUp() (new method on AsyncJobManager) cancels that timer and marks awaited = true atomically, handling both the within-turn and cross-turn cases. await-tool.ts now calls manager.suppressFollowUp(id) instead of directly setting j.awaited = true, which gives it the cancellable timer path. Adds a regression test specifically for the cross-turn case. --- .../extensions/async-jobs/await-tool.test.ts | 47 ++++++++++++++++--- .../extensions/async-jobs/await-tool.ts | 11 +++-- .../extensions/async-jobs/job-manager.ts | 36 ++++++++++++-- 3 files changed, 80 insertions(+), 14 deletions(-) diff --git a/src/resources/extensions/async-jobs/await-tool.test.ts b/src/resources/extensions/async-jobs/await-tool.test.ts index 1ed49161c..9e508d925 100644 --- a/src/resources/extensions/async-jobs/await-tool.test.ts +++ b/src/resources/extensions/async-jobs/await-tool.test.ts @@ -119,12 +119,10 @@ test("await_job returns not-found message for invalid job IDs", async () => { manager.shutdown(); }); -test("await_job marks jobs as awaited to suppress follow-up delivery (#2248)", async () => { +test("await_job suppresses follow-up for jobs that complete while awaiting (#2248)", async () => { const followUps: string[] = []; const manager = new AsyncJobManager({ - onJobComplete: (job) => { - if (!job.awaited) followUps.push(job.id); - }, + onJobComplete: (job) => followUps.push(job.id), }); const tool = createAwaitTool(() => manager); @@ -133,13 +131,48 @@ test("await_job marks jobs as awaited to suppress follow-up delivery (#2248)", a return new Promise((resolve) => setTimeout(() => resolve("result"), 50)); }); - // await_job consumes the result — should mark as awaited before promise resolves + // await_job consumes the result — suppressFollowUp() should cancel delivery timer await tool.execute("tc7", { jobs: [jobId] }, noopSignal, () => {}, undefined as never); - // Give the onJobComplete callback a tick to fire + // Give the onJobComplete callback a tick to fire (if suppression failed) await new Promise((r) => setTimeout(r, 50)); - assert.equal(followUps.length, 0, "onJobComplete should not deliver follow-up for awaited jobs"); + assert.equal(followUps.length, 0, "onJobComplete should not fire for jobs consumed by await_job"); + + manager.shutdown(); +}); + +test("await_job suppresses follow-up for already-completed jobs (cross-turn case) (#3787)", async () => { + // This is the key regression: job completes in a prior LLM turn, then + // await_job is called in a later turn. The delivery timer must still be + // cancellable at that point. + const followUps: string[] = []; + const manager = new AsyncJobManager({ + onJobComplete: (job) => followUps.push(job.id), + }); + const tool = createAwaitTool(() => manager); + + // Register and let the job complete fully before calling await_job + const jobId = manager.register("bash", "pre-completed-job", async () => "done"); + const job = manager.getJob(jobId)!; + await job.promise; + + // Simulate a "later turn" by yielding to the event loop — this lets any + // queueMicrotask callbacks run, but the setTimeout(0) delivery timer has + // not yet fired (it's scheduled for the next macrotask). + await new Promise((r) => setImmediate(r)); + + // Now call await_job — suppressFollowUp() should cancel the pending timer + await tool.execute("tc7b", { jobs: [jobId] }, noopSignal, () => {}, undefined as never); + + // Drain the macrotask queue — the (now-cancelled) timer would have fired here + await new Promise((r) => setTimeout(r, 50)); + + assert.equal( + followUps.length, + 0, + "onJobComplete should not fire for already-completed jobs consumed by await_job", + ); manager.shutdown(); }); diff --git a/src/resources/extensions/async-jobs/await-tool.ts b/src/resources/extensions/async-jobs/await-tool.ts index bab79270a..8d7e8c85c 100644 --- a/src/resources/extensions/async-jobs/await-tool.ts +++ b/src/resources/extensions/async-jobs/await-tool.ts @@ -66,10 +66,13 @@ export function createAwaitTool(getManager: () => AsyncJobManager): ToolDefiniti } } - // Mark all watched jobs as awaited upfront so the onJobComplete - // callback (which fires synchronously in the promise .then()) knows - // to suppress the follow-up message. - for (const j of watched) j.awaited = true; + // Suppress follow-up notifications for all watched jobs upfront. + // suppressFollowUp() cancels the pending delivery timer (if any), which + // handles both the within-turn case (job completes while we await) and + // the cross-turn case (job already completed before await_job was called). + // Previously this only set j.awaited = true, which missed the cross-turn + // case because the queueMicrotask had already fired (#3787). + for (const j of watched) manager.suppressFollowUp(j.id); // If all watched jobs are already done, return immediately const running = watched.filter((j) => j.status === "running"); diff --git a/src/resources/extensions/async-jobs/job-manager.ts b/src/resources/extensions/async-jobs/job-manager.ts index 10ce3cd41..ac5ab2abb 100644 --- a/src/resources/extensions/async-jobs/job-manager.ts +++ b/src/resources/extensions/async-jobs/job-manager.ts @@ -24,6 +24,12 @@ export interface Job { errorText?: string; /** Set by await_job when results are consumed. Suppresses follow-up delivery. */ awaited?: boolean; + /** + * Handle for the pending follow-up delivery timer (set by deliverResult). + * Stored so suppressFollowUp() can cancel it before the notification fires, + * even when await_job is called after the job has already completed (#3787). + */ + deliveryTimer?: ReturnType; } export interface JobManagerOptions { @@ -170,12 +176,36 @@ export class AsyncJobManager { // ── Private ──────────────────────────────────────────────────────────── + /** + * Suppress follow-up notification for a job — cancels any pending delivery + * timer and marks the job as awaited. Safe to call at any time, including + * before or after the job completes (#3787). + */ + suppressFollowUp(id: string): void { + const job = this.jobs.get(id); + if (!job) return; + job.awaited = true; + if (job.deliveryTimer !== undefined) { + clearTimeout(job.deliveryTimer); + job.deliveryTimer = undefined; + } + } + private deliverResult(job: Job): void { if (!this.onJobComplete) return; - // Defer delivery by one microtask so await_job's .then() chain runs first - // and can set job.awaited = true before onJobComplete checks it (#2762). + // Use setTimeout(0) instead of queueMicrotask so the handle is cancellable. + // suppressFollowUp() can clear this timer even when await_job is called in + // a later LLM turn (after the job already completed). queueMicrotask ran + // immediately and could not be cancelled (#2762, #3787). const cb = this.onJobComplete; - queueMicrotask(() => cb(job)); + job.deliveryTimer = setTimeout(() => { + job.deliveryTimer = undefined; + if (!job.awaited) cb(job); + }, 0); + // Allow process to exit even if timer is pending + if (typeof job.deliveryTimer === "object" && "unref" in job.deliveryTimer) { + (job.deliveryTimer as NodeJS.Timeout).unref(); + } } private scheduleEviction(id: string): void {