fix(auto): prevent state machine deadlock when units fail to produce artifacts
Three fixes to the dispatch loop: 1. Don't mark a unit complete when the next dispatch is the same unit (retry scenario) — let the retry mechanism handle it instead of persisting a false completion. 2. Verify expected artifact exists on disk before marking a unit complete. Uses resolveExpectedArtifactPath + existsSync to gate persistCompletedKey calls. 3. Cross-validate idempotency: when skipping a "completed" unit, verify the artifact actually exists. If missing, remove the stale record from completed-units.json and re-run the unit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
108f6b4f1d
commit
91cf23a634
1 changed files with 71 additions and 21 deletions
|
|
@ -93,6 +93,18 @@ function persistCompletedKey(base: string, key: string): void {
|
|||
}
|
||||
}
|
||||
|
||||
/** Remove a stale completed unit key from disk. */
|
||||
function removePersistedKey(base: string, key: string): void {
|
||||
const file = completedKeysPath(base);
|
||||
try {
|
||||
if (existsSync(file)) {
|
||||
let keys: string[] = JSON.parse(readFileSync(file, "utf-8"));
|
||||
keys = keys.filter(k => k !== key);
|
||||
writeFileSync(file, JSON.stringify(keys), "utf-8");
|
||||
}
|
||||
} catch { /* non-fatal */ }
|
||||
}
|
||||
|
||||
/** Load all completed unit keys from disk into the in-memory set. */
|
||||
function loadPersistedKeys(base: string, target: Set<string>): void {
|
||||
const file = completedKeysPath(base);
|
||||
|
|
@ -1190,15 +1202,27 @@ async function dispatchNextUnit(
|
|||
// Idempotency: skip units already completed in a prior session.
|
||||
const idempotencyKey = `${unitType}/${unitId}`;
|
||||
if (completedKeySet.has(idempotencyKey)) {
|
||||
ctx.ui.notify(
|
||||
`Skipping ${unitType} ${unitId} — already completed in a prior session. Advancing.`,
|
||||
"info",
|
||||
);
|
||||
// Yield to the event loop before re-dispatching to avoid tight recursion
|
||||
// when many units are already completed (e.g., after crash recovery).
|
||||
await new Promise(r => setImmediate(r));
|
||||
await dispatchNextUnit(ctx, pi);
|
||||
return;
|
||||
// Cross-validate: does the expected artifact actually exist?
|
||||
const artifactExists = verifyExpectedArtifact(unitType, unitId, basePath);
|
||||
if (artifactExists) {
|
||||
ctx.ui.notify(
|
||||
`Skipping ${unitType} ${unitId} — already completed in a prior session. Advancing.`,
|
||||
"info",
|
||||
);
|
||||
// Yield to the event loop before re-dispatching to avoid tight recursion
|
||||
// when many units are already completed (e.g., after crash recovery).
|
||||
await new Promise(r => setImmediate(r));
|
||||
await dispatchNextUnit(ctx, pi);
|
||||
return;
|
||||
} else {
|
||||
// Stale completion record — artifact missing. Remove and re-run.
|
||||
completedKeySet.delete(idempotencyKey);
|
||||
removePersistedKey(basePath, idempotencyKey);
|
||||
ctx.ui.notify(
|
||||
`Re-running ${unitType} ${unitId} — marked complete but expected artifact missing.`,
|
||||
"warning",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Stuck detection — tracks total dispatches per unit (not just consecutive repeats).
|
||||
|
|
@ -1234,20 +1258,26 @@ async function dispatchNextUnit(
|
|||
snapshotUnitMetrics(ctx, currentUnit.type, currentUnit.id, currentUnit.startedAt, modelId);
|
||||
saveActivityLog(ctx, basePath, currentUnit.type, currentUnit.id);
|
||||
|
||||
// Persist completion to disk BEFORE updating memory — so a crash here is recoverable.
|
||||
// Only mark the previous unit as completed if:
|
||||
// 1. We're not about to re-dispatch the same unit (retry scenario)
|
||||
// 2. The expected artifact actually exists on disk
|
||||
const closeoutKey = `${currentUnit.type}/${currentUnit.id}`;
|
||||
persistCompletedKey(basePath, closeoutKey);
|
||||
completedKeySet.add(closeoutKey);
|
||||
const incomingKey = `${unitType}/${unitId}`;
|
||||
const artifactVerified = verifyExpectedArtifact(currentUnit.type, currentUnit.id, basePath);
|
||||
if (closeoutKey !== incomingKey && artifactVerified) {
|
||||
persistCompletedKey(basePath, closeoutKey);
|
||||
completedKeySet.add(closeoutKey);
|
||||
|
||||
completedUnits.push({
|
||||
type: currentUnit.type,
|
||||
id: currentUnit.id,
|
||||
startedAt: currentUnit.startedAt,
|
||||
finishedAt: Date.now(),
|
||||
});
|
||||
clearUnitRuntimeRecord(basePath, currentUnit.type, currentUnit.id);
|
||||
unitDispatchCount.delete(`${currentUnit.type}/${currentUnit.id}`);
|
||||
unitRecoveryCount.delete(`${currentUnit.type}/${currentUnit.id}`);
|
||||
completedUnits.push({
|
||||
type: currentUnit.type,
|
||||
id: currentUnit.id,
|
||||
startedAt: currentUnit.startedAt,
|
||||
finishedAt: Date.now(),
|
||||
});
|
||||
clearUnitRuntimeRecord(basePath, currentUnit.type, currentUnit.id);
|
||||
unitDispatchCount.delete(`${currentUnit.type}/${currentUnit.id}`);
|
||||
unitRecoveryCount.delete(`${currentUnit.type}/${currentUnit.id}`);
|
||||
}
|
||||
}
|
||||
currentUnit = { type: unitType, id: unitId, startedAt: Date.now() };
|
||||
writeUnitRuntimeRecord(basePath, unitType, unitId, currentUnit.startedAt, {
|
||||
|
|
@ -2587,6 +2617,15 @@ export function resolveExpectedArtifactPath(unitType: string, unitId: string, ba
|
|||
const dir = resolveSlicePath(base, mid, sid!);
|
||||
return dir ? join(dir, buildSliceFileName(sid!, "UAT-RESULT")) : null;
|
||||
}
|
||||
case "execute-task": {
|
||||
const tid = parts[2];
|
||||
const dir = resolveSlicePath(base, mid, sid!);
|
||||
return dir && tid ? join(dir, "tasks", buildTaskFileName(tid, "SUMMARY")) : null;
|
||||
}
|
||||
case "complete-slice": {
|
||||
const dir = resolveSlicePath(base, mid, sid!);
|
||||
return dir ? join(dir, buildSliceFileName(sid!, "SUMMARY")) : null;
|
||||
}
|
||||
case "complete-milestone": {
|
||||
const dir = resolveMilestonePath(base, mid);
|
||||
return dir ? join(dir, buildMilestoneFileName(mid, "SUMMARY")) : null;
|
||||
|
|
@ -2596,6 +2635,17 @@ export function resolveExpectedArtifactPath(unitType: string, unitId: string, ba
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the expected artifact for a unit exists on disk.
|
||||
* Returns true if the artifact file exists, or if the unit type has no
|
||||
* single verifiable artifact (e.g., replan-slice).
|
||||
*/
|
||||
function verifyExpectedArtifact(unitType: string, unitId: string, base: string): boolean {
|
||||
const absPath = resolveExpectedArtifactPath(unitType, unitId, base);
|
||||
if (!absPath) return true;
|
||||
return existsSync(absPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a placeholder artifact so the pipeline can advance past a stuck unit.
|
||||
* Returns the relative path written, or null if the path couldn't be resolved.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue