refactor: extract retry handler and compaction orchestrator from agent-session
Extract two self-contained subsystems from agent-session.ts (3,367 -> 2,737 lines): - RetryHandler: auto-retry with exponential backoff, credential rotation, and cross-provider fallback logic - CompactionOrchestrator: manual/auto compaction, overflow recovery, and extension integration for custom compaction providers Also add shared getErrorMessage() utility to replace repeated `err instanceof Error ? err.message : String(err)` patterns. The extracted modules receive AgentSession state via dependency injection interfaces, avoiding state duplication. AgentSession remains the coordinator that delegates to these modules.
This commit is contained in:
parent
eaf0538150
commit
3090f968f4
4 changed files with 864 additions and 705 deletions
|
|
@ -24,22 +24,20 @@ import type {
|
|||
ThinkingLevel,
|
||||
} from "@gsd/pi-agent-core";
|
||||
import type { AssistantMessage, ImageContent, Message, Model, TextContent } from "@gsd/pi-ai";
|
||||
import { isContextOverflow, modelsAreEqual, resetApiProviders, supportsXhigh } from "@gsd/pi-ai";
|
||||
import { modelsAreEqual, resetApiProviders, supportsXhigh } from "@gsd/pi-ai";
|
||||
import { getDocsPath } from "../config.js";
|
||||
import { getErrorMessage } from "../utils/error.js";
|
||||
import { theme } from "../modes/interactive/theme/theme.js";
|
||||
import { stripFrontmatter } from "../utils/frontmatter.js";
|
||||
import { sleep } from "../utils/sleep.js";
|
||||
import { type BashResult, executeBash as executeBashCommand, executeBashWithOperations } from "./bash-executor.js";
|
||||
import {
|
||||
type CompactionResult,
|
||||
calculateContextTokens,
|
||||
collectEntriesForBranchSummary,
|
||||
compact,
|
||||
estimateContextTokens,
|
||||
generateBranchSummary,
|
||||
prepareCompaction,
|
||||
shouldCompact,
|
||||
} from "./compaction/index.js";
|
||||
import { CompactionOrchestrator } from "./compaction-orchestrator.js";
|
||||
import { DEFAULT_THINKING_LEVEL } from "./defaults.js";
|
||||
import { exportSessionToHtml, type ToolHtmlRenderer } from "./export-html/index.js";
|
||||
import { createToolHtmlRenderer } from "./export-html/tool-renderer.js";
|
||||
|
|
@ -53,7 +51,6 @@ import {
|
|||
type MessageEndEvent,
|
||||
type MessageStartEvent,
|
||||
type MessageUpdateEvent,
|
||||
type SessionBeforeCompactResult,
|
||||
type SessionBeforeForkResult,
|
||||
type SessionBeforeSwitchResult,
|
||||
type SessionBeforeTreeResult,
|
||||
|
|
@ -73,7 +70,8 @@ import { FallbackResolver } from "./fallback-resolver.js";
|
|||
import type { ModelRegistry } from "./model-registry.js";
|
||||
import { expandPromptTemplate, type PromptTemplate } from "./prompt-templates.js";
|
||||
import type { ResourceExtensionPaths, ResourceLoader } from "./resource-loader.js";
|
||||
import type { BranchSummaryEntry, CompactionEntry, SessionManager } from "./session-manager.js";
|
||||
import { RetryHandler } from "./retry-handler.js";
|
||||
import type { BranchSummaryEntry, SessionManager } from "./session-manager.js";
|
||||
import { getLatestCompactionEntry } from "./session-manager.js";
|
||||
import type { SettingsManager } from "./settings-manager.js";
|
||||
import { BUILTIN_SLASH_COMMANDS, type SlashCommandInfo, type SlashCommandLocation } from "./slash-commands.js";
|
||||
|
|
@ -232,19 +230,9 @@ export class AgentSession {
|
|||
/** Messages queued to be included with the next user prompt as context ("asides"). */
|
||||
private _pendingNextTurnMessages: CustomMessage[] = [];
|
||||
|
||||
// Compaction state
|
||||
private _compactionAbortController: AbortController | undefined = undefined;
|
||||
private _autoCompactionAbortController: AbortController | undefined = undefined;
|
||||
private _overflowRecoveryAttempted = false;
|
||||
|
||||
// Branch summarization state
|
||||
private _branchSummaryAbortController: AbortController | undefined = undefined;
|
||||
|
||||
// Retry state
|
||||
private _retryAbortController: AbortController | undefined = undefined;
|
||||
private _retryAttempt = 0;
|
||||
private _retryPromise: Promise<void> | undefined = undefined;
|
||||
private _retryResolve: (() => void) | undefined = undefined;
|
||||
// Delegated subsystems
|
||||
private _retryHandler: RetryHandler;
|
||||
private _compactionOrchestrator: CompactionOrchestrator;
|
||||
|
||||
// Bash execution state
|
||||
private _bashAbortController: AbortController | undefined = undefined;
|
||||
|
|
@ -299,6 +287,32 @@ export class AgentSession {
|
|||
this._initialActiveToolNames = config.initialActiveToolNames;
|
||||
this._baseToolsOverride = config.baseToolsOverride;
|
||||
|
||||
// Initialize delegated subsystems
|
||||
this._retryHandler = new RetryHandler({
|
||||
agent: this.agent,
|
||||
settingsManager: this.settingsManager,
|
||||
modelRegistry: this._modelRegistry,
|
||||
fallbackResolver: this._fallbackResolver,
|
||||
getModel: () => this.model,
|
||||
getSessionId: () => this.sessionId,
|
||||
emit: (event) => this._emit(event),
|
||||
onModelChange: (model) => this.sessionManager.appendModelChange(model.provider, model.id),
|
||||
});
|
||||
|
||||
this._compactionOrchestrator = new CompactionOrchestrator({
|
||||
agent: this.agent,
|
||||
sessionManager: this.sessionManager,
|
||||
settingsManager: this.settingsManager,
|
||||
modelRegistry: this._modelRegistry,
|
||||
getModel: () => this.model,
|
||||
getSessionId: () => this.sessionId,
|
||||
getExtensionRunner: () => this._extensionRunner,
|
||||
emit: (event) => this._emit(event),
|
||||
disconnectFromAgent: () => this._disconnectFromAgent(),
|
||||
reconnectToAgent: () => this._reconnectToAgent(),
|
||||
abort: () => this.abort(),
|
||||
});
|
||||
|
||||
// Always subscribe to agent events for internal handling
|
||||
// (session persistence, extensions, auto-compaction, retry logic)
|
||||
this._unsubscribeAgent = this.agent.subscribe(this._handleAgentEvent);
|
||||
|
|
@ -342,7 +356,7 @@ export class AgentSession {
|
|||
private _handleAgentEvent = (event: AgentEvent): void => {
|
||||
// Create retry promise synchronously before queueing async processing.
|
||||
// Agent.emit() calls this handler synchronously, and prompt() calls waitForRetry()
|
||||
// as soon as agent.prompt() resolves. If _retryPromise is created only inside
|
||||
// as soon as agent.prompt() resolves. If the retry promise is created only inside
|
||||
// _processAgentEvent, slow earlier queued events can delay agent_end processing
|
||||
// and waitForRetry() can miss the in-flight retry.
|
||||
this._createRetryPromiseForAgentEnd(event);
|
||||
|
|
@ -357,40 +371,15 @@ export class AgentSession {
|
|||
};
|
||||
|
||||
private _createRetryPromiseForAgentEnd(event: AgentEvent): void {
|
||||
if (event.type !== "agent_end" || this._retryPromise) {
|
||||
return;
|
||||
}
|
||||
|
||||
const settings = this.settingsManager.getRetrySettings();
|
||||
if (!settings.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lastAssistant = this._findLastAssistantInMessages(event.messages);
|
||||
if (!lastAssistant || !this._isRetryableError(lastAssistant)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._retryPromise = new Promise((resolve) => {
|
||||
this._retryResolve = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
private _findLastAssistantInMessages(messages: AgentMessage[]): AssistantMessage | undefined {
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const message = messages[i];
|
||||
if (message.role === "assistant") {
|
||||
return message as AssistantMessage;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
if (event.type !== "agent_end") return;
|
||||
this._retryHandler.createRetryPromiseForAgentEnd(event.messages);
|
||||
}
|
||||
|
||||
private async _processAgentEvent(event: AgentEvent): Promise<void> {
|
||||
// When a user message starts, check if it's from either queue and remove it BEFORE emitting
|
||||
// This ensures the UI sees the updated queue state
|
||||
if (event.type === "message_start" && event.message.role === "user") {
|
||||
this._overflowRecoveryAttempted = false;
|
||||
this._compactionOrchestrator.resetOverflowRecovery();
|
||||
const messageText = this._getUserMessageText(event.message);
|
||||
if (messageText) {
|
||||
// Check steering queue first
|
||||
|
|
@ -440,19 +429,13 @@ export class AgentSession {
|
|||
|
||||
const assistantMsg = event.message as AssistantMessage;
|
||||
if (assistantMsg.stopReason !== "error") {
|
||||
this._overflowRecoveryAttempted = false;
|
||||
this._compactionOrchestrator.clearOverflowRecovery();
|
||||
}
|
||||
|
||||
// Reset retry counter immediately on successful assistant response
|
||||
// This prevents accumulation across multiple LLM calls within a turn
|
||||
if (assistantMsg.stopReason !== "error" && this._retryAttempt > 0) {
|
||||
this._emit({
|
||||
type: "auto_retry_end",
|
||||
success: true,
|
||||
attempt: this._retryAttempt,
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
if (assistantMsg.stopReason !== "error") {
|
||||
this._retryHandler.handleSuccessfulResponse();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -463,21 +446,12 @@ export class AgentSession {
|
|||
this._lastAssistantMessage = undefined;
|
||||
|
||||
// Check for retryable errors first (overloaded, rate limit, server errors)
|
||||
if (this._isRetryableError(msg)) {
|
||||
const didRetry = await this._handleRetryableError(msg);
|
||||
if (this._retryHandler.isRetryableError(msg)) {
|
||||
const didRetry = await this._retryHandler.handleRetryableError(msg);
|
||||
if (didRetry) return; // Retry was initiated, don't proceed to compaction
|
||||
}
|
||||
|
||||
await this._checkCompaction(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/** Resolve the pending retry promise */
|
||||
private _resolveRetry(): void {
|
||||
if (this._retryResolve) {
|
||||
this._retryResolve();
|
||||
this._retryResolve = undefined;
|
||||
this._retryPromise = undefined;
|
||||
await this._compactionOrchestrator.checkCompaction(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -512,10 +486,7 @@ export class AgentSession {
|
|||
};
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof Error) {
|
||||
return { block: true, reason: err.message };
|
||||
}
|
||||
return { block: true, reason: `Extension failed, blocking execution: ${String(err)}` };
|
||||
return { block: true, reason: err instanceof Error ? err.message : `Extension failed, blocking execution: ${String(err)}` };
|
||||
}
|
||||
|
||||
return undefined;
|
||||
|
|
@ -720,7 +691,7 @@ export class AgentSession {
|
|||
|
||||
/** Current retry attempt (0 if not retrying) */
|
||||
get retryAttempt(): number {
|
||||
return this._retryAttempt;
|
||||
return this._retryHandler.retryAttempt;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -767,11 +738,7 @@ export class AgentSession {
|
|||
|
||||
/** Whether compaction or branch summarization is currently running */
|
||||
get isCompacting(): boolean {
|
||||
return (
|
||||
this._autoCompactionAbortController !== undefined ||
|
||||
this._compactionAbortController !== undefined ||
|
||||
this._branchSummaryAbortController !== undefined
|
||||
);
|
||||
return this._compactionOrchestrator.isCompacting;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1025,7 +992,7 @@ export class AgentSession {
|
|||
// Check if we need to compact before sending (catches aborted responses)
|
||||
const lastAssistant = this._findLastAssistantMessage();
|
||||
if (lastAssistant) {
|
||||
await this._checkCompaction(lastAssistant, false);
|
||||
await this._compactionOrchestrator.checkCompaction(lastAssistant, false);
|
||||
}
|
||||
|
||||
// Build messages array (custom message if any, then user message)
|
||||
|
|
@ -1078,7 +1045,7 @@ export class AgentSession {
|
|||
}
|
||||
|
||||
await this.agent.prompt(messages);
|
||||
await this.waitForRetry();
|
||||
await this._retryHandler.waitForRetry();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1106,7 +1073,7 @@ export class AgentSession {
|
|||
this._extensionRunner.emitError({
|
||||
extensionPath: `command:${commandName}`,
|
||||
event: "command",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
error: getErrorMessage(err),
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
|
@ -1137,7 +1104,7 @@ export class AgentSession {
|
|||
this._extensionRunner?.emitError({
|
||||
extensionPath: skill.filePath,
|
||||
event: "skill_expansion",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
error: getErrorMessage(err),
|
||||
});
|
||||
return text; // Return original on error
|
||||
}
|
||||
|
|
@ -1356,7 +1323,7 @@ export class AgentSession {
|
|||
* Abort current operation and wait for agent to become idle.
|
||||
*/
|
||||
async abort(): Promise<void> {
|
||||
this.abortRetry();
|
||||
this._retryHandler.abortRetry();
|
||||
this.agent.abort();
|
||||
await this.agent.waitForIdle();
|
||||
// Ensure agent_end is emitted even when abort interrupts a tool call (#1414).
|
||||
|
|
@ -1712,373 +1679,27 @@ export class AgentSession {
|
|||
* @param customInstructions Optional instructions for the compaction summary
|
||||
*/
|
||||
async compact(customInstructions?: string): Promise<CompactionResult> {
|
||||
this._disconnectFromAgent();
|
||||
await this.abort();
|
||||
this._compactionAbortController = new AbortController();
|
||||
|
||||
try {
|
||||
if (!this.model) {
|
||||
throw new Error("No model selected");
|
||||
}
|
||||
|
||||
const apiKey = await this._modelRegistry.getApiKey(this.model, this.sessionId);
|
||||
if (!apiKey) {
|
||||
throw new Error(`No API key for ${this.model.provider}`);
|
||||
}
|
||||
|
||||
const pathEntries = this.sessionManager.getBranch();
|
||||
const settings = this.settingsManager.getCompactionSettings();
|
||||
|
||||
const preparation = prepareCompaction(pathEntries, settings);
|
||||
if (!preparation) {
|
||||
// Check why we can't compact
|
||||
const lastEntry = pathEntries[pathEntries.length - 1];
|
||||
if (lastEntry?.type === "compaction") {
|
||||
throw new Error("Already compacted");
|
||||
}
|
||||
throw new Error("Nothing to compact (session too small)");
|
||||
}
|
||||
|
||||
let extensionCompaction: CompactionResult | undefined;
|
||||
let fromExtension = false;
|
||||
|
||||
if (this._extensionRunner?.hasHandlers("session_before_compact")) {
|
||||
const result = (await this._extensionRunner.emit({
|
||||
type: "session_before_compact",
|
||||
preparation,
|
||||
branchEntries: pathEntries,
|
||||
customInstructions,
|
||||
signal: this._compactionAbortController.signal,
|
||||
})) as SessionBeforeCompactResult | undefined;
|
||||
|
||||
if (result?.cancel) {
|
||||
throw new Error("Compaction cancelled");
|
||||
}
|
||||
|
||||
if (result?.compaction) {
|
||||
extensionCompaction = result.compaction;
|
||||
fromExtension = true;
|
||||
}
|
||||
}
|
||||
|
||||
let summary: string;
|
||||
let firstKeptEntryId: string;
|
||||
let tokensBefore: number;
|
||||
let details: unknown;
|
||||
|
||||
if (extensionCompaction) {
|
||||
// Extension provided compaction content
|
||||
summary = extensionCompaction.summary;
|
||||
firstKeptEntryId = extensionCompaction.firstKeptEntryId;
|
||||
tokensBefore = extensionCompaction.tokensBefore;
|
||||
details = extensionCompaction.details;
|
||||
} else {
|
||||
// Generate compaction result
|
||||
const result = await compact(
|
||||
preparation,
|
||||
this.model,
|
||||
apiKey,
|
||||
customInstructions,
|
||||
this._compactionAbortController.signal,
|
||||
);
|
||||
summary = result.summary;
|
||||
firstKeptEntryId = result.firstKeptEntryId;
|
||||
tokensBefore = result.tokensBefore;
|
||||
details = result.details;
|
||||
}
|
||||
|
||||
if (this._compactionAbortController.signal.aborted) {
|
||||
throw new Error("Compaction cancelled");
|
||||
}
|
||||
|
||||
this.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension);
|
||||
const newEntries = this.sessionManager.getEntries();
|
||||
const sessionContext = this.sessionManager.buildSessionContext();
|
||||
this.agent.replaceMessages(sessionContext.messages);
|
||||
|
||||
// Get the saved compaction entry for the extension event
|
||||
const savedCompactionEntry = newEntries.find((e) => e.type === "compaction" && e.summary === summary) as
|
||||
| CompactionEntry
|
||||
| undefined;
|
||||
|
||||
if (this._extensionRunner && savedCompactionEntry) {
|
||||
await this._extensionRunner.emit({
|
||||
type: "session_compact",
|
||||
compactionEntry: savedCompactionEntry,
|
||||
fromExtension,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
summary,
|
||||
firstKeptEntryId,
|
||||
tokensBefore,
|
||||
details,
|
||||
};
|
||||
} finally {
|
||||
this._compactionAbortController = undefined;
|
||||
this._reconnectToAgent();
|
||||
}
|
||||
return this._compactionOrchestrator.compact(customInstructions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel in-progress compaction (manual or auto).
|
||||
*/
|
||||
/** Cancel in-progress compaction (manual or auto) */
|
||||
abortCompaction(): void {
|
||||
this._compactionAbortController?.abort();
|
||||
this._autoCompactionAbortController?.abort();
|
||||
this._compactionOrchestrator.abortCompaction();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel in-progress branch summarization.
|
||||
*/
|
||||
/** Cancel in-progress branch summarization */
|
||||
abortBranchSummary(): void {
|
||||
this._branchSummaryAbortController?.abort();
|
||||
this._compactionOrchestrator.abortBranchSummary();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if compaction is needed and run it.
|
||||
* Called after agent_end and before prompt submission.
|
||||
*
|
||||
* Two cases:
|
||||
* 1. Overflow: LLM returned context overflow error, remove error message from agent state, compact, auto-retry
|
||||
* 2. Threshold: Context over threshold, compact, NO auto-retry (user continues manually)
|
||||
*
|
||||
* @param assistantMessage The assistant message to check
|
||||
* @param skipAbortedCheck If false, include aborted messages (for pre-prompt check). Default: true
|
||||
*/
|
||||
private async _checkCompaction(assistantMessage: AssistantMessage, skipAbortedCheck = true): Promise<void> {
|
||||
const settings = this.settingsManager.getCompactionSettings();
|
||||
if (!settings.enabled) return;
|
||||
|
||||
// Skip if message was aborted (user cancelled) - unless skipAbortedCheck is false
|
||||
if (skipAbortedCheck && assistantMessage.stopReason === "aborted") return;
|
||||
|
||||
const contextWindow = this.model?.contextWindow ?? 0;
|
||||
|
||||
// Skip overflow check if the message came from a different model.
|
||||
// This handles the case where user switched from a smaller-context model (e.g. opus)
|
||||
// to a larger-context model (e.g. codex) - the overflow error from the old model
|
||||
// shouldn't trigger compaction for the new model.
|
||||
const sameModel =
|
||||
this.model && assistantMessage.provider === this.model.provider && assistantMessage.model === this.model.id;
|
||||
|
||||
// Skip compaction checks if this assistant message is older than the latest
|
||||
// compaction boundary. This prevents a stale pre-compaction usage/error
|
||||
// from retriggering compaction on the first prompt after compaction.
|
||||
const compactionEntry = getLatestCompactionEntry(this.sessionManager.getBranch());
|
||||
const assistantIsFromBeforeCompaction =
|
||||
compactionEntry !== null && assistantMessage.timestamp <= new Date(compactionEntry.timestamp).getTime();
|
||||
if (assistantIsFromBeforeCompaction) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Case 1: Overflow - LLM returned context overflow error
|
||||
if (sameModel && isContextOverflow(assistantMessage, contextWindow)) {
|
||||
if (this._overflowRecoveryAttempted) {
|
||||
this._emit({
|
||||
type: "auto_compaction_end",
|
||||
result: undefined,
|
||||
aborted: false,
|
||||
willRetry: false,
|
||||
errorMessage:
|
||||
"Context overflow recovery failed after one compact-and-retry attempt. Try reducing context or switching to a larger-context model.",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this._overflowRecoveryAttempted = true;
|
||||
// Remove the error message from agent state (it IS saved to session for history,
|
||||
// but we don't want it in context for the retry)
|
||||
const messages = this.agent.state.messages;
|
||||
if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
|
||||
this.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
await this._runAutoCompaction("overflow", true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Case 2: Threshold - context is getting large
|
||||
// For error messages (no usage data), estimate from last successful response.
|
||||
// This ensures sessions that hit persistent API errors (e.g. 529) can still compact.
|
||||
let contextTokens: number;
|
||||
if (assistantMessage.stopReason === "error") {
|
||||
const messages = this.agent.state.messages;
|
||||
const estimate = estimateContextTokens(messages);
|
||||
if (estimate.lastUsageIndex === null) return; // No usage data at all
|
||||
// Verify the usage source is post-compaction. Kept pre-compaction messages
|
||||
// have stale usage reflecting the old (larger) context and would falsely
|
||||
// trigger compaction right after one just finished.
|
||||
const usageMsg = messages[estimate.lastUsageIndex];
|
||||
if (
|
||||
compactionEntry &&
|
||||
usageMsg.role === "assistant" &&
|
||||
(usageMsg as AssistantMessage).timestamp <= new Date(compactionEntry.timestamp).getTime()
|
||||
) {
|
||||
return;
|
||||
}
|
||||
contextTokens = estimate.tokens;
|
||||
} else {
|
||||
contextTokens = calculateContextTokens(assistantMessage.usage);
|
||||
}
|
||||
if (shouldCompact(contextTokens, contextWindow, settings)) {
|
||||
await this._runAutoCompaction("threshold", false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal: Run auto-compaction with events.
|
||||
*/
|
||||
private async _runAutoCompaction(reason: "overflow" | "threshold", willRetry: boolean): Promise<void> {
|
||||
const settings = this.settingsManager.getCompactionSettings();
|
||||
|
||||
this._emit({ type: "auto_compaction_start", reason });
|
||||
this._autoCompactionAbortController = new AbortController();
|
||||
|
||||
try {
|
||||
if (!this.model) {
|
||||
this._emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
const apiKey = await this._modelRegistry.getApiKey(this.model, this.sessionId);
|
||||
if (!apiKey) {
|
||||
this._emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
const pathEntries = this.sessionManager.getBranch();
|
||||
|
||||
const preparation = prepareCompaction(pathEntries, settings);
|
||||
if (!preparation) {
|
||||
this._emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
let extensionCompaction: CompactionResult | undefined;
|
||||
let fromExtension = false;
|
||||
|
||||
if (this._extensionRunner?.hasHandlers("session_before_compact")) {
|
||||
const extensionResult = (await this._extensionRunner.emit({
|
||||
type: "session_before_compact",
|
||||
preparation,
|
||||
branchEntries: pathEntries,
|
||||
customInstructions: undefined,
|
||||
signal: this._autoCompactionAbortController.signal,
|
||||
})) as SessionBeforeCompactResult | undefined;
|
||||
|
||||
if (extensionResult?.cancel) {
|
||||
this._emit({ type: "auto_compaction_end", result: undefined, aborted: true, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
if (extensionResult?.compaction) {
|
||||
extensionCompaction = extensionResult.compaction;
|
||||
fromExtension = true;
|
||||
}
|
||||
}
|
||||
|
||||
let summary: string;
|
||||
let firstKeptEntryId: string;
|
||||
let tokensBefore: number;
|
||||
let details: unknown;
|
||||
|
||||
if (extensionCompaction) {
|
||||
// Extension provided compaction content
|
||||
summary = extensionCompaction.summary;
|
||||
firstKeptEntryId = extensionCompaction.firstKeptEntryId;
|
||||
tokensBefore = extensionCompaction.tokensBefore;
|
||||
details = extensionCompaction.details;
|
||||
} else {
|
||||
// Generate compaction result
|
||||
const compactResult = await compact(
|
||||
preparation,
|
||||
this.model,
|
||||
apiKey,
|
||||
undefined,
|
||||
this._autoCompactionAbortController.signal,
|
||||
);
|
||||
summary = compactResult.summary;
|
||||
firstKeptEntryId = compactResult.firstKeptEntryId;
|
||||
tokensBefore = compactResult.tokensBefore;
|
||||
details = compactResult.details;
|
||||
}
|
||||
|
||||
if (this._autoCompactionAbortController.signal.aborted) {
|
||||
this._emit({ type: "auto_compaction_end", result: undefined, aborted: true, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
this.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension);
|
||||
const newEntries = this.sessionManager.getEntries();
|
||||
const sessionContext = this.sessionManager.buildSessionContext();
|
||||
this.agent.replaceMessages(sessionContext.messages);
|
||||
|
||||
// Get the saved compaction entry for the extension event
|
||||
const savedCompactionEntry = newEntries.find((e) => e.type === "compaction" && e.summary === summary) as
|
||||
| CompactionEntry
|
||||
| undefined;
|
||||
|
||||
if (this._extensionRunner && savedCompactionEntry) {
|
||||
await this._extensionRunner.emit({
|
||||
type: "session_compact",
|
||||
compactionEntry: savedCompactionEntry,
|
||||
fromExtension,
|
||||
});
|
||||
}
|
||||
|
||||
const result: CompactionResult = {
|
||||
summary,
|
||||
firstKeptEntryId,
|
||||
tokensBefore,
|
||||
details,
|
||||
};
|
||||
this._emit({ type: "auto_compaction_end", result, aborted: false, willRetry });
|
||||
|
||||
if (willRetry) {
|
||||
const messages = this.agent.state.messages;
|
||||
const lastMsg = messages[messages.length - 1];
|
||||
if (lastMsg?.role === "assistant" && (lastMsg as AssistantMessage).stopReason === "error") {
|
||||
this.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
this.agent.continue().catch(() => {});
|
||||
}, 100);
|
||||
} else if (this.agent.hasQueuedMessages()) {
|
||||
// Auto-compaction can complete while follow-up/steering/custom messages are waiting.
|
||||
// Kick the loop so queued messages are actually delivered.
|
||||
setTimeout(() => {
|
||||
this.agent.continue().catch(() => {});
|
||||
}, 100);
|
||||
}
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : "compaction failed";
|
||||
this._emit({
|
||||
type: "auto_compaction_end",
|
||||
result: undefined,
|
||||
aborted: false,
|
||||
willRetry: false,
|
||||
errorMessage:
|
||||
reason === "overflow"
|
||||
? `Context overflow recovery failed: ${errorMessage}`
|
||||
: `Auto-compaction failed: ${errorMessage}`,
|
||||
});
|
||||
} finally {
|
||||
this._autoCompactionAbortController = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Toggle auto-compaction setting.
|
||||
*/
|
||||
/** Toggle auto-compaction setting */
|
||||
setAutoCompactionEnabled(enabled: boolean): void {
|
||||
this.settingsManager.setCompactionEnabled(enabled);
|
||||
this._compactionOrchestrator.setAutoCompactionEnabled(enabled);
|
||||
}
|
||||
|
||||
/** Whether auto-compaction is enabled */
|
||||
get autoCompactionEnabled(): boolean {
|
||||
return this.settingsManager.getCompactionEnabled();
|
||||
return this._compactionOrchestrator.autoCompactionEnabled;
|
||||
}
|
||||
|
||||
async bindExtensions(bindings: ExtensionBindings): Promise<void> {
|
||||
|
|
@ -2212,7 +1833,7 @@ export class AgentSession {
|
|||
runner.emitError({
|
||||
extensionPath: "<runtime>",
|
||||
event: "send_message",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
error: getErrorMessage(err),
|
||||
});
|
||||
});
|
||||
},
|
||||
|
|
@ -2221,7 +1842,7 @@ export class AgentSession {
|
|||
runner.emitError({
|
||||
extensionPath: "<runtime>",
|
||||
event: "send_user_message",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
error: getErrorMessage(err),
|
||||
});
|
||||
});
|
||||
},
|
||||
|
|
@ -2234,7 +1855,7 @@ export class AgentSession {
|
|||
runner.emitError({
|
||||
extensionPath: "<runtime>",
|
||||
event: "retry_last_turn",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
error: getErrorMessage(err),
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
@ -2434,278 +2055,27 @@ export class AgentSession {
|
|||
}
|
||||
|
||||
// =========================================================================
|
||||
// Auto-Retry
|
||||
// Auto-Retry (delegated to RetryHandler)
|
||||
// =========================================================================
|
||||
|
||||
/**
|
||||
* Check if an error is retryable (overloaded, rate limit, server errors).
|
||||
* Context overflow errors are NOT retryable (handled by compaction instead).
|
||||
*/
|
||||
private _isRetryableError(message: AssistantMessage): boolean {
|
||||
if (message.stopReason !== "error" || !message.errorMessage) return false;
|
||||
|
||||
// Context overflow is handled by compaction, not retry
|
||||
const contextWindow = this.model?.contextWindow ?? 0;
|
||||
if (isContextOverflow(message, contextWindow)) return false;
|
||||
|
||||
const err = message.errorMessage;
|
||||
// Match: overloaded_error, rate limit, 429, 500, 502, 503, 504, service unavailable, connection errors, fetch failed, terminated, retry delay exceeded, network unavailable / auth expired (transient network failures)
|
||||
return /overloaded|rate.?limit|too many requests|429|500|502|503|504|service.?unavailable|server.?error|internal.?error|connection.?error|connection.?refused|other side closed|fetch failed|upstream.?connect|reset before headers|terminated|retry delay|network.?(?:is\s+)?unavailable|credentials.*expired|temporarily backed off/i.test(
|
||||
err,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Classify an error message into a usage-limit error type for credential backoff.
|
||||
*/
|
||||
private _classifyErrorType(errorMessage: string): import("./auth-storage.js").UsageLimitErrorType {
|
||||
const err = errorMessage.toLowerCase();
|
||||
if (/quota|billing|exceeded.*limit|usage.*limit/i.test(err)) return "quota_exhausted";
|
||||
if (/rate.?limit|too many requests|429/i.test(err)) return "rate_limit";
|
||||
if (/500|502|503|504|server.?error|internal.?error|service.?unavailable/i.test(err)) return "server_error";
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle retryable errors with exponential backoff.
|
||||
* When multiple credentials are available, marks the failing credential
|
||||
* as backed off and retries immediately with the next one.
|
||||
* @returns true if retry was initiated, false if max retries exceeded or disabled
|
||||
*/
|
||||
private async _handleRetryableError(message: AssistantMessage): Promise<boolean> {
|
||||
const settings = this.settingsManager.getRetrySettings();
|
||||
if (!settings.enabled) {
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Retry promise is created synchronously in _handleAgentEvent for agent_end.
|
||||
// Keep a defensive fallback here in case a future refactor bypasses that path.
|
||||
if (!this._retryPromise) {
|
||||
this._retryPromise = new Promise((resolve) => {
|
||||
this._retryResolve = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
// Try credential fallback before counting against retry budget.
|
||||
// If another credential is available, switch to it and retry immediately.
|
||||
// Only attempt credential rotation for errors that indicate a credential-level
|
||||
// problem (rate limit, quota exhaustion, server error). Transport failures
|
||||
// ("unknown") like connection resets are not credential-specific — rotating
|
||||
// won't help and backing off the only credential causes "Authentication failed".
|
||||
if (this.model && message.errorMessage) {
|
||||
const errorType = this._classifyErrorType(message.errorMessage);
|
||||
const isCredentialError = errorType !== "unknown";
|
||||
const hasAlternate = isCredentialError && this._modelRegistry.authStorage.markUsageLimitReached(
|
||||
this.model.provider,
|
||||
this.sessionId,
|
||||
{ errorType },
|
||||
);
|
||||
|
||||
if (hasAlternate) {
|
||||
// Remove error message from agent state
|
||||
const messages = this.agent.state.messages;
|
||||
if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
|
||||
this.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
|
||||
this._emit({
|
||||
type: "auto_retry_start",
|
||||
attempt: this._retryAttempt + 1,
|
||||
maxAttempts: settings.maxRetries,
|
||||
delayMs: 0,
|
||||
errorMessage: `${message.errorMessage} (switching credential)`,
|
||||
});
|
||||
|
||||
// Retry immediately with the next credential - don't increment _retryAttempt
|
||||
setTimeout(() => {
|
||||
this.agent.continue().catch(() => {
|
||||
// Retry failed - will be caught by next agent_end
|
||||
});
|
||||
}, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// All credentials are backed off. Try cross-provider fallback before giving up.
|
||||
if (isCredentialError) {
|
||||
const fallbackResult = await this._fallbackResolver.findFallback(
|
||||
this.model,
|
||||
errorType,
|
||||
);
|
||||
|
||||
if (fallbackResult) {
|
||||
// Swap to fallback model — don't persist to settings
|
||||
const previousProvider = this.model.provider;
|
||||
this.agent.setModel(fallbackResult.model);
|
||||
this.sessionManager.appendModelChange(fallbackResult.model.provider, fallbackResult.model.id);
|
||||
|
||||
// Remove error message from agent state
|
||||
const msgs = this.agent.state.messages;
|
||||
if (msgs.length > 0 && msgs[msgs.length - 1].role === "assistant") {
|
||||
this.agent.replaceMessages(msgs.slice(0, -1));
|
||||
}
|
||||
|
||||
this._emit({
|
||||
type: "fallback_provider_switch",
|
||||
from: `${previousProvider}/${this.model?.id}`,
|
||||
to: `${fallbackResult.model.provider}/${fallbackResult.model.id}`,
|
||||
reason: fallbackResult.reason,
|
||||
});
|
||||
|
||||
this._emit({
|
||||
type: "auto_retry_start",
|
||||
attempt: this._retryAttempt + 1,
|
||||
maxAttempts: settings.maxRetries,
|
||||
delayMs: 0,
|
||||
errorMessage: `${message.errorMessage} (${fallbackResult.reason})`,
|
||||
});
|
||||
|
||||
// Retry immediately with fallback provider - don't increment _retryAttempt
|
||||
setTimeout(() => {
|
||||
this.agent.continue().catch(() => {
|
||||
// Retry failed - will be caught by next agent_end
|
||||
});
|
||||
}, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// No fallback available either
|
||||
if (errorType === "quota_exhausted") {
|
||||
this._emit({
|
||||
type: "fallback_chain_exhausted",
|
||||
reason: `All providers exhausted for ${this.model.provider}/${this.model.id}`,
|
||||
});
|
||||
this._emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt: this._retryAttempt,
|
||||
finalError: message.errorMessage,
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this._retryAttempt++;
|
||||
|
||||
if (this._retryAttempt > settings.maxRetries) {
|
||||
// Max retries exceeded, emit final failure and reset
|
||||
this._emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt: this._retryAttempt - 1,
|
||||
finalError: message.errorMessage,
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry(); // Resolve so waitForRetry() completes
|
||||
return false;
|
||||
}
|
||||
|
||||
// Use server-requested delay when available (rate limit headers), capped by maxDelayMs.
|
||||
// Fall back to exponential backoff when no server hint is present.
|
||||
const exponentialDelayMs = settings.baseDelayMs * 2 ** (this._retryAttempt - 1);
|
||||
let delayMs: number;
|
||||
if (message.retryAfterMs !== undefined) {
|
||||
const cap = settings.maxDelayMs > 0 ? settings.maxDelayMs : Infinity;
|
||||
if (message.retryAfterMs > cap) {
|
||||
// Server wants us to wait longer than maxDelayMs — give up to let auto-mode handle recovery
|
||||
this._emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt: this._retryAttempt - 1,
|
||||
finalError: `Rate limit reset in ${Math.ceil(message.retryAfterMs / 1000)}s (max: ${Math.ceil(cap / 1000)}s). ${message.errorMessage || ""}`.trim(),
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
delayMs = message.retryAfterMs;
|
||||
} else {
|
||||
delayMs = exponentialDelayMs;
|
||||
}
|
||||
|
||||
this._emit({
|
||||
type: "auto_retry_start",
|
||||
attempt: this._retryAttempt,
|
||||
maxAttempts: settings.maxRetries,
|
||||
delayMs,
|
||||
errorMessage: message.errorMessage || "Unknown error",
|
||||
});
|
||||
|
||||
// Remove error message from agent state (keep in session for history)
|
||||
const messages = this.agent.state.messages;
|
||||
if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
|
||||
this.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
|
||||
// Wait with exponential backoff (abortable)
|
||||
this._retryAbortController = new AbortController();
|
||||
try {
|
||||
await sleep(delayMs, this._retryAbortController.signal);
|
||||
} catch {
|
||||
// Aborted during sleep - emit end event so UI can clean up
|
||||
const attempt = this._retryAttempt;
|
||||
this._retryAttempt = 0;
|
||||
this._retryAbortController = undefined;
|
||||
this._emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt,
|
||||
finalError: "Retry cancelled",
|
||||
});
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
this._retryAbortController = undefined;
|
||||
|
||||
// Retry via continue() - use setTimeout to break out of event handler chain
|
||||
setTimeout(() => {
|
||||
this.agent.continue().catch(() => {
|
||||
// Retry failed - will be caught by next agent_end
|
||||
});
|
||||
}, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel in-progress retry.
|
||||
*/
|
||||
/** Cancel in-progress retry */
|
||||
abortRetry(): void {
|
||||
this._retryAbortController?.abort();
|
||||
// Note: _retryAttempt is reset in the catch block of _autoRetry
|
||||
this._resolveRetry();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for any in-progress retry to complete.
|
||||
* Returns immediately if no retry is in progress.
|
||||
*/
|
||||
private async waitForRetry(): Promise<void> {
|
||||
if (this._retryPromise) {
|
||||
await this._retryPromise;
|
||||
}
|
||||
this._retryHandler.abortRetry();
|
||||
}
|
||||
|
||||
/** Whether auto-retry is currently in progress */
|
||||
get isRetrying(): boolean {
|
||||
return this._retryPromise !== undefined;
|
||||
return this._retryHandler.isRetrying;
|
||||
}
|
||||
|
||||
/** Whether auto-retry is enabled */
|
||||
get autoRetryEnabled(): boolean {
|
||||
return this.settingsManager.getRetryEnabled();
|
||||
return this._retryHandler.autoRetryEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Toggle auto-retry setting.
|
||||
*/
|
||||
/** Toggle auto-retry setting */
|
||||
setAutoRetryEnabled(enabled: boolean): void {
|
||||
this.settingsManager.setRetryEnabled(enabled);
|
||||
this._retryHandler.setAutoRetryEnabled(enabled);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
|
|
@ -3029,7 +2399,7 @@ export class AgentSession {
|
|||
};
|
||||
|
||||
// Set up abort controller for summarization
|
||||
this._branchSummaryAbortController = new AbortController();
|
||||
this._compactionOrchestrator.branchSummaryAbortController = new AbortController();
|
||||
let extensionSummary: { summary: string; details?: unknown } | undefined;
|
||||
let fromExtension = false;
|
||||
|
||||
|
|
@ -3038,7 +2408,7 @@ export class AgentSession {
|
|||
const result = (await this._extensionRunner.emit({
|
||||
type: "session_before_tree",
|
||||
preparation,
|
||||
signal: this._branchSummaryAbortController.signal,
|
||||
signal: this._compactionOrchestrator.branchSummaryAbortController.signal,
|
||||
})) as SessionBeforeTreeResult | undefined;
|
||||
|
||||
if (result?.cancel) {
|
||||
|
|
@ -3075,12 +2445,12 @@ export class AgentSession {
|
|||
const result = await generateBranchSummary(entriesToSummarize, {
|
||||
model,
|
||||
apiKey,
|
||||
signal: this._branchSummaryAbortController.signal,
|
||||
signal: this._compactionOrchestrator.branchSummaryAbortController.signal,
|
||||
customInstructions,
|
||||
replaceInstructions,
|
||||
reserveTokens: branchSummarySettings.reserveTokens,
|
||||
});
|
||||
this._branchSummaryAbortController = undefined;
|
||||
this._compactionOrchestrator.branchSummaryAbortController = undefined;
|
||||
if (result.aborted) {
|
||||
return { cancelled: true, aborted: true };
|
||||
}
|
||||
|
|
@ -3162,7 +2532,7 @@ export class AgentSession {
|
|||
|
||||
// Emit to custom tools
|
||||
|
||||
this._branchSummaryAbortController = undefined;
|
||||
this._compactionOrchestrator.branchSummaryAbortController = undefined;
|
||||
return { editorText, cancelled: false, summaryEntry };
|
||||
}
|
||||
|
||||
|
|
|
|||
424
packages/pi-coding-agent/src/core/compaction-orchestrator.ts
Normal file
424
packages/pi-coding-agent/src/core/compaction-orchestrator.ts
Normal file
|
|
@ -0,0 +1,424 @@
|
|||
/**
|
||||
* CompactionOrchestrator - Manages manual and automatic context compaction.
|
||||
*
|
||||
* Handles:
|
||||
* - Manual compaction (user-triggered /compact)
|
||||
* - Auto-compaction when context exceeds threshold
|
||||
* - Overflow recovery when LLM returns context overflow errors
|
||||
* - Extension integration for custom compaction providers
|
||||
* - Branch summarization abort coordination
|
||||
*/
|
||||
|
||||
import type { Agent } from "@gsd/pi-agent-core";
|
||||
import type { AssistantMessage, Model } from "@gsd/pi-ai";
|
||||
import { isContextOverflow } from "@gsd/pi-ai";
|
||||
import {
|
||||
type CompactionResult,
|
||||
calculateContextTokens,
|
||||
compact,
|
||||
estimateContextTokens,
|
||||
prepareCompaction,
|
||||
shouldCompact,
|
||||
} from "./compaction/index.js";
|
||||
import type { ExtensionRunner, SessionBeforeCompactResult } from "./extensions/index.js";
|
||||
import type { ModelRegistry } from "./model-registry.js";
|
||||
import { getLatestCompactionEntry } from "./session-manager.js";
|
||||
import type { CompactionEntry, SessionManager } from "./session-manager.js";
|
||||
import type { SettingsManager } from "./settings-manager.js";
|
||||
import type { AgentSessionEvent } from "./agent-session.js";
|
||||
import { getErrorMessage } from "../utils/error.js";
|
||||
|
||||
/** Dependencies injected from AgentSession into CompactionOrchestrator */
|
||||
export interface CompactionOrchestratorDeps {
|
||||
readonly agent: Agent;
|
||||
readonly sessionManager: SessionManager;
|
||||
readonly settingsManager: SettingsManager;
|
||||
readonly modelRegistry: ModelRegistry;
|
||||
getModel: () => Model<any> | undefined;
|
||||
getSessionId: () => string;
|
||||
getExtensionRunner: () => ExtensionRunner | undefined;
|
||||
emit: (event: AgentSessionEvent) => void;
|
||||
disconnectFromAgent: () => void;
|
||||
reconnectToAgent: () => void;
|
||||
abort: () => Promise<void>;
|
||||
}
|
||||
|
||||
export class CompactionOrchestrator {
|
||||
private _compactionAbortController: AbortController | undefined = undefined;
|
||||
private _autoCompactionAbortController: AbortController | undefined = undefined;
|
||||
private _overflowRecoveryAttempted = false;
|
||||
private _branchSummaryAbortController: AbortController | undefined = undefined;
|
||||
|
||||
constructor(private readonly _deps: CompactionOrchestratorDeps) {}
|
||||
|
||||
/** Whether compaction or branch summarization is currently running */
|
||||
get isCompacting(): boolean {
|
||||
return (
|
||||
this._autoCompactionAbortController !== undefined ||
|
||||
this._compactionAbortController !== undefined ||
|
||||
this._branchSummaryAbortController !== undefined
|
||||
);
|
||||
}
|
||||
|
||||
/** Reset overflow recovery flag (called when a new user message starts) */
|
||||
resetOverflowRecovery(): void {
|
||||
this._overflowRecoveryAttempted = false;
|
||||
}
|
||||
|
||||
/** Mark overflow recovery as not needed (called on successful assistant response) */
|
||||
clearOverflowRecovery(): void {
|
||||
this._overflowRecoveryAttempted = false;
|
||||
}
|
||||
|
||||
/** Get/set the branch summary abort controller (used by navigateTree) */
|
||||
get branchSummaryAbortController(): AbortController | undefined {
|
||||
return this._branchSummaryAbortController;
|
||||
}
|
||||
set branchSummaryAbortController(controller: AbortController | undefined) {
|
||||
this._branchSummaryAbortController = controller;
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually compact the session context.
|
||||
* Aborts current agent operation first.
|
||||
* @param customInstructions Optional instructions for the compaction summary
|
||||
*/
|
||||
async compact(customInstructions?: string): Promise<CompactionResult> {
|
||||
this._deps.disconnectFromAgent();
|
||||
await this._deps.abort();
|
||||
this._compactionAbortController = new AbortController();
|
||||
|
||||
try {
|
||||
const model = this._deps.getModel();
|
||||
if (!model) {
|
||||
throw new Error("No model selected");
|
||||
}
|
||||
|
||||
const apiKey = await this._deps.modelRegistry.getApiKey(model, this._deps.getSessionId());
|
||||
if (!apiKey) {
|
||||
throw new Error(`No API key for ${model.provider}`);
|
||||
}
|
||||
|
||||
const pathEntries = this._deps.sessionManager.getBranch();
|
||||
const settings = this._deps.settingsManager.getCompactionSettings();
|
||||
|
||||
const preparation = prepareCompaction(pathEntries, settings);
|
||||
if (!preparation) {
|
||||
const lastEntry = pathEntries[pathEntries.length - 1];
|
||||
if (lastEntry?.type === "compaction") {
|
||||
throw new Error("Already compacted");
|
||||
}
|
||||
throw new Error("Nothing to compact (session too small)");
|
||||
}
|
||||
|
||||
let extensionCompaction: CompactionResult | undefined;
|
||||
let fromExtension = false;
|
||||
const extensionRunner = this._deps.getExtensionRunner();
|
||||
|
||||
if (extensionRunner?.hasHandlers("session_before_compact")) {
|
||||
const result = (await extensionRunner.emit({
|
||||
type: "session_before_compact",
|
||||
preparation,
|
||||
branchEntries: pathEntries,
|
||||
customInstructions,
|
||||
signal: this._compactionAbortController.signal,
|
||||
})) as SessionBeforeCompactResult | undefined;
|
||||
|
||||
if (result?.cancel) {
|
||||
throw new Error("Compaction cancelled");
|
||||
}
|
||||
|
||||
if (result?.compaction) {
|
||||
extensionCompaction = result.compaction;
|
||||
fromExtension = true;
|
||||
}
|
||||
}
|
||||
|
||||
let summary: string;
|
||||
let firstKeptEntryId: string;
|
||||
let tokensBefore: number;
|
||||
let details: unknown;
|
||||
|
||||
if (extensionCompaction) {
|
||||
summary = extensionCompaction.summary;
|
||||
firstKeptEntryId = extensionCompaction.firstKeptEntryId;
|
||||
tokensBefore = extensionCompaction.tokensBefore;
|
||||
details = extensionCompaction.details;
|
||||
} else {
|
||||
const result = await compact(
|
||||
preparation,
|
||||
model,
|
||||
apiKey,
|
||||
customInstructions,
|
||||
this._compactionAbortController.signal,
|
||||
);
|
||||
summary = result.summary;
|
||||
firstKeptEntryId = result.firstKeptEntryId;
|
||||
tokensBefore = result.tokensBefore;
|
||||
details = result.details;
|
||||
}
|
||||
|
||||
if (this._compactionAbortController.signal.aborted) {
|
||||
throw new Error("Compaction cancelled");
|
||||
}
|
||||
|
||||
this._deps.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension);
|
||||
const newEntries = this._deps.sessionManager.getEntries();
|
||||
const sessionContext = this._deps.sessionManager.buildSessionContext();
|
||||
this._deps.agent.replaceMessages(sessionContext.messages);
|
||||
|
||||
const savedCompactionEntry = newEntries.find(
|
||||
(e) => e.type === "compaction" && e.summary === summary,
|
||||
) as CompactionEntry | undefined;
|
||||
|
||||
if (extensionRunner && savedCompactionEntry) {
|
||||
await extensionRunner.emit({
|
||||
type: "session_compact",
|
||||
compactionEntry: savedCompactionEntry,
|
||||
fromExtension,
|
||||
});
|
||||
}
|
||||
|
||||
return { summary, firstKeptEntryId, tokensBefore, details };
|
||||
} finally {
|
||||
this._compactionAbortController = undefined;
|
||||
this._deps.reconnectToAgent();
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel in-progress compaction (manual or auto) */
|
||||
abortCompaction(): void {
|
||||
this._compactionAbortController?.abort();
|
||||
this._autoCompactionAbortController?.abort();
|
||||
}
|
||||
|
||||
/** Cancel in-progress branch summarization */
|
||||
abortBranchSummary(): void {
|
||||
this._branchSummaryAbortController?.abort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if compaction is needed and run it.
|
||||
* Called after agent_end and before prompt submission.
|
||||
*
|
||||
* Two cases:
|
||||
* 1. Overflow: LLM returned context overflow error, remove error message, compact, auto-retry
|
||||
* 2. Threshold: Context over threshold, compact, NO auto-retry
|
||||
*
|
||||
* @param assistantMessage The assistant message to check
|
||||
* @param skipAbortedCheck If false, include aborted messages (for pre-prompt check). Default: true
|
||||
*/
|
||||
async checkCompaction(assistantMessage: AssistantMessage, skipAbortedCheck = true): Promise<void> {
|
||||
const settings = this._deps.settingsManager.getCompactionSettings();
|
||||
if (!settings.enabled) return;
|
||||
|
||||
if (skipAbortedCheck && assistantMessage.stopReason === "aborted") return;
|
||||
|
||||
const model = this._deps.getModel();
|
||||
const contextWindow = model?.contextWindow ?? 0;
|
||||
|
||||
const sameModel =
|
||||
model && assistantMessage.provider === model.provider && assistantMessage.model === model.id;
|
||||
|
||||
const branchEntries = this._deps.sessionManager.getBranch();
|
||||
const compactionEntry = getLatestCompactionEntry(branchEntries);
|
||||
const assistantIsFromBeforeCompaction =
|
||||
compactionEntry !== null && assistantMessage.timestamp <= new Date(compactionEntry.timestamp).getTime();
|
||||
if (assistantIsFromBeforeCompaction) return;
|
||||
|
||||
// Case 1: Overflow - LLM returned context overflow error
|
||||
if (sameModel && isContextOverflow(assistantMessage, contextWindow)) {
|
||||
if (this._overflowRecoveryAttempted) {
|
||||
this._deps.emit({
|
||||
type: "auto_compaction_end",
|
||||
result: undefined,
|
||||
aborted: false,
|
||||
willRetry: false,
|
||||
errorMessage:
|
||||
"Context overflow recovery failed after one compact-and-retry attempt. Try reducing context or switching to a larger-context model.",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this._overflowRecoveryAttempted = true;
|
||||
const messages = this._deps.agent.state.messages;
|
||||
if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
|
||||
this._deps.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
await this._runAutoCompaction("overflow", true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Case 2: Threshold - context is getting large
|
||||
let contextTokens: number;
|
||||
if (assistantMessage.stopReason === "error") {
|
||||
const messages = this._deps.agent.state.messages;
|
||||
const estimate = estimateContextTokens(messages);
|
||||
if (estimate.lastUsageIndex === null) return;
|
||||
const usageMsg = messages[estimate.lastUsageIndex];
|
||||
if (
|
||||
compactionEntry &&
|
||||
usageMsg.role === "assistant" &&
|
||||
(usageMsg as AssistantMessage).timestamp <= new Date(compactionEntry.timestamp).getTime()
|
||||
) {
|
||||
return;
|
||||
}
|
||||
contextTokens = estimate.tokens;
|
||||
} else {
|
||||
contextTokens = calculateContextTokens(assistantMessage.usage);
|
||||
}
|
||||
if (shouldCompact(contextTokens, contextWindow, settings)) {
|
||||
await this._runAutoCompaction("threshold", false);
|
||||
}
|
||||
}
|
||||
|
||||
/** Toggle auto-compaction setting */
|
||||
setAutoCompactionEnabled(enabled: boolean): void {
|
||||
this._deps.settingsManager.setCompactionEnabled(enabled);
|
||||
}
|
||||
|
||||
/** Whether auto-compaction is enabled */
|
||||
get autoCompactionEnabled(): boolean {
|
||||
return this._deps.settingsManager.getCompactionEnabled();
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Private helpers
|
||||
// =========================================================================
|
||||
|
||||
private async _runAutoCompaction(reason: "overflow" | "threshold", willRetry: boolean): Promise<void> {
|
||||
const settings = this._deps.settingsManager.getCompactionSettings();
|
||||
|
||||
this._deps.emit({ type: "auto_compaction_start", reason });
|
||||
this._autoCompactionAbortController = new AbortController();
|
||||
|
||||
try {
|
||||
const model = this._deps.getModel();
|
||||
if (!model) {
|
||||
this._deps.emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
const apiKey = await this._deps.modelRegistry.getApiKey(model, this._deps.getSessionId());
|
||||
if (!apiKey) {
|
||||
this._deps.emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
const pathEntries = this._deps.sessionManager.getBranch();
|
||||
const preparation = prepareCompaction(pathEntries, settings);
|
||||
if (!preparation) {
|
||||
this._deps.emit({ type: "auto_compaction_end", result: undefined, aborted: false, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
let extensionCompaction: CompactionResult | undefined;
|
||||
let fromExtension = false;
|
||||
const extensionRunner = this._deps.getExtensionRunner();
|
||||
|
||||
if (extensionRunner?.hasHandlers("session_before_compact")) {
|
||||
const extensionResult = (await extensionRunner.emit({
|
||||
type: "session_before_compact",
|
||||
preparation,
|
||||
branchEntries: pathEntries,
|
||||
customInstructions: undefined,
|
||||
signal: this._autoCompactionAbortController.signal,
|
||||
})) as SessionBeforeCompactResult | undefined;
|
||||
|
||||
if (extensionResult?.cancel) {
|
||||
this._deps.emit({
|
||||
type: "auto_compaction_end",
|
||||
result: undefined,
|
||||
aborted: true,
|
||||
willRetry: false,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (extensionResult?.compaction) {
|
||||
extensionCompaction = extensionResult.compaction;
|
||||
fromExtension = true;
|
||||
}
|
||||
}
|
||||
|
||||
let summary: string;
|
||||
let firstKeptEntryId: string;
|
||||
let tokensBefore: number;
|
||||
let details: unknown;
|
||||
|
||||
if (extensionCompaction) {
|
||||
summary = extensionCompaction.summary;
|
||||
firstKeptEntryId = extensionCompaction.firstKeptEntryId;
|
||||
tokensBefore = extensionCompaction.tokensBefore;
|
||||
details = extensionCompaction.details;
|
||||
} else {
|
||||
const compactResult = await compact(
|
||||
preparation,
|
||||
model,
|
||||
apiKey,
|
||||
undefined,
|
||||
this._autoCompactionAbortController.signal,
|
||||
);
|
||||
summary = compactResult.summary;
|
||||
firstKeptEntryId = compactResult.firstKeptEntryId;
|
||||
tokensBefore = compactResult.tokensBefore;
|
||||
details = compactResult.details;
|
||||
}
|
||||
|
||||
if (this._autoCompactionAbortController.signal.aborted) {
|
||||
this._deps.emit({ type: "auto_compaction_end", result: undefined, aborted: true, willRetry: false });
|
||||
return;
|
||||
}
|
||||
|
||||
this._deps.sessionManager.appendCompaction(summary, firstKeptEntryId, tokensBefore, details, fromExtension);
|
||||
const newEntries = this._deps.sessionManager.getEntries();
|
||||
const sessionContext = this._deps.sessionManager.buildSessionContext();
|
||||
this._deps.agent.replaceMessages(sessionContext.messages);
|
||||
|
||||
const savedCompactionEntry = newEntries.find(
|
||||
(e) => e.type === "compaction" && e.summary === summary,
|
||||
) as CompactionEntry | undefined;
|
||||
|
||||
if (extensionRunner && savedCompactionEntry) {
|
||||
await extensionRunner.emit({
|
||||
type: "session_compact",
|
||||
compactionEntry: savedCompactionEntry,
|
||||
fromExtension,
|
||||
});
|
||||
}
|
||||
|
||||
const result: CompactionResult = { summary, firstKeptEntryId, tokensBefore, details };
|
||||
this._deps.emit({ type: "auto_compaction_end", result, aborted: false, willRetry });
|
||||
|
||||
if (willRetry) {
|
||||
const messages = this._deps.agent.state.messages;
|
||||
const lastMsg = messages[messages.length - 1];
|
||||
if (lastMsg?.role === "assistant" && (lastMsg as AssistantMessage).stopReason === "error") {
|
||||
this._deps.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
this._deps.agent.continue().catch(() => {});
|
||||
}, 100);
|
||||
} else if (this._deps.agent.hasQueuedMessages()) {
|
||||
setTimeout(() => {
|
||||
this._deps.agent.continue().catch(() => {});
|
||||
}, 100);
|
||||
}
|
||||
} catch (error) {
|
||||
const errorMessage = getErrorMessage(error);
|
||||
this._deps.emit({
|
||||
type: "auto_compaction_end",
|
||||
result: undefined,
|
||||
aborted: false,
|
||||
willRetry: false,
|
||||
errorMessage:
|
||||
reason === "overflow"
|
||||
? `Context overflow recovery failed: ${errorMessage}`
|
||||
: `Auto-compaction failed: ${errorMessage}`,
|
||||
});
|
||||
} finally {
|
||||
this._autoCompactionAbortController = undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
359
packages/pi-coding-agent/src/core/retry-handler.ts
Normal file
359
packages/pi-coding-agent/src/core/retry-handler.ts
Normal file
|
|
@ -0,0 +1,359 @@
|
|||
/**
|
||||
* RetryHandler - Automatic retry logic with exponential backoff and credential/provider fallback.
|
||||
*
|
||||
* Handles retryable errors (overloaded, rate limit, server errors) by:
|
||||
* 1. Trying alternate credentials for the same provider
|
||||
* 2. Falling back to other providers via FallbackResolver
|
||||
* 3. Exponential backoff with configurable max retries
|
||||
*
|
||||
* Context overflow errors are NOT handled here (see compaction).
|
||||
*/
|
||||
|
||||
import type { Agent } from "@gsd/pi-agent-core";
|
||||
import type { AssistantMessage, Model } from "@gsd/pi-ai";
|
||||
import { isContextOverflow } from "@gsd/pi-ai";
|
||||
import type { UsageLimitErrorType } from "./auth-storage.js";
|
||||
import type { FallbackResolver } from "./fallback-resolver.js";
|
||||
import type { ModelRegistry } from "./model-registry.js";
|
||||
import type { SettingsManager } from "./settings-manager.js";
|
||||
import { sleep } from "../utils/sleep.js";
|
||||
import type { AgentSessionEvent } from "./agent-session.js";
|
||||
|
||||
/** Dependencies injected from AgentSession into RetryHandler */
|
||||
export interface RetryHandlerDeps {
|
||||
readonly agent: Agent;
|
||||
readonly settingsManager: SettingsManager;
|
||||
readonly modelRegistry: ModelRegistry;
|
||||
readonly fallbackResolver: FallbackResolver;
|
||||
getModel: () => Model<any> | undefined;
|
||||
getSessionId: () => string;
|
||||
emit: (event: AgentSessionEvent) => void;
|
||||
/** Called when the retry handler switches to a fallback model */
|
||||
onModelChange: (model: Model<any>) => void;
|
||||
}
|
||||
|
||||
export class RetryHandler {
|
||||
private _retryAbortController: AbortController | undefined = undefined;
|
||||
private _retryAttempt = 0;
|
||||
private _retryPromise: Promise<void> | undefined = undefined;
|
||||
private _retryResolve: (() => void) | undefined = undefined;
|
||||
|
||||
constructor(private readonly _deps: RetryHandlerDeps) {}
|
||||
|
||||
/** Current retry attempt (0 if not retrying) */
|
||||
get retryAttempt(): number {
|
||||
return this._retryAttempt;
|
||||
}
|
||||
|
||||
/** Whether auto-retry is currently in progress */
|
||||
get isRetrying(): boolean {
|
||||
return this._retryPromise !== undefined;
|
||||
}
|
||||
|
||||
/** Whether auto-retry is enabled */
|
||||
get autoRetryEnabled(): boolean {
|
||||
return this._deps.settingsManager.getRetryEnabled();
|
||||
}
|
||||
|
||||
/** Toggle auto-retry setting */
|
||||
setAutoRetryEnabled(enabled: boolean): void {
|
||||
this._deps.settingsManager.setRetryEnabled(enabled);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a retry promise synchronously for agent_end events.
|
||||
* Must be called synchronously from the agent event handler before
|
||||
* any async processing, so that waitForRetry() doesn't miss in-flight retries.
|
||||
*/
|
||||
createRetryPromiseForAgentEnd(messages: Array<{ role: string } & Record<string, any>>): void {
|
||||
if (this._retryPromise) return;
|
||||
|
||||
const settings = this._deps.settingsManager.getRetrySettings();
|
||||
if (!settings.enabled) return;
|
||||
|
||||
const lastAssistant = this._findLastAssistantInMessages(messages);
|
||||
if (!lastAssistant || !this.isRetryableError(lastAssistant)) return;
|
||||
|
||||
this._retryPromise = new Promise((resolve) => {
|
||||
this._retryResolve = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a successful assistant response by resetting retry state.
|
||||
* Call this when an assistant message completes without error.
|
||||
*/
|
||||
handleSuccessfulResponse(): void {
|
||||
if (this._retryAttempt > 0) {
|
||||
this._deps.emit({
|
||||
type: "auto_retry_end",
|
||||
success: true,
|
||||
attempt: this._retryAttempt,
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an error is retryable (overloaded, rate limit, server errors).
|
||||
* Context overflow errors are NOT retryable (handled by compaction instead).
|
||||
*/
|
||||
isRetryableError(message: AssistantMessage): boolean {
|
||||
if (message.stopReason !== "error" || !message.errorMessage) return false;
|
||||
|
||||
// Context overflow is handled by compaction, not retry
|
||||
const contextWindow = this._deps.getModel()?.contextWindow ?? 0;
|
||||
if (isContextOverflow(message, contextWindow)) return false;
|
||||
|
||||
const err = message.errorMessage;
|
||||
return /overloaded|rate.?limit|too many requests|429|500|502|503|504|service.?unavailable|server.?error|internal.?error|connection.?error|connection.?refused|other side closed|fetch failed|upstream.?connect|reset before headers|terminated|retry delay|network.?(?:is\s+)?unavailable|credentials.*expired|temporarily backed off/i.test(
|
||||
err,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle retryable errors with exponential backoff.
|
||||
* When multiple credentials are available, marks the failing credential
|
||||
* as backed off and retries immediately with the next one.
|
||||
* @returns true if retry was initiated, false if max retries exceeded or disabled
|
||||
*/
|
||||
async handleRetryableError(message: AssistantMessage): Promise<boolean> {
|
||||
const settings = this._deps.settingsManager.getRetrySettings();
|
||||
if (!settings.enabled) {
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Retry promise is created synchronously in createRetryPromiseForAgentEnd.
|
||||
// Keep a defensive fallback here in case a future refactor bypasses that path.
|
||||
if (!this._retryPromise) {
|
||||
this._retryPromise = new Promise((resolve) => {
|
||||
this._retryResolve = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
// Try credential fallback before counting against retry budget.
|
||||
if (this._deps.getModel() && message.errorMessage) {
|
||||
const errorType = this._classifyErrorType(message.errorMessage);
|
||||
const isCredentialError = errorType !== "unknown";
|
||||
const hasAlternate =
|
||||
isCredentialError &&
|
||||
this._deps.modelRegistry.authStorage.markUsageLimitReached(
|
||||
this._deps.getModel()!.provider,
|
||||
this._deps.getSessionId(),
|
||||
{ errorType },
|
||||
);
|
||||
|
||||
if (hasAlternate) {
|
||||
this._removeLastAssistantError();
|
||||
|
||||
this._deps.emit({
|
||||
type: "auto_retry_start",
|
||||
attempt: this._retryAttempt + 1,
|
||||
maxAttempts: settings.maxRetries,
|
||||
delayMs: 0,
|
||||
errorMessage: `${message.errorMessage} (switching credential)`,
|
||||
});
|
||||
|
||||
// Retry immediately with the next credential - don't increment _retryAttempt
|
||||
setTimeout(() => {
|
||||
this._deps.agent.continue().catch(() => {});
|
||||
}, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// All credentials are backed off. Try cross-provider fallback before giving up.
|
||||
if (isCredentialError) {
|
||||
const fallbackResult = await this._deps.fallbackResolver.findFallback(
|
||||
this._deps.getModel()!,
|
||||
errorType,
|
||||
);
|
||||
|
||||
if (fallbackResult) {
|
||||
const previousProvider = this._deps.getModel()!.provider;
|
||||
this._deps.agent.setModel(fallbackResult.model);
|
||||
this._deps.onModelChange(fallbackResult.model);
|
||||
this._removeLastAssistantError();
|
||||
|
||||
this._deps.emit({
|
||||
type: "fallback_provider_switch",
|
||||
from: `${previousProvider}/${this._deps.getModel()?.id}`,
|
||||
to: `${fallbackResult.model.provider}/${fallbackResult.model.id}`,
|
||||
reason: fallbackResult.reason,
|
||||
});
|
||||
|
||||
this._deps.emit({
|
||||
type: "auto_retry_start",
|
||||
attempt: this._retryAttempt + 1,
|
||||
maxAttempts: settings.maxRetries,
|
||||
delayMs: 0,
|
||||
errorMessage: `${message.errorMessage} (${fallbackResult.reason})`,
|
||||
});
|
||||
|
||||
// Retry immediately with fallback provider - don't increment _retryAttempt
|
||||
setTimeout(() => {
|
||||
this._deps.agent.continue().catch(() => {});
|
||||
}, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// No fallback available either
|
||||
if (errorType === "quota_exhausted") {
|
||||
this._deps.emit({
|
||||
type: "fallback_chain_exhausted",
|
||||
reason: `All providers exhausted for ${this._deps.getModel()!.provider}/${this._deps.getModel()!.id}`,
|
||||
});
|
||||
this._deps.emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt: this._retryAttempt,
|
||||
finalError: message.errorMessage,
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this._retryAttempt++;
|
||||
|
||||
if (this._retryAttempt > settings.maxRetries) {
|
||||
this._deps.emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt: this._retryAttempt - 1,
|
||||
finalError: message.errorMessage,
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Use server-requested delay when available, capped by maxDelayMs.
|
||||
// Fall back to exponential backoff when no server hint is present.
|
||||
const exponentialDelayMs = settings.baseDelayMs * 2 ** (this._retryAttempt - 1);
|
||||
let delayMs: number;
|
||||
if (message.retryAfterMs !== undefined) {
|
||||
const cap = settings.maxDelayMs > 0 ? settings.maxDelayMs : Infinity;
|
||||
if (message.retryAfterMs > cap) {
|
||||
this._deps.emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt: this._retryAttempt - 1,
|
||||
finalError: `Rate limit reset in ${Math.ceil(message.retryAfterMs / 1000)}s (max: ${Math.ceil(cap / 1000)}s). ${message.errorMessage || ""}`.trim(),
|
||||
});
|
||||
this._retryAttempt = 0;
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
delayMs = message.retryAfterMs;
|
||||
} else {
|
||||
delayMs = exponentialDelayMs;
|
||||
}
|
||||
|
||||
this._deps.emit({
|
||||
type: "auto_retry_start",
|
||||
attempt: this._retryAttempt,
|
||||
maxAttempts: settings.maxRetries,
|
||||
delayMs,
|
||||
errorMessage: message.errorMessage || "Unknown error",
|
||||
});
|
||||
|
||||
this._removeLastAssistantError();
|
||||
|
||||
// Wait with exponential backoff (abortable)
|
||||
this._retryAbortController = new AbortController();
|
||||
try {
|
||||
await sleep(delayMs, this._retryAbortController.signal);
|
||||
} catch {
|
||||
// Aborted during sleep
|
||||
const attempt = this._retryAttempt;
|
||||
this._retryAttempt = 0;
|
||||
this._retryAbortController = undefined;
|
||||
this._deps.emit({
|
||||
type: "auto_retry_end",
|
||||
success: false,
|
||||
attempt,
|
||||
finalError: "Retry cancelled",
|
||||
});
|
||||
this._resolveRetry();
|
||||
return false;
|
||||
}
|
||||
this._retryAbortController = undefined;
|
||||
|
||||
// Retry via continue() - use setTimeout to break out of event handler chain
|
||||
setTimeout(() => {
|
||||
this._deps.agent.continue().catch(() => {});
|
||||
}, 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Cancel in-progress retry */
|
||||
abortRetry(): void {
|
||||
this._retryAbortController?.abort();
|
||||
this._resolveRetry();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for any in-progress retry to complete.
|
||||
* Returns immediately if no retry is in progress.
|
||||
*/
|
||||
async waitForRetry(): Promise<void> {
|
||||
if (this._retryPromise) {
|
||||
await this._retryPromise;
|
||||
}
|
||||
}
|
||||
|
||||
/** Resolve the pending retry promise */
|
||||
resolveRetry(): void {
|
||||
this._resolveRetry();
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Private helpers
|
||||
// =========================================================================
|
||||
|
||||
private _resolveRetry(): void {
|
||||
if (this._retryResolve) {
|
||||
this._retryResolve();
|
||||
this._retryResolve = undefined;
|
||||
this._retryPromise = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private _findLastAssistantInMessages(
|
||||
messages: Array<{ role: string } & Record<string, any>>,
|
||||
): AssistantMessage | undefined {
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const message = messages[i];
|
||||
if (message.role === "assistant") {
|
||||
return message as AssistantMessage;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Classify an error message into a usage-limit error type for credential backoff.
|
||||
*/
|
||||
private _classifyErrorType(errorMessage: string): UsageLimitErrorType {
|
||||
const err = errorMessage.toLowerCase();
|
||||
if (/quota|billing|exceeded.*limit|usage.*limit/i.test(err)) return "quota_exhausted";
|
||||
if (/rate.?limit|too many requests|429/i.test(err)) return "rate_limit";
|
||||
if (/500|502|503|504|server.?error|internal.?error|service.?unavailable/i.test(err)) return "server_error";
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
/** Remove the last assistant error message from agent state */
|
||||
private _removeLastAssistantError(): void {
|
||||
const messages = this._deps.agent.state.messages;
|
||||
if (messages.length > 0 && messages[messages.length - 1].role === "assistant") {
|
||||
this._deps.agent.replaceMessages(messages.slice(0, -1));
|
||||
}
|
||||
}
|
||||
}
|
||||
6
packages/pi-coding-agent/src/utils/error.ts
Normal file
6
packages/pi-coding-agent/src/utils/error.ts
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
/**
|
||||
* Extract a human-readable message from an unknown caught value.
|
||||
*/
|
||||
export function getErrorMessage(err: unknown): string {
|
||||
return err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue