From 11234b74563227a384653bd133f16f61d84e2e3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?T=C3=82CHES?= Date: Sat, 14 Mar 2026 21:16:17 -0600 Subject: [PATCH] fix(agent-core): await event queue in tool hooks for safe parallel execution (#439) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves extension tool_call/tool_result interception from wrapToolsWithExtensions (which fires inside the agent loop, bypassing event settlement) to beforeToolCall/afterToolCall hooks that await _agentEventQueue. This ensures extensions always see settled state — including the appended assistant message — even when tools execute in parallel. Co-authored-by: Claude Opus 4.6 (1M context) --- packages/pi-agent-core/src/agent.ts | 24 ++++++ .../pi-coding-agent/src/core/agent-session.ts | 83 +++++++++++++++++-- 2 files changed, 100 insertions(+), 7 deletions(-) diff --git a/packages/pi-agent-core/src/agent.ts b/packages/pi-agent-core/src/agent.ts index 5ce47971a..4b9711be9 100644 --- a/packages/pi-agent-core/src/agent.ts +++ b/packages/pi-agent-core/src/agent.ts @@ -22,6 +22,10 @@ import type { AgentMessage, AgentState, AgentTool, + BeforeToolCallContext, + BeforeToolCallResult, + AfterToolCallContext, + AfterToolCallResult, StreamFn, ThinkingLevel, } from "./types.js"; @@ -129,6 +133,8 @@ export class Agent { private _thinkingBudgets?: ThinkingBudgets; private _transport: Transport; private _maxRetryDelayMs?: number; + private _beforeToolCall?: AgentLoopConfig["beforeToolCall"]; + private _afterToolCall?: AgentLoopConfig["afterToolCall"]; constructor(opts: AgentOptions = {}) { this._state = { ...this._state, ...opts.initialState }; @@ -203,6 +209,22 @@ export class Agent { this._maxRetryDelayMs = value; } + /** + * Install a hook called before each tool executes, after argument validation. + * Return `{ block: true }` to prevent execution. + */ + setBeforeToolCall(fn: AgentLoopConfig["beforeToolCall"]): void { + this._beforeToolCall = fn; + } + + /** + * Install a hook called after each tool executes, before results are emitted. + * Return field overrides for content/details/isError. + */ + setAfterToolCall(fn: AgentLoopConfig["afterToolCall"]): void { + this._afterToolCall = fn; + } + get state(): AgentState { return this._state; } @@ -452,6 +474,8 @@ export class Agent { return this.dequeueSteeringMessages(); }, getFollowUpMessages: async () => this.dequeueFollowUpMessages(), + beforeToolCall: this._beforeToolCall, + afterToolCall: this._afterToolCall, }; let partial: AgentMessage | null = null; diff --git a/packages/pi-coding-agent/src/core/agent-session.ts b/packages/pi-coding-agent/src/core/agent-session.ts index 510f79988..6fc9a9853 100644 --- a/packages/pi-coding-agent/src/core/agent-session.ts +++ b/packages/pi-coding-agent/src/core/agent-session.ts @@ -67,7 +67,6 @@ import { type TurnEndEvent, type TurnStartEvent, wrapRegisteredTools, - wrapToolsWithExtensions, } from "./extensions/index.js"; import type { BashExecutionMessage, CustomMessage } from "./messages.js"; import { FallbackResolver } from "./fallback-resolver.js"; @@ -304,6 +303,11 @@ export class AgentSession { // (session persistence, extensions, auto-compaction, retry logic) this._unsubscribeAgent = this.agent.subscribe(this._handleAgentEvent); + // Install tool hooks that await the event queue before emitting extension events. + // This ensures extensions always see settled state (e.g., assistant message appended) + // even when tools execute in parallel. + this._installAgentToolHooks(); + this._buildRuntime({ activeToolNames: this._initialActiveToolNames, includeAllExtensionTools: true, @@ -477,6 +481,73 @@ export class AgentSession { } } + /** + * Install beforeToolCall/afterToolCall hooks on the Agent. + * + * These hooks await `_agentEventQueue` before emitting extension events, + * ensuring that all prior events (including `message_end` which appends + * the assistant message) have fully settled. This prevents a race condition + * in parallel tool execution where extension `tool_call` handlers could + * see stale agent state. + */ + private _installAgentToolHooks(): void { + this.agent.setBeforeToolCall(async ({ toolCall, args }) => { + // Wait for all queued agent events to settle before emitting to extensions + await this._agentEventQueue; + + if (!this._extensionRunner?.hasHandlers("tool_call")) return undefined; + + try { + const callResult = await this._extensionRunner.emitToolCall({ + type: "tool_call", + toolName: toolCall.name, + toolCallId: toolCall.id, + input: args as Record, + }); + + if (callResult?.block) { + return { + block: true, + reason: callResult.reason || "Tool execution was blocked by an extension", + }; + } + } catch (err) { + if (err instanceof Error) { + return { block: true, reason: err.message }; + } + return { block: true, reason: `Extension failed, blocking execution: ${String(err)}` }; + } + + return undefined; + }); + + this.agent.setAfterToolCall(async ({ toolCall, args, result, isError }) => { + // Wait for all queued agent events to settle + await this._agentEventQueue; + + if (!this._extensionRunner?.hasHandlers("tool_result")) return undefined; + + const resultResult = await this._extensionRunner.emitToolResult({ + type: "tool_result", + toolName: toolCall.name, + toolCallId: toolCall.id, + input: args as Record, + content: result.content, + details: result.details, + isError, + }); + + if (resultResult) { + return { + content: resultResult.content ?? undefined, + details: resultResult.details ?? undefined, + }; + } + + return undefined; + }); + } + /** Extract text content from a message */ private _getUserMessageText(message: Message): string { if (message.role !== "user") return ""; @@ -2181,12 +2252,10 @@ export class AgentSession { toolRegistry.set(tool.name, tool); } - if (this._extensionRunner) { - const wrappedAllTools = wrapToolsWithExtensions(Array.from(toolRegistry.values()), this._extensionRunner); - this._toolRegistry = new Map(wrappedAllTools.map((tool) => [tool.name, tool])); - } else { - this._toolRegistry = toolRegistry; - } + // Tool interception (tool_call/tool_result extension events) is handled by + // beforeToolCall/afterToolCall hooks installed in _installAgentToolHooks(), + // which await _agentEventQueue for safe parallel execution. + this._toolRegistry = toolRegistry; const nextActiveToolNames = options?.activeToolNames ? [...options.activeToolNames]