Merge pull request #4006 from jeremymcs/fix/workflow-events-toctou
fix: TOCTOU file locking race conditions in event log and custom workflow graph
This commit is contained in:
commit
de065094ea
4 changed files with 213 additions and 37 deletions
|
|
@ -34,6 +34,7 @@ import {
|
|||
import { injectContext } from "./context-injector.js";
|
||||
import type { WorkflowDefinition, StepDefinition } from "./definition-loader.js";
|
||||
import { parseUnitId } from "./unit-id.js";
|
||||
import { withFileLock } from "./file-lock.js";
|
||||
|
||||
/** Read and parse the frozen DEFINITION.yaml from a run directory. */
|
||||
export function readFrozenDefinition(runDir: string): WorkflowDefinition {
|
||||
|
|
@ -179,24 +180,28 @@ export class CustomWorkflowEngine implements WorkflowEngine {
|
|||
state: EngineState,
|
||||
completedStep: CompletedStep,
|
||||
): Promise<ReconcileResult> {
|
||||
// Re-read the graph from disk so we do not overwrite concurrent
|
||||
// workflow edits with a stale in-memory snapshot from deriveState().
|
||||
const graph = readGraph(this.runDir);
|
||||
const graphPath = join(this.runDir, "GRAPH.yaml");
|
||||
|
||||
// Extract stepId from "<workflowName>/<stepId>"
|
||||
const { milestone, slice, task } = parseUnitId(completedStep.unitId);
|
||||
const stepId = task ?? slice ?? milestone;
|
||||
return await withFileLock(graphPath, () => {
|
||||
// Re-read the graph from disk so we do not overwrite concurrent
|
||||
// workflow edits with a stale in-memory snapshot from deriveState().
|
||||
const graph = readGraph(this.runDir);
|
||||
|
||||
const updatedGraph = markStepComplete(graph, stepId);
|
||||
writeGraph(this.runDir, updatedGraph);
|
||||
// Extract stepId from "<workflowName>/<stepId>"
|
||||
const { milestone, slice, task } = parseUnitId(completedStep.unitId);
|
||||
const stepId = task ?? slice ?? milestone;
|
||||
|
||||
const allDone = updatedGraph.steps.every(
|
||||
(s) => s.status === "complete" || s.status === "expanded",
|
||||
);
|
||||
const updatedGraph = markStepComplete(graph, stepId);
|
||||
writeGraph(this.runDir, updatedGraph);
|
||||
|
||||
return {
|
||||
outcome: allDone ? "milestone-complete" : "continue",
|
||||
};
|
||||
const allDone = updatedGraph.steps.every(
|
||||
(s) => s.status === "complete" || s.status === "expanded",
|
||||
);
|
||||
|
||||
return {
|
||||
outcome: allDone ? "milestone-complete" : "continue",
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
59
src/resources/extensions/gsd/file-lock.ts
Normal file
59
src/resources/extensions/gsd/file-lock.ts
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
import { existsSync } from "node:fs";
|
||||
|
||||
function _require(name: string) {
|
||||
try {
|
||||
return require(name);
|
||||
} catch {
|
||||
try {
|
||||
const gsdPiRequire = require("module").createRequire(
|
||||
require("path").join(process.cwd(), "node_modules", "gsd-pi", "index.js")
|
||||
);
|
||||
return gsdPiRequire(name);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function withFileLockSync<T>(filePath: string, fn: () => T): T {
|
||||
const lockfile = _require("proper-lockfile");
|
||||
if (!lockfile) return fn();
|
||||
|
||||
if (!existsSync(filePath)) return fn();
|
||||
|
||||
try {
|
||||
const release = lockfile.lockSync(filePath, { retries: 5, stale: 10000 });
|
||||
try {
|
||||
return fn();
|
||||
} finally {
|
||||
release();
|
||||
}
|
||||
} catch (err: any) {
|
||||
if (err.code === "ELOCKED") {
|
||||
// Could not get lock after retries, let's fallback to un-locked instead of crashing the whole state machine
|
||||
return fn();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
export async function withFileLock<T>(filePath: string, fn: () => Promise<T> | T): Promise<T> {
|
||||
const lockfile = _require("proper-lockfile");
|
||||
if (!lockfile) return await fn();
|
||||
|
||||
if (!existsSync(filePath)) return await fn();
|
||||
|
||||
try {
|
||||
const release = await lockfile.lock(filePath, { retries: 5, stale: 10000 });
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
} catch (err: any) {
|
||||
if (err.code === "ELOCKED") {
|
||||
return await fn();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
103
src/resources/extensions/gsd/tests/file-lock.test.ts
Normal file
103
src/resources/extensions/gsd/tests/file-lock.test.ts
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
import test from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { createRequire } from "node:module";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
|
||||
import { withFileLock, withFileLockSync } from "../file-lock.ts";
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
|
||||
function hasProperLockfile(): boolean {
|
||||
try {
|
||||
require("proper-lockfile");
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
test("withFileLockSync: executes callback when file does not exist", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "gsd-file-lock-test-"));
|
||||
try {
|
||||
const missingPath = join(dir, "missing.txt");
|
||||
let called = 0;
|
||||
const result = withFileLockSync(missingPath, () => {
|
||||
called++;
|
||||
return "ok";
|
||||
});
|
||||
|
||||
assert.equal(result, "ok");
|
||||
assert.equal(called, 1, "callback should execute exactly once");
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("withFileLock: executes callback when file does not exist", async () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "gsd-file-lock-test-"));
|
||||
try {
|
||||
const missingPath = join(dir, "missing.txt");
|
||||
let called = 0;
|
||||
const result = await withFileLock(missingPath, async () => {
|
||||
called++;
|
||||
return "ok";
|
||||
});
|
||||
|
||||
assert.equal(result, "ok");
|
||||
assert.equal(called, 1, "callback should execute exactly once");
|
||||
} finally {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("withFileLockSync: falls back to unlocked callback on ELOCKED", () => {
|
||||
if (!hasProperLockfile() || process.platform === "win32") {
|
||||
return;
|
||||
}
|
||||
|
||||
const lockfile = require("proper-lockfile");
|
||||
const dir = mkdtempSync(join(tmpdir(), "gsd-file-lock-test-"));
|
||||
const filePath = join(dir, "locked.jsonl");
|
||||
writeFileSync(filePath, "{}\n", "utf-8");
|
||||
|
||||
const release = lockfile.lockSync(filePath, { retries: 0, stale: 10000 });
|
||||
try {
|
||||
let called = 0;
|
||||
const result = withFileLockSync(filePath, () => {
|
||||
called++;
|
||||
return "fallback-ok";
|
||||
});
|
||||
assert.equal(result, "fallback-ok");
|
||||
assert.equal(called, 1, "callback should run even when lock acquisition fails");
|
||||
} finally {
|
||||
release();
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("withFileLock: falls back to unlocked callback on ELOCKED", async () => {
|
||||
if (!hasProperLockfile() || process.platform === "win32") {
|
||||
return;
|
||||
}
|
||||
|
||||
const lockfile = require("proper-lockfile");
|
||||
const dir = mkdtempSync(join(tmpdir(), "gsd-file-lock-test-"));
|
||||
const filePath = join(dir, "locked.jsonl");
|
||||
writeFileSync(filePath, "{}\n", "utf-8");
|
||||
|
||||
const release = await lockfile.lock(filePath, { retries: 0, stale: 10000 });
|
||||
try {
|
||||
let called = 0;
|
||||
const result = await withFileLock(filePath, async () => {
|
||||
called++;
|
||||
return "fallback-ok";
|
||||
});
|
||||
assert.equal(result, "fallback-ok");
|
||||
assert.equal(called, 1, "callback should run even when lock acquisition fails");
|
||||
} finally {
|
||||
await release();
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
|
@ -2,6 +2,7 @@ import { createHash, randomUUID } from "node:crypto";
|
|||
import { appendFileSync, readFileSync, existsSync, mkdirSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { atomicWriteSync } from "./atomic-write.js";
|
||||
import { withFileLockSync } from "./file-lock.js";
|
||||
import { logWarning } from "./workflow-logger.js";
|
||||
|
||||
// ─── Session ID ───────────────────────────────────────────────────────────
|
||||
|
|
@ -127,31 +128,39 @@ export function compactMilestoneEvents(
|
|||
const logPath = join(basePath, ".gsd", "event-log.jsonl");
|
||||
const archivePath = join(basePath, ".gsd", `event-log-${milestoneId}.jsonl.archived`);
|
||||
|
||||
const allEvents = readEvents(logPath);
|
||||
const toArchive = allEvents.filter(
|
||||
(e) => (e.params as { milestoneId?: string }).milestoneId === milestoneId,
|
||||
);
|
||||
const remaining = allEvents.filter(
|
||||
(e) => (e.params as { milestoneId?: string }).milestoneId !== milestoneId,
|
||||
);
|
||||
return withFileLockSync(logPath, () => {
|
||||
const allEvents = readEvents(logPath);
|
||||
|
||||
// Single-pass partition to halve the work (per reviewer agent)
|
||||
const toArchive: WorkflowEvent[] = [];
|
||||
const remaining: WorkflowEvent[] = [];
|
||||
|
||||
for (const e of allEvents) {
|
||||
if ((e.params as { milestoneId?: string }).milestoneId === milestoneId) {
|
||||
toArchive.push(e);
|
||||
} else {
|
||||
remaining.push(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (toArchive.length === 0) {
|
||||
return { archived: 0 };
|
||||
}
|
||||
if (toArchive.length === 0) {
|
||||
return { archived: 0 };
|
||||
}
|
||||
|
||||
// Write archived events to .jsonl.archived file (crash-safe)
|
||||
atomicWriteSync(
|
||||
archivePath,
|
||||
toArchive.map((e) => JSON.stringify(e)).join("\n") + "\n",
|
||||
);
|
||||
// Write archived events to .jsonl.archived file (crash-safe)
|
||||
atomicWriteSync(
|
||||
archivePath,
|
||||
toArchive.map((e) => JSON.stringify(e)).join("\n") + "\n",
|
||||
);
|
||||
|
||||
// Truncate active log to remaining events only
|
||||
atomicWriteSync(
|
||||
logPath,
|
||||
remaining.length > 0
|
||||
? remaining.map((e) => JSON.stringify(e)).join("\n") + "\n"
|
||||
: "",
|
||||
);
|
||||
// Truncate active log to remaining events only
|
||||
atomicWriteSync(
|
||||
logPath,
|
||||
remaining.length > 0
|
||||
? remaining.map((e) => JSON.stringify(e)).join("\n") + "\n"
|
||||
: "",
|
||||
);
|
||||
|
||||
return { archived: toArchive.length };
|
||||
return { archived: toArchive.length };
|
||||
});
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue