fix(rpc, web): integrate drain into forceShutdown + healthz-503 on shutdown

Three fixes addressing codex's adversarial review of the earlier orphan-
recovery / graceful-shutdown landing:

(1) Codex point B — single shutdown path. Removed the parallel
    installGracefulShutdown() handler in rpc-mode.ts that was adding
    a second SIGTERM listener and racing forceShutdown()'s teardown.
    The drain is now the FIRST step inside forceShutdown() (before
    killTrackedDetachedChildren / extension session_shutdown / etc.)
    so DB writes complete cleanly while child processes are still
    alive to flush. Race-free against the existing shutdown ordering.

(2) Codex point D — recovery-before-each-drain. Cloud-volume mtime
    visibility lags between containers can mean an orphan `.draining`
    file from a previous container isn't visible during the startup
    scan but appears moments later. drainQueuedSfFeedbackCommands()
    now runs recoverOrphanedFeedbackDrains() as its first step, so
    each dispatch's drain sees the latest filesystem state.

(3) Codex point E — healthz returns 503 during shutdown. New module
    src/web/shutdown-state.ts holds a per-process flag, auto-registers
    SIGTERM/SIGINT/SIGHUP handlers on first read, and exposes a
    snapshot (signal, startedAt, elapsedMs) for diagnostics. The
    healthz route imports isShuttingDown() and returns 503 when set,
    so k8s readinessProbe / Forgejo blue-green probes drain traffic
    BEFORE we actually stop responding.

Tests:
  - rpc-mode-orphan-recovery.test.ts: 8/8 still green
  - web-shutdown-state.test.ts: 5/5 new — default false, mark sets
    flag, idempotent, signal exposed via snapshot, null signal for
    manual mark

Deferred to a follow-up commit (codex didn't flag, but noted for
completeness): a SIGTERM-drain child-process integration test that
spawns rpc-mode + sends a real signal. The 5 unit tests cover the
flag logic; the integration test would cover the full process tree
and is bulkier than the current commit warrants.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-17 22:35:50 +02:00
parent 68178a9260
commit f8e53840da
4 changed files with 209 additions and 65 deletions

View file

@ -130,6 +130,12 @@ function parseQueuedSfFeedbackLine(
*/
async function drainQueuedSfFeedbackCommands(cwd: string): Promise<void> {
const runtimeDir = join(cwd, ".sf", "runtime");
// Pull any orphan `.draining` files back into the active queue before we
// claim it ourselves. Closes the cloud-volume-mtime-lag window where a
// previous container's rename only becomes visible after our startup
// recovery scan ran. Cheap (single-pass readdir + a few sync renames)
// and idempotent — if there are no orphans this is a no-op.
recoverOrphanedFeedbackDrains(cwd);
const queuePath = join(runtimeDir, SF_FEEDBACK_QUEUE_FILE);
if (!existsSync(queuePath)) return;
@ -196,67 +202,6 @@ function scheduleQueuedSfFeedbackDrain(cwd: string): void {
}, 0);
}
/**
* SIGTERM-graceful-shutdown for the rpc-mode child.
*
* Purpose: when k8s rolls a new image (or operator runs `docker stop`),
* SIGTERM is sent; the container has a grace period before SIGKILL.
* Without a handler we exit on tini's default SIGTERM forwarding and any
* in-flight `.draining` work is stranded on the mounted volume exactly
* the orphan pattern recoverOrphanedFeedbackDrains exists to clean up
* later. Better to drain cleanly at shutdown so the next container starts
* with nothing pending.
*
* Behavior: on SIGTERM/SIGINT, run drainQueuedSfFeedbackCommands once,
* then exit cleanly. Bounded by SF_RPC_SHUTDOWN_GRACE_MS (default 600s
* = 10 min). Rationale: if a queue drain is in flight when shutdown is
* requested, it MUST be allowed to finish losing self-feedback writes
* across a server upgrade is worse than waiting. Normal drains finish
* in <1s; the 10-min ceiling is for pathological lock contention or
* a hung DB connection. Operator can shorten via env var, or use
* docker kill / kubectl delete --force to bypass entirely.
*
* The upgrader script (scripts/upgrade-vega-source-server.mjs) is
* bumped to `docker stop --time 610` so Docker's SIGKILL fires 10s
* after our grace expires, leaving a safety margin for Node shutdown.
* k8s deployments must set terminationGracePeriodSeconds610 to match.
*
* Registers once per process; subsequent calls are no-ops.
*/
let _gracefulHandlerInstalled = false;
function installGracefulShutdown(cwd: string): void {
if (_gracefulHandlerInstalled) return;
_gracefulHandlerInstalled = true;
const graceMs = Number(process.env.SF_RPC_SHUTDOWN_GRACE_MS ?? 600_000);
let shuttingDown = false;
const handler = (signal: NodeJS.Signals) => {
if (shuttingDown) return;
shuttingDown = true;
process.stderr.write(
`[rpc-mode] ${signal} received; draining queued sf_feedback before exit (grace ${graceMs}ms)\n`,
);
const kill = setTimeout(() => {
process.stderr.write(
`[rpc-mode] grace period exceeded; exiting with pending work\n`,
);
process.exit(1);
}, graceMs);
kill.unref?.();
void drainQueuedSfFeedbackCommands(cwd)
.catch((err) => {
process.stderr.write(
`[rpc-mode] drain on shutdown failed: ${err instanceof Error ? err.message : String(err)}\n`,
);
})
.finally(() => {
clearTimeout(kill);
process.exit(0);
});
};
process.on("SIGTERM", handler);
process.on("SIGINT", handler);
}
async function captureProcessWrites<T>(
run: () => Promise<T>,
): Promise<{ result: T; stdout: string; stderr: string }> {
@ -439,10 +384,9 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
// inside the helper and don't abort startup.
recoverOrphanedFeedbackDrains(process.cwd());
// Install SIGTERM/SIGINT handler that drains pending queue before exit.
// k8s rollouts send SIGTERM with a grace period; without this handler
// in-flight drain work would be lost when tini forwards the signal.
installGracefulShutdown(process.cwd());
// SIGTERM/SIGINT drain is now integrated into the existing forceShutdown()
// path below (search "drain any queued sf_feedback"). A parallel handler
// would race forceShutdown's teardown and add a second listener.
const rawStdoutWrite = process.stdout.write.bind(process.stdout);
const rawStderrWrite = process.stderr.write.bind(process.stderr);
@ -1467,6 +1411,31 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
async function forceShutdown(exitCode = 0): Promise<never> {
if (shuttingDown) process.exit(exitCode);
shuttingDown = true;
// Drain any queued sf_feedback writes BEFORE killing children so DB
// writes complete cleanly. Bounded by SF_RPC_SHUTDOWN_GRACE_MS
// (default 600s = 10 min). Operator can shorten via env or hard-bypass
// via SIGKILL / kubectl delete --force. Drain MUST finish — losing
// self-feedback across a server upgrade is worse than waiting. Codex
// point B: this replaces a parallel SIGTERM handler that was adding
// a second listener and racing the rest of forceShutdown's teardown.
const graceMs = Number(process.env.SF_RPC_SHUTDOWN_GRACE_MS ?? 600_000);
try {
await Promise.race([
drainQueuedSfFeedbackCommands(process.cwd()),
new Promise<void>((resolveTimeout) =>
setTimeout(() => {
process.stderr.write(
`[rpc-mode] shutdown drain exceeded ${graceMs}ms; exiting with .draining still on disk (will be recovered next start)\n`,
);
resolveTimeout();
}, graceMs),
),
]);
} catch (err) {
process.stderr.write(
`[rpc-mode] shutdown drain failed: ${err instanceof Error ? err.message : String(err)}\n`,
);
}
killTrackedDetachedChildren();
for (const cleanup of signalCleanupHandlers) cleanup();
const currentRunner = session.extensionRunner;

View file

@ -0,0 +1,56 @@
/**
* web-shutdown-state.test.ts covers the per-process shutdown flag the
* healthz route consumes to return 503 during graceful drain.
*
* Closes codex review point E: load balancers must see a shutdown signal
* the moment the orchestrator sends SIGTERM, BEFORE we stop responding,
* so traffic is drained to other replicas first.
*/
import { describe, expect, it, beforeEach } from "vitest";
import {
_resetShutdownStateForTests,
isShuttingDown,
markShuttingDown,
shutdownStateSnapshot,
} from "../web/shutdown-state.ts";
beforeEach(() => {
_resetShutdownStateForTests();
});
describe("shutdown-state", () => {
it("is not shutting down by default", () => {
expect(isShuttingDown()).toBe(false);
});
it("markShuttingDown sets the flag", () => {
markShuttingDown();
expect(isShuttingDown()).toBe(true);
});
it("markShuttingDown is idempotent — does not bump start time on repeat call", () => {
markShuttingDown();
const first = shutdownStateSnapshot();
const t0 = first.elapsedMs;
// Synchronous re-call shouldn't reset the start timestamp.
markShuttingDown();
const second = shutdownStateSnapshot();
expect(second.startedAt).toBe(first.startedAt);
expect(second.elapsedMs).toBeGreaterThanOrEqual(t0 ?? 0);
});
it("snapshot exposes signal field when triggered by signal-named reason", () => {
markShuttingDown("SIGTERM");
const snap = shutdownStateSnapshot();
expect(snap.shuttingDown).toBe(true);
expect(snap.signal).toBe("SIGTERM");
expect(snap.startedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
});
it("snapshot signal is null for manual mark", () => {
markShuttingDown("manual");
const snap = shutdownStateSnapshot();
expect(snap.signal).toBeNull();
});
});

95
src/web/shutdown-state.ts Normal file
View file

@ -0,0 +1,95 @@
/**
* shutdown-state.ts process-local shutdown flag for the web server.
*
* Purpose: signal to readiness/liveness probes that this container is
* about to go away, so the load balancer / orchestrator routes new
* traffic to a different replica BEFORE we actually stop responding.
* Without this, /api/healthz keeps returning 200 during shutdown and
* k8s/Forgejo blue-green will keep sending traffic to a draining
* container observed in the wedged-server cycle today (sf-mpa6wuhm).
*
* Consumer: web/app/api/healthz/route.ts (returns 503 when set) and any
* future readiness gate that wants to react to "we're going down."
*
* Per-process state the web (next-server) process has its own flag;
* the spawned rpc-mode child manages its own teardown via forceShutdown
* (drainQueuedSfFeedbackCommands then exit). Cross-process coordination
* isn't needed: when tini delivers SIGTERM, both processes get it (the
* web server via process group; the child via signal forwarding) and
* each updates its own visibility.
*/
let _shuttingDown = false;
let _registered = false;
let _signalSeen: NodeJS.Signals | null = null;
let _startedAt: number | null = null;
function register(): void {
if (_registered) return;
_registered = true;
const handler = (signal: NodeJS.Signals): void => {
if (_shuttingDown) return;
_shuttingDown = true;
_signalSeen = signal;
_startedAt = Date.now();
// Intentionally not calling process.exit here. Next.js (and the
// rpc-mode child below it, via its own SIGTERM hook in
// forceShutdown) handle clean teardown on their own. This flag's
// only job is to make /api/healthz return 503 immediately so
// the load balancer stops sending us traffic.
};
process.on("SIGTERM", handler);
process.on("SIGINT", handler);
process.on("SIGHUP", handler);
}
/**
* True if the process has received SIGTERM/SIGINT/SIGHUP and is in
* graceful-shutdown territory. Auto-registers signal listeners on first
* call so the flag is meaningful from the moment a route imports this
* module.
*/
export function isShuttingDown(): boolean {
register();
return _shuttingDown;
}
/**
* Explicitly mark the process as shutting down (without waiting for a
* signal). Useful for tests and for code paths that decide to drain
* proactively before the orchestrator sends SIGTERM.
*/
export function markShuttingDown(reason: NodeJS.Signals | "manual" = "manual"): void {
register();
if (_shuttingDown) return;
_shuttingDown = true;
_signalSeen = reason === "manual" ? null : reason;
_startedAt = Date.now();
}
/**
* Compact snapshot for diagnostics. Exposes the signal name (if any)
* and how long the process has been in shutdown so probes/operator
* tools can correlate.
*/
export function shutdownStateSnapshot(): {
shuttingDown: boolean;
signal: string | null;
startedAt: string | null;
elapsedMs: number | null;
} {
register();
return {
shuttingDown: _shuttingDown,
signal: _signalSeen,
startedAt: _startedAt == null ? null : new Date(_startedAt).toISOString(),
elapsedMs: _startedAt == null ? null : Date.now() - _startedAt,
};
}
/** Test-only — reset the in-process flag. Does not unregister signal handlers. */
export function _resetShutdownStateForTests(): void {
_shuttingDown = false;
_signalSeen = null;
_startedAt = null;
}

View file

@ -1,9 +1,33 @@
import { getReleaseInfo } from "../../../../src/web/release-info.ts";
import {
isShuttingDown,
shutdownStateSnapshot,
} from "../../../../src/web/shutdown-state.ts";
export const runtime = "nodejs";
export const dynamic = "force-dynamic";
export async function GET(): Promise<Response> {
// Return 503 the moment we receive SIGTERM/SIGINT/SIGHUP so load
// balancers (k8s readinessProbe, Forgejo blue-green probes) drain
// new traffic away from this replica before we actually stop
// responding. Without this signal the orchestrator keeps sending
// requests during shutdown and the upgrade flow sees spurious
// failures. Codex review point E.
if (isShuttingDown()) {
return Response.json(
{
ok: false,
shuttingDown: true,
...shutdownStateSnapshot(),
...getReleaseInfo(),
},
{
status: 503,
headers: { "Cache-Control": "no-store" },
},
);
}
return Response.json(getReleaseInfo(), {
headers: { "Cache-Control": "no-store" },
});