refactor(gsd): replace session-scoped promise bridge with per-unit one-shot (#1595)
Move `pendingResolve` and `sessionSwitchInFlight` from AutoSession to module-level variables in auto-loop.ts (`_currentResolve`, `_sessionSwitchInFlight`). Remove `pendingAgentEndQueue` entirely — agent_end events arriving with no pending resolver are now dropped (with a debug warning) instead of queued. This eliminates the `_activeSession` singleton, the queue drain logic in `runUnit`, and three properties from `AutoSession.reset()`. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
900d2fbd7c
commit
65dca68242
4 changed files with 69 additions and 172 deletions
|
|
@ -5,9 +5,9 @@
|
|||
* pattern with a while loop. The agent_end event resolves a promise instead
|
||||
* of recursing.
|
||||
*
|
||||
* MAINTENANCE RULE: The only module-level mutable state here is `_activeSession`,
|
||||
* used by the agent_end bridge. Promise state itself lives on AutoSession so
|
||||
* concurrent auto sessions cannot corrupt each other.
|
||||
* MAINTENANCE RULE: Module-level mutable state is limited to `_currentResolve`
|
||||
* (per-unit one-shot resolver) and `_sessionSwitchInFlight` (guard for
|
||||
* session rotation). No queue — stale agent_end events are dropped.
|
||||
*/
|
||||
|
||||
import type { ExtensionAPI, ExtensionContext } from "@gsd/pi-coding-agent";
|
||||
|
|
@ -67,17 +67,15 @@ export interface UnitResult {
|
|||
event?: AgentEndEvent;
|
||||
}
|
||||
|
||||
// ─── Session-scoped promise state ───────────────────────────────────────────
|
||||
// ─── Per-unit one-shot promise state ────────────────────────────────────────
|
||||
//
|
||||
// pendingResolve and pendingAgentEndQueue live on AutoSession (not module-level)
|
||||
// so concurrent sessions cannot corrupt each other's promises.
|
||||
// A single module-level resolve function scoped to the current unit execution.
|
||||
// No queue — if an agent_end arrives with no pending resolver, it is dropped
|
||||
// (logged as warning). This is simpler and safer than the previous session-
|
||||
// scoped pendingResolve + pendingAgentEndQueue pattern.
|
||||
|
||||
/**
|
||||
* The singleton session reference used by resolveAgentEnd. Set by autoLoop
|
||||
* on entry so that the agent_end handler in index.ts can resolve the correct
|
||||
* session's promise without needing a direct reference to `s`.
|
||||
*/
|
||||
let _activeSession: AutoSession | null = null;
|
||||
let _currentResolve: ((result: UnitResult) => void) | null = null;
|
||||
let _sessionSwitchInFlight = false;
|
||||
|
||||
// ─── resolveAgentEnd ─────────────────────────────────────────────────────────
|
||||
|
||||
|
|
@ -86,61 +84,48 @@ let _activeSession: AutoSession | null = null;
|
|||
* in-flight unit promise. One-shot: the resolver is nulled before calling
|
||||
* to prevent double-resolution from model fallback retries.
|
||||
*
|
||||
* If no pendingResolve exists (event arrived between loop iterations),
|
||||
* the event is queued on the session so the next runUnit can drain it.
|
||||
* If no resolver exists (event arrived between loop iterations or during
|
||||
* session switch), the event is dropped with a debug warning.
|
||||
*/
|
||||
export function resolveAgentEnd(event: AgentEndEvent): void {
|
||||
const s = _activeSession;
|
||||
if (!s) {
|
||||
debugLog("resolveAgentEnd", {
|
||||
status: "no-active-session",
|
||||
warning: "agent_end with no active loop session",
|
||||
});
|
||||
if (_sessionSwitchInFlight) {
|
||||
debugLog("resolveAgentEnd", { status: "ignored-during-switch" });
|
||||
return;
|
||||
}
|
||||
|
||||
if (s.pendingResolve) {
|
||||
if (_currentResolve) {
|
||||
debugLog("resolveAgentEnd", { status: "resolving", hasEvent: true });
|
||||
const r = s.pendingResolve;
|
||||
s.pendingResolve = null;
|
||||
const r = _currentResolve;
|
||||
_currentResolve = null;
|
||||
r({ status: "completed", event });
|
||||
} else {
|
||||
// Queue the event so the next runUnit picks it up immediately
|
||||
debugLog("resolveAgentEnd", {
|
||||
status: "queued",
|
||||
queueLength: s.pendingAgentEndQueue.length + 1,
|
||||
unitId: s.currentUnit?.id,
|
||||
warning:
|
||||
"agent_end arrived between loop iterations — queued for next runUnit",
|
||||
status: "no-pending-resolve",
|
||||
warning: "agent_end with no pending unit",
|
||||
});
|
||||
s.pendingAgentEndQueue.push({ ...event, unitId: s.currentUnit?.id });
|
||||
}
|
||||
}
|
||||
|
||||
export function isSessionSwitchInFlight(): boolean {
|
||||
return _activeSession?.sessionSwitchInFlight ?? false;
|
||||
return _sessionSwitchInFlight;
|
||||
}
|
||||
|
||||
// ─── resetPendingResolve (test helper) ───────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Reset session promise state. Only exported for test cleanup — production code
|
||||
* should never call this.
|
||||
* Reset module-level promise state. Only exported for test cleanup —
|
||||
* production code should never call this.
|
||||
*/
|
||||
export function _resetPendingResolve(): void {
|
||||
if (_activeSession) {
|
||||
_activeSession.pendingResolve = null;
|
||||
_activeSession.pendingAgentEndQueue = [];
|
||||
}
|
||||
_activeSession = null;
|
||||
_currentResolve = null;
|
||||
_sessionSwitchInFlight = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the active session for resolveAgentEnd. Only exported for test setup —
|
||||
* production code sets this via autoLoop entry.
|
||||
* No-op for backward compatibility with tests that previously set the
|
||||
* active session. The module no longer holds a session reference.
|
||||
*/
|
||||
export function _setActiveSession(session: AutoSession | null): void {
|
||||
_activeSession = session;
|
||||
export function _setActiveSession(_session: AutoSession | null): void {
|
||||
// No-op — kept for test backward compatibility
|
||||
}
|
||||
|
||||
// ─── runUnit ─────────────────────────────────────────────────────────────────
|
||||
|
|
@ -164,64 +149,15 @@ export async function runUnit(
|
|||
): Promise<UnitResult> {
|
||||
debugLog("runUnit", { phase: "start", unitType, unitId });
|
||||
|
||||
// ── Drain queued events from error-recovery retries ──
|
||||
// If an agent_end arrived between iterations (e.g. from a model fallback
|
||||
// sendMessage retry), consume it immediately instead of creating a new promise.
|
||||
// Cap queue to 3 entries to prevent unbounded growth from stale events.
|
||||
if (s.pendingAgentEndQueue.length > 3) {
|
||||
debugLog("runUnit", {
|
||||
phase: "queue-overflow",
|
||||
dropped: s.pendingAgentEndQueue.length - 1,
|
||||
unitType,
|
||||
unitId,
|
||||
});
|
||||
s.pendingAgentEndQueue = [
|
||||
s.pendingAgentEndQueue[s.pendingAgentEndQueue.length - 1]!,
|
||||
];
|
||||
}
|
||||
if (s.pendingAgentEndQueue.length > 0) {
|
||||
// Find an event matching this unit; discard stale events from other units
|
||||
const matchIdx = s.pendingAgentEndQueue.findIndex(
|
||||
(e) => !e.unitId || e.unitId === unitId,
|
||||
);
|
||||
if (matchIdx >= 0) {
|
||||
// Discard any stale events before the match
|
||||
if (matchIdx > 0) {
|
||||
debugLog("runUnit", {
|
||||
phase: "discarded-stale-events",
|
||||
count: matchIdx,
|
||||
unitType,
|
||||
unitId,
|
||||
});
|
||||
}
|
||||
const queued = s.pendingAgentEndQueue.splice(0, matchIdx + 1).pop()!;
|
||||
debugLog("runUnit", {
|
||||
phase: "drained-queued-event",
|
||||
unitType,
|
||||
unitId,
|
||||
queueRemaining: s.pendingAgentEndQueue.length,
|
||||
});
|
||||
return { status: "completed", event: queued };
|
||||
}
|
||||
// No matching event — discard all stale events and proceed to new session
|
||||
debugLog("runUnit", {
|
||||
phase: "discarded-all-stale-events",
|
||||
count: s.pendingAgentEndQueue.length,
|
||||
unitType,
|
||||
unitId,
|
||||
});
|
||||
s.pendingAgentEndQueue = [];
|
||||
}
|
||||
|
||||
// ── Session creation with timeout ──
|
||||
debugLog("runUnit", { phase: "session-create", unitType, unitId });
|
||||
|
||||
let sessionResult: { cancelled: boolean };
|
||||
let sessionTimeoutHandle: ReturnType<typeof setTimeout> | undefined;
|
||||
s.sessionSwitchInFlight = true;
|
||||
_sessionSwitchInFlight = true;
|
||||
try {
|
||||
const sessionPromise = s.cmdCtx!.newSession().finally(() => {
|
||||
s.sessionSwitchInFlight = false;
|
||||
_sessionSwitchInFlight = false;
|
||||
});
|
||||
const timeoutPromise = new Promise<{ cancelled: true }>((resolve) => {
|
||||
sessionTimeoutHandle = setTimeout(
|
||||
|
|
@ -253,11 +189,12 @@ export async function runUnit(
|
|||
return { status: "cancelled" };
|
||||
}
|
||||
|
||||
// ── Create the agent_end promise (session-scoped) ──
|
||||
// ── Create the agent_end promise (per-unit one-shot) ──
|
||||
// This happens after newSession completes so session-switch agent_end events
|
||||
// from the previous session cannot resolve the new unit.
|
||||
_sessionSwitchInFlight = false;
|
||||
const unitPromise = new Promise<UnitResult>((resolve) => {
|
||||
s.pendingResolve = resolve;
|
||||
_currentResolve = resolve;
|
||||
});
|
||||
|
||||
// Ensure cwd matches basePath before dispatch (#1389).
|
||||
|
|
@ -569,7 +506,6 @@ export async function autoLoop(
|
|||
deps: LoopDeps,
|
||||
): Promise<void> {
|
||||
debugLog("autoLoop", { phase: "enter" });
|
||||
_activeSession = s;
|
||||
let iteration = 0;
|
||||
let lastDerivedUnit = "";
|
||||
let sameUnitCount = 0;
|
||||
|
|
@ -1759,6 +1695,6 @@ export async function autoLoop(
|
|||
}
|
||||
}
|
||||
|
||||
_activeSession = null;
|
||||
_currentResolve = null;
|
||||
debugLog("autoLoop", { phase: "exit", totalIterations: iteration });
|
||||
}
|
||||
|
|
|
|||
|
|
@ -137,27 +137,8 @@ export class AutoSession {
|
|||
sigtermHandler: (() => void) | null = null;
|
||||
|
||||
// ── Loop promise state ──────────────────────────────────────────────────
|
||||
/**
|
||||
* True only while runUnit is rotating into a fresh session. agent_end events
|
||||
* emitted from the previous session's abort during this window must be
|
||||
* ignored; they do not belong to the new unit.
|
||||
*/
|
||||
sessionSwitchInFlight = false;
|
||||
|
||||
/**
|
||||
* One-shot resolver for the current unit's agent_end promise.
|
||||
* Non-null only while a unit is in-flight (between sendMessage and agent_end).
|
||||
* Scoped to the session to prevent concurrent session corruption.
|
||||
*/
|
||||
pendingResolve: ((result: { status: "completed" | "cancelled" | "error"; event?: { messages: unknown[] } }) => void) | null = null;
|
||||
|
||||
/**
|
||||
* Queue for agent_end events that arrive when no pendingResolve exists.
|
||||
* This happens when error-recovery sendMessage retries produce agent_end
|
||||
* events between loop iterations. The next runUnit drains this queue
|
||||
* instead of waiting for a new event.
|
||||
*/
|
||||
pendingAgentEndQueue: Array<{ messages: unknown[]; unitId?: string }> = [];
|
||||
// Per-unit resolve function and session-switch guard live at module level
|
||||
// in auto-loop.ts (_currentResolve, _sessionSwitchInFlight).
|
||||
|
||||
// ── Methods ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
|
@ -236,10 +217,7 @@ export class AutoSession {
|
|||
// Signal handler
|
||||
this.sigtermHandler = null;
|
||||
|
||||
// Loop promise state
|
||||
this.sessionSwitchInFlight = false;
|
||||
this.pendingResolve = null;
|
||||
this.pendingAgentEndQueue = [];
|
||||
// Loop promise state lives in auto-loop.ts module scope
|
||||
}
|
||||
|
||||
toJSON(): Record<string, unknown> {
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
/**
|
||||
* agent-end-retry.test.ts — Regression checks for the post-#1419 agent_end model.
|
||||
* agent-end-retry.test.ts — Regression checks for the agent_end model.
|
||||
*
|
||||
* The old recursive handleAgentEnd retry path is gone. The loop now keeps
|
||||
* pendingResolve + pendingAgentEndQueue on AutoSession, and handleAgentEnd is
|
||||
* only a thin compatibility wrapper around resolveAgentEnd().
|
||||
* The per-unit one-shot resolve function lives at module level in auto-loop.ts
|
||||
* (_currentResolve). handleAgentEnd is a thin compatibility wrapper around
|
||||
* resolveAgentEnd().
|
||||
*/
|
||||
|
||||
import test from "node:test";
|
||||
|
|
@ -14,40 +14,43 @@ import { fileURLToPath } from "node:url";
|
|||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const AUTO_TS_PATH = join(__dirname, "..", "auto.ts");
|
||||
const AUTO_LOOP_TS_PATH = join(__dirname, "..", "auto-loop.ts");
|
||||
const SESSION_TS_PATH = join(__dirname, "..", "auto", "session.ts");
|
||||
|
||||
function getAutoTsSource(): string {
|
||||
return readFileSync(AUTO_TS_PATH, "utf-8");
|
||||
}
|
||||
|
||||
function getAutoLoopTsSource(): string {
|
||||
return readFileSync(AUTO_LOOP_TS_PATH, "utf-8");
|
||||
}
|
||||
|
||||
function getSessionTsSource(): string {
|
||||
return readFileSync(SESSION_TS_PATH, "utf-8");
|
||||
}
|
||||
|
||||
test("AutoSession declares pending agent_end queue state", () => {
|
||||
const source = getSessionTsSource();
|
||||
test("auto-loop.ts declares _currentResolve for per-unit one-shot promises", () => {
|
||||
const source = getAutoLoopTsSource();
|
||||
assert.ok(
|
||||
source.includes("pendingResolve"),
|
||||
"AutoSession must declare pendingResolve for the in-flight unit promise",
|
||||
source.includes("_currentResolve"),
|
||||
"auto-loop.ts must declare _currentResolve for the per-unit resolve function",
|
||||
);
|
||||
assert.ok(
|
||||
source.includes("pendingAgentEndQueue"),
|
||||
"AutoSession must declare pendingAgentEndQueue for between-iteration agent_end events",
|
||||
source.includes("_sessionSwitchInFlight"),
|
||||
"auto-loop.ts must declare _sessionSwitchInFlight guard",
|
||||
);
|
||||
});
|
||||
|
||||
test("AutoSession reset clears pending agent_end queue state", () => {
|
||||
test("AutoSession no longer holds promise state (moved to auto-loop.ts module scope)", () => {
|
||||
const source = getSessionTsSource();
|
||||
const resetIdx = source.indexOf("reset(): void");
|
||||
assert.ok(resetIdx > -1, "AutoSession must have a reset() method");
|
||||
const resetBlock = source.slice(resetIdx, resetIdx + 4000);
|
||||
// Properties should NOT exist as class fields
|
||||
assert.ok(
|
||||
resetBlock.includes("this.pendingResolve = null"),
|
||||
"reset() must clear pendingResolve",
|
||||
!source.includes("pendingResolve:"),
|
||||
"AutoSession must not declare pendingResolve (moved to auto-loop.ts)",
|
||||
);
|
||||
assert.ok(
|
||||
resetBlock.includes("this.pendingAgentEndQueue = []"),
|
||||
"reset() must clear pendingAgentEndQueue",
|
||||
!source.includes("pendingAgentEndQueue:"),
|
||||
"AutoSession must not declare pendingAgentEndQueue (removed — events are dropped)",
|
||||
);
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -37,9 +37,6 @@ function makeMockSession(opts?: {
|
|||
const session = {
|
||||
active: true,
|
||||
verbose: false,
|
||||
sessionSwitchInFlight: false,
|
||||
pendingResolve: null,
|
||||
pendingAgentEndQueue: [],
|
||||
cmdCtx: {
|
||||
newSession: () => {
|
||||
opts?.onNewSessionStart?.(session);
|
||||
|
|
@ -96,7 +93,6 @@ test("resolveAgentEnd resolves a pending runUnit promise", async () => {
|
|||
const ctx = makeMockCtx();
|
||||
const pi = makeMockPi();
|
||||
const s = makeMockSession();
|
||||
_setActiveSession(s);
|
||||
const event = makeEvent();
|
||||
|
||||
// Start runUnit — it will create the promise and send a message,
|
||||
|
|
@ -122,25 +118,21 @@ test("resolveAgentEnd resolves a pending runUnit promise", async () => {
|
|||
assert.deepEqual(result.event, event);
|
||||
});
|
||||
|
||||
test("resolveAgentEnd queues event when no promise is pending", () => {
|
||||
test("resolveAgentEnd drops event when no promise is pending", () => {
|
||||
_resetPendingResolve();
|
||||
const s = makeMockSession();
|
||||
_setActiveSession(s);
|
||||
|
||||
// Should not throw — queues the event for the next runUnit
|
||||
// Should not throw — event is dropped (logged as warning)
|
||||
assert.doesNotThrow(() => {
|
||||
resolveAgentEnd(makeEvent());
|
||||
});
|
||||
assert.equal(s.pendingAgentEndQueue.length, 1, "event should be queued");
|
||||
});
|
||||
|
||||
test("double resolveAgentEnd only resolves once (second is queued)", async () => {
|
||||
test("double resolveAgentEnd only resolves once (second is dropped)", async () => {
|
||||
_resetPendingResolve();
|
||||
|
||||
const ctx = makeMockCtx();
|
||||
const pi = makeMockPi();
|
||||
const s = makeMockSession();
|
||||
_setActiveSession(s);
|
||||
const event1 = makeEvent([{ id: 1 }]);
|
||||
const event2 = makeEvent([{ id: 2 }]);
|
||||
|
||||
|
|
@ -151,15 +143,10 @@ test("double resolveAgentEnd only resolves once (second is queued)", async () =>
|
|||
// First resolve — should work
|
||||
resolveAgentEnd(event1);
|
||||
|
||||
// Second resolve — should be queued (no pending promise)
|
||||
// Second resolve — should be dropped (no pending resolver)
|
||||
assert.doesNotThrow(() => {
|
||||
resolveAgentEnd(event2);
|
||||
});
|
||||
assert.equal(
|
||||
s.pendingAgentEndQueue.length,
|
||||
1,
|
||||
"second event should be queued",
|
||||
);
|
||||
|
||||
const result = await resultPromise;
|
||||
assert.equal(result.status, "completed");
|
||||
|
|
@ -211,29 +198,25 @@ test("runUnit returns cancelled when s.active is false before sendMessage", asyn
|
|||
assert.equal(pi.calls.length, 0);
|
||||
});
|
||||
|
||||
test("runUnit only arms pendingResolve after newSession completes", async () => {
|
||||
test("runUnit only arms resolve after newSession completes", async () => {
|
||||
_resetPendingResolve();
|
||||
|
||||
let sawSwitchFlag = false;
|
||||
let sawPendingResolve: unknown = "unset";
|
||||
|
||||
const ctx = makeMockCtx();
|
||||
const pi = makeMockPi();
|
||||
const s = makeMockSession({
|
||||
newSessionDelayMs: 20,
|
||||
onNewSessionStart: (session) => {
|
||||
sawSwitchFlag = session.sessionSwitchInFlight;
|
||||
sawPendingResolve = session.pendingResolve;
|
||||
onNewSessionStart: () => {
|
||||
sawSwitchFlag = isSessionSwitchInFlight();
|
||||
},
|
||||
});
|
||||
_setActiveSession(s);
|
||||
|
||||
const resultPromise = runUnit(ctx, pi, s, "task", "T01", "prompt", undefined);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 30));
|
||||
|
||||
assert.equal(sawSwitchFlag, true, "session switch guard should be active during newSession");
|
||||
assert.equal(sawPendingResolve, null, "pendingResolve should not be armed before newSession completes");
|
||||
assert.equal(isSessionSwitchInFlight(), false, "session switch guard should clear after newSession settles");
|
||||
|
||||
resolveAgentEnd(makeEvent());
|
||||
|
|
@ -275,24 +258,23 @@ test("auto-loop.ts contains a while keyword", () => {
|
|||
);
|
||||
});
|
||||
|
||||
test("auto-loop.ts one-shot pattern: pendingResolve is nulled before calling resolver", () => {
|
||||
test("auto-loop.ts one-shot pattern: _currentResolve is nulled before calling resolver", () => {
|
||||
const src = readFileSync(
|
||||
resolve(import.meta.dirname, "..", "auto-loop.ts"),
|
||||
"utf-8",
|
||||
);
|
||||
// The one-shot pattern requires: save ref, null the variable, then call
|
||||
// Look for the pattern: s.pendingResolve = null appearing before r(
|
||||
const resolveBlock = src.slice(
|
||||
src.indexOf("export function resolveAgentEnd"),
|
||||
src.indexOf("export function resolveAgentEnd") + 600,
|
||||
);
|
||||
const nullIdx = resolveBlock.indexOf("pendingResolve = null");
|
||||
const nullIdx = resolveBlock.indexOf("_currentResolve = null");
|
||||
const callIdx = resolveBlock.indexOf("r({");
|
||||
assert.ok(nullIdx > 0, "should null pendingResolve in resolveAgentEnd");
|
||||
assert.ok(nullIdx > 0, "should null _currentResolve in resolveAgentEnd");
|
||||
assert.ok(callIdx > 0, "should call resolver in resolveAgentEnd");
|
||||
assert.ok(
|
||||
nullIdx < callIdx,
|
||||
"pendingResolve should be nulled before calling the resolver (one-shot)",
|
||||
"_currentResolve should be nulled before calling the resolver (one-shot)",
|
||||
);
|
||||
});
|
||||
|
||||
|
|
@ -462,8 +444,6 @@ function makeLoopSession(overrides?: Partial<Record<string, unknown>>) {
|
|||
pendingQuickTasks: [],
|
||||
sidecarQueue: [],
|
||||
autoModeStartModel: null,
|
||||
pendingResolve: null,
|
||||
pendingAgentEndQueue: [],
|
||||
unitDispatchCount: new Map<string, number>(),
|
||||
unitLifetimeDispatches: new Map<string, number>(),
|
||||
unitRecoveryCount: new Map<string, number>(),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue