sf snapshot: pre-dispatch, uncommitted changes after 41m inactivity

This commit is contained in:
Mikael Hugo 2026-04-30 23:55:20 +02:00
parent 9843425836
commit 15c3c2d077
4 changed files with 229 additions and 6 deletions

View file

@ -6,6 +6,7 @@ import type {
ContentBlockParam,
MessageCreateParamsStreaming,
MessageParam,
RawMessageStreamEvent,
} from "@anthropic-ai/sdk/resources/messages.js";
import { calculateCost } from "../models.js";
import type {
@ -30,6 +31,7 @@ import type {
/** API types that use the Anthropic Messages protocol */
export type AnthropicApi = "anthropic-messages" | "anthropic-vertex";
import type { AssistantMessageEventStream } from "../utils/event-stream.js";
import { parseAnthropicSSE } from "../utils/event-stream.js";
import { parseStreamingJson } from "../utils/json-parse.js";
import { hasXmlParameterTags, repairToolJson } from "../utils/repair-tool-json.js";
import { sanitizeSurrogates } from "../utils/sanitize-unicode.js";
@ -574,13 +576,15 @@ export function processAnthropicStream(
if (nextParams !== undefined) {
params = nextParams as MessageCreateParamsStreaming;
}
const anthropicStream = client.messages.stream({ ...params, stream: true }, { signal: options?.signal });
const apiPromise = client.messages.create({ ...params, stream: true }, { signal: options?.signal });
const response = await apiPromise.asResponse();
stream.push({ type: "start", partial: output });
type Block = (ThinkingContent | TextContent | (ToolCall & { partialJson: string }) | ServerToolUseContent | WebSearchResultContent) & { index: number };
const blocks = output.content as Block[];
for await (const event of anthropicStream) {
for await (const rawEvent of parseAnthropicSSE(response, options?.signal)) {
const event = rawEvent as RawMessageStreamEvent;
if (event.type === "message_start") {
output.usage.input = event.message.usage.input_tokens || 0;
output.usage.output = event.message.usage.output_tokens || 0;

View file

@ -1,5 +1,133 @@
import type { AssistantMessage, AssistantMessageEvent } from "../types.js";
/** Known Anthropic SSE event types that we handle. Unknown events are silently dropped. */
const KNOWN_ANTHROPIC_EVENTS = new Set([
"message_start",
"message_delta",
"message_stop",
"content_block_start",
"content_block_delta",
"content_block_stop",
"ping",
"error",
]);
/**
* Parse a raw SSE (Server-Sent Events) stream response into JSON events.
*
* Purpose: give us full control over SSE parsing so that non-Anthropic events
* (e.g. OpenAI-style "done" events injected by proxies) are silently dropped
* instead of corrupting the stream.
*
* Consumer: processAnthropicStream in anthropic-shared.ts.
*/
export async function* parseAnthropicSSE(response: Response, signal?: AbortSignal): AsyncGenerator<unknown, void, unknown> {
if (!response.body) {
throw new Error("Attempted to iterate over a response with no body");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let eventName: string | null = null;
let dataLines: string[] = [];
function flushEvent(): unknown | undefined {
if (eventName === null && dataLines.length === 0) {
return undefined;
}
const data = dataLines.join("\n");
const name = eventName ?? "";
eventName = null;
dataLines = [];
if (name === "ping") {
return undefined;
}
if (name === "error") {
try {
return JSON.parse(data);
} catch {
return undefined;
}
}
if (!KNOWN_ANTHROPIC_EVENTS.has(name)) {
// Silently drop unknown events (e.g. OpenAI-style "done" from proxies)
return undefined;
}
try {
return JSON.parse(data);
} catch {
return undefined;
}
}
function processLine(line: string): unknown | undefined {
const trimmed = line.trim();
if (trimmed === "") {
// Empty line means end of an SSE event
return flushEvent();
}
if (trimmed.startsWith(":")) {
// Comment line, ignore
return undefined;
}
const colonIndex = trimmed.indexOf(":");
if (colonIndex === -1) return undefined;
const field = trimmed.slice(0, colonIndex);
const value = trimmed.slice(colonIndex + 1).trimStart();
if (field === "event") {
eventName = value;
} else if (field === "data") {
dataLines.push(value);
}
return undefined;
}
try {
while (true) {
if (signal?.aborted) {
return;
}
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let newlineIndex: number;
while ((newlineIndex = buffer.indexOf("\n")) !== -1) {
const line = buffer.slice(0, newlineIndex);
buffer = buffer.slice(newlineIndex + 1);
const event = processLine(line);
if (event !== undefined) {
yield event;
}
}
}
// Flush remaining buffer as a final line
if (buffer.length > 0) {
const event = processLine(buffer);
if (event !== undefined) {
yield event;
}
}
// Flush any pending event
const event = flushEvent();
if (event !== undefined) {
yield event;
}
} finally {
reader.releaseLock();
}
}
// Generic event stream class for async iteration
export class EventStream<T, R = T> implements AsyncIterable<T> {
private queue: T[] = [];

View file

@ -1132,9 +1132,15 @@ export async function runDispatch(
recoveryAttempts: loopState.stuckRecoveryAttempts,
});
if (loopState.stuckRecoveryAttempts === 0) {
// Level 1: try verifying the artifact, then cache invalidation + retry
loopState.stuckRecoveryAttempts++;
// Graduated stuck recovery — up to 5 total attempts before hard stop.
// Attempt 0: cache invalidation + retry
// Attempts 14: rethink + retry
// Attempt 5 (exhausted): hard stop
loopState.stuckRecoveryAttempts++;
const attempt = loopState.stuckRecoveryAttempts;
if (attempt === 1) {
// Attempt 1: verify artifact + cache invalidation + retry
const artifactExists = verifyExpectedArtifact(
unitType,
unitId,
@ -1158,8 +1164,44 @@ export async function runDispatch(
"warning",
);
deps.invalidateAllCaches();
return { action: "continue" };
} else if (attempt <= 5) {
// Attempts 25: rethink + diagnostic + retry
const stuckDiag = diagnoseExpectedArtifact(
unitType,
unitId,
s.basePath,
);
const stuckRemediation = buildLoopRemediationSteps(
unitType,
unitId,
s.basePath,
);
const diagnostic = deps.getDeepDiagnostic(s.basePath);
const cappedDiag =
(diagnostic?.length ?? 0) > MAX_RECOVERY_CHARS
? diagnostic!.slice(0, MAX_RECOVERY_CHARS) +
"\n\n[...diagnostic truncated]"
: diagnostic ?? null;
s.pendingRethinkAttempt = JSON.stringify({
attempt,
reason: stuckSignal.reason,
diagnostic: cappedDiag,
stuckDiag,
remediation: stuckRemediation,
unitType,
unitId,
});
const rt = attempt === 5
? "**FINAL STUCK ATTEMPT — 5 of 5.** "
: `**STUCK RECOVERY ATTEMPT ${attempt - 1} of 4.** `;
ctx.ui.notify(
`${rt}Stuck on ${unitType} ${unitId} (${stuckSignal.reason}). Injecting diagnostic and retrying.`,
"warning",
);
return { action: "continue" };
} else {
// Level 2: hard stop — genuinely stuck
// Attempt 6+: genuinely exhausted — hard stop
debugLog("autoLoop", {
phase: "stuck-detected",
unitType,
@ -1824,6 +1866,52 @@ export async function runUnitPhase(
: s.pendingCrashRecovery;
finalPrompt = `${capped}\n\n---\n\n${finalPrompt}`;
s.pendingCrashRecovery = null;
} else if (s.pendingRethinkAttempt) {
// Stuck recovery: inject diagnostic + rethink prompt, then clear.
let rethinkCtx: {
attempt: number;
reason: string;
diagnostic: string | null;
stuckDiag: string | null;
remediation: string | null;
unitType: string;
unitId: string;
} | null = null;
try {
rethinkCtx = JSON.parse(s.pendingRethinkAttempt);
} catch {
// Malformed JSON — skip injection
}
s.pendingRethinkAttempt = null;
if (rethinkCtx) {
const isFinal = rethinkCtx.attempt === 5;
const lines = [
isFinal
? `**⚠ FINAL STUCK ATTEMPT (5 of 5) — You have run out of recovery attempts. Make this count.**`
: `**STUCK RECOVERY — Rethink attempt ${rethinkCtx.attempt - 1} of 4.**`,
"",
`You have been repeatedly stuck on **${rethinkCtx.unitType} ${rethinkCtx.unitId}** for reason: "${rethinkCtx.reason}".`,
"",
"Before continuing, you must reflect on the following:",
"",
"1. **What specific error or failure pattern are you seeing?**",
"2. **What assumption are you making that might be wrong?**",
"3. **What is ONE concrete, different approach you will try this time?**",
"",
"Do NOT repeat the same approach. Identify the root cause and try a genuinely different strategy.",
];
if (rethinkCtx.stuckDiag) {
lines.push("", `**What was expected:** ${rethinkCtx.stuckDiag}`);
}
if (rethinkCtx.remediation) {
lines.push("", `**Suggested remediation:**\n${rethinkCtx.remediation}`);
}
if (rethinkCtx.diagnostic) {
lines.push("", `**Full diagnostic from previous attempt:**\n${rethinkCtx.diagnostic}`);
}
lines.push("", "---", "", finalPrompt);
finalPrompt = lines.join("\n");
}
} else if ((s.unitDispatchCount.get(`${unitType}/${unitId}`) ?? 0) > 1) {
const diagnostic = deps.getDeepDiagnostic(s.basePath);
if (diagnostic) {

View file

@ -134,6 +134,8 @@ export class AutoSession {
// ── Recovery ─────────────────────────────────────────────────────────────
pendingCrashRecovery: string | null = null;
pendingVerificationRetry: PendingVerificationRetry | null = null;
/** Set when stuck detection triggers rethink: injected into next dispatch prompt. */
pendingRethinkAttempt: string | null = null;
readonly verificationRetryCount = new Map<string, number>();
pausedSessionFile: string | null = null;
pausedUnitType: string | null = null;
@ -305,6 +307,7 @@ export class AutoSession {
// Recovery
this.pendingCrashRecovery = null;
this.pendingVerificationRetry = null;
this.pendingRethinkAttempt = null;
this.verificationRetryCount.clear();
this.pausedSessionFile = null;
this.pausedUnitType = null;