fix: move unit closeout to run immediately after completion (#1612)
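Previously, closeoutUnit() ran at the start of the next loop iteration. This left a window in which a crash between runUnit() returning and the next iteration starting would lose all telemetry for the finished unit (metrics, activity log, memory extraction). In addition, completed-units.json was never flushed to disk, leaving it severely stale (3 entries for 322 completed units in production).

closeoutUnit() now runs immediately after runUnit() returns, for both regular and sidecar units, and completed-units.json is written to disk each time a unit's artifact is verified.

Closes #1590

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>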
This commit is contained in:
parent
426e0e839c
commit
dbf24145ab
1 changed file with 59 additions and 55 deletions
|
|
@@ -26,6 +26,9 @@ import type {
|
|||
import type { DispatchAction } from "./auto-dispatch.js";
|
||||
import type { WorktreeResolver } from "./worktree-resolver.js";
|
||||
import { debugLog } from "./debug-logger.js";
|
||||
import { gsdRoot } from "./paths.js";
|
||||
import { atomicWriteSync } from "./atomic-write.js";
|
||||
import { join } from "node:path";
|
||||
import type { CmuxLogLevel } from "../cmux/index.js";
|
||||
|
||||
/**
|
||||
|
|
@@ -1205,61 +1208,6 @@ export async function autoLoop(
|
|||
);
|
||||
const previousTier = s.currentUnitRouting?.tier;
|
||||
|
||||
// Closeout previous unit
|
||||
if (s.currentUnit) {
|
||||
await deps.closeoutUnit(
|
||||
ctx,
|
||||
s.basePath,
|
||||
s.currentUnit.type,
|
||||
s.currentUnit.id,
|
||||
s.currentUnit.startedAt,
|
||||
deps.buildSnapshotOpts(s.currentUnit.type, s.currentUnit.id),
|
||||
);
|
||||
|
||||
if (s.currentUnitRouting) {
|
||||
const isRetryForOutcome =
|
||||
s.currentUnit.type === unitType && s.currentUnit.id === unitId;
|
||||
deps.recordOutcome(
|
||||
s.currentUnit.type,
|
||||
s.currentUnitRouting.tier as "light" | "standard" | "heavy",
|
||||
!isRetryForOutcome,
|
||||
);
|
||||
}
|
||||
|
||||
const closeoutKey = `${s.currentUnit.type}/${s.currentUnit.id}`;
|
||||
const incomingKey = `${unitType}/${unitId}`;
|
||||
const isHookUnit = s.currentUnit.type.startsWith("hook/");
|
||||
const artifactVerified =
|
||||
isHookUnit ||
|
||||
deps.verifyExpectedArtifact(
|
||||
s.currentUnit.type,
|
||||
s.currentUnit.id,
|
||||
s.basePath,
|
||||
);
|
||||
if (closeoutKey !== incomingKey && artifactVerified) {
|
||||
s.completedUnits.push({
|
||||
type: s.currentUnit.type,
|
||||
id: s.currentUnit.id,
|
||||
startedAt: s.currentUnit.startedAt,
|
||||
finishedAt: Date.now(),
|
||||
});
|
||||
if (s.completedUnits.length > 200) {
|
||||
s.completedUnits = s.completedUnits.slice(-200);
|
||||
}
|
||||
deps.clearUnitRuntimeRecord(
|
||||
s.basePath,
|
||||
s.currentUnit.type,
|
||||
s.currentUnit.id,
|
||||
);
|
||||
s.unitDispatchCount.delete(
|
||||
`${s.currentUnit.type}/${s.currentUnit.id}`,
|
||||
);
|
||||
s.unitRecoveryCount.delete(
|
||||
`${s.currentUnit.type}/${s.currentUnit.id}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
s.currentUnit = { type: unitType, id: unitId, startedAt: Date.now() };
|
||||
deps.captureAvailableSkills();
|
||||
deps.writeUnitRuntimeRecord(
|
||||
|
|
@@ -1436,6 +1384,52 @@ export async function autoLoop(
|
|||
break;
|
||||
}
|
||||
|
||||
// ── Immediate unit closeout (metrics, activity log, memory) ────────
|
||||
// Run right after runUnit() returns so telemetry is never lost to a
|
||||
// crash between iterations.
|
||||
await deps.closeoutUnit(
|
||||
ctx,
|
||||
s.basePath,
|
||||
unitType,
|
||||
unitId,
|
||||
s.currentUnit.startedAt,
|
||||
deps.buildSnapshotOpts(unitType, unitId),
|
||||
);
|
||||
|
||||
if (s.currentUnitRouting) {
|
||||
deps.recordOutcome(
|
||||
unitType,
|
||||
s.currentUnitRouting.tier as "light" | "standard" | "heavy",
|
||||
true, // success assumed; dispatch will re-dispatch if artifact missing
|
||||
);
|
||||
}
|
||||
|
||||
const isHookUnit = unitType.startsWith("hook/");
|
||||
const artifactVerified =
|
||||
isHookUnit ||
|
||||
deps.verifyExpectedArtifact(unitType, unitId, s.basePath);
|
||||
if (artifactVerified) {
|
||||
s.completedUnits.push({
|
||||
type: unitType,
|
||||
id: unitId,
|
||||
startedAt: s.currentUnit.startedAt,
|
||||
finishedAt: Date.now(),
|
||||
});
|
||||
if (s.completedUnits.length > 200) {
|
||||
s.completedUnits = s.completedUnits.slice(-200);
|
||||
}
|
||||
// Flush completed-units to disk so the record survives crashes
|
||||
try {
|
||||
const completedKeysPath = join(gsdRoot(s.basePath), "completed-units.json");
|
||||
const keys = s.completedUnits.map((u) => `${u.type}/${u.id}`);
|
||||
atomicWriteSync(completedKeysPath, JSON.stringify(keys, null, 2));
|
||||
} catch { /* non-fatal: disk flush failure */ }
|
||||
|
||||
deps.clearUnitRuntimeRecord(s.basePath, unitType, unitId);
|
||||
s.unitDispatchCount.delete(`${unitType}/${unitId}`);
|
||||
s.unitRecoveryCount.delete(`${unitType}/${unitId}`);
|
||||
}
|
||||
|
||||
// ── Phase 5: Finalize ───────────────────────────────────────────────
|
||||
|
||||
debugLog("autoLoop", { phase: "finalize", iteration });
|
||||
|
|
@@ -1601,6 +1595,16 @@ export async function autoLoop(
|
|||
break;
|
||||
}
|
||||
|
||||
// Immediate closeout for sidecar unit
|
||||
await deps.closeoutUnit(
|
||||
ctx,
|
||||
s.basePath,
|
||||
item.unitType,
|
||||
item.unitId,
|
||||
sidecarStartedAt,
|
||||
deps.buildSnapshotOpts(item.unitType, item.unitId),
|
||||
);
|
||||
|
||||
// Run pre-verification for the sidecar unit (lightweight path)
|
||||
const sidecarPreOpts: PreVerificationOpts = item.kind === "hook"
|
||||
? { skipSettleDelay: true, skipDoctor: true, skipStateRebuild: true, skipWorktreeSync: true }
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue