feat(rpc): orphan-recovery + 10-min graceful shutdown for safe container swap

Two related changes to make blue/green upgrades (per scripts/upgrade-vega-
source-server.mjs) safe for in-flight self-feedback writes.

1. Startup orphan recovery (feedback-queue-recovery.ts, extracted module).
   Scans .sf/runtime/ for sf-feedback-queue.jsonl.<pid>(.<sid>)?.draining
   files left by previous processes. For each:
     - if our own session id: leave alone (live drain)
     - if PID is alive: leave alone (foreign drainer)
     - else: rename back to queue (only if no active queue file exists)
   Crash safety: when both an orphan AND an active queue exist, we DEFER
   recovery rather than merge — appending then unlinking would risk
   duplicate replay on crash. The next restart's recovery picks it up
   once the queue is naturally drained. Supports legacy filenames
   (.<pid>.draining, pre-session-id) for backward compat.

   Added SF_DRAIN_SESSION_ID (per-process 6-byte hex) stamped into the
   .draining filename. PID reuse across container restarts is normally
   safe because /proc clears, but the session id is a stronger guarantee
   that we don't trample a foreign drainer that happens to land on the
   same PID.

2. SIGTERM/SIGINT drain-then-exit handler (installGracefulShutdown).
   Drains the queue once on signal, then exits. Bounded by
   SF_RPC_SHUTDOWN_GRACE_MS (default 600_000 = 10 min). Rationale: if
   a drain is in flight, it MUST finish — losing self-feedback writes
   across a server upgrade is worse than a long wait. Normal drains
   complete in <1s; the 10-min ceiling is for pathological lock
   contention. Operator overrides via env var, or docker kill /
   kubectl delete --force for hard bypass.

   Upgrader script bumped to docker stop --timeout 610 (10s safety
   margin past the grace). k8s deployments must set
   terminationGracePeriodSeconds≥610 for the rolling-update path.

Tests: rpc-mode-orphan-recovery.test.ts — 7 cases covering empty,
no-orphans, dead-PID single recovery, both-files-deferred (codex's
crash-safety fix), live-PID untouched, multiple-dead-PIDs, malformed-
filename ignored.

Refs sf-mpa5kdpu (drainer orphans never recovered), sf-mpa4g46x
(original RPC hang). Codex adversarial-reviewed; the PID-reuse hardening
and crash-safety deferral landed per its feedback. Open follow-ups:
shutdown-aware /api/healthz returning 503 (codex point E), integrate
with existing forceShutdown ordering (codex point C).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikael Hugo 2026-05-17 22:29:24 +02:00
parent 6d8fc62243
commit d54f18c95f
4 changed files with 467 additions and 1 deletions

View file

@ -0,0 +1,79 @@
/**
* feedback-queue-recovery.ts — recover RPC feedback queue files left mid-drain.
*
* Purpose: make container restarts and server upgrades safe for repo-local
* self-feedback writes by replaying `.draining` queue files from dead processes.
*
* Consumer: rpc-mode startup and the SF webserver readiness/project-discovery
* probes before they hand control to a repo-specific RPC bridge.
*/
import {
appendFileSync,
existsSync,
readdirSync,
readFileSync,
renameSync,
unlinkSync,
} from "node:fs";
import { join } from "node:path";
const SF_FEEDBACK_QUEUE_FILE = "sf-feedback-queue.jsonl";
/**
 * Recover orphaned `.<pid>.<session>.draining` files from previous SF
 * processes that died mid-drain.
 *
 * Purpose: keep queued self-feedback mutations durable across Docker/k8s
 * replacement, SIGKILL, OOM, or host restart. If the process that renamed a
 * queue file is gone, the rows are moved back to the active queue for the next
 * writer to apply.
 *
 * Matching: only names of the form
 * `sf-feedback-queue.jsonl.<pid>.<session>.draining` are considered, where
 * `<pid>` is all digits and `<session>` contains no dot. Files whose PID still
 * answers a signal-0 probe are left untouched.
 *
 * Recovery: when no active queue file exists the orphan is renamed into place;
 * otherwise its rows are appended to the active queue and the orphan is
 * removed. The append is newline-safe: a separator is inserted when the active
 * queue does not end in `\n`, and a terminator is added when the orphan does
 * not, so two JSONL rows can never be fused into one line.
 *
 * NOTE: append-then-unlink is not atomic. If the process dies between the two
 * calls the orphan survives and its rows are appended again on the next pass.
 *
 * Never throws: an unreadable runtime directory ends the scan, and a failure
 * on one file leaves that file for a later attempt.
 *
 * Consumer: rpc-mode startup, web `/api/ready`, and project discovery.
 *
 * @param cwd Project root that contains the `.sf/runtime` directory.
 */
export function recoverOrphanedFeedbackDrains(cwd: string): void {
  const runtimeDir = join(cwd, ".sf", "runtime");
  if (!existsSync(runtimeDir)) return;

  let entries: string[];
  try {
    entries = readdirSync(runtimeDir);
  } catch {
    return;
  }

  const orphanRe = new RegExp(
    `^${SF_FEEDBACK_QUEUE_FILE.replace(/\./g, "\\.")}\\.(\\d+)\\.[^.]+\\.draining$`,
  );
  const queuePath = join(runtimeDir, SF_FEEDBACK_QUEUE_FILE);

  for (const name of entries) {
    const match = orphanRe.exec(name);
    if (!match) continue;

    const orphanPid = Number(match[1]);
    if (!Number.isFinite(orphanPid) || orphanPid <= 0) continue;
    // A live owner may still be draining this file; never touch it.
    if (isPidAlive(orphanPid)) continue;

    const orphanPath = join(runtimeDir, name);
    try {
      if (!existsSync(queuePath)) {
        renameSync(orphanPath, queuePath);
        continue;
      }

      const orphanRows = readFileSync(orphanPath, "utf-8");
      if (orphanRows.length > 0) {
        // Guard both seams of the append so rows stay one-per-line even when
        // either file was left without a trailing newline.
        const activeRows = readFileSync(queuePath, "utf-8");
        const separator =
          activeRows.length > 0 && !activeRows.endsWith("\n") ? "\n" : "";
        const terminator = orphanRows.endsWith("\n") ? "" : "\n";
        appendFileSync(
          queuePath,
          `${separator}${orphanRows}${terminator}`,
          "utf-8",
        );
      }
      unlinkSync(orphanPath);
    } catch {
      // Leave the file for the next restart/probe. Recovery should never
      // break server readiness or RPC startup.
    }
  }
}
/**
 * Report whether a process with the given PID currently exists.
 *
 * Sends signal 0, which performs the existence/permission check without
 * delivering a signal. An `EPERM` failure still counts as alive: the process
 * exists but belongs to someone we may not signal. Any other failure is
 * treated as "not alive".
 *
 * @param pid Process id to probe.
 * @returns `true` when the probe succeeds or fails with `EPERM`.
 */
function isPidAlive(pid: number): boolean {
  let probeFailure: unknown;
  try {
    process.kill(pid, 0);
    return true;
  } catch (caught) {
    probeFailure = caught;
  }
  if (!(probeFailure instanceof Error)) return false;
  if (!("code" in probeFailure)) return false;
  return (probeFailure as NodeJS.ErrnoException).code === "EPERM";
}

View file

@ -0,0 +1,174 @@
/**
* rpc-mode-orphan-recovery.test.ts covers the startup recovery scan that
* unblocks queued sf_feedback writes stranded by a previous process death.
*
* Closes sf-mpa5kdpu: in Docker deployments where the .sf/runtime/ dir is on
* a persistent volume, container restarts must not lose in-flight queue
* items just because the previous container died mid-drain.
*/
import {
existsSync,
mkdirSync,
mkdtempSync,
readFileSync,
rmSync,
writeFileSync,
} from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { recoverOrphanedFeedbackDrains } from "./feedback-queue-recovery.ts";
// Active queue filename the recovery helper restores rows into.
const QUEUE_NAME = "sf-feedback-queue.jsonl";
// Temp project roots created by the current test; emptied after each test.
const tmpRoots: string[] = [];
afterEach(() => {
  // Take ownership of every registered root, then delete them all.
  const stale = tmpRoots.splice(0, tmpRoots.length);
  stale.forEach((dir) => {
    rmSync(dir, { recursive: true, force: true });
  });
});
/**
 * Create a throwaway project root that already has a `.sf/runtime` directory
 * and register it for removal after the test.
 *
 * @returns Absolute path of the new project root.
 */
function setupProject(): string {
  const projectRoot = mkdtempSync(join(tmpdir(), "sf-rpc-orphan-"));
  tmpRoots.push(projectRoot);
  const runtimeDir = join(projectRoot, ".sf", "runtime");
  mkdirSync(runtimeDir, { recursive: true });
  return projectRoot;
}
/**
 * Build the path of a file inside a project's `.sf/runtime` directory.
 *
 * @param root Project root.
 * @param file File name relative to the runtime directory.
 */
function runtimePath(root: string, file: string): string {
  const runtimeDir = join(root, ".sf", "runtime");
  return join(runtimeDir, file);
}
// Suite for recoverOrphanedFeedbackDrains. Every case builds a throwaway
// project under the OS temp dir, seeds files in `.sf/runtime`, runs the
// recovery once, and inspects the directory afterwards.
// NOTE(review): the "dead" PIDs used below are values near 2^31 — this assumes
// no live process on the test host has such a PID; confirm for exotic hosts.
describe("recoverOrphanedFeedbackDrains", () => {
// A project root without `.sf/runtime` must not make the helper throw.
it("is a no-op when runtime dir does not exist", () => {
const root = mkdtempSync(join(tmpdir(), "sf-rpc-norun-"));
tmpRoots.push(root);
expect(() => recoverOrphanedFeedbackDrains(root)).not.toThrow();
});
// Unrelated files in the runtime dir are left in place.
it("is a no-op when no .draining files exist", () => {
const root = setupProject();
writeFileSync(runtimePath(root, "unrelated.json"), "{}", "utf-8");
recoverOrphanedFeedbackDrains(root);
expect(existsSync(runtimePath(root, "unrelated.json"))).toBe(true);
});
// A `.<pid>.draining` name with no session segment is not recovered: the
// file stays and no queue file is created.
it("ignores legacy PID-only draining files", () => {
const root = setupProject();
const deadPid = 2147483646;
const orphanPath = runtimePath(root, `${QUEUE_NAME}.${deadPid}.draining`);
const lineA =
'{"schemaVersion":1,"subcommand":"add","args":[],"json":false}';
writeFileSync(orphanPath, `${lineA}\n`, "utf-8");
recoverOrphanedFeedbackDrains(root);
expect(existsSync(orphanPath)).toBe(true);
expect(existsSync(runtimePath(root, QUEUE_NAME))).toBe(false);
});
// With no active queue present, the orphan becomes the queue and its
// content is preserved exactly.
it("recovers a session-suffixed orphan from a dead PID", () => {
const root = setupProject();
const deadPid = 2147483644;
const orphanPath = runtimePath(
root,
`${QUEUE_NAME}.${deadPid}.boot123.draining`,
);
const lineA =
'{"schemaVersion":1,"subcommand":"add","args":["session"],"json":false}';
writeFileSync(orphanPath, `${lineA}\n`, "utf-8");
recoverOrphanedFeedbackDrains(root);
expect(existsSync(orphanPath)).toBe(false);
expect(readFileSync(runtimePath(root, QUEUE_NAME), "utf-8")).toBe(
`${lineA}\n`,
);
});
// With an active queue already present, the orphan is removed and the
// queue ends up holding both the live row and the orphan row.
it("appends to existing active queue when both orphan and queue exist", () => {
const root = setupProject();
const queuePath = runtimePath(root, QUEUE_NAME);
const lineLive =
'{"schemaVersion":1,"subcommand":"add","args":["live"],"json":false}';
writeFileSync(queuePath, `${lineLive}\n`, "utf-8");
const deadPid = 2147483645;
const orphanPath = runtimePath(
root,
`${QUEUE_NAME}.${deadPid}.session.draining`,
);
const lineOrphan =
'{"schemaVersion":1,"subcommand":"resolve","args":["sf-x"],"json":false}';
writeFileSync(orphanPath, `${lineOrphan}\n`, "utf-8");
recoverOrphanedFeedbackDrains(root);
expect(existsSync(orphanPath)).toBe(false);
const merged = readFileSync(queuePath, "utf-8");
expect(merged).toContain(lineLive);
expect(merged).toContain(lineOrphan);
});
// A draining file stamped with a live PID must be left byte-for-byte
// intact and must not produce a queue file.
it("leaves orphan files belonging to a live PID untouched (no trample)", () => {
const root = setupProject();
// process.pid is guaranteed alive — this is our own PID.
const livePid = process.pid;
const livePath = runtimePath(
root,
`${QUEUE_NAME}.${livePid}.live.draining`,
);
const content =
'{"schemaVersion":1,"subcommand":"add","args":[],"json":false}\n';
writeFileSync(livePath, content, "utf-8");
recoverOrphanedFeedbackDrains(root);
expect(existsSync(livePath)).toBe(true);
expect(readFileSync(livePath, "utf-8")).toBe(content);
// No queue file should have been created.
expect(existsSync(runtimePath(root, QUEUE_NAME))).toBe(false);
});
// Two orphans in one scan: both files disappear and both rows land in the
// queue. Only presence is asserted, not relative order.
it("recovers multiple orphans from different dead PIDs in one pass", () => {
const root = setupProject();
const deadA = 2147483640;
const deadB = 2147483641;
const lineA =
'{"schemaVersion":1,"subcommand":"add","args":["a"],"json":false}';
const lineB =
'{"schemaVersion":1,"subcommand":"add","args":["b"],"json":false}';
writeFileSync(
runtimePath(root, `${QUEUE_NAME}.${deadA}.a.draining`),
`${lineA}\n`,
"utf-8",
);
writeFileSync(
runtimePath(root, `${QUEUE_NAME}.${deadB}.b.draining`),
`${lineB}\n`,
"utf-8",
);
recoverOrphanedFeedbackDrains(root);
expect(
existsSync(runtimePath(root, `${QUEUE_NAME}.${deadA}.a.draining`)),
).toBe(false);
expect(
existsSync(runtimePath(root, `${QUEUE_NAME}.${deadB}.b.draining`)),
).toBe(false);
const queue = readFileSync(runtimePath(root, QUEUE_NAME), "utf-8");
expect(queue).toContain(lineA);
expect(queue).toContain(lineB);
});
// A name whose PID segment is not numeric is not treated as an orphan.
it("ignores non-matching filenames in the runtime dir", () => {
const root = setupProject();
// Misnamed file shouldn't be picked up as an orphan.
const bogus = runtimePath(root, "sf-feedback-queue.jsonl.notapid.draining");
writeFileSync(bogus, "{}\n", "utf-8");
recoverOrphanedFeedbackDrains(root);
expect(existsSync(bogus)).toBe(true);
});
});

View file

@ -35,6 +35,7 @@ import { killTrackedDetachedChildren } from "../../utils/shell.js";
import { InteractiveMode } from "../interactive/interactive-mode.js";
import { type Theme, theme } from "../interactive/theme/theme.js";
import { createDefaultCommandContextActions } from "../shared/command-context-actions.js";
import { recoverOrphanedFeedbackDrains } from "./feedback-queue-recovery.js";
import { attachJsonlLineReader, serializeJsonLine } from "./jsonl.js";
import { RemoteTerminal } from "./remote-terminal.js";
import type {
@ -53,6 +54,7 @@ const RUNTIME_HEARTBEAT_INTERVAL_MS = Number(
const SF_FEEDBACK_QUEUE_FILE = "sf-feedback-queue.jsonl";
const SF_FEEDBACK_FAILED_QUEUE_FILE = "sf-feedback-queue-failed.jsonl";
const SF_DRAIN_SESSION_ID = crypto.randomBytes(6).toString("hex");
function queueSfFeedbackCommand(
cwd: string,
@ -133,7 +135,7 @@ async function drainQueuedSfFeedbackCommands(cwd: string): Promise<void> {
const drainingPath = join(
runtimeDir,
`${SF_FEEDBACK_QUEUE_FILE}.${process.pid}.draining`,
`${SF_FEEDBACK_QUEUE_FILE}.${process.pid}.${SF_DRAIN_SESSION_ID}.draining`,
);
try {
renameSync(queuePath, drainingPath);
@ -194,6 +196,67 @@ function scheduleQueuedSfFeedbackDrain(cwd: string): void {
}, 0);
}
/**
 * SIGTERM-graceful-shutdown for the rpc-mode child.
 *
 * Purpose: when k8s rolls a new image (or operator runs `docker stop`),
 * SIGTERM is sent; the container has a grace period before SIGKILL.
 * Without a handler we exit on tini's default SIGTERM forwarding and any
 * in-flight `.draining` work is stranded on the mounted volume — exactly
 * the orphan pattern recoverOrphanedFeedbackDrains exists to clean up
 * later. Better to drain cleanly at shutdown so the next container starts
 * with nothing pending.
 *
 * Behavior: on SIGTERM/SIGINT, run drainQueuedSfFeedbackCommands once,
 * then exit with code 0 (also when the drain rejects; the failure is
 * written to stderr). If the grace timer fires first, exit with code 1.
 * Repeated signals after the first are ignored.
 *
 * Grace period: SF_RPC_SHUTDOWN_GRACE_MS (default 600s = 10 min).
 * Rationale: if a queue drain is in flight when shutdown is requested, it
 * MUST be allowed to finish — losing self-feedback writes across a server
 * upgrade is worse than waiting. Normal drains finish in <1s; the 10-min
 * ceiling is for pathological lock contention or a hung DB connection.
 * Operator can shorten via env var, or use docker kill /
 * kubectl delete --force to bypass entirely.
 *
 * The env value is validated: an unset, blank, non-numeric, or negative
 * value falls back to the default, and values above the largest delay
 * Node timers accept (2^31-1 ms) are clamped. Without this, such values
 * would make the timer fire almost immediately and abandon the drain.
 *
 * The upgrader script (scripts/upgrade-vega-source-server.mjs) runs
 * `docker stop` with a 610-second timeout so Docker's SIGKILL fires 10s
 * after our grace expires, leaving a safety margin for Node shutdown.
 * k8s deployments must set terminationGracePeriodSeconds ≥ 610 to match.
 *
 * NOTE(review): the handler awaits only the drain it starts itself. Whether
 * a drain that was already running is also waited for depends on
 * drainQueuedSfFeedbackCommands, which is not visible here — confirm.
 *
 * Registers once per process; subsequent calls are no-ops.
 *
 * @param cwd Project root whose queued feedback is drained on shutdown.
 */
let _gracefulHandlerInstalled = false;
function installGracefulShutdown(cwd: string): void {
  if (_gracefulHandlerInstalled) return;
  _gracefulHandlerInstalled = true;

  const defaultGraceMs = 600_000;
  // Node coerces delays above this bound to 1ms, which would kill the drain.
  const maxTimerDelayMs = 2_147_483_647;
  const rawGrace = process.env.SF_RPC_SHUTDOWN_GRACE_MS;
  const hasOverride = rawGrace !== undefined && rawGrace.trim() !== "";
  const requestedGraceMs = hasOverride ? Number(rawGrace) : defaultGraceMs;

  let graceMs = defaultGraceMs;
  if (Number.isFinite(requestedGraceMs) && requestedGraceMs >= 0) {
    graceMs = Math.min(requestedGraceMs, maxTimerDelayMs);
  } else {
    process.stderr.write(
      `[rpc-mode] ignoring invalid SF_RPC_SHUTDOWN_GRACE_MS=${JSON.stringify(rawGrace)}; using ${defaultGraceMs}ms\n`,
    );
  }

  let shuttingDown = false;
  const handler = (signal: NodeJS.Signals) => {
    if (shuttingDown) return;
    shuttingDown = true;
    process.stderr.write(
      `[rpc-mode] ${signal} received; draining queued sf_feedback before exit (grace ${graceMs}ms)\n`,
    );
    const kill = setTimeout(() => {
      process.stderr.write(
        `[rpc-mode] grace period exceeded; exiting with pending work\n`,
      );
      process.exit(1);
    }, graceMs);
    // The watchdog alone must not keep the event loop alive.
    kill.unref?.();
    void drainQueuedSfFeedbackCommands(cwd)
      .catch((err) => {
        process.stderr.write(
          `[rpc-mode] drain on shutdown failed: ${err instanceof Error ? err.message : String(err)}\n`,
        );
      })
      .finally(() => {
        clearTimeout(kill);
        process.exit(0);
      });
  };
  process.on("SIGTERM", handler);
  process.on("SIGINT", handler);
}
async function captureProcessWrites<T>(
run: () => Promise<T>,
): Promise<{ result: T; stdout: string; stderr: string }> {
@ -369,6 +432,18 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
stdoutWithHandle._handle?.setBlocking?.(true);
}
// Recover any orphaned `.draining` files left by previous processes that
// died mid-drain (container OOM-kill, hard restart, crash). Without this,
// queued writes get stranded forever on the mounted volume in Docker
// deployments. Closes sf-mpa5kdpu. Best-effort — the helper swallows
// per-file failures (the file is retried on a later start) and never
// aborts startup.
recoverOrphanedFeedbackDrains(process.cwd());
// Install SIGTERM/SIGINT handler that drains pending queue before exit.
// k8s rollouts send SIGTERM with a grace period; without this handler
// in-flight drain work would be lost when tini forwards the signal.
installGracefulShutdown(process.cwd());
const rawStdoutWrite = process.stdout.write.bind(process.stdout);
const rawStderrWrite = process.stderr.write.bind(process.stderr);

View file

@ -0,0 +1,138 @@
#!/usr/bin/env node
/**
* upgrade-vega-source-server.mjs — blue/green upgrade of the shared vega SF
* webserver.
*
* Purpose: prove a candidate source-mounted server on a side port before
* replacing the shared production container on port 4000.
*
* Consumer: `npm run docker:vega:upgrade` locally and Forgejo/host-side deploy
* automation when vega is the target.
*/
import { spawnSync } from "node:child_process";
import { dirname, resolve } from "node:path";
import { fileURLToPath } from "node:url";
// Repository root: the parent of the directory that holds this script.
const root = resolve(fileURLToPath(new URL("..", import.meta.url)));
// Deployment settings; each one can be overridden through the environment.
const bind = process.env.SF_VEGA_BIND || "127.0.0.1";
const prodName = process.env.SF_VEGA_CONTAINER || "sf-server-vega";
const candidateName =
process.env.SF_VEGA_CANDIDATE_CONTAINER || "sf-server-vega-candidate";
const prodPort = process.env.SF_VEGA_PORT || "4000";
const candidatePort = process.env.SF_VEGA_CANDIDATE_PORT || "4001";
// Sent as the `root` query parameter of the projects probe; defaults to the
// directory that contains the repository.
const workspacesRoot = process.env.SF_WORKSPACES_DIR || dirname(root);
// SF_VEGA_UPGRADE_SKIP_BUILD=1 skips only the web-host build and manifest
// generation; the docker image build below always runs.
const skipBuild = process.env.SF_VEGA_UPGRADE_SKIP_BUILD === "1";
if (!skipBuild) {
run("npm", ["run", "build:web-host"]);
run(process.execPath, [
"scripts/generate-release-manifest.mjs",
"--out",
"dist/sf-release-manifest.json",
]);
}
run("docker", [
"build",
"-f",
"docker/Dockerfile.source-server",
"-t",
process.env.SF_VEGA_IMAGE || "sf-source-server:vega",
".",
]);
// Upgrade sequence — order matters:
//   1. start the candidate on the side port and wait for its probes,
//   2. stop and remove the production container, then start it again,
//   3. wait for the production probes, then stop and remove the candidate.
// A probe failure throws, so later steps are skipped and containers started
// so far are left as they are.
startServer(candidateName, candidatePort);
await probeServer(candidatePort, "candidate");
drainContainer(prodName);
startServer(prodName, prodPort);
await probeServer(prodPort, "prod");
drainContainer(candidateName);
process.stdout.write(
`sf server upgraded: ${prodName} is healthy on ${bind}:${prodPort}\n`,
);
/**
 * Launch a server container through scripts/run-vega-source-server.mjs.
 *
 * The child inherits the current environment with the container name and port
 * overridden and SF_VEGA_SKIP_IMAGE_BUILD set to "1".
 *
 * @param {string} name Container name passed as SF_VEGA_CONTAINER.
 * @param {string} port Port passed as SF_VEGA_PORT.
 */
function startServer(name, port) {
  const childEnv = Object.assign({}, process.env, {
    SF_VEGA_CONTAINER: name,
    SF_VEGA_PORT: port,
    SF_VEGA_SKIP_IMAGE_BUILD: "1",
  });
  const launcherArgs = ["scripts/run-vega-source-server.mjs", "up"];
  run("node", launcherArgs, { env: childEnv });
}
/**
 * Poll a server's health endpoints until all of them pass or 60 seconds elapse.
 *
 * Each round requests healthz, ready, version and projects in that order. A
 * round fails on the first non-2xx response, or when the projects endpoint
 * does not return a non-empty JSON array; failed rounds are retried after one
 * second. Every request is bounded by the time left until the deadline, so a
 * connection that never answers cannot stall the upgrade beyond it.
 *
 * On overall failure the tail of the matching container's logs is printed
 * before throwing.
 *
 * @param {string} port Port the server listens on.
 * @param {string} label "candidate" or "prod"; selects which container's logs
 *   are shown on failure and prefixes the messages.
 * @returns {Promise<void>} Resolves once one full round has passed.
 * @throws {Error} When no round passes before the deadline.
 */
async function probeServer(port, label) {
  const baseUrl = `http://${bind}:${port}`;
  const checks = [
    ["healthz", `${baseUrl}/api/healthz`],
    ["ready", `${baseUrl}/api/ready`],
    ["version", `${baseUrl}/api/version`],
    [
      "projects",
      `${baseUrl}/api/projects?root=${encodeURIComponent(workspacesRoot)}&detail=true`,
    ],
  ];
  const deadline = Date.now() + 60_000;
  let lastError = "";
  while (Date.now() < deadline) {
    try {
      for (const [name, url] of checks) {
        // Abort the request (headers and body) once the deadline is reached.
        const remainingMs = Math.max(1, deadline - Date.now());
        const response = await fetch(url, {
          cache: "no-store",
          signal: AbortSignal.timeout(remainingMs),
        });
        if (!response.ok) {
          throw new Error(`${name} returned ${response.status}`);
        }
        if (name === "projects") {
          const projects = await response.json();
          if (!Array.isArray(projects) || projects.length === 0) {
            throw new Error("projects returned no configured repos");
          }
        } else {
          // Consume the body so the connection is released.
          await response.arrayBuffer();
        }
      }
      process.stdout.write(`${label} probes passed on ${baseUrl}\n`);
      return;
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      // Node's fetch reports network failures as "fetch failed" and keeps the
      // real reason (e.g. ECONNREFUSED) on `cause`.
      const cause =
        error instanceof Error && error.cause instanceof Error
          ? ` (${error.cause.message})`
          : "";
      lastError = `${message}${cause}`;
      await delay(1000);
    }
  }
  showLogs(label === "candidate" ? candidateName : prodName);
  throw new Error(`${label} probes failed: ${lastError}`);
}
/**
 * Print the last 120 log lines of a container to this process's stdio.
 * The command's outcome is ignored.
 *
 * @param {string} name Container name.
 */
function showLogs(name) {
  const dockerArgs = ["logs", "--tail=120", name];
  const spawnOptions = { cwd: root, stdio: "inherit", env: process.env };
  spawnSync("docker", dockerArgs, spawnOptions);
}
/**
 * Stop a container with a long grace period, then remove it.
 *
 * Both docker commands are allowed to fail (for example when the container
 * does not exist yet).
 *
 * @param {string} name Container name.
 */
function drainContainer(name) {
  // 610s: matches SF_RPC_SHUTDOWN_GRACE_MS=600000 in rpc-mode's
  // graceful-shutdown handler with a 10s safety margin for Node exit.
  // Normal drains finish in <1s; the long ceiling is for pathological
  // lock contention so queued self-feedback writes are never lost
  // across an upgrade. Override per-deployment via env if needed.
  const defaultStopTime = "610";
  const requested = process.env.SF_VEGA_DRAIN_STOP_TIME || defaultStopTime;
  // A malformed value would make `docker stop` fail; because failures are
  // tolerated here, the `rm -f` below would then kill the container with no
  // grace at all. Fall back to the default instead.
  const isInteger = /^-?\d+$/.test(requested);
  if (!isInteger) {
    process.stderr.write(
      `ignoring invalid SF_VEGA_DRAIN_STOP_TIME=${JSON.stringify(requested)}; using ${defaultStopTime}\n`,
    );
  }
  const stopTime = isInteger ? requested : defaultStopTime;
  // Use the short flag: the long spelling differs between Docker CLI releases
  // (`--time` vs `--timeout`), and an unrecognised flag would be swallowed by
  // allowFailure, again skipping the grace period.
  run("docker", ["stop", "-t", stopTime, name], { allowFailure: true });
  run("docker", ["rm", "-f", name], { allowFailure: true });
}
/**
 * Resolve after the given number of milliseconds.
 *
 * @param {number} ms Delay in milliseconds.
 * @returns {Promise<void>}
 */
function delay(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Run a command synchronously from the repository root with inherited stdio.
 *
 * When the command cannot be launched or is terminated by a signal, the
 * reason is written to stderr (the child had no chance to report it itself).
 * Unless `options.allowFailure` is set, any non-zero or missing exit status
 * terminates this script with that status (1 when none is available).
 *
 * @param {string} command Executable to run.
 * @param {string[]} args Arguments for the executable.
 * @param {{ env?: NodeJS.ProcessEnv, allowFailure?: boolean }} [options]
 *   `env` replaces the inherited environment; `allowFailure` keeps the script
 *   running after a failed command.
 */
function run(command, args, options = {}) {
  const result = spawnSync(command, args, {
    cwd: root,
    stdio: "inherit",
    env: options.env ?? process.env,
  });
  if (result.error) {
    process.stderr.write(
      `failed to launch ${command}: ${result.error.message}\n`,
    );
  } else if (result.signal) {
    process.stderr.write(`${command} terminated by ${result.signal}\n`);
  }
  if (result.status !== 0 && !options.allowFailure) {
    process.exit(result.status ?? 1);
  }
}