refactor: rf-09/rf-08/rf-12/rf-05 cleanup and deduplication

- rf-09: Remove isTransientNetworkError from preferences-models.js/preferences.js/preferences-models.d.ts (canonical is error-classifier.js)
- rf-08: Extract Gemini token counting to google-gemini-token-counter.js; update register-hooks.js import
- rf-12: Remove 3 dead _allRequirements/_allDecisions fetch blocks from db-writer.js
- rf-05: Extract resolveSfBin() and monitorNdjsonStdout() to spawn-worker.js; both orchestrators now import from there

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Mikael Hugo 2026-05-11 08:59:51 +02:00
parent 96d751555f
commit 9756edfe0b
10 changed files with 146 additions and 223 deletions

View file

@ -40,6 +40,7 @@ import {
classifyExecutionPolicyCall, classifyExecutionPolicyCall,
} from "../execution-policy.js"; } from "../execution-policy.js";
import { formatContinue, loadFile, saveFile } from "../files.js"; import { formatContinue, loadFile, saveFile } from "../files.js";
import { countGoogleGeminiCliTokens } from "../google-gemini-token-counter.js";
import { getDiscussionMilestoneId } from "../guided-flow.js"; import { getDiscussionMilestoneId } from "../guided-flow.js";
import { initHealthWidget } from "../health-widget.js"; import { initHealthWidget } from "../health-widget.js";
import { emitJournalEvent } from "../journal.js"; import { emitJournalEvent } from "../journal.js";
@ -74,7 +75,6 @@ import {
} from "../safety/evidence-collector.js"; } from "../safety/evidence-collector.js";
import { initSessionRecorder } from "../session-recorder.js"; import { initSessionRecorder } from "../session-recorder.js";
import { deriveState } from "../state.js"; import { deriveState } from "../state.js";
import { countGoogleGeminiCliTokens } from "../token-counter.js";
import { observeToolResult, resetToolWatchdog } from "../tool-watchdog.js"; import { observeToolResult, resetToolWatchdog } from "../tool-watchdog.js";
import { getSessionTodoCompactionBlock } from "../tools/session-todo-tool.js"; import { getSessionTodoCompactionBlock } from "../tools/session-todo-tool.js";
import { parseUnitId } from "../unit-id.js"; import { parseUnitId } from "../unit-id.js";

View file

@ -285,28 +285,6 @@ export async function saveRequirementToDb(fields, _basePath) {
db.upsertRequirement(requirement); db.upsertRequirement(requirement);
return nextId; return nextId;
}); });
// Fetch all requirements for full file regeneration
const adapter = db._getAdapter();
let _allRequirements = [];
if (adapter) {
const rows = adapter
.prepare("SELECT * FROM requirements ORDER BY id")
.all();
_allRequirements = rows.map((row) => ({
id: row["id"],
class: row["class"],
status: row["status"],
description: row["description"],
why: row["why"],
source: row["source"],
primary_owner: row["primary_owner"],
supporting_slices: row["supporting_slices"],
validation: row["validation"],
notes: row["notes"],
full_content: row["full_content"],
superseded_by: row["superseded_by"] ?? null,
}));
}
invalidateStateCache(); invalidateStateCache();
clearPathCache(); clearPathCache();
clearParseCache(); clearParseCache();
@ -361,26 +339,6 @@ export async function saveDecisionToDb(fields, _basePath) {
}); });
return nextId; return nextId;
}); });
// Fetch all decisions (including superseded for the full register)
const adapter = db._getAdapter();
let _allDecisions = [];
if (adapter) {
const rows = adapter
.prepare("SELECT * FROM decisions ORDER BY seq")
.all();
_allDecisions = rows.map((row) => ({
seq: row["seq"],
id: row["id"],
when_context: row["when_context"],
scope: row["scope"],
decision: row["decision"],
choice: row["choice"],
rationale: row["rationale"],
revisable: row["revisable"],
made_by: row["made_by"] ?? "agent",
superseded_by: row["superseded_by"] ?? null,
}));
}
// #2661: When a decision defers a slice, update the slice status in the DB // #2661: When a decision defers a slice, update the slice status in the DB
// so the dispatcher skips it. Without this, STATE.md and DECISIONS.md are // so the dispatcher skips it. Without this, STATE.md and DECISIONS.md are
// in split-brain: the decision says "deferred" but the state still says // in split-brain: the decision says "deferred" but the state still says
@ -502,28 +460,6 @@ export async function updateRequirementInDb(id, updates, basePath) {
id: base.id, // ID cannot be changed id: base.id, // ID cannot be changed
}; };
db.upsertRequirement(merged); db.upsertRequirement(merged);
// Fetch ALL requirements (including superseded) for full file regeneration
const adapter = db._getAdapter();
let _allRequirements = [];
if (adapter) {
const rows = adapter
.prepare("SELECT * FROM requirements ORDER BY id")
.all();
_allRequirements = rows.map((row) => ({
id: row["id"],
class: row["class"],
status: row["status"],
description: row["description"],
why: row["why"],
source: row["source"],
primary_owner: row["primary_owner"],
supporting_slices: row["supporting_slices"],
validation: row["validation"],
notes: row["notes"],
full_content: row["full_content"],
superseded_by: row["superseded_by"] ?? null,
}));
}
invalidateStateCache(); invalidateStateCache();
clearPathCache(); clearPathCache();
clearParseCache(); clearParseCache();

View file

@ -0,0 +1,75 @@
/**
* google-gemini-token-counter.js Google Gemini CLI token counting
*
* Purpose: provider-specific token counting for Gemini CLI payloads using the
* CodeAssist server API. Isolated from the generic token-counter so that
* Gemini-only imports (@google/gemini-cli-core, google-auth-library) are
* not loaded for users who never use Gemini.
*
* Consumer: bootstrap/register-hooks.js token-count hook.
*/
/**
 * Parse Google Gemini CLI API key JSON to extract token and project ID.
 *
 * @param {string} apiKeyRaw - Raw JSON credential string.
 * @returns {{ token: string, projectId: string } | undefined} Parsed
 *   credentials, or undefined when the JSON is invalid or either field is
 *   missing, non-string, or empty.
 */
export function parseGoogleGeminiCliApiKey(apiKeyRaw) {
  let parsed;
  try {
    parsed = JSON.parse(apiKeyRaw);
  } catch {
    return undefined;
  }
  const token = parsed?.token;
  const projectId = parsed?.projectId;
  // Both fields must be present, string-typed, and non-empty.
  if (typeof token !== "string" || token === "") return undefined;
  if (typeof projectId !== "string" || projectId === "") return undefined;
  return { token, projectId };
}
/**
 * Build an authenticated CodeAssistServer for the given Gemini CLI
 * credentials. The Gemini-only dependencies are imported lazily so users
 * who never use Gemini do not pay their load cost.
 *
 * @param {string} apiKeyRaw - Raw JSON credential string.
 * @throws {Error} When the credentials cannot be parsed.
 */
async function buildGoogleGeminiCliServer(apiKeyRaw) {
  const creds = parseGoogleGeminiCliApiKey(apiKeyRaw);
  if (creds === undefined) {
    throw new Error("Invalid Google Gemini CLI credentials");
  }
  const [core, authLib] = await Promise.all([
    import("@google/gemini-cli-core"),
    import("google-auth-library"),
  ]);
  const oauth = new authLib.OAuth2Client();
  oauth.setCredentials({ access_token: creds.token });
  return new core.CodeAssistServer(oauth, creds.projectId, { headers: {} });
}
/**
 * Type guard for Google Gemini token counting payload.
 *
 * @param {unknown} payload - Candidate request payload.
 * @returns {boolean} True when payload is an object carrying a string
 *   `model` and an array `contents`.
 */
export function isGoogleGeminiCountablePayload(payload) {
  if (payload === null || typeof payload !== "object") return false;
  const { model, contents } = payload;
  return typeof model === "string" && Array.isArray(contents);
}
/**
* Count tokens in a Google Gemini CLI request using their server API.
*/
export async function countGoogleGeminiCliTokens(
payload,
apiKeyRaw,
deps = { buildServer: buildGoogleGeminiCliServer },
) {
if (!apiKeyRaw || !isGoogleGeminiCountablePayload(payload)) return undefined;
const response = await (await deps.buildServer(apiKeyRaw)).countTokens(
payload,
);
return typeof response.totalTokens === "number"
? response.totalTokens
: undefined;
}

View file

@ -43,6 +43,7 @@ import {
sendSignal, sendSignal,
writeSessionStatus, writeSessionStatus,
} from "./session-status-io.js"; } from "./session-status-io.js";
import { monitorNdjsonStdout, resolveSfBin } from "./spawn-worker.js";
import { selectConflictFreeBatch } from "./uok/execution-graph.js"; import { selectConflictFreeBatch } from "./uok/execution-graph.js";
import { resolveUokFlags } from "./uok/flags.js"; import { resolveUokFlags } from "./uok/flags.js";
import { logWarning } from "./workflow-logger.js"; import { logWarning } from "./workflow-logger.js";
@ -607,21 +608,9 @@ export function spawnWorker(basePath, milestoneId) {
// cost/token usage, keeping the coordinator's cost tracking in sync // cost/token usage, keeping the coordinator's cost tracking in sync
// with actual API spend. // with actual API spend.
if (child.stdout) { if (child.stdout) {
let stdoutBuffer = ""; monitorNdjsonStdout(child.stdout, (line) =>
child.stdout.on("data", (data) => { processWorkerLine(basePath, milestoneId, line),
stdoutBuffer += data.toString(); );
const lines = stdoutBuffer.split("\n");
stdoutBuffer = lines.pop() || "";
for (const line of lines) {
processWorkerLine(basePath, milestoneId, line);
}
});
// Flush remaining buffer on close
child.stdout.on("close", () => {
if (stdoutBuffer.trim()) {
processWorkerLine(basePath, milestoneId, stdoutBuffer);
}
});
} }
if (child.stderr) { if (child.stderr) {
child.stderr.on("data", (data) => { child.stderr.on("data", (data) => {
@ -741,35 +730,6 @@ export function spawnWorker(basePath, milestoneId) {
}); });
return true; return true;
} }
/**
* Resolve the SF CLI binary path.
* Uses SF_BIN_PATH env var (set by loader.ts) or falls back to
* finding the binary relative to the current module.
*/
function resolveSfBin() {
// SF_BIN_PATH is set by loader.ts to the absolute path of dist/loader.js
if (process.env.SF_BIN_PATH && existsSync(process.env.SF_BIN_PATH)) {
return process.env.SF_BIN_PATH;
}
// Fallback: try to find loader.js relative to this file
// This file is at dist/resources/extensions/sf/parallel-orchestrator.js
// loader.js is at dist/loader.js
let thisDir;
try {
thisDir = import.meta.dirname;
} catch (e) {
logWarning("parallel", `dirname(fileURLToPath) failed: ${e.message}`);
thisDir = process.cwd();
}
const candidates = [
join(thisDir, "..", "..", "..", "loader.js"),
join(thisDir, "..", "..", "..", "..", "dist", "loader.js"),
];
for (const candidate of candidates) {
if (existsSync(candidate)) return candidate;
}
return null;
}
// ─── NDJSON Processing ────────────────────────────────────────────────────── // ─── NDJSON Processing ──────────────────────────────────────────────────────
/** /**
* Process a single NDJSON line from a worker's stdout. * Process a single NDJSON line from a worker's stdout.

View file

@ -23,7 +23,6 @@ export function getNextFallbackModel(
currentModelId: string, currentModelId: string,
modelConfig: Record<string, unknown>, modelConfig: Record<string, unknown>,
): string | null; ): string | null;
export function isTransientNetworkError(errorMsg: string): boolean;
export function validateModelId(modelId: string): boolean; export function validateModelId(modelId: string): boolean;
export function updatePreferencesModels(models: unknown[]): void; export function updatePreferencesModels(models: unknown[]): void;
export function updateSubscriptionTokensUsed( export function updateSubscriptionTokensUsed(

View file

@ -10,7 +10,7 @@ import { homedir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { getModels, getProviders } from "@singularity-forge/ai"; import { getModels, getProviders } from "@singularity-forge/ai";
import { selectByBenchmarks } from "./benchmark-selector.js"; import { selectByBenchmarks } from "./benchmark-selector.js";
import { classifyError } from "./error-classifier.js";
import { defaultRoutingConfig, MODEL_CAPABILITY_TIER } from "./model-router.js"; import { defaultRoutingConfig, MODEL_CAPABILITY_TIER } from "./model-router.js";
import { import {
DEFAULT_RUNAWAY_CHANGED_FILES_WARNING, DEFAULT_RUNAWAY_CHANGED_FILES_WARNING,
@ -579,19 +579,6 @@ export function getNextFallbackModel(currentModelId, modelConfig) {
return modelsToTry[0]; return modelsToTry[0];
} }
} }
/**
* Detect whether an error message indicates a transient network error
* (worth retrying the same model) vs a permanent provider error
* (auth failure, quota exceeded, etc. -- should fall back immediately).
*
* Delegates to error-classifier for consistent classification across the
* extension. error-classifier is the single source of truth for error triage.
*/
export function isTransientNetworkError(errorMsg) {
if (!errorMsg) return false;
const { kind } = classifyError(errorMsg);
return kind === "network" || kind === "connection" || kind === "stream";
}
/** /**
* Validate a model ID string. * Validate a model ID string.
* Returns true if the ID looks like a valid model identifier. * Returns true if the ID looks like a valid model identifier.

View file

@ -55,7 +55,6 @@ export {
getNextFallbackModel, getNextFallbackModel,
isProviderAllowedForAdvisor, isProviderAllowedForAdvisor,
isProviderModelAllowed, isProviderModelAllowed,
isTransientNetworkError,
resolveAutoSupervisorConfig, resolveAutoSupervisorConfig,
resolveContextSelection, resolveContextSelection,
resolveDynamicRoutingConfig, resolveDynamicRoutingConfig,

View file

@ -20,6 +20,7 @@ import { getErrorMessage } from "./error-utils.js";
import { sfRoot } from "./paths.js"; import { sfRoot } from "./paths.js";
import { writeSessionStatus } from "./session-status-io.js"; import { writeSessionStatus } from "./session-status-io.js";
import { hasFileConflict } from "./slice-parallel-conflict.js"; import { hasFileConflict } from "./slice-parallel-conflict.js";
import { monitorNdjsonStdout, resolveSfBin } from "./spawn-worker.js";
import { selectConflictFreeBatch } from "./uok/execution-graph.js"; import { selectConflictFreeBatch } from "./uok/execution-graph.js";
import { logWarning } from "./workflow-logger.js"; import { logWarning } from "./workflow-logger.js";
import { import {
@ -260,29 +261,6 @@ function filterConflictingSlices(
return safe; return safe;
} }
// ─── Internal: Worker Spawning ───────────────────────────────────────────── // ─── Internal: Worker Spawning ─────────────────────────────────────────────
/**
* Resolve the SF CLI binary path.
* Same logic as parallel-orchestrator.ts resolveSfBin().
*/
function resolveSfBin() {
if (process.env.SF_BIN_PATH && existsSync(process.env.SF_BIN_PATH)) {
return process.env.SF_BIN_PATH;
}
let thisDir;
try {
thisDir = import.meta.dirname;
} catch {
thisDir = process.cwd();
}
const candidates = [
join(thisDir, "..", "..", "..", "loader.js"),
join(thisDir, "..", "..", "..", "..", "dist", "loader.js"),
];
for (const candidate of candidates) {
if (existsSync(candidate)) return candidate;
}
return null;
}
/** /**
* Spawn a worker process for a slice. * Spawn a worker process for a slice.
* The worker runs `sf --mode json --print "/autonomous"` in the slice's worktree * The worker runs `sf --mode json --print "/autonomous"` in the slice's worktree
@ -337,20 +315,9 @@ function spawnSliceWorker(basePath, milestoneId, sliceId) {
} }
// ── NDJSON stdout monitoring ──────────────────────────────────────── // ── NDJSON stdout monitoring ────────────────────────────────────────
if (child.stdout) { if (child.stdout) {
let stdoutBuffer = ""; monitorNdjsonStdout(child.stdout, (line) =>
child.stdout.on("data", (data) => { processSliceWorkerLine(basePath, milestoneId, sliceId, line),
stdoutBuffer += data.toString(); );
const lines = stdoutBuffer.split("\n");
stdoutBuffer = lines.pop() || "";
for (const line of lines) {
processSliceWorkerLine(basePath, milestoneId, sliceId, line);
}
});
child.stdout.on("close", () => {
if (stdoutBuffer.trim()) {
processSliceWorkerLine(basePath, milestoneId, sliceId, stdoutBuffer);
}
});
} }
if (child.stderr) { if (child.stderr) {
child.stderr.on("data", (data) => { child.stderr.on("data", (data) => {

View file

@ -0,0 +1,61 @@
/**
* spawn-worker.js Shared helpers for parallel worker process management.
*
* Purpose: consolidate duplicated `resolveSfBin()` and NDJSON stdout buffering
* logic that appeared identically in both parallel-orchestrator.js and
* slice-parallel-orchestrator.js. Single source of truth for the SF binary
* path resolution strategy.
*
* Consumer: parallel-orchestrator.js, slice-parallel-orchestrator.js.
*/
import { existsSync } from "node:fs";
import { join } from "node:path";
/**
 * Resolve the absolute path to the SF CLI binary (dist/loader.js).
 * Uses SF_BIN_PATH env var first (set by loader.ts), then falls back to
 * relative path discovery from this file's location.
 *
 * @returns {string | null} Path to an existing loader.js, or null when
 *   no candidate exists on disk.
 */
export function resolveSfBin() {
  if (process.env.SF_BIN_PATH && existsSync(process.env.SF_BIN_PATH)) {
    return process.env.SF_BIN_PATH;
  }
  let thisDir;
  try {
    // On Node < 20.11 import.meta.dirname is undefined rather than a
    // throw, so the catch alone never fired and join(undefined, ...)
    // raised a TypeError. Guard with ?? as well as try/catch.
    thisDir = import.meta.dirname ?? process.cwd();
  } catch {
    thisDir = process.cwd();
  }
  const candidates = [
    join(thisDir, "..", "..", "..", "loader.js"),
    join(thisDir, "..", "..", "..", "..", "dist", "loader.js"),
  ];
  for (const candidate of candidates) {
    if (existsSync(candidate)) return candidate;
  }
  return null;
}
/**
 * Attach a line-buffered NDJSON reader to a child process stdout stream.
 * Splits incoming data on newlines, invokes onLine for each complete line,
 * and flushes any non-blank remainder when the stream closes.
 *
 * Consumer: spawnWorker/spawnSliceWorker in each orchestrator.
 *
 * @param {import("node:stream").Readable} stream - Child stdout stream.
 * @param {(line: string) => void} onLine - Callback for each full line.
 */
export function monitorNdjsonStdout(stream, onLine) {
  let pending = "";
  stream.on("data", (chunk) => {
    const pieces = (pending + chunk.toString()).split("\n");
    // The final piece is either "" (chunk ended on a newline) or a
    // partial line; carry it over to the next data event.
    pending = pieces.pop() || "";
    for (const piece of pieces) {
      onLine(piece);
    }
  });
  stream.on("close", () => {
    if (pending.trim()) {
      onLine(pending);
    }
  });
}

View file

@ -69,64 +69,3 @@ export function estimateTokensForProvider(text, provider) {
const ratio = getCharsPerToken(provider); const ratio = getCharsPerToken(provider);
return Math.ceil(text.length / ratio); return Math.ceil(text.length / ratio);
} }
/**
 * Parse Google Gemini CLI API key JSON to extract token and project ID.
 *
 * @param {string} apiKeyRaw - Raw JSON credential string.
 * @returns {{ token: string, projectId: string } | undefined} Parsed
 *   credentials, or undefined when the JSON is invalid or either field is
 *   missing, non-string, or empty.
 */
export function parseGoogleGeminiCliApiKey(apiKeyRaw) {
  let parsed;
  try {
    parsed = JSON.parse(apiKeyRaw);
  } catch {
    return undefined;
  }
  const token = parsed?.token;
  const projectId = parsed?.projectId;
  // Both fields must be present, string-typed, and non-empty.
  if (typeof token !== "string" || token === "") return undefined;
  if (typeof projectId !== "string" || projectId === "") return undefined;
  return { token, projectId };
}
/**
 * Build an authenticated CodeAssistServer for the given Gemini CLI
 * credentials. Dependencies are imported lazily so non-Gemini users do
 * not pay their load cost.
 *
 * @param {string} apiKeyRaw - Raw JSON credential string.
 * @throws {Error} When the credentials cannot be parsed.
 */
async function buildGoogleGeminiCliServer(apiKeyRaw) {
  const creds = parseGoogleGeminiCliApiKey(apiKeyRaw);
  if (creds === undefined) {
    throw new Error("Invalid Google Gemini CLI credentials");
  }
  const [core, authLib] = await Promise.all([
    import("@google/gemini-cli-core"),
    import("google-auth-library"),
  ]);
  const oauth = new authLib.OAuth2Client();
  oauth.setCredentials({ access_token: creds.token });
  return new core.CodeAssistServer(oauth, creds.projectId, { headers: {} });
}
/**
 * Type guard for Google Gemini token counting payload.
 *
 * @param {unknown} payload - Candidate request payload.
 * @returns {boolean} True when payload is an object carrying a string
 *   `model` and an array `contents`.
 */
export function isGoogleGeminiCountablePayload(payload) {
  if (payload === null || typeof payload !== "object") return false;
  const { model, contents } = payload;
  return typeof model === "string" && Array.isArray(contents);
}
/**
* Count tokens in a Google Gemini CLI request using their server API.
*/
export async function countGoogleGeminiCliTokens(
payload,
apiKeyRaw,
deps = { buildServer: buildGoogleGeminiCliServer },
) {
if (!apiKeyRaw || !isGoogleGeminiCountablePayload(payload)) return undefined;
const response = await (await deps.buildServer(apiKeyRaw)).countTokens(
payload,
);
return typeof response.totalTokens === "number"
? response.totalTokens
: undefined;
}