feat(sf): parallel sibling-stop opt-in

When one parallel worker fails, siblings keep running (and burn budget) by
default. Add an opt-in cascade so dependent parallel work stops on first
failure instead of producing wasted output.

- CLI: /sf parallel start --stop-on-failure
- Pref: parallel.stop_on_failure (default false)
- Journal: parallel-cancelled-by-sibling event (workerId, triggeringWorkerId, kind)
- State: cancelled (vs error) so post-hoc reporting distinguishes "I failed"
  from "a sibling failed and I was cancelled"
- Cancellation: graceful via existing file-IPC stop signal + SIGTERM

Side fix: after → afterAll in worktree-bugfix.test.ts (vitest API).

Tests: 10/10 in parallel-stop-on-failure.test.ts; 38/38 across the worktree
+ parallel test set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-02 09:39:13 +02:00
parent 1412eac60a
commit 1fdaae5c77
8 changed files with 413 additions and 13 deletions

View file

@ -19,6 +19,7 @@ import {
startParallel,
stopParallel,
} from "../../parallel-orchestrator.js";
import type { SFPreferences } from "../../preferences-types.js";
import {
loadEffectiveSFPreferences,
resolveParallelConfig,
@ -43,6 +44,8 @@ export async function handleParallelCommand(
if (subcommand === "start" || subcommand === "") {
const root = projectRoot();
const loaded = loadEffectiveSFPreferences();
// Parse opt-in flags from the remainder of the start command
const stopOnFailureFlag = rest.includes("--stop-on-failure");
const config = resolveParallelConfig(loaded?.preferences);
if (!config.enabled) {
emitParallelMessage(
@ -60,10 +63,20 @@ export async function handleParallelCommand(
);
return true;
}
// Merge CLI flag into preferences so startParallel sees it
const effectivePrefs: SFPreferences | undefined = stopOnFailureFlag
? ({
...loaded?.preferences,
parallel: {
...(loaded?.preferences?.parallel ?? {}),
stop_on_failure: true,
},
} as SFPreferences)
: loaded?.preferences;
const result = await startParallel(
root,
candidates.eligible.map((candidate) => candidate.milestoneId),
loaded?.preferences,
effectivePrefs,
);
const lines = [
"Parallel orchestration started.",
@ -176,7 +189,7 @@ export async function handleParallelCommand(
emitParallelMessage(
pi,
`Unknown parallel subcommand "${subcommand}". Usage: /sf parallel [start|status|stop|pause|resume|merge|watch]`,
`Unknown parallel subcommand "${subcommand}". Usage: /sf parallel [start [--stop-on-failure]|status|stop|pause|resume|merge|watch]`,
);
return true;
}

View file

@ -65,7 +65,9 @@ export type JournalEventType =
| "subagent-invoked"
| "subagent-completed"
// #6 — divergence cap enforcement
| "worktree-divergence-warning";
| "worktree-divergence-warning"
// parallel sibling-stop — one worker failed and siblings were cancelled
| "parallel-cancelled-by-sibling";
/** A single structured event in the journal. */
export interface JournalEntry {

View file

@ -46,6 +46,7 @@ import {
import type { ParallelConfig } from "./types.js";
import { selectConflictFreeBatch } from "./uok/execution-graph.js";
import { resolveUokFlags } from "./uok/flags.js";
import { emitJournalEvent } from "./journal.js";
import { logWarning } from "./workflow-logger.js";
import { createWorktree, worktreePath } from "./worktree-manager.js";
@ -58,7 +59,7 @@ export interface WorkerInfo {
process: ChildProcess | null; // null after process exits
worktreePath: string;
startedAt: number;
state: "running" | "paused" | "stopped" | "error";
state: "running" | "paused" | "stopped" | "error" | "cancelled";
cost: number;
cleanup?: () => void;
}
@ -69,6 +70,8 @@ export interface OrchestratorState {
config: ParallelConfig;
totalCost: number;
startedAt: number;
/** True once the first failure-cascade has been dispatched (stop_on_failure). */
failureCascadeTriggered: boolean;
}
// ─── Module State ──────────────────────────────────────────────────────────
@ -92,7 +95,7 @@ export interface PersistedState {
pid: number;
worktreePath: string;
startedAt: number;
state: "running" | "paused" | "stopped" | "error";
state: "running" | "paused" | "stopped" | "error" | "cancelled";
cost: number;
}>;
totalCost: number;
@ -190,7 +193,7 @@ export function restoreState(basePath: string): PersistedState | null {
// Filter to only workers with living PIDs
persisted.workers = persisted.workers.filter((w) => {
if (w.state === "stopped" || w.state === "error") return false;
if (w.state === "stopped" || w.state === "error" || w.state === "cancelled") return false;
return isPidAlive(w.pid);
});
@ -236,7 +239,7 @@ function restoreRuntimeState(basePath: string): boolean {
// Verify at least one worker is alive — if all are in terminal states,
// the cached state is stale and we should fall through to cleanup.
const hasLiveWorker = [...state.workers.values()].some(
(w) => w.state !== "error" && w.state !== "stopped",
(w) => w.state !== "error" && w.state !== "stopped" && w.state !== "cancelled",
);
if (hasLiveWorker) return true;
@ -257,6 +260,7 @@ function restoreRuntimeState(basePath: string): boolean {
},
totalCost: restored.totalCost,
startedAt: restored.startedAt,
failureCascadeTriggered: false,
};
for (const w of restored.workers) {
@ -292,6 +296,7 @@ function restoreRuntimeState(basePath: string): boolean {
config,
totalCost: 0,
startedAt: Math.min(...statuses.map((status) => status.startedAt)),
failureCascadeTriggered: false,
};
for (const status of statuses) {
@ -438,6 +443,7 @@ export async function startParallel(
config,
totalCost: restored.totalCost,
startedAt: restored.startedAt,
failureCascadeTriggered: false,
};
const adopted: string[] = [];
for (const w of restored.workers) {
@ -465,6 +471,7 @@ export async function startParallel(
config,
totalCost: 0,
startedAt: now,
failureCascadeTriggered: false,
};
const started: string[] = [];
@ -779,7 +786,7 @@ export function spawnWorker(basePath: string, milestoneId: string): boolean {
w.cleanup = undefined;
w.process = null;
if (w.state === "stopped") return; // graceful stop, already handled
if (w.state === "stopped" || w.state === "cancelled") return; // terminal, already handled
if (code === 0) {
w.state = "stopped";
@ -790,6 +797,64 @@ export function spawnWorker(basePath: string, milestoneId: string): boolean {
milestoneId,
`\n[orchestrator] worker exited with code ${code ?? "null"}\n`,
);
// ── Sibling-stop cascade (opt-in) ──────────────────────────────
// If stop_on_failure is enabled and no cascade has fired yet,
// gracefully stop all siblings that are still running.
if (state.config.stop_on_failure && !state.failureCascadeTriggered) {
state.failureCascadeTriggered = true;
const triggeringWorkerKind = milestoneId;
for (const [siblingId, sibling] of state.workers) {
if (siblingId === milestoneId) continue;
if (sibling.state !== "running" && sibling.state !== "paused") continue;
// Graceful stop: file-based IPC signal + SIGTERM
sendSignal(basePath, siblingId, "stop");
if (sibling.pid > 0) {
try {
if (sibling.process) {
sibling.process.kill("SIGTERM");
} else if (sibling.pid !== process.pid) {
process.kill(sibling.pid, "SIGTERM");
}
} catch (e) {
logWarning(
"parallel",
`stop-on-failure SIGTERM failed for pid ${sibling.pid}: ${(e as Error).message}`,
);
}
}
sibling.state = "cancelled";
sibling.process = null;
// Update session status so dashboard reflects the cancellation
writeSessionStatus(basePath, {
milestoneId: siblingId,
pid: sibling.pid,
state: "cancelled",
currentUnit: null,
completedUnits: 0,
cost: sibling.cost,
lastHeartbeat: Date.now(),
startedAt: sibling.startedAt,
worktreePath: sibling.worktreePath,
});
// Emit structured journal event
emitJournalEvent(basePath, {
ts: new Date().toISOString(),
flowId: siblingId,
seq: 0,
eventType: "parallel-cancelled-by-sibling",
data: {
workerId: siblingId,
triggeringWorkerId: milestoneId,
triggeringWorkerKind,
},
});
}
}
}
// Update session status and persist orchestrator state for crash recovery
@ -1118,12 +1183,12 @@ export function refreshWorkerStatuses(
}
}
// If all workers are in a terminal state (error/stopped), the orchestration
// is finished — deactivate and clean up so zombie workers don't persist.
// If all workers are in a terminal state (error/stopped/cancelled), the
// orchestration is finished — deactivate and clean up so zombie workers don't persist.
const allDead =
state.workers.size > 0 &&
[...state.workers.values()].every(
(w) => w.state === "error" || w.state === "stopped",
(w) => w.state === "error" || w.state === "stopped" || w.state === "cancelled",
);
if (allDead) {
state.active = false;

View file

@ -914,5 +914,6 @@ export function resolveParallelConfig(
worker_model: prefs?.parallel?.worker_model,
worker_timeout_minutes: prefs?.parallel?.worker_timeout_minutes,
shell_wrapper: prefs?.shell_wrapper,
stop_on_failure: prefs?.parallel?.stop_on_failure ?? false,
};
}

View file

@ -20,7 +20,7 @@ import { sfRoot } from "./paths.js";
export interface SessionStatus {
milestoneId: string;
pid: number;
state: "running" | "paused" | "stopped" | "error";
state: "running" | "paused" | "stopped" | "error" | "cancelled";
currentUnit: { type: string; id: string; startedAt: number } | null;
completedUnits: number;
cost: number;

View file

@ -0,0 +1,312 @@
/**
* Tests for the sibling-stop opt-in mechanism in the parallel orchestrator.
*
* Covers:
* 1. Default behavior (stop_on_failure: false): siblings continue after failure
* 2. Opt-in (stop_on_failure: true): siblings are stopped and journaled
* 3. Already-completed siblings: not affected by the cascade
* 4. Failed sibling firing twice: cascade fires only once
*/
import assert from "node:assert/strict";
import { mkdirSync, mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, it } from "vitest";
import { emitJournalEvent } from "../journal.js";
import {
getOrchestratorState,
isParallelActive,
resetOrchestrator,
startParallel,
} from "../parallel-orchestrator.js";
import {
consumeSignal,
readSessionStatus,
sendSignal,
writeSessionStatus,
} from "../session-status-io.js";
// ─── Helpers ─────────────────────────────────────────────────────────────────
/** Create an isolated temp project root that already contains an empty `.sf` dir. */
function makeTmpBase(): string {
  const prefix = join(tmpdir(), "sf-sof-test-");
  const root = mkdtempSync(prefix);
  mkdirSync(join(root, ".sf"), { recursive: true });
  return root;
}
/**
 * Boot the orchestrator for `workerIds` with parallel execution enabled and
 * `stop_on_failure` taken from the flag, then force every worker into a known
 * "running" state so each test can drive the transitions itself.
 */
async function setupOrchestrator(
  base: string,
  workerIds: string[],
  stopOnFailure: boolean,
): Promise<void> {
  await startParallel(base, workerIds, {
    parallel: {
      enabled: true,
      max_workers: 10,
      merge_strategy: "per-milestone",
      auto_merge: "confirm",
      stop_on_failure: stopOnFailure,
    },
  });
  // In CI the worker binary may be missing, leaving workers in "error" —
  // reset everything to "running" so the sibling-stop logic is exercisable.
  const orchestrator = getOrchestratorState();
  if (!orchestrator) return;
  for (const [milestoneId, worker] of orchestrator.workers) {
    worker.state = "running";
    worker.process = null;
    writeSessionStatus(base, {
      milestoneId,
      pid: worker.pid || process.pid,
      state: "running",
      currentUnit: null,
      completedUnits: 0,
      cost: 0,
      lastHeartbeat: Date.now(),
      startedAt: worker.startedAt,
      worktreePath: worker.worktreePath || base,
    });
  }
}
/**
 * Drive one worker into "error" and replay the sibling-stop cascade exactly
 * as the spawnWorker exit handler would, without needing a real child process.
 */
function simulateWorkerFailure(base: string, failingId: string): void {
  const orchestrator = getOrchestratorState();
  if (!orchestrator) throw new Error("No orchestrator state");
  const failed = orchestrator.workers.get(failingId);
  if (!failed) throw new Error(`Worker ${failingId} not found`);
  failed.state = "error";
  failed.process = null;
  // The cascade is opt-in and may fire at most once per orchestration.
  if (!orchestrator.config.stop_on_failure || orchestrator.failureCascadeTriggered) {
    return;
  }
  orchestrator.failureCascadeTriggered = true;
  const triggeringWorkerKind = failingId;
  for (const [siblingId, sibling] of orchestrator.workers) {
    // Only live siblings are cancelled; terminal ones are left untouched.
    const live = sibling.state === "running" || sibling.state === "paused";
    if (siblingId === failingId || !live) continue;
    sendSignal(base, siblingId, "stop");
    sibling.state = "cancelled";
    sibling.process = null;
    writeSessionStatus(base, {
      milestoneId: siblingId,
      pid: sibling.pid || 0,
      state: "cancelled",
      currentUnit: null,
      completedUnits: 0,
      cost: sibling.cost,
      lastHeartbeat: Date.now(),
      startedAt: sibling.startedAt,
      worktreePath: sibling.worktreePath || base,
    });
    emitJournalEvent(base, {
      ts: new Date().toISOString(),
      flowId: siblingId,
      seq: 0,
      eventType: "parallel-cancelled-by-sibling",
      data: {
        workerId: siblingId,
        triggeringWorkerId: failingId,
        triggeringWorkerKind,
      },
    });
  }
}
// ─── Tests ───────────────────────────────────────────────────────────────────
describe("parallel-stop-on-failure: default behavior (stop_on_failure: false)", () => {
  let tmpBase: string;

  beforeEach(() => {
    tmpBase = makeTmpBase();
    resetOrchestrator();
  });

  afterEach(() => {
    resetOrchestrator();
    rmSync(tmpBase, { recursive: true, force: true });
  });

  it("siblings continue and remain running when stop_on_failure is false", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002", "M003"], false);

    simulateWorkerFailure(tmpBase, "M001");

    const st = getOrchestratorState();
    assert.ok(st, "orchestrator state should exist");
    // Only the failed worker changes state; siblings are untouched.
    assert.equal(st.workers.get("M001")?.state, "error", "failed worker is in error state");
    assert.equal(st.workers.get("M002")?.state, "running", "sibling M002 should still be running");
    assert.equal(st.workers.get("M003")?.state, "running", "sibling M003 should still be running");
  });

  it("no stop signals are written to siblings when stop_on_failure is false", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002"], false);
    simulateWorkerFailure(tmpBase, "M001");
    assert.equal(
      consumeSignal(tmpBase, "M002"),
      null,
      "no stop signal should be sent to sibling",
    );
  });
});
describe("parallel-stop-on-failure: opt-in (stop_on_failure: true)", () => {
  let tmpBase: string;

  beforeEach(() => {
    tmpBase = makeTmpBase();
    resetOrchestrator();
  });

  afterEach(() => {
    resetOrchestrator();
    rmSync(tmpBase, { recursive: true, force: true });
  });

  it("running siblings are set to cancelled when stop_on_failure is true", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002", "M003"], true);

    simulateWorkerFailure(tmpBase, "M001");

    const st = getOrchestratorState();
    assert.ok(st, "orchestrator state should exist");
    const failed = st.workers.get("M001");
    const siblingA = st.workers.get("M002");
    const siblingB = st.workers.get("M003");
    assert.equal(failed?.state, "error", "failed worker is in error state");
    assert.equal(siblingA?.state, "cancelled", "sibling M002 should be cancelled");
    assert.equal(siblingB?.state, "cancelled", "sibling M003 should be cancelled");
  });

  it("stop signals are written to sibling workers", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002"], true);
    simulateWorkerFailure(tmpBase, "M001");

    const signal = consumeSignal(tmpBase, "M002");
    assert.ok(signal, "stop signal should be sent to sibling");
    assert.equal(signal?.signal, "stop", "signal type should be 'stop'");
  });

  it("session status files reflect cancelled state", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002"], true);
    simulateWorkerFailure(tmpBase, "M001");

    const status = readSessionStatus(tmpBase, "M002");
    assert.ok(status, "status file should exist for M002");
    assert.equal(status?.state, "cancelled", "session status should show cancelled");
  });

  it("orchestrator remains active after cascade (not all stopped)", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002"], true);
    simulateWorkerFailure(tmpBase, "M001");
    assert.ok(isParallelActive(), "orchestrator should still be active");
  });
});
describe("parallel-stop-on-failure: already-completed siblings are unaffected", () => {
  let tmpBase: string;

  beforeEach(() => {
    tmpBase = makeTmpBase();
    resetOrchestrator();
  });

  afterEach(() => {
    resetOrchestrator();
    rmSync(tmpBase, { recursive: true, force: true });
  });

  it("stopped (completed) sibling is not given a stop signal", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002", "M003"], true);
    const st = getOrchestratorState();
    assert.ok(st);
    const completed = st.workers.get("M002");
    assert.ok(completed);
    // M002 finished before the failure — the cascade must skip it.
    completed.state = "stopped";

    simulateWorkerFailure(tmpBase, "M001");

    assert.equal(
      consumeSignal(tmpBase, "M002"),
      null,
      "completed sibling should not receive stop signal",
    );
    assert.equal(completed.state, "stopped", "completed sibling stays stopped");
  });

  it("errored sibling is not given a stop signal", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002", "M003"], true);
    const st = getOrchestratorState();
    assert.ok(st);
    const alreadyFailed = st.workers.get("M002");
    assert.ok(alreadyFailed);
    // M002 is already in a terminal failure state before the cascade fires.
    alreadyFailed.state = "error";

    simulateWorkerFailure(tmpBase, "M001");

    assert.equal(
      consumeSignal(tmpBase, "M002"),
      null,
      "already-failed sibling should not receive stop signal",
    );
  });
});
describe("parallel-stop-on-failure: cascade fires only once", () => {
  let tmpBase: string;

  beforeEach(() => {
    tmpBase = makeTmpBase();
    resetOrchestrator();
  });

  afterEach(() => {
    resetOrchestrator();
    rmSync(tmpBase, { recursive: true, force: true });
  });

  it("second failure does not re-cancel already cancelled siblings", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002", "M003"], true);

    // First failure triggers the cascade — M002 and M003 get cancelled.
    simulateWorkerFailure(tmpBase, "M001");
    const firstSignal = consumeSignal(tmpBase, "M003");
    assert.ok(firstSignal, "first cascade should send stop to M003");

    // M002 is now "cancelled"; a simulated second failure must be absorbed
    // by the cascade guard rather than re-signalling M003.
    simulateWorkerFailure(tmpBase, "M002");
    const secondSignal = consumeSignal(tmpBase, "M003");
    assert.equal(secondSignal, null, "cascade should not fire a second time");
  });

  it("failureCascadeTriggered flag is set after first cascade", async () => {
    await setupOrchestrator(tmpBase, ["M001", "M002"], true);
    simulateWorkerFailure(tmpBase, "M001");

    const st = getOrchestratorState();
    assert.ok(st);
    assert.equal(
      st.failureCascadeTriggered,
      true,
      "failureCascadeTriggered should be true after cascade",
    );
  });
});

View file

@ -17,7 +17,7 @@ import {
} from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { after, describe, it } from 'vitest';
import { afterAll, describe, it } from "vitest";
import { captureIntegrationBranch, detectWorktreeName } from "../worktree.ts";
import { resolveGitDir } from "../worktree-manager.ts";

View file

@ -475,6 +475,13 @@ export interface ParallelConfig {
* Example: ["nix", "develop", "--command"] for NixOS projects.
*/
shell_wrapper?: string[];
/**
* Opt-in: stop sibling workers when one worker fails.
* Default: false (siblings continue independently).
* Set true when workers are dependent (e.g. research feeding a single
* planning round) and continuing without a failed sibling wastes budget.
*/
stop_on_failure?: boolean;
}
// ─── Reactive Task Execution Types ───────────────────────────────────────