fix(sf-db): share open-DB state across module instances via globalThis
Two SQLite connections were being opened in the same Node process when
the same module loaded under two graphs:
- the autonomous-loop side loads sf-db modules via normal ESM resolution
- src/headless-feedback.ts re-imports them via jiti.createJiti() so the
in-server `sf headless feedback ...` drain can call them without
bringing the agent extension into the rpc-mode bundle
Module-level `let currentDb / currentPath / currentPid` etc. lived on
two independent module instances, so each instance opened its own
SQLite handle to .sf/sf.db. WAL mode lets readers share, but two writer
connections in the same process produced SQLITE_BUSY / writer stalls —
the hang we saw on sf-mpa4g46x and the wedged-drainer recurrence after
the server restart at 19:35.
Fix: hoist the connection slot onto globalThis under a well-known
Symbol so every module instance points at the same record. All five
fields formerly module-level become `_sf.<field>` and live in one
shared object.
Codex's original diagnosis (split module-graph DB-writer contention)
was right; I dismissed it earlier because I missed that
headless-feedback uses jiti even though rpc-mode itself doesn't import
sf-db directly.
Verification:
- Syntax check: clean
- sf-db-migration.test.mjs: 12/13 pass. The one failure
(openDatabase_migrates_v27_tasks_without_created_at_through_spec_backfill
expects schema version 72, actual 73) is unrelated — a schema
migration landed elsewhere without bumping that test's expected
version.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
a3469f2334
commit
cc67970fa0
1 changed files with 65 additions and 48 deletions
|
|
@ -294,11 +294,28 @@ export function isEmptyMilestoneSpec(row) {
|
|||
(row["product_research_json"] ?? "") === ""
|
||||
);
|
||||
}
|
||||
let currentDb = null;
|
||||
let currentPath = null;
|
||||
let currentPid = 0;
|
||||
let _exitHandlerRegistered = false;
|
||||
let _dbOpenAttempted = false;
|
||||
// Process-wide singleton state for the open project DB connection.
|
||||
//
|
||||
// Stored on globalThis under a well-known Symbol so multiple module
|
||||
// instances loaded into the same Node process — e.g. one via normal ESM
|
||||
// resolution from the autonomous loop and one via `jiti.import()` from
|
||||
// `src/headless-feedback.ts` — share the same connection slot. Without
|
||||
// this shared backing each module instance opened its own SQLite handle to
|
||||
// the same `.sf/sf.db`, producing intra-process write contention (the
|
||||
// SQLITE_BUSY / writer-stall pattern that hung the in-process drainer for
|
||||
// `sf headless feedback ...` while the autonomous loop ran). See
|
||||
// sf-mpa4g46x-jixg3x for the original report and codex's diagnosis.
|
||||
//
|
||||
// All five fields formerly stored as module-level `let` are now read/written
|
||||
// through `_sf.<field>` so every module instance points at the same slot.
|
||||
const _SF_DB_GLOBAL = Symbol.for("singularity-forge:sf-db:state");
|
||||
const _sf = (globalThis[_SF_DB_GLOBAL] ??= {
|
||||
currentDb: null,
|
||||
currentPath: null,
|
||||
currentPid: 0,
|
||||
_exitHandlerRegistered: false,
|
||||
_dbOpenAttempted: false,
|
||||
});
|
||||
/**
|
||||
* Get the name of the SQLite provider currently loaded (or null if unavailable).
|
||||
*/
|
||||
|
|
@ -310,7 +327,7 @@ export function getDbProvider() {
|
|||
* Check if the database is currently open and available for queries.
|
||||
*/
|
||||
export function isDbAvailable() {
|
||||
return currentDb !== null;
|
||||
return _sf.currentDb !== null;
|
||||
}
|
||||
/**
|
||||
* Returns true if openDatabase() has been called at least once this session.
|
||||
|
|
@ -319,21 +336,21 @@ export function isDbAvailable() {
|
|||
* trigger a false degraded-mode warning.
|
||||
*/
|
||||
export function wasDbOpenAttempted() {
|
||||
return _dbOpenAttempted;
|
||||
return _sf._dbOpenAttempted;
|
||||
}
|
||||
/**
|
||||
* Get the current database adapter, or null if the database is not open.
|
||||
*/
|
||||
export function getDatabase() {
|
||||
return currentDb;
|
||||
return _sf.currentDb;
|
||||
}
|
||||
/**
|
||||
* Open the database at the specified path. Returns true if successful.
|
||||
*/
|
||||
export function openDatabase(path) {
|
||||
_dbOpenAttempted = true;
|
||||
if (currentDb && currentPath !== path) closeDatabase();
|
||||
if (currentDb && currentPath === path) return true;
|
||||
_sf._dbOpenAttempted = true;
|
||||
if (_sf.currentDb && _sf.currentPath !== path) closeDatabase();
|
||||
if (_sf.currentDb && _sf.currentPath === path) return true;
|
||||
const rawDb = openRawDb(path);
|
||||
if (!rawDb) return false;
|
||||
const adapter = createAdapter(rawDb);
|
||||
|
|
@ -374,11 +391,11 @@ export function openDatabase(path) {
|
|||
throw err;
|
||||
}
|
||||
}
|
||||
currentDb = adapter;
|
||||
currentPath = path;
|
||||
currentPid = process.pid;
|
||||
if (!_exitHandlerRegistered) {
|
||||
_exitHandlerRegistered = true;
|
||||
_sf.currentDb = adapter;
|
||||
_sf.currentPath = path;
|
||||
_sf.currentPid = process.pid;
|
||||
if (!_sf._exitHandlerRegistered) {
|
||||
_sf._exitHandlerRegistered = true;
|
||||
process.on("exit", () => {
|
||||
try {
|
||||
closeDatabase();
|
||||
|
|
@ -404,9 +421,9 @@ export function openDatabase(path) {
|
|||
* Consumer: runFinalize() in auto/phases.js after each successful unit.
|
||||
*/
|
||||
export function checkpointWal() {
|
||||
if (!currentDb) return;
|
||||
if (!_sf.currentDb) return;
|
||||
try {
|
||||
currentDb.exec("PRAGMA wal_checkpoint(PASSIVE)");
|
||||
_sf.currentDb.exec("PRAGMA wal_checkpoint(PASSIVE)");
|
||||
} catch (e) {
|
||||
logWarning("db", `WAL checkpoint failed: ${getErrorMessage(e)}`);
|
||||
}
|
||||
|
|
@ -416,27 +433,27 @@ export function checkpointWal() {
|
|||
* Close the database connection.
|
||||
*/
|
||||
export function closeDatabase() {
|
||||
if (currentDb) {
|
||||
if (_sf.currentDb) {
|
||||
try {
|
||||
currentDb.exec("PRAGMA wal_checkpoint(TRUNCATE)");
|
||||
_sf.currentDb.exec("PRAGMA wal_checkpoint(TRUNCATE)");
|
||||
} catch (e) {
|
||||
logWarning("db", `WAL checkpoint failed: ${e.message}`);
|
||||
}
|
||||
try {
|
||||
// Incremental vacuum to reclaim space without blocking
|
||||
currentDb.exec("PRAGMA incremental_vacuum(64)");
|
||||
_sf.currentDb.exec("PRAGMA incremental_vacuum(64)");
|
||||
} catch (e) {
|
||||
logWarning("db", `incremental vacuum failed: ${e.message}`);
|
||||
}
|
||||
try {
|
||||
currentDb.close();
|
||||
_sf.currentDb.close();
|
||||
} catch (e) {
|
||||
logWarning("db", `database close failed: ${e.message}`);
|
||||
}
|
||||
currentDb = null;
|
||||
currentPath = null;
|
||||
currentPid = 0;
|
||||
_dbOpenAttempted = false;
|
||||
_sf.currentDb = null;
|
||||
_sf.currentPath = null;
|
||||
_sf.currentPid = 0;
|
||||
_sf._dbOpenAttempted = false;
|
||||
}
|
||||
}
|
||||
/** Run a full VACUUM — call sparingly (e.g. after milestone completion). */
|
||||
|
|
@ -444,9 +461,9 @@ export function closeDatabase() {
|
|||
* Vacuum the database to reclaim disk space and optimize.
|
||||
*/
|
||||
export function vacuumDatabase() {
|
||||
if (!currentDb) return;
|
||||
if (!_sf.currentDb) return;
|
||||
try {
|
||||
currentDb.exec("VACUUM");
|
||||
_sf.currentDb.exec("VACUUM");
|
||||
} catch (e) {
|
||||
logWarning("db", `VACUUM failed: ${e.message}`);
|
||||
}
|
||||
|
|
@ -456,7 +473,7 @@ let _txDepth = 0;
|
|||
* Execute a callback within a database transaction (BEGIN...COMMIT or ROLLBACK).
|
||||
*/
|
||||
export function transaction(fn) {
|
||||
if (!currentDb) throw new SFError(SF_STALE_STATE, "sf-db: No database open");
|
||||
if (!_sf.currentDb) throw new SFError(SF_STALE_STATE, "sf-db: No database open");
|
||||
// Re-entrant: if already inside a transaction, just run fn() without
|
||||
// starting a new one. SQLite does not support nested BEGIN/COMMIT.
|
||||
if (_txDepth > 0) {
|
||||
|
|
@ -468,13 +485,13 @@ export function transaction(fn) {
|
|||
}
|
||||
}
|
||||
_txDepth++;
|
||||
currentDb.exec("BEGIN");
|
||||
_sf.currentDb.exec("BEGIN");
|
||||
try {
|
||||
const result = fn();
|
||||
currentDb.exec("COMMIT");
|
||||
_sf.currentDb.exec("COMMIT");
|
||||
return result;
|
||||
} catch (err) {
|
||||
currentDb.exec("ROLLBACK");
|
||||
_sf.currentDb.exec("ROLLBACK");
|
||||
throw err;
|
||||
} finally {
|
||||
_txDepth--;
|
||||
|
|
@ -491,7 +508,7 @@ export function transaction(fn) {
|
|||
* Execute a callback within a read-only database transaction.
|
||||
*/
|
||||
export function readTransaction(fn) {
|
||||
if (!currentDb) throw new SFError(SF_STALE_STATE, "sf-db: No database open");
|
||||
if (!_sf.currentDb) throw new SFError(SF_STALE_STATE, "sf-db: No database open");
|
||||
if (_txDepth > 0) {
|
||||
_txDepth++;
|
||||
try {
|
||||
|
|
@ -501,14 +518,14 @@ export function readTransaction(fn) {
|
|||
}
|
||||
}
|
||||
_txDepth++;
|
||||
currentDb.exec("BEGIN DEFERRED");
|
||||
_sf.currentDb.exec("BEGIN DEFERRED");
|
||||
try {
|
||||
const result = fn();
|
||||
currentDb.exec("COMMIT");
|
||||
_sf.currentDb.exec("COMMIT");
|
||||
return result;
|
||||
} catch (err) {
|
||||
try {
|
||||
currentDb.exec("ROLLBACK");
|
||||
_sf.currentDb.exec("ROLLBACK");
|
||||
} catch (rollbackErr) {
|
||||
// A failed ROLLBACK after a failed read is a split-brain signal —
|
||||
// the transaction is in an indeterminate state. Surface it via the
|
||||
|
|
@ -523,14 +540,14 @@ export function readTransaction(fn) {
|
|||
}
|
||||
}
|
||||
export function getDbOwnerPid() {
|
||||
return currentPid;
|
||||
return _sf.currentPid;
|
||||
}
|
||||
export function getDbPath() {
|
||||
return currentPath;
|
||||
return _sf.currentPath;
|
||||
}
|
||||
|
||||
export function _getAdapter() {
|
||||
return currentDb;
|
||||
return _sf.currentDb;
|
||||
}
|
||||
export function _resetProvider() {
|
||||
loadAttempted = false;
|
||||
|
|
@ -538,7 +555,7 @@ export function _resetProvider() {
|
|||
|
||||
export function insertMilestoneSpecIfAbsent(milestoneId, planning = {}) {
|
||||
if (!hasPlanningPayload(planning)) return;
|
||||
const existing = currentDb
|
||||
const existing = _sf.currentDb
|
||||
.prepare("SELECT * FROM milestone_specs WHERE id = ?")
|
||||
.get(milestoneId);
|
||||
if (existing && !isEmptyMilestoneSpec(existing)) return;
|
||||
|
|
@ -565,7 +582,7 @@ export function insertMilestoneSpecIfAbsent(milestoneId, planning = {}) {
|
|||
};
|
||||
if (existing) {
|
||||
const { ":created_at": _createdAt, ...updateParams } = params;
|
||||
currentDb
|
||||
_sf.currentDb
|
||||
.prepare(`UPDATE milestone_specs SET
|
||||
vision = :vision,
|
||||
success_criteria = :success_criteria,
|
||||
|
|
@ -584,7 +601,7 @@ export function insertMilestoneSpecIfAbsent(milestoneId, planning = {}) {
|
|||
.run(updateParams);
|
||||
return;
|
||||
}
|
||||
currentDb
|
||||
_sf.currentDb
|
||||
.prepare(`INSERT OR IGNORE INTO milestone_specs (
|
||||
id, vision, success_criteria, key_risks, proof_strategy,
|
||||
verification_contract, verification_integration, verification_operational, verification_uat,
|
||||
|
|
@ -600,7 +617,7 @@ export function insertMilestoneSpecIfAbsent(milestoneId, planning = {}) {
|
|||
}
|
||||
|
||||
export function insertSliceSpecIfAbsent(milestoneId, sliceId, planning = {}) {
|
||||
currentDb
|
||||
_sf.currentDb
|
||||
.prepare(`INSERT OR IGNORE INTO slice_specs (
|
||||
milestone_id, slice_id, goal, success_criteria, proof_level,
|
||||
integration_closure, observability_impact,
|
||||
|
|
@ -681,7 +698,7 @@ export function insertTaskSpecIfAbsent(
|
|||
"sf-db:insertTaskSpec",
|
||||
`frontmatter validation errors for ${milestoneId}/${sliceId}/${taskId}: ${errors.join(", ")}`,
|
||||
);
|
||||
currentDb
|
||||
_sf.currentDb
|
||||
.prepare(`INSERT OR IGNORE INTO task_specs (
|
||||
milestone_id, slice_id, task_id, verify, inputs, expected_output,
|
||||
risk, mutation_scope, verification_type, plan_approval, estimated_effort,
|
||||
|
|
@ -715,9 +732,9 @@ export function insertTaskSpecIfAbsent(
|
|||
}
|
||||
|
||||
export function insertTaskSchedulerIfAbsent(milestoneId, sliceId, taskId) {
|
||||
const currentDb = _getAdapter();
|
||||
if (!currentDb) throw new SFError(SF_STALE_STATE, "sf-db: No database open");
|
||||
currentDb
|
||||
const localDb = _getAdapter();
|
||||
if (!localDb) throw new SFError(SF_STALE_STATE, "sf-db: No database open");
|
||||
localDb
|
||||
.prepare(`INSERT OR IGNORE INTO task_scheduler (
|
||||
milestone_id, slice_id, task_id, status, updated_at
|
||||
) VALUES (
|
||||
|
|
@ -1034,7 +1051,7 @@ export function rowToGate(row) {
|
|||
export function capErrorForStorage(error, runId) {
|
||||
if (!error || error.length <= MAX_ERROR_STORED_BYTES) return error;
|
||||
try {
|
||||
const errDir = join(dirname(currentPath), "runtime", "errors");
|
||||
const errDir = join(dirname(_sf.currentPath), "runtime", "errors");
|
||||
mkdirSync(errDir, { recursive: true });
|
||||
writeFileSync(join(errDir, `${runId}.txt`), error, "utf-8");
|
||||
} catch {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue