From 15c3c2d07751b562a0a64ca3b420404d63ab8779 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Thu, 30 Apr 2026 23:55:20 +0200 Subject: [PATCH] sf snapshot: pre-dispatch, uncommitted changes after 41m inactivity --- .../pi-ai/src/providers/anthropic-shared.ts | 8 +- packages/pi-ai/src/utils/event-stream.ts | 128 ++++++++++++++++++ src/resources/extensions/sf/auto/phases.ts | 96 ++++++++++++- src/resources/extensions/sf/auto/session.ts | 3 + 4 files changed, 229 insertions(+), 6 deletions(-) diff --git a/packages/pi-ai/src/providers/anthropic-shared.ts b/packages/pi-ai/src/providers/anthropic-shared.ts index 1e8f78f9b..04837dc16 100644 --- a/packages/pi-ai/src/providers/anthropic-shared.ts +++ b/packages/pi-ai/src/providers/anthropic-shared.ts @@ -6,6 +6,7 @@ import type { ContentBlockParam, MessageCreateParamsStreaming, MessageParam, + RawMessageStreamEvent, } from "@anthropic-ai/sdk/resources/messages.js"; import { calculateCost } from "../models.js"; import type { @@ -30,6 +31,7 @@ import type { /** API types that use the Anthropic Messages protocol */ export type AnthropicApi = "anthropic-messages" | "anthropic-vertex"; import type { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { parseAnthropicSSE } from "../utils/event-stream.js"; import { parseStreamingJson } from "../utils/json-parse.js"; import { hasXmlParameterTags, repairToolJson } from "../utils/repair-tool-json.js"; import { sanitizeSurrogates } from "../utils/sanitize-unicode.js"; @@ -574,13 +576,15 @@ export function processAnthropicStream( if (nextParams !== undefined) { params = nextParams as MessageCreateParamsStreaming; } - const anthropicStream = client.messages.stream({ ...params, stream: true }, { signal: options?.signal }); + const apiPromise = client.messages.create({ ...params, stream: true }, { signal: options?.signal }); + const response = await apiPromise.asResponse(); stream.push({ type: "start", partial: output }); type Block = (ThinkingContent | TextContent | (ToolCall & { partialJson: string }) | ServerToolUseContent | WebSearchResultContent) & { index: number }; const blocks = output.content as Block[]; - for await (const event of anthropicStream) { + for await (const rawEvent of parseAnthropicSSE(response, options?.signal)) { + const event = rawEvent as RawMessageStreamEvent; if (event.type === "message_start") { output.usage.input = event.message.usage.input_tokens || 0; output.usage.output = event.message.usage.output_tokens || 0; diff --git a/packages/pi-ai/src/utils/event-stream.ts b/packages/pi-ai/src/utils/event-stream.ts index 7eb0a0104..294145858 100644 --- a/packages/pi-ai/src/utils/event-stream.ts +++ b/packages/pi-ai/src/utils/event-stream.ts @@ -1,5 +1,133 @@ import type { AssistantMessage, AssistantMessageEvent } from "../types.js"; +/** Known Anthropic SSE event types that we handle. Unknown events are silently dropped. */ +const KNOWN_ANTHROPIC_EVENTS = new Set([ + "message_start", + "message_delta", + "message_stop", + "content_block_start", + "content_block_delta", + "content_block_stop", + "ping", + "error", +]); + +/** + * Parse a raw SSE (Server-Sent Events) stream response into JSON events. + * + * Purpose: give us full control over SSE parsing so that non-Anthropic events + * (e.g. OpenAI-style "done" events injected by proxies) are silently dropped + * instead of corrupting the stream. + * + * Consumer: processAnthropicStream in anthropic-shared.ts. + */ +export async function* parseAnthropicSSE(response: Response, signal?: AbortSignal): AsyncGenerator { + if (!response.body) { + throw new Error("Attempted to iterate over a response with no body"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let eventName: string | null = null; + let dataLines: string[] = []; + + function flushEvent(): unknown | undefined { + if (eventName === null && dataLines.length === 0) { + return undefined; + } + const data = dataLines.join("\n"); + const name = eventName ?? ""; + eventName = null; + dataLines = []; + + if (name === "ping") { + return undefined; + } + if (name === "error") { + try { + return JSON.parse(data); + } catch { + return undefined; + } + } + if (!KNOWN_ANTHROPIC_EVENTS.has(name)) { + // Silently drop unknown events (e.g. OpenAI-style "done" from proxies) + return undefined; + } + try { + return JSON.parse(data); + } catch { + return undefined; + } + } + + function processLine(line: string): unknown | undefined { + const trimmed = line.trim(); + if (trimmed === "") { + // Empty line means end of an SSE event + return flushEvent(); + } + + if (trimmed.startsWith(":")) { + // Comment line, ignore + return undefined; + } + + const colonIndex = trimmed.indexOf(":"); + if (colonIndex === -1) return undefined; + + const field = trimmed.slice(0, colonIndex); + const value = trimmed.slice(colonIndex + 1).trimStart(); + + if (field === "event") { + eventName = value; + } else if (field === "data") { + dataLines.push(value); + } + return undefined; + } + + try { + while (true) { + if (signal?.aborted) { + return; + } + + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + let newlineIndex: number; + while ((newlineIndex = buffer.indexOf("\n")) !== -1) { + const line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + const event = processLine(line); + if (event !== undefined) { + yield event; + } + } + } + + // Flush remaining buffer as a final line + if (buffer.length > 0) { + const event = processLine(buffer); + if (event !== undefined) { + yield event; + } + } + + // Flush any pending event + const event = flushEvent(); + if (event !== undefined) { + yield event; + } + } finally { + reader.releaseLock(); + } +} + // Generic event stream class for async iteration export class EventStream implements AsyncIterable { private queue: T[] = []; diff --git a/src/resources/extensions/sf/auto/phases.ts b/src/resources/extensions/sf/auto/phases.ts index 63d6d42e3..efc92cb4a 100644 --- a/src/resources/extensions/sf/auto/phases.ts +++ b/src/resources/extensions/sf/auto/phases.ts @@ -1132,9 +1132,15 @@ export async function runDispatch( recoveryAttempts: loopState.stuckRecoveryAttempts, }); - if (loopState.stuckRecoveryAttempts === 0) { - // Level 1: try verifying the artifact, then cache invalidation + retry - loopState.stuckRecoveryAttempts++; + // Graduated stuck recovery — up to 5 total attempts before hard stop. + // Attempt 0: cache invalidation + retry + // Attempts 1–4: rethink + retry + // Attempt 5 (exhausted): hard stop + loopState.stuckRecoveryAttempts++; + const attempt = loopState.stuckRecoveryAttempts; + + if (attempt === 1) { + // Attempt 1: verify artifact + cache invalidation + retry const artifactExists = verifyExpectedArtifact( unitType, unitId, @@ -1158,8 +1164,44 @@ export async function runDispatch( "warning", ); deps.invalidateAllCaches(); + return { action: "continue" }; + } else if (attempt <= 5) { + // Attempts 2–5: rethink + diagnostic + retry + const stuckDiag = diagnoseExpectedArtifact( + unitType, + unitId, + s.basePath, + ); + const stuckRemediation = buildLoopRemediationSteps( + unitType, + unitId, + s.basePath, + ); + const diagnostic = deps.getDeepDiagnostic(s.basePath); + const cappedDiag = + (diagnostic?.length ?? 0) > MAX_RECOVERY_CHARS + ? diagnostic!.slice(0, MAX_RECOVERY_CHARS) + + "\n\n[...diagnostic truncated]" + : diagnostic ?? null; + s.pendingRethinkAttempt = JSON.stringify({ + attempt, + reason: stuckSignal.reason, + diagnostic: cappedDiag, + stuckDiag, + remediation: stuckRemediation, + unitType, + unitId, + }); + const rt = attempt === 5 + ? "**FINAL STUCK ATTEMPT — 5 of 5.** " + : `**STUCK RECOVERY ATTEMPT ${attempt - 1} of 4.** `; + ctx.ui.notify( + `${rt}Stuck on ${unitType} ${unitId} (${stuckSignal.reason}). Injecting diagnostic and retrying.`, + "warning", + ); + return { action: "continue" }; } else { - // Level 2: hard stop — genuinely stuck + // Attempt 6+: genuinely exhausted — hard stop debugLog("autoLoop", { phase: "stuck-detected", unitType, @@ -1824,6 +1866,52 @@ export async function runUnitPhase( : s.pendingCrashRecovery; finalPrompt = `${capped}\n\n---\n\n${finalPrompt}`; s.pendingCrashRecovery = null; + } else if (s.pendingRethinkAttempt) { + // Stuck recovery: inject diagnostic + rethink prompt, then clear. + let rethinkCtx: { + attempt: number; + reason: string; + diagnostic: string | null; + stuckDiag: string | null; + remediation: string | null; + unitType: string; + unitId: string; + } | null = null; + try { + rethinkCtx = JSON.parse(s.pendingRethinkAttempt); + } catch { + // Malformed JSON — skip injection + } + s.pendingRethinkAttempt = null; + if (rethinkCtx) { + const isFinal = rethinkCtx.attempt === 5; + const lines = [ + isFinal + ? `**⚠ FINAL STUCK ATTEMPT (5 of 5) — You have run out of recovery attempts. Make this count.**` + : `**STUCK RECOVERY — Rethink attempt ${rethinkCtx.attempt - 1} of 4.**`, + "", + `You have been repeatedly stuck on **${rethinkCtx.unitType} ${rethinkCtx.unitId}** for reason: "${rethinkCtx.reason}".`, + "", + "Before continuing, you must reflect on the following:", + "", + "1. **What specific error or failure pattern are you seeing?**", + "2. **What assumption are you making that might be wrong?**", + "3. **What is ONE concrete, different approach you will try this time?**", + "", + "Do NOT repeat the same approach. Identify the root cause and try a genuinely different strategy.", + ]; + if (rethinkCtx.stuckDiag) { + lines.push("", `**What was expected:** ${rethinkCtx.stuckDiag}`); + } + if (rethinkCtx.remediation) { + lines.push("", `**Suggested remediation:**\n${rethinkCtx.remediation}`); + } + if (rethinkCtx.diagnostic) { + lines.push("", `**Full diagnostic from previous attempt:**\n${rethinkCtx.diagnostic}`); + } + lines.push("", "---", "", finalPrompt); + finalPrompt = lines.join("\n"); + } } else if ((s.unitDispatchCount.get(`${unitType}/${unitId}`) ?? 0) > 1) { const diagnostic = deps.getDeepDiagnostic(s.basePath); if (diagnostic) { diff --git a/src/resources/extensions/sf/auto/session.ts b/src/resources/extensions/sf/auto/session.ts index a17e71c7e..0e6fb6ad3 100644 --- a/src/resources/extensions/sf/auto/session.ts +++ b/src/resources/extensions/sf/auto/session.ts @@ -134,6 +134,8 @@ export class AutoSession { // ── Recovery ───────────────────────────────────────────────────────────── pendingCrashRecovery: string | null = null; pendingVerificationRetry: PendingVerificationRetry | null = null; + /** Set when stuck detection triggers rethink: injected into next dispatch prompt. */ + pendingRethinkAttempt: string | null = null; readonly verificationRetryCount = new Map(); pausedSessionFile: string | null = null; pausedUnitType: string | null = null; @@ -305,6 +307,7 @@ export class AutoSession { // Recovery this.pendingCrashRecovery = null; this.pendingVerificationRetry = null; + this.pendingRethinkAttempt = null; this.verificationRetryCount.clear(); this.pausedSessionFile = null; this.pausedUnitType = null;