From 9756edfe0bd453af2c9426c6787108fd67cdf493 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Mon, 11 May 2026 08:59:51 +0200 Subject: [PATCH] refactor: rf-09/rf-08/rf-12/rf-05 cleanup and deduplication - rf-09: Remove isTransientNetworkError from preferences-models.js/preferences.js/preferences-models.d.ts (canonical is error-classifier.js) - rf-08: Extract Gemini token counting to google-gemini-token-counter.js; update register-hooks.js import - rf-12: Remove 3 dead _allRequirements/_allDecisions fetch blocks from db-writer.js - rf-05: Extract resolveSfBin() and monitorNdjsonStdout() to spawn-worker.js; both orchestrators now import from there Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../extensions/sf/bootstrap/register-hooks.js | 2 +- src/resources/extensions/sf/db-writer.js | 64 ---------------- .../sf/google-gemini-token-counter.js | 75 +++++++++++++++++++ .../extensions/sf/parallel-orchestrator.js | 48 +----------- .../extensions/sf/preferences-models.d.ts | 1 - .../extensions/sf/preferences-models.js | 15 +--- src/resources/extensions/sf/preferences.js | 1 - .../sf/slice-parallel-orchestrator.js | 41 +--------- src/resources/extensions/sf/spawn-worker.js | 61 +++++++++++++++ src/resources/extensions/sf/token-counter.js | 61 --------------- 10 files changed, 146 insertions(+), 223 deletions(-) create mode 100644 src/resources/extensions/sf/google-gemini-token-counter.js create mode 100644 src/resources/extensions/sf/spawn-worker.js diff --git a/src/resources/extensions/sf/bootstrap/register-hooks.js b/src/resources/extensions/sf/bootstrap/register-hooks.js index 7873ee50f..ec5578f1c 100644 --- a/src/resources/extensions/sf/bootstrap/register-hooks.js +++ b/src/resources/extensions/sf/bootstrap/register-hooks.js @@ -40,6 +40,7 @@ import { classifyExecutionPolicyCall, } from "../execution-policy.js"; import { formatContinue, loadFile, saveFile } from "../files.js"; +import { countGoogleGeminiCliTokens } from "../google-gemini-token-counter.js"; import { getDiscussionMilestoneId } from "../guided-flow.js"; import { initHealthWidget } from "../health-widget.js"; import { emitJournalEvent } from "../journal.js"; @@ -74,7 +75,6 @@ import { } from "../safety/evidence-collector.js"; import { initSessionRecorder } from "../session-recorder.js"; import { deriveState } from "../state.js"; -import { countGoogleGeminiCliTokens } from "../token-counter.js"; import { observeToolResult, resetToolWatchdog } from "../tool-watchdog.js"; import { getSessionTodoCompactionBlock } from "../tools/session-todo-tool.js"; import { parseUnitId } from "../unit-id.js"; diff --git a/src/resources/extensions/sf/db-writer.js b/src/resources/extensions/sf/db-writer.js index 3f24584e4..9e3691fea 100644 --- a/src/resources/extensions/sf/db-writer.js +++ b/src/resources/extensions/sf/db-writer.js @@ -285,28 +285,6 @@ export async function saveRequirementToDb(fields, _basePath) { db.upsertRequirement(requirement); return nextId; }); - // Fetch all requirements for full file regeneration - const adapter = db._getAdapter(); - let _allRequirements = []; - if (adapter) { - const rows = adapter - .prepare("SELECT * FROM requirements ORDER BY id") - .all(); - _allRequirements = rows.map((row) => ({ - id: row["id"], - class: row["class"], - status: row["status"], - description: row["description"], - why: row["why"], - source: row["source"], - primary_owner: row["primary_owner"], - supporting_slices: row["supporting_slices"], - validation: row["validation"], - notes: row["notes"], - full_content: row["full_content"], - superseded_by: row["superseded_by"] ?? null, - })); - } invalidateStateCache(); clearPathCache(); clearParseCache(); @@ -361,26 +339,6 @@ export async function saveDecisionToDb(fields, _basePath) { }); return nextId; }); - // Fetch all decisions (including superseded for the full register) - const adapter = db._getAdapter(); - let _allDecisions = []; - if (adapter) { - const rows = adapter - .prepare("SELECT * FROM decisions ORDER BY seq") - .all(); - _allDecisions = rows.map((row) => ({ - seq: row["seq"], - id: row["id"], - when_context: row["when_context"], - scope: row["scope"], - decision: row["decision"], - choice: row["choice"], - rationale: row["rationale"], - revisable: row["revisable"], - made_by: row["made_by"] ?? "agent", - superseded_by: row["superseded_by"] ?? null, - })); - } // #2661: When a decision defers a slice, update the slice status in the DB // so the dispatcher skips it. Without this, STATE.md and DECISIONS.md are // in split-brain: the decision says "deferred" but the state still says @@ -502,28 +460,6 @@ export async function updateRequirementInDb(id, updates, basePath) { id: base.id, // ID cannot be changed }; db.upsertRequirement(merged); - // Fetch ALL requirements (including superseded) for full file regeneration - const adapter = db._getAdapter(); - let _allRequirements = []; - if (adapter) { - const rows = adapter - .prepare("SELECT * FROM requirements ORDER BY id") - .all(); - _allRequirements = rows.map((row) => ({ - id: row["id"], - class: row["class"], - status: row["status"], - description: row["description"], - why: row["why"], - source: row["source"], - primary_owner: row["primary_owner"], - supporting_slices: row["supporting_slices"], - validation: row["validation"], - notes: row["notes"], - full_content: row["full_content"], - superseded_by: row["superseded_by"] ?? null, - })); - } invalidateStateCache(); clearPathCache(); clearParseCache(); diff --git a/src/resources/extensions/sf/google-gemini-token-counter.js b/src/resources/extensions/sf/google-gemini-token-counter.js new file mode 100644 index 000000000..f7f7ee3a9 --- /dev/null +++ b/src/resources/extensions/sf/google-gemini-token-counter.js @@ -0,0 +1,75 @@ +/** + * google-gemini-token-counter.js — Google Gemini CLI token counting + * + * Purpose: provider-specific token counting for Gemini CLI payloads using the + * CodeAssist server API. Isolated from the generic token-counter so that + * Gemini-only imports (@google/gemini-cli-core, google-auth-library) are + * not loaded for users who never use Gemini. + * + * Consumer: bootstrap/register-hooks.js token-count hook. + */ + +/** + * Parse Google Gemini CLI API key JSON to extract token and project ID. + */ +export function parseGoogleGeminiCliApiKey(apiKeyRaw) { + try { + const parsed = JSON.parse(apiKeyRaw); + if ( + typeof parsed.token !== "string" || + typeof parsed.projectId !== "string" + ) { + return undefined; + } + if (!parsed.token || !parsed.projectId) { + return undefined; + } + return { token: parsed.token, projectId: parsed.projectId }; + } catch { + return undefined; + } +} + +async function buildGoogleGeminiCliServer(apiKeyRaw) { + const credentials = parseGoogleGeminiCliApiKey(apiKeyRaw); + if (!credentials) { + throw new Error("Invalid Google Gemini CLI credentials"); + } + const [{ CodeAssistServer }, { OAuth2Client }] = await Promise.all([ + import("@google/gemini-cli-core"), + import("google-auth-library"), + ]); + const authClient = new OAuth2Client(); + authClient.setCredentials({ access_token: credentials.token }); + return new CodeAssistServer(authClient, credentials.projectId, { + headers: {}, + }); +} + +/** + * Type guard for Google Gemini token counting payload. + */ +export function isGoogleGeminiCountablePayload(payload) { + if (!payload || typeof payload !== "object") return false; + const candidate = payload; + return ( + typeof candidate.model === "string" && Array.isArray(candidate.contents) + ); +} + +/** + * Count tokens in a Google Gemini CLI request using their server API. + */ +export async function countGoogleGeminiCliTokens( + payload, + apiKeyRaw, + deps = { buildServer: buildGoogleGeminiCliServer }, +) { + if (!apiKeyRaw || !isGoogleGeminiCountablePayload(payload)) return undefined; + const response = await (await deps.buildServer(apiKeyRaw)).countTokens( + payload, + ); + return typeof response.totalTokens === "number" + ? response.totalTokens + : undefined; +} diff --git a/src/resources/extensions/sf/parallel-orchestrator.js b/src/resources/extensions/sf/parallel-orchestrator.js index 439dba3f4..8595a2278 100644 --- a/src/resources/extensions/sf/parallel-orchestrator.js +++ b/src/resources/extensions/sf/parallel-orchestrator.js @@ -43,6 +43,7 @@ import { sendSignal, writeSessionStatus, } from "./session-status-io.js"; +import { monitorNdjsonStdout, resolveSfBin } from "./spawn-worker.js"; import { selectConflictFreeBatch } from "./uok/execution-graph.js"; import { resolveUokFlags } from "./uok/flags.js"; import { logWarning } from "./workflow-logger.js"; @@ -607,21 +608,9 @@ export function spawnWorker(basePath, milestoneId) { // cost/token usage, keeping the coordinator's cost tracking in sync // with actual API spend. if (child.stdout) { - let stdoutBuffer = ""; - child.stdout.on("data", (data) => { - stdoutBuffer += data.toString(); - const lines = stdoutBuffer.split("\n"); - stdoutBuffer = lines.pop() || ""; - for (const line of lines) { - processWorkerLine(basePath, milestoneId, line); - } - }); - // Flush remaining buffer on close - child.stdout.on("close", () => { - if (stdoutBuffer.trim()) { - processWorkerLine(basePath, milestoneId, stdoutBuffer); - } - }); + monitorNdjsonStdout(child.stdout, (line) => + processWorkerLine(basePath, milestoneId, line), + ); } if (child.stderr) { child.stderr.on("data", (data) => { @@ -741,35 +730,6 @@ export function spawnWorker(basePath, milestoneId) { }); return true; } -/** - * Resolve the SF CLI binary path. - * Uses SF_BIN_PATH env var (set by loader.ts) or falls back to - * finding the binary relative to the current module. - */ -function resolveSfBin() { - // SF_BIN_PATH is set by loader.ts to the absolute path of dist/loader.js - if (process.env.SF_BIN_PATH && existsSync(process.env.SF_BIN_PATH)) { - return process.env.SF_BIN_PATH; - } - // Fallback: try to find loader.js relative to this file - // This file is at dist/resources/extensions/sf/parallel-orchestrator.js - // loader.js is at dist/loader.js - let thisDir; - try { - thisDir = import.meta.dirname; - } catch (e) { - logWarning("parallel", `dirname(fileURLToPath) failed: ${e.message}`); - thisDir = process.cwd(); - } - const candidates = [ - join(thisDir, "..", "..", "..", "loader.js"), - join(thisDir, "..", "..", "..", "..", "dist", "loader.js"), - ]; - for (const candidate of candidates) { - if (existsSync(candidate)) return candidate; - } - return null; -} // ─── NDJSON Processing ────────────────────────────────────────────────────── /** * Process a single NDJSON line from a worker's stdout. diff --git a/src/resources/extensions/sf/preferences-models.d.ts b/src/resources/extensions/sf/preferences-models.d.ts index 25b34f38c..07a7bbf31 100644 --- a/src/resources/extensions/sf/preferences-models.d.ts +++ b/src/resources/extensions/sf/preferences-models.d.ts @@ -23,7 +23,6 @@ export function getNextFallbackModel( currentModelId: string, modelConfig: Record, ): string | null; -export function isTransientNetworkError(errorMsg: string): boolean; export function validateModelId(modelId: string): boolean; export function updatePreferencesModels(models: unknown[]): void; export function updateSubscriptionTokensUsed( diff --git a/src/resources/extensions/sf/preferences-models.js b/src/resources/extensions/sf/preferences-models.js index 5c7f26d97..d3b08e1db 100644 --- a/src/resources/extensions/sf/preferences-models.js +++ b/src/resources/extensions/sf/preferences-models.js @@ -10,7 +10,7 @@ import { homedir } from "node:os"; import { join } from "node:path"; import { getModels, getProviders } from "@singularity-forge/ai"; import { selectByBenchmarks } from "./benchmark-selector.js"; -import { classifyError } from "./error-classifier.js"; + import { defaultRoutingConfig, MODEL_CAPABILITY_TIER } from "./model-router.js"; import { DEFAULT_RUNAWAY_CHANGED_FILES_WARNING, @@ -579,19 +579,6 @@ export function getNextFallbackModel(currentModelId, modelConfig) { return modelsToTry[0]; } } -/** - * Detect whether an error message indicates a transient network error - * (worth retrying the same model) vs a permanent provider error - * (auth failure, quota exceeded, etc. -- should fall back immediately). - * - * Delegates to error-classifier for consistent classification across the - * extension. error-classifier is the single source of truth for error triage. - */ -export function isTransientNetworkError(errorMsg) { - if (!errorMsg) return false; - const { kind } = classifyError(errorMsg); - return kind === "network" || kind === "connection" || kind === "stream"; -} /** * Validate a model ID string. * Returns true if the ID looks like a valid model identifier. diff --git a/src/resources/extensions/sf/preferences.js b/src/resources/extensions/sf/preferences.js index 3c64f885c..6a0c471aa 100644 --- a/src/resources/extensions/sf/preferences.js +++ b/src/resources/extensions/sf/preferences.js @@ -55,7 +55,6 @@ export { getNextFallbackModel, isProviderAllowedForAdvisor, isProviderModelAllowed, - isTransientNetworkError, resolveAutoSupervisorConfig, resolveContextSelection, resolveDynamicRoutingConfig, diff --git a/src/resources/extensions/sf/slice-parallel-orchestrator.js b/src/resources/extensions/sf/slice-parallel-orchestrator.js index deaf9b937..f7db5214c 100644 --- a/src/resources/extensions/sf/slice-parallel-orchestrator.js +++ b/src/resources/extensions/sf/slice-parallel-orchestrator.js @@ -20,6 +20,7 @@ import { getErrorMessage } from "./error-utils.js"; import { sfRoot } from "./paths.js"; import { writeSessionStatus } from "./session-status-io.js"; import { hasFileConflict } from "./slice-parallel-conflict.js"; +import { monitorNdjsonStdout, resolveSfBin } from "./spawn-worker.js"; import { selectConflictFreeBatch } from "./uok/execution-graph.js"; import { logWarning } from "./workflow-logger.js"; import { @@ -260,29 +261,6 @@ function filterConflictingSlices( return safe; } // ─── Internal: Worker Spawning ───────────────────────────────────────────── -/** - * Resolve the SF CLI binary path. - * Same logic as parallel-orchestrator.ts resolveSfBin(). - */ -function resolveSfBin() { - if (process.env.SF_BIN_PATH && existsSync(process.env.SF_BIN_PATH)) { - return process.env.SF_BIN_PATH; - } - let thisDir; - try { - thisDir = import.meta.dirname; - } catch { - thisDir = process.cwd(); - } - const candidates = [ - join(thisDir, "..", "..", "..", "loader.js"), - join(thisDir, "..", "..", "..", "..", "dist", "loader.js"), - ]; - for (const candidate of candidates) { - if (existsSync(candidate)) return candidate; - } - return null; -} /** * Spawn a worker process for a slice. * The worker runs `sf --mode json --print "/autonomous"` in the slice's worktree @@ -337,20 +315,9 @@ function spawnSliceWorker(basePath, milestoneId, sliceId) { } // ── NDJSON stdout monitoring ──────────────────────────────────────── if (child.stdout) { - let stdoutBuffer = ""; - child.stdout.on("data", (data) => { - stdoutBuffer += data.toString(); - const lines = stdoutBuffer.split("\n"); - stdoutBuffer = lines.pop() || ""; - for (const line of lines) { - processSliceWorkerLine(basePath, milestoneId, sliceId, line); - } - }); - child.stdout.on("close", () => { - if (stdoutBuffer.trim()) { - processSliceWorkerLine(basePath, milestoneId, sliceId, stdoutBuffer); - } - }); + monitorNdjsonStdout(child.stdout, (line) => + processSliceWorkerLine(basePath, milestoneId, sliceId, line), + ); } if (child.stderr) { child.stderr.on("data", (data) => { diff --git a/src/resources/extensions/sf/spawn-worker.js b/src/resources/extensions/sf/spawn-worker.js new file mode 100644 index 000000000..ebfc96b4d --- /dev/null +++ b/src/resources/extensions/sf/spawn-worker.js @@ -0,0 +1,61 @@ +/** + * spawn-worker.js — Shared helpers for parallel worker process management. + * + * Purpose: consolidate duplicated `resolveSfBin()` and NDJSON stdout buffering + * logic that appeared identically in both parallel-orchestrator.js and + * slice-parallel-orchestrator.js. Single source of truth for the SF binary + * path resolution strategy. + * + * Consumer: parallel-orchestrator.js, slice-parallel-orchestrator.js. + */ +import { existsSync } from "node:fs"; +import { join } from "node:path"; + +/** + * Resolve the absolute path to the SF CLI binary (dist/loader.js). + * Uses SF_BIN_PATH env var first (set by loader.ts), then falls back to + * relative path discovery from this file's location. + */ +export function resolveSfBin() { + if (process.env.SF_BIN_PATH && existsSync(process.env.SF_BIN_PATH)) { + return process.env.SF_BIN_PATH; + } + let thisDir; + try { + thisDir = import.meta.dirname; + } catch { + thisDir = process.cwd(); + } + const candidates = [ + join(thisDir, "..", "..", "..", "loader.js"), + join(thisDir, "..", "..", "..", "..", "dist", "loader.js"), + ]; + for (const candidate of candidates) { + if (existsSync(candidate)) return candidate; + } + return null; +} + +/** + * Attach a line-buffered NDJSON reader to a child process stdout stream. + * Splits incoming data on newlines, calls onLine for each complete line, + * and flushes any remaining buffer when the stream closes. + * + * Consumer: spawnWorker/spawnSliceWorker in each orchestrator. + */ +export function monitorNdjsonStdout(stream, onLine) { + let buffer = ""; + stream.on("data", (data) => { + buffer += data.toString(); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + for (const line of lines) { + onLine(line); + } + }); + stream.on("close", () => { + if (buffer.trim()) { + onLine(buffer); + } + }); +} diff --git a/src/resources/extensions/sf/token-counter.js b/src/resources/extensions/sf/token-counter.js index 48dd7b25b..91b5b19e5 100644 --- a/src/resources/extensions/sf/token-counter.js +++ b/src/resources/extensions/sf/token-counter.js @@ -69,64 +69,3 @@ export function estimateTokensForProvider(text, provider) { const ratio = getCharsPerToken(provider); return Math.ceil(text.length / ratio); } -/** - * Parse Google Gemini CLI API key JSON to extract token and project ID. - */ -export function parseGoogleGeminiCliApiKey(apiKeyRaw) { - try { - const parsed = JSON.parse(apiKeyRaw); - if ( - typeof parsed.token !== "string" || - typeof parsed.projectId !== "string" - ) { - return undefined; - } - if (!parsed.token || !parsed.projectId) { - return undefined; - } - return { token: parsed.token, projectId: parsed.projectId }; - } catch { - return undefined; - } -} -async function buildGoogleGeminiCliServer(apiKeyRaw) { - const credentials = parseGoogleGeminiCliApiKey(apiKeyRaw); - if (!credentials) { - throw new Error("Invalid Google Gemini CLI credentials"); - } - const [{ CodeAssistServer }, { OAuth2Client }] = await Promise.all([ - import("@google/gemini-cli-core"), - import("google-auth-library"), - ]); - const authClient = new OAuth2Client(); - authClient.setCredentials({ access_token: credentials.token }); - return new CodeAssistServer(authClient, credentials.projectId, { - headers: {}, - }); -} -/** - * Type guard for Google Gemini token counting payload. - */ -export function isGoogleGeminiCountablePayload(payload) { - if (!payload || typeof payload !== "object") return false; - const candidate = payload; - return ( - typeof candidate.model === "string" && Array.isArray(candidate.contents) - ); -} -/** - * Count tokens in a Google Gemini CLI request using their server API. - */ -export async function countGoogleGeminiCliTokens( - payload, - apiKeyRaw, - deps = { buildServer: buildGoogleGeminiCliServer }, -) { - if (!apiKeyRaw || !isGoogleGeminiCountablePayload(payload)) return undefined; - const response = await (await deps.buildServer(apiKeyRaw)).countTokens( - payload, - ); - return typeof response.totalTokens === "number" - ? response.totalTokens - : undefined; -}