fix(gsd): prevent stale workflow reconcile state writes

This commit is contained in:
Jeremy 2026-04-08 20:07:46 -05:00
parent 477bf3c3fd
commit bb4d708b80
8 changed files with 312 additions and 48 deletions

View file

@ -379,40 +379,8 @@ export const DISPATCH_RULES: DispatchRule[] = [
},
},
{
// Dispatch rule: during the planning phase, send the active slice to a
// research-slice work unit when no slice research file exists yet.
// NOTE: `return null` from match() means "this rule does not apply" —
// resolution falls through to the next rule in DISPATCH_RULES.
name: "planning (no research, not S01) → research-slice",
match: async ({ state, mid, midTitle, basePath, prefs }) => {
if (state.phase !== "planning") return null;
// Phase skip: skip research when preference or profile says so
if (prefs?.phases?.skip_research || prefs?.phases?.skip_slice_research)
return null;
// No active slice while in planning is inconsistent state — surface a
// stop action instead of silently falling through.
if (!state.activeSlice) return missingSliceStop(mid, state.phase);
// Safe non-null assertions: the guard above rules out null activeSlice.
const sid = state.activeSlice!.id;
const sTitle = state.activeSlice!.title;
// presumably resolveSliceFile returns the file path when present and a
// falsy value when missing — TODO confirm against its definition.
const researchFile = resolveSliceFile(basePath, mid, sid, "RESEARCH");
if (researchFile) return null; // has research, fall through
// Skip slice research for S01 when milestone research already exists —
// the milestone research already covers the same ground for the first slice.
const milestoneResearchFile = resolveMilestoneFile(
basePath,
mid,
"RESEARCH",
);
if (milestoneResearchFile && sid === "S01") return null; // fall through to plan-slice
// Dispatch a research unit for this slice; unitId is "<milestone>/<slice>".
return {
action: "dispatch",
unitType: "research-slice",
unitId: `${mid}/${sid}`,
prompt: await buildResearchSlicePrompt(
mid,
midTitle,
sid,
sTitle,
basePath,
),
};
},
},
{
// Keep this rule before the single-slice research rule so the multi-slice
// path wins whenever 2+ slices are ready.
name: "planning (multiple slices need research) → parallel-research-slices",
match: async ({ state, mid, midTitle, basePath, prefs }) => {
if (state.phase !== "planning") return null;
@ -459,6 +427,40 @@ export const DISPATCH_RULES: DispatchRule[] = [
};
},
},
{
// Single-slice research rule. Per the comment on the parallel rule in
// this file, this entry is deliberately ordered AFTER the multi-slice
// parallel-research rule, so it only fires when that rule declined.
// A `null` return means "rule does not apply; try the next rule".
name: "planning (no research, not S01) → research-slice",
match: async ({ state, mid, midTitle, basePath, prefs }) => {
if (state.phase !== "planning") return null;
// Phase skip: skip research when preference or profile says so
if (prefs?.phases?.skip_research || prefs?.phases?.skip_slice_research)
return null;
// Planning without an active slice is an inconsistent state: emit the
// shared missing-slice stop action rather than falling through.
if (!state.activeSlice) return missingSliceStop(mid, state.phase);
// Non-null assertions are safe here — guarded immediately above.
const sid = state.activeSlice!.id;
const sTitle = state.activeSlice!.title;
// NOTE(review): assumes resolveSliceFile yields a truthy path only when
// the RESEARCH file exists — verify against its implementation.
const researchFile = resolveSliceFile(basePath, mid, sid, "RESEARCH");
if (researchFile) return null; // has research, fall through
// Skip slice research for S01 when milestone research already exists —
// the milestone research already covers the same ground for the first slice.
const milestoneResearchFile = resolveMilestoneFile(
basePath,
mid,
"RESEARCH",
);
if (milestoneResearchFile && sid === "S01") return null; // fall through to plan-slice
// Otherwise dispatch slice research; unitId format is "<mid>/<sid>".
return {
action: "dispatch",
unitType: "research-slice",
unitId: `${mid}/${sid}`,
prompt: await buildResearchSlicePrompt(
mid,
midTitle,
sid,
sTitle,
basePath,
),
};
},
},
{
name: "planning → plan-slice",
match: async ({ state, mid, midTitle, basePath }) => {
@ -883,4 +885,3 @@ export async function resolveDispatch(
export function getDispatchRuleNames(): string[] {
return DISPATCH_RULES.map((r) => r.name);
}

View file

@ -194,7 +194,7 @@ export async function autoLoop(
// Verification passed — mark step complete
debugLog("autoLoop", { phase: "custom-engine-reconcile", iteration, unitId: iterData.unitId });
await engine.reconcile(engineState, {
const reconcileResult = await engine.reconcile(engineState, {
unitType: iterData.unitType,
unitId: iterData.unitId,
startedAt: s.currentUnit?.startedAt ?? Date.now(),
@ -206,6 +206,19 @@ export async function autoLoop(
recentErrorMessages.length = 0;
deps.emitJournalEvent({ ts: new Date().toISOString(), flowId, seq: nextSeq(), eventType: "iteration-end", data: { iteration } });
debugLog("autoLoop", { phase: "iteration-complete", iteration });
if (reconcileResult.outcome === "milestone-complete") {
await deps.stopAuto(ctx, pi, "Workflow complete");
break;
}
if (reconcileResult.outcome === "pause") {
await deps.pauseAuto(ctx, pi);
break;
}
if (reconcileResult.outcome === "stop") {
await deps.stopAuto(ctx, pi, reconcileResult.reason ?? "Engine stopped");
break;
}
continue;
}

View file

@ -179,7 +179,9 @@ export class CustomWorkflowEngine implements WorkflowEngine {
state: EngineState,
completedStep: CompletedStep,
): Promise<ReconcileResult> {
const graph = state.raw as WorkflowGraph;
// Re-read the graph from disk so we do not overwrite concurrent
// workflow edits with a stale in-memory snapshot from deriveState().
const graph = readGraph(this.runDir);
// Extract stepId from "<workflowName>/<stepId>"
const { milestone, slice, task } = parseUnitId(completedStep.unitId);

View file

@ -178,7 +178,7 @@ function makeMockDeps(overrides?: Partial<LoopDeps>): LoopDeps & { callLog: stri
getCurrentBranch: () => "main",
autoWorktreeBranch: () => "auto/M001",
resolveMilestoneFile: () => null,
reconcileMergeState: () => false,
reconcileMergeState: () => "clean",
getLedger: () => null,
getProjectTotals: () => ({ cost: 0 }),
formatCost: (c: number) => `$${c.toFixed(2)}`,
@ -311,6 +311,12 @@ describe("Custom engine loop integration", () => {
`stopAuto reason should include "Workflow complete", got: ${stopEntry}`,
);
assert.equal(
deps.callLog.filter((e: string) => e === "deriveState").length,
3,
"custom engine should stop immediately after a milestone-complete reconcile",
);
// Verify dev path was NOT used (resolveDispatch should not appear)
assert.ok(
!deps.callLog.includes("resolveDispatch"),

View file

@ -249,6 +249,37 @@ describe("CustomWorkflowEngine.reconcile", () => {
const graph = readGraph(runDir);
assert.equal(graph.steps[0].status, "complete");
});
it("re-reads GRAPH.yaml before reconcile so concurrent edits are preserved", async () => {
  const wfName = "wf";
  // Two-step workflow; step-2 depends on step-1.
  const { engine, runDir } = setupEngine(
    [makeStep({ id: "step-1" }), makeStep({ id: "step-2", dependsOn: ["step-1"] })],
    wfName,
  );
  // Take an in-memory snapshot, then simulate a concurrent process
  // appending step-3 to the on-disk graph after deriveState() ran.
  const staleSnapshot = await engine.deriveState("/unused");
  writeGraph(
    runDir,
    makeGraph(
      [
        makeStep({ id: "step-1" }),
        makeStep({ id: "step-2", dependsOn: ["step-1"] }),
        makeStep({ id: "step-3", dependsOn: ["step-2"] }),
      ],
      wfName,
    ),
  );
  // Reconcile using the stale snapshot; the engine must re-read the
  // graph from disk rather than writing back the two-step snapshot.
  const reconcileResult = await engine.reconcile(staleSnapshot, {
    unitType: "custom-step",
    unitId: "wf/step-1",
    startedAt: Date.now() - 1000,
    finishedAt: Date.now(),
  });
  assert.equal(reconcileResult.outcome, "continue");
  const persisted = readGraph(runDir);
  assert.equal(persisted.steps.length, 3, "reconcile should preserve the concurrent graph edit");
  assert.equal(persisted.steps[0].status, "complete");
  assert.equal(persisted.steps[1].status, "pending");
  assert.equal(persisted.steps[2].status, "pending");
});
});
// ─── getDisplayMetadata ──────────────────────────────────────────────────

View file

@ -4,12 +4,15 @@
* Verifies the dispatch rule and prompt builder exist with correct structure.
*/
import test from "node:test";
import test, { afterEach } from "node:test";
import assert from "node:assert/strict";
import { readFileSync } from "node:fs";
import { mkdtempSync, mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs";
import { join, dirname } from "node:path";
import { tmpdir } from "node:os";
import { fileURLToPath } from "node:url";
import { resolveDispatch } from "../auto-dispatch.ts";
const __dirname = dirname(fileURLToPath(import.meta.url));
const dispatchSrc = readFileSync(join(__dirname, "..", "auto-dispatch.ts"), "utf-8");
@ -17,6 +20,47 @@ const promptsSrc = readFileSync(join(__dirname, "..", "auto-prompts.ts"), "utf-8
const templatePath = join(__dirname, "..", "prompts", "parallel-research-slices.md");
const templateSrc = readFileSync(templatePath, "utf-8");
const tmpDirs: string[] = [];
/**
 * Build a throwaway project containing a single M001 roadmap with two
 * independent slices (S01/S02), giving the parallel-research dispatch
 * rule 2+ research-ready slices to choose from. The created directory
 * is queued in tmpDirs so the afterEach hook can remove it.
 */
function makeTmpProject(): string {
  const projectRoot = mkdtempSync(join(tmpdir(), "parallel-research-"));
  tmpDirs.push(projectRoot);
  const milestoneDir = join(projectRoot, ".gsd", "milestones", "M001");
  mkdirSync(milestoneDir, { recursive: true });
  // Roadmap fixture: both slices have empty depends lists, so both are
  // immediately eligible for research.
  const roadmapLines = [
    "# M001: Parallel Research Milestone",
    "",
    "**Vision:** Research-ready slices.",
    "",
    "**Success Criteria:**",
    "- Research both slices",
    "",
    "## Slices",
    "",
    "- [ ] **S01: Alpha** `risk:low` `depends:[]`",
    "- [ ] **S02: Beta** `risk:low` `depends:[]`",
    "",
    "## Boundary Map",
    "",
  ];
  writeFileSync(
    join(milestoneDir, "M001-ROADMAP.md"),
    roadmapLines.join("\n"),
    "utf-8",
  );
  return projectRoot;
}
afterEach(() => {
  // splice() drains the registry in creation order and leaves the
  // shared array empty for the next test.
  for (const dir of tmpDirs.splice(0, tmpDirs.length)) {
    try {
      rmSync(dir, { recursive: true, force: true });
    } catch {
      // Cleanup is best-effort; ignore filesystem removal errors.
    }
  }
});
// ─── Dispatch rule ────────────────────────────────────────────────────────
test("dispatch: parallel-research-slices rule exists", () => {
@ -75,3 +119,28 @@ test("template: validate-milestone uses parallel reviewers", () => {
"validate-milestone should dispatch 3 parallel reviewers",
);
});
test("resolveDispatch prefers parallel research when multiple slices are ready", async () => {
  const projectRoot = makeTmpProject();
  // Minimal planning-phase state; cast because the fixture only fills
  // the fields resolveDispatch reads.
  const planningState = {
    phase: "planning",
    activeMilestone: { id: "M001", title: "Parallel Research Milestone", status: "active" },
    activeSlice: { id: "S01", title: "Alpha" },
    activeTask: null,
    registry: [],
    blockers: [],
  } as any;
  const action = await resolveDispatch({
    basePath: projectRoot,
    mid: "M001",
    midTitle: "Parallel Research Milestone",
    state: planningState,
    prefs: undefined,
  });
  // The parallel rule should win over the single-slice research rule.
  assert.equal(action.action, "dispatch");
  if (action.action === "dispatch") {
    assert.equal(action.unitType, "research-slice");
    assert.equal(action.unitId, "M001/parallel-research");
  }
});

View file

@ -0,0 +1,91 @@
import test, { afterEach } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, mkdirSync, rmSync, existsSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { appendEvent, readEvents } from "../workflow-events.ts";
import { listConflicts, reconcileWorktreeLogs, resolveConflict } from "../workflow-reconcile.ts";
import { closeDatabase } from "../gsd-db.ts";
const tmpDirs: string[] = [];
/**
 * Create a scratch repo layout: sibling main/ and worktree/ directories
 * under one mkdtemp root. The root is queued in tmpDirs for afterEach
 * cleanup; callers receive the two base paths.
 */
function makeTmpRepo(): { main: string; worktree: string } {
  const root = mkdtempSync(join(tmpdir(), "workflow-reconcile-"));
  const repo = {
    main: join(root, "main"),
    worktree: join(root, "worktree"),
  };
  mkdirSync(repo.main, { recursive: true });
  mkdirSync(repo.worktree, { recursive: true });
  tmpDirs.push(root);
  return repo;
}
afterEach(() => {
  // Close the DB handle first — presumably so directory removal below
  // does not race an open database file; confirm against gsd-db.
  closeDatabase();
  // Drain the registry in creation order and delete each tmp root.
  for (const dir of tmpDirs.splice(0, tmpDirs.length)) {
    try {
      rmSync(dir, { recursive: true, force: true });
    } catch {
      // Best-effort cleanup on platforms that keep files open briefly.
    }
  }
});
test("resolveConflict(pick=main) rewrites the worktree log durably", () => {
  const { main, worktree } = makeTmpRepo();
  // Local helper: append one plan_milestone event for M001 to a log.
  const planMilestone = (base: string, title: string, ts: string) => {
    appendEvent(base, {
      cmd: "plan_milestone",
      params: { milestoneId: "M001", title },
      ts,
      actor: "agent",
    });
  };
  // Identical base event on both sides, then one diverging edit each —
  // same entity, same timestamp, different titles → one conflict.
  planMilestone(main, "Base Milestone", "2026-01-01T00:00:00.000Z");
  planMilestone(worktree, "Base Milestone", "2026-01-01T00:00:00.000Z");
  planMilestone(main, "Main Choice", "2026-01-01T00:01:00.000Z");
  planMilestone(worktree, "Worktree Choice", "2026-01-01T00:01:00.000Z");
  const initial = reconcileWorktreeLogs(main, worktree);
  assert.equal(initial.conflicts.length, 1, "expected one conflict before resolution");
  assert.equal(listConflicts(main).length, 1, "CONFLICTS.md should exist after detection");
  // Resolve in favor of the main side.
  resolveConflict(main, worktree, "milestone:M001", "main");
  assert.equal(listConflicts(main).length, 0, "conflict file should be cleared after resolving main");
  assert.equal(
    existsSync(join(main, ".gsd", "CONFLICTS.md")),
    false,
    "CONFLICTS.md should be removed after the last conflict is resolved",
  );
  // The worktree log itself must now reflect the main-side choice.
  const wtEvents = readEvents(join(worktree, ".gsd", "event-log.jsonl"));
  const planTitles = wtEvents
    .filter((e) => e.cmd === "plan_milestone")
    .map((e) => e.params.title);
  assert.ok(
    planTitles.includes("Main Choice"),
    "worktree log should be rewritten to the main-side resolution",
  );
  assert.ok(
    !planTitles.includes("Worktree Choice"),
    "worktree log should no longer contain the discarded conflict event",
  );
  // Durability: a second reconcile must not re-detect the conflict.
  const second = reconcileWorktreeLogs(main, worktree);
  assert.equal(second.conflicts.length, 0, "reconcile should stay clean after choosing main");
});

View file

@ -1,7 +1,7 @@
import { join } from "node:path";
import { mkdirSync, existsSync, readFileSync, unlinkSync } from "node:fs";
import { logWarning, logError } from "./workflow-logger.js";
import { readEvents, findForkPoint, appendEvent, getSessionId } from "./workflow-events.js";
import { readEvents, findForkPoint, getSessionId } from "./workflow-events.js";
import type { WorkflowEvent } from "./workflow-events.js";
import {
transaction,
@ -329,6 +329,41 @@ export function detectConflicts(
return conflicts;
}
/**
 * Rewrite a diverged event slice for one entity: every event whose key
 * matches (entityType, entityId) is dropped, and replacementEvents are
 * spliced in once at the position of the first match. If no event for
 * the entity appears, the replacements are appended at the end. Events
 * for other entities keep their original relative order.
 */
function rewriteDivergedEventsForEntity(
  divergedEvents: WorkflowEvent[],
  entityType: string,
  entityId: string,
  replacementEvents: WorkflowEvent[],
): WorkflowEvent[] {
  let replacementPlaced = false;
  const merged: WorkflowEvent[] = [];
  for (const candidate of divergedEvents) {
    const entityKey = extractEntityKey(candidate);
    const belongsToEntity =
      entityKey != null && entityKey.type === entityType && entityKey.id === entityId;
    if (!belongsToEntity) {
      merged.push(candidate);
    } else if (!replacementPlaced) {
      merged.push(...replacementEvents);
      replacementPlaced = true;
    }
    // Later events for the same entity are intentionally dropped — the
    // replacement slice stands in for all of them.
  }
  if (!replacementPlaced) {
    merged.push(...replacementEvents);
  }
  return merged;
}
/**
 * Serialize events as JSONL and atomically replace .gsd/event-log.jsonl
 * under basePath, creating the .gsd directory if needed. A non-empty
 * log gets a trailing newline; an empty event list writes an empty file.
 */
function writeEventLog(basePath: string, events: WorkflowEvent[]): void {
  const gsdDir = join(basePath, ".gsd");
  mkdirSync(gsdDir, { recursive: true });
  const lines = events.map((e) => JSON.stringify(e));
  const payload = lines.length === 0 ? "" : lines.join("\n") + "\n";
  atomicWriteSync(join(gsdDir, "event-log.jsonl"), payload);
}
// ─── writeConflictsFile ───────────────────────────────────────────────────────
/**
@ -575,8 +610,8 @@ function parseEventBlock(block: string): WorkflowEvent[] {
/**
* Resolve a single conflict by picking one side's events.
* Replays the picked events through the DB helpers, appends them to the event log,
* and updates or removes CONFLICTS.md.
* Replays the picked events through the DB helpers, rewrites the chosen side's
* event log so the conflict is durable, and updates or removes CONFLICTS.md.
*
* When the last conflict is resolved, non-conflicting events from both sides
* are also replayed (they were blocked by the all-or-nothing D-04 rule).
@ -598,14 +633,30 @@ export function resolveConflict(
const conflict = conflicts[idx]!;
const eventsToReplay = pick === "main" ? conflict.mainSideEvents : conflict.worktreeSideEvents;
const mainLogPath = join(basePath, ".gsd", "event-log.jsonl");
const wtLogPath = join(worktreeBasePath, ".gsd", "event-log.jsonl");
const mainEvents = readEvents(mainLogPath);
const wtEvents = readEvents(wtLogPath);
const forkPoint = findForkPoint(mainEvents, wtEvents);
const mainBaseEvents = mainEvents.slice(0, forkPoint + 1);
const wtBaseEvents = wtEvents.slice(0, forkPoint + 1);
const mainDiverged = mainEvents.slice(forkPoint + 1);
const wtDiverged = wtEvents.slice(forkPoint + 1);
const rewrittenTargetEvents = pick === "main"
? rewriteDivergedEventsForEntity(wtDiverged, entityType, entityId, eventsToReplay)
: rewriteDivergedEventsForEntity(mainDiverged, entityType, entityId, eventsToReplay);
const targetBasePath = pick === "main" ? worktreeBasePath : basePath;
const targetBaseEvents = pick === "main" ? wtBaseEvents : mainBaseEvents;
writeEventLog(targetBasePath, targetBaseEvents.concat(rewrittenTargetEvents));
// Replay resolved events through the DB (updates DB state)
openDatabase(join(basePath, ".gsd", "gsd.db"));
replayEvents(eventsToReplay);
// Append resolved events to the event log
for (const event of eventsToReplay) {
appendEvent(basePath, { cmd: event.cmd, params: event.params, ts: event.ts, actor: event.actor });
}
invalidateStateCache();
clearPathCache();
clearParseCache();
// Remove resolved conflict from list
conflicts.splice(idx, 1);