singularity-forge/scripts/upgrade-vega-source-server.mjs
Mikael Hugo d54f18c95f feat(rpc): orphan-recovery + 10-min graceful shutdown for safe container swap
Two related changes to make blue/green upgrades (per scripts/upgrade-vega-
source-server.mjs) safe for in-flight self-feedback writes.

1. Startup orphan recovery (feedback-queue-recovery.ts, extracted module).
   Scans .sf/runtime/ for sf-feedback-queue.jsonl.<pid>(.<sid>)?.draining
   files left by previous processes. For each:
     - if our own session id: leave alone (live drain)
     - if PID is alive: leave alone (foreign drainer)
     - else: rename back to queue (only if no active queue file exists)
   Crash safety: when both an orphan AND an active queue exist, we DEFER
   recovery rather than merge — appending then unlinking would risk
   duplicate replay on crash. The next restart's recovery picks it up
   once the queue is naturally drained. Supports legacy filenames
   (.<pid>.draining, pre-session-id) for backward compat.

   Added SF_DRAIN_SESSION_ID (per-process 6-byte hex) stamped into the
   .draining filename. PID reuse across container restarts is normally
   safe because /proc clears, but the session id is a stronger guarantee
   that we don't trample a foreign drainer that happens to land on the
   same PID.

2. SIGTERM/SIGINT drain-then-exit handler (installGracefulShutdown).
   Drains the queue once on signal, then exits. Bounded by
   SF_RPC_SHUTDOWN_GRACE_MS (default 600_000 = 10 min). Rationale: if
   a drain is in flight, it MUST finish — losing self-feedback writes
   across a server upgrade is worse than a long wait. Normal drains
   complete in <1s; the 10-min ceiling is for pathological lock
   contention. Operator overrides via env var, or docker kill /
   kubectl delete --force for hard bypass.

   Upgrader script bumped to docker stop --timeout 610 (10s safety
   margin past the grace). k8s deployments must set
   terminationGracePeriodSeconds≥610 for the rolling-update path.

Tests: rpc-mode-orphan-recovery.test.ts — 7 cases covering empty,
no-orphans, dead-PID single recovery, both-files-deferred (codex's
crash-safety fix), live-PID untouched, multiple-dead-PIDs, malformed-
filename ignored.

Refs sf-mpa5kdpu (drainer orphans never recovered), sf-mpa4g46x
(original RPC hang). Codex adversarial-reviewed; the PID-reuse hardening
and crash-safety deferral landed per its feedback. Open follow-ups:
shutdown-aware /api/healthz returning 503 (codex point E), integrate
with existing forceShutdown ordering (codex point C).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 22:29:24 +02:00

138 lines
4.1 KiB
JavaScript

#!/usr/bin/env node
/**
* upgrade-vega-source-server.mjs — blue/green upgrade the shared vega SF
* webserver.
*
* Purpose: prove a candidate source-mounted server on a side port before
* replacing the shared production container on port 4000.
*
* Consumer: `npm run docker:vega:upgrade` locally and Forgejo/host-side deploy
* automation when vega is the target.
*/
import { spawnSync } from "node:child_process";
import { dirname, resolve } from "node:path";
import { fileURLToPath } from "node:url";
// Repository root: the parent of the directory that contains this script.
const root = resolve(dirname(fileURLToPath(import.meta.url)), "..");

/** Read an environment variable, using `fallback` when it is unset or empty. */
const envOr = (key, fallback) => process.env[key] || fallback;

// Deployment knobs; every one can be overridden from the environment.
const bind = envOr("SF_VEGA_BIND", "127.0.0.1");
const prodName = envOr("SF_VEGA_CONTAINER", "sf-server-vega");
const candidateName = envOr(
  "SF_VEGA_CANDIDATE_CONTAINER",
  "sf-server-vega-candidate",
);
const prodPort = envOr("SF_VEGA_PORT", "4000");
const candidatePort = envOr("SF_VEGA_CANDIDATE_PORT", "4001");
const workspacesRoot = envOr("SF_WORKSPACES_DIR", dirname(root));
const skipBuild = process.env.SF_VEGA_UPGRADE_SKIP_BUILD === "1";
const imageTag = envOr("SF_VEGA_IMAGE", "sf-source-server:vega");

// Build phase: the web-host build and release manifest are optional, the
// docker image build always runs. `run` ends the process on the first failure.
const buildSteps = [];
if (!skipBuild) {
  buildSteps.push(
    ["npm", ["run", "build:web-host"]],
    [
      process.execPath,
      [
        "scripts/generate-release-manifest.mjs",
        "--out",
        "dist/sf-release-manifest.json",
      ],
    ],
  );
}
buildSteps.push([
  "docker",
  ["build", "-f", "docker/Dockerfile.source-server", "-t", imageTag, "."],
]);
for (const [command, args] of buildSteps) {
  run(command, args);
}

// Rollout phase, strictly in this order:
//   1. start the candidate on its side port and probe it,
//   2. only then stop the current prod container and start the new one,
//   3. probe the new prod, and finally retire the candidate.
startServer(candidateName, candidatePort);
await probeServer(candidatePort, "candidate");

drainContainer(prodName);
startServer(prodName, prodPort);
await probeServer(prodPort, "prod");

drainContainer(candidateName);
process.stdout.write(
  `sf server upgraded: ${prodName} is healthy on ${bind}:${prodPort}\n`,
);
/**
 * Start a source-server container by delegating to
 * `scripts/run-vega-source-server.mjs up`.
 *
 * The child script is launched with the same Node binary that is running this
 * script (`process.execPath`) rather than whatever `node` resolves to on PATH,
 * matching how the release-manifest script is invoked elsewhere in this file.
 *
 * @param {string} name - Container name, passed as SF_VEGA_CONTAINER.
 * @param {string|number} port - Port, passed as SF_VEGA_PORT.
 */
function startServer(name, port) {
  const childEnv = {
    ...process.env,
    SF_VEGA_CONTAINER: name,
    SF_VEGA_PORT: String(port),
    // The image is built once by this script before any container starts.
    SF_VEGA_SKIP_IMAGE_BUILD: "1",
  };
  run(process.execPath, ["scripts/run-vega-source-server.mjs", "up"], {
    env: childEnv,
  });
}
/**
 * Run one HTTP probe and throw if it does not pass.
 *
 * @param {string} name - Probe name; "projects" additionally requires a
 *   non-empty JSON array in the response body.
 * @param {string} url - Absolute URL to request.
 * @param {number} timeoutMs - Abort the request (including the body read)
 *   after this many milliseconds.
 * @throws {Error} When the request fails, times out, returns a non-2xx
 *   status, or the projects list is empty.
 */
async function runProbeCheck(name, url, timeoutMs) {
  let response;
  try {
    response = await fetch(url, {
      cache: "no-store",
      signal: AbortSignal.timeout(timeoutMs),
    });
  } catch (error) {
    // fetch reports transport failures as a generic "fetch failed"; the
    // useful detail (ECONNREFUSED, ...) is carried on `cause`.
    const reason =
      error instanceof Error
        ? error.cause?.message || error.cause?.code || error.message
        : String(error);
    throw new Error(`${name} request failed: ${reason}`, { cause: error });
  }
  if (!response.ok) {
    throw new Error(`${name} returned ${response.status}`);
  }
  if (name === "projects") {
    const projects = await response.json();
    if (!Array.isArray(projects) || projects.length === 0) {
      throw new Error("projects returned no configured repos");
    }
  } else {
    // Consume the body so the connection is released.
    await response.arrayBuffer();
  }
}

/**
 * Poll the server's probe endpoints (healthz, ready, version, projects) until
 * all of them pass in a single round, or the overall deadline expires.
 *
 * Every request carries its own timeout so a server that accepts the
 * connection but never responds cannot stall the loop past the deadline.
 *
 * @param {string|number} port - Port the server listens on (host is `bind`).
 * @param {string} label - "candidate" or any other label (treated as prod);
 *   used in messages and to pick which container's logs to show on failure.
 * @param {object} [options]
 * @param {number} [options.timeoutMs=60000] - Overall deadline.
 * @param {number} [options.intervalMs=1000] - Pause between failed rounds.
 * @param {number} [options.requestTimeoutMs=10000] - Upper bound per request.
 * @returns {Promise<void>} Resolves once every probe passed.
 * @throws {Error} After the deadline, with the last failure reason; the
 *   container logs are printed first.
 */
async function probeServer(port, label, options = {}) {
  const {
    timeoutMs = 60_000,
    intervalMs = 1_000,
    requestTimeoutMs = 10_000,
  } = options;
  const baseUrl = `http://${bind}:${port}`;
  const checks = [
    ["healthz", `${baseUrl}/api/healthz`],
    ["ready", `${baseUrl}/api/ready`],
    ["version", `${baseUrl}/api/version`],
    [
      "projects",
      `${baseUrl}/api/projects?root=${encodeURIComponent(workspacesRoot)}&detail=true`,
    ],
  ];
  const deadline = Date.now() + timeoutMs;
  let lastError = "";
  while (Date.now() < deadline) {
    try {
      for (const [name, url] of checks) {
        // Never wait longer than what is left of the overall deadline.
        const budget = Math.max(
          1,
          Math.trunc(Math.min(requestTimeoutMs, deadline - Date.now())),
        );
        await runProbeCheck(name, url, budget);
      }
      process.stdout.write(`${label} probes passed on ${baseUrl}\n`);
      return;
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error);
      await delay(intervalMs);
    }
  }
  showLogs(label === "candidate" ? candidateName : prodName);
  throw new Error(`${label} probes failed: ${lastError}`);
}
/**
 * Print the last 120 log lines of a container straight to this process's
 * stdio. The outcome of `docker logs` is not inspected.
 *
 * @param {string} name - Container name.
 */
function showLogs(name) {
  const dockerArgs = ["logs", "--tail=120", name];
  const spawnOptions = { cwd: root, stdio: "inherit", env: process.env };
  spawnSync("docker", dockerArgs, spawnOptions);
}
/**
 * Gracefully stop a container, then remove it.
 *
 * Both docker commands are best-effort (`allowFailure`), so the container not
 * existing is not an error.
 *
 * @param {string} name - Container name.
 */
function drainContainer(name) {
  // 610s: matches SF_RPC_SHUTDOWN_GRACE_MS=600000 in rpc-mode's
  // graceful-shutdown handler with a 10s safety margin for Node exit.
  // Normal drains finish in <1s; the long ceiling is for pathological
  // lock contention so queued self-feedback writes are never lost
  // across an upgrade. Override per-deployment via env if needed.
  const defaultStopTime = "610";
  const requested = (
    process.env.SF_VEGA_DRAIN_STOP_TIME || defaultStopTime
  ).trim();
  let stopTime = requested;
  // A value docker rejects would make `docker stop` fail; that failure is
  // tolerated below and `docker rm -f` would then kill the container with no
  // grace period at all. Fall back to the default instead.
  if (!/^-?\d+$/.test(requested)) {
    process.stderr.write(
      `ignoring invalid SF_VEGA_DRAIN_STOP_TIME=${JSON.stringify(requested)}; using ${defaultStopTime}s\n`,
    );
    stopTime = defaultStopTime;
  }
  // Use the short `-t` flag: the long spelling differs between docker CLI
  // generations (`--time` on older CLIs, `--timeout` on newer ones), and an
  // unknown flag would again turn the graceful stop into a hard kill.
  run("docker", ["stop", "-t", stopTime, name], { allowFailure: true });
  run("docker", ["rm", "-f", name], { allowFailure: true });
}
/**
 * Promise-based sleep.
 *
 * @param {number} ms - Milliseconds to wait.
 * @returns {Promise<void>} Resolves (with no value) once the timer fires.
 */
function delay(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Run a command synchronously from the repository root with inherited stdio.
 *
 * Unless `options.allowFailure` is set, any outcome other than exit status 0
 * terminates this process: with the child's exit status when it has one, or
 * with 1 when the child could not be started or was killed by a signal. In
 * those last two cases the reason is written to stderr first, because the
 * child itself produced no output that would explain the failure.
 *
 * @param {string} command - Executable to run.
 * @param {string[]} args - Arguments for the executable.
 * @param {object} [options]
 * @param {object} [options.env] - Child environment; defaults to process.env.
 * @param {boolean} [options.allowFailure] - Ignore a failed command.
 */
function run(command, args, options = {}) {
  const result = spawnSync(command, args, {
    cwd: root,
    stdio: "inherit",
    env: options.env ?? process.env,
  });
  if (result.status === 0 || options.allowFailure) {
    return;
  }
  if (result.error) {
    // Spawn failure (e.g. ENOENT): status is null and nothing was printed.
    process.stderr.write(
      `failed to run ${command}: ${result.error.message}\n`,
    );
  } else if (result.signal) {
    process.stderr.write(`${command} was terminated by ${result.signal}\n`);
  }
  process.exit(result.status ?? 1);
}