fix: failure recovery & resume safeguards (all 4 waves) (#956)
* fix: prevent data loss on crash with atomic writes, file locking, and error handling Wave 1 of failure recovery safeguards: 1. Atomic session file rewrites (tmp+rename) — _rewriteFile() and forkFrom() now use atomicWriteFileSync to prevent session file corruption on crash 2. Atomic auto.lock writes — crash-recovery.ts writeLock() uses tmp+rename so the crash detection system itself can't be corrupted 3. unhandledRejection handler — catches silent process death from unhandled promise rejections in OAuth, extensions, LSP, or MCP connections 4. try/catch in emitToolCall — matches pattern used by emitUserBash, emitContext, and emitToolResult to prevent extension handler crashes from killing the entire agent turn 5. File locking on session appends — prevents concurrent pi instances from interleaving partial JSON lines in session JSONL files using the same proper-lockfile pattern established in auth-storage.ts and settings-manager.ts * fix: add OAuth timeouts, RPC exit detection, and command context guards Wave 2 of failure recovery safeguards: 1. OAuth fetch timeouts — all fetch() calls across all OAuth providers (Anthropic, OpenAI Codex, Google Antigravity, Google Gemini CLI, GitHub Copilot) now have 30-second AbortSignal.timeout() to prevent indefinite hangs when OAuth servers are unresponsive 2. RPC subprocess exit detection — pending requests are now rejected when the agent subprocess exits unexpectedly, preventing indefinite hangs in the RPC client 3. Extension command context guards — default handlers for newSession, fork, navigateTree, switchSession, and reload now throw explicit errors instead of silently returning success when called before bindCommandContext() 4. OAuth error detail preservation — token refresh errors now preserve the original error as `cause` for better diagnostics * fix: resource cleanup, LSP retry, and crash detection on session resume Wave 3 of failure recovery safeguards: 1. Atomic completed-units.json cleanup — milestone completion writes now use tmp+rename pattern for consistency with auto-recovery.ts 2. Bash temp file cleanup — track temp files created for large output and register a process exit handler to clean them up 3. Settings write queue flush on shutdown — call settingsManager.flush() during interactive mode shutdown so queued writes aren't lost 4. LSP initialization retry — wrap getOrCreateClient with up to 2 retries with exponential backoff (1s, 2s) for transient spawn failures 5. Crash detection on session resume — wasInterrupted() checks if last assistant turn had tool calls without results, shows warning on resume * fix: blob garbage collection and LSP debug logging Wave 4 of failure recovery safeguards: 1. Blob garbage collection — BlobStore.gc(referencedHashes) removes orphaned blobs not referenced by any session file, plus totalSize() for monitoring blob directory growth 2. LSP JSON parse error logging — malformed LSP messages are now logged at debug level (when DEBUG env is set) instead of being silently dropped
This commit is contained in:
parent
87122e0b7a
commit
0e0f47ef9f
19 changed files with 433 additions and 35 deletions
|
|
@ -61,6 +61,7 @@ export async function loginAnthropic(
|
|||
redirect_uri: REDIRECT_URI,
|
||||
code_verifier: verifier,
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!tokenResponse.ok) {
|
||||
|
|
@ -97,6 +98,7 @@ export async function refreshAnthropicToken(refreshToken: string): Promise<OAuth
|
|||
client_id: CLIENT_ID,
|
||||
refresh_token: refreshToken,
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
|
|
|||
|
|
@ -89,7 +89,10 @@ export function getGitHubCopilotBaseUrl(token?: string, enterpriseDomain?: strin
|
|||
}
|
||||
|
||||
async function fetchJson(url: string, init: RequestInit): Promise<unknown> {
|
||||
const response = await fetch(url, init);
|
||||
const response = await fetch(url, {
|
||||
...init,
|
||||
signal: init.signal ?? AbortSignal.timeout(30_000),
|
||||
});
|
||||
if (!response.ok) {
|
||||
const text = await response.text();
|
||||
throw new Error(`${response.status} ${response.statusText}: ${text}`);
|
||||
|
|
@ -276,6 +279,7 @@ async function enableGitHubCopilotModel(token: string, modelId: string, enterpri
|
|||
"x-interaction-type": "chat-policy",
|
||||
},
|
||||
body: JSON.stringify({ state: "enabled" }),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
return response.ok;
|
||||
} catch {
|
||||
|
|
|
|||
|
|
@ -184,6 +184,7 @@ async function discoverProject(accessToken: string, onProgress?: (message: strin
|
|||
pluginType: "GEMINI",
|
||||
},
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (loadResponse.ok) {
|
||||
|
|
@ -220,6 +221,7 @@ async function getUserEmail(accessToken: string): Promise<string | undefined> {
|
|||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
},
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (response.ok) {
|
||||
|
|
@ -245,6 +247,7 @@ export async function refreshAntigravityToken(refreshToken: string, projectId: s
|
|||
refresh_token: refreshToken,
|
||||
grant_type: "refresh_token",
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
|
@ -392,6 +395,7 @@ export async function loginAntigravity(
|
|||
redirect_uri: REDIRECT_URI,
|
||||
code_verifier: verifier,
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!tokenResponse.ok) {
|
||||
|
|
|
|||
|
|
@ -207,6 +207,7 @@ async function pollOperation(
|
|||
const response = await fetch(`${CODE_ASSIST_ENDPOINT}/v1internal/${operationName}`, {
|
||||
method: "GET",
|
||||
headers,
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
|
@ -250,6 +251,7 @@ async function discoverProject(accessToken: string, onProgress?: (message: strin
|
|||
duetProject: envProjectId,
|
||||
},
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
let data: LoadCodeAssistPayload;
|
||||
|
|
@ -321,6 +323,7 @@ async function discoverProject(accessToken: string, onProgress?: (message: strin
|
|||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify(onboardBody),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!onboardResponse.ok) {
|
||||
|
|
@ -362,6 +365,7 @@ async function getUserEmail(accessToken: string): Promise<string | undefined> {
|
|||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
},
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (response.ok) {
|
||||
|
|
@ -387,6 +391,7 @@ export async function refreshGoogleCloudToken(refreshToken: string, projectId: s
|
|||
refresh_token: refreshToken,
|
||||
grant_type: "refresh_token",
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
|
@ -534,6 +539,7 @@ export async function loginGeminiCli(
|
|||
redirect_uri: REDIRECT_URI,
|
||||
code_verifier: verifier,
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!tokenResponse.ok) {
|
||||
|
|
|
|||
|
|
@ -126,8 +126,8 @@ export async function getOAuthApiKey(
|
|||
if (Date.now() >= creds.expires) {
|
||||
try {
|
||||
creds = await provider.refreshToken(creds);
|
||||
} catch (_error) {
|
||||
throw new Error(`Failed to refresh OAuth token for ${providerId}`);
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to refresh OAuth token for ${providerId}`, { cause: error });
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ async function exchangeAuthorizationCode(
|
|||
code_verifier: verifier,
|
||||
redirect_uri: redirectUri,
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
|
@ -151,6 +152,7 @@ async function refreshAccessToken(refreshToken: string): Promise<TokenResult> {
|
|||
refresh_token: refreshToken,
|
||||
client_id: CLIENT_ID,
|
||||
}),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
|
|
|||
|
|
@ -7,10 +7,28 @@
|
|||
*/
|
||||
|
||||
import { randomBytes } from "node:crypto";
|
||||
import { createWriteStream, type WriteStream } from "node:fs";
|
||||
import { createWriteStream, unlinkSync, type WriteStream } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { type ChildProcess, spawn } from "child_process";
|
||||
|
||||
/** Track temp files created by bash execution for cleanup on exit. */
|
||||
const bashTempFiles = new Set<string>();
|
||||
|
||||
let cleanupRegistered = false;
|
||||
function registerTempCleanup(): void {
|
||||
if (cleanupRegistered) return;
|
||||
cleanupRegistered = true;
|
||||
process.on("exit", () => {
|
||||
for (const file of bashTempFiles) {
|
||||
try {
|
||||
unlinkSync(file);
|
||||
} catch {
|
||||
// Best-effort cleanup
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
import { processStreamChunk, type StreamState } from "@gsd/native";
|
||||
import { getShellConfig, getShellEnv, killProcessTree, sanitizeCommand } from "../utils/shell.js";
|
||||
import type { BashOperations } from "./tools/bash.js";
|
||||
|
|
@ -111,8 +129,10 @@ export function executeBash(command: string, options?: BashExecutorOptions): Pro
|
|||
|
||||
// Start writing to temp file if exceeds threshold
|
||||
if (totalBytes > DEFAULT_MAX_BYTES && !tempFilePath) {
|
||||
registerTempCleanup();
|
||||
const id = randomBytes(8).toString("hex");
|
||||
tempFilePath = join(tmpdir(), `pi-bash-${id}.log`);
|
||||
bashTempFiles.add(tempFilePath);
|
||||
tempFileStream = createWriteStream(tempFilePath);
|
||||
// Write already-buffered chunks to temp file
|
||||
for (const chunk of outputChunks) {
|
||||
|
|
@ -212,8 +232,10 @@ export async function executeBashWithOperations(
|
|||
|
||||
// Start writing to temp file if exceeds threshold
|
||||
if (totalBytes > DEFAULT_MAX_BYTES && !tempFilePath) {
|
||||
registerTempCleanup();
|
||||
const id = randomBytes(8).toString("hex");
|
||||
tempFilePath = join(tmpdir(), `pi-bash-${id}.log`);
|
||||
bashTempFiles.add(tempFilePath);
|
||||
tempFileStream = createWriteStream(tempFilePath);
|
||||
for (const chunk of outputChunks) {
|
||||
tempFileStream.write(chunk);
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
* provides automatic deduplication across sessions.
|
||||
*/
|
||||
import { createHash } from "node:crypto";
|
||||
import { mkdirSync, readFileSync, writeFileSync, existsSync, accessSync } from "node:fs";
|
||||
import { mkdirSync, readdirSync, readFileSync, writeFileSync, existsSync, accessSync, unlinkSync, statSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
const BLOB_PREFIX = "blob:sha256:";
|
||||
|
|
@ -64,6 +64,51 @@ export class BlobStore {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove blobs not referenced by any session file.
|
||||
* @param referencedHashes Set of SHA-256 hashes still referenced in session files.
|
||||
* @returns Number of orphaned blobs removed.
|
||||
*/
|
||||
gc(referencedHashes: Set<string>): number {
|
||||
let removed = 0;
|
||||
try {
|
||||
const entries = readdirSync(this.dir);
|
||||
for (const entry of entries) {
|
||||
if (!SHA256_HEX_RE.test(entry)) continue;
|
||||
if (!referencedHashes.has(entry)) {
|
||||
try {
|
||||
unlinkSync(join(this.dir, entry));
|
||||
removed++;
|
||||
} catch {
|
||||
// Best-effort removal
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Blob dir may not exist or be unreadable
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
/** Get total size of all blobs in bytes, or 0 if the directory is empty/unreadable. */
|
||||
totalSize(): number {
|
||||
try {
|
||||
const entries = readdirSync(this.dir);
|
||||
let total = 0;
|
||||
for (const entry of entries) {
|
||||
if (!SHA256_HEX_RE.test(entry)) continue;
|
||||
try {
|
||||
total += statSync(join(this.dir, entry)).size;
|
||||
} catch {
|
||||
// Skip unreadable files
|
||||
}
|
||||
}
|
||||
return total;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Check if a data string is a blob reference. */
|
||||
|
|
|
|||
84
packages/pi-coding-agent/src/core/extensions/runner.test.ts
Normal file
84
packages/pi-coding-agent/src/core/extensions/runner.test.ts
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
import assert from "node:assert/strict";
|
||||
import { describe, it, mock } from "node:test";
|
||||
import { ExtensionRunner } from "./runner.js";
|
||||
import type { Extension, ExtensionRuntime, ToolCallEvent } from "./index.js";
|
||||
import { SessionManager } from "../session-manager.js";
|
||||
import { ModelRegistry } from "../model-registry.js";
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { AuthStorage } from "../auth-storage.js";
|
||||
|
||||
function makeMinimalRuntime(): ExtensionRuntime {
|
||||
return {
|
||||
sendMessage: async () => {},
|
||||
sendUserMessage: async () => {},
|
||||
appendEntry: () => {},
|
||||
setSessionName: () => {},
|
||||
getSessionName: () => undefined,
|
||||
setLabel: () => {},
|
||||
getActiveTools: () => [],
|
||||
getAllTools: () => [],
|
||||
setActiveTools: () => {},
|
||||
refreshTools: () => {},
|
||||
getCommands: () => [],
|
||||
setModel: async () => {},
|
||||
getThinkingLevel: () => undefined,
|
||||
setThinkingLevel: () => {},
|
||||
registerProvider: () => {},
|
||||
unregisterProvider: () => {},
|
||||
pendingProviderRegistrations: [],
|
||||
} as unknown as ExtensionRuntime;
|
||||
}
|
||||
|
||||
function makeThrowingExtension(eventType: string, error: Error): Extension {
|
||||
const handlers = new Map();
|
||||
handlers.set(eventType, [
|
||||
async () => {
|
||||
throw error;
|
||||
},
|
||||
]);
|
||||
return {
|
||||
path: "/test/throwing-ext",
|
||||
handlers,
|
||||
commands: [],
|
||||
shortcuts: [],
|
||||
diagnostics: [],
|
||||
} as unknown as Extension;
|
||||
}
|
||||
|
||||
describe("ExtensionRunner.emitToolCall", () => {
|
||||
it("catches throwing extension handler and routes to emitError", async () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "runner-test-"));
|
||||
try {
|
||||
const sessionManager = SessionManager.create(dir, dir);
|
||||
const authStorage = AuthStorage.create();
|
||||
const modelRegistry = new ModelRegistry(authStorage, join(dir, "models.json"));
|
||||
|
||||
const throwingExt = makeThrowingExtension("tool_call", new Error("handler crashed"));
|
||||
const runtime = makeMinimalRuntime();
|
||||
const runner = new ExtensionRunner([throwingExt], runtime, dir, sessionManager, modelRegistry);
|
||||
|
||||
const errors: any[] = [];
|
||||
runner.onError((err) => errors.push(err));
|
||||
|
||||
const event: ToolCallEvent = {
|
||||
type: "tool_call",
|
||||
toolCallId: "test-123",
|
||||
toolName: "test_tool",
|
||||
input: {},
|
||||
} as ToolCallEvent;
|
||||
|
||||
const result = await runner.emitToolCall(event);
|
||||
|
||||
// Should not throw — error is caught and routed to emitError
|
||||
assert.equal(result, undefined);
|
||||
assert.equal(errors.length, 1);
|
||||
assert.equal(errors[0].error, "handler crashed");
|
||||
assert.equal(errors[0].event, "tool_call");
|
||||
assert.equal(errors[0].extensionPath, "/test/throwing-ext");
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
@ -211,11 +211,21 @@ export class ExtensionRunner {
|
|||
private getContextUsageFn: () => ContextUsage | undefined = () => undefined;
|
||||
private compactFn: (options?: CompactOptions) => void = () => {};
|
||||
private getSystemPromptFn: () => string = () => "";
|
||||
private newSessionHandler: NewSessionHandler = async () => ({ cancelled: false });
|
||||
private forkHandler: ForkHandler = async () => ({ cancelled: false });
|
||||
private navigateTreeHandler: NavigateTreeHandler = async () => ({ cancelled: false });
|
||||
private switchSessionHandler: SwitchSessionHandler = async () => ({ cancelled: false });
|
||||
private reloadHandler: ReloadHandler = async () => {};
|
||||
private newSessionHandler: NewSessionHandler = async () => {
|
||||
throw new Error("Command context not yet bound: newSession is unavailable during early lifecycle");
|
||||
};
|
||||
private forkHandler: ForkHandler = async () => {
|
||||
throw new Error("Command context not yet bound: fork is unavailable during early lifecycle");
|
||||
};
|
||||
private navigateTreeHandler: NavigateTreeHandler = async () => {
|
||||
throw new Error("Command context not yet bound: navigateTree is unavailable during early lifecycle");
|
||||
};
|
||||
private switchSessionHandler: SwitchSessionHandler = async () => {
|
||||
throw new Error("Command context not yet bound: switchSession is unavailable during early lifecycle");
|
||||
};
|
||||
private reloadHandler: ReloadHandler = async () => {
|
||||
throw new Error("Command context not yet bound: reload is unavailable during early lifecycle");
|
||||
};
|
||||
private shutdownHandler: ShutdownHandler = () => {};
|
||||
private shortcutDiagnostics: ResourceDiagnostic[] = [];
|
||||
private commandDiagnostics: ResourceDiagnostic[] = [];
|
||||
|
|
@ -637,13 +647,24 @@ export class ExtensionRunner {
|
|||
if (!handlers || handlers.length === 0) continue;
|
||||
|
||||
for (const handler of handlers) {
|
||||
const handlerResult = await handler(event, ctx);
|
||||
try {
|
||||
const handlerResult = await handler(event, ctx);
|
||||
|
||||
if (handlerResult) {
|
||||
result = handlerResult as ToolCallEventResult;
|
||||
if (result.block) {
|
||||
return result;
|
||||
if (handlerResult) {
|
||||
result = handlerResult as ToolCallEventResult;
|
||||
if (result.block) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
const stack = err instanceof Error ? err.stack : undefined;
|
||||
this.emitError({
|
||||
extensionPath: ext.path,
|
||||
event: "tool_call",
|
||||
error: message,
|
||||
stack,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
66
packages/pi-coding-agent/src/core/fs-utils.test.ts
Normal file
66
packages/pi-coding-agent/src/core/fs-utils.test.ts
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
import assert from "node:assert/strict";
|
||||
import { describe, it } from "node:test";
|
||||
import { mkdtempSync, readFileSync, rmSync, existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { atomicWriteFileSync } from "./fs-utils.js";
|
||||
|
||||
describe("atomicWriteFileSync", () => {
|
||||
it("writes file content atomically", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-"));
|
||||
try {
|
||||
const filePath = join(dir, "test.txt");
|
||||
atomicWriteFileSync(filePath, "hello world");
|
||||
assert.equal(readFileSync(filePath, "utf-8"), "hello world");
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("overwrites existing file atomically", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-"));
|
||||
try {
|
||||
const filePath = join(dir, "test.txt");
|
||||
atomicWriteFileSync(filePath, "first");
|
||||
atomicWriteFileSync(filePath, "second");
|
||||
assert.equal(readFileSync(filePath, "utf-8"), "second");
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("does not leave .tmp file after successful write", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-"));
|
||||
try {
|
||||
const filePath = join(dir, "test.txt");
|
||||
atomicWriteFileSync(filePath, "content");
|
||||
assert.equal(existsSync(filePath + ".tmp"), false);
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("supports Buffer content", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-"));
|
||||
try {
|
||||
const filePath = join(dir, "test.bin");
|
||||
const buf = Buffer.from([0x00, 0x01, 0x02, 0xff]);
|
||||
atomicWriteFileSync(filePath, buf);
|
||||
const result = readFileSync(filePath);
|
||||
assert.deepEqual(result, buf);
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("supports encoding parameter", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "fs-utils-test-"));
|
||||
try {
|
||||
const filePath = join(dir, "test.txt");
|
||||
atomicWriteFileSync(filePath, "utf8 content", "utf-8");
|
||||
assert.equal(readFileSync(filePath, "utf-8"), "utf8 content");
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
12
packages/pi-coding-agent/src/core/fs-utils.ts
Normal file
12
packages/pi-coding-agent/src/core/fs-utils.ts
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
import { renameSync, writeFileSync } from "node:fs";
|
||||
|
||||
/**
|
||||
* Atomically write a file by writing to a temporary path then renaming.
|
||||
* This prevents data loss if the process crashes mid-write — either the
|
||||
* old file remains intact or the new content is fully written.
|
||||
*/
|
||||
export function atomicWriteFileSync(filePath: string, content: string | Buffer, encoding?: BufferEncoding): void {
|
||||
const tmpPath = filePath + ".tmp";
|
||||
writeFileSync(tmpPath, content, encoding);
|
||||
renameSync(tmpPath, filePath);
|
||||
}
|
||||
|
|
@ -197,8 +197,12 @@ function parseMessage(
|
|||
let message: LspJsonRpcResponse | LspJsonRpcNotification;
|
||||
try {
|
||||
message = JSON.parse(messageText);
|
||||
} catch {
|
||||
// Malformed JSON from LSP server — skip this message and advance past it
|
||||
} catch (err) {
|
||||
// Malformed JSON from LSP server — log and skip this message
|
||||
if (process.env.DEBUG) {
|
||||
const preview = messageText.length > 200 ? messageText.slice(0, 200) + "..." : messageText;
|
||||
console.error(`[lsp] Dropped malformed JSON message: ${err instanceof Error ? err.message : err} — ${preview}`);
|
||||
}
|
||||
return { message: null, remaining };
|
||||
}
|
||||
|
||||
|
|
@ -409,6 +413,22 @@ export const WARMUP_TIMEOUT_MS = 5000;
|
|||
* Get or create an LSP client for the given server configuration and working directory.
|
||||
*/
|
||||
export async function getOrCreateClient(config: ServerConfig, cwd: string, initTimeoutMs?: number): Promise<LspClient> {
|
||||
const maxRetries = 2;
|
||||
let lastErr: unknown;
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await getOrCreateClientOnce(config, cwd, initTimeoutMs);
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
if (attempt < maxRetries) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 1000 * (attempt + 1)));
|
||||
}
|
||||
}
|
||||
}
|
||||
throw lastErr;
|
||||
}
|
||||
|
||||
async function getOrCreateClientOnce(config: ServerConfig, cwd: string, initTimeoutMs?: number): Promise<LspClient> {
|
||||
const key = `${config.command}:${cwd}`;
|
||||
|
||||
const existingClient = clients.get(key);
|
||||
|
|
|
|||
|
|
@ -13,8 +13,10 @@ import {
|
|||
statSync,
|
||||
writeFileSync,
|
||||
} from "fs";
|
||||
import { atomicWriteFileSync } from "./fs-utils.js";
|
||||
import { readdir, readFile, stat } from "fs/promises";
|
||||
import { join, resolve } from "path";
|
||||
import lockfile from "proper-lockfile";
|
||||
import { getAgentDir as getDefaultAgentDir, getBlobsDir, getSessionsDir } from "../config.js";
|
||||
import {
|
||||
type BashExecutionMessage,
|
||||
|
|
@ -821,6 +823,37 @@ export class SessionManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the last assistant turn in the session appears to have been
|
||||
* interrupted (e.g., the last message is from the assistant with tool_use
|
||||
* blocks but no subsequent tool_result message).
|
||||
*/
|
||||
wasInterrupted(): boolean {
|
||||
// Walk backwards to find the last message entry
|
||||
for (let i = this.fileEntries.length - 1; i >= 0; i--) {
|
||||
const entry = this.fileEntries[i];
|
||||
if (entry.type !== "message") continue;
|
||||
|
||||
const msg = entry.message;
|
||||
if (msg.role === "user") return false; // clean user turn boundary
|
||||
if (msg.role === "assistant") {
|
||||
// Check if the assistant message contains tool_use blocks
|
||||
const content = Array.isArray(msg.content) ? msg.content : [];
|
||||
const hasToolUse = content.some(
|
||||
(block) => block.type === "toolCall",
|
||||
);
|
||||
if (hasToolUse) {
|
||||
// If the last message is an assistant tool_use with no following
|
||||
// tool_result message, the turn was likely interrupted
|
||||
return true;
|
||||
}
|
||||
return false; // assistant message without tool_use = completed text response
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Switch to a different session file (used for resume and branching) */
|
||||
setSessionFile(sessionFile: string): void {
|
||||
this.sessionFile = resolve(sessionFile);
|
||||
|
|
@ -903,10 +936,43 @@ export class SessionManager {
|
|||
}
|
||||
}
|
||||
|
||||
private acquireSessionLock(path: string): (() => void) | undefined {
|
||||
const maxAttempts = 10;
|
||||
const delayMs = 20;
|
||||
let lastError: unknown;
|
||||
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
try {
|
||||
return lockfile.lockSync(path, { realpath: false });
|
||||
} catch (error) {
|
||||
const code =
|
||||
typeof error === "object" && error !== null && "code" in error
|
||||
? String((error as { code?: unknown }).code)
|
||||
: undefined;
|
||||
if (code !== "ELOCKED" || attempt === maxAttempts) {
|
||||
// Non-fatal: proceed without lock rather than losing data
|
||||
return undefined;
|
||||
}
|
||||
lastError = error;
|
||||
const start = Date.now();
|
||||
while (Date.now() - start < delayMs) {
|
||||
// Busy-wait to avoid async
|
||||
}
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private _rewriteFile(): void {
|
||||
if (!this.persist || !this.sessionFile) return;
|
||||
const content = `${this.fileEntries.map((e) => JSON.stringify(e)).join("\n")}\n`;
|
||||
writeFileSync(this.sessionFile, content);
|
||||
let release: (() => void) | undefined;
|
||||
try {
|
||||
release = this.acquireSessionLock(this.sessionFile);
|
||||
atomicWriteFileSync(this.sessionFile, content);
|
||||
} finally {
|
||||
release?.();
|
||||
}
|
||||
}
|
||||
|
||||
isPersisted(): boolean {
|
||||
|
|
@ -939,15 +1005,21 @@ export class SessionManager {
|
|||
return;
|
||||
}
|
||||
|
||||
if (!this.flushed) {
|
||||
for (const e of this.fileEntries) {
|
||||
const prepared = prepareForPersistence(e, this.blobStore) as FileEntry;
|
||||
let release: (() => void) | undefined;
|
||||
try {
|
||||
release = this.acquireSessionLock(this.sessionFile);
|
||||
if (!this.flushed) {
|
||||
for (const e of this.fileEntries) {
|
||||
const prepared = prepareForPersistence(e, this.blobStore) as FileEntry;
|
||||
appendFileSync(this.sessionFile, `${JSON.stringify(prepared)}\n`);
|
||||
}
|
||||
this.flushed = true;
|
||||
} else {
|
||||
const prepared = prepareForPersistence(entry, this.blobStore) as FileEntry;
|
||||
appendFileSync(this.sessionFile, `${JSON.stringify(prepared)}\n`);
|
||||
}
|
||||
this.flushed = true;
|
||||
} else {
|
||||
const prepared = prepareForPersistence(entry, this.blobStore) as FileEntry;
|
||||
appendFileSync(this.sessionFile, `${JSON.stringify(prepared)}\n`);
|
||||
} finally {
|
||||
release?.();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1495,14 +1567,14 @@ export class SessionManager {
|
|||
cwd: targetCwd,
|
||||
parentSession: sourcePath,
|
||||
};
|
||||
appendFileSync(newSessionFile, `${JSON.stringify(newHeader)}\n`);
|
||||
|
||||
// Copy all non-header entries from source
|
||||
// Build complete fork content and write atomically to prevent partial files on crash
|
||||
const lines = [JSON.stringify(newHeader)];
|
||||
for (const entry of sourceEntries) {
|
||||
if (entry.type !== "session") {
|
||||
appendFileSync(newSessionFile, `${JSON.stringify(entry)}\n`);
|
||||
lines.push(JSON.stringify(entry));
|
||||
}
|
||||
}
|
||||
atomicWriteFileSync(newSessionFile, lines.join("\n") + "\n");
|
||||
|
||||
return new SessionManager(targetCwd, dir, newSessionFile, true);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -577,6 +577,13 @@ async function handleConfigCommand(args: string[]): Promise<boolean> {
|
|||
}
|
||||
|
||||
export async function main(args: string[]) {
|
||||
// Catch unhandled promise rejections so the process doesn't silently disappear
|
||||
process.on("unhandledRejection", (reason) => {
|
||||
const message = reason instanceof Error ? reason.stack ?? reason.message : String(reason);
|
||||
console.error(`\nFatal: unhandled promise rejection\n${message}`);
|
||||
process.exitCode = 1;
|
||||
});
|
||||
|
||||
const offlineMode = args.includes("--offline") || isTruthyEnvFlag(process.env.PI_OFFLINE);
|
||||
if (offlineMode) {
|
||||
process.env.PI_OFFLINE = "1";
|
||||
|
|
|
|||
|
|
@ -2764,6 +2764,9 @@ export class InteractiveMode {
|
|||
if (this.isShuttingDown) return;
|
||||
this.isShuttingDown = true;
|
||||
|
||||
// Flush any queued settings writes before shutdown
|
||||
await this.settingsManager.flush();
|
||||
|
||||
// Emit shutdown event to extensions
|
||||
const extensionRunner = this.session.extensionRunner;
|
||||
if (extensionRunner?.hasHandlers("session_shutdown")) {
|
||||
|
|
@ -3860,7 +3863,12 @@ export class InteractiveMode {
|
|||
// Clear and re-render the chat
|
||||
this.chatContainer.clear();
|
||||
this.renderInitialMessages();
|
||||
this.showStatus("Resumed session");
|
||||
|
||||
if (this.session.sessionManager.wasInterrupted()) {
|
||||
this.showStatus("Resumed session (previous session ended unexpectedly — last action may be incomplete)");
|
||||
} else {
|
||||
this.showStatus("Resumed session");
|
||||
}
|
||||
}
|
||||
|
||||
private showProviderManager(): void {
|
||||
|
|
|
|||
|
|
@ -99,6 +99,18 @@ export class RpcClient {
|
|||
this.handleLine(line);
|
||||
});
|
||||
|
||||
// Detect unexpected subprocess exit and reject all pending requests
|
||||
this.process.on("exit", (code, signal) => {
|
||||
if (this.pendingRequests.size > 0) {
|
||||
const reason = signal ? `signal ${signal}` : `code ${code}`;
|
||||
const error = new Error(`Agent process exited unexpectedly (${reason}). Stderr: ${this.stderr}`);
|
||||
for (const [id, pending] of this.pendingRequests) {
|
||||
this.pendingRequests.delete(id);
|
||||
pending.reject(error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Wait a moment for process to initialize
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ import {
|
|||
import { computeBudgets, resolveExecutorContextWindow } from "./context-budget.js";
|
||||
import { join } from "node:path";
|
||||
import { sep as pathSep } from "node:path";
|
||||
import { readdirSync, readFileSync, existsSync, mkdirSync, writeFileSync, unlinkSync, statSync } from "node:fs";
|
||||
import { readdirSync, readFileSync, existsSync, mkdirSync, writeFileSync, renameSync, unlinkSync, statSync } from "node:fs";
|
||||
import { nativeIsRepo, nativeInit, nativeAddPaths, nativeCommit } from "./native-git-bridge.js";
|
||||
import {
|
||||
autoCommitCurrentBranch,
|
||||
|
|
@ -2138,7 +2138,11 @@ async function dispatchNextUnit(
|
|||
// Clear completed-units.json for the finished milestone
|
||||
try {
|
||||
const file = completedKeysPath(s.basePath);
|
||||
if (existsSync(file)) writeFileSync(file, JSON.stringify([]), "utf-8");
|
||||
if (existsSync(file)) {
|
||||
const tmpFile = file + ".tmp";
|
||||
writeFileSync(tmpFile, JSON.stringify([]), "utf-8");
|
||||
renameSync(tmpFile, file);
|
||||
}
|
||||
s.completedKeySet.clear();
|
||||
} catch { /* non-fatal */ }
|
||||
|
||||
|
|
@ -2286,7 +2290,11 @@ async function dispatchNextUnit(
|
|||
// Clear completed-units.json for the finished milestone so it doesn't grow unbounded.
|
||||
try {
|
||||
const file = completedKeysPath(s.basePath);
|
||||
if (existsSync(file)) writeFileSync(file, JSON.stringify([]), "utf-8");
|
||||
if (existsSync(file)) {
|
||||
const tmpFile = file + ".tmp";
|
||||
writeFileSync(tmpFile, JSON.stringify([]), "utf-8");
|
||||
renameSync(tmpFile, file);
|
||||
}
|
||||
s.completedKeySet.clear();
|
||||
} catch { /* non-fatal */ }
|
||||
// ── Milestone merge: squash-merge milestone branch to main before stopping ──
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@
|
|||
* so the file on disk reflects every tool call up to the crash point).
|
||||
*/
|
||||
|
||||
import { writeFileSync, readFileSync, unlinkSync, existsSync } from "node:fs";
|
||||
import { renameSync, writeFileSync, readFileSync, unlinkSync, existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { gsdRoot } from "./paths.js";
|
||||
|
||||
|
|
@ -49,7 +49,10 @@ export function writeLock(
|
|||
completedUnits,
|
||||
sessionFile,
|
||||
};
|
||||
writeFileSync(lockPath(basePath), JSON.stringify(data, null, 2), "utf-8");
|
||||
const lp = lockPath(basePath);
|
||||
const tmpLp = lp + ".tmp";
|
||||
writeFileSync(tmpLp, JSON.stringify(data, null, 2), "utf-8");
|
||||
renameSync(tmpLp, lp);
|
||||
} catch (e) { /* non-fatal: lock write failure */ void e; }
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue