Merge pull request #3992 from jeremymcs/fix/mcp-output-stream-order
fix(interactive): preserve MCP tool output stream ordering
This commit is contained in:
commit
5d48038816
2 changed files with 235 additions and 27 deletions
|
|
@ -0,0 +1,152 @@
|
|||
import assert from "node:assert/strict";
|
||||
import { test } from "node:test";
|
||||
|
||||
import { handleAgentEvent } from "../modes/interactive/controllers/chat-controller.js";
|
||||
|
||||
function makeUsage() {
|
||||
return {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
};
|
||||
}
|
||||
|
||||
function makeAssistant(content: any[]) {
|
||||
return {
|
||||
role: "assistant",
|
||||
content,
|
||||
api: "anthropic-messages",
|
||||
provider: "claude-code",
|
||||
model: "claude-sonnet-4",
|
||||
usage: makeUsage(),
|
||||
stopReason: "stop",
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
}
|
||||
|
||||
function createHost() {
|
||||
const chatContainer = {
|
||||
children: [] as any[],
|
||||
addChild(component: any) {
|
||||
this.children.push(component);
|
||||
},
|
||||
removeChild(component: any) {
|
||||
const idx = this.children.indexOf(component);
|
||||
if (idx !== -1) this.children.splice(idx, 1);
|
||||
},
|
||||
clear() {
|
||||
this.children = [];
|
||||
},
|
||||
};
|
||||
|
||||
const host: any = {
|
||||
isInitialized: true,
|
||||
init: async () => {},
|
||||
defaultEditor: { onEscape: undefined },
|
||||
editor: {},
|
||||
session: { retryAttempt: 0, abortCompaction: () => {}, abortRetry: () => {} },
|
||||
ui: { requestRender: () => {} },
|
||||
footer: { invalidate: () => {} },
|
||||
keybindings: {},
|
||||
statusContainer: { clear: () => {}, addChild: () => {} },
|
||||
chatContainer,
|
||||
settingsManager: { getTimestampFormat: () => "date-time-iso", getShowImages: () => false },
|
||||
pendingTools: new Map(),
|
||||
toolOutputExpanded: false,
|
||||
hideThinkingBlock: false,
|
||||
isBashMode: false,
|
||||
defaultWorkingMessage: "Working...",
|
||||
compactionQueuedMessages: [],
|
||||
editorContainer: {},
|
||||
pendingMessagesContainer: { clear: () => {} },
|
||||
addMessageToChat: () => {},
|
||||
getMarkdownThemeWithSettings: () => ({}),
|
||||
formatWebSearchResult: () => "",
|
||||
getRegisteredToolDefinition: () => undefined,
|
||||
checkShutdownRequested: async () => {},
|
||||
rebuildChatFromMessages: () => {},
|
||||
flushCompactionQueue: async () => {},
|
||||
showStatus: () => {},
|
||||
showError: () => {},
|
||||
updatePendingMessagesDisplay: () => {},
|
||||
updateTerminalTitle: () => {},
|
||||
updateEditorBorderColor: () => {},
|
||||
};
|
||||
|
||||
return host;
|
||||
}
|
||||
|
||||
test("chat-controller keeps tool output ahead of delayed assistant text for external tool streams", async () => {
|
||||
// ToolExecutionComponent uses the global theme singleton.
|
||||
// Install a minimal no-op theme implementation for this unit test.
|
||||
(globalThis as any)[Symbol.for("@gsd/pi-coding-agent:theme")] = {
|
||||
fg: (_key: string, text: string) => text,
|
||||
bg: (_key: string, text: string) => text,
|
||||
bold: (text: string) => text,
|
||||
italic: (text: string) => text,
|
||||
truncate: (text: string) => text,
|
||||
};
|
||||
|
||||
const host = createHost();
|
||||
const toolId = "mcp-tool-1";
|
||||
const toolCall = {
|
||||
type: "toolCall",
|
||||
id: toolId,
|
||||
name: "exec_command",
|
||||
arguments: { cmd: "echo hi" },
|
||||
};
|
||||
|
||||
await handleAgentEvent(host, { type: "message_start", message: makeAssistant([]) } as any);
|
||||
|
||||
assert.equal(host.streamingComponent, undefined, "assistant component should be deferred at message_start");
|
||||
assert.equal(host.chatContainer.children.length, 0, "nothing should render before content arrives");
|
||||
|
||||
await handleAgentEvent(
|
||||
host,
|
||||
{
|
||||
type: "message_update",
|
||||
message: makeAssistant([toolCall]),
|
||||
assistantMessageEvent: {
|
||||
type: "toolcall_end",
|
||||
contentIndex: 0,
|
||||
toolCall: {
|
||||
...toolCall,
|
||||
externalResult: {
|
||||
content: [{ type: "text", text: "tool output" }],
|
||||
details: {},
|
||||
isError: false,
|
||||
},
|
||||
},
|
||||
partial: makeAssistant([toolCall]),
|
||||
},
|
||||
} as any,
|
||||
);
|
||||
|
||||
assert.equal(host.streamingComponent, undefined, "assistant text container should remain deferred for tool-only updates");
|
||||
assert.equal(host.chatContainer.children.length, 1, "tool execution block should render immediately");
|
||||
assert.equal(host.chatContainer.children[0]?.constructor?.name, "ToolExecutionComponent");
|
||||
|
||||
// Re-assert required host method before the text-bearing update path.
|
||||
host.getMarkdownThemeWithSettings = () => ({});
|
||||
|
||||
await handleAgentEvent(
|
||||
host,
|
||||
{
|
||||
type: "message_update",
|
||||
message: makeAssistant([toolCall, { type: "text", text: "done" }]),
|
||||
assistantMessageEvent: {
|
||||
type: "text_delta",
|
||||
contentIndex: 1,
|
||||
delta: "done",
|
||||
partial: makeAssistant([toolCall, { type: "text", text: "done" }]),
|
||||
},
|
||||
} as any,
|
||||
);
|
||||
|
||||
assert.equal(host.chatContainer.children.length, 2, "assistant content should render after existing tool output");
|
||||
assert.equal(host.chatContainer.children[0]?.constructor?.name, "ToolExecutionComponent");
|
||||
assert.equal(host.chatContainer.children[1]?.constructor?.name, "AssistantMessageComponent");
|
||||
});
|
||||
|
|
@ -9,6 +9,18 @@ import { appKey } from "../components/keybinding-hints.js";
|
|||
// Tracks the last processed content index to avoid re-scanning all blocks on every message_update
|
||||
let lastProcessedContentIndex = 0;
|
||||
|
||||
function hasVisibleAssistantContent(message: { content: Array<any> }): boolean {
|
||||
return message.content.some(
|
||||
(c) =>
|
||||
(c.type === "text" && typeof c.text === "string" && c.text.trim().length > 0)
|
||||
|| (c.type === "thinking" && typeof c.thinking === "string" && c.thinking.trim().length > 0),
|
||||
);
|
||||
}
|
||||
|
||||
function hasAssistantToolBlocks(message: { content: Array<any> }): boolean {
|
||||
return message.content.some((c) => c.type === "toolCall" || c.type === "serverToolUse");
|
||||
}
|
||||
|
||||
export async function handleAgentEvent(host: InteractiveModeStateHost & {
|
||||
init: () => Promise<void>;
|
||||
getMarkdownThemeWithSettings: () => any;
|
||||
|
|
@ -104,45 +116,55 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & {
|
|||
host.updatePendingMessagesDisplay();
|
||||
host.ui.requestRender();
|
||||
} else if (event.message.role === "assistant") {
|
||||
host.streamingComponent = new AssistantMessageComponent(
|
||||
undefined,
|
||||
host.hideThinkingBlock,
|
||||
host.getMarkdownThemeWithSettings(),
|
||||
host.settingsManager.getTimestampFormat(),
|
||||
);
|
||||
host.streamingMessage = event.message;
|
||||
host.chatContainer.addChild(host.streamingComponent);
|
||||
host.streamingComponent.updateContent(host.streamingMessage);
|
||||
// External-tool providers can stream multiple assistant turns through
|
||||
// one response. Delay component creation until visible assistant text
|
||||
// arrives so tool outputs keep chronological ordering.
|
||||
host.ui.requestRender();
|
||||
}
|
||||
break;
|
||||
|
||||
case "message_update":
|
||||
if (host.streamingComponent && event.message.role === "assistant") {
|
||||
if (event.message.role === "assistant") {
|
||||
host.streamingMessage = event.message;
|
||||
host.streamingComponent.updateContent(host.streamingMessage);
|
||||
|
||||
// When the stream adapter signals a completed tool call with an
|
||||
// external result (from Claude Code SDK), update the pending
|
||||
// ToolExecutionComponent immediately so output is visible in
|
||||
// real-time instead of waiting for the session to end.
|
||||
const innerEvent = event.assistantMessageEvent;
|
||||
|
||||
if (!host.streamingComponent && hasVisibleAssistantContent(host.streamingMessage)) {
|
||||
host.streamingComponent = new AssistantMessageComponent(
|
||||
undefined,
|
||||
host.hideThinkingBlock,
|
||||
host.getMarkdownThemeWithSettings(),
|
||||
host.settingsManager.getTimestampFormat(),
|
||||
);
|
||||
host.chatContainer.addChild(host.streamingComponent);
|
||||
}
|
||||
if (host.streamingComponent) {
|
||||
host.streamingComponent.updateContent(host.streamingMessage);
|
||||
}
|
||||
|
||||
let externalToolResult:
|
||||
| { toolCallId: string; content: Array<{ type: string; text?: string; data?: string; mimeType?: string }>; details: Record<string, unknown>; isError: boolean }
|
||||
| undefined;
|
||||
if (innerEvent.type === "toolcall_end" && innerEvent.toolCall) {
|
||||
const tc = innerEvent.toolCall as any;
|
||||
const externalResult = tc.externalResult;
|
||||
if (externalResult) {
|
||||
const component = host.pendingTools.get(tc.id);
|
||||
if (component) {
|
||||
component.updateResult({
|
||||
content: externalResult.content ?? [{ type: "text", text: "" }],
|
||||
details: externalResult.details ?? {},
|
||||
isError: externalResult.isError ?? false,
|
||||
});
|
||||
}
|
||||
const ext = tc.externalResult;
|
||||
if (ext) {
|
||||
externalToolResult = {
|
||||
toolCallId: tc.id,
|
||||
content: ext.content ?? [{ type: "text", text: "" }],
|
||||
details: ext.details ?? {},
|
||||
isError: ext.isError ?? false,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const contentBlocks = host.streamingMessage.content;
|
||||
// Some adapters reuse a single assistant lifecycle while internally
|
||||
// spanning multiple provider turns. When a new turn starts, content
|
||||
// length can shrink back to 0/1; reset scan index to avoid skipping.
|
||||
if (lastProcessedContentIndex >= contentBlocks.length) {
|
||||
lastProcessedContentIndex = 0;
|
||||
}
|
||||
for (let i = lastProcessedContentIndex; i < contentBlocks.length; i++) {
|
||||
const content = contentBlocks[i];
|
||||
if (content.type === "toolCall") {
|
||||
|
|
@ -192,6 +214,22 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When the stream adapter signals a completed tool call with an
|
||||
// external result (from Claude Code SDK), update the pending
|
||||
// ToolExecutionComponent immediately so output is visible in
|
||||
// real-time instead of waiting for the session to end.
|
||||
if (externalToolResult) {
|
||||
const component = host.pendingTools.get(externalToolResult.toolCallId);
|
||||
if (component) {
|
||||
component.updateResult({
|
||||
content: externalToolResult.content,
|
||||
details: externalToolResult.details,
|
||||
isError: externalToolResult.isError,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Update index: fully processed blocks won't need re-scanning.
|
||||
// Keep the last block's index (it may still be accumulating data),
|
||||
// so we re-check it next time but skip all earlier ones.
|
||||
|
|
@ -204,7 +242,7 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & {
|
|||
|
||||
case "message_end":
|
||||
if (event.message.role === "user") break;
|
||||
if (host.streamingComponent && event.message.role === "assistant") {
|
||||
if (event.message.role === "assistant") {
|
||||
host.streamingMessage = event.message;
|
||||
let errorMessage: string | undefined;
|
||||
if (host.streamingMessage.stopReason === "aborted") {
|
||||
|
|
@ -214,7 +252,25 @@ export async function handleAgentEvent(host: InteractiveModeStateHost & {
|
|||
: "Operation aborted";
|
||||
host.streamingMessage.errorMessage = errorMessage;
|
||||
}
|
||||
host.streamingComponent.updateContent(host.streamingMessage);
|
||||
|
||||
const shouldRenderAssistant = hasVisibleAssistantContent(host.streamingMessage)
|
||||
|| (
|
||||
(host.streamingMessage.stopReason === "aborted" || host.streamingMessage.stopReason === "error")
|
||||
&& !hasAssistantToolBlocks(host.streamingMessage)
|
||||
);
|
||||
if (!host.streamingComponent && shouldRenderAssistant) {
|
||||
host.streamingComponent = new AssistantMessageComponent(
|
||||
undefined,
|
||||
host.hideThinkingBlock,
|
||||
host.getMarkdownThemeWithSettings(),
|
||||
host.settingsManager.getTimestampFormat(),
|
||||
);
|
||||
host.chatContainer.addChild(host.streamingComponent);
|
||||
}
|
||||
if (host.streamingComponent) {
|
||||
host.streamingComponent.updateContent(host.streamingMessage);
|
||||
}
|
||||
|
||||
if (host.streamingMessage.stopReason === "aborted" || host.streamingMessage.stopReason === "error") {
|
||||
if (!errorMessage) {
|
||||
errorMessage = host.streamingMessage.errorMessage || "Error";
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue