refactor(memory): consolidate memory system, remove dead code

- Delete memory-backfill.js — not imported anywhere, dead code
- Rename memory-sleeper.js → tool-watchdog.js — misnamed; it is a
  tool-output watchdog with no relation to the memory store
- Collapse memory-embeddings-llm-gateway.js into memory-embeddings.js —
  removes the lazy-import split; loadGatewayConfigFromEnv,
  createGatewayEmbedFn, and rerankCandidates are now direct exports
- Remove buildEmbeddingFn() dead stub (always returned null)
- Enable packages/coding-agent memory extraction extension by default
  (memory.enabled ?? true) so session-level extraction is active
- Update all import sites and tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Mikael Hugo 2026-05-10 18:17:49 +02:00
parent 2a1309d127
commit a77e1551d2
9 changed files with 304 additions and 434 deletions

View file

@ -1232,7 +1232,7 @@ export class SettingsManager {
summaryInjectionTokenLimit: number;
} {
return {
enabled: this.settings.memory?.enabled ?? false,
enabled: this.settings.memory?.enabled ?? true,
maxRolloutsPerStartup: this.settings.memory?.maxRolloutsPerStartup ?? 64,
maxRolloutAgeDays: this.settings.memory?.maxRolloutAgeDays ?? 30,
minRolloutIdleHours: this.settings.memory?.minRolloutIdleHours ?? 12,

View file

@ -1,5 +1,12 @@
import { spawnSync } from "node:child_process";
import { existsSync, readdirSync } from "node:fs";
import {
existsSync,
mkdirSync,
readdirSync,
readFileSync,
unlinkSync,
writeFileSync,
} from "node:fs";
import { join, relative, resolve } from "node:path";
import { isToolCallEventType } from "@singularity-forge/coding-agent";
import { resetAskUserQuestionsCache } from "../../ask-user-questions.js";
@ -15,6 +22,7 @@ import {
markToolEnd,
markToolStart,
recordToolInvocationError,
startAutoDetached,
} from "../auto.js";
import {
applyCompletionNudgeTemperature,
@ -39,9 +47,9 @@ import {
selectLearnedModel,
} from "../learning/runtime.js";
import {
observeMemorySleeperToolResult,
resetMemorySleeper,
} from "../memory-sleeper.js";
observeToolResult,
resetToolWatchdog,
} from "../tool-watchdog.js";
import { initNotificationStore } from "../notification-store.js";
import { initNotificationWidget } from "../notification-widget.js";
import {
@ -106,6 +114,42 @@ import {
// printed it before the TUI launched. Only re-print on /clear (subsequent sessions).
let isFirstSession = true;
let lastGeminiPreflightWarning;
const COMPACTION_AUTO_RESUME_FILE = "compaction-auto-resume.json";
function compactionResumePath(basePath) {
return join(basePath, ".sf", "runtime", COMPACTION_AUTO_RESUME_FILE);
}
function writeCompactionResumeMarker(basePath, marker) {
try {
const runtimeDir = join(basePath, ".sf", "runtime");
mkdirSync(runtimeDir, { recursive: true });
writeFileSync(
compactionResumePath(basePath),
JSON.stringify(marker, null, 2) + "\n",
"utf-8",
);
} catch {
// Non-fatal: compaction autoresume marker is best-effort.
}
}
function consumeCompactionResumeMarker(basePath) {
const markerPath = compactionResumePath(basePath);
if (!existsSync(markerPath)) return null;
try {
const marker = JSON.parse(readFileSync(markerPath, "utf-8"));
unlinkSync(markerPath);
return marker;
} catch {
try {
unlinkSync(markerPath);
} catch {
// Ignore cleanup errors.
}
return null;
}
}
export async function runAgentEndMemoryBackfill(runBackfill) {
try {
@ -209,7 +253,7 @@ export function registerHooks(pi, ecosystemHandlers = []) {
pi.on("session_start", async (_event, ctx) => {
lastGeminiPreflightWarning = undefined;
resetLearningRuntime();
resetMemorySleeper();
resetToolWatchdog();
try {
const sid = ctx.sessionManager?.getSessionId?.() ?? "";
const _sfile = ctx.sessionManager?.getSessionFile?.() ?? "";
@ -434,11 +478,38 @@ export function registerHooks(pi, ecosystemHandlers = []) {
} catch {
/* non-fatal — requirement promoter must never block session start */
}
// Compaction should never behave like a stop boundary. If autonomous mode
// was active when compaction happened, continue automatically on session start.
try {
if (!isAutoActive() && !isAutoPaused()) {
const marker = consumeCompactionResumeMarker(process.cwd());
if (
marker?.reason === "session_before_compact" &&
marker?.autoActive === true &&
marker?.basePath === process.cwd()
) {
ctx.ui?.notify?.(
"Resuming autonomous mode after compaction.",
"info",
);
startAutoDetached(ctx, pi, process.cwd(), marker?.verbose === true, {
step: marker?.stepMode === true,
canAskUser: marker?.canAskUser !== false,
milestoneLock:
typeof marker?.milestoneLock === "string"
? marker.milestoneLock
: null,
});
}
}
} catch {
/* non-fatal — autoresume must never block session start */
}
});
pi.on("session_switch", async (_event, ctx) => {
lastGeminiPreflightWarning = undefined;
resetLearningRuntime();
resetMemorySleeper();
resetToolWatchdog();
initNotificationStore(process.cwd());
installNotifyInterceptor(ctx);
resetWriteGateState();
@ -689,6 +760,21 @@ export function registerHooks(pi, ecosystemHandlers = []) {
},
},
};
if (isAutoActive()) {
writeCompactionResumeMarker(basePath, {
reason: "session_before_compact",
ts: new Date().toISOString(),
autoActive: true,
basePath,
stepMode: session?.stepMode === true,
canAskUser: isCanAskUser(),
milestoneLock:
typeof session?.sessionMilestoneLock === "string"
? session.sessionMilestoneLock
: null,
verbose: session?.verbose === true,
});
}
// Persist compaction summary as the session's most recent work description
// so memory-pipeline ingestion has a compact semantic handle for retrieval.
try {
@ -1060,11 +1146,11 @@ export function registerHooks(pi, ecosystemHandlers = []) {
) {
markResearchTerminalTransition();
}
const steer = observeMemorySleeperToolResult(event);
const steer = observeToolResult(event);
if (steer) {
pi.sendMessage(
{
customType: "sf-memory-sleeper",
customType: "sf-tool-watchdog",
content: steer.content,
display: false,
details: {

View file

@ -219,7 +219,7 @@ async function handleSearch(ctx, parsed) {
return;
}
const { loadGatewayConfigFromEnv } = await import(
"./memory-embeddings-llm-gateway.js"
"./memory-embeddings.js"
);
const gatewayConfig = loadGatewayConfigFromEnv();
const usingEmbeddings = !!gatewayConfig;
@ -367,7 +367,7 @@ async function handleStatus(ctx) {
}
const dbStatus = readMemoryDbStatus(adapter);
const { loadGatewayConfigFromEnv, createGatewayEmbedFn, rerankCandidates } =
await import("./memory-embeddings-llm-gateway.js");
await import("./memory-embeddings.js");
const gatewayConfig = loadGatewayConfigFromEnv();
let embeddingProbe = { ok: false, status: "not_configured", latencyMs: null };
let rerankProbe = { ok: false, status: "not_configured", latencyMs: null };
@ -419,7 +419,7 @@ async function handleBackfill(ctx) {
}
const before = readMemoryDbStatus(adapter);
const { loadGatewayConfigFromEnv } = await import(
"./memory-embeddings-llm-gateway.js"
"./memory-embeddings.js"
);
const gatewayConfig = loadGatewayConfigFromEnv();
if (!gatewayConfig) {

View file

@ -1,104 +0,0 @@
// SF — Decisions -> memories backfill
//
// Idempotent one-shot migration that copies every active decisions row into
// the memories table with category="architecture". Idempotency is enforced
// by tagging each backfilled memory's content with the original decision ID
// via a structured prefix and skipping any decision whose ID already appears
// in the memories table.
//
// Triggered opportunistically by buildBeforeAgentStartResult so the cost
// only ever fires once per project. Costs O(N) inserts on first run where
// N is the active-decisions count; subsequent runs are an O(N) lookup that
// finds existing markers and exits.
import { createMemory } from "./memory-store.js";
import { _getAdapter, isDbAvailable } from "./sf-db.js";
import { logWarning } from "./workflow-logger.js";
/**
* Backfill active decisions rows into the memories table.
*
* - Idempotent (per-row): every row written embeds
* `[decision:${decisionId}]` as a prefix in the content so we can
* detect existing backfills via a LIKE query. Only decisions whose id
* is already present in the memory store are skipped.
* - Best-effort: never throws. Logs and returns 0 on failure so a broken
* backfill cannot block agent startup.
* - Active-only: skips rows where `superseded_by IS NOT NULL`. Superseded
* decisions are historical record; the memory store is for active
* knowledge.
*
* Returns the number of memories written (0 when already backfilled or
* when the DB has no decisions). Callers can log the result or surface it
* to the user.
*/
export function backfillDecisionsToMemories() {
if (!isDbAvailable()) return 0;
const adapter = _getAdapter();
if (!adapter) return 0;
try {
const decisions = adapter
.prepare(
"SELECT id, when_context, scope, decision, choice, rationale, made_by, revisable, superseded_by FROM decisions WHERE superseded_by IS NULL",
)
.all();
if (decisions.length === 0) return 0;
// Per-row idempotency: each backfilled memory starts with
// "[decision:<id>]" in the content. Detect existing rows via LIKE.
const checkExisting = adapter.prepare(
"SELECT 1 FROM memories WHERE content LIKE :pattern LIMIT 1",
);
let written = 0;
for (const raw of decisions) {
const row = {
id: String(raw["id"] ?? ""),
when_context: String(raw["when_context"] ?? ""),
scope: String(raw["scope"] ?? ""),
decision: String(raw["decision"] ?? ""),
choice: String(raw["choice"] ?? ""),
rationale: String(raw["rationale"] ?? ""),
made_by: String(raw["made_by"] ?? "agent"),
revisable: String(raw["revisable"] ?? ""),
superseded_by:
raw["superseded_by"] == null ? null : String(raw["superseded_by"]),
};
if (!row.id) continue;
if (checkExisting.get({ ":pattern": `[decision:${row.id}] %` })) continue;
const content = synthesizeContent(row);
const id = createMemory({
category: "architecture",
content,
confidence: 0.85,
});
if (id) written += 1;
}
return written;
} catch (e) {
logWarning(
"memory-backfill",
`decisions->memories backfill failed: ${e.message}`,
);
return 0;
}
}
/**
* Combine the decision's structured fields into a 1-3 sentence content
* string suitable for keyword retrieval and human review.
*
* Format: "[decision:<id>] <decision> Chose: <choice>. Rationale: <rationale>."
* The "[decision:<id>]" prefix enables idempotent backfill detection.
* Truncates each field to keep the synthesized line under ~600 chars.
*/
function synthesizeContent(row) {
const trim = (value, max) => {
const cleaned = value.replace(/\s+/g, " ").trim();
return cleaned.length > max ? cleaned.slice(0, max - 1) + "…" : cleaned;
};
const parts = [`[decision:${row.id}]`];
const decision = trim(row.decision, 240);
const choice = trim(row.choice, 200);
const rationale = trim(row.rationale, 200);
if (decision) parts.push(decision);
if (choice) parts.push(`Chose: ${choice}.`);
if (rationale) parts.push(`Rationale: ${rationale}.`);
return parts.join(" ");
}

View file

@ -1,252 +0,0 @@
// SF Memory Embeddings — LLM Gateway adapter
//
// Speaks the OpenAI-shaped /v1/embeddings and /v1/rerank protocols against
// a custom inference-fabric llm-gateway endpoint. Returns null when the
// gateway is not configured (env var unset / unreachable / catalog empty),
// so the consumer can fall through to keyword-only ranking without
// surfacing errors.
//
// Why a separate module: keeping gateway-specific HTTP, headers, and error
// shapes out of memory-embeddings.ts (which is provider-agnostic) means the
// embed-fn discovery surface stays clean and the gateway can be swapped or
// disabled without touching the consumer.
import { logWarning } from "./workflow-logger.js";
const DEFAULT_TIMEOUT_MS = 30_000;
// Throttle the "rerank worker offline" warning so per-query log spam doesn't
// drown out other diagnostics when SF_LLM_GATEWAY_RERANK_MODEL is set but no
// worker is online — the soft-degrade is expected in that state.
let lastRerankUnavailableLogAt = 0;
const RERANK_UNAVAILABLE_LOG_THROTTLE_MS = 60_000;
function logRerankUnavailable(msg) {
const now = Date.now();
if (now - lastRerankUnavailableLogAt < RERANK_UNAVAILABLE_LOG_THROTTLE_MS) {
return;
}
lastRerankUnavailableLogAt = now;
logWarning("memory-embeddings", msg);
}
// Circuit breaker for the embed path. When the remote gateway is unreachable
// (network timeout, cold-start stall), each call would otherwise wait the full
// DEFAULT_TIMEOUT_MS (30 s) before failing. After EMBED_CIRCUIT_THRESHOLD
// consecutive failures the circuit opens for EMBED_CIRCUIT_OPEN_MS and returns
// [] immediately — callers fall through to keyword-only ranking with no stall.
// The circuit half-opens automatically after the cooldown expires.
const EMBED_CIRCUIT_THRESHOLD = 3;
const EMBED_CIRCUIT_OPEN_MS = 60_000;
const embedCircuit = { failures: 0, openUntil: 0, lastLogAt: 0 };
function embedCircuitIsOpen() {
return embedCircuit.openUntil > Date.now();
}
function onEmbedSuccess() {
embedCircuit.failures = 0;
embedCircuit.openUntil = 0;
}
function onEmbedFailure() {
embedCircuit.failures += 1;
if (embedCircuit.failures >= EMBED_CIRCUIT_THRESHOLD) {
embedCircuit.openUntil = Date.now() + EMBED_CIRCUIT_OPEN_MS;
const now = Date.now();
if (now - embedCircuit.lastLogAt >= EMBED_CIRCUIT_OPEN_MS) {
embedCircuit.lastLogAt = now;
logWarning(
"memory-embeddings",
`llm-gateway /embeddings circuit open after ${EMBED_CIRCUIT_THRESHOLD} failures; ` +
`skipping embed for ${EMBED_CIRCUIT_OPEN_MS / 1000}s — memory search falls back to keyword ranking`,
);
}
}
}
const ENV_KEY = "SF_LLM_GATEWAY_KEY";
const ENV_URL = "SF_LLM_GATEWAY_URL";
const ENV_EMBED_MODEL = "SF_LLM_GATEWAY_EMBED_MODEL";
const ENV_RERANK_MODEL = "SF_LLM_GATEWAY_RERANK_MODEL";
const ENV_EMBED_QUERY_INSTRUCTION = "SF_LLM_GATEWAY_EMBED_QUERY_INSTRUCTION";
const DEFAULT_EMBEDDING_MODEL = "Qwen/Qwen3-Embedding-4B";
const DEFAULT_RERANK_MODEL = "Qwen/Qwen3-Reranker-0.6B";
// Qwen3-Embedding uses asymmetric retrieval: queries are prefixed with a task
// instruction so the model projects them into the "query" region of the embedding
// space, while document texts are sent as-is (no instruction) so they land in
// the "passage" region. Mixing these correctly is critical for retrieval quality.
//
// Format expected by the model: "Instruct: <task>\nQuery: " followed by the
// query text (the gateway appends the text to the instruction). Documents omit
// the instruction entirely.
//
// References: Qwen3-Embedding model card (HuggingFace) §Asymmetric Retrieval.
const DEFAULT_QUERY_INSTRUCTION =
"Instruct: Retrieve relevant software engineering memories, facts, and project decisions for the given query\nQuery: ";
const KEY_ALIASES = [
ENV_KEY,
"LLM_GATEWAY_API_KEY",
"LLM_GATEWAY_BEARER_KEY",
"LLM_MUX_API_KEY",
];
const URL_ALIASES = [ENV_URL, "LLM_GATEWAY_BASE_URL", "LLM_MUX_BASE_URL"];
function firstEnvEntry(keys) {
for (const key of keys) {
const value = process.env[key]?.trim();
if (value) return { key, value };
}
return null;
}
function firstEnvValue(keys) {
return firstEnvEntry(keys)?.value ?? "";
}
/** Read gateway config from env. Returns null when SF_LLM_GATEWAY_KEY is
* missing the gateway path is opt-in and silently absent otherwise.
*
* `queryInstruction` is the Qwen3-style task instruction prepended to query
* texts during retrieval. Document texts (backfill) are sent without it.
* Override via SF_LLM_GATEWAY_EMBED_QUERY_INSTRUCTION. */
export function loadGatewayConfigFromEnv() {
const keyEntry = firstEnvEntry(KEY_ALIASES);
if (!keyEntry) return null;
const urlEntry = firstEnvEntry(URL_ALIASES);
const url = urlEntry?.value || "https://llm-gateway.centralcloud.com/v1";
const embeddingModel =
firstEnvValue([ENV_EMBED_MODEL]) || DEFAULT_EMBEDDING_MODEL;
const rerankModel = firstEnvValue([ENV_RERANK_MODEL]) || DEFAULT_RERANK_MODEL;
const queryInstruction =
firstEnvValue([ENV_EMBED_QUERY_INSTRUCTION]) || DEFAULT_QUERY_INSTRUCTION;
return {
url,
apiKey: keyEntry.value,
keySource: keyEntry.key,
urlSource: urlEntry?.key ?? "default",
embeddingModel,
rerankModel,
queryInstruction,
};
}
/** Build an EmbedFn that posts to <url>/embeddings with Bearer auth.
* Returns Float32Array[] in the same order as the input. Throws on HTTP
* errors so the caller (embedMemories) logs and counts as zero.
* A circuit breaker short-circuits to [] after EMBED_CIRCUIT_THRESHOLD
* consecutive failures so a down/cold gateway never stalls the caller for
* the full 30 s timeout on every call.
*
* `opts.instruction` when set, included as the top-level `instruction`
* field in the request body. Qwen3-Embedding uses this for asymmetric
* retrieval: pass `config.queryInstruction` for query embeddings; omit for
* document/memory backfill so passages land in the correct embedding region. */
export function createGatewayEmbedFn(config, opts) {
return async (texts) => {
if (texts.length === 0) return [];
// Circuit open — fail fast, no network call.
if (embedCircuitIsOpen()) return [];
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(),
config.timeoutMs ?? DEFAULT_TIMEOUT_MS,
);
try {
const body = {
model: config.embeddingModel,
input: texts,
};
if (opts?.instruction) {
body.instruction = opts.instruction;
}
const res = await fetch(`${config.url}/embeddings`, {
method: "POST",
headers: {
Authorization: `Bearer ${config.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
signal: controller.signal,
});
if (!res.ok) {
const body = await res.text().catch(() => "");
// Throw immediately — the outer catch handles onEmbedFailure once.
throw new Error(
`llm-gateway /embeddings ${res.status}: ${body.slice(0, 200)}`,
);
}
const json = await res.json();
if (!Array.isArray(json.data)) {
// Throw — the outer catch handles onEmbedFailure once.
throw new Error("llm-gateway /embeddings: missing data array");
}
// Sort by index to handle out-of-order responses defensively.
const sorted = [...json.data].sort((a, b) => a.index - b.index);
const result = sorted.map((d) => Float32Array.from(d.embedding));
onEmbedSuccess();
return result;
} catch (err) {
// Catch AbortError (timeout) and all thrown errors from above — all
// count as a circuit failure. onEmbedFailure is called exactly once
// per failed request regardless of failure mode.
onEmbedFailure();
throw err;
} finally {
clearTimeout(timeout);
}
};
}
/** Score candidates against a query via <url>/rerank. Returns null when no
* rerank model is configured OR the gateway has no rerank worker online
* callers should treat null as "skip the rerank pass". */
export async function rerankCandidates(config, query, candidates) {
if (!config.rerankModel) return null;
if (candidates.length === 0) return [];
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(),
config.timeoutMs ?? DEFAULT_TIMEOUT_MS,
);
try {
const res = await fetch(`${config.url}/rerank`, {
method: "POST",
headers: {
Authorization: `Bearer ${config.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: config.rerankModel,
query,
documents: candidates.map((c) => c.text),
}),
signal: controller.signal,
});
if (res.status === 503 || res.status === 404) {
logRerankUnavailable(
`llm-gateway /rerank unavailable (${res.status}); falling back to non-reranked results`,
);
return null;
}
// Read once — the gateway sometimes returns 200 with a plain-text body
// like "no worker with rerank capability is available", so we can't
// branch on res.ok before peeking at the body.
const bodyText = await res.text().catch(() => "");
if (/no worker.*rerank/i.test(bodyText)) {
logRerankUnavailable(
"llm-gateway /rerank: no worker capability available",
);
return null;
}
if (!res.ok) {
throw new Error(
`llm-gateway /rerank ${res.status}: ${bodyText.slice(0, 200)}`,
);
}
let json;
try {
json = JSON.parse(bodyText);
} catch {
throw new Error(
`llm-gateway /rerank: malformed JSON response (${bodyText.slice(0, 200)})`,
);
}
if (!Array.isArray(json.results)) {
throw new Error("llm-gateway /rerank: missing results array");
}
return json.results.map((r) => ({
id: candidates[r.index]?.id ?? String(r.index),
score: r.relevance_score,
}));
} finally {
clearTimeout(timeout);
}
}

View file

@ -1,16 +1,13 @@
// SF Memory Embeddings — provider-agnostic embedding layer
// SF Memory Embeddings — embedding, reranking, and vector ranking
//
// Active providers:
// - llm-gateway adapter (memory-embeddings-llm-gateway.ts) — opt-in via
// SF_LLM_GATEWAY_KEY; the only working write path today.
// - buildEmbeddingFn (Pi-SDK provider scan) — discovery hook only;
// returns null because the SDK has no provider-neutral embedding
// API yet. Kept as the future plug-in surface.
// OpenAI-shaped /v1/embeddings and /v1/rerank against a configurable
// gateway endpoint (opt-in via SF_LLM_GATEWAY_KEY). All paths degrade
// gracefully to keyword-only ranking when no gateway is configured.
//
// Read pipeline used by `getRelevantMemoriesRanked` (memory-store.ts):
// Read pipeline used by `getRelevantMemoriesRanked` (memory-store.js):
// 1. cosine ranking via rankMemoriesByEmbedding (uses loadEmbeddingMap)
// 2. relation-boost via applyRelationBoost (uses memory_relations edges)
// 3. optional rerank via memory-embeddings-llm-gateway/rerankCandidates
// 3. optional rerank via rerankCandidates
//
// When no gateway is configured (or the worker is offline), all three
// pipeline stages soft-degrade and `getRelevantMemoriesRanked` falls back
@ -23,44 +20,196 @@ import {
} from "./sf-db.js";
import { logWarning } from "./workflow-logger.js";
// ─── Model selection ────────────────────────────────────────────────────────
const EMBEDDING_ID_HINTS = [
"embed",
"embedding",
"voyage",
"text-embedding",
"nomic",
"jina-embed",
"bge",
"mxbai-embed",
// ─── Gateway config ──────────────────────────────────────────────────────────
const DEFAULT_TIMEOUT_MS = 30_000;
const ENV_KEY = "SF_LLM_GATEWAY_KEY";
const ENV_URL = "SF_LLM_GATEWAY_URL";
const ENV_EMBED_MODEL = "SF_LLM_GATEWAY_EMBED_MODEL";
const ENV_RERANK_MODEL = "SF_LLM_GATEWAY_RERANK_MODEL";
const ENV_EMBED_QUERY_INSTRUCTION = "SF_LLM_GATEWAY_EMBED_QUERY_INSTRUCTION";
const DEFAULT_EMBEDDING_MODEL = "Qwen/Qwen3-Embedding-4B";
const DEFAULT_RERANK_MODEL = "Qwen/Qwen3-Reranker-0.6B";
const DEFAULT_QUERY_INSTRUCTION =
"Instruct: Retrieve relevant software engineering memories, facts, and project decisions for the given query\nQuery: ";
const KEY_ALIASES = [
ENV_KEY,
"LLM_GATEWAY_API_KEY",
"LLM_GATEWAY_BEARER_KEY",
"LLM_MUX_API_KEY",
];
/**
* Try to build an embedding function from the model registry. Returns null
* when no embedding-capable model is obvious from the registry metadata.
*
* NOTE: the Pi SDK doesn't yet expose a dedicated embeddings API for every
* provider. This implementation currently targets Anthropic / OpenAI-shaped
* SDKs: when the caller has direct API access via `ctx.modelRegistry`, they
* can wire this up by providing an `embedFn` override. We ship the hint-based
* detection here so future providers can plug in without touching callers.
*/
export function buildEmbeddingFn(ctx) {
try {
const available = ctx.modelRegistry?.getAvailable?.();
if (!available || available.length === 0) return null;
const candidate = available.find((model) => {
const id = typeof model?.id === "string" ? model.id.toLowerCase() : "";
return EMBEDDING_ID_HINTS.some((hint) => id.includes(hint));
});
if (!candidate) return null;
// We don't currently have a provider-neutral embedding call in Pi; the
// detection surface is in place so wiring can happen once Pi offers it.
return null;
} catch (err) {
logWarning("memory-embeddings", `model discovery failed: ${err.message}`);
return null;
const URL_ALIASES = [ENV_URL, "LLM_GATEWAY_BASE_URL", "LLM_MUX_BASE_URL"];
function firstEnvEntry(keys) {
for (const key of keys) {
const value = process.env[key]?.trim();
if (value) return { key, value };
}
return null;
}
function firstEnvValue(keys) {
return firstEnvEntry(keys)?.value ?? "";
}
/** Read gateway config from env. Returns null when no key is present — opt-in. */
export function loadGatewayConfigFromEnv() {
const keyEntry = firstEnvEntry(KEY_ALIASES);
if (!keyEntry) return null;
const urlEntry = firstEnvEntry(URL_ALIASES);
const url = urlEntry?.value || "https://llm-gateway.centralcloud.com/v1";
const embeddingModel =
firstEnvValue([ENV_EMBED_MODEL]) || DEFAULT_EMBEDDING_MODEL;
const rerankModel = firstEnvValue([ENV_RERANK_MODEL]) || DEFAULT_RERANK_MODEL;
const queryInstruction =
firstEnvValue([ENV_EMBED_QUERY_INSTRUCTION]) || DEFAULT_QUERY_INSTRUCTION;
return {
url,
apiKey: keyEntry.value,
keySource: keyEntry.key,
urlSource: urlEntry?.key ?? "default",
embeddingModel,
rerankModel,
queryInstruction,
};
}
// ─── Circuit breaker ─────────────────────────────────────────────────────────
const EMBED_CIRCUIT_THRESHOLD = 3;
const EMBED_CIRCUIT_OPEN_MS = 60_000;
const embedCircuit = { failures: 0, openUntil: 0, lastLogAt: 0 };
function embedCircuitIsOpen() {
return embedCircuit.openUntil > Date.now();
}
function onEmbedSuccess() {
embedCircuit.failures = 0;
embedCircuit.openUntil = 0;
}
function onEmbedFailure() {
embedCircuit.failures += 1;
if (embedCircuit.failures >= EMBED_CIRCUIT_THRESHOLD) {
embedCircuit.openUntil = Date.now() + EMBED_CIRCUIT_OPEN_MS;
const now = Date.now();
if (now - embedCircuit.lastLogAt >= EMBED_CIRCUIT_OPEN_MS) {
embedCircuit.lastLogAt = now;
logWarning(
"memory-embeddings",
`gateway /embeddings circuit open after ${EMBED_CIRCUIT_THRESHOLD} failures; ` +
`skipping for ${EMBED_CIRCUIT_OPEN_MS / 1000}s — falling back to keyword ranking`,
);
}
}
}
// ─── Rerank throttle ─────────────────────────────────────────────────────────
let lastRerankUnavailableLogAt = 0;
const RERANK_UNAVAILABLE_LOG_THROTTLE_MS = 60_000;
function logRerankUnavailable(msg) {
const now = Date.now();
if (now - lastRerankUnavailableLogAt < RERANK_UNAVAILABLE_LOG_THROTTLE_MS)
return;
lastRerankUnavailableLogAt = now;
logWarning("memory-embeddings", msg);
}
// ─── Gateway HTTP ─────────────────────────────────────────────────────────────
/** Build an embed function that posts to <url>/embeddings with Bearer auth. */
export function createGatewayEmbedFn(config, opts) {
return async (texts) => {
if (texts.length === 0) return [];
if (embedCircuitIsOpen()) return [];
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(),
config.timeoutMs ?? DEFAULT_TIMEOUT_MS,
);
try {
const body = { model: config.embeddingModel, input: texts };
if (opts?.instruction) body.instruction = opts.instruction;
const res = await fetch(`${config.url}/embeddings`, {
method: "POST",
headers: {
Authorization: `Bearer ${config.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
signal: controller.signal,
});
if (!res.ok) {
const bodyText = await res.text().catch(() => "");
throw new Error(
`gateway /embeddings ${res.status}: ${bodyText.slice(0, 200)}`,
);
}
const json = await res.json();
if (!Array.isArray(json.data))
throw new Error("gateway /embeddings: missing data array");
const sorted = [...json.data].sort((a, b) => a.index - b.index);
const result = sorted.map((d) => Float32Array.from(d.embedding));
onEmbedSuccess();
return result;
} catch (err) {
onEmbedFailure();
throw err;
} finally {
clearTimeout(timeout);
}
};
}
/** Score candidates against a query via /rerank. Returns null when unavailable. */
export async function rerankCandidates(config, query, candidates) {
if (!config.rerankModel) return null;
if (candidates.length === 0) return [];
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(),
config.timeoutMs ?? DEFAULT_TIMEOUT_MS,
);
try {
const res = await fetch(`${config.url}/rerank`, {
method: "POST",
headers: {
Authorization: `Bearer ${config.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: config.rerankModel,
query,
documents: candidates.map((c) => c.text),
}),
signal: controller.signal,
});
if (res.status === 503 || res.status === 404) {
logRerankUnavailable(
`gateway /rerank unavailable (${res.status}); falling back to cosine order`,
);
return null;
}
const bodyText = await res.text().catch(() => "");
if (/no worker.*rerank/i.test(bodyText)) {
logRerankUnavailable("gateway /rerank: no worker capability available");
return null;
}
if (!res.ok)
throw new Error(
`gateway /rerank ${res.status}: ${bodyText.slice(0, 200)}`,
);
let json;
try {
json = JSON.parse(bodyText);
} catch {
throw new Error(
`gateway /rerank: malformed JSON (${bodyText.slice(0, 200)})`,
);
}
if (!Array.isArray(json.results))
throw new Error("gateway /rerank: missing results array");
return json.results.map((r) => ({
id: candidates[r.index]?.id ?? String(r.index),
score: r.relevance_score,
}));
} finally {
clearTimeout(timeout);
}
}
// ─── Vector (de)serialization ───────────────────────────────────────────────
export function packFloat32(vec) {
return new Uint8Array(vec.buffer, vec.byteOffset, vec.byteLength);
@ -245,9 +394,6 @@ export function rankMemoriesByEmbedding(
export async function embedQueryViaGateway(query) {
if (!query.trim()) return null;
try {
const { loadGatewayConfigFromEnv, createGatewayEmbedFn } = await import(
"./memory-embeddings-llm-gateway.js"
);
const cfg = loadGatewayConfigFromEnv();
if (!cfg) return null;
const embedFn = createGatewayEmbedFn(cfg, {
@ -344,11 +490,8 @@ export async function runEmbeddingBackfill(opts) {
if (backfillInFlight) return 0;
const max = opts?.maxPerInvocation ?? 50;
const batchSize = opts?.batchSize ?? 16;
const { loadGatewayConfigFromEnv, createGatewayEmbedFn } = await import(
"./memory-embeddings-llm-gateway.js"
);
const cfg = loadGatewayConfigFromEnv();
if (!cfg) return 0; // Gateway opt-in; absent config = no-op.
if (!cfg) return 0;
backfillInFlight = true;
let embedded = 0;
try {

View file

@ -212,7 +212,7 @@ export async function getRelevantMemoriesRanked(query, limit = 10) {
// the cosine-ranked order as-is — strictly additive precision boost.
try {
const { loadGatewayConfigFromEnv, rerankCandidates } = await import(
"./memory-embeddings-llm-gateway.js"
"./memory-embeddings.js"
);
const cfg = loadGatewayConfigFromEnv();
if (cfg?.rerankModel && topK.length > 1) {

View file

@ -203,16 +203,16 @@ describe("readFrozenDefinition error wrapping", () => {
});
});
// ─── memory-sleeper: seenKeys bounded (LOW) ────────────────────────────────
// ─── tool-watchdog: seenKeys bounded (LOW) ────────────────────────────────
describe("memory-sleeper seenKeys", () => {
test("resetMemorySleeper clears seenKeys", async () => {
const { resetMemorySleeper, observeMemorySleeperToolResult } = await import(
"../memory-sleeper.js"
const { resetToolWatchdog, observeToolResult } = await import(
"../tool-watchdog.js"
);
resetMemorySleeper();
resetToolWatchdog();
// After reset, the same event should be processed again
const result = observeMemorySleeperToolResult({
const result = observeToolResult({
toolName: "bash",
input: { command: "bun install" },
content: [{ type: "text", text: "ok" }],

View file

@ -1,11 +1,8 @@
// SF Memory Sleeper — runtime tool-output watchdog
// SF Tool Watchdog — runtime tool-output observer
//
// Despite the "memory" prefix, this module is unrelated to the
// memory-store / memory-relations / memory-embeddings pipeline. It
// observes tool-result events at runtime and emits a steer (info /
// Observes tool-result events at runtime and emits a steer (info /
// warning) when patterns like a repeated bash failure or a too-large
// tool result indicate the agent is stuck. Naming is historical —
// "sleeper" means a dormant watcher, not a stored memory.
// tool result indicate the agent is stuck.
const seenKeys = new Set();
const bashFailures = new Map();
const MAX_RESULT_CHARS = 6000;
@ -13,7 +10,7 @@ const REPEAT_FAILURE_WINDOW_MS = 10 * 60 * 1000;
// Maximum number of seen keys to retain before clearing; prevents unbounded
// accumulation across long autonomous mode sessions spanning many units.
const MAX_SEEN_KEYS = 200;
export function resetMemorySleeper() {
export function resetToolWatchdog() {
seenKeys.clear();
bashFailures.clear();
}
@ -38,7 +35,7 @@ function once(steer) {
}
function buildSteer(title, body) {
return [
`Memory sleeper steering: ${title}`,
`Tool watchdog steering: ${title}`,
"",
body,
"",
@ -109,7 +106,7 @@ function maybeRepeatedFailureSteer(event, text) {
),
});
}
export function observeMemorySleeperToolResult(event) {
export function observeToolResult(event) {
const text = contentText(event);
return (
maybeBunSteer(event) ??