singularity-forge/src/resources/extensions/sf/uok/writer.js

157 lines
4.5 KiB
JavaScript

import { randomUUID } from "node:crypto";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { atomicWriteSync } from "../atomic-write.js";
import { sfRoot } from "../paths.js";
// In-process registry of writer tokens, keyed by tokenKey(basePath, turnId).
const activeTokens = new Map();
// Age beyond which isTokenExpired() reports a token as expired.
const TOKEN_TTL_MS = 2 * 60 * 60 * 1000; // 2 hours — autonomous turns can run 20-30+ minutes
/**
 * Build the lookup key under which a turn's writer token is stored.
 *
 * @param {string} basePath - Base path the token belongs to.
 * @param {string} turnId - Identifier of the turn.
 * @returns {string} `basePath` and `turnId` separated by a colon.
 */
function tokenKey(basePath, turnId) {
  const key = `${basePath}:${turnId}`;
  return key;
}
/**
 * Locate the JSON file in which writer tokens are persisted.
 *
 * @param {string} basePath - Base path handed to `sfRoot`.
 * @returns {string} `runtime/uok-writer-tokens.json` under `sfRoot(basePath)`.
 */
function tokensPath(basePath) {
  const runtimeDir = join(sfRoot(basePath), "runtime");
  return join(runtimeDir, "uok-writer-tokens.json");
}
/**
 * Load the persisted writer-token map.
 *
 * Reading is best-effort: a missing, unreadable, or malformed file yields an
 * empty map instead of an exception.
 *
 * @param {string} basePath - Base path used to locate the tokens file.
 * @returns {Object} Stored token records by key, or `{}` when nothing usable
 *   is stored.
 */
function readTokensState(basePath) {
  const path = tokensPath(basePath);
  if (!existsSync(path)) return {};
  try {
    const parsed = JSON.parse(readFileSync(path, "utf-8"));
    // Valid JSON is not necessarily a map: `null` would crash callers that
    // enumerate the result, and keys assigned onto an array are dropped when
    // it is serialized again. Treat every non-map value as "no tokens".
    const isPlainMap =
      parsed !== null && typeof parsed === "object" && !Array.isArray(parsed);
    return isPlainMap ? parsed : {};
  } catch {
    // Unreadable or unparsable file: fall back to an empty map.
    return {};
  }
}
/**
 * Persist the writer-token map as pretty-printed JSON.
 *
 * The payload is indented by two spaces, terminated with a newline, and handed
 * to `atomicWriteSync` together with the tokens file path.
 *
 * @param {string} basePath - Base path used to locate the tokens file.
 * @param {Object} state - Token map to serialize.
 * @returns {void}
 */
function writeTokensState(basePath, state) {
  const target = tokensPath(basePath);
  const payload = `${JSON.stringify(state, null, 2)}\n`;
  atomicWriteSync(target, payload, "utf-8");
}
/**
 * Decide whether a writer token should be treated as expired.
 *
 * A token with no `acquiredAt`, or with one that `Date.parse` cannot read,
 * counts as expired.
 *
 * @param {{ acquiredAt?: string } | null | undefined} token - Token to check.
 * @returns {boolean} `true` when the token is unusable or older than
 *   `TOKEN_TTL_MS`.
 */
function isTokenExpired(token) {
  const stamp = token?.acquiredAt;
  if (!stamp) {
    return true;
  }
  const acquiredMs = Date.parse(stamp);
  if (Number.isNaN(acquiredMs)) {
    return true;
  }
  const ageMs = Date.now() - acquiredMs;
  return ageMs > TOKEN_TTL_MS;
}
/**
 * Locate the JSON file in which the write-sequence counter is persisted.
 *
 * @param {string} basePath - Base path handed to `sfRoot`.
 * @returns {string} `runtime/uok-writer-sequence.json` under `sfRoot(basePath)`.
 */
function sequencePath(basePath) {
  const runtimeDir = join(sfRoot(basePath), "runtime");
  return join(runtimeDir, "uok-writer-sequence.json");
}
/**
 * Load the persisted write-sequence counter.
 *
 * Reading is best-effort: a missing, unreadable, or malformed file yields the
 * initial state (`lastSequence` 0, `updatedAt` at the Unix epoch). Fields are
 * validated one by one, so a bad field falls back without discarding the other.
 *
 * @param {string} basePath - Base path used to locate the sequence file.
 * @returns {{ lastSequence: number, updatedAt: string }} The stored state, with
 *   invalid fields replaced by their initial values.
 */
function readSequenceState(basePath) {
  const epoch = () => new Date(0).toISOString();
  const initialState = () => ({ lastSequence: 0, updatedAt: epoch() });

  const path = sequencePath(basePath);
  if (!existsSync(path)) {
    return initialState();
  }
  try {
    // Destructuring throws on `null` or primitives, which lands in the catch
    // below and yields the initial state.
    const { lastSequence, updatedAt } = JSON.parse(readFileSync(path, "utf-8"));
    // Only a non-negative safe integer is a usable counter: beyond the safe
    // range `lastSequence + 1` no longer changes the value, so sequence numbers
    // would repeat.
    const isUsableCounter =
      Number.isSafeInteger(lastSequence) && lastSequence >= 0;
    return {
      lastSequence: isUsableCounter ? lastSequence : 0,
      updatedAt: typeof updatedAt === "string" ? updatedAt : epoch(),
    };
  } catch {
    // Unreadable or unparsable file: start over from the initial state.
    return initialState();
  }
}
/**
 * Persist the write-sequence state as pretty-printed JSON.
 *
 * The payload is indented by two spaces, terminated with a newline, and handed
 * to `atomicWriteSync` together with the sequence file path.
 *
 * @param {string} basePath - Base path used to locate the sequence file.
 * @param {{ lastSequence: number, updatedAt: string }} state - State to store.
 * @returns {void}
 */
function writeSequenceState(basePath, state) {
  const target = sequencePath(basePath);
  const payload = `${JSON.stringify(state, null, 2)}\n`;
  atomicWriteSync(target, payload, "utf-8");
}
/**
 * Acquire the writer token for a turn and persist it to the tokens file.
 *
 * Expired entries found in the tokens file are pruned as part of the same
 * write.
 *
 * @param {Object} args
 * @param {string} args.basePath - Base path the token is scoped to.
 * @param {string} args.turnId - Turn that will own the token.
 * @param {string} [args.traceId] - Trace identifier copied onto the token.
 * @param {string} [args.owner] - Owner label; defaults to `"uok"`.
 * @returns {{ tokenId: string, traceId: (string|undefined), turnId: string,
 *   acquiredAt: string, owner: string }} The newly issued token.
 * @throws {Error} When this process already holds an unexpired token for the
 *   turn, or when persisting the token fails.
 */
export function acquireWriterToken(args) {
  const { basePath, turnId, traceId, owner } = args;
  const key = tokenKey(basePath, turnId);
  const existing = activeTokens.get(key);
  if (existing && !isTokenExpired(existing)) {
    throw new Error(`Writer token already active for turn ${turnId}`);
  }
  // Clean up expired tokens from disk
  const diskTokens = readTokensState(basePath);
  for (const [k, stored] of Object.entries(diskTokens)) {
    if (isTokenExpired(stored)) delete diskTokens[k];
  }
  const token = {
    tokenId: randomUUID(),
    traceId,
    turnId,
    acquiredAt: new Date().toISOString(),
    owner: owner ?? "uok",
  };
  diskTokens[key] = token;
  // Persist before registering in memory. If the write throws, the caller
  // never receives the token and so cannot release it; registering first would
  // leave the turn locked in this process until the TTL runs out.
  writeTokensState(basePath, diskTokens);
  activeTokens.set(key, token);
  return token;
}
/**
 * Release a writer token, both in memory and in the tokens file.
 *
 * Each copy is removed only when its `tokenId` matches the given token, so a
 * different token stored for the same turn is left in place.
 *
 * @param {string} basePath - Base path the token is scoped to.
 * @param {{ turnId: string, tokenId: string }} token - Token to release.
 * @returns {void}
 */
