fix(agent-core): await event queue in tool hooks for safe parallel execution (#439)
Moves extension tool_call/tool_result interception from wrapToolsWithExtensions (which fires inside the agent loop, bypassing event settlement) to beforeToolCall/afterToolCall hooks that await _agentEventQueue. This ensures extensions always see settled state — including the appended assistant message — even when tools execute in parallel. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6d2ff3d4a5
commit
11234b7456
2 changed files with 100 additions and 7 deletions
|
|
@ -22,6 +22,10 @@ import type {
|
|||
AgentMessage,
|
||||
AgentState,
|
||||
AgentTool,
|
||||
BeforeToolCallContext,
|
||||
BeforeToolCallResult,
|
||||
AfterToolCallContext,
|
||||
AfterToolCallResult,
|
||||
StreamFn,
|
||||
ThinkingLevel,
|
||||
} from "./types.js";
|
||||
|
|
@ -129,6 +133,8 @@ export class Agent {
|
|||
private _thinkingBudgets?: ThinkingBudgets;
|
||||
private _transport: Transport;
|
||||
private _maxRetryDelayMs?: number;
|
||||
private _beforeToolCall?: AgentLoopConfig["beforeToolCall"];
|
||||
private _afterToolCall?: AgentLoopConfig["afterToolCall"];
|
||||
|
||||
constructor(opts: AgentOptions = {}) {
|
||||
this._state = { ...this._state, ...opts.initialState };
|
||||
|
|
@ -203,6 +209,22 @@ export class Agent {
|
|||
this._maxRetryDelayMs = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Install a hook called before each tool executes, after argument validation.
|
||||
* Return `{ block: true }` to prevent execution.
|
||||
*/
|
||||
setBeforeToolCall(fn: AgentLoopConfig["beforeToolCall"]): void {
|
||||
this._beforeToolCall = fn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Install a hook called after each tool executes, before results are emitted.
|
||||
* Return field overrides for content/details/isError.
|
||||
*/
|
||||
setAfterToolCall(fn: AgentLoopConfig["afterToolCall"]): void {
|
||||
this._afterToolCall = fn;
|
||||
}
|
||||
|
||||
get state(): AgentState {
|
||||
return this._state;
|
||||
}
|
||||
|
|
@ -452,6 +474,8 @@ export class Agent {
|
|||
return this.dequeueSteeringMessages();
|
||||
},
|
||||
getFollowUpMessages: async () => this.dequeueFollowUpMessages(),
|
||||
beforeToolCall: this._beforeToolCall,
|
||||
afterToolCall: this._afterToolCall,
|
||||
};
|
||||
|
||||
let partial: AgentMessage | null = null;
|
||||
|
|
|
|||
|
|
@ -67,7 +67,6 @@ import {
|
|||
type TurnEndEvent,
|
||||
type TurnStartEvent,
|
||||
wrapRegisteredTools,
|
||||
wrapToolsWithExtensions,
|
||||
} from "./extensions/index.js";
|
||||
import type { BashExecutionMessage, CustomMessage } from "./messages.js";
|
||||
import { FallbackResolver } from "./fallback-resolver.js";
|
||||
|
|
@ -304,6 +303,11 @@ export class AgentSession {
|
|||
// (session persistence, extensions, auto-compaction, retry logic)
|
||||
this._unsubscribeAgent = this.agent.subscribe(this._handleAgentEvent);
|
||||
|
||||
// Install tool hooks that await the event queue before emitting extension events.
|
||||
// This ensures extensions always see settled state (e.g., assistant message appended)
|
||||
// even when tools execute in parallel.
|
||||
this._installAgentToolHooks();
|
||||
|
||||
this._buildRuntime({
|
||||
activeToolNames: this._initialActiveToolNames,
|
||||
includeAllExtensionTools: true,
|
||||
|
|
@ -477,6 +481,73 @@ export class AgentSession {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Install beforeToolCall/afterToolCall hooks on the Agent.
|
||||
*
|
||||
* These hooks await `_agentEventQueue` before emitting extension events,
|
||||
* ensuring that all prior events (including `message_end` which appends
|
||||
* the assistant message) have fully settled. This prevents a race condition
|
||||
* in parallel tool execution where extension `tool_call` handlers could
|
||||
* see stale agent state.
|
||||
*/
|
||||
private _installAgentToolHooks(): void {
|
||||
this.agent.setBeforeToolCall(async ({ toolCall, args }) => {
|
||||
// Wait for all queued agent events to settle before emitting to extensions
|
||||
await this._agentEventQueue;
|
||||
|
||||
if (!this._extensionRunner?.hasHandlers("tool_call")) return undefined;
|
||||
|
||||
try {
|
||||
const callResult = await this._extensionRunner.emitToolCall({
|
||||
type: "tool_call",
|
||||
toolName: toolCall.name,
|
||||
toolCallId: toolCall.id,
|
||||
input: args as Record<string, unknown>,
|
||||
});
|
||||
|
||||
if (callResult?.block) {
|
||||
return {
|
||||
block: true,
|
||||
reason: callResult.reason || "Tool execution was blocked by an extension",
|
||||
};
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof Error) {
|
||||
return { block: true, reason: err.message };
|
||||
}
|
||||
return { block: true, reason: `Extension failed, blocking execution: ${String(err)}` };
|
||||
}
|
||||
|
||||
return undefined;
|
||||
});
|
||||
|
||||
this.agent.setAfterToolCall(async ({ toolCall, args, result, isError }) => {
|
||||
// Wait for all queued agent events to settle
|
||||
await this._agentEventQueue;
|
||||
|
||||
if (!this._extensionRunner?.hasHandlers("tool_result")) return undefined;
|
||||
|
||||
const resultResult = await this._extensionRunner.emitToolResult({
|
||||
type: "tool_result",
|
||||
toolName: toolCall.name,
|
||||
toolCallId: toolCall.id,
|
||||
input: args as Record<string, unknown>,
|
||||
content: result.content,
|
||||
details: result.details,
|
||||
isError,
|
||||
});
|
||||
|
||||
if (resultResult) {
|
||||
return {
|
||||
content: resultResult.content ?? undefined,
|
||||
details: resultResult.details ?? undefined,
|
||||
};
|
||||
}
|
||||
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
|
||||
/** Extract text content from a message */
|
||||
private _getUserMessageText(message: Message): string {
|
||||
if (message.role !== "user") return "";
|
||||
|
|
@ -2181,12 +2252,10 @@ export class AgentSession {
|
|||
toolRegistry.set(tool.name, tool);
|
||||
}
|
||||
|
||||
if (this._extensionRunner) {
|
||||
const wrappedAllTools = wrapToolsWithExtensions(Array.from(toolRegistry.values()), this._extensionRunner);
|
||||
this._toolRegistry = new Map(wrappedAllTools.map((tool) => [tool.name, tool]));
|
||||
} else {
|
||||
this._toolRegistry = toolRegistry;
|
||||
}
|
||||
// Tool interception (tool_call/tool_result extension events) is handled by
|
||||
// beforeToolCall/afterToolCall hooks installed in _installAgentToolHooks(),
|
||||
// which await _agentEventQueue for safe parallel execution.
|
||||
this._toolRegistry = toolRegistry;
|
||||
|
||||
const nextActiveToolNames = options?.activeToolNames
|
||||
? [...options.activeToolNames]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue