fix(gsd): prevent concurrent dispatch during skip chains (#1272) (#1283)

This commit is contained in:
Jeremy McSpadden 2026-03-18 18:27:05 -05:00 committed by GitHub
parent 2f729fe051
commit f6c980db81
3 changed files with 173 additions and 14 deletions

6
package-lock.json generated
View file

@ -1,12 +1,12 @@
{
"name": "gsd-pi",
"version": "2.28.0",
"version": "2.31.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "gsd-pi",
"version": "2.28.0",
"version": "2.31.2",
"hasInstallScript": true,
"license": "MIT",
"workspaces": [
@ -9101,7 +9101,7 @@
},
"packages/pi-coding-agent": {
"name": "@gsd/pi-coding-agent",
"version": "0.57.1",
"version": "2.31.2",
"dependencies": {
"@mariozechner/jiti": "^2.6.2",
"@silvia-odwyer/photon-node": "^0.3.4",

View file

@ -833,6 +833,9 @@ export async function handleAgentEnd(
// permanently stalled with no unit running and no watchdog set.
if (s.pendingAgentEndRetry) {
s.pendingAgentEndRetry = false;
// Clear gap watchdog from the previous cycle to prevent concurrent
// dispatch when the deferred handleAgentEnd calls dispatchNextUnit (#1272).
clearDispatchGapWatchdog();
setImmediate(() => {
handleAgentEnd(ctx, pi).catch((err) => {
const msg = err instanceof Error ? err.message : String(err);
@ -975,8 +978,12 @@ async function dispatchNextUnit(
return;
}
// Reentrancy guard
if (s.dispatching && s.skipDepth === 0) {
// Reentrancy guard — unconditional to prevent concurrent dispatch from
// gap watchdog or pendingAgentEndRetry during skip chains (#1272).
// Previously the guard was bypassed when skipDepth > 0, but the recursive
// skip chain's inner finally block resets s.dispatching = false before the
// outer call's finally runs, opening a window for concurrent entry.
if (s.dispatching) {
debugLog("dispatchNextUnit reentrancy guard — another dispatch in progress, bailing");
return;
}
@ -1448,8 +1455,12 @@ async function dispatchNextUnit(
}
if (dispatchResult.action !== "dispatch") {
await new Promise(r => setImmediate(r));
await dispatchNextUnit(ctx, pi);
// Defer re-dispatch to next microtask so s.dispatching is released first,
// preventing reentrancy guard bypass during concurrent entry (#1272).
setImmediate(() => dispatchNextUnit(ctx, pi).catch(err => {
ctx.ui.notify(`Deferred dispatch failed: ${err instanceof Error ? err.message : String(err)}`, "error");
pauseAuto(ctx, pi).catch(() => {});
}));
return;
}
@ -1467,8 +1478,10 @@ async function dispatchNextUnit(
}
if (preDispatchResult.action === "skip") {
ctx.ui.notify(`Skipping ${unitType} ${unitId} (pre-dispatch hook).`, "info");
await new Promise(r => setImmediate(r));
await dispatchNextUnit(ctx, pi);
setImmediate(() => dispatchNextUnit(ctx, pi).catch(err => {
ctx.ui.notify(`Deferred dispatch failed: ${err instanceof Error ? err.message : String(err)}`, "error");
pauseAuto(ctx, pi).catch(() => {});
}));
return;
}
if (preDispatchResult.action === "replace") {
@ -1499,9 +1512,16 @@ async function dispatchNextUnit(
if (idempotencyResult.reason === "completed" || idempotencyResult.reason === "fallback-persisted" || idempotencyResult.reason === "phantom-loop-cleared" || idempotencyResult.reason === "evicted") {
if (!s.active) return;
s.skipDepth++;
await new Promise(r => setTimeout(r, idempotencyResult.reason === "phantom-loop-cleared" ? 50 : 150));
await dispatchNextUnit(ctx, pi);
s.skipDepth = Math.max(0, s.skipDepth - 1);
const skipDelay = idempotencyResult.reason === "phantom-loop-cleared" ? 50 : 150;
// Defer re-dispatch so s.dispatching is released first (#1272).
setTimeout(() => {
dispatchNextUnit(ctx, pi).catch(err => {
ctx.ui.notify(`Deferred skip-dispatch failed: ${err instanceof Error ? err.message : String(err)}`, "error");
pauseAuto(ctx, pi).catch(() => {});
}).finally(() => {
s.skipDepth = Math.max(0, s.skipDepth - 1);
});
}, skipDelay);
return;
}
} else if (idempotencyResult.action === "stop") {
@ -1532,8 +1552,11 @@ async function dispatchNextUnit(
return;
}
if (stuckResult.action === "recovered" && stuckResult.dispatchAgain) {
await new Promise(r => setImmediate(r));
await dispatchNextUnit(ctx, pi);
// Defer re-dispatch so s.dispatching is released first (#1272).
setImmediate(() => dispatchNextUnit(ctx, pi).catch(err => {
ctx.ui.notify(`Deferred recovery-dispatch failed: ${err instanceof Error ? err.message : String(err)}`, "error");
pauseAuto(ctx, pi).catch(() => {});
}));
return;
}
@ -1779,6 +1802,15 @@ export {
export function _getUnitConsecutiveSkips(): Map<string, number> { return s.unitConsecutiveSkips; }
export function _resetUnitConsecutiveSkips(): void { s.unitConsecutiveSkips.clear(); }
/**
* Test-only: expose dispatching / skipDepth state for reentrancy guard tests.
* Not part of the public API.
*/
export function _getDispatching(): boolean { return s.dispatching; }
export function _setDispatching(v: boolean): void { s.dispatching = v; }
export function _getSkipDepth(): number { return s.skipDepth; }
export function _setSkipDepth(v: number): void { s.skipDepth = v; }
/**
* Dispatch a hook unit directly, bypassing normal pre-dispatch hooks.
* Used for manual hook triggers via /gsd run-hook.

View file

@ -0,0 +1,127 @@
/**
* auto-reentrancy-guard.test.ts Tests for the unconditional reentrancy guard.
*
* Regression for #1272: auto-mode stuck-loop where gap watchdog or
* pendingAgentEndRetry could enter dispatchNextUnit concurrently during
* recursive skip chains because the reentrancy guard was bypassed when
* skipDepth > 0.
*
* The fix makes the guard unconditional (`if (s.dispatching)` without
* `&& s.skipDepth === 0`), and defers recursive re-dispatch via
* setImmediate/setTimeout so s.dispatching is released first.
*/
import {
_getDispatching,
_setDispatching,
_getSkipDepth,
_setSkipDepth,
} from "../auto.ts";
import { createTestContext } from "./test-helpers.ts";
const { assertEq, assertTrue, report } = createTestContext();
async function main(): Promise<void> {
// ─── Test-only accessors work ───────────────────────────────────────────
console.log("\n=== reentrancy guard: test accessors round-trip ===");
{
_setDispatching(false);
assertEq(_getDispatching(), false, "dispatching starts false");
_setDispatching(true);
assertEq(_getDispatching(), true, "dispatching set to true");
_setDispatching(false);
assertEq(_getDispatching(), false, "dispatching reset to false");
}
// ─── skipDepth accessors ────────────────────────────────────────────────
console.log("\n=== reentrancy guard: skipDepth accessors round-trip ===");
{
_setSkipDepth(0);
assertEq(_getSkipDepth(), 0, "skipDepth starts at 0");
_setSkipDepth(3);
assertEq(_getSkipDepth(), 3, "skipDepth set to 3");
_setSkipDepth(0);
assertEq(_getSkipDepth(), 0, "skipDepth reset to 0");
}
// ─── Guard blocks even when skipDepth > 0 (#1272 regression) ───────────
console.log("\n=== reentrancy guard: blocks when dispatching=true regardless of skipDepth ===");
{
// Simulate the scenario from #1272: dispatching=true + skipDepth>0
// The old guard (`if (s.dispatching && s.skipDepth === 0)`) would allow
// concurrent entry when skipDepth > 0. The fix makes the check
// unconditional on skipDepth.
_setDispatching(true);
_setSkipDepth(2);
// Verify dispatching is true — guard should block regardless of skipDepth
assertTrue(
_getDispatching() === true,
"dispatching flag is true during skip chain"
);
// The actual reentrancy guard in dispatchNextUnit checks:
// if (s.dispatching) { return; }
// We verify the state that would trigger the guard:
const wouldBlock = _getDispatching(); // unconditional check
const wouldBlockOld = _getDispatching() && _getSkipDepth() === 0; // old check
assertTrue(wouldBlock === true, "new guard blocks when dispatching=true, skipDepth=2");
assertTrue(wouldBlockOld === false, "old guard WOULD NOT block when dispatching=true, skipDepth=2 (the bug)");
// Clean up
_setDispatching(false);
_setSkipDepth(0);
}
// ─── Guard allows entry when dispatching=false ──────────────────────────
console.log("\n=== reentrancy guard: allows entry when dispatching=false ===");
{
_setDispatching(false);
_setSkipDepth(0);
assertTrue(!_getDispatching(), "guard allows entry when dispatching=false, skipDepth=0");
_setDispatching(false);
_setSkipDepth(3);
assertTrue(!_getDispatching(), "guard allows entry when dispatching=false, skipDepth=3");
_setSkipDepth(0);
}
// ─── skipDepth does not affect guard decision (the fix) ─────────────────
console.log("\n=== reentrancy guard: skipDepth is irrelevant to guard decision ===");
{
for (const depth of [0, 1, 2, 5]) {
_setDispatching(true);
_setSkipDepth(depth);
assertTrue(
_getDispatching() === true,
`guard blocks at skipDepth=${depth} when dispatching=true`
);
}
for (const depth of [0, 1, 2, 5]) {
_setDispatching(false);
_setSkipDepth(depth);
assertTrue(
_getDispatching() === false,
`guard allows at skipDepth=${depth} when dispatching=false`
);
}
// Clean up
_setDispatching(false);
_setSkipDepth(0);
}
report();
}
main().catch((err) => {
console.error(err);
process.exit(1);
});