export function releaseWriterToken(basePath, token) {
  const { turnId, tokenId } = token;
  const key = tokenKey(basePath, turnId);

  if (activeTokens.get(key)?.tokenId === tokenId) {
    activeTokens.delete(key);
  }

  // The persisted copy is handled separately; the file is rewritten only when
  // an entry was actually removed.
  const persisted = readTokensState(basePath);
  if (persisted[key]?.tokenId !== tokenId) {
    return;
  }
  delete persisted[key];
  writeTokensState(basePath, persisted);
}
/**
 * Report whether an unexpired writer token exists for a turn.
 *
 * The in-memory registry is consulted first and an expired entry found there
 * is dropped. Otherwise the tokens file is checked, and an unexpired entry from
 * it is copied into the in-memory registry.
 *
 * @param {string} basePath - Base path the token is scoped to.
 * @param {string} turnId - Turn to look up.
 * @returns {boolean} `true` when an unexpired token was found.
 */
export function hasActiveWriterToken(basePath, turnId) {
  const key = tokenKey(basePath, turnId);

  if (activeTokens.has(key)) {
    if (!isTokenExpired(activeTokens.get(key))) {
      return true;
    }
    activeTokens.delete(key);
  }

  // Check disk for tokens from crashed processes
  const persisted = readTokensState(basePath)[key];
  if (!persisted || isTokenExpired(persisted)) {
    return false;
  }
  activeTokens.set(key, persisted);
  return true;
}
/**
 * Allocate the next write-sequence number and build the matching write record.
 *
 * The counter is read, incremented by one, and written back before the record
 * is returned; the record's `ts` equals the stored `updatedAt`.
 *
 * @param {Object} args
 * @param {string} args.basePath - Base path the sequence is scoped to.
 * @param {{ turnId: string, traceId?: string }} args.token - Writer token for
 *   the turn performing the write.
 * @param {*} [args.category] - Copied onto the record unchanged.
 * @param {*} [args.operation] - Copied onto the record unchanged.
 * @param {*} [args.path] - Copied onto the record unchanged.
 * @param {*} [args.metadata] - Copied onto the record unchanged.
 * @returns {Object} Record holding the token, sequence info, timestamp, and
 *   the copied fields.
 * @throws {Error} When no active writer token exists for the token's turn.
 */
export function nextWriteRecord(args) {
  const { basePath, token } = args;
  const { turnId } = token;
  if (!hasActiveWriterToken(basePath, turnId)) {
    throw new Error(`Writer token is not active for turn ${turnId}`);
  }

  const sequence = readSequenceState(basePath).lastSequence + 1;
  const ts = new Date().toISOString();
  writeSequenceState(basePath, { lastSequence: sequence, updatedAt: ts });

  const { category, operation, path, metadata } = args;
  return {
    writerToken: token,
    sequence: { traceId: token.traceId, turnId, sequence },
    category,
    operation,
    path,
    ts,
    metadata,
  };
}
/**
 * Empty the in-memory token registry.
 *
 * Only the in-process map is cleared; the tokens file is not touched.
 *
 * @returns {void}
 */
export function resetWriterTokensForTests() {
  activeTokens.clear();
}
/**
 * Remove expired writer tokens from the tokens file and from memory.
 *
 * The file is rewritten only when at least one persisted entry was removed.
 *
 * @param {string} basePath - Base path whose tokens file is pruned.
 * @returns {void}
 */
export function clearExpiredWriterTokens(basePath) {
  const diskTokens = readTokensState(basePath);
  const staleDiskKeys = Object.keys(diskTokens).filter((key) =>
    isTokenExpired(diskTokens[key]),
  );
  staleDiskKeys.forEach((key) => {
    delete diskTokens[key];
  });
  if (staleDiskKeys.length > 0) {
    writeTokensState(basePath, diskTokens);
  }

  // The in-memory registry is pruned too; it is not filtered by basePath.
  const staleLiveKeys = [...activeTokens]
    .filter(([, token]) => isTokenExpired(token))
    .map(([key]) => key);
  staleLiveKeys.forEach((key) => activeTokens.delete(key));
}