The queueMicrotask() deferral in deliverResult() only prevented duplicate follow-ups when a job completed *while* await_job was blocked in Promise.race(). For jobs that completed before await_job was called (common in multi-turn interactive sessions), the microtask had already fired and queued the follow-up message before suppressFollowUp could run. Fix: replace queueMicrotask with setTimeout(0), storing the timer handle on the job object. suppressFollowUp() (new method on AsyncJobManager) cancels that timer and marks awaited = true atomically, handling both the within-turn and cross-turn cases. await-tool.ts now calls manager.suppressFollowUp(id) instead of directly setting j.awaited = true, which gives it the cancellable timer path. Adds a regression test specifically for the cross-turn case.
This commit is contained in:
parent
a460463371
commit
d9a3aabf75
3 changed files with 80 additions and 14 deletions
|
|
@ -119,12 +119,10 @@ test("await_job returns not-found message for invalid job IDs", async () => {
|
|||
manager.shutdown();
|
||||
});
|
||||
|
||||
test("await_job marks jobs as awaited to suppress follow-up delivery (#2248)", async () => {
|
||||
test("await_job suppresses follow-up for jobs that complete while awaiting (#2248)", async () => {
|
||||
const followUps: string[] = [];
|
||||
const manager = new AsyncJobManager({
|
||||
onJobComplete: (job) => {
|
||||
if (!job.awaited) followUps.push(job.id);
|
||||
},
|
||||
onJobComplete: (job) => followUps.push(job.id),
|
||||
});
|
||||
const tool = createAwaitTool(() => manager);
|
||||
|
||||
|
|
@ -133,13 +131,48 @@ test("await_job marks jobs as awaited to suppress follow-up delivery (#2248)", a
|
|||
return new Promise<string>((resolve) => setTimeout(() => resolve("result"), 50));
|
||||
});
|
||||
|
||||
// await_job consumes the result — should mark as awaited before promise resolves
|
||||
// await_job consumes the result — suppressFollowUp() should cancel delivery timer
|
||||
await tool.execute("tc7", { jobs: [jobId] }, noopSignal, () => {}, undefined as never);
|
||||
|
||||
// Give the onJobComplete callback a tick to fire
|
||||
// Give the onJobComplete callback a tick to fire (if suppression failed)
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
assert.equal(followUps.length, 0, "onJobComplete should not deliver follow-up for awaited jobs");
|
||||
assert.equal(followUps.length, 0, "onJobComplete should not fire for jobs consumed by await_job");
|
||||
|
||||
manager.shutdown();
|
||||
});
|
||||
|
||||
test("await_job suppresses follow-up for already-completed jobs (cross-turn case) (#3787)", async () => {
|
||||
// This is the key regression: job completes in a prior LLM turn, then
|
||||
// await_job is called in a later turn. The delivery timer must still be
|
||||
// cancellable at that point.
|
||||
const followUps: string[] = [];
|
||||
const manager = new AsyncJobManager({
|
||||
onJobComplete: (job) => followUps.push(job.id),
|
||||
});
|
||||
const tool = createAwaitTool(() => manager);
|
||||
|
||||
// Register and let the job complete fully before calling await_job
|
||||
const jobId = manager.register("bash", "pre-completed-job", async () => "done");
|
||||
const job = manager.getJob(jobId)!;
|
||||
await job.promise;
|
||||
|
||||
// Simulate a "later turn" by yielding to the event loop — this lets any
|
||||
// queueMicrotask callbacks run, but the setTimeout(0) delivery timer has
|
||||
// not yet fired (it's scheduled for the next macrotask).
|
||||
await new Promise((r) => setImmediate(r));
|
||||
|
||||
// Now call await_job — suppressFollowUp() should cancel the pending timer
|
||||
await tool.execute("tc7b", { jobs: [jobId] }, noopSignal, () => {}, undefined as never);
|
||||
|
||||
// Drain the macrotask queue — the (now-cancelled) timer would have fired here
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
assert.equal(
|
||||
followUps.length,
|
||||
0,
|
||||
"onJobComplete should not fire for already-completed jobs consumed by await_job",
|
||||
);
|
||||
|
||||
manager.shutdown();
|
||||
});
|
||||
|
|
|
|||
|
|
@ -66,10 +66,13 @@ export function createAwaitTool(getManager: () => AsyncJobManager): ToolDefiniti
|
|||
}
|
||||
}
|
||||
|
||||
// Mark all watched jobs as awaited upfront so the onJobComplete
|
||||
// callback (which fires synchronously in the promise .then()) knows
|
||||
// to suppress the follow-up message.
|
||||
for (const j of watched) j.awaited = true;
|
||||
// Suppress follow-up notifications for all watched jobs upfront.
|
||||
// suppressFollowUp() cancels the pending delivery timer (if any), which
|
||||
// handles both the within-turn case (job completes while we await) and
|
||||
// the cross-turn case (job already completed before await_job was called).
|
||||
// Previously this only set j.awaited = true, which missed the cross-turn
|
||||
// case because the queueMicrotask had already fired (#3787).
|
||||
for (const j of watched) manager.suppressFollowUp(j.id);
|
||||
|
||||
// If all watched jobs are already done, return immediately
|
||||
const running = watched.filter((j) => j.status === "running");
|
||||
|
|
|
|||
|
|
@ -24,6 +24,12 @@ export interface Job {
|
|||
errorText?: string;
|
||||
/** Set by await_job when results are consumed. Suppresses follow-up delivery. */
|
||||
awaited?: boolean;
|
||||
/**
|
||||
* Handle for the pending follow-up delivery timer (set by deliverResult).
|
||||
* Stored so suppressFollowUp() can cancel it before the notification fires,
|
||||
* even when await_job is called after the job has already completed (#3787).
|
||||
*/
|
||||
deliveryTimer?: ReturnType<typeof setTimeout>;
|
||||
}
|
||||
|
||||
export interface JobManagerOptions {
|
||||
|
|
@ -170,12 +176,36 @@ export class AsyncJobManager {
|
|||
|
||||
// ── Private ────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Suppress follow-up notification for a job — cancels any pending delivery
|
||||
* timer and marks the job as awaited. Safe to call at any time, including
|
||||
* before or after the job completes (#3787).
|
||||
*/
|
||||
suppressFollowUp(id: string): void {
|
||||
const job = this.jobs.get(id);
|
||||
if (!job) return;
|
||||
job.awaited = true;
|
||||
if (job.deliveryTimer !== undefined) {
|
||||
clearTimeout(job.deliveryTimer);
|
||||
job.deliveryTimer = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private deliverResult(job: Job): void {
|
||||
if (!this.onJobComplete) return;
|
||||
// Defer delivery by one microtask so await_job's .then() chain runs first
|
||||
// and can set job.awaited = true before onJobComplete checks it (#2762).
|
||||
// Use setTimeout(0) instead of queueMicrotask so the handle is cancellable.
|
||||
// suppressFollowUp() can clear this timer even when await_job is called in
|
||||
// a later LLM turn (after the job already completed). queueMicrotask ran
|
||||
// immediately and could not be cancelled (#2762, #3787).
|
||||
const cb = this.onJobComplete;
|
||||
queueMicrotask(() => cb(job));
|
||||
job.deliveryTimer = setTimeout(() => {
|
||||
job.deliveryTimer = undefined;
|
||||
if (!job.awaited) cb(job);
|
||||
}, 0);
|
||||
// Allow process to exit even if timer is pending
|
||||
if (typeof job.deliveryTimer === "object" && "unref" in job.deliveryTimer) {
|
||||
(job.deliveryTimer as NodeJS.Timeout).unref();
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleEviction(id: string): void {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue