Merge pull request #3836 from jeremymcs/fix/workflow-custom-engine-safety
fix: prevent stale workflow reconcile state writes
This commit is contained in:
commit
468d8995ae
8 changed files with 311 additions and 47 deletions
|
|
@ -379,40 +379,8 @@ export const DISPATCH_RULES: DispatchRule[] = [
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "planning (no research, not S01) → research-slice",
|
||||
match: async ({ state, mid, midTitle, basePath, prefs }) => {
|
||||
if (state.phase !== "planning") return null;
|
||||
// Phase skip: skip research when preference or profile says so
|
||||
if (prefs?.phases?.skip_research || prefs?.phases?.skip_slice_research)
|
||||
return null;
|
||||
if (!state.activeSlice) return missingSliceStop(mid, state.phase);
|
||||
const sid = state.activeSlice!.id;
|
||||
const sTitle = state.activeSlice!.title;
|
||||
const researchFile = resolveSliceFile(basePath, mid, sid, "RESEARCH");
|
||||
if (researchFile) return null; // has research, fall through
|
||||
// Skip slice research for S01 when milestone research already exists —
|
||||
// the milestone research already covers the same ground for the first slice.
|
||||
const milestoneResearchFile = resolveMilestoneFile(
|
||||
basePath,
|
||||
mid,
|
||||
"RESEARCH",
|
||||
);
|
||||
if (milestoneResearchFile && sid === "S01") return null; // fall through to plan-slice
|
||||
return {
|
||||
action: "dispatch",
|
||||
unitType: "research-slice",
|
||||
unitId: `${mid}/${sid}`,
|
||||
prompt: await buildResearchSlicePrompt(
|
||||
mid,
|
||||
midTitle,
|
||||
sid,
|
||||
sTitle,
|
||||
basePath,
|
||||
),
|
||||
};
|
||||
},
|
||||
},
|
||||
{
|
||||
// Keep this rule before the single-slice research rule so the multi-slice
|
||||
// path wins whenever 2+ slices are ready.
|
||||
name: "planning (multiple slices need research) → parallel-research-slices",
|
||||
match: async ({ state, mid, midTitle, basePath, prefs }) => {
|
||||
if (state.phase !== "planning") return null;
|
||||
|
|
@ -459,6 +427,40 @@ export const DISPATCH_RULES: DispatchRule[] = [
|
|||
};
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "planning (no research, not S01) → research-slice",
|
||||
match: async ({ state, mid, midTitle, basePath, prefs }) => {
|
||||
if (state.phase !== "planning") return null;
|
||||
// Phase skip: skip research when preference or profile says so
|
||||
if (prefs?.phases?.skip_research || prefs?.phases?.skip_slice_research)
|
||||
return null;
|
||||
if (!state.activeSlice) return missingSliceStop(mid, state.phase);
|
||||
const sid = state.activeSlice!.id;
|
||||
const sTitle = state.activeSlice!.title;
|
||||
const researchFile = resolveSliceFile(basePath, mid, sid, "RESEARCH");
|
||||
if (researchFile) return null; // has research, fall through
|
||||
// Skip slice research for S01 when milestone research already exists —
|
||||
// the milestone research already covers the same ground for the first slice.
|
||||
const milestoneResearchFile = resolveMilestoneFile(
|
||||
basePath,
|
||||
mid,
|
||||
"RESEARCH",
|
||||
);
|
||||
if (milestoneResearchFile && sid === "S01") return null; // fall through to plan-slice
|
||||
return {
|
||||
action: "dispatch",
|
||||
unitType: "research-slice",
|
||||
unitId: `${mid}/${sid}`,
|
||||
prompt: await buildResearchSlicePrompt(
|
||||
mid,
|
||||
midTitle,
|
||||
sid,
|
||||
sTitle,
|
||||
basePath,
|
||||
),
|
||||
};
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "planning → plan-slice",
|
||||
match: async ({ state, mid, midTitle, basePath }) => {
|
||||
|
|
@ -883,4 +885,3 @@ export async function resolveDispatch(
|
|||
export function getDispatchRuleNames(): string[] {
|
||||
return DISPATCH_RULES.map((r) => r.name);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -194,7 +194,7 @@ export async function autoLoop(
|
|||
|
||||
// Verification passed — mark step complete
|
||||
debugLog("autoLoop", { phase: "custom-engine-reconcile", iteration, unitId: iterData.unitId });
|
||||
await engine.reconcile(engineState, {
|
||||
const reconcileResult = await engine.reconcile(engineState, {
|
||||
unitType: iterData.unitType,
|
||||
unitId: iterData.unitId,
|
||||
startedAt: s.currentUnit?.startedAt ?? Date.now(),
|
||||
|
|
@ -206,6 +206,19 @@ export async function autoLoop(
|
|||
recentErrorMessages.length = 0;
|
||||
deps.emitJournalEvent({ ts: new Date().toISOString(), flowId, seq: nextSeq(), eventType: "iteration-end", data: { iteration } });
|
||||
debugLog("autoLoop", { phase: "iteration-complete", iteration });
|
||||
|
||||
if (reconcileResult.outcome === "milestone-complete") {
|
||||
await deps.stopAuto(ctx, pi, "Workflow complete");
|
||||
break;
|
||||
}
|
||||
if (reconcileResult.outcome === "pause") {
|
||||
await deps.pauseAuto(ctx, pi);
|
||||
break;
|
||||
}
|
||||
if (reconcileResult.outcome === "stop") {
|
||||
await deps.stopAuto(ctx, pi, reconcileResult.reason ?? "Engine stopped");
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -179,7 +179,9 @@ export class CustomWorkflowEngine implements WorkflowEngine {
|
|||
state: EngineState,
|
||||
completedStep: CompletedStep,
|
||||
): Promise<ReconcileResult> {
|
||||
const graph = state.raw as WorkflowGraph;
|
||||
// Re-read the graph from disk so we do not overwrite concurrent
|
||||
// workflow edits with a stale in-memory snapshot from deriveState().
|
||||
const graph = readGraph(this.runDir);
|
||||
|
||||
// Extract stepId from "<workflowName>/<stepId>"
|
||||
const { milestone, slice, task } = parseUnitId(completedStep.unitId);
|
||||
|
|
|
|||
|
|
@ -311,6 +311,12 @@ describe("Custom engine loop integration", () => {
|
|||
`stopAuto reason should include "Workflow complete", got: ${stopEntry}`,
|
||||
);
|
||||
|
||||
assert.equal(
|
||||
deps.callLog.filter((e: string) => e === "deriveState").length,
|
||||
3,
|
||||
"custom engine should stop immediately after a milestone-complete reconcile",
|
||||
);
|
||||
|
||||
// Verify dev path was NOT used (resolveDispatch should not appear)
|
||||
assert.ok(
|
||||
!deps.callLog.includes("resolveDispatch"),
|
||||
|
|
|
|||
|
|
@ -249,6 +249,37 @@ describe("CustomWorkflowEngine.reconcile", () => {
|
|||
const graph = readGraph(runDir);
|
||||
assert.equal(graph.steps[0].status, "complete");
|
||||
});
|
||||
|
||||
it("re-reads GRAPH.yaml before reconcile so concurrent edits are preserved", async () => {
|
||||
const { engine, runDir } = setupEngine([
|
||||
makeStep({ id: "step-1" }),
|
||||
makeStep({ id: "step-2", dependsOn: ["step-1"] }),
|
||||
], "wf");
|
||||
|
||||
const staleState = await engine.deriveState("/unused");
|
||||
|
||||
// Simulate another process appending a new step after deriveState() ran.
|
||||
writeGraph(runDir, makeGraph([
|
||||
makeStep({ id: "step-1" }),
|
||||
makeStep({ id: "step-2", dependsOn: ["step-1"] }),
|
||||
makeStep({ id: "step-3", dependsOn: ["step-2"] }),
|
||||
], "wf"));
|
||||
|
||||
const result = await engine.reconcile(staleState, {
|
||||
unitType: "custom-step",
|
||||
unitId: "wf/step-1",
|
||||
startedAt: Date.now() - 1000,
|
||||
finishedAt: Date.now(),
|
||||
});
|
||||
|
||||
assert.equal(result.outcome, "continue");
|
||||
|
||||
const graph = readGraph(runDir);
|
||||
assert.equal(graph.steps.length, 3, "reconcile should preserve the concurrent graph edit");
|
||||
assert.equal(graph.steps[0].status, "complete");
|
||||
assert.equal(graph.steps[1].status, "pending");
|
||||
assert.equal(graph.steps[2].status, "pending");
|
||||
});
|
||||
});
|
||||
|
||||
// ─── getDisplayMetadata ──────────────────────────────────────────────────
|
||||
|
|
|
|||
|
|
@ -4,12 +4,15 @@
|
|||
* Verifies the dispatch rule and prompt builder exist with correct structure.
|
||||
*/
|
||||
|
||||
import test from "node:test";
|
||||
import test, { afterEach } from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { mkdtempSync, mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { join, dirname } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import { resolveDispatch } from "../auto-dispatch.ts";
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
|
||||
const dispatchSrc = readFileSync(join(__dirname, "..", "auto-dispatch.ts"), "utf-8");
|
||||
|
|
@ -17,6 +20,47 @@ const promptsSrc = readFileSync(join(__dirname, "..", "auto-prompts.ts"), "utf-8
|
|||
const templatePath = join(__dirname, "..", "prompts", "parallel-research-slices.md");
|
||||
const templateSrc = readFileSync(templatePath, "utf-8");
|
||||
|
||||
const tmpDirs: string[] = [];
|
||||
|
||||
function makeTmpProject(): string {
|
||||
const base = mkdtempSync(join(tmpdir(), "parallel-research-"));
|
||||
tmpDirs.push(base);
|
||||
const milestoneDir = join(base, ".gsd", "milestones", "M001");
|
||||
mkdirSync(milestoneDir, { recursive: true });
|
||||
writeFileSync(
|
||||
join(milestoneDir, "M001-ROADMAP.md"),
|
||||
[
|
||||
"# M001: Parallel Research Milestone",
|
||||
"",
|
||||
"**Vision:** Research-ready slices.",
|
||||
"",
|
||||
"**Success Criteria:**",
|
||||
"- Research both slices",
|
||||
"",
|
||||
"## Slices",
|
||||
"",
|
||||
"- [ ] **S01: Alpha** `risk:low` `depends:[]`",
|
||||
"- [ ] **S02: Beta** `risk:low` `depends:[]`",
|
||||
"",
|
||||
"## Boundary Map",
|
||||
"",
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
return base;
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
for (const dir of tmpDirs) {
|
||||
try {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
} catch {
|
||||
// Best-effort cleanup only.
|
||||
}
|
||||
}
|
||||
tmpDirs.length = 0;
|
||||
});
|
||||
|
||||
// ─── Dispatch rule ────────────────────────────────────────────────────────
|
||||
|
||||
test("dispatch: parallel-research-slices rule exists", () => {
|
||||
|
|
@ -75,3 +119,28 @@ test("template: validate-milestone uses parallel reviewers", () => {
|
|||
"validate-milestone should dispatch 3 parallel reviewers",
|
||||
);
|
||||
});
|
||||
|
||||
test("resolveDispatch prefers parallel research when multiple slices are ready", async () => {
|
||||
const base = makeTmpProject();
|
||||
|
||||
const action = await resolveDispatch({
|
||||
basePath: base,
|
||||
mid: "M001",
|
||||
midTitle: "Parallel Research Milestone",
|
||||
state: {
|
||||
phase: "planning",
|
||||
activeMilestone: { id: "M001", title: "Parallel Research Milestone", status: "active" },
|
||||
activeSlice: { id: "S01", title: "Alpha" },
|
||||
activeTask: null,
|
||||
registry: [],
|
||||
blockers: [],
|
||||
} as any,
|
||||
prefs: undefined,
|
||||
});
|
||||
|
||||
assert.equal(action.action, "dispatch");
|
||||
if (action.action === "dispatch") {
|
||||
assert.equal(action.unitType, "research-slice");
|
||||
assert.equal(action.unitId, "M001/parallel-research");
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -0,0 +1,91 @@
|
|||
import test, { afterEach } from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { mkdtempSync, mkdirSync, rmSync, existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
|
||||
import { appendEvent, readEvents } from "../workflow-events.ts";
|
||||
import { listConflicts, reconcileWorktreeLogs, resolveConflict } from "../workflow-reconcile.ts";
|
||||
import { closeDatabase } from "../gsd-db.ts";
|
||||
|
||||
const tmpDirs: string[] = [];
|
||||
|
||||
function makeTmpRepo(): { main: string; worktree: string } {
|
||||
const root = mkdtempSync(join(tmpdir(), "workflow-reconcile-"));
|
||||
const main = join(root, "main");
|
||||
const worktree = join(root, "worktree");
|
||||
mkdirSync(main, { recursive: true });
|
||||
mkdirSync(worktree, { recursive: true });
|
||||
tmpDirs.push(root);
|
||||
return { main, worktree };
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
closeDatabase();
|
||||
for (const dir of tmpDirs) {
|
||||
try {
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
} catch {
|
||||
// Best-effort cleanup on platforms that keep files open briefly.
|
||||
}
|
||||
}
|
||||
tmpDirs.length = 0;
|
||||
});
|
||||
|
||||
test("resolveConflict(pick=main) rewrites the worktree log durably", () => {
|
||||
const { main, worktree } = makeTmpRepo();
|
||||
|
||||
appendEvent(main, {
|
||||
cmd: "plan_milestone",
|
||||
params: { milestoneId: "M001", title: "Base Milestone" },
|
||||
ts: "2026-01-01T00:00:00.000Z",
|
||||
actor: "agent",
|
||||
});
|
||||
appendEvent(worktree, {
|
||||
cmd: "plan_milestone",
|
||||
params: { milestoneId: "M001", title: "Base Milestone" },
|
||||
ts: "2026-01-01T00:00:00.000Z",
|
||||
actor: "agent",
|
||||
});
|
||||
|
||||
appendEvent(main, {
|
||||
cmd: "plan_milestone",
|
||||
params: { milestoneId: "M001", title: "Main Choice" },
|
||||
ts: "2026-01-01T00:01:00.000Z",
|
||||
actor: "agent",
|
||||
});
|
||||
|
||||
appendEvent(worktree, {
|
||||
cmd: "plan_milestone",
|
||||
params: { milestoneId: "M001", title: "Worktree Choice" },
|
||||
ts: "2026-01-01T00:01:00.000Z",
|
||||
actor: "agent",
|
||||
});
|
||||
|
||||
const initial = reconcileWorktreeLogs(main, worktree);
|
||||
assert.equal(initial.conflicts.length, 1, "expected one conflict before resolution");
|
||||
assert.ok(listConflicts(main).length === 1, "CONFLICTS.md should exist after detection");
|
||||
|
||||
resolveConflict(main, worktree, "milestone:M001", "main");
|
||||
|
||||
assert.equal(listConflicts(main).length, 0, "conflict file should be cleared after resolving main");
|
||||
const conflictsPath = join(main, ".gsd", "CONFLICTS.md");
|
||||
assert.equal(
|
||||
existsSync(conflictsPath),
|
||||
false,
|
||||
"CONFLICTS.md should be removed after the last conflict is resolved",
|
||||
);
|
||||
|
||||
const wtEvents = readEvents(join(worktree, ".gsd", "event-log.jsonl"));
|
||||
assert.ok(
|
||||
wtEvents.some((e) => e.cmd === "plan_milestone" && e.params.title === "Main Choice"),
|
||||
"worktree log should be rewritten to the main-side resolution",
|
||||
);
|
||||
assert.ok(
|
||||
!wtEvents.some((e) => e.cmd === "plan_milestone" && e.params.title === "Worktree Choice"),
|
||||
"worktree log should no longer contain the discarded conflict event",
|
||||
);
|
||||
|
||||
const second = reconcileWorktreeLogs(main, worktree);
|
||||
assert.equal(second.conflicts.length, 0, "reconcile should stay clean after choosing main");
|
||||
});
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
import { join } from "node:path";
|
||||
import { mkdirSync, existsSync, readFileSync, unlinkSync } from "node:fs";
|
||||
import { logWarning, logError } from "./workflow-logger.js";
|
||||
import { readEvents, findForkPoint, appendEvent, getSessionId } from "./workflow-events.js";
|
||||
import { readEvents, findForkPoint, getSessionId } from "./workflow-events.js";
|
||||
import type { WorkflowEvent } from "./workflow-events.js";
|
||||
import {
|
||||
transaction,
|
||||
|
|
@ -329,6 +329,41 @@ export function detectConflicts(
|
|||
return conflicts;
|
||||
}
|
||||
|
||||
function rewriteDivergedEventsForEntity(
|
||||
divergedEvents: WorkflowEvent[],
|
||||
entityType: string,
|
||||
entityId: string,
|
||||
replacementEvents: WorkflowEvent[],
|
||||
): WorkflowEvent[] {
|
||||
const rewritten: WorkflowEvent[] = [];
|
||||
let inserted = false;
|
||||
|
||||
for (const event of divergedEvents) {
|
||||
const key = extractEntityKey(event);
|
||||
if (key?.type === entityType && key.id === entityId) {
|
||||
if (!inserted) {
|
||||
rewritten.push(...replacementEvents);
|
||||
inserted = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
rewritten.push(event);
|
||||
}
|
||||
|
||||
if (!inserted) {
|
||||
rewritten.push(...replacementEvents);
|
||||
}
|
||||
|
||||
return rewritten;
|
||||
}
|
||||
|
||||
function writeEventLog(basePath: string, events: WorkflowEvent[]): void {
|
||||
const dir = join(basePath, ".gsd");
|
||||
mkdirSync(dir, { recursive: true });
|
||||
const content = events.map((e) => JSON.stringify(e)).join("\n") + (events.length > 0 ? "\n" : "");
|
||||
atomicWriteSync(join(dir, "event-log.jsonl"), content);
|
||||
}
|
||||
|
||||
// ─── writeConflictsFile ───────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
|
|
@ -575,8 +610,8 @@ function parseEventBlock(block: string): WorkflowEvent[] {
|
|||
|
||||
/**
|
||||
* Resolve a single conflict by picking one side's events.
|
||||
* Replays the picked events through the DB helpers, appends them to the event log,
|
||||
* and updates or removes CONFLICTS.md.
|
||||
* Replays the picked events through the DB helpers, rewrites the chosen side's
|
||||
* event log so the conflict is durable, and updates or removes CONFLICTS.md.
|
||||
*
|
||||
* When the last conflict is resolved, non-conflicting events from both sides
|
||||
* are also replayed (they were blocked by the all-or-nothing D-04 rule).
|
||||
|
|
@ -598,14 +633,30 @@ export function resolveConflict(
|
|||
const conflict = conflicts[idx]!;
|
||||
const eventsToReplay = pick === "main" ? conflict.mainSideEvents : conflict.worktreeSideEvents;
|
||||
|
||||
const mainLogPath = join(basePath, ".gsd", "event-log.jsonl");
|
||||
const wtLogPath = join(worktreeBasePath, ".gsd", "event-log.jsonl");
|
||||
const mainEvents = readEvents(mainLogPath);
|
||||
const wtEvents = readEvents(wtLogPath);
|
||||
const forkPoint = findForkPoint(mainEvents, wtEvents);
|
||||
const mainBaseEvents = mainEvents.slice(0, forkPoint + 1);
|
||||
const wtBaseEvents = wtEvents.slice(0, forkPoint + 1);
|
||||
const mainDiverged = mainEvents.slice(forkPoint + 1);
|
||||
const wtDiverged = wtEvents.slice(forkPoint + 1);
|
||||
|
||||
const rewrittenTargetEvents = pick === "main"
|
||||
? rewriteDivergedEventsForEntity(wtDiverged, entityType, entityId, eventsToReplay)
|
||||
: rewriteDivergedEventsForEntity(mainDiverged, entityType, entityId, eventsToReplay);
|
||||
|
||||
const targetBasePath = pick === "main" ? worktreeBasePath : basePath;
|
||||
const targetBaseEvents = pick === "main" ? wtBaseEvents : mainBaseEvents;
|
||||
writeEventLog(targetBasePath, targetBaseEvents.concat(rewrittenTargetEvents));
|
||||
|
||||
// Replay resolved events through the DB (updates DB state)
|
||||
openDatabase(join(basePath, ".gsd", "gsd.db"));
|
||||
replayEvents(eventsToReplay);
|
||||
|
||||
// Append resolved events to the event log
|
||||
for (const event of eventsToReplay) {
|
||||
appendEvent(basePath, { cmd: event.cmd, params: event.params, ts: event.ts, actor: event.actor });
|
||||
}
|
||||
invalidateStateCache();
|
||||
clearPathCache();
|
||||
clearParseCache();
|
||||
|
||||
// Remove resolved conflict from list
|
||||
conflicts.splice(idx, 1);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue