fix: reap orphan-prone child processes across session churn (#920)
* fix: reap orphan-prone child processes across session churn * test: make bg-shell cleanup test shell-safe
This commit is contained in:
parent
3167e9fbf4
commit
7733e12413
9 changed files with 250 additions and 6 deletions
|
|
@ -52,6 +52,7 @@ import {
|
|||
getGroupStatus,
|
||||
pruneDeadProcesses,
|
||||
cleanupAll,
|
||||
cleanupSessionProcesses,
|
||||
persistManifest,
|
||||
loadManifest,
|
||||
pushAlert,
|
||||
|
|
@ -71,7 +72,7 @@ import { toPosixPath } from "../shared/path-display.js";
|
|||
// ── Re-exports for consumers ───────────────────────────────────────────────
|
||||
|
||||
export type { ProcessStatus, ProcessType, BgProcess, BgProcessInfo, OutputDigest, OutputLine, ProcessEvent } from "./types.js";
|
||||
export { processes, startProcess, killProcess, restartProcess, cleanupAll } from "./process-manager.js";
|
||||
export { processes, startProcess, killProcess, restartProcess, cleanupAll, cleanupSessionProcesses } from "./process-manager.js";
|
||||
export { generateDigest, getHighlights, getOutput, formatDigestText } from "./output-formatter.js";
|
||||
export { waitForReady, probePort } from "./readiness-detector.js";
|
||||
export { sendAndWait, runOnSession, queryShellEnv } from "./interaction.js";
|
||||
|
|
@ -136,7 +137,13 @@ export default function (pi: ExtensionAPI) {
|
|||
});
|
||||
|
||||
// Session switch resets the agent's context.
|
||||
pi.on("session_switch", async () => {
|
||||
pi.on("session_switch", async (event, ctx) => {
|
||||
latestCtx = ctx;
|
||||
if (event.reason === "new" && event.previousSessionFile) {
|
||||
await cleanupSessionProcesses(event.previousSessionFile);
|
||||
syncLatestCtxCwd();
|
||||
if (latestCtx) persistManifest(latestCtx.cwd);
|
||||
}
|
||||
buildProcessStateAlert("Session was switched.");
|
||||
});
|
||||
|
||||
|
|
@ -232,6 +239,7 @@ export default function (pi: ExtensionAPI) {
|
|||
"Use 'run' to execute a command on a persistent shell session and block until it completes — returns structured output + exit code. Shell state (env vars, cwd, virtualenvs) persists across runs.",
|
||||
"Use 'send_and_wait' for interactive CLIs: send input and wait for expected output pattern.",
|
||||
"Use 'env' to check the current working directory and active environment variables of a shell session — useful after cd, source, or export commands.",
|
||||
"Background processes are session-scoped by default: a new session reaps them unless you set persist_across_sessions:true.",
|
||||
"Use 'restart' to kill and relaunch with the same config — preserves restart count.",
|
||||
"Background processes are auto-classified (server/build/test/watcher) based on the command.",
|
||||
"Process crashes and errors are automatically surfaced as alerts at the start of your next turn — you don't need to poll.",
|
||||
|
|
@ -300,6 +308,12 @@ export default function (pi: ExtensionAPI) {
|
|||
group: Type.Optional(
|
||||
Type.String({ description: "Group name for related processes (for start, group_status)" }),
|
||||
),
|
||||
persist_across_sessions: Type.Optional(
|
||||
Type.Boolean({
|
||||
description: "Keep this process running after a new session starts. Default: false.",
|
||||
default: false,
|
||||
}),
|
||||
),
|
||||
}),
|
||||
|
||||
async execute(_toolCallId, params, signal, _onUpdate, ctx) {
|
||||
|
|
@ -318,6 +332,8 @@ export default function (pi: ExtensionAPI) {
|
|||
const bg = startProcess({
|
||||
command: params.command,
|
||||
cwd: ctx.cwd,
|
||||
ownerSessionFile: ctx.sessionManager.getSessionFile() ?? null,
|
||||
persistAcrossSessions: params.persist_across_sessions ?? false,
|
||||
label: params.label,
|
||||
type: params.type as ProcessType | undefined,
|
||||
readyPattern: params.ready_pattern,
|
||||
|
|
@ -341,6 +357,7 @@ export default function (pi: ExtensionAPI) {
|
|||
text += ` cwd: ${toPosixPath(bg.cwd)}`;
|
||||
|
||||
if (bg.group) text += `\n group: ${bg.group}`;
|
||||
if (bg.persistAcrossSessions) text += `\n persist_across_sessions: true`;
|
||||
if (bg.readyPort) text += `\n ready_port: ${bg.readyPort}`;
|
||||
if (bg.readyPattern) text += `\n ready_pattern: ${bg.readyPattern}`;
|
||||
if (bg.ports.length > 0) text += `\n detected ports: ${bg.ports.join(", ")}`;
|
||||
|
|
|
|||
|
|
@ -67,6 +67,8 @@ export function getInfo(p: BgProcess): BgProcessInfo {
|
|||
label: p.label,
|
||||
command: p.command,
|
||||
cwd: p.cwd,
|
||||
ownerSessionFile: p.ownerSessionFile,
|
||||
persistAcrossSessions: p.persistAcrossSessions,
|
||||
startedAt: p.startedAt,
|
||||
alive: p.alive,
|
||||
exitCode: p.exitCode,
|
||||
|
|
@ -138,6 +140,8 @@ export function startProcess(opts: StartOptions): BgProcess {
|
|||
label: opts.label || command.slice(0, 60),
|
||||
command,
|
||||
cwd: opts.cwd,
|
||||
ownerSessionFile: opts.ownerSessionFile ?? null,
|
||||
persistAcrossSessions: opts.persistAcrossSessions ?? false,
|
||||
startedAt: Date.now(),
|
||||
proc,
|
||||
output: [],
|
||||
|
|
@ -170,6 +174,8 @@ export function startProcess(opts: StartOptions): BgProcess {
|
|||
cwd: opts.cwd,
|
||||
label: opts.label || command.slice(0, 60),
|
||||
processType,
|
||||
ownerSessionFile: opts.ownerSessionFile ?? null,
|
||||
persistAcrossSessions: opts.persistAcrossSessions ?? false,
|
||||
readyPattern: opts.readyPattern || null,
|
||||
readyPort: opts.readyPort || null,
|
||||
group: opts.group || null,
|
||||
|
|
@ -312,6 +318,8 @@ export async function restartProcess(id: string): Promise<BgProcess | null> {
|
|||
cwd: config.cwd,
|
||||
label: config.label,
|
||||
type: config.processType,
|
||||
ownerSessionFile: config.ownerSessionFile,
|
||||
persistAcrossSessions: config.persistAcrossSessions,
|
||||
readyPattern: config.readyPattern || undefined,
|
||||
readyPort: config.readyPort || undefined,
|
||||
group: config.group || undefined,
|
||||
|
|
@ -367,6 +375,41 @@ export function cleanupAll(): void {
|
|||
processes.clear();
|
||||
}
|
||||
|
||||
async function waitForProcessExit(bg: BgProcess, timeoutMs: number): Promise<boolean> {
|
||||
if (!bg.alive) return true;
|
||||
await new Promise<void>((resolve) => {
|
||||
const done = () => resolve();
|
||||
const timer = setTimeout(done, timeoutMs);
|
||||
bg.proc.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
return !bg.alive;
|
||||
}
|
||||
|
||||
export async function cleanupSessionProcesses(
|
||||
sessionFile: string,
|
||||
options?: { graceMs?: number },
|
||||
): Promise<string[]> {
|
||||
const graceMs = Math.max(0, options?.graceMs ?? 300);
|
||||
const matches = Array.from(processes.values()).filter(
|
||||
(bg) => bg.alive && !bg.persistAcrossSessions && bg.ownerSessionFile === sessionFile,
|
||||
);
|
||||
if (matches.length === 0) return [];
|
||||
|
||||
for (const bg of matches) {
|
||||
killProcess(bg.id, "SIGTERM");
|
||||
}
|
||||
if (graceMs > 0) {
|
||||
await Promise.all(matches.map((bg) => waitForProcessExit(bg, graceMs)));
|
||||
}
|
||||
for (const bg of matches) {
|
||||
if (bg.alive) killProcess(bg.id, "SIGKILL");
|
||||
}
|
||||
return matches.map((bg) => bg.id);
|
||||
}
|
||||
|
||||
// ── Persistence ────────────────────────────────────────────────────────────
|
||||
|
||||
export function getManifestPath(cwd: string): string {
|
||||
|
|
@ -384,6 +427,8 @@ export function persistManifest(cwd: string): void {
|
|||
label: p.label,
|
||||
command: p.command,
|
||||
cwd: p.cwd,
|
||||
ownerSessionFile: p.ownerSessionFile,
|
||||
persistAcrossSessions: p.persistAcrossSessions,
|
||||
startedAt: p.startedAt,
|
||||
processType: p.processType,
|
||||
group: p.group,
|
||||
|
|
|
|||
|
|
@ -53,6 +53,10 @@ export interface BgProcess {
|
|||
label: string;
|
||||
command: string;
|
||||
cwd: string;
|
||||
/** Session file that created this process (used for per-session cleanup) */
|
||||
ownerSessionFile: string | null;
|
||||
/** Whether this process should survive a new-session boundary */
|
||||
persistAcrossSessions: boolean;
|
||||
startedAt: number;
|
||||
proc: import("node:child_process").ChildProcess;
|
||||
/** Unified chronologically-interleaved output buffer */
|
||||
|
|
@ -103,7 +107,17 @@ export interface BgProcess {
|
|||
/** Restart count */
|
||||
restartCount: number;
|
||||
/** Original start config for restart */
|
||||
startConfig: { command: string; cwd: string; label: string; processType: ProcessType; readyPattern: string | null; readyPort: number | null; group: string | null };
|
||||
startConfig: {
|
||||
command: string;
|
||||
cwd: string;
|
||||
label: string;
|
||||
processType: ProcessType;
|
||||
ownerSessionFile: string | null;
|
||||
persistAcrossSessions: boolean;
|
||||
readyPattern: string | null;
|
||||
readyPort: number | null;
|
||||
group: string | null;
|
||||
};
|
||||
}
|
||||
|
||||
export interface BgProcessInfo {
|
||||
|
|
@ -111,6 +125,8 @@ export interface BgProcessInfo {
|
|||
label: string;
|
||||
command: string;
|
||||
cwd: string;
|
||||
ownerSessionFile: string | null;
|
||||
persistAcrossSessions: boolean;
|
||||
startedAt: number;
|
||||
alive: boolean;
|
||||
exitCode: number | null;
|
||||
|
|
@ -133,6 +149,8 @@ export interface BgProcessInfo {
|
|||
export interface StartOptions {
|
||||
command: string;
|
||||
cwd: string;
|
||||
ownerSessionFile?: string | null;
|
||||
persistAcrossSessions?: boolean;
|
||||
label?: string;
|
||||
type?: ProcessType;
|
||||
readyPattern?: string;
|
||||
|
|
@ -154,6 +172,8 @@ export interface ProcessManifest {
|
|||
label: string;
|
||||
command: string;
|
||||
cwd: string;
|
||||
ownerSessionFile: string | null;
|
||||
persistAcrossSessions: boolean;
|
||||
startedAt: number;
|
||||
processType: ProcessType;
|
||||
group: string | null;
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ import { shortcutDesc } from "../shared/terminal.js";
|
|||
import { Text } from "@gsd/pi-tui";
|
||||
import { pauseAutoForProviderError } from "./provider-error-pause.js";
|
||||
import { toPosixPath } from "../shared/path-display.js";
|
||||
import { isParallelActive, shutdownParallel } from "./parallel-orchestrator.js";
|
||||
|
||||
// ── Agent Instructions ────────────────────────────────────────────────────
|
||||
// Lightweight "always follow" files injected into every GSD agent session.
|
||||
|
|
@ -856,6 +857,12 @@ export default function (pi: ExtensionAPI) {
|
|||
|
||||
// ── session_shutdown: save activity log on Ctrl+C / SIGTERM ─────────────
|
||||
pi.on("session_shutdown", async (_event, ctx: ExtensionContext) => {
|
||||
if (isParallelActive()) {
|
||||
try {
|
||||
await shutdownParallel(process.cwd());
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
|
||||
if (!isAutoActive() && !isAutoPaused()) return;
|
||||
|
||||
// Save the current session — the lock file stays on disk
|
||||
|
|
|
|||
|
|
@ -139,6 +139,7 @@ function removeStateFile(basePath: string): void {
|
|||
}
|
||||
|
||||
function isPidAlive(pid: number): boolean {
|
||||
if (!Number.isInteger(pid) || pid <= 0) return false;
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
|
|
@ -179,6 +180,27 @@ export function restoreState(basePath: string): PersistedState | null {
|
|||
}
|
||||
}
|
||||
|
||||
async function waitForWorkerExit(worker: WorkerInfo, timeoutMs: number): Promise<boolean> {
|
||||
if (worker.process) {
|
||||
await new Promise<void>((resolve) => {
|
||||
const done = () => resolve();
|
||||
const timer = setTimeout(done, timeoutMs);
|
||||
worker.process!.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
return worker.process === null || !isPidAlive(worker.pid);
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
if (!isPidAlive(worker.pid)) return true;
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
}
|
||||
return !isPidAlive(worker.pid);
|
||||
}
|
||||
|
||||
// ─── Accessors ─────────────────────────────────────────────────────────────
|
||||
|
||||
/** Returns true if the orchestrator is active and has been initialized. */
|
||||
|
|
@ -653,12 +675,24 @@ export async function stopParallel(
|
|||
try {
|
||||
if (worker.process) {
|
||||
worker.process.kill("SIGTERM");
|
||||
} else {
|
||||
} else if (worker.pid !== process.pid) {
|
||||
process.kill(worker.pid, "SIGTERM");
|
||||
}
|
||||
} catch { /* process may already be dead */ }
|
||||
}
|
||||
|
||||
const exitedAfterTerm = await waitForWorkerExit(worker, 750);
|
||||
if (!exitedAfterTerm && worker.pid > 0) {
|
||||
try {
|
||||
if (worker.process) {
|
||||
worker.process.kill("SIGKILL");
|
||||
} else if (worker.pid !== process.pid) {
|
||||
process.kill(worker.pid, "SIGKILL");
|
||||
}
|
||||
} catch { /* process may already be dead */ }
|
||||
await waitForWorkerExit(worker, 250);
|
||||
}
|
||||
|
||||
// Update in-memory state
|
||||
worker.state = "stopped";
|
||||
worker.process = null;
|
||||
|
|
@ -676,6 +710,12 @@ export async function stopParallel(
|
|||
removeStateFile(basePath);
|
||||
}
|
||||
|
||||
export async function shutdownParallel(basePath: string): Promise<void> {
|
||||
if (!state) return;
|
||||
await stopParallel(basePath);
|
||||
resetOrchestrator();
|
||||
}
|
||||
|
||||
// ─── Pause / Resume ────────────────────────────────────────────────────────
|
||||
|
||||
/** Pause a specific worker or all workers. */
|
||||
|
|
|
|||
|
|
@ -154,7 +154,7 @@ Templates showing the expected format for each artifact type are in:
|
|||
|
||||
**External facts:** Use `search-the-web` + `fetch_page`, or `search_and_read` for one-call extraction. Use `freshness` for recency. Never state current facts from training data without verification.
|
||||
|
||||
**Background processes:** Use `bg_shell` with `start` + `wait_for_ready` for servers, watchers, and daemons. Never use `bash` with `&` or `nohup` to background a process — the `bash` tool waits for stdout to close, so backgrounded children that inherit the file descriptors cause it to hang indefinitely. Never poll with `sleep`/retry loops — `wait_for_ready` exists for this. For status checks, use `digest` (~30 tokens), not `output` (~2000 tokens). Use `highlights` (~100 tokens) when you need significant lines only. Use `output` only when actively debugging.
|
||||
**Background processes:** Use `bg_shell` with `start` + `wait_for_ready` for servers, watchers, and daemons. Never use `bash` with `&` or `nohup` to background a process — the `bash` tool waits for stdout to close, so backgrounded children that inherit the file descriptors cause it to hang indefinitely. Never poll with `sleep`/retry loops — `wait_for_ready` exists for this. For status checks, use `digest` (~30 tokens), not `output` (~2000 tokens). Use `highlights` (~100 tokens) when you need significant lines only. Use `output` only when actively debugging. Background processes are session-scoped by default; set `persist_across_sessions:true` only when you intentionally need them to survive a fresh session.
|
||||
|
||||
**One-shot commands:** Use `async_bash` for builds, tests, and installs. The result is pushed to you when the command exits — no polling needed. Use `await_job` to block on a specific job.
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import {
|
|||
getWorkerStatuses,
|
||||
startParallel,
|
||||
stopParallel,
|
||||
shutdownParallel,
|
||||
pauseWorker,
|
||||
resumeWorker,
|
||||
getAggregateCost,
|
||||
|
|
@ -338,6 +339,14 @@ describe("parallel-orchestrator: lifecycle", () => {
|
|||
assert.ok(signal);
|
||||
assert.equal(signal.signal, "pause");
|
||||
});
|
||||
|
||||
it("shutdownParallel deactivates the orchestrator state", async () => {
|
||||
await startParallel(base, ["M001"], undefined);
|
||||
assert.equal(isParallelActive(), true);
|
||||
await shutdownParallel(base);
|
||||
assert.equal(isParallelActive(), false);
|
||||
assert.equal(getOrchestratorState(), null);
|
||||
});
|
||||
});
|
||||
|
||||
describe("parallel-orchestrator: budget", () => {
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
* Uses JSON mode to capture structured output from subagents.
|
||||
*/
|
||||
|
||||
import { spawn } from "node:child_process";
|
||||
import { spawn, type ChildProcess } from "node:child_process";
|
||||
import * as crypto from "node:crypto";
|
||||
import * as fs from "node:fs";
|
||||
import * as os from "node:os";
|
||||
|
|
@ -38,6 +38,44 @@ import { registerWorker, updateWorker } from "./worker-registry.js";
|
|||
const MAX_PARALLEL_TASKS = 8;
|
||||
const MAX_CONCURRENCY = 4;
|
||||
const COLLAPSED_ITEM_COUNT = 10;
|
||||
const liveSubagentProcesses = new Set<ChildProcess>();
|
||||
|
||||
async function stopLiveSubagents(): Promise<void> {
|
||||
const active = Array.from(liveSubagentProcesses);
|
||||
if (active.length === 0) return;
|
||||
|
||||
for (const proc of active) {
|
||||
try {
|
||||
proc.kill("SIGTERM");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
active.map(
|
||||
(proc) =>
|
||||
new Promise<void>((resolve) => {
|
||||
const done = () => resolve();
|
||||
const timer = setTimeout(done, 500);
|
||||
proc.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
for (const proc of active) {
|
||||
if (proc.exitCode === null) {
|
||||
try {
|
||||
proc.kill("SIGKILL");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function formatTokens(count: number): string {
|
||||
if (count < 1000) return count.toString();
|
||||
|
|
@ -302,6 +340,7 @@ async function runSingleAgent(
|
|||
[process.env.GSD_BIN_PATH!, ...extensionArgs, ...args],
|
||||
{ cwd: cwd ?? defaultCwd, shell: false, stdio: ["ignore", "pipe", "pipe"] },
|
||||
);
|
||||
liveSubagentProcesses.add(proc);
|
||||
let buffer = "";
|
||||
|
||||
const processLine = (line: string) => {
|
||||
|
|
@ -353,11 +392,13 @@ async function runSingleAgent(
|
|||
});
|
||||
|
||||
proc.on("close", (code) => {
|
||||
liveSubagentProcesses.delete(proc);
|
||||
if (buffer.trim()) processLine(buffer);
|
||||
resolve(code ?? 0);
|
||||
});
|
||||
|
||||
proc.on("error", () => {
|
||||
liveSubagentProcesses.delete(proc);
|
||||
resolve(1);
|
||||
});
|
||||
|
||||
|
|
@ -432,6 +473,10 @@ const SubagentParams = Type.Object({
|
|||
});
|
||||
|
||||
export default function (pi: ExtensionAPI) {
|
||||
pi.on("session_shutdown", async () => {
|
||||
await stopLiveSubagents();
|
||||
});
|
||||
|
||||
// /subagent command - list available agents
|
||||
pi.registerCommand("subagent", {
|
||||
description: "List available subagents",
|
||||
|
|
|
|||
61
src/tests/bg-shell-session-cleanup.test.ts
Normal file
61
src/tests/bg-shell-session-cleanup.test.ts
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
import test from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
|
||||
import {
|
||||
startProcess,
|
||||
cleanupAll,
|
||||
cleanupSessionProcesses,
|
||||
processes,
|
||||
} from "../resources/extensions/bg-shell/process-manager.ts";
|
||||
|
||||
function isPidAlive(pid: number | undefined): boolean {
|
||||
if (!pid || pid <= 0) return false;
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Use a shell-native sleeper so the test exercises bg_shell's real spawn path
|
||||
// without relying on platform-specific quoting for `node -e "..."`
|
||||
const sleeperCommand = "sleep 30";
|
||||
|
||||
test("cleanupSessionProcesses reaps only session-scoped processes from the previous session", async () => {
|
||||
const owned = startProcess({
|
||||
command: sleeperCommand,
|
||||
cwd: process.cwd(),
|
||||
ownerSessionFile: "session-a",
|
||||
});
|
||||
const persistent = startProcess({
|
||||
command: sleeperCommand,
|
||||
cwd: process.cwd(),
|
||||
ownerSessionFile: "session-a",
|
||||
persistAcrossSessions: true,
|
||||
});
|
||||
const foreign = startProcess({
|
||||
command: sleeperCommand,
|
||||
cwd: process.cwd(),
|
||||
ownerSessionFile: "session-b",
|
||||
});
|
||||
|
||||
try {
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
assert.equal(isPidAlive(owned.proc.pid), true, "owned process should be alive before cleanup");
|
||||
assert.equal(isPidAlive(persistent.proc.pid), true, "persistent process should be alive before cleanup");
|
||||
assert.equal(isPidAlive(foreign.proc.pid), true, "foreign process should be alive before cleanup");
|
||||
|
||||
const removed = await cleanupSessionProcesses("session-a", { graceMs: 200 });
|
||||
assert.deepEqual(removed.sort(), [owned.id], "only the session-scoped process should be reaped");
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
assert.equal(isPidAlive(owned.proc.pid), false, "owned process should be terminated");
|
||||
assert.equal(isPidAlive(persistent.proc.pid), true, "persistent process should survive cleanup");
|
||||
assert.equal(isPidAlive(foreign.proc.pid), true, "foreign process should survive cleanup");
|
||||
assert.equal(processes.get(owned.id)?.persistAcrossSessions, false);
|
||||
assert.equal(processes.get(persistent.id)?.persistAcrossSessions, true);
|
||||
} finally {
|
||||
cleanupAll();
|
||||
}
|
||||
});
|
||||
Loading…
Add table
Reference in a new issue