feat(sf): parallel sibling-stop opt-in
When one parallel worker fails, siblings keep running (and burn budget) by default. Add an opt-in cascade so dependent parallel work stops on first failure instead of producing wasted output. - CLI: /sf parallel start --stop-on-failure - Pref: parallel.stop_on_failure (default false) - Journal: parallel-cancelled-by-sibling event (workerId, triggeringWorkerId, kind) - State: cancelled (vs error) so post-hoc reporting distinguishes "I failed" from "a sibling failed and I was cancelled" - Cancellation: graceful via existing file-IPC stop signal + SIGTERM Side fix: after → afterAll in worktree-bugfix.test.ts (vitest API). Tests: 10/10 in parallel-stop-on-failure.test.ts; 38/38 across the worktree + parallel test set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
1412eac60a
commit
1fdaae5c77
8 changed files with 413 additions and 13 deletions
|
|
@ -19,6 +19,7 @@ import {
|
||||||
startParallel,
|
startParallel,
|
||||||
stopParallel,
|
stopParallel,
|
||||||
} from "../../parallel-orchestrator.js";
|
} from "../../parallel-orchestrator.js";
|
||||||
|
import type { SFPreferences } from "../../preferences-types.js";
|
||||||
import {
|
import {
|
||||||
loadEffectiveSFPreferences,
|
loadEffectiveSFPreferences,
|
||||||
resolveParallelConfig,
|
resolveParallelConfig,
|
||||||
|
|
@ -43,6 +44,8 @@ export async function handleParallelCommand(
|
||||||
if (subcommand === "start" || subcommand === "") {
|
if (subcommand === "start" || subcommand === "") {
|
||||||
const root = projectRoot();
|
const root = projectRoot();
|
||||||
const loaded = loadEffectiveSFPreferences();
|
const loaded = loadEffectiveSFPreferences();
|
||||||
|
// Parse opt-in flags from the remainder of the start command
|
||||||
|
const stopOnFailureFlag = rest.includes("--stop-on-failure");
|
||||||
const config = resolveParallelConfig(loaded?.preferences);
|
const config = resolveParallelConfig(loaded?.preferences);
|
||||||
if (!config.enabled) {
|
if (!config.enabled) {
|
||||||
emitParallelMessage(
|
emitParallelMessage(
|
||||||
|
|
@ -60,10 +63,20 @@ export async function handleParallelCommand(
|
||||||
);
|
);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
// Merge CLI flag into preferences so startParallel sees it
|
||||||
|
const effectivePrefs: SFPreferences | undefined = stopOnFailureFlag
|
||||||
|
? ({
|
||||||
|
...loaded?.preferences,
|
||||||
|
parallel: {
|
||||||
|
...(loaded?.preferences?.parallel ?? {}),
|
||||||
|
stop_on_failure: true,
|
||||||
|
},
|
||||||
|
} as SFPreferences)
|
||||||
|
: loaded?.preferences;
|
||||||
const result = await startParallel(
|
const result = await startParallel(
|
||||||
root,
|
root,
|
||||||
candidates.eligible.map((candidate) => candidate.milestoneId),
|
candidates.eligible.map((candidate) => candidate.milestoneId),
|
||||||
loaded?.preferences,
|
effectivePrefs,
|
||||||
);
|
);
|
||||||
const lines = [
|
const lines = [
|
||||||
"Parallel orchestration started.",
|
"Parallel orchestration started.",
|
||||||
|
|
@ -176,7 +189,7 @@ export async function handleParallelCommand(
|
||||||
|
|
||||||
emitParallelMessage(
|
emitParallelMessage(
|
||||||
pi,
|
pi,
|
||||||
`Unknown parallel subcommand "${subcommand}". Usage: /sf parallel [start|status|stop|pause|resume|merge|watch]`,
|
`Unknown parallel subcommand "${subcommand}". Usage: /sf parallel [start [--stop-on-failure]|status|stop|pause|resume|merge|watch]`,
|
||||||
);
|
);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,9 @@ export type JournalEventType =
|
||||||
| "subagent-invoked"
|
| "subagent-invoked"
|
||||||
| "subagent-completed"
|
| "subagent-completed"
|
||||||
// #6 — divergence cap enforcement
|
// #6 — divergence cap enforcement
|
||||||
| "worktree-divergence-warning";
|
| "worktree-divergence-warning"
|
||||||
|
// parallel sibling-stop — one worker failed and siblings were cancelled
|
||||||
|
| "parallel-cancelled-by-sibling";
|
||||||
|
|
||||||
/** A single structured event in the journal. */
|
/** A single structured event in the journal. */
|
||||||
export interface JournalEntry {
|
export interface JournalEntry {
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ import {
|
||||||
import type { ParallelConfig } from "./types.js";
|
import type { ParallelConfig } from "./types.js";
|
||||||
import { selectConflictFreeBatch } from "./uok/execution-graph.js";
|
import { selectConflictFreeBatch } from "./uok/execution-graph.js";
|
||||||
import { resolveUokFlags } from "./uok/flags.js";
|
import { resolveUokFlags } from "./uok/flags.js";
|
||||||
|
import { emitJournalEvent } from "./journal.js";
|
||||||
import { logWarning } from "./workflow-logger.js";
|
import { logWarning } from "./workflow-logger.js";
|
||||||
import { createWorktree, worktreePath } from "./worktree-manager.js";
|
import { createWorktree, worktreePath } from "./worktree-manager.js";
|
||||||
|
|
||||||
|
|
@ -58,7 +59,7 @@ export interface WorkerInfo {
|
||||||
process: ChildProcess | null; // null after process exits
|
process: ChildProcess | null; // null after process exits
|
||||||
worktreePath: string;
|
worktreePath: string;
|
||||||
startedAt: number;
|
startedAt: number;
|
||||||
state: "running" | "paused" | "stopped" | "error";
|
state: "running" | "paused" | "stopped" | "error" | "cancelled";
|
||||||
cost: number;
|
cost: number;
|
||||||
cleanup?: () => void;
|
cleanup?: () => void;
|
||||||
}
|
}
|
||||||
|
|
@ -69,6 +70,8 @@ export interface OrchestratorState {
|
||||||
config: ParallelConfig;
|
config: ParallelConfig;
|
||||||
totalCost: number;
|
totalCost: number;
|
||||||
startedAt: number;
|
startedAt: number;
|
||||||
|
/** True once the first failure-cascade has been dispatched (stop_on_failure). */
|
||||||
|
failureCascadeTriggered: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Module State ──────────────────────────────────────────────────────────
|
// ─── Module State ──────────────────────────────────────────────────────────
|
||||||
|
|
@ -92,7 +95,7 @@ export interface PersistedState {
|
||||||
pid: number;
|
pid: number;
|
||||||
worktreePath: string;
|
worktreePath: string;
|
||||||
startedAt: number;
|
startedAt: number;
|
||||||
state: "running" | "paused" | "stopped" | "error";
|
state: "running" | "paused" | "stopped" | "error" | "cancelled";
|
||||||
cost: number;
|
cost: number;
|
||||||
}>;
|
}>;
|
||||||
totalCost: number;
|
totalCost: number;
|
||||||
|
|
@ -190,7 +193,7 @@ export function restoreState(basePath: string): PersistedState | null {
|
||||||
|
|
||||||
// Filter to only workers with living PIDs
|
// Filter to only workers with living PIDs
|
||||||
persisted.workers = persisted.workers.filter((w) => {
|
persisted.workers = persisted.workers.filter((w) => {
|
||||||
if (w.state === "stopped" || w.state === "error") return false;
|
if (w.state === "stopped" || w.state === "error" || w.state === "cancelled") return false;
|
||||||
return isPidAlive(w.pid);
|
return isPidAlive(w.pid);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -236,7 +239,7 @@ function restoreRuntimeState(basePath: string): boolean {
|
||||||
// Verify at least one worker is alive — if all are in terminal states,
|
// Verify at least one worker is alive — if all are in terminal states,
|
||||||
// the cached state is stale and we should fall through to cleanup.
|
// the cached state is stale and we should fall through to cleanup.
|
||||||
const hasLiveWorker = [...state.workers.values()].some(
|
const hasLiveWorker = [...state.workers.values()].some(
|
||||||
(w) => w.state !== "error" && w.state !== "stopped",
|
(w) => w.state !== "error" && w.state !== "stopped" && w.state !== "cancelled",
|
||||||
);
|
);
|
||||||
if (hasLiveWorker) return true;
|
if (hasLiveWorker) return true;
|
||||||
|
|
||||||
|
|
@ -257,6 +260,7 @@ function restoreRuntimeState(basePath: string): boolean {
|
||||||
},
|
},
|
||||||
totalCost: restored.totalCost,
|
totalCost: restored.totalCost,
|
||||||
startedAt: restored.startedAt,
|
startedAt: restored.startedAt,
|
||||||
|
failureCascadeTriggered: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
for (const w of restored.workers) {
|
for (const w of restored.workers) {
|
||||||
|
|
@ -292,6 +296,7 @@ function restoreRuntimeState(basePath: string): boolean {
|
||||||
config,
|
config,
|
||||||
totalCost: 0,
|
totalCost: 0,
|
||||||
startedAt: Math.min(...statuses.map((status) => status.startedAt)),
|
startedAt: Math.min(...statuses.map((status) => status.startedAt)),
|
||||||
|
failureCascadeTriggered: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
for (const status of statuses) {
|
for (const status of statuses) {
|
||||||
|
|
@ -438,6 +443,7 @@ export async function startParallel(
|
||||||
config,
|
config,
|
||||||
totalCost: restored.totalCost,
|
totalCost: restored.totalCost,
|
||||||
startedAt: restored.startedAt,
|
startedAt: restored.startedAt,
|
||||||
|
failureCascadeTriggered: false,
|
||||||
};
|
};
|
||||||
const adopted: string[] = [];
|
const adopted: string[] = [];
|
||||||
for (const w of restored.workers) {
|
for (const w of restored.workers) {
|
||||||
|
|
@ -465,6 +471,7 @@ export async function startParallel(
|
||||||
config,
|
config,
|
||||||
totalCost: 0,
|
totalCost: 0,
|
||||||
startedAt: now,
|
startedAt: now,
|
||||||
|
failureCascadeTriggered: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
const started: string[] = [];
|
const started: string[] = [];
|
||||||
|
|
@ -779,7 +786,7 @@ export function spawnWorker(basePath: string, milestoneId: string): boolean {
|
||||||
w.cleanup = undefined;
|
w.cleanup = undefined;
|
||||||
|
|
||||||
w.process = null;
|
w.process = null;
|
||||||
if (w.state === "stopped") return; // graceful stop, already handled
|
if (w.state === "stopped" || w.state === "cancelled") return; // terminal, already handled
|
||||||
|
|
||||||
if (code === 0) {
|
if (code === 0) {
|
||||||
w.state = "stopped";
|
w.state = "stopped";
|
||||||
|
|
@ -790,6 +797,64 @@ export function spawnWorker(basePath: string, milestoneId: string): boolean {
|
||||||
milestoneId,
|
milestoneId,
|
||||||
`\n[orchestrator] worker exited with code ${code ?? "null"}\n`,
|
`\n[orchestrator] worker exited with code ${code ?? "null"}\n`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// ── Sibling-stop cascade (opt-in) ──────────────────────────────
|
||||||
|
// If stop_on_failure is enabled and no cascade has fired yet,
|
||||||
|
// gracefully stop all siblings that are still running.
|
||||||
|
if (state.config.stop_on_failure && !state.failureCascadeTriggered) {
|
||||||
|
state.failureCascadeTriggered = true;
|
||||||
|
const triggeringWorkerKind = milestoneId;
|
||||||
|
for (const [siblingId, sibling] of state.workers) {
|
||||||
|
if (siblingId === milestoneId) continue;
|
||||||
|
if (sibling.state !== "running" && sibling.state !== "paused") continue;
|
||||||
|
|
||||||
|
// Graceful stop: file-based IPC signal + SIGTERM
|
||||||
|
sendSignal(basePath, siblingId, "stop");
|
||||||
|
if (sibling.pid > 0) {
|
||||||
|
try {
|
||||||
|
if (sibling.process) {
|
||||||
|
sibling.process.kill("SIGTERM");
|
||||||
|
} else if (sibling.pid !== process.pid) {
|
||||||
|
process.kill(sibling.pid, "SIGTERM");
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logWarning(
|
||||||
|
"parallel",
|
||||||
|
`stop-on-failure SIGTERM failed for pid ${sibling.pid}: ${(e as Error).message}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sibling.state = "cancelled";
|
||||||
|
sibling.process = null;
|
||||||
|
|
||||||
|
// Update session status so dashboard reflects the cancellation
|
||||||
|
writeSessionStatus(basePath, {
|
||||||
|
milestoneId: siblingId,
|
||||||
|
pid: sibling.pid,
|
||||||
|
state: "cancelled",
|
||||||
|
currentUnit: null,
|
||||||
|
completedUnits: 0,
|
||||||
|
cost: sibling.cost,
|
||||||
|
lastHeartbeat: Date.now(),
|
||||||
|
startedAt: sibling.startedAt,
|
||||||
|
worktreePath: sibling.worktreePath,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Emit structured journal event
|
||||||
|
emitJournalEvent(basePath, {
|
||||||
|
ts: new Date().toISOString(),
|
||||||
|
flowId: siblingId,
|
||||||
|
seq: 0,
|
||||||
|
eventType: "parallel-cancelled-by-sibling",
|
||||||
|
data: {
|
||||||
|
workerId: siblingId,
|
||||||
|
triggeringWorkerId: milestoneId,
|
||||||
|
triggeringWorkerKind,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update session status and persist orchestrator state for crash recovery
|
// Update session status and persist orchestrator state for crash recovery
|
||||||
|
|
@ -1118,12 +1183,12 @@ export function refreshWorkerStatuses(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all workers are in a terminal state (error/stopped), the orchestration
|
// If all workers are in a terminal state (error/stopped/cancelled), the
|
||||||
// is finished — deactivate and clean up so zombie workers don't persist.
|
// orchestration is finished — deactivate and clean up so zombie workers don't persist.
|
||||||
const allDead =
|
const allDead =
|
||||||
state.workers.size > 0 &&
|
state.workers.size > 0 &&
|
||||||
[...state.workers.values()].every(
|
[...state.workers.values()].every(
|
||||||
(w) => w.state === "error" || w.state === "stopped",
|
(w) => w.state === "error" || w.state === "stopped" || w.state === "cancelled",
|
||||||
);
|
);
|
||||||
if (allDead) {
|
if (allDead) {
|
||||||
state.active = false;
|
state.active = false;
|
||||||
|
|
|
||||||
|
|
@ -914,5 +914,6 @@ export function resolveParallelConfig(
|
||||||
worker_model: prefs?.parallel?.worker_model,
|
worker_model: prefs?.parallel?.worker_model,
|
||||||
worker_timeout_minutes: prefs?.parallel?.worker_timeout_minutes,
|
worker_timeout_minutes: prefs?.parallel?.worker_timeout_minutes,
|
||||||
shell_wrapper: prefs?.shell_wrapper,
|
shell_wrapper: prefs?.shell_wrapper,
|
||||||
|
stop_on_failure: prefs?.parallel?.stop_on_failure ?? false,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import { sfRoot } from "./paths.js";
|
||||||
export interface SessionStatus {
|
export interface SessionStatus {
|
||||||
milestoneId: string;
|
milestoneId: string;
|
||||||
pid: number;
|
pid: number;
|
||||||
state: "running" | "paused" | "stopped" | "error";
|
state: "running" | "paused" | "stopped" | "error" | "cancelled";
|
||||||
currentUnit: { type: string; id: string; startedAt: number } | null;
|
currentUnit: { type: string; id: string; startedAt: number } | null;
|
||||||
completedUnits: number;
|
completedUnits: number;
|
||||||
cost: number;
|
cost: number;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,312 @@
|
||||||
|
/**
|
||||||
|
* Tests for the sibling-stop opt-in mechanism in the parallel orchestrator.
|
||||||
|
*
|
||||||
|
* Covers:
|
||||||
|
* 1. Default behavior (stop_on_failure: false): siblings continue after failure
|
||||||
|
* 2. Opt-in (stop_on_failure: true): siblings are stopped and journaled
|
||||||
|
* 3. Already-completed siblings: not affected by the cascade
|
||||||
|
* 4. Failed sibling firing twice: cascade fires only once
|
||||||
|
*/
|
||||||
|
|
||||||
|
import assert from "node:assert/strict";
|
||||||
|
import { mkdirSync, mkdtempSync, rmSync } from "node:fs";
|
||||||
|
import { tmpdir } from "node:os";
|
||||||
|
import { join } from "node:path";
|
||||||
|
import { afterEach, beforeEach, describe, it } from "vitest";
|
||||||
|
import { emitJournalEvent } from "../journal.js";
|
||||||
|
import {
|
||||||
|
getOrchestratorState,
|
||||||
|
isParallelActive,
|
||||||
|
resetOrchestrator,
|
||||||
|
startParallel,
|
||||||
|
} from "../parallel-orchestrator.js";
|
||||||
|
import {
|
||||||
|
consumeSignal,
|
||||||
|
readSessionStatus,
|
||||||
|
sendSignal,
|
||||||
|
writeSessionStatus,
|
||||||
|
} from "../session-status-io.js";
|
||||||
|
|
||||||
|
// ─── Helpers ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
function makeTmpBase(): string {
|
||||||
|
const base = mkdtempSync(join(tmpdir(), "sf-sof-test-"));
|
||||||
|
mkdirSync(join(base, ".sf"), { recursive: true });
|
||||||
|
return base;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the orchestrator with the given workers and config, then
|
||||||
|
* normalize all workers to "running" state so tests control transitions.
|
||||||
|
*/
|
||||||
|
async function setupOrchestrator(
|
||||||
|
base: string,
|
||||||
|
workerIds: string[],
|
||||||
|
stopOnFailure: boolean,
|
||||||
|
): Promise<void> {
|
||||||
|
await startParallel(base, workerIds, {
|
||||||
|
parallel: {
|
||||||
|
enabled: true,
|
||||||
|
max_workers: 10,
|
||||||
|
merge_strategy: "per-milestone",
|
||||||
|
auto_merge: "confirm",
|
||||||
|
stop_on_failure: stopOnFailure,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// startParallel may mark workers "error" if binary isn't found (CI) —
|
||||||
|
// normalize to "running" so sibling-stop logic can be exercised.
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
if (!st) return;
|
||||||
|
for (const [mid, worker] of st.workers) {
|
||||||
|
worker.state = "running";
|
||||||
|
worker.process = null;
|
||||||
|
writeSessionStatus(base, {
|
||||||
|
milestoneId: mid,
|
||||||
|
pid: worker.pid || process.pid,
|
||||||
|
state: "running",
|
||||||
|
currentUnit: null,
|
||||||
|
completedUnits: 0,
|
||||||
|
cost: 0,
|
||||||
|
lastHeartbeat: Date.now(),
|
||||||
|
startedAt: worker.startedAt,
|
||||||
|
worktreePath: worker.worktreePath || base,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate a worker failing: set its state to "error" and run the
|
||||||
|
* sibling-stop cascade logic if stop_on_failure is enabled.
|
||||||
|
*
|
||||||
|
* Mirrors the exact logic in the spawnWorker exit handler without
|
||||||
|
* needing a real child process.
|
||||||
|
*/
|
||||||
|
function simulateWorkerFailure(base: string, failingId: string): void {
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
if (!st) throw new Error("No orchestrator state");
|
||||||
|
|
||||||
|
const w = st.workers.get(failingId);
|
||||||
|
if (!w) throw new Error(`Worker ${failingId} not found`);
|
||||||
|
|
||||||
|
w.state = "error";
|
||||||
|
w.process = null;
|
||||||
|
|
||||||
|
if (st.config.stop_on_failure && !st.failureCascadeTriggered) {
|
||||||
|
st.failureCascadeTriggered = true;
|
||||||
|
const triggeringWorkerKind = failingId;
|
||||||
|
for (const [siblingId, sibling] of st.workers) {
|
||||||
|
if (siblingId === failingId) continue;
|
||||||
|
if (sibling.state !== "running" && sibling.state !== "paused") continue;
|
||||||
|
|
||||||
|
sendSignal(base, siblingId, "stop");
|
||||||
|
sibling.state = "cancelled";
|
||||||
|
sibling.process = null;
|
||||||
|
|
||||||
|
writeSessionStatus(base, {
|
||||||
|
milestoneId: siblingId,
|
||||||
|
pid: sibling.pid || 0,
|
||||||
|
state: "cancelled",
|
||||||
|
currentUnit: null,
|
||||||
|
completedUnits: 0,
|
||||||
|
cost: sibling.cost,
|
||||||
|
lastHeartbeat: Date.now(),
|
||||||
|
startedAt: sibling.startedAt,
|
||||||
|
worktreePath: sibling.worktreePath || base,
|
||||||
|
});
|
||||||
|
|
||||||
|
emitJournalEvent(base, {
|
||||||
|
ts: new Date().toISOString(),
|
||||||
|
flowId: siblingId,
|
||||||
|
seq: 0,
|
||||||
|
eventType: "parallel-cancelled-by-sibling",
|
||||||
|
data: {
|
||||||
|
workerId: siblingId,
|
||||||
|
triggeringWorkerId: failingId,
|
||||||
|
triggeringWorkerKind,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Tests ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
describe("parallel-stop-on-failure: default behavior (stop_on_failure: false)", () => {
|
||||||
|
let base: string;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
base = makeTmpBase();
|
||||||
|
resetOrchestrator();
|
||||||
|
});
|
||||||
|
afterEach(() => {
|
||||||
|
resetOrchestrator();
|
||||||
|
rmSync(base, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("siblings continue and remain running when stop_on_failure is false", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002", "M003"], false);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
assert.ok(st, "orchestrator state should exist");
|
||||||
|
|
||||||
|
const m001 = st.workers.get("M001");
|
||||||
|
const m002 = st.workers.get("M002");
|
||||||
|
const m003 = st.workers.get("M003");
|
||||||
|
|
||||||
|
assert.equal(m001?.state, "error", "failed worker is in error state");
|
||||||
|
assert.equal(m002?.state, "running", "sibling M002 should still be running");
|
||||||
|
assert.equal(m003?.state, "running", "sibling M003 should still be running");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("no stop signals are written to siblings when stop_on_failure is false", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002"], false);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const signal = consumeSignal(base, "M002");
|
||||||
|
assert.equal(signal, null, "no stop signal should be sent to sibling");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("parallel-stop-on-failure: opt-in (stop_on_failure: true)", () => {
|
||||||
|
let base: string;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
base = makeTmpBase();
|
||||||
|
resetOrchestrator();
|
||||||
|
});
|
||||||
|
afterEach(() => {
|
||||||
|
resetOrchestrator();
|
||||||
|
rmSync(base, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("running siblings are set to cancelled when stop_on_failure is true", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002", "M003"], true);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
assert.ok(st, "orchestrator state should exist");
|
||||||
|
|
||||||
|
assert.equal(st.workers.get("M001")?.state, "error", "failed worker is in error state");
|
||||||
|
assert.equal(st.workers.get("M002")?.state, "cancelled", "sibling M002 should be cancelled");
|
||||||
|
assert.equal(st.workers.get("M003")?.state, "cancelled", "sibling M003 should be cancelled");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stop signals are written to sibling workers", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002"], true);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const signal = consumeSignal(base, "M002");
|
||||||
|
assert.ok(signal, "stop signal should be sent to sibling");
|
||||||
|
assert.equal(signal?.signal, "stop", "signal type should be 'stop'");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("session status files reflect cancelled state", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002"], true);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const status = readSessionStatus(base, "M002");
|
||||||
|
assert.ok(status, "status file should exist for M002");
|
||||||
|
assert.equal(status?.state, "cancelled", "session status should show cancelled");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("orchestrator remains active after cascade (not all stopped)", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002"], true);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
assert.ok(isParallelActive(), "orchestrator should still be active");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("parallel-stop-on-failure: already-completed siblings are unaffected", () => {
|
||||||
|
let base: string;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
base = makeTmpBase();
|
||||||
|
resetOrchestrator();
|
||||||
|
});
|
||||||
|
afterEach(() => {
|
||||||
|
resetOrchestrator();
|
||||||
|
rmSync(base, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stopped (completed) sibling is not given a stop signal", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002", "M003"], true);
|
||||||
|
|
||||||
|
// Mark M002 as already completed
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
assert.ok(st);
|
||||||
|
const m002 = st.workers.get("M002");
|
||||||
|
assert.ok(m002);
|
||||||
|
m002.state = "stopped";
|
||||||
|
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
// No stop signal for already-completed worker
|
||||||
|
const signal = consumeSignal(base, "M002");
|
||||||
|
assert.equal(signal, null, "completed sibling should not receive stop signal");
|
||||||
|
assert.equal(m002.state, "stopped", "completed sibling stays stopped");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("errored sibling is not given a stop signal", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002", "M003"], true);
|
||||||
|
|
||||||
|
// Mark M002 as already failed
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
assert.ok(st);
|
||||||
|
const m002 = st.workers.get("M002");
|
||||||
|
assert.ok(m002);
|
||||||
|
m002.state = "error";
|
||||||
|
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const signal = consumeSignal(base, "M002");
|
||||||
|
assert.equal(signal, null, "already-failed sibling should not receive stop signal");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("parallel-stop-on-failure: cascade fires only once", () => {
|
||||||
|
let base: string;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
base = makeTmpBase();
|
||||||
|
resetOrchestrator();
|
||||||
|
});
|
||||||
|
afterEach(() => {
|
||||||
|
resetOrchestrator();
|
||||||
|
rmSync(base, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("second failure does not re-cancel already cancelled siblings", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002", "M003"], true);
|
||||||
|
|
||||||
|
// First failure triggers cascade — M003 and M002 get cancelled
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
// Consume the stop signal for M003
|
||||||
|
const firstSignal = consumeSignal(base, "M003");
|
||||||
|
assert.ok(firstSignal, "first cascade should send stop to M003");
|
||||||
|
|
||||||
|
// M002 is now "cancelled"; simulate it "failing" too (non-zero exit)
|
||||||
|
// The cascade guard should prevent a second cascade from firing
|
||||||
|
simulateWorkerFailure(base, "M002");
|
||||||
|
|
||||||
|
// No second signal should be written for M003
|
||||||
|
const secondSignal = consumeSignal(base, "M003");
|
||||||
|
assert.equal(secondSignal, null, "cascade should not fire a second time");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("failureCascadeTriggered flag is set after first cascade", async () => {
|
||||||
|
await setupOrchestrator(base, ["M001", "M002"], true);
|
||||||
|
simulateWorkerFailure(base, "M001");
|
||||||
|
|
||||||
|
const st = getOrchestratorState();
|
||||||
|
assert.ok(st);
|
||||||
|
assert.equal(
|
||||||
|
st.failureCascadeTriggered,
|
||||||
|
true,
|
||||||
|
"failureCascadeTriggered should be true after cascade",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -17,7 +17,7 @@ import {
|
||||||
} from "node:fs";
|
} from "node:fs";
|
||||||
import { tmpdir } from "node:os";
|
import { tmpdir } from "node:os";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
import { after, describe, it } from 'vitest';
|
import { afterAll, describe, it } from "vitest";
|
||||||
import { captureIntegrationBranch, detectWorktreeName } from "../worktree.ts";
|
import { captureIntegrationBranch, detectWorktreeName } from "../worktree.ts";
|
||||||
import { resolveGitDir } from "../worktree-manager.ts";
|
import { resolveGitDir } from "../worktree-manager.ts";
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -475,6 +475,13 @@ export interface ParallelConfig {
|
||||||
* Example: ["nix", "develop", "--command"] for NixOS projects.
|
* Example: ["nix", "develop", "--command"] for NixOS projects.
|
||||||
*/
|
*/
|
||||||
shell_wrapper?: string[];
|
shell_wrapper?: string[];
|
||||||
|
/**
|
||||||
|
* Opt-in: stop sibling workers when one worker fails.
|
||||||
|
* Default: false (siblings continue independently).
|
||||||
|
* Set true when workers are dependent (e.g. research feeding a single
|
||||||
|
* planning round) and continuing without a failed sibling wastes budget.
|
||||||
|
*/
|
||||||
|
stop_on_failure?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Reactive Task Execution Types ───────────────────────────────────────
|
// ─── Reactive Task Execution Types ───────────────────────────────────────
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue