From 0e0f47ef9fbc2bf2af370bd667fff03e478dcff1 Mon Sep 17 00:00:00 2001 From: Jeremy McSpadden Date: Tue, 17 Mar 2026 17:03:49 -0500 Subject: [PATCH] fix: failure recovery & resume safeguards (all 4 waves) (#956) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: prevent data loss on crash with atomic writes, file locking, and error handling Wave 1 of failure recovery safeguards: 1. Atomic session file rewrites (tmp+rename) — _rewriteFile() and forkFrom() now use atomicWriteFileSync to prevent session file corruption on crash 2. Atomic auto.lock writes — crash-recovery.ts writeLock() uses tmp+rename so the crash detection system itself can't be corrupted 3. unhandledRejection handler — catches silent process death from unhandled promise rejections in OAuth, extensions, LSP, or MCP connections 4. try/catch in emitToolCall — matches pattern used by emitUserBash, emitContext, and emitToolResult to prevent extension handler crashes from killing the entire agent turn 5. File locking on session appends — prevents concurrent pi instances from interleaving partial JSON lines in session JSONL files using the same proper-lockfile pattern established in auth-storage.ts and settings-manager.ts * fix: add OAuth timeouts, RPC exit detection, and command context guards Wave 2 of failure recovery safeguards: 1. OAuth fetch timeouts — all fetch() calls across all OAuth providers (Anthropic, OpenAI Codex, Google Antigravity, Google Gemini CLI, GitHub Copilot) now have 30-second AbortSignal.timeout() to prevent indefinite hangs when OAuth servers are unresponsive 2. RPC subprocess exit detection — pending requests are now rejected when the agent subprocess exits unexpectedly, preventing indefinite hangs in the RPC client 3. 
Extension command context guards — default handlers for newSession, fork, navigateTree, switchSession, and reload now throw explicit errors instead of silently returning success when called before bindCommandContext() 4. OAuth error detail preservation — token refresh errors now preserve the original error as `cause` for better diagnostics * fix: resource cleanup, LSP retry, and crash detection on session resume Wave 3 of failure recovery safeguards: 1. Atomic completed-units.json cleanup — milestone completion writes now use tmp+rename pattern for consistency with auto-recovery.ts 2. Bash temp file cleanup — track temp files created for large output and register a process exit handler to clean them up 3. Settings write queue flush on shutdown — call settingsManager.flush() during interactive mode shutdown so queued writes aren't lost 4. LSP initialization retry — wrap getOrCreateClient with up to 2 retries with linearly increasing backoff (1s, 2s) for transient spawn failures 5. Crash detection on session resume — wasInterrupted() checks if last assistant turn had tool calls without results, shows warning on resume * fix: blob garbage collection and LSP debug logging Wave 4 of failure recovery safeguards: 1. Blob garbage collection — BlobStore.gc(referencedHashes) removes orphaned blobs not referenced by any session file, plus totalSize() for monitoring blob directory growth 2. 
LSP JSON parse error logging — malformed LSP messages are now logged at debug level (when DEBUG env is set) instead of being silently dropped --- packages/pi-ai/src/utils/oauth/anthropic.ts | 2 + .../pi-ai/src/utils/oauth/github-copilot.ts | 6 +- .../src/utils/oauth/google-antigravity.ts | 4 + .../src/utils/oauth/google-gemini-cli.ts | 6 ++ packages/pi-ai/src/utils/oauth/index.ts | 4 +- .../pi-ai/src/utils/oauth/openai-codex.ts | 2 + .../pi-coding-agent/src/core/bash-executor.ts | 24 ++++- .../pi-coding-agent/src/core/blob-store.ts | 47 ++++++++- .../src/core/extensions/runner.test.ts | 84 ++++++++++++++++ .../src/core/extensions/runner.ts | 41 ++++++-- .../pi-coding-agent/src/core/fs-utils.test.ts | 66 +++++++++++++ packages/pi-coding-agent/src/core/fs-utils.ts | 12 +++ .../pi-coding-agent/src/core/lsp/client.ts | 24 ++++- .../src/core/session-manager.ts | 96 ++++++++++++++++--- packages/pi-coding-agent/src/main.ts | 7 ++ .../src/modes/interactive/interactive-mode.ts | 10 +- .../src/modes/rpc/rpc-client.ts | 12 +++ src/resources/extensions/gsd/auto.ts | 14 ++- .../extensions/gsd/crash-recovery.ts | 7 +- 19 files changed, 433 insertions(+), 35 deletions(-) create mode 100644 packages/pi-coding-agent/src/core/extensions/runner.test.ts create mode 100644 packages/pi-coding-agent/src/core/fs-utils.test.ts create mode 100644 packages/pi-coding-agent/src/core/fs-utils.ts diff --git a/packages/pi-ai/src/utils/oauth/anthropic.ts b/packages/pi-ai/src/utils/oauth/anthropic.ts index 5355df0d0..f3a349dc4 100644 --- a/packages/pi-ai/src/utils/oauth/anthropic.ts +++ b/packages/pi-ai/src/utils/oauth/anthropic.ts @@ -61,6 +61,7 @@ export async function loginAnthropic( redirect_uri: REDIRECT_URI, code_verifier: verifier, }), + signal: AbortSignal.timeout(30_000), }); if (!tokenResponse.ok) { @@ -97,6 +98,7 @@ export async function refreshAnthropicToken(refreshToken: string): Promise { - const response = await fetch(url, init); + const response = await fetch(url, { + ...init, + 
signal: init.signal ?? AbortSignal.timeout(30_000), + }); if (!response.ok) { const text = await response.text(); throw new Error(`${response.status} ${response.statusText}: ${text}`); @@ -276,6 +279,7 @@ async function enableGitHubCopilotModel(token: string, modelId: string, enterpri "x-interaction-type": "chat-policy", }, body: JSON.stringify({ state: "enabled" }), + signal: AbortSignal.timeout(30_000), }); return response.ok; } catch { diff --git a/packages/pi-ai/src/utils/oauth/google-antigravity.ts b/packages/pi-ai/src/utils/oauth/google-antigravity.ts index e9ef3c740..b9a49d473 100644 --- a/packages/pi-ai/src/utils/oauth/google-antigravity.ts +++ b/packages/pi-ai/src/utils/oauth/google-antigravity.ts @@ -184,6 +184,7 @@ async function discoverProject(accessToken: string, onProgress?: (message: strin pluginType: "GEMINI", }, }), + signal: AbortSignal.timeout(30_000), }); if (loadResponse.ok) { @@ -220,6 +221,7 @@ async function getUserEmail(accessToken: string): Promise { headers: { Authorization: `Bearer ${accessToken}`, }, + signal: AbortSignal.timeout(30_000), }); if (response.ok) { @@ -245,6 +247,7 @@ export async function refreshAntigravityToken(refreshToken: string, projectId: s refresh_token: refreshToken, grant_type: "refresh_token", }), + signal: AbortSignal.timeout(30_000), }); if (!response.ok) { @@ -392,6 +395,7 @@ export async function loginAntigravity( redirect_uri: REDIRECT_URI, code_verifier: verifier, }), + signal: AbortSignal.timeout(30_000), }); if (!tokenResponse.ok) { diff --git a/packages/pi-ai/src/utils/oauth/google-gemini-cli.ts b/packages/pi-ai/src/utils/oauth/google-gemini-cli.ts index a90524fea..8650e8afd 100644 --- a/packages/pi-ai/src/utils/oauth/google-gemini-cli.ts +++ b/packages/pi-ai/src/utils/oauth/google-gemini-cli.ts @@ -207,6 +207,7 @@ async function pollOperation( const response = await fetch(`${CODE_ASSIST_ENDPOINT}/v1internal/${operationName}`, { method: "GET", headers, + signal: AbortSignal.timeout(30_000), }); if 
(!response.ok) { @@ -250,6 +251,7 @@ async function discoverProject(accessToken: string, onProgress?: (message: strin duetProject: envProjectId, }, }), + signal: AbortSignal.timeout(30_000), }); let data: LoadCodeAssistPayload; @@ -321,6 +323,7 @@ async function discoverProject(accessToken: string, onProgress?: (message: strin method: "POST", headers, body: JSON.stringify(onboardBody), + signal: AbortSignal.timeout(30_000), }); if (!onboardResponse.ok) { @@ -362,6 +365,7 @@ async function getUserEmail(accessToken: string): Promise { headers: { Authorization: `Bearer ${accessToken}`, }, + signal: AbortSignal.timeout(30_000), }); if (response.ok) { @@ -387,6 +391,7 @@ export async function refreshGoogleCloudToken(refreshToken: string, projectId: s refresh_token: refreshToken, grant_type: "refresh_token", }), + signal: AbortSignal.timeout(30_000), }); if (!response.ok) { @@ -534,6 +539,7 @@ export async function loginGeminiCli( redirect_uri: REDIRECT_URI, code_verifier: verifier, }), + signal: AbortSignal.timeout(30_000), }); if (!tokenResponse.ok) { diff --git a/packages/pi-ai/src/utils/oauth/index.ts b/packages/pi-ai/src/utils/oauth/index.ts index 9108992f2..a91decf4a 100644 --- a/packages/pi-ai/src/utils/oauth/index.ts +++ b/packages/pi-ai/src/utils/oauth/index.ts @@ -126,8 +126,8 @@ export async function getOAuthApiKey( if (Date.now() >= creds.expires) { try { creds = await provider.refreshToken(creds); - } catch (_error) { - throw new Error(`Failed to refresh OAuth token for ${providerId}`); + } catch (error) { + throw new Error(`Failed to refresh OAuth token for ${providerId}`, { cause: error }); } } diff --git a/packages/pi-ai/src/utils/oauth/openai-codex.ts b/packages/pi-ai/src/utils/oauth/openai-codex.ts index 820168d91..8c5bd56bd 100644 --- a/packages/pi-ai/src/utils/oauth/openai-codex.ts +++ b/packages/pi-ai/src/utils/oauth/openai-codex.ts @@ -114,6 +114,7 @@ async function exchangeAuthorizationCode( code_verifier: verifier, redirect_uri: redirectUri, }), + 
signal: AbortSignal.timeout(30_000), }); if (!response.ok) { @@ -151,6 +152,7 @@ async function refreshAccessToken(refreshToken: string): Promise { refresh_token: refreshToken, client_id: CLIENT_ID, }), + signal: AbortSignal.timeout(30_000), }); if (!response.ok) { diff --git a/packages/pi-coding-agent/src/core/bash-executor.ts b/packages/pi-coding-agent/src/core/bash-executor.ts index bb0b2cb81..3931a7a25 100644 --- a/packages/pi-coding-agent/src/core/bash-executor.ts +++ b/packages/pi-coding-agent/src/core/bash-executor.ts @@ -7,10 +7,28 @@ */ import { randomBytes } from "node:crypto"; -import { createWriteStream, type WriteStream } from "node:fs"; +import { createWriteStream, unlinkSync, type WriteStream } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { type ChildProcess, spawn } from "child_process"; + +/** Track temp files created by bash execution for cleanup on exit. */ +const bashTempFiles = new Set(); + +let cleanupRegistered = false; +function registerTempCleanup(): void { + if (cleanupRegistered) return; + cleanupRegistered = true; + process.on("exit", () => { + for (const file of bashTempFiles) { + try { + unlinkSync(file); + } catch { + // Best-effort cleanup + } + } + }); +} import { processStreamChunk, type StreamState } from "@gsd/native"; import { getShellConfig, getShellEnv, killProcessTree, sanitizeCommand } from "../utils/shell.js"; import type { BashOperations } from "./tools/bash.js"; @@ -111,8 +129,10 @@ export function executeBash(command: string, options?: BashExecutorOptions): Pro // Start writing to temp file if exceeds threshold if (totalBytes > DEFAULT_MAX_BYTES && !tempFilePath) { + registerTempCleanup(); const id = randomBytes(8).toString("hex"); tempFilePath = join(tmpdir(), `pi-bash-${id}.log`); + bashTempFiles.add(tempFilePath); tempFileStream = createWriteStream(tempFilePath); // Write already-buffered chunks to temp file for (const chunk of outputChunks) { @@ -212,8 +232,10 @@ export 
async function executeBashWithOperations( // Start writing to temp file if exceeds threshold if (totalBytes > DEFAULT_MAX_BYTES && !tempFilePath) { + registerTempCleanup(); const id = randomBytes(8).toString("hex"); tempFilePath = join(tmpdir(), `pi-bash-${id}.log`); + bashTempFiles.add(tempFilePath); tempFileStream = createWriteStream(tempFilePath); for (const chunk of outputChunks) { tempFileStream.write(chunk); diff --git a/packages/pi-coding-agent/src/core/blob-store.ts b/packages/pi-coding-agent/src/core/blob-store.ts index 548dba0f6..16262c892 100644 --- a/packages/pi-coding-agent/src/core/blob-store.ts +++ b/packages/pi-coding-agent/src/core/blob-store.ts @@ -6,7 +6,7 @@ * provides automatic deduplication across sessions. */ import { createHash } from "node:crypto"; -import { mkdirSync, readFileSync, writeFileSync, existsSync, accessSync } from "node:fs"; +import { mkdirSync, readdirSync, readFileSync, writeFileSync, existsSync, accessSync, unlinkSync, statSync } from "node:fs"; import { join } from "node:path"; const BLOB_PREFIX = "blob:sha256:"; @@ -64,6 +64,51 @@ export class BlobStore { return false; } } + + /** + * Remove blobs not referenced by any session file. + * @param referencedHashes Set of SHA-256 hashes still referenced in session files. + * @returns Number of orphaned blobs removed. + */ + gc(referencedHashes: Set): number { + let removed = 0; + try { + const entries = readdirSync(this.dir); + for (const entry of entries) { + if (!SHA256_HEX_RE.test(entry)) continue; + if (!referencedHashes.has(entry)) { + try { + unlinkSync(join(this.dir, entry)); + removed++; + } catch { + // Best-effort removal + } + } + } + } catch { + // Blob dir may not exist or be unreadable + } + return removed; + } + + /** Get total size of all blobs in bytes, or 0 if the directory is empty/unreadable. 
*/ + totalSize(): number { + try { + const entries = readdirSync(this.dir); + let total = 0; + for (const entry of entries) { + if (!SHA256_HEX_RE.test(entry)) continue; + try { + total += statSync(join(this.dir, entry)).size; + } catch { + // Skip unreadable files + } + } + return total; + } catch { + return 0; + } + } } /** Check if a data string is a blob reference. */ diff --git a/packages/pi-coding-agent/src/core/extensions/runner.test.ts b/packages/pi-coding-agent/src/core/extensions/runner.test.ts new file mode 100644 index 000000000..b11ae2d9a --- /dev/null +++ b/packages/pi-coding-agent/src/core/extensions/runner.test.ts @@ -0,0 +1,84 @@ +import assert from "node:assert/strict"; +import { describe, it, mock } from "node:test"; +import { ExtensionRunner } from "./runner.js"; +import type { Extension, ExtensionRuntime, ToolCallEvent } from "./index.js"; +import { SessionManager } from "../session-manager.js"; +import { ModelRegistry } from "../model-registry.js"; +import { mkdtempSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { AuthStorage } from "../auth-storage.js"; + +function makeMinimalRuntime(): ExtensionRuntime { + return { + sendMessage: async () => {}, + sendUserMessage: async () => {}, + appendEntry: () => {}, + setSessionName: () => {}, + getSessionName: () => undefined, + setLabel: () => {}, + getActiveTools: () => [], + getAllTools: () => [], + setActiveTools: () => {}, + refreshTools: () => {}, + getCommands: () => [], + setModel: async () => {}, + getThinkingLevel: () => undefined, + setThinkingLevel: () => {}, + registerProvider: () => {}, + unregisterProvider: () => {}, + pendingProviderRegistrations: [], + } as unknown as ExtensionRuntime; +} + +function makeThrowingExtension(eventType: string, error: Error): Extension { + const handlers = new Map(); + handlers.set(eventType, [ + async () => { + throw error; + }, + ]); + return { + path: "/test/throwing-ext", + handlers, + 
commands: [], + shortcuts: [], + diagnostics: [], + } as unknown as Extension; +} + +describe("ExtensionRunner.emitToolCall", () => { + it("catches throwing extension handler and routes to emitError", async () => { + const dir = mkdtempSync(join(tmpdir(), "runner-test-")); + try { + const sessionManager = SessionManager.create(dir, dir); + const authStorage = AuthStorage.create(); + const modelRegistry = new ModelRegistry(authStorage, join(dir, "models.json")); + + const throwingExt = makeThrowingExtension("tool_call", new Error("handler crashed")); + const runtime = makeMinimalRuntime(); + const runner = new ExtensionRunner([throwingExt], runtime, dir, sessionManager, modelRegistry); + + const errors: any[] = []; + runner.onError((err) => errors.push(err)); + + const event: ToolCallEvent = { + type: "tool_call", + toolCallId: "test-123", + toolName: "test_tool", + input: {}, + } as ToolCallEvent; + + const result = await runner.emitToolCall(event); + + // Should not throw — error is caught and routed to emitError + assert.equal(result, undefined); + assert.equal(errors.length, 1); + assert.equal(errors[0].error, "handler crashed"); + assert.equal(errors[0].event, "tool_call"); + assert.equal(errors[0].extensionPath, "/test/throwing-ext"); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/pi-coding-agent/src/core/extensions/runner.ts b/packages/pi-coding-agent/src/core/extensions/runner.ts index ccce69e99..0edd78a82 100644 --- a/packages/pi-coding-agent/src/core/extensions/runner.ts +++ b/packages/pi-coding-agent/src/core/extensions/runner.ts @@ -211,11 +211,21 @@ export class ExtensionRunner { private getContextUsageFn: () => ContextUsage | undefined = () => undefined; private compactFn: (options?: CompactOptions) => void = () => {}; private getSystemPromptFn: () => string = () => ""; - private newSessionHandler: NewSessionHandler = async () => ({ cancelled: false }); - private forkHandler: ForkHandler = async () 
=> ({ cancelled: false }); - private navigateTreeHandler: NavigateTreeHandler = async () => ({ cancelled: false }); - private switchSessionHandler: SwitchSessionHandler = async () => ({ cancelled: false }); - private reloadHandler: ReloadHandler = async () => {}; + private newSessionHandler: NewSessionHandler = async () => { + throw new Error("Command context not yet bound: newSession is unavailable during early lifecycle"); + }; + private forkHandler: ForkHandler = async () => { + throw new Error("Command context not yet bound: fork is unavailable during early lifecycle"); + }; + private navigateTreeHandler: NavigateTreeHandler = async () => { + throw new Error("Command context not yet bound: navigateTree is unavailable during early lifecycle"); + }; + private switchSessionHandler: SwitchSessionHandler = async () => { + throw new Error("Command context not yet bound: switchSession is unavailable during early lifecycle"); + }; + private reloadHandler: ReloadHandler = async () => { + throw new Error("Command context not yet bound: reload is unavailable during early lifecycle"); + }; private shutdownHandler: ShutdownHandler = () => {}; private shortcutDiagnostics: ResourceDiagnostic[] = []; private commandDiagnostics: ResourceDiagnostic[] = []; @@ -637,13 +647,24 @@ export class ExtensionRunner { if (!handlers || handlers.length === 0) continue; for (const handler of handlers) { - const handlerResult = await handler(event, ctx); + try { + const handlerResult = await handler(event, ctx); - if (handlerResult) { - result = handlerResult as ToolCallEventResult; - if (result.block) { - return result; + if (handlerResult) { + result = handlerResult as ToolCallEventResult; + if (result.block) { + return result; + } } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const stack = err instanceof Error ? 
err.stack : undefined; + this.emitError({ + extensionPath: ext.path, + event: "tool_call", + error: message, + stack, + }); } } } diff --git a/packages/pi-coding-agent/src/core/fs-utils.test.ts b/packages/pi-coding-agent/src/core/fs-utils.test.ts new file mode 100644 index 000000000..997080e4c --- /dev/null +++ b/packages/pi-coding-agent/src/core/fs-utils.test.ts @@ -0,0 +1,66 @@ +import assert from "node:assert/strict"; +import { describe, it } from "node:test"; +import { mkdtempSync, readFileSync, rmSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { atomicWriteFileSync } from "./fs-utils.js"; + +describe("atomicWriteFileSync", () => { + it("writes file content atomically", () => { + const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-")); + try { + const filePath = join(dir, "test.txt"); + atomicWriteFileSync(filePath, "hello world"); + assert.equal(readFileSync(filePath, "utf-8"), "hello world"); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("overwrites existing file atomically", () => { + const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-")); + try { + const filePath = join(dir, "test.txt"); + atomicWriteFileSync(filePath, "first"); + atomicWriteFileSync(filePath, "second"); + assert.equal(readFileSync(filePath, "utf-8"), "second"); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("does not leave .tmp file after successful write", () => { + const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-")); + try { + const filePath = join(dir, "test.txt"); + atomicWriteFileSync(filePath, "content"); + assert.equal(existsSync(filePath + ".tmp"), false); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("supports Buffer content", () => { + const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-")); + try { + const filePath = join(dir, "test.bin"); + const buf = Buffer.from([0x00, 0x01, 0x02, 0xff]); + 
atomicWriteFileSync(filePath, buf); + const result = readFileSync(filePath); + assert.deepEqual(result, buf); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("supports encoding parameter", () => { + const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-")); + try { + const filePath = join(dir, "test.txt"); + atomicWriteFileSync(filePath, "utf8 content", "utf-8"); + assert.equal(readFileSync(filePath, "utf-8"), "utf8 content"); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/pi-coding-agent/src/core/fs-utils.ts b/packages/pi-coding-agent/src/core/fs-utils.ts new file mode 100644 index 000000000..09e9c6ae9 --- /dev/null +++ b/packages/pi-coding-agent/src/core/fs-utils.ts @@ -0,0 +1,12 @@ +import { renameSync, writeFileSync } from "node:fs"; + +/** + * Atomically write a file by writing to a temporary path then renaming. + * This prevents data loss if the process crashes mid-write — either the + * old file remains intact or the new content is fully written. + */ +export function atomicWriteFileSync(filePath: string, content: string | Buffer, encoding?: BufferEncoding): void { + const tmpPath = filePath + ".tmp"; + writeFileSync(tmpPath, content, encoding); + renameSync(tmpPath, filePath); +} diff --git a/packages/pi-coding-agent/src/core/lsp/client.ts b/packages/pi-coding-agent/src/core/lsp/client.ts index 33682fd50..8f31afbe5 100644 --- a/packages/pi-coding-agent/src/core/lsp/client.ts +++ b/packages/pi-coding-agent/src/core/lsp/client.ts @@ -197,8 +197,12 @@ function parseMessage( let message: LspJsonRpcResponse | LspJsonRpcNotification; try { message = JSON.parse(messageText); - } catch { - // Malformed JSON from LSP server — skip this message and advance past it + } catch (err) { + // Malformed JSON from LSP server — log and skip this message + if (process.env.DEBUG) { + const preview = messageText.length > 200 ? messageText.slice(0, 200) + "..." 
: messageText; + console.error(`[lsp] Dropped malformed JSON message: ${err instanceof Error ? err.message : err} — ${preview}`); + } return { message: null, remaining }; } @@ -409,6 +413,22 @@ export const WARMUP_TIMEOUT_MS = 5000; * Get or create an LSP client for the given server configuration and working directory. */ export async function getOrCreateClient(config: ServerConfig, cwd: string, initTimeoutMs?: number): Promise { + const maxRetries = 2; + let lastErr: unknown; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await getOrCreateClientOnce(config, cwd, initTimeoutMs); + } catch (err) { + lastErr = err; + if (attempt < maxRetries) { + await new Promise((resolve) => setTimeout(resolve, 1000 * (attempt + 1))); + } + } + } + throw lastErr; +} + +async function getOrCreateClientOnce(config: ServerConfig, cwd: string, initTimeoutMs?: number): Promise { const key = `${config.command}:${cwd}`; const existingClient = clients.get(key); diff --git a/packages/pi-coding-agent/src/core/session-manager.ts b/packages/pi-coding-agent/src/core/session-manager.ts index fe9cbf7f6..b61605b81 100644 --- a/packages/pi-coding-agent/src/core/session-manager.ts +++ b/packages/pi-coding-agent/src/core/session-manager.ts @@ -13,8 +13,10 @@ import { statSync, writeFileSync, } from "fs"; +import { atomicWriteFileSync } from "./fs-utils.js"; import { readdir, readFile, stat } from "fs/promises"; import { join, resolve } from "path"; +import lockfile from "proper-lockfile"; import { getAgentDir as getDefaultAgentDir, getBlobsDir, getSessionsDir } from "../config.js"; import { type BashExecutionMessage, @@ -821,6 +823,37 @@ export class SessionManager { } } + /** + * Check if the last assistant turn in the session appears to have been + * interrupted (e.g., the last message is from the assistant with tool_use + * blocks but no subsequent tool_result message). 
+ */ + wasInterrupted(): boolean { + // Walk backwards to find the last message entry + for (let i = this.fileEntries.length - 1; i >= 0; i--) { + const entry = this.fileEntries[i]; + if (entry.type !== "message") continue; + + const msg = entry.message; + if (msg.role === "user") return false; // clean user turn boundary + if (msg.role === "assistant") { + // Check if the assistant message contains tool_use blocks + const content = Array.isArray(msg.content) ? msg.content : []; + const hasToolUse = content.some( + (block) => block.type === "toolCall", + ); + if (hasToolUse) { + // If the last message is an assistant tool_use with no following + // tool_result message, the turn was likely interrupted + return true; + } + return false; // assistant message without tool_use = completed text response + } + return false; + } + return false; + } + /** Switch to a different session file (used for resume and branching) */ setSessionFile(sessionFile: string): void { this.sessionFile = resolve(sessionFile); @@ -903,10 +936,43 @@ export class SessionManager { } } + private acquireSessionLock(path: string): (() => void) | undefined { + const maxAttempts = 10; + const delayMs = 20; + let lastError: unknown; + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + return lockfile.lockSync(path, { realpath: false }); + } catch (error) { + const code = + typeof error === "object" && error !== null && "code" in error + ? 
String((error as { code?: unknown }).code) + : undefined; + if (code !== "ELOCKED" || attempt === maxAttempts) { + // Non-fatal: proceed without lock rather than losing data + return undefined; + } + lastError = error; + const start = Date.now(); + while (Date.now() - start < delayMs) { + // Busy-wait to avoid async + } + } + } + return undefined; + } + private _rewriteFile(): void { if (!this.persist || !this.sessionFile) return; const content = `${this.fileEntries.map((e) => JSON.stringify(e)).join("\n")}\n`; - writeFileSync(this.sessionFile, content); + let release: (() => void) | undefined; + try { + release = this.acquireSessionLock(this.sessionFile); + atomicWriteFileSync(this.sessionFile, content); + } finally { + release?.(); + } } isPersisted(): boolean { @@ -939,15 +1005,21 @@ export class SessionManager { return; } - if (!this.flushed) { - for (const e of this.fileEntries) { - const prepared = prepareForPersistence(e, this.blobStore) as FileEntry; + let release: (() => void) | undefined; + try { + release = this.acquireSessionLock(this.sessionFile); + if (!this.flushed) { + for (const e of this.fileEntries) { + const prepared = prepareForPersistence(e, this.blobStore) as FileEntry; + appendFileSync(this.sessionFile, `${JSON.stringify(prepared)}\n`); + } + this.flushed = true; + } else { + const prepared = prepareForPersistence(entry, this.blobStore) as FileEntry; appendFileSync(this.sessionFile, `${JSON.stringify(prepared)}\n`); } - this.flushed = true; - } else { - const prepared = prepareForPersistence(entry, this.blobStore) as FileEntry; - appendFileSync(this.sessionFile, `${JSON.stringify(prepared)}\n`); + } finally { + release?.(); } } @@ -1495,14 +1567,14 @@ export class SessionManager { cwd: targetCwd, parentSession: sourcePath, }; - appendFileSync(newSessionFile, `${JSON.stringify(newHeader)}\n`); - - // Copy all non-header entries from source + // Build complete fork content and write atomically to prevent partial files on crash + const lines = 
[JSON.stringify(newHeader)]; for (const entry of sourceEntries) { if (entry.type !== "session") { - appendFileSync(newSessionFile, `${JSON.stringify(entry)}\n`); + lines.push(JSON.stringify(entry)); } } + atomicWriteFileSync(newSessionFile, lines.join("\n") + "\n"); return new SessionManager(targetCwd, dir, newSessionFile, true); } diff --git a/packages/pi-coding-agent/src/main.ts b/packages/pi-coding-agent/src/main.ts index 7152f63b3..1f1c961e0 100644 --- a/packages/pi-coding-agent/src/main.ts +++ b/packages/pi-coding-agent/src/main.ts @@ -577,6 +577,13 @@ async function handleConfigCommand(args: string[]): Promise { } export async function main(args: string[]) { + // Catch unhandled promise rejections so the process doesn't silently disappear + process.on("unhandledRejection", (reason) => { + const message = reason instanceof Error ? reason.stack ?? reason.message : String(reason); + console.error(`\nFatal: unhandled promise rejection\n${message}`); + process.exitCode = 1; + }); + const offlineMode = args.includes("--offline") || isTruthyEnvFlag(process.env.PI_OFFLINE); if (offlineMode) { process.env.PI_OFFLINE = "1"; diff --git a/packages/pi-coding-agent/src/modes/interactive/interactive-mode.ts b/packages/pi-coding-agent/src/modes/interactive/interactive-mode.ts index 5faa9efa9..fbff35330 100644 --- a/packages/pi-coding-agent/src/modes/interactive/interactive-mode.ts +++ b/packages/pi-coding-agent/src/modes/interactive/interactive-mode.ts @@ -2764,6 +2764,9 @@ export class InteractiveMode { if (this.isShuttingDown) return; this.isShuttingDown = true; + // Flush any queued settings writes before shutdown + await this.settingsManager.flush(); + // Emit shutdown event to extensions const extensionRunner = this.session.extensionRunner; if (extensionRunner?.hasHandlers("session_shutdown")) { @@ -3860,7 +3863,12 @@ export class InteractiveMode { // Clear and re-render the chat this.chatContainer.clear(); this.renderInitialMessages(); - this.showStatus("Resumed 
session"); + + if (this.session.sessionManager.wasInterrupted()) { + this.showStatus("Resumed session (previous session ended unexpectedly — last action may be incomplete)"); + } else { + this.showStatus("Resumed session"); + } } private showProviderManager(): void { diff --git a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts index a93dbb412..a3f91ecc4 100644 --- a/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts +++ b/packages/pi-coding-agent/src/modes/rpc/rpc-client.ts @@ -99,6 +99,18 @@ export class RpcClient { this.handleLine(line); }); + // Detect unexpected subprocess exit and reject all pending requests + this.process.on("exit", (code, signal) => { + if (this.pendingRequests.size > 0) { + const reason = signal ? `signal ${signal}` : `code ${code}`; + const error = new Error(`Agent process exited unexpectedly (${reason}). Stderr: ${this.stderr}`); + for (const [id, pending] of this.pendingRequests) { + this.pendingRequests.delete(id); + pending.reject(error); + } + } + }); + // Wait a moment for process to initialize await new Promise((resolve) => setTimeout(resolve, 100)); diff --git a/src/resources/extensions/gsd/auto.ts b/src/resources/extensions/gsd/auto.ts index e154e2b5f..09e3ced19 100644 --- a/src/resources/extensions/gsd/auto.ts +++ b/src/resources/extensions/gsd/auto.ts @@ -103,7 +103,7 @@ import { import { computeBudgets, resolveExecutorContextWindow } from "./context-budget.js"; import { join } from "node:path"; import { sep as pathSep } from "node:path"; -import { readdirSync, readFileSync, existsSync, mkdirSync, writeFileSync, unlinkSync, statSync } from "node:fs"; +import { readdirSync, readFileSync, existsSync, mkdirSync, writeFileSync, renameSync, unlinkSync, statSync } from "node:fs"; import { nativeIsRepo, nativeInit, nativeAddPaths, nativeCommit } from "./native-git-bridge.js"; import { autoCommitCurrentBranch, @@ -2138,7 +2138,11 @@ async function dispatchNextUnit( // Clear 
completed-units.json for the finished milestone try { const file = completedKeysPath(s.basePath); - if (existsSync(file)) writeFileSync(file, JSON.stringify([]), "utf-8"); + if (existsSync(file)) { + const tmpFile = file + ".tmp"; + writeFileSync(tmpFile, JSON.stringify([]), "utf-8"); + renameSync(tmpFile, file); + } s.completedKeySet.clear(); } catch { /* non-fatal */ } @@ -2286,7 +2290,11 @@ async function dispatchNextUnit( // Clear completed-units.json for the finished milestone so it doesn't grow unbounded. try { const file = completedKeysPath(s.basePath); - if (existsSync(file)) writeFileSync(file, JSON.stringify([]), "utf-8"); + if (existsSync(file)) { + const tmpFile = file + ".tmp"; + writeFileSync(tmpFile, JSON.stringify([]), "utf-8"); + renameSync(tmpFile, file); + } s.completedKeySet.clear(); } catch { /* non-fatal */ } // ── Milestone merge: squash-merge milestone branch to main before stopping ── diff --git a/src/resources/extensions/gsd/crash-recovery.ts b/src/resources/extensions/gsd/crash-recovery.ts index d58f903e4..3686df402 100644 --- a/src/resources/extensions/gsd/crash-recovery.ts +++ b/src/resources/extensions/gsd/crash-recovery.ts @@ -10,7 +10,7 @@ * so the file on disk reflects every tool call up to the crash point). */ -import { writeFileSync, readFileSync, unlinkSync, existsSync } from "node:fs"; +import { renameSync, writeFileSync, readFileSync, unlinkSync, existsSync } from "node:fs"; import { join } from "node:path"; import { gsdRoot } from "./paths.js"; @@ -49,7 +49,10 @@ export function writeLock( completedUnits, sessionFile, }; - writeFileSync(lockPath(basePath), JSON.stringify(data, null, 2), "utf-8"); + const lp = lockPath(basePath); + const tmpLp = lp + ".tmp"; + writeFileSync(tmpLp, JSON.stringify(data, null, 2), "utf-8"); + renameSync(tmpLp, lp); } catch (e) { /* non-fatal: lock write failure */ void e; } }