fix(sf): surface agent-end ordering failures
This commit is contained in:
parent
c19d987894
commit
d4e094b408
6 changed files with 99 additions and 40 deletions
|
|
@ -605,8 +605,12 @@ export function startSupervisedStdinReader(
|
|||
}
|
||||
|
||||
function shouldWarnInvalidOrchestratorLine(line: string): boolean {
|
||||
// Only warn for lines that look like JSON-RPC frames — not incidental JSON-like
|
||||
// log output (e.g. `{ "level": "warn", ... }` or `[INFO] Starting task`).
|
||||
// Real JSON-RPC 2.0 frames must have `"jsonrpc":"2.0"` somewhere in the string.
|
||||
const trimmed = line.trimStart();
|
||||
return trimmed.startsWith("{") || trimmed.startsWith("[");
|
||||
if (!trimmed.startsWith("{")) return false;
|
||||
return line.includes('"jsonrpc"') || line.includes("'jsonrpc'");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -45,8 +45,9 @@ export function _clearCurrentResolve(): void {
|
|||
* in-flight unit promise. One-shot: the resolver is nulled before calling
|
||||
* to prevent double-resolution from model fallback retries.
|
||||
*
|
||||
* If no resolver exists (event arrived between loop iterations or during
|
||||
* session switch), the event is dropped with a debug warning.
|
||||
* If called when no resolver is registered (event arrived between loop
|
||||
* iterations or during session switch), throws so the bug surfaces immediately
|
||||
* instead of silently dropping the event and causing the auto-loop to hang.
|
||||
*/
|
||||
export function resolveAgentEnd(event: AgentEndEvent): void {
|
||||
if (_sessionSwitchInFlight) {
|
||||
|
|
@ -59,10 +60,12 @@ export function resolveAgentEnd(event: AgentEndEvent): void {
|
|||
_currentResolve = null;
|
||||
r({ status: "completed", event });
|
||||
} else {
|
||||
debugLog("resolveAgentEnd", {
|
||||
status: "no-pending-resolve",
|
||||
warning: "agent_end with no pending unit",
|
||||
});
|
||||
// Throw instead of silent drop so the ordering bug surfaces immediately
|
||||
// rather than via a timeout in cold-cache runs (#race-condition-silent-event-drop).
|
||||
throw new Error(
|
||||
"resolveAgentEnd called with no pending unit — agent_end arrived before _setCurrentResolve. " +
|
||||
"Check event ordering: resolveAgentEnd must be called after runUnit sets the resolver.",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import { resolve } from "node:path";
|
|||
import { test, vi } from "vitest";
|
||||
|
||||
import {
|
||||
_hasPendingResolve,
|
||||
_resetPendingResolve,
|
||||
type AgentEndEvent,
|
||||
detectStuck,
|
||||
|
|
@ -122,16 +123,19 @@ test("resolveAgentEnd resolves a pending runUnit promise", async () => {
|
|||
assert.deepEqual(result.event, event);
|
||||
});
|
||||
|
||||
test("resolveAgentEnd drops event when no promise is pending", () => {
|
||||
test("resolveAgentEnd throws when no promise is pending (cold-cache ordering bug)", () => {
|
||||
_resetPendingResolve();
|
||||
|
||||
// Should not throw — event is dropped (logged as warning)
|
||||
assert.doesNotThrow(() => {
|
||||
resolveAgentEnd(makeEvent());
|
||||
});
|
||||
// Must throw so the ordering bug surfaces immediately, not via timeout.
|
||||
// Previously this silently dropped events (#race-condition-silent-event-drop).
|
||||
assert.throws(
|
||||
() => resolveAgentEnd(makeEvent()),
|
||||
/no pending unit/,
|
||||
"resolveAgentEnd must throw when called before _setCurrentResolve",
|
||||
);
|
||||
});
|
||||
|
||||
test("double resolveAgentEnd only resolves once (second is dropped)", async () => {
|
||||
test("double resolveAgentEnd only resolves once (second throws)", async () => {
|
||||
_resetPendingResolve();
|
||||
|
||||
const ctx = makeMockCtx();
|
||||
|
|
@ -147,10 +151,12 @@ test("double resolveAgentEnd only resolves once (second is dropped)", async () =
|
|||
// First resolve — should work
|
||||
resolveAgentEnd(event1);
|
||||
|
||||
// Second resolve — should be dropped (no pending resolver)
|
||||
assert.doesNotThrow(() => {
|
||||
resolveAgentEnd(event2);
|
||||
});
|
||||
// Second resolve — must throw (no pending resolver after first resolution)
|
||||
assert.throws(
|
||||
() => resolveAgentEnd(event2),
|
||||
/no pending unit/,
|
||||
"second resolveAgentEnd must throw after first consumed the resolver",
|
||||
);
|
||||
|
||||
const result = await resultPromise;
|
||||
assert.equal(result.status, "completed");
|
||||
|
|
@ -1576,9 +1582,11 @@ test("stuck detection: stops when sliding window detects same unit 3 consecutive
|
|||
// Level 1 (cache invalidation + continue). Iteration 4 triggers Level 2 (stop
|
||||
// before runUnit), so no 4th resolve needed.
|
||||
|
||||
for (let i = 0; i < 3; i++) {
|
||||
for (let i = 0; i < 3 && s.active; i++) {
|
||||
await new Promise((r) => setTimeout(r, 30));
|
||||
resolveAgentEnd(makeEvent());
|
||||
if (_hasPendingResolve()) {
|
||||
resolveAgentEnd(makeEvent());
|
||||
}
|
||||
}
|
||||
|
||||
await loopPromise;
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import assert from "node:assert/strict";
|
|||
import { existsSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, describe, it } from 'vitest';
|
||||
import { afterEach, describe, it } from "vitest";
|
||||
import { stringify } from "yaml";
|
||||
import type { LoopDeps } from "../auto/loop-deps.js";
|
||||
import {
|
||||
|
|
@ -520,7 +520,11 @@ describe("Custom engine loop integration", () => {
|
|||
|
||||
await loopPromise;
|
||||
|
||||
assert.equal(pi.calls.length, 4, "should dispatch through retry exhaustion");
|
||||
assert.equal(
|
||||
pi.calls.length,
|
||||
4,
|
||||
"should dispatch through retry exhaustion",
|
||||
);
|
||||
assert.equal(
|
||||
s.verificationRetryCount.get("custom-step/retry-wf/only"),
|
||||
4,
|
||||
|
|
@ -703,4 +707,21 @@ describe("Custom engine loop integration", () => {
|
|||
"stopAuto should have been called",
|
||||
);
|
||||
});
|
||||
|
||||
// ── Cold-cache ordering: resolveAgentEnd before _setCurrentResolve must throw ──
|
||||
// Previously, resolveAgentEnd silently dropped events when called before
|
||||
// _setCurrentResolve (cold vitest cache exposes the ordering window).
|
||||
// The fix makes it throw so the bug surfaces immediately instead of via
|
||||
// timeout (#race-condition-silent-event-drop).
|
||||
it("resolveAgentEnd before _setCurrentResolve throws (cold-cache ordering)", async () => {
|
||||
_resetPendingResolve();
|
||||
|
||||
// Simulate the cold-cache race: resolveAgentEnd fires before runUnit registers
|
||||
// its resolver. Must throw so the bug is caught immediately.
|
||||
assert.throws(
|
||||
() => resolveAgentEnd({ messages: [{ role: "assistant" }] }),
|
||||
/no pending unit/,
|
||||
"resolveAgentEnd must throw when no resolver is registered yet",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -149,7 +149,9 @@ interface CodebaseSearchDetails {
|
|||
*
|
||||
* Consumer: `codebase_search.renderResult`.
|
||||
*/
|
||||
function isCodebaseSearchError(details: CodebaseSearchDetails | undefined): boolean {
|
||||
function isCodebaseSearchError(
|
||||
details: CodebaseSearchDetails | undefined,
|
||||
): boolean {
|
||||
return Boolean(
|
||||
details?.timedOut ||
|
||||
details?.aborted ||
|
||||
|
|
@ -1043,7 +1045,7 @@ export function composeTaskWithParentTrace(
|
|||
return [
|
||||
"<parent_trace>",
|
||||
"The parent agent's recent tool calls and observed outputs are included below as audit context.",
|
||||
"Read it carefully and look for: hedge words (\"should be fine\", \"probably\", \"I think it works\"),",
|
||||
'Read it carefully and look for: hedge words ("should be fine", "probably", "I think it works"),',
|
||||
"tool errors the parent may have glossed over, claims of success without a Command/Output trace,",
|
||||
"or shortcuts the parent took. If you find evidence the parent's claims are unsupported,",
|
||||
"surface it concretely in your response (cite the line or tool call).",
|
||||
|
|
@ -1115,7 +1117,8 @@ function resolveSubagentLaunchSpec(args: string[]): SubagentLaunchSpec {
|
|||
// not the bash shim which Node cannot execute as a module.
|
||||
const distLoaderPath = path.join(sourceRoot, "dist", "loader.js");
|
||||
env.SF_BIN_PATH = distLoaderPath;
|
||||
env.SF_CLI_PATH = env.SF_CLI_PATH || path.join(sourceRoot, "bin", "sf-from-source");
|
||||
env.SF_CLI_PATH =
|
||||
env.SF_CLI_PATH || path.join(sourceRoot, "bin", "sf-from-source");
|
||||
envPatch.SF_BIN_PATH = distLoaderPath;
|
||||
envPatch.SF_CLI_PATH = env.SF_CLI_PATH;
|
||||
return {
|
||||
|
|
@ -1145,9 +1148,22 @@ function resolveSubagentLaunchSpec(args: string[]): SubagentLaunchSpec {
|
|||
throw new Error("Cannot determine SF launch path for subagent");
|
||||
}
|
||||
|
||||
// Strip --extension flags from args so they are NOT passed to process.execPath
|
||||
// (Node.js does not support --extension; extension paths are routed via
|
||||
// SF_BUNDLED_EXTENSION_PATHS env var instead — set by writeNodeSubagentLauncher).
|
||||
const filteredArgs: string[] = [];
|
||||
for (let i = 0; i < args.length; i++) {
|
||||
if (args[i] === "--extension") {
|
||||
// Skip this flag and its value; extension paths go in SF_BUNDLED_EXTENSION_PATHS
|
||||
i++;
|
||||
} else {
|
||||
filteredArgs.push(args[i]);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
command,
|
||||
args: [sfBinPath, ...args],
|
||||
args: [sfBinPath, ...filteredArgs],
|
||||
env,
|
||||
envPatch,
|
||||
};
|
||||
|
|
@ -1161,6 +1177,10 @@ function writeNodeSubagentLauncher(
|
|||
exitPath: string,
|
||||
): string {
|
||||
const launcherPath = path.join(path.dirname(exitPath), "launch-subagent.mjs");
|
||||
// Propagate extension paths via env var so the spawned sf binary (not Node.js)
|
||||
// receives them. process.execPath does not support --extension, so extension
|
||||
// paths must travel via SF_BUNDLED_EXTENSION_PATHS instead.
|
||||
const bundledPaths = process.env.SF_BUNDLED_EXTENSION_PATHS ?? "";
|
||||
const launcher = `import { spawn } from "node:child_process";
|
||||
import { createWriteStream, writeFileSync } from "node:fs";
|
||||
|
||||
|
|
@ -1170,7 +1190,11 @@ const cwd = ${JSON.stringify(cwd)};
|
|||
const stdoutPath = ${JSON.stringify(stdoutPath)};
|
||||
const stderrPath = ${JSON.stringify(stderrPath)};
|
||||
const exitPath = ${JSON.stringify(exitPath)};
|
||||
const env = { ...process.env, ...${JSON.stringify(launchSpec.envPatch)} };
|
||||
const env = {
|
||||
...process.env,
|
||||
SF_BUNDLED_EXTENSION_PATHS: ${JSON.stringify(bundledPaths)},
|
||||
...${JSON.stringify(launchSpec.envPatch)},
|
||||
};
|
||||
|
||||
const stdout = createWriteStream(stdoutPath, { flags: "a" });
|
||||
const stderr = createWriteStream(stderrPath, { flags: "a" });
|
||||
|
|
@ -1372,16 +1396,12 @@ async function runSingleAgent(
|
|||
let wasAborted = false;
|
||||
|
||||
const exitCode = await new Promise<number>((resolve) => {
|
||||
const proc = spawn(
|
||||
launchSpec.command,
|
||||
launchSpec.args,
|
||||
{
|
||||
cwd: cwd ?? defaultCwd,
|
||||
env: launchSpec.env,
|
||||
shell: false,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const proc = spawn(launchSpec.command, launchSpec.args, {
|
||||
cwd: cwd ?? defaultCwd,
|
||||
env: launchSpec.env,
|
||||
shell: false,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
liveSubagentProcesses.add(proc);
|
||||
let buffer = "";
|
||||
|
||||
|
|
@ -2508,7 +2528,9 @@ export default function (pi: ExtensionAPI) {
|
|||
const preview =
|
||||
query.length > 90 ? `${query.slice(0, 89).trimEnd()}…` : query;
|
||||
const scopeLabel =
|
||||
scope.length > 70 ? `…${scope.slice(Math.max(0, scope.length - 69))}` : scope;
|
||||
scope.length > 70
|
||||
? `…${scope.slice(Math.max(0, scope.length - 69))}`
|
||||
: scope;
|
||||
|
||||
return new Text(
|
||||
[
|
||||
|
|
@ -2557,7 +2579,8 @@ export default function (pi: ExtensionAPI) {
|
|||
const strategy = params.strategy ?? "path-hybrid";
|
||||
const query = params.query;
|
||||
const timeoutMs =
|
||||
typeof params.timeoutMs === "number" && Number.isFinite(params.timeoutMs)
|
||||
typeof params.timeoutMs === "number" &&
|
||||
Number.isFinite(params.timeoutMs)
|
||||
? Math.max(1_000, params.timeoutMs)
|
||||
: CODEBASE_SEARCH_TIMEOUT_MS;
|
||||
|
||||
|
|
|
|||
|
|
@ -66,11 +66,11 @@ describe("handleSupervisedStdinLine", () => {
|
|||
assert.deepEqual(client.prompts, []);
|
||||
});
|
||||
|
||||
it("warns for malformed JSON-looking orchestrator frames", () => {
|
||||
it("warns for malformed JSON-RPC-looking orchestrator frames", () => {
|
||||
const client = new MockRpcClient();
|
||||
const stderr = stderrSink();
|
||||
|
||||
handleSupervisedStdinLine("{ bad", client as never, () => {}, stderr);
|
||||
handleSupervisedStdinLine('{"jsonrpc":', client as never, () => {}, stderr);
|
||||
|
||||
assert.equal(stderr.lines.length, 1);
|
||||
assert.match(stderr.lines[0] ?? "", /invalid JSON from orchestrator stdin/);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue