From 3090f968f46ea3a2f28a0e5fb4787923e6877a7d Mon Sep 17 00:00:00 2001 From: frizynn Date: Thu, 19 Mar 2026 16:46:14 -0300 Subject: [PATCH] refactor: extract retry handler and compaction orchestrator from agent-session Extract two self-contained subsystems from agent-session.ts (3,367 -> 2,737 lines): - RetryHandler: auto-retry with exponential backoff, credential rotation, and cross-provider fallback logic - CompactionOrchestrator: manual/auto compaction, overflow recovery, and extension integration for custom compaction providers Also add shared getErrorMessage() utility to replace repeated `err instanceof Error ? err.message : String(err)` patterns. The extracted modules receive AgentSession state via dependency injection interfaces, avoiding state duplication. AgentSession remains the coordinator that delegates to these modules. --- .../pi-coding-agent/src/core/agent-session.ts | 780 ++---------------- .../src/core/compaction-orchestrator.ts | 424 ++++++++++ .../pi-coding-agent/src/core/retry-handler.ts | 359 ++++++++ packages/pi-coding-agent/src/utils/error.ts | 6 + 4 files changed, 864 insertions(+), 705 deletions(-) create mode 100644 packages/pi-coding-agent/src/core/compaction-orchestrator.ts create mode 100644 packages/pi-coding-agent/src/core/retry-handler.ts create mode 100644 packages/pi-coding-agent/src/utils/error.ts diff --git a/packages/pi-coding-agent/src/core/agent-session.ts b/packages/pi-coding-agent/src/core/agent-session.ts index a5dfa2335..edcad46a2 100644 --- a/packages/pi-coding-agent/src/core/agent-session.ts +++ b/packages/pi-coding-agent/src/core/agent-session.ts @@ -24,22 +24,20 @@ import type { ThinkingLevel, } from "@gsd/pi-agent-core"; import type { AssistantMessage, ImageContent, Message, Model, TextContent } from "@gsd/pi-ai"; -import { isContextOverflow, modelsAreEqual, resetApiProviders, supportsXhigh } from "@gsd/pi-ai"; +import { modelsAreEqual, resetApiProviders, supportsXhigh } from "@gsd/pi-ai"; import { getDocsPath } from 
"../config.js"; +import { getErrorMessage } from "../utils/error.js"; import { theme } from "../modes/interactive/theme/theme.js"; import { stripFrontmatter } from "../utils/frontmatter.js"; -import { sleep } from "../utils/sleep.js"; import { type BashResult, executeBash as executeBashCommand, executeBashWithOperations } from "./bash-executor.js"; import { type CompactionResult, calculateContextTokens, collectEntriesForBranchSummary, - compact, estimateContextTokens, generateBranchSummary, - prepareCompaction, - shouldCompact, } from "./compaction/index.js"; +import { CompactionOrchestrator } from "./compaction-orchestrator.js"; import { DEFAULT_THINKING_LEVEL } from "./defaults.js"; import { exportSessionToHtml, type ToolHtmlRenderer } from "./export-html/index.js"; import { createToolHtmlRenderer } from "./export-html/tool-renderer.js"; @@ -53,7 +51,6 @@ import { type MessageEndEvent, type MessageStartEvent, type MessageUpdateEvent, - type SessionBeforeCompactResult, type SessionBeforeForkResult, type SessionBeforeSwitchResult, type SessionBeforeTreeResult, @@ -73,7 +70,8 @@ import { FallbackResolver } from "./fallback-resolver.js"; import type { ModelRegistry } from "./model-registry.js"; import { expandPromptTemplate, type PromptTemplate } from "./prompt-templates.js"; import type { ResourceExtensionPaths, ResourceLoader } from "./resource-loader.js"; -import type { BranchSummaryEntry, CompactionEntry, SessionManager } from "./session-manager.js"; +import { RetryHandler } from "./retry-handler.js"; +import type { BranchSummaryEntry, SessionManager } from "./session-manager.js"; import { getLatestCompactionEntry } from "./session-manager.js"; import type { SettingsManager } from "./settings-manager.js"; import { BUILTIN_SLASH_COMMANDS, type SlashCommandInfo, type SlashCommandLocation } from "./slash-commands.js"; @@ -232,19 +230,9 @@ export class AgentSession { /** Messages queued to be included with the next user prompt as context ("asides"). 
*/ private _pendingNextTurnMessages: CustomMessage[] = []; - // Compaction state - private _compactionAbortController: AbortController | undefined = undefined; - private _autoCompactionAbortController: AbortController | undefined = undefined; - private _overflowRecoveryAttempted = false; - - // Branch summarization state - private _branchSummaryAbortController: AbortController | undefined = undefined; - - // Retry state - private _retryAbortController: AbortController | undefined = undefined; - private _retryAttempt = 0; - private _retryPromise: Promise | undefined = undefined; - private _retryResolve: (() => void) | undefined = undefined; + // Delegated subsystems + private _retryHandler: RetryHandler; + private _compactionOrchestrator: CompactionOrchestrator; // Bash execution state private _bashAbortController: AbortController | undefined = undefined; @@ -299,6 +287,32 @@ export class AgentSession { this._initialActiveToolNames = config.initialActiveToolNames; this._baseToolsOverride = config.baseToolsOverride; + // Initialize delegated subsystems + this._retryHandler = new RetryHandler({ + agent: this.agent, + settingsManager: this.settingsManager, + modelRegistry: this._modelRegistry, + fallbackResolver: this._fallbackResolver, + getModel: () => this.model, + getSessionId: () => this.sessionId, + emit: (event) => this._emit(event), + onModelChange: (model) => this.sessionManager.appendModelChange(model.provider, model.id), + }); + + this._compactionOrchestrator = new CompactionOrchestrator({ + agent: this.agent, + sessionManager: this.sessionManager, + settingsManager: this.settingsManager, + modelRegistry: this._modelRegistry, + getModel: () => this.model, + getSessionId: () => this.sessionId, + getExtensionRunner: () => this._extensionRunner, + emit: (event) => this._emit(event), + disconnectFromAgent: () => this._disconnectFromAgent(), + reconnectToAgent: () => this._reconnectToAgent(), + abort: () => this.abort(), + }); + // Always subscribe to agent 
events for internal handling // (session persistence, extensions, auto-compaction, retry logic) this._unsubscribeAgent = this.agent.subscribe(this._handleAgentEvent); @@ -342,7 +356,7 @@ export class AgentSession { private _handleAgentEvent = (event: AgentEvent): void => { // Create retry promise synchronously before queueing async processing. // Agent.emit() calls this handler synchronously, and prompt() calls waitForRetry() - // as soon as agent.prompt() resolves. If _retryPromise is created only inside + // as soon as agent.prompt() resolves. If the retry promise is created only inside // _processAgentEvent, slow earlier queued events can delay agent_end processing // and waitForRetry() can miss the in-flight retry. this._createRetryPromiseForAgentEnd(event); @@ -357,40 +371,15 @@ export class AgentSession { }; private _createRetryPromiseForAgentEnd(event: AgentEvent): void { - if (event.type !== "agent_end" || this._retryPromise) { - return; - } - - const settings = this.settingsManager.getRetrySettings(); - if (!settings.enabled) { - return; - } - - const lastAssistant = this._findLastAssistantInMessages(event.messages); - if (!lastAssistant || !this._isRetryableError(lastAssistant)) { - return; - } - - this._retryPromise = new Promise((resolve) => { - this._retryResolve = resolve; - }); - } - - private _findLastAssistantInMessages(messages: AgentMessage[]): AssistantMessage | undefined { - for (let i = messages.length - 1; i >= 0; i--) { - const message = messages[i]; - if (message.role === "assistant") { - return message as AssistantMessage; - } - } - return undefined; + if (event.type !== "agent_end") return; + this._retryHandler.createRetryPromiseForAgentEnd(event.messages); } private async _processAgentEvent(event: AgentEvent): Promise { // When a user message starts, check if it's from either queue and remove it BEFORE emitting // This ensures the UI sees the updated queue state if (event.type === "message_start" && event.message.role === "user") { - 
this._overflowRecoveryAttempted = false; + this._compactionOrchestrator.resetOverflowRecovery(); const messageText = this._getUserMessageText(event.message); if (messageText) { // Check steering queue first @@ -440,19 +429,13 @@ export class AgentSession { const assistantMsg = event.message as AssistantMessage; if (assistantMsg.stopReason !== "error") { - this._overflowRecoveryAttempted = false; + this._compactionOrchestrator.clearOverflowRecovery(); } // Reset retry counter immediately on successful assistant response // This prevents accumulation across multiple LLM calls within a turn - if (assistantMsg.stopReason !== "error" && this._retryAttempt > 0) { - this._emit({ - type: "auto_retry_end", - success: true, - attempt: this._retryAttempt, - }); - this._retryAttempt = 0; - this._resolveRetry(); + if (assistantMsg.stopReason !== "error") { + this._retryHandler.handleSuccessfulResponse(); } } } @@ -463,21 +446,12 @@ export class AgentSession { this._lastAssistantMessage = undefined; // Check for retryable errors first (overloaded, rate limit, server errors) - if (this._isRetryableError(msg)) { - const didRetry = await this._handleRetryableError(msg); + if (this._retryHandler.isRetryableError(msg)) { + const didRetry = await this._retryHandler.handleRetryableError(msg); if (didRetry) return; // Retry was initiated, don't proceed to compaction } - await this._checkCompaction(msg); - } - } - - /** Resolve the pending retry promise */ - private _resolveRetry(): void { - if (this._retryResolve) { - this._retryResolve(); - this._retryResolve = undefined; - this._retryPromise = undefined; + await this._compactionOrchestrator.checkCompaction(msg); } } @@ -512,10 +486,7 @@ export class AgentSession { }; } } catch (err) { - if (err instanceof Error) { - return { block: true, reason: err.message }; - } - return { block: true, reason: `Extension failed, blocking execution: ${String(err)}` }; + return { block: true, reason: err instanceof Error ? 
err.message : `Extension failed, blocking execution: ${String(err)}` }; } return undefined; @@ -720,7 +691,7 @@ export class AgentSession { /** Current retry attempt (0 if not retrying) */ get retryAttempt(): number { - return this._retryAttempt; + return this._retryHandler.retryAttempt; } /** @@ -767,11 +738,7 @@ export class AgentSession { /** Whether compaction or branch summarization is currently running */ get isCompacting(): boolean { - return ( - this._autoCompactionAbortController !== undefined || - this._compactionAbortController !== undefined || - this._branchSummaryAbortController !== undefined - ); + return this._compactionOrchestrator.isCompacting; } /** @@ -1025,7 +992,7 @@ export class AgentSession { // Check if we need to compact before sending (catches aborted responses) const lastAssistant = this._findLastAssistantMessage(); if (lastAssistant) { - await this._checkCompaction(lastAssistant, false); + await this._compactionOrchestrator.checkCompaction(lastAssistant, false); } // Build messages array (custom message if any, then user message) @@ -1078,7 +1045,7 @@ export class AgentSession { } await this.agent.prompt(messages); - await this.waitForRetry(); + await this._retryHandler.waitForRetry(); } /** @@ -1106,7 +1073,7 @@ export class AgentSession { this._extensionRunner.emitError({ extensionPath: `command:${commandName}`, event: "command", - error: err instanceof Error ? err.message : String(err), + error: getErrorMessage(err), }); return true; } @@ -1137,7 +1104,7 @@ export class AgentSession { this._extensionRunner?.emitError({ extensionPath: skill.filePath, event: "skill_expansion", - error: err instanceof Error ? err.message : String(err), + error: getErrorMessage(err), }); return text; // Return original on error } @@ -1356,7 +1323,7 @@ export class AgentSession { * Abort current operation and wait for agent to become idle. 
*/ async abort(): Promise { - this.abortRetry(); + this._retryHandler.abortRetry(); this.agent.abort(); await this.agent.waitForIdle(); // Ensure agent_end is emitted even when abort interrupts a tool call (#1414). @@ -1712,373 +1679,27 @@ export class AgentSession { * @param customInstructions Optional instructions for the compaction summary */ async compact(customInstructions?: string): Promise { - this._disconnectFromAgent(); - await this.abort(); - this._compactionAbortController = new AbortController(); - - try { - if (!this.model) { - throw new Error("No model selected"); - } - - const apiKey = await this._modelRegistry.getApiKey(this.model, this.sessionId); - if (!apiKey) { - throw new Error(`No API key for ${this.model.provider}`); - } - - const pathEntries = this.sessionManager.getBranch(); - const settings = this.settingsManager.getCompactionSettings(); - - const preparation = prepareCompaction(pathEntries, settings); - if (!preparation) { - // Check why we can't compact - const lastEntry = pathEntries[pathEntries.length - 1]; - if (lastEntry?.type === "compaction") { - throw new Error("Already compacted"); - } - throw new Error("Nothing to compact (session too small)"); - } - - let extensionCompaction: CompactionResult | undefined; - let fromExtension = false; - - if (this._extensionRunner?.hasHandlers("session_before_compact")) { - const result = (await this._extensionRunner.emit({ - type: "session_before_compact", - preparation, - branchEntries: pathEntries, - customInstructions, - signal: this._compactionAbortController.signal, - })) as SessionBeforeCompactResult | undefined; - - if (result?.cancel) { - throw new Error("Compaction cancelled"); - } - - if (result?.compaction) { - extensionCompaction = result.compaction; - fromExtension = true; - } - } - - let summary: string; - let firstKeptEntryId: string; - let tokensBefore: number; - let details: unknown; - - if (extensionCompaction) { - // Extension provided compaction content - summary = 
extensionCompaction.summary; - firstKeptEntryId = extensionCompaction.firstKeptEntryId; - tokensBefore = extensionCompaction.tokensBefore; - details = extensionCompaction.details; - } else { - // Generate compaction result - const result = await compact( - preparation, - this.model, - apiKey, - customInstructions, - this._compactionAbortController.signal, - ); - summary = result.summary; - firstKeptEntryId = result.firstKeptEntryId; - tokensBefore = result.tokensBefore; - details = result.details; - } - - if (this._compactionAbortController.signal.aborted) { - throw new Error("Compaction cancelled"); - } - - this.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension); - const newEntries = this.sessionManager.getEntries(); - const sessionContext = this.sessionManager.buildSessionContext(); - this.agent.replaceMessages(sessionContext.messages); - - // Get the saved compaction entry for the extension event - const savedCompactionEntry = newEntries.find((e) => e.type === "compaction" && e.summary === summary) as - | CompactionEntry - | undefined; - - if (this._extensionRunner && savedCompactionEntry) { - await this._extensionRunner.emit({ - type: "session_compact", - compactionEntry: savedCompactionEntry, - fromExtension, - }); - } - - return { - summary, - firstKeptEntryId, - tokensBefore, - details, - }; - } finally { - this._compactionAbortController = undefined; - this._reconnectToAgent(); - } + return this._compactionOrchestrator.compact(customInstructions); } - /** - * Cancel in-progress compaction (manual or auto). - */ + /** Cancel in-progress compaction (manual or auto) */ abortCompaction(): void { - this._compactionAbortController?.abort(); - this._autoCompactionAbortController?.abort(); + this._compactionOrchestrator.abortCompaction(); } - /** - * Cancel in-progress branch summarization. 
- */ + /** Cancel in-progress branch summarization */ abortBranchSummary(): void { - this._branchSummaryAbortController?.abort(); + this._compactionOrchestrator.abortBranchSummary(); } - /** - * Check if compaction is needed and run it. - * Called after agent_end and before prompt submission. - * - * Two cases: - * 1. Overflow: LLM returned context overflow error, remove error message from agent state, compact, auto-retry - * 2. Threshold: Context over threshold, compact, NO auto-retry (user continues manually) - * - * @param assistantMessage The assistant message to check - * @param skipAbortedCheck If false, include aborted messages (for pre-prompt check). Default: true - */ - private async _checkCompaction(assistantMessage: AssistantMessage, skipAbortedCheck = true): Promise { - const settings = this.settingsManager.getCompactionSettings(); - if (!settings.enabled) return; - - // Skip if message was aborted (user cancelled) - unless skipAbortedCheck is false - if (skipAbortedCheck && assistantMessage.stopReason === "aborted") return; - - const contextWindow = this.model?.contextWindow ?? 0; - - // Skip overflow check if the message came from a different model. - // This handles the case where user switched from a smaller-context model (e.g. opus) - // to a larger-context model (e.g. codex) - the overflow error from the old model - // shouldn't trigger compaction for the new model. - const sameModel = - this.model && assistantMessage.provider === this.model.provider && assistantMessage.model === this.model.id; - - // Skip compaction checks if this assistant message is older than the latest - // compaction boundary. This prevents a stale pre-compaction usage/error - // from retriggering compaction on the first prompt after compaction. 
- const compactionEntry = getLatestCompactionEntry(this.sessionManager.getBranch()); - const assistantIsFromBeforeCompaction = - compactionEntry !== null && assistantMessage.timestamp <= new Date(compactionEntry.timestamp).getTime(); - if (assistantIsFromBeforeCompaction) { - return; - } - - // Case 1: Overflow - LLM returned context overflow error - if (sameModel && isContextOverflow(assistantMessage, contextWindow)) { - if (this._overflowRecoveryAttempted) { - this._emit({ - type: "auto_compaction_end", - result: undefined, - aborted: false, - willRetry: false, - errorMessage: - "Context overflow recovery failed after one compact-and-retry attempt. Try reducing context or switching to a larger-context model.", - }); - return; - } - - this._overflowRecoveryAttempted = true; - // Remove the error message from agent state (it IS saved to session for history, - // but we don't want it in context for the retry) - const messages = this.agent.state.messages; - if (messages.length > 0 && messages[messages.length - 1].role === "assistant") { - this.agent.replaceMessages(messages.slice(0, -1)); - } - await this._runAutoCompaction("overflow", true); - return; - } - - // Case 2: Threshold - context is getting large - // For error messages (no usage data), estimate from last successful response. - // This ensures sessions that hit persistent API errors (e.g. 529) can still compact. - let contextTokens: number; - if (assistantMessage.stopReason === "error") { - const messages = this.agent.state.messages; - const estimate = estimateContextTokens(messages); - if (estimate.lastUsageIndex === null) return; // No usage data at all - // Verify the usage source is post-compaction. Kept pre-compaction messages - // have stale usage reflecting the old (larger) context and would falsely - // trigger compaction right after one just finished. 
- const usageMsg = messages[estimate.lastUsageIndex]; - if ( - compactionEntry && - usageMsg.role === "assistant" && - (usageMsg as AssistantMessage).timestamp <= new Date(compactionEntry.timestamp).getTime() - ) { - return; - } - contextTokens = estimate.tokens; - } else { - contextTokens = calculateContextTokens(assistantMessage.usage); - } - if (shouldCompact(contextTokens, contextWindow, settings)) { - await this._runAutoCompaction("threshold", false); - } - } - - /** - * Internal: Run auto-compaction with events. - */ - private async _runAutoCompaction(reason: "overflow" | "threshold", willRetry: boolean): Promise { - const settings = this.settingsManager.getCompactionSettings(); - - this._emit({ type: "auto_compaction_start", reason }); - this._autoCompactionAbortController = new AbortController(); - - try { - if (!this.model) { - this._emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false }); - return; - } - - const apiKey = await this._modelRegistry.getApiKey(this.model, this.sessionId); - if (!apiKey) { - this._emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false }); - return; - } - - const pathEntries = this.sessionManager.getBranch(); - - const preparation = prepareCompaction(pathEntries, settings); - if (!preparation) { - this._emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false }); - return; - } - - let extensionCompaction: CompactionResult | undefined; - let fromExtension = false; - - if (this._extensionRunner?.hasHandlers("session_before_compact")) { - const extensionResult = (await this._extensionRunner.emit({ - type: "session_before_compact", - preparation, - branchEntries: pathEntries, - customInstructions: undefined, - signal: this._autoCompactionAbortController.signal, - })) as SessionBeforeCompactResult | undefined; - - if (extensionResult?.cancel) { - this._emit({ type: "auto_compaction_end", result: undefined, aborted: true, willRetry: 
false }); - return; - } - - if (extensionResult?.compaction) { - extensionCompaction = extensionResult.compaction; - fromExtension = true; - } - } - - let summary: string; - let firstKeptEntryId: string; - let tokensBefore: number; - let details: unknown; - - if (extensionCompaction) { - // Extension provided compaction content - summary = extensionCompaction.summary; - firstKeptEntryId = extensionCompaction.firstKeptEntryId; - tokensBefore = extensionCompaction.tokensBefore; - details = extensionCompaction.details; - } else { - // Generate compaction result - const compactResult = await compact( - preparation, - this.model, - apiKey, - undefined, - this._autoCompactionAbortController.signal, - ); - summary = compactResult.summary; - firstKeptEntryId = compactResult.firstKeptEntryId; - tokensBefore = compactResult.tokensBefore; - details = compactResult.details; - } - - if (this._autoCompactionAbortController.signal.aborted) { - this._emit({ type: "auto_compaction_end", result: undefined, aborted: true, willRetry: false }); - return; - } - - this.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension); - const newEntries = this.sessionManager.getEntries(); - const sessionContext = this.sessionManager.buildSessionContext(); - this.agent.replaceMessages(sessionContext.messages); - - // Get the saved compaction entry for the extension event - const savedCompactionEntry = newEntries.find((e) => e.type === "compaction" && e.summary === summary) as - | CompactionEntry - | undefined; - - if (this._extensionRunner && savedCompactionEntry) { - await this._extensionRunner.emit({ - type: "session_compact", - compactionEntry: savedCompactionEntry, - fromExtension, - }); - } - - const result: CompactionResult = { - summary, - firstKeptEntryId, - tokensBefore, - details, - }; - this._emit({ type: "auto_compaction_end", result, aborted: false, willRetry }); - - if (willRetry) { - const messages = this.agent.state.messages; - const lastMsg 
= messages[messages.length - 1]; - if (lastMsg?.role === "assistant" && (lastMsg as AssistantMessage).stopReason === "error") { - this.agent.replaceMessages(messages.slice(0, -1)); - } - - setTimeout(() => { - this.agent.continue().catch(() => {}); - }, 100); - } else if (this.agent.hasQueuedMessages()) { - // Auto-compaction can complete while follow-up/steering/custom messages are waiting. - // Kick the loop so queued messages are actually delivered. - setTimeout(() => { - this.agent.continue().catch(() => {}); - }, 100); - } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : "compaction failed"; - this._emit({ - type: "auto_compaction_end", - result: undefined, - aborted: false, - willRetry: false, - errorMessage: - reason === "overflow" - ? `Context overflow recovery failed: ${errorMessage}` - : `Auto-compaction failed: ${errorMessage}`, - }); - } finally { - this._autoCompactionAbortController = undefined; - } - } - - /** - * Toggle auto-compaction setting. - */ + /** Toggle auto-compaction setting */ setAutoCompactionEnabled(enabled: boolean): void { - this.settingsManager.setCompactionEnabled(enabled); + this._compactionOrchestrator.setAutoCompactionEnabled(enabled); } /** Whether auto-compaction is enabled */ get autoCompactionEnabled(): boolean { - return this.settingsManager.getCompactionEnabled(); + return this._compactionOrchestrator.autoCompactionEnabled; } async bindExtensions(bindings: ExtensionBindings): Promise { @@ -2212,7 +1833,7 @@ export class AgentSession { runner.emitError({ extensionPath: "", event: "send_message", - error: err instanceof Error ? err.message : String(err), + error: getErrorMessage(err), }); }); }, @@ -2221,7 +1842,7 @@ export class AgentSession { runner.emitError({ extensionPath: "", event: "send_user_message", - error: err instanceof Error ? 
err.message : String(err), + error: getErrorMessage(err), }); }); }, @@ -2234,7 +1855,7 @@ export class AgentSession { runner.emitError({ extensionPath: "", event: "retry_last_turn", - error: err instanceof Error ? err.message : String(err), + error: getErrorMessage(err), }); }); } @@ -2434,278 +2055,27 @@ export class AgentSession { } // ========================================================================= - // Auto-Retry + // Auto-Retry (delegated to RetryHandler) // ========================================================================= - /** - * Check if an error is retryable (overloaded, rate limit, server errors). - * Context overflow errors are NOT retryable (handled by compaction instead). - */ - private _isRetryableError(message: AssistantMessage): boolean { - if (message.stopReason !== "error" || !message.errorMessage) return false; - - // Context overflow is handled by compaction, not retry - const contextWindow = this.model?.contextWindow ?? 0; - if (isContextOverflow(message, contextWindow)) return false; - - const err = message.errorMessage; - // Match: overloaded_error, rate limit, 429, 500, 502, 503, 504, service unavailable, connection errors, fetch failed, terminated, retry delay exceeded, network unavailable / auth expired (transient network failures) - return /overloaded|rate.?limit|too many requests|429|500|502|503|504|service.?unavailable|server.?error|internal.?error|connection.?error|connection.?refused|other side closed|fetch failed|upstream.?connect|reset before headers|terminated|retry delay|network.?(?:is\s+)?unavailable|credentials.*expired|temporarily backed off/i.test( - err, - ); - } - - /** - * Classify an error message into a usage-limit error type for credential backoff. 
- */ - private _classifyErrorType(errorMessage: string): import("./auth-storage.js").UsageLimitErrorType { - const err = errorMessage.toLowerCase(); - if (/quota|billing|exceeded.*limit|usage.*limit/i.test(err)) return "quota_exhausted"; - if (/rate.?limit|too many requests|429/i.test(err)) return "rate_limit"; - if (/500|502|503|504|server.?error|internal.?error|service.?unavailable/i.test(err)) return "server_error"; - return "unknown"; - } - - /** - * Handle retryable errors with exponential backoff. - * When multiple credentials are available, marks the failing credential - * as backed off and retries immediately with the next one. - * @returns true if retry was initiated, false if max retries exceeded or disabled - */ - private async _handleRetryableError(message: AssistantMessage): Promise { - const settings = this.settingsManager.getRetrySettings(); - if (!settings.enabled) { - this._resolveRetry(); - return false; - } - - // Retry promise is created synchronously in _handleAgentEvent for agent_end. - // Keep a defensive fallback here in case a future refactor bypasses that path. - if (!this._retryPromise) { - this._retryPromise = new Promise((resolve) => { - this._retryResolve = resolve; - }); - } - - // Try credential fallback before counting against retry budget. - // If another credential is available, switch to it and retry immediately. - // Only attempt credential rotation for errors that indicate a credential-level - // problem (rate limit, quota exhaustion, server error). Transport failures - // ("unknown") like connection resets are not credential-specific — rotating - // won't help and backing off the only credential causes "Authentication failed". 
- if (this.model && message.errorMessage) { - const errorType = this._classifyErrorType(message.errorMessage); - const isCredentialError = errorType !== "unknown"; - const hasAlternate = isCredentialError && this._modelRegistry.authStorage.markUsageLimitReached( - this.model.provider, - this.sessionId, - { errorType }, - ); - - if (hasAlternate) { - // Remove error message from agent state - const messages = this.agent.state.messages; - if (messages.length > 0 && messages[messages.length - 1].role === "assistant") { - this.agent.replaceMessages(messages.slice(0, -1)); - } - - this._emit({ - type: "auto_retry_start", - attempt: this._retryAttempt + 1, - maxAttempts: settings.maxRetries, - delayMs: 0, - errorMessage: `${message.errorMessage} (switching credential)`, - }); - - // Retry immediately with the next credential - don't increment _retryAttempt - setTimeout(() => { - this.agent.continue().catch(() => { - // Retry failed - will be caught by next agent_end - }); - }, 0); - - return true; - } - - // All credentials are backed off. Try cross-provider fallback before giving up. 
- if (isCredentialError) { - const fallbackResult = await this._fallbackResolver.findFallback( - this.model, - errorType, - ); - - if (fallbackResult) { - // Swap to fallback model — don't persist to settings - const previousProvider = this.model.provider; - this.agent.setModel(fallbackResult.model); - this.sessionManager.appendModelChange(fallbackResult.model.provider, fallbackResult.model.id); - - // Remove error message from agent state - const msgs = this.agent.state.messages; - if (msgs.length > 0 && msgs[msgs.length - 1].role === "assistant") { - this.agent.replaceMessages(msgs.slice(0, -1)); - } - - this._emit({ - type: "fallback_provider_switch", - from: `${previousProvider}/${this.model?.id}`, - to: `${fallbackResult.model.provider}/${fallbackResult.model.id}`, - reason: fallbackResult.reason, - }); - - this._emit({ - type: "auto_retry_start", - attempt: this._retryAttempt + 1, - maxAttempts: settings.maxRetries, - delayMs: 0, - errorMessage: `${message.errorMessage} (${fallbackResult.reason})`, - }); - - // Retry immediately with fallback provider - don't increment _retryAttempt - setTimeout(() => { - this.agent.continue().catch(() => { - // Retry failed - will be caught by next agent_end - }); - }, 0); - - return true; - } - - // No fallback available either - if (errorType === "quota_exhausted") { - this._emit({ - type: "fallback_chain_exhausted", - reason: `All providers exhausted for ${this.model.provider}/${this.model.id}`, - }); - this._emit({ - type: "auto_retry_end", - success: false, - attempt: this._retryAttempt, - finalError: message.errorMessage, - }); - this._retryAttempt = 0; - this._resolveRetry(); - return false; - } - } - } - - this._retryAttempt++; - - if (this._retryAttempt > settings.maxRetries) { - // Max retries exceeded, emit final failure and reset - this._emit({ - type: "auto_retry_end", - success: false, - attempt: this._retryAttempt - 1, - finalError: message.errorMessage, - }); - this._retryAttempt = 0; - this._resolveRetry(); 
// Resolve so waitForRetry() completes - return false; - } - - // Use server-requested delay when available (rate limit headers), capped by maxDelayMs. - // Fall back to exponential backoff when no server hint is present. - const exponentialDelayMs = settings.baseDelayMs * 2 ** (this._retryAttempt - 1); - let delayMs: number; - if (message.retryAfterMs !== undefined) { - const cap = settings.maxDelayMs > 0 ? settings.maxDelayMs : Infinity; - if (message.retryAfterMs > cap) { - // Server wants us to wait longer than maxDelayMs — give up to let auto-mode handle recovery - this._emit({ - type: "auto_retry_end", - success: false, - attempt: this._retryAttempt - 1, - finalError: `Rate limit reset in ${Math.ceil(message.retryAfterMs / 1000)}s (max: ${Math.ceil(cap / 1000)}s). ${message.errorMessage || ""}`.trim(), - }); - this._retryAttempt = 0; - this._resolveRetry(); - return false; - } - delayMs = message.retryAfterMs; - } else { - delayMs = exponentialDelayMs; - } - - this._emit({ - type: "auto_retry_start", - attempt: this._retryAttempt, - maxAttempts: settings.maxRetries, - delayMs, - errorMessage: message.errorMessage || "Unknown error", - }); - - // Remove error message from agent state (keep in session for history) - const messages = this.agent.state.messages; - if (messages.length > 0 && messages[messages.length - 1].role === "assistant") { - this.agent.replaceMessages(messages.slice(0, -1)); - } - - // Wait with exponential backoff (abortable) - this._retryAbortController = new AbortController(); - try { - await sleep(delayMs, this._retryAbortController.signal); - } catch { - // Aborted during sleep - emit end event so UI can clean up - const attempt = this._retryAttempt; - this._retryAttempt = 0; - this._retryAbortController = undefined; - this._emit({ - type: "auto_retry_end", - success: false, - attempt, - finalError: "Retry cancelled", - }); - this._resolveRetry(); - return false; - } - this._retryAbortController = undefined; - - // Retry via continue() - 
use setTimeout to break out of event handler chain - setTimeout(() => { - this.agent.continue().catch(() => { - // Retry failed - will be caught by next agent_end - }); - }, 0); - - return true; - } - - /** - * Cancel in-progress retry. - */ + /** Cancel in-progress retry */ abortRetry(): void { - this._retryAbortController?.abort(); - // Note: _retryAttempt is reset in the catch block of _autoRetry - this._resolveRetry(); - } - - /** - * Wait for any in-progress retry to complete. - * Returns immediately if no retry is in progress. - */ - private async waitForRetry(): Promise { - if (this._retryPromise) { - await this._retryPromise; - } + this._retryHandler.abortRetry(); } /** Whether auto-retry is currently in progress */ get isRetrying(): boolean { - return this._retryPromise !== undefined; + return this._retryHandler.isRetrying; } /** Whether auto-retry is enabled */ get autoRetryEnabled(): boolean { - return this.settingsManager.getRetryEnabled(); + return this._retryHandler.autoRetryEnabled; } - /** - * Toggle auto-retry setting. 
- */ + /** Toggle auto-retry setting */ setAutoRetryEnabled(enabled: boolean): void { - this.settingsManager.setRetryEnabled(enabled); + this._retryHandler.setAutoRetryEnabled(enabled); } // ========================================================================= @@ -3029,7 +2399,7 @@ export class AgentSession { }; // Set up abort controller for summarization - this._branchSummaryAbortController = new AbortController(); + this._compactionOrchestrator.branchSummaryAbortController = new AbortController(); let extensionSummary: { summary: string; details?: unknown } | undefined; let fromExtension = false; @@ -3038,7 +2408,7 @@ export class AgentSession { const result = (await this._extensionRunner.emit({ type: "session_before_tree", preparation, - signal: this._branchSummaryAbortController.signal, + signal: this._compactionOrchestrator.branchSummaryAbortController.signal, })) as SessionBeforeTreeResult | undefined; if (result?.cancel) { @@ -3075,12 +2445,12 @@ export class AgentSession { const result = await generateBranchSummary(entriesToSummarize, { model, apiKey, - signal: this._branchSummaryAbortController.signal, + signal: this._compactionOrchestrator.branchSummaryAbortController.signal, customInstructions, replaceInstructions, reserveTokens: branchSummarySettings.reserveTokens, }); - this._branchSummaryAbortController = undefined; + this._compactionOrchestrator.branchSummaryAbortController = undefined; if (result.aborted) { return { cancelled: true, aborted: true }; } @@ -3162,7 +2532,7 @@ export class AgentSession { // Emit to custom tools - this._branchSummaryAbortController = undefined; + this._compactionOrchestrator.branchSummaryAbortController = undefined; return { editorText, cancelled: false, summaryEntry }; } diff --git a/packages/pi-coding-agent/src/core/compaction-orchestrator.ts b/packages/pi-coding-agent/src/core/compaction-orchestrator.ts new file mode 100644 index 000000000..6415f8098 --- /dev/null +++ 
/**
 * CompactionOrchestrator - Manages manual and automatic context compaction.
 *
 * Handles:
 * - Manual compaction (user-triggered /compact)
 * - Auto-compaction when context exceeds threshold
 * - Overflow recovery when LLM returns context overflow errors
 * - Extension integration for custom compaction providers
 * - Branch summarization abort coordination
 */

import type { Agent } from "@gsd/pi-agent-core";
import type { AssistantMessage, Model } from "@gsd/pi-ai";
import { isContextOverflow } from "@gsd/pi-ai";
import {
	type CompactionResult,
	calculateContextTokens,
	compact,
	estimateContextTokens,
	prepareCompaction,
	shouldCompact,
} from "./compaction/index.js";
import type { ExtensionRunner, SessionBeforeCompactResult } from "./extensions/index.js";
import type { ModelRegistry } from "./model-registry.js";
import { getLatestCompactionEntry } from "./session-manager.js";
import type { CompactionEntry, SessionManager } from "./session-manager.js";
import type { SettingsManager } from "./settings-manager.js";
import type { AgentSessionEvent } from "./agent-session.js";
import { getErrorMessage } from "../utils/error.js";

/** Dependencies injected from AgentSession into CompactionOrchestrator */
export interface CompactionOrchestratorDeps {
	readonly agent: Agent;
	readonly sessionManager: SessionManager;
	readonly settingsManager: SettingsManager;
	readonly modelRegistry: ModelRegistry;
	/** Currently selected model, or undefined when none is selected. */
	// eslint-disable-next-line @typescript-eslint/no-explicit-any -- the orchestrator is API-agnostic
	getModel: () => Model<any> | undefined;
	getSessionId: () => string;
	getExtensionRunner: () => ExtensionRunner | undefined;
	emit: (event: AgentSessionEvent) => void;
	disconnectFromAgent: () => void;
	reconnectToAgent: () => void;
	abort: () => Promise<void>;
}

/** Non-null result of prepareCompaction(): the plan handed to the summarizer. */
type CompactionPreparation = NonNullable<ReturnType<typeof prepareCompaction>>;

/** Branch entries as returned by the session manager. */
type BranchEntries = ReturnType<SessionManager["getBranch"]>;

/** Inputs shared by manual and automatic compaction when producing a summary. */
interface SummaryRequest {
	preparation: CompactionPreparation;
	branchEntries: BranchEntries;
	customInstructions: string | undefined;
	signal: AbortSignal;
	// eslint-disable-next-line @typescript-eslint/no-explicit-any -- see CompactionOrchestratorDeps.getModel
	model: Model<any>;
	apiKey: string;
	extensionRunner: ExtensionRunner | undefined;
}

/** Either an extension vetoed the compaction, or a summary was produced. */
type SummaryOutcome =
	| { cancelled: true }
	| { cancelled: false; result: CompactionResult; fromExtension: boolean };

export class CompactionOrchestrator {
	private _compactionAbortController: AbortController | undefined = undefined;
	private _autoCompactionAbortController: AbortController | undefined = undefined;
	private _overflowRecoveryAttempted = false;
	private _branchSummaryAbortController: AbortController | undefined = undefined;

	constructor(private readonly _deps: CompactionOrchestratorDeps) {}

	/** Whether compaction or branch summarization is currently running */
	get isCompacting(): boolean {
		return (
			this._autoCompactionAbortController !== undefined ||
			this._compactionAbortController !== undefined ||
			this._branchSummaryAbortController !== undefined
		);
	}

	/** Reset overflow recovery flag (called when a new user message starts) */
	resetOverflowRecovery(): void {
		this._overflowRecoveryAttempted = false;
	}

	/** Mark overflow recovery as not needed (called on successful assistant response) */
	clearOverflowRecovery(): void {
		this._overflowRecoveryAttempted = false;
	}

	/** Get/set the branch summary abort controller (used by navigateTree) */
	get branchSummaryAbortController(): AbortController | undefined {
		return this._branchSummaryAbortController;
	}
	set branchSummaryAbortController(controller: AbortController | undefined) {
		this._branchSummaryAbortController = controller;
	}

	/**
	 * Manually compact the session context.
	 * Aborts the current agent operation first.
	 *
	 * @param customInstructions Optional instructions for the compaction summary
	 * @returns The summary that was appended to the session
	 * @throws Error when no model or API key is available, when there is nothing
	 *         to compact, or when the compaction is cancelled
	 */
	async compact(customInstructions?: string): Promise<CompactionResult> {
		this._deps.disconnectFromAgent();
		const abortController = new AbortController();

		try {
			// abort() runs inside the try so that a rejected abort still reaches
			// the finally block and the session is reconnected to the agent.
			await this._deps.abort();
			this._compactionAbortController = abortController;

			const model = this._deps.getModel();
			if (!model) {
				throw new Error("No model selected");
			}

			const apiKey = await this._deps.modelRegistry.getApiKey(model, this._deps.getSessionId());
			if (!apiKey) {
				throw new Error(`No API key for ${model.provider}`);
			}

			const pathEntries = this._deps.sessionManager.getBranch();
			const settings = this._deps.settingsManager.getCompactionSettings();

			const preparation = prepareCompaction(pathEntries, settings);
			if (!preparation) {
				const lastEntry = pathEntries[pathEntries.length - 1];
				if (lastEntry?.type === "compaction") {
					throw new Error("Already compacted");
				}
				throw new Error("Nothing to compact (session too small)");
			}

			const extensionRunner = this._deps.getExtensionRunner();
			const outcome = await this._produceSummary({
				preparation,
				branchEntries: pathEntries,
				customInstructions,
				signal: abortController.signal,
				model,
				apiKey,
				extensionRunner,
			});

			// Both an extension veto and a user abort surface as the same error.
			if (outcome.cancelled || abortController.signal.aborted) {
				throw new Error("Compaction cancelled");
			}

			await this._commitCompaction(outcome.result, outcome.fromExtension, extensionRunner);
			return outcome.result;
		} finally {
			this._compactionAbortController = undefined;
			this._deps.reconnectToAgent();
		}
	}

	/** Cancel in-progress compaction (manual or auto) */
	abortCompaction(): void {
		this._compactionAbortController?.abort();
		this._autoCompactionAbortController?.abort();
	}

	/** Cancel in-progress branch summarization */
	abortBranchSummary(): void {
		this._branchSummaryAbortController?.abort();
	}

	/**
	 * Check if compaction is needed and run it.
	 * Called after agent_end and before prompt submission.
	 *
	 * Two cases:
	 * 1. Overflow: LLM returned context overflow error, remove error message, compact, auto-retry
	 * 2. Threshold: Context over threshold, compact, NO auto-retry
	 *
	 * @param assistantMessage The assistant message to check
	 * @param skipAbortedCheck If false, include aborted messages (for pre-prompt check). Default: true
	 */
	async checkCompaction(assistantMessage: AssistantMessage, skipAbortedCheck = true): Promise<void> {
		const settings = this._deps.settingsManager.getCompactionSettings();
		if (!settings.enabled) return;

		if (skipAbortedCheck && assistantMessage.stopReason === "aborted") return;

		const model = this._deps.getModel();
		const contextWindow = model?.contextWindow ?? 0;

		// Overflow detection is only meaningful for the model that produced the message.
		const sameModel =
			model && assistantMessage.provider === model.provider && assistantMessage.model === model.id;

		// A message that predates the latest compaction describes a context that no longer exists.
		const branchEntries = this._deps.sessionManager.getBranch();
		const compactionEntry = getLatestCompactionEntry(branchEntries);
		const assistantIsFromBeforeCompaction =
			compactionEntry !== null && assistantMessage.timestamp <= new Date(compactionEntry.timestamp).getTime();
		if (assistantIsFromBeforeCompaction) return;

		// Case 1: Overflow - LLM returned context overflow error
		if (sameModel && isContextOverflow(assistantMessage, contextWindow)) {
			if (this._overflowRecoveryAttempted) {
				// Only one compact-and-retry per overflow; report instead of looping.
				this._deps.emit({
					type: "auto_compaction_end",
					result: undefined,
					aborted: false,
					willRetry: false,
					errorMessage:
						"Context overflow recovery failed after one compact-and-retry attempt. Try reducing context or switching to a larger-context model.",
				});
				return;
			}

			this._overflowRecoveryAttempted = true;
			const messages = this._deps.agent.state.messages;
			if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
				this._deps.agent.replaceMessages(messages.slice(0, -1));
			}
			await this._runAutoCompaction("overflow", true);
			return;
		}

		// Case 2: Threshold - context is getting large
		let contextTokens: number;
		if (assistantMessage.stopReason === "error") {
			// Error responses carry no usable usage data; estimate from the message list instead.
			const messages = this._deps.agent.state.messages;
			const estimate = estimateContextTokens(messages);
			if (estimate.lastUsageIndex === null) return;
			const usageMsg = messages[estimate.lastUsageIndex];
			if (
				compactionEntry &&
				usageMsg.role === "assistant" &&
				(usageMsg as AssistantMessage).timestamp <= new Date(compactionEntry.timestamp).getTime()
			) {
				// The usage figure was recorded before the latest compaction, so it is stale.
				return;
			}
			contextTokens = estimate.tokens;
		} else {
			contextTokens = calculateContextTokens(assistantMessage.usage);
		}
		if (shouldCompact(contextTokens, contextWindow, settings)) {
			await this._runAutoCompaction("threshold", false);
		}
	}

	/** Toggle auto-compaction setting */
	setAutoCompactionEnabled(enabled: boolean): void {
		this._deps.settingsManager.setCompactionEnabled(enabled);
	}

	/** Whether auto-compaction is enabled */
	get autoCompactionEnabled(): boolean {
		return this._deps.settingsManager.getCompactionEnabled();
	}

	// =========================================================================
	// Private helpers
	// =========================================================================

	/**
	 * Run an automatic compaction and report its outcome through
	 * auto_compaction_start / auto_compaction_end events. Never throws.
	 *
	 * @param reason Why the compaction was triggered
	 * @param willRetry Whether the agent should continue after a successful compaction
	 */
	private async _runAutoCompaction(reason: "overflow" | "threshold", willRetry: boolean): Promise<void> {
		const settings = this._deps.settingsManager.getCompactionSettings();

		this._deps.emit({ type: "auto_compaction_start", reason });
		const abortController = new AbortController();
		this._autoCompactionAbortController = abortController;

		try {
			const model = this._deps.getModel();
			if (!model) {
				this._emitAutoCompactionSkipped(false);
				return;
			}

			const apiKey = await this._deps.modelRegistry.getApiKey(model, this._deps.getSessionId());
			if (!apiKey) {
				this._emitAutoCompactionSkipped(false);
				return;
			}

			const pathEntries = this._deps.sessionManager.getBranch();
			const preparation = prepareCompaction(pathEntries, settings);
			if (!preparation) {
				this._emitAutoCompactionSkipped(false);
				return;
			}

			const extensionRunner = this._deps.getExtensionRunner();
			const outcome = await this._produceSummary({
				preparation,
				branchEntries: pathEntries,
				customInstructions: undefined,
				signal: abortController.signal,
				model,
				apiKey,
				extensionRunner,
			});

			if (outcome.cancelled || abortController.signal.aborted) {
				this._emitAutoCompactionSkipped(true);
				return;
			}

			await this._commitCompaction(outcome.result, outcome.fromExtension, extensionRunner);
			this._deps.emit({ type: "auto_compaction_end", result: outcome.result, aborted: false, willRetry });

			if (willRetry) {
				// The rebuilt context may end with the overflow error again; drop it before retrying.
				const messages = this._deps.agent.state.messages;
				const lastMsg = messages[messages.length - 1];
				if (lastMsg?.role === "assistant" && (lastMsg as AssistantMessage).stopReason === "error") {
					this._deps.agent.replaceMessages(messages.slice(0, -1));
				}
			}

			if (willRetry || this._deps.agent.hasQueuedMessages()) {
				// Deferred so the continue runs outside the current event-handler chain.
				// Rejections are deliberately ignored here (best-effort continue).
				setTimeout(() => {
					this._deps.agent.continue().catch(() => {});
				}, 100);
			}
		} catch (error) {
			if (abortController.signal.aborted) {
				// A failure after the user cancelled is a cancellation, not a compaction error.
				this._emitAutoCompactionSkipped(true);
				return;
			}
			const errorMessage = getErrorMessage(error);
			this._deps.emit({
				type: "auto_compaction_end",
				result: undefined,
				aborted: false,
				willRetry: false,
				errorMessage:
					reason === "overflow"
						? `Context overflow recovery failed: ${errorMessage}`
						: `Auto-compaction failed: ${errorMessage}`,
			});
		} finally {
			this._autoCompactionAbortController = undefined;
		}
	}

	/** Emit an auto_compaction_end event for a run that produced no result. */
	private _emitAutoCompactionSkipped(aborted: boolean): void {
		this._deps.emit({ type: "auto_compaction_end", result: undefined, aborted, willRetry: false });
	}

	/**
	 * Produce a compaction summary, giving extensions the first chance to
	 * veto the compaction or to supply their own result.
	 */
	private async _produceSummary(request: SummaryRequest): Promise<SummaryOutcome> {
		const { preparation, branchEntries, customInstructions, signal, model, apiKey, extensionRunner } = request;

		if (extensionRunner?.hasHandlers("session_before_compact")) {
			const hookResult = (await extensionRunner.emit({
				type: "session_before_compact",
				preparation,
				branchEntries,
				customInstructions,
				signal,
			})) as SessionBeforeCompactResult | undefined;

			if (hookResult?.cancel) {
				return { cancelled: true };
			}

			if (hookResult?.compaction) {
				const { summary, firstKeptEntryId, tokensBefore, details } = hookResult.compaction;
				return {
					cancelled: false,
					result: { summary, firstKeptEntryId, tokensBefore, details },
					fromExtension: true,
				};
			}
		}

		const { summary, firstKeptEntryId, tokensBefore, details } = await compact(
			preparation,
			model,
			apiKey,
			customInstructions,
			signal,
		);
		return {
			cancelled: false,
			result: { summary, firstKeptEntryId, tokensBefore, details },
			fromExtension: false,
		};
	}

	/**
	 * Persist a compaction result, rebuild the agent's message list from the
	 * session, and notify extensions via session_compact.
	 */
	private async _commitCompaction(
		result: CompactionResult,
		fromExtension: boolean,
		extensionRunner: ExtensionRunner | undefined,
	): Promise<void> {
		const { summary, firstKeptEntryId, tokensBefore, details } = result;

		this._deps.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension);
		const newEntries = this._deps.sessionManager.getEntries();
		const sessionContext = this._deps.sessionManager.buildSessionContext();
		this._deps.agent.replaceMessages(sessionContext.messages);

		// Locate the entry that was just appended so extensions receive the stored form.
		const savedCompactionEntry = newEntries.find(
			(e) => e.type === "compaction" && e.summary === summary,
		) as CompactionEntry | undefined;

		if (extensionRunner && savedCompactionEntry) {
			await extensionRunner.emit({
				type: "session_compact",
				compactionEntry: savedCompactionEntry,
				fromExtension,
			});
		}
	}
}
/**
 * RetryHandler - Automatic retry logic with exponential backoff and credential/provider fallback.
 *
 * Handles retryable errors (overloaded, rate limit, server errors) by:
 * 1. Trying alternate credentials for the same provider
 * 2. Falling back to other providers via FallbackResolver
 * 3. Exponential backoff with configurable max retries
 *
 * Context overflow errors are NOT handled here (see compaction).
 */

import type { Agent } from "@gsd/pi-agent-core";
import type { AssistantMessage, Model } from "@gsd/pi-ai";
import { isContextOverflow } from "@gsd/pi-ai";
import type { UsageLimitErrorType } from "./auth-storage.js";
import type { FallbackResolver } from "./fallback-resolver.js";
import type { ModelRegistry } from "./model-registry.js";
import type { SettingsManager } from "./settings-manager.js";
import { sleep } from "../utils/sleep.js";
import type { AgentSessionEvent } from "./agent-session.js";

/** Dependencies injected from AgentSession into RetryHandler */
export interface RetryHandlerDeps {
	readonly agent: Agent;
	readonly settingsManager: SettingsManager;
	readonly modelRegistry: ModelRegistry;
	readonly fallbackResolver: FallbackResolver;
	/** Currently selected model, or undefined when none is selected. */
	// eslint-disable-next-line @typescript-eslint/no-explicit-any -- the handler is API-agnostic
	getModel: () => Model<any> | undefined;
	getSessionId: () => string;
	emit: (event: AgentSessionEvent) => void;
	/** Called when the retry handler switches to a fallback model */
	// eslint-disable-next-line @typescript-eslint/no-explicit-any -- see getModel
	onModelChange: (model: Model<any>) => void;
}

export class RetryHandler {
	private _retryAbortController: AbortController | undefined = undefined;
	private _retryAttempt = 0;
	private _retryPromise: Promise<void> | undefined = undefined;
	private _retryResolve: (() => void) | undefined = undefined;

	constructor(private readonly _deps: RetryHandlerDeps) {}

	/** Current retry attempt (0 if not retrying) */
	get retryAttempt(): number {
		return this._retryAttempt;
	}

	/** Whether auto-retry is currently in progress */
	get isRetrying(): boolean {
		return this._retryPromise !== undefined;
	}

	/** Whether auto-retry is enabled */
	get autoRetryEnabled(): boolean {
		return this._deps.settingsManager.getRetryEnabled();
	}

	/** Toggle auto-retry setting */
	setAutoRetryEnabled(enabled: boolean): void {
		this._deps.settingsManager.setRetryEnabled(enabled);
	}

	/**
	 * Create a retry promise synchronously for agent_end events.
	 * Must be called synchronously from the agent event handler before
	 * any async processing, so that waitForRetry() doesn't miss in-flight retries.
	 *
	 * @param messages Messages from the agent_end event; only the last assistant message is inspected
	 */
	createRetryPromiseForAgentEnd(messages: Array<{ role: string } & Record<string, unknown>>): void {
		if (this._retryPromise) return;

		const settings = this._deps.settingsManager.getRetrySettings();
		if (!settings.enabled) return;

		const lastAssistant = this._findLastAssistantInMessages(messages);
		if (!lastAssistant || !this.isRetryableError(lastAssistant)) return;

		this._ensureRetryPromise();
	}

	/**
	 * Handle a successful assistant response by resetting retry state.
	 * Call this when an assistant message completes without error.
	 */
	handleSuccessfulResponse(): void {
		if (this._retryAttempt > 0) {
			this._deps.emit({
				type: "auto_retry_end",
				success: true,
				attempt: this._retryAttempt,
			});
			this._retryAttempt = 0;
			this._resolveRetry();
		}
	}

	/**
	 * Check if an error is retryable (overloaded, rate limit, server errors).
	 * Context overflow errors are NOT retryable (handled by compaction instead).
	 */
	isRetryableError(message: AssistantMessage): boolean {
		if (message.stopReason !== "error" || !message.errorMessage) return false;

		// Context overflow is handled by compaction, not retry
		const contextWindow = this._deps.getModel()?.contextWindow ?? 0;
		if (isContextOverflow(message, contextWindow)) return false;

		const err = message.errorMessage;
		return /overloaded|rate.?limit|too many requests|429|500|502|503|504|service.?unavailable|server.?error|internal.?error|connection.?error|connection.?refused|other side closed|fetch failed|upstream.?connect|reset before headers|terminated|retry delay|network.?(?:is\s+)?unavailable|credentials.*expired|temporarily backed off/i.test(
			err,
		);
	}

	/**
	 * Handle retryable errors with exponential backoff.
	 * When multiple credentials are available, marks the failing credential
	 * as backed off and retries immediately with the next one. When every
	 * credential is backed off, a cross-provider fallback is tried before the
	 * regular backoff path.
	 *
	 * @param message The failed assistant message
	 * @returns true if retry was initiated, false if max retries exceeded or disabled
	 */
	async handleRetryableError(message: AssistantMessage): Promise<boolean> {
		const settings = this._deps.settingsManager.getRetrySettings();
		if (!settings.enabled) {
			this._resolveRetry();
			return false;
		}

		// Retry promise is created synchronously in createRetryPromiseForAgentEnd.
		// Keep a defensive fallback here in case a future refactor bypasses that path.
		this._ensureRetryPromise();

		// Try credential fallback before counting against retry budget.
		const model = this._deps.getModel();
		if (model && message.errorMessage) {
			const errorType = this._classifyErrorType(message.errorMessage);
			const isCredentialError = errorType !== "unknown";
			// markUsageLimitReached has a side effect (backs off the credential), so it is
			// only invoked for classified errors; its result says whether an alternate exists.
			const hasAlternate =
				isCredentialError &&
				this._deps.modelRegistry.authStorage.markUsageLimitReached(
					model.provider,
					this._deps.getSessionId(),
					{ errorType },
				);

			if (hasAlternate) {
				this._removeLastAssistantError();

				this._deps.emit({
					type: "auto_retry_start",
					attempt: this._retryAttempt + 1,
					maxAttempts: settings.maxRetries,
					delayMs: 0,
					errorMessage: `${message.errorMessage} (switching credential)`,
				});

				// Retry immediately with the next credential - don't increment _retryAttempt
				this._scheduleContinue();
				return true;
			}

			// All credentials are backed off. Try cross-provider fallback before giving up.
			if (isCredentialError) {
				const fallbackResult = await this._deps.fallbackResolver.findFallback(model, errorType);

				if (fallbackResult) {
					// Capture the label before switching: once the model is changed,
					// getModel() may already report the fallback model.
					const previousModelLabel = `${model.provider}/${model.id}`;
					this._deps.agent.setModel(fallbackResult.model);
					this._deps.onModelChange(fallbackResult.model);
					this._removeLastAssistantError();

					this._deps.emit({
						type: "fallback_provider_switch",
						from: previousModelLabel,
						to: `${fallbackResult.model.provider}/${fallbackResult.model.id}`,
						reason: fallbackResult.reason,
					});

					this._deps.emit({
						type: "auto_retry_start",
						attempt: this._retryAttempt + 1,
						maxAttempts: settings.maxRetries,
						delayMs: 0,
						errorMessage: `${message.errorMessage} (${fallbackResult.reason})`,
					});

					// Retry immediately with fallback provider - don't increment _retryAttempt
					this._scheduleContinue();
					return true;
				}

				// No fallback available either. Exhausted quota will not recover by waiting.
				if (errorType === "quota_exhausted") {
					this._deps.emit({
						type: "fallback_chain_exhausted",
						reason: `All providers exhausted for ${model.provider}/${model.id}`,
					});
					this._failRetry(this._retryAttempt, message.errorMessage);
					return false;
				}
			}
		}

		this._retryAttempt++;

		if (this._retryAttempt > settings.maxRetries) {
			this._failRetry(this._retryAttempt - 1, message.errorMessage);
			return false;
		}

		// Use server-requested delay when available, capped by maxDelayMs.
		// Fall back to exponential backoff when no server hint is present.
		let delayMs: number;
		if (message.retryAfterMs !== undefined) {
			// A non-positive maxDelayMs disables the cap.
			const cap = settings.maxDelayMs > 0 ? settings.maxDelayMs : Infinity;
			if (message.retryAfterMs > cap) {
				// Server wants a longer wait than allowed - give up instead of blocking.
				this._failRetry(
					this._retryAttempt - 1,
					`Rate limit reset in ${Math.ceil(message.retryAfterMs / 1000)}s (max: ${Math.ceil(cap / 1000)}s). ${message.errorMessage || ""}`.trim(),
				);
				return false;
			}
			delayMs = message.retryAfterMs;
		} else {
			delayMs = settings.baseDelayMs * 2 ** (this._retryAttempt - 1);
		}

		this._deps.emit({
			type: "auto_retry_start",
			attempt: this._retryAttempt,
			maxAttempts: settings.maxRetries,
			delayMs,
			errorMessage: message.errorMessage || "Unknown error",
		});

		this._removeLastAssistantError();

		// Wait for the computed delay (abortable via abortRetry)
		this._retryAbortController = new AbortController();
		try {
			await sleep(delayMs, this._retryAbortController.signal);
		} catch {
			// Aborted during sleep - emit end event so listeners can clean up
			const attempt = this._retryAttempt;
			this._retryAttempt = 0;
			this._retryAbortController = undefined;
			this._deps.emit({
				type: "auto_retry_end",
				success: false,
				attempt,
				finalError: "Retry cancelled",
			});
			this._resolveRetry();
			return false;
		}
		this._retryAbortController = undefined;

		this._scheduleContinue();
		return true;
	}

	/** Cancel in-progress retry */
	abortRetry(): void {
		this._retryAbortController?.abort();
		// _retryAttempt is reset by the sleep catch block in handleRetryableError
		this._resolveRetry();
	}

	/**
	 * Wait for any in-progress retry to complete.
	 * Returns immediately if no retry is in progress.
	 */
	async waitForRetry(): Promise<void> {
		if (this._retryPromise) {
			await this._retryPromise;
		}
	}

	/** Resolve the pending retry promise */
	resolveRetry(): void {
		this._resolveRetry();
	}

	// =========================================================================
	// Private helpers
	// =========================================================================

	/** Create the retry promise if none is pending. */
	private _ensureRetryPromise(): void {
		if (this._retryPromise) return;
		this._retryPromise = new Promise<void>((resolve) => {
			this._retryResolve = resolve;
		});
	}

	/** Resolve and clear the pending retry promise; no-op when none is pending. */
	private _resolveRetry(): void {
		if (this._retryResolve) {
			this._retryResolve();
			this._retryResolve = undefined;
			this._retryPromise = undefined;
		}
	}

	/** Emit a failed auto_retry_end, reset the attempt counter and release waiters. */
	private _failRetry(attempt: number, finalError: string | undefined): void {
		this._deps.emit({
			type: "auto_retry_end",
			success: false,
			attempt,
			finalError,
		});
		this._retryAttempt = 0;
		this._resolveRetry();
	}

	/**
	 * Retry via agent.continue() on the next timer tick, to break out of the
	 * event handler chain. Rejections are deliberately ignored here: a failed
	 * retry surfaces through the next agent_end event.
	 */
	private _scheduleContinue(): void {
		setTimeout(() => {
			this._deps.agent.continue().catch(() => {});
		}, 0);
	}

	/** Return the last message with role "assistant", or undefined if there is none. */
	private _findLastAssistantInMessages(
		messages: Array<{ role: string } & Record<string, unknown>>,
	): AssistantMessage | undefined {
		for (let i = messages.length - 1; i >= 0; i--) {
			const message = messages[i];
			if (message.role === "assistant") {
				return message as AssistantMessage;
			}
		}
		return undefined;
	}

	/**
	 * Classify an error message into a usage-limit error type for credential backoff.
	 * Patterns are checked in order; the first match wins.
	 */
	private _classifyErrorType(errorMessage: string): UsageLimitErrorType {
		// The patterns are case-insensitive, so no lowercasing is needed.
		if (/quota|billing|exceeded.*limit|usage.*limit/i.test(errorMessage)) return "quota_exhausted";
		if (/rate.?limit|too many requests|429/i.test(errorMessage)) return "rate_limit";
		if (/500|502|503|504|server.?error|internal.?error|service.?unavailable/i.test(errorMessage)) {
			return "server_error";
		}
		return "unknown";
	}

	/** Remove the last assistant error message from agent state */
	private _removeLastAssistantError(): void {
		const messages = this._deps.agent.state.messages;
		if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
			this._deps.agent.replaceMessages(messages.slice(0, -1));
		}
	}
}
/**
 * Extract a human-readable message from an unknown caught value.
 *
 * - `Error` instances yield their `message`.
 * - Error-like objects (a string `message` property, e.g. errors from another
 *   realm or objects thrown by libraries) yield that `message`.
 * - Other objects yield their own string form, or JSON when that form is the
 *   uninformative default `"[object Object]"`.
 * - Primitives are converted with `String()`.
 *
 * Never throws, so it is safe to call from a `catch` block.
 *
 * @param err The caught value
 * @returns A message describing the value
 */
export function getErrorMessage(err: unknown): string {
	if (err instanceof Error) return err.message;
	if (typeof err !== "object" || err === null) return String(err);

	try {
		const message = (err as { message?: unknown }).message;
		if (typeof message === "string") return message;

		const text = String(err);
		if (text !== "[object Object]") return text;
	} catch {
		// Property getters can throw, and String() throws on null-prototype
		// objects; fall through to the JSON form.
	}

	try {
		const json: string | undefined = JSON.stringify(err);
		if (json !== undefined) return json;
	} catch {
		// Circular structures and BigInt members cannot be serialized; use the tag below.
	}

	return Object.prototype.toString.call(err);
}