From 58d729894e078d55a849e45c211bfdb43460962f Mon Sep 17 00:00:00 2001 From: Jeremy Date: Sat, 11 Apr 2026 11:22:54 -0500 Subject: [PATCH] fix(interactive): preserve MCP tool output stream ordering --- .../controllers/chat-controller.ts | 110 +++++++++++++----- 1 file changed, 83 insertions(+), 27 deletions(-) diff --git a/packages/pi-coding-agent/src/modes/interactive/controllers/chat-controller.ts b/packages/pi-coding-agent/src/modes/interactive/controllers/chat-controller.ts index d301acd12..8d72d683d 100644 --- a/packages/pi-coding-agent/src/modes/interactive/controllers/chat-controller.ts +++ b/packages/pi-coding-agent/src/modes/interactive/controllers/chat-controller.ts @@ -9,6 +9,18 @@ import { appKey } from "../components/keybinding-hints.js"; // Tracks the last processed content index to avoid re-scanning all blocks on every message_update let lastProcessedContentIndex = 0; +function hasVisibleAssistantContent(message: { content: Array }): boolean { + return message.content.some( + (c) => + (c.type === "text" && typeof c.text === "string" && c.text.trim().length > 0) + || (c.type === "thinking" && typeof c.thinking === "string" && c.thinking.trim().length > 0), + ); +} + +function hasAssistantToolBlocks(message: { content: Array }): boolean { + return message.content.some((c) => c.type === "toolCall" || c.type === "serverToolUse"); +} + export async function handleAgentEvent(host: InteractiveModeStateHost & { init: () => Promise; getMarkdownThemeWithSettings: () => any; @@ -104,45 +116,55 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & { host.updatePendingMessagesDisplay(); host.ui.requestRender(); } else if (event.message.role === "assistant") { - host.streamingComponent = new AssistantMessageComponent( - undefined, - host.hideThinkingBlock, - host.getMarkdownThemeWithSettings(), - host.settingsManager.getTimestampFormat(), - ); host.streamingMessage = event.message; - host.chatContainer.addChild(host.streamingComponent); - host.streamingComponent.updateContent(host.streamingMessage); + // External-tool providers can stream multiple assistant turns through + // one response. Delay component creation until visible assistant text + // arrives so tool outputs keep chronological ordering. host.ui.requestRender(); } break; case "message_update": - if (host.streamingComponent && event.message.role === "assistant") { + if (event.message.role === "assistant") { host.streamingMessage = event.message; - host.streamingComponent.updateContent(host.streamingMessage); - - // When the stream adapter signals a completed tool call with an - // external result (from Claude Code SDK), update the pending - // ToolExecutionComponent immediately so output is visible in - // real-time instead of waiting for the session to end. const innerEvent = event.assistantMessageEvent; + + if (!host.streamingComponent && hasVisibleAssistantContent(host.streamingMessage)) { + host.streamingComponent = new AssistantMessageComponent( + undefined, + host.hideThinkingBlock, + host.getMarkdownThemeWithSettings(), + host.settingsManager.getTimestampFormat(), + ); + host.chatContainer.addChild(host.streamingComponent); + } + if (host.streamingComponent) { + host.streamingComponent.updateContent(host.streamingMessage); + } + + let externalToolResult: + | { toolCallId: string; content: Array<{ type: string; text?: string; data?: string; mimeType?: string }>; details: Record; isError: boolean } + | undefined; if (innerEvent.type === "toolcall_end" && innerEvent.toolCall) { const tc = innerEvent.toolCall as any; - const externalResult = tc.externalResult; - if (externalResult) { - const component = host.pendingTools.get(tc.id); - if (component) { - component.updateResult({ - content: externalResult.content ?? [{ type: "text", text: "" }], - details: externalResult.details ?? {}, - isError: externalResult.isError ?? false, - }); - } + const ext = tc.externalResult; + if (ext) { + externalToolResult = { + toolCallId: tc.id, + content: ext.content ?? [{ type: "text", text: "" }], + details: ext.details ?? {}, + isError: ext.isError ?? false, + }; } } const contentBlocks = host.streamingMessage.content; + // Some adapters reuse a single assistant lifecycle while internally + // spanning multiple provider turns. When a new turn starts, content + // length can shrink back to 0/1; reset scan index to avoid skipping. + if (lastProcessedContentIndex >= contentBlocks.length) { + lastProcessedContentIndex = 0; + } for (let i = lastProcessedContentIndex; i < contentBlocks.length; i++) { const content = contentBlocks[i]; if (content.type === "toolCall") { @@ -192,6 +214,22 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & { } } } + + // When the stream adapter signals a completed tool call with an + // external result (from Claude Code SDK), update the pending + // ToolExecutionComponent immediately so output is visible in + // real-time instead of waiting for the session to end. + if (externalToolResult) { + const component = host.pendingTools.get(externalToolResult.toolCallId); + if (component) { + component.updateResult({ + content: externalToolResult.content, + details: externalToolResult.details, + isError: externalToolResult.isError, + }); + } + } + // Update index: fully processed blocks won't need re-scanning. // Keep the last block's index (it may still be accumulating data), // so we re-check it next time but skip all earlier ones. @@ -204,7 +242,7 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & { case "message_end": if (event.message.role === "user") break; - if (host.streamingComponent && event.message.role === "assistant") { + if (event.message.role === "assistant") { host.streamingMessage = event.message; let errorMessage: string | undefined; if (host.streamingMessage.stopReason === "aborted") { @@ -214,7 +252,25 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & { : "Operation aborted"; host.streamingMessage.errorMessage = errorMessage; } - host.streamingComponent.updateContent(host.streamingMessage); + + const shouldRenderAssistant = hasVisibleAssistantContent(host.streamingMessage) + || ( + (host.streamingMessage.stopReason === "aborted" || host.streamingMessage.stopReason === "error") + && !hasAssistantToolBlocks(host.streamingMessage) + ); + if (!host.streamingComponent && shouldRenderAssistant) { + host.streamingComponent = new AssistantMessageComponent( + undefined, + host.hideThinkingBlock, + host.getMarkdownThemeWithSettings(), + host.settingsManager.getTimestampFormat(), + ); + host.chatContainer.addChild(host.streamingComponent); + } + if (host.streamingComponent) { + host.streamingComponent.updateContent(host.streamingMessage); + } + if (host.streamingMessage.stopReason === "aborted" || host.streamingMessage.stopReason === "error") { if (!errorMessage) { errorMessage = host.streamingMessage.errorMessage || "Error";