#!/usr/bin/env node
/**
 * SF Parallel Worker Monitor
 *
 * Real-time TUI dashboard for monitoring parallel SF auto-mode workers.
 * Zero external dependencies — uses raw ANSI escape codes, Node.js builtins,
 * and the shared SF monitor projection store.
 *
 * Usage:
 *   node scripts/parallel-monitor.mjs                  # live dashboard, 5s refresh
 *   node scripts/parallel-monitor.mjs --interval 3     # faster refresh
 *   node scripts/parallel-monitor.mjs --once           # single snapshot, then exit
 *   node scripts/parallel-monitor.mjs --heal           # auto-respawn dead workers
 *   node scripts/parallel-monitor.mjs --heal --heal-retries 5 --heal-cooldown 60
 *
 * Options:
 *   --interval       Refresh interval in seconds (default: 5)
 *   --once           Render once and exit (useful for scripting/piping)
 *   --heal           Auto-respawn dead workers (opt-in, off by default)
 *   --heal-retries   Max respawn attempts per worker (default: 3)
 *   --heal-cooldown  Seconds between respawn attempts (default: 30)
 *   --dir            Status file directory (default: .sf/parallel)
 *   --root           Project root (default: cwd)
 *
 * Data sources:
 *   .sf/parallel/M0xx.status.json          — heartbeat, cost, state (written by orchestrator)
 *   .sf/worktrees/M0xx/.sf/auto.lock       — current unit type + ID (written by worker)
 *   .sf/worktrees/M0xx/.sf/sf.db           — task/slice completion (read-only node:sqlite query)
 *   .sf/parallel/M0xx.stdout.log           — NDJSON events (cost extraction, notify messages)
 *   .sf/parallel/M0xx.stderr.log           — error surfacing
 *
 * Health indicators:
 *   ● green — PID alive, fresh heartbeat (<30s)
 *   ● green — PID alive, heartbeat stale (respawned worker, file mtime used as proxy)
 *   ○ red   — PID dead
 *
 * Self-healing (--heal):
 *   When a dead worker is detected, the monitor launches a new detached headless
 *   auto-mode process (Node running the SF loader) in the worker's worktree with
 *   the correct env vars, appending its output to the worker's stdout/stderr logs.
 *   Cooldown prevents rapid respawn loops. Gives up after --heal-retries respawn
 *   attempts that never bring the worker back; the retry count resets whenever the
 *   worker is seen alive again.
*/ import { execSync, spawn, spawnSync } from "node:child_process"; import fs from "node:fs"; import path from "node:path"; import { queryParallelRecentCompletionRows, queryParallelSliceProgress, } from "../src/resources/extensions/sf/parallel-monitor-store.js"; // ─── Configuration ─────────────────────────────────────────────────────────── const args = process.argv.slice(2); const INTERVAL_SEC = parseInt(getArg("--interval", "5"), 10); const PARALLEL_DIR = getArg("--dir", ".sf/parallel"); const PROJECT_ROOT = getArg("--root", process.cwd()); const ONE_SHOT = args.includes("--once"); const HEAL_MODE = args.includes("--heal"); const HEAL_MAX_RETRIES = parseInt(getArg("--heal-retries", "3"), 10); const HEAL_COOLDOWN_SEC = parseInt(getArg("--heal-cooldown", "30"), 10); // Per-worker heal state: { lastAttempt: number, retries: number } const healState = {}; function getArg(flag, defaultVal) { const idx = args.indexOf(flag); return idx !== -1 && args[idx + 1] ? args[idx + 1] : defaultVal; } // ─── ANSI Helpers ──────────────────────────────────────────────────────────── const ESC = "\x1b["; const RESET = `${ESC}0m`; const BOLD = `${ESC}1m`; const DIM = `${ESC}2m`; const _ITALIC = `${ESC}3m`; const FG = { black: `${ESC}30m`, red: `${ESC}31m`, green: `${ESC}32m`, yellow: `${ESC}33m`, blue: `${ESC}34m`, magenta: `${ESC}35m`, cyan: `${ESC}36m`, white: `${ESC}37m`, gray: `${ESC}90m`, }; const BG = { black: `${ESC}40m`, red: `${ESC}41m`, green: `${ESC}42m`, yellow: `${ESC}43m`, blue: `${ESC}44m`, white: `${ESC}47m`, }; // Screen control const CLEAR_SCREEN = `${ESC}2J${ESC}H`; const HIDE_CURSOR = `${ESC}?25l`; const SHOW_CURSOR = `${ESC}?25h`; const _SAVE_POS = `${ESC}s`; const _RESTORE_POS = `${ESC}u`; function _moveTo(row, col) { return `${ESC}${row};${col}H`; } // ─── Data Reading ──────────────────────────────────────────────────────────── function readJsonSafe(filePath) { try { return JSON.parse(fs.readFileSync(filePath, "utf-8")); } catch { return null; } } function 
isPidAlive(pid) { try { process.kill(pid, 0); return true; } catch { return false; } } function discoverWorkers() { const dir = path.resolve(PROJECT_ROOT, PARALLEL_DIR); const worktreeDir = path.resolve(PROJECT_ROOT, ".sf/worktrees"); const mids = new Set(); // From status files if (fs.existsSync(dir)) { for (const f of fs.readdirSync(dir)) { if (f.endsWith(".status.json")) mids.add(f.replace(".status.json", "")); } } // From stderr/stdout logs (manually respawned workers may lack status.json) if (fs.existsSync(dir)) { for (const f of fs.readdirSync(dir)) { const m = f.match(/^(M\d+)\.(stderr|stdout)\.log$/); if (m) mids.add(m[1]); } } // From worktree directories that have auto.lock (actively running) if (fs.existsSync(worktreeDir)) { for (const d of fs.readdirSync(worktreeDir)) { if ( d.startsWith("M") && fs.existsSync(path.join(worktreeDir, d, ".sf", "auto.lock")) ) { mids.add(d); } } } return [...mids].sort(); } function readWorkerStatus(mid) { const statusPath = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.status.json`, ); return readJsonSafe(statusPath); } function readAutoLock(mid) { const lockPath = path.resolve( PROJECT_ROOT, `.sf/worktrees/${mid}/.sf/auto.lock`, ); return readJsonSafe(lockPath); } function readRecentEvents(mid, maxLines = 5) { const stdoutPath = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stdout.log`, ); const notifications = []; const errors = []; // Parse NDJSON notify events from stdout log if (fs.existsSync(stdoutPath)) { try { const stat = fs.statSync(stdoutPath); const readSize = Math.min(stat.size, 32768); const fd = fs.openSync(stdoutPath, "r"); const buf = Buffer.alloc(readSize); fs.readSync(fd, buf, 0, readSize, Math.max(0, stat.size - readSize)); fs.closeSync(fd); const content = buf.toString("utf-8"); const lines = content.trim().split("\n").slice(-100); for (const line of lines) { try { const obj = JSON.parse(line); if (obj.method === "notify" && obj.message) { notifications.push({ ts: Date.now(), msg: obj.message, 
mid }); } } catch { /* skip */ } } } catch { /* skip */ } } // Parse errors from stderr log — only new bytes since monitor started const stderrPath = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stderr.log`, ); if (fs.existsSync(stderrPath)) { try { const stat = fs.statSync(stderrPath); // Record baseline on first read — skip pre-existing errors if (!(mid in stderrBaselines)) { stderrBaselines[mid] = stat.size; } const baseline = stderrBaselines[mid]; const newBytes = stat.size - baseline; if (newBytes > 0) { const readSize = Math.min(newBytes, 4096); const fd = fs.openSync(stderrPath, "r"); const buf = Buffer.alloc(readSize); fs.readSync( fd, buf, 0, readSize, Math.max(baseline, stat.size - readSize), ); fs.closeSync(fd); const content = buf.toString("utf-8"); const lines = content.trim().split("\n").slice(-10); for (const line of lines) { if ( line.includes("error") || line.includes("Error") || line.includes("WARN") || line.includes("exited") ) { errors.push({ ts: Date.now(), msg: line.trim(), mid, isError: true, }); } } } } catch { /* skip */ } } return { notifications: notifications.slice(-maxLines), errors: errors.slice(-3), }; } /** * Extract accumulated cost from NDJSON stdout log (fallback when status.json is missing). * Sums `message.usage.cost.total` from all `message_end` events. 
*/ function extractCostFromNdjson(mid) { const stdoutPath = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stdout.log`, ); if (!fs.existsSync(stdoutPath)) return 0; try { const content = fs.readFileSync(stdoutPath, "utf-8"); let total = 0; for (const line of content.split("\n")) { if (!line.includes("message_end")) continue; try { const obj = JSON.parse(line); if (obj.type === "message_end") { const cost = obj.message?.usage?.cost?.total; if (typeof cost === "number") total += cost; } } catch { /* skip */ } } return total; } catch { return 0; } } // ─── Self-Healing ──────────────────────────────────────────────────────────── // Auto-detect the SF loader path — works across npm global, homebrew, and local installs function findSfLoader() { // 1. Check if we're running from inside the singularity-forge repo itself const repoLoader = path.resolve( import.meta.dirname, "..", "dist", "loader.js", ); if (fs.existsSync(repoLoader)) return repoLoader; // 2. Check common global install locations try { const globalRoot = execSync("npm root -g", { encoding: "utf-8", timeout: 3000, }).trim(); const candidates = [ path.join(globalRoot, "singularity-forge", "dist", "loader.js"), path.join(globalRoot, "@sf", "pi", "dist", "loader.js"), ]; for (const c of candidates) { if (fs.existsSync(c)) return c; } } catch { /* skip */ } // 3. Try `which sf` and resolve symlink try { const pathLookup = process.platform === "win32" ? "where.exe" : "which"; const lookupArgs = ["sf"]; const result = spawnSync(pathLookup, lookupArgs, { encoding: "utf-8", timeout: 3000, }); const bin = result.status === 0 ? result.stdout.trim().split(/\r?\n/)[0]?.trim() : ""; if (bin) { const realBin = fs.realpathSync(bin); const loader = path.resolve( path.dirname(realBin), "..", "dist", "loader.js", ); if (fs.existsSync(loader)) return loader; } } catch { /* skip */ } return null; } const SF_LOADER = findSfLoader(); /** * Respawn a dead worker. Returns the new PID or null on failure. 
* Uses a detached Node child with log file descriptors so the child is fully detached. */ function respawnWorker(mid) { const worktreeDir = path.resolve(PROJECT_ROOT, `.sf/worktrees/${mid}`); if (!fs.existsSync(worktreeDir)) return null; if (!fs.existsSync(SF_LOADER)) return null; const stdoutLog = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stdout.log`, ); const stderrLog = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stderr.log`, ); let stdoutFd; let stderrFd; try { fs.mkdirSync(path.dirname(stdoutLog), { recursive: true }); stdoutFd = fs.openSync(stdoutLog, "a"); stderrFd = fs.openSync(stderrLog, "a"); const child = spawn( process.execPath, [SF_LOADER, "headless", "--json", "auto"], { cwd: worktreeDir, detached: true, env: { ...process.env, SF_MILESTONE_LOCK: mid, SF_PROJECT_ROOT: PROJECT_ROOT, SF_PARALLEL_WORKER: "1", }, stdio: ["ignore", stdoutFd, stderrFd], windowsHide: true, }, ); child.unref(); return child.pid ?? null; } catch (_err) { return null; } finally { if (stdoutFd !== undefined) { try { fs.closeSync(stdoutFd); } catch {} } if (stderrFd !== undefined) { try { fs.closeSync(stderrFd); } catch {} } } } /** * Check all workers and respawn dead ones if --heal is active. * Returns an array of heal events for the event feed. 
*/ function healWorkers(workers) { if (!HEAL_MODE) return []; const events = []; const now = Date.now(); for (const wk of workers) { if (wk.alive) { // Worker is alive — reset its heal state on success if (healState[wk.mid]) { healState[wk.mid].retries = 0; } continue; } // Worker is dead — check if we should attempt a respawn if (!healState[wk.mid]) { healState[wk.mid] = { lastAttempt: 0, retries: 0 }; } const hs = healState[wk.mid]; // Give up after max retries if (hs.retries >= HEAL_MAX_RETRIES) { if (hs.retries === HEAL_MAX_RETRIES) { events.push({ ts: now, mid: wk.mid, msg: `⛔ ${wk.mid}: gave up after ${HEAL_MAX_RETRIES} respawn attempts`, }); hs.retries++; // Increment past max so this message only shows once } continue; } // Cooldown — don't respawn too quickly const elapsed = now - hs.lastAttempt; if (elapsed < HEAL_COOLDOWN_SEC * 1000) { const _remaining = Math.ceil((HEAL_COOLDOWN_SEC * 1000 - elapsed) / 1000); // Don't spam the feed — only note on first cooldown tick continue; } // Check the milestone isn't already complete const allSlicesDone = wk.slices.length > 0 && wk.slices.every((s) => s.status === "complete"); if (allSlicesDone) { events.push({ ts: now, mid: wk.mid, msg: `✅ ${wk.mid}: all slices complete, no respawn needed`, }); hs.retries = HEAL_MAX_RETRIES + 1; // Don't try again continue; } // Attempt respawn hs.lastAttempt = now; hs.retries++; events.push({ ts: now, mid: wk.mid, msg: `🔄 ${wk.mid}: respawning (attempt ${hs.retries}/${HEAL_MAX_RETRIES})...`, }); const newPid = respawnWorker(wk.mid); if (newPid) { events.push({ ts: now, mid: wk.mid, msg: `🟢 ${wk.mid}: respawned as PID ${newPid}`, }); // Reset stderr baseline so we don't show old errors delete stderrBaselines[wk.mid]; } else { events.push({ ts: now, mid: wk.mid, isError: true, msg: `❌ ${wk.mid}: respawn failed`, }); } } return events; } // ─── Formatting Helpers ────────────────────────────────────────────────────── function formatDuration(ms) { if (!ms || ms < 0) return "--:--"; 
const totalSec = Math.floor(ms / 1000); const h = Math.floor(totalSec / 3600); const m = Math.floor((totalSec % 3600) / 60); const s = totalSec % 60; if (h > 0) return `${h}h${String(m).padStart(2, "0")}m`; return `${String(m).padStart(2, "0")}m${String(s).padStart(2, "0")}s`; } function formatCost(cost) { if (cost == null) return "$-.--"; return `$${cost.toFixed(2)}`; } function healthColor(heartbeatAge, alive) { if (!alive) return "red"; // PID alive is the strongest signal — worker is running if (heartbeatAge < 30000) return "green"; // Alive but stale heartbeat — either respawned (no orchestrator writing status.json) // or potentially stuck. Show green since headless idle timeout (120s) kills stuck workers. if (alive) return "green"; return "red"; } function healthIcon(color) { switch (color) { case "green": return "●"; case "yellow": return "◐"; case "red": return "○"; default: return "?"; } } function unitTypeLabel(unitType) { const labels = { "execute-task": "EXEC", "research-slice": "RSRCH", "plan-slice": "PLAN", "complete-slice": "DONE", "complete-task": "DONE", reassess: "ASSESS", validate: "VALID", }; return labels[unitType] || (unitType || "---").toUpperCase().slice(0, 5); } function progressBar(done, total, width = 20) { if (total === 0) return `${"░".repeat(width)}`; const filled = Math.round((done / total) * width); const empty = width - filled; return `${"█".repeat(filled)}${"░".repeat(empty)}`; } function _pad(str, width) { const s = String(str); return s.length >= width ? s.slice(0, width) : s + " ".repeat(width - s.length); } function _rpad(str, width) { const s = String(str); return s.length >= width ? s.slice(0, width) : " ".repeat(width - s.length) + s; } function truncate(str, maxLen) { if (str.length <= maxLen) return str; return str.slice(0, maxLen - 1) + "…"; } /** * Get recently completed tasks/slices from the worktree DB for the event feed. 
*/ function queryRecentCompletions(mid) { return queryParallelRecentCompletionRows(PROJECT_ROOT, mid).map((row) => ({ ts: row.completedAt ? new Date(row.completedAt).getTime() : Date.now(), msg: `✓ ${mid}/${row.sliceId}/${row.taskId}${row.oneLiner ? ": " + row.oneLiner : ""}`, mid, })); } // ─── Rendering ─────────────────────────────────────────────────────────────── const COLS = Math.max(process.stdout.columns || 100, 80); const _ROWS = Math.max(process.stdout.rows || 40, 20); let lastEventFeed = []; // Persisted across renders const stderrBaselines = {}; // mid → file size at monitor startup (skip pre-existing errors) function collectWorkerData() { const mids = discoverWorkers(); const workers = []; for (const mid of mids) { const status = readWorkerStatus(mid); const lock = readAutoLock(mid); const slices = queryParallelSliceProgress(PROJECT_ROOT, mid); const { notifications, errors } = readRecentEvents(mid, 3); // Prefer auto.lock PID (written by the running worker) over status.json PID // (written by the orchestrator, stale after respawn) const pid = lock?.pid || status?.pid; const alive = pid ? 
isPidAlive(pid) : false; // Heartbeat: prefer status.json if its PID matches (orchestrator-managed), // otherwise fall back to stdout.log mtime (respawned workers write NDJSON continuously) let heartbeatAge = Infinity; const statusPidMatches = status?.pid && status.pid === pid; if (status?.lastHeartbeat && statusPidMatches) { heartbeatAge = Date.now() - status.lastHeartbeat; } else { // Check stdout/stderr log mtime as proxy heartbeat const stdoutLog = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stdout.log`, ); const stderrLog = path.resolve( PROJECT_ROOT, PARALLEL_DIR, `${mid}.stderr.log`, ); try { const mtimes = []; if (fs.existsSync(stdoutLog)) mtimes.push(fs.statSync(stdoutLog).mtimeMs); if (fs.existsSync(stderrLog)) mtimes.push(fs.statSync(stderrLog).mtimeMs); if (lock?.unitStartedAt) mtimes.push(new Date(lock.unitStartedAt).getTime()); if (mtimes.length > 0) heartbeatAge = Date.now() - Math.max(...mtimes); } catch { /* skip */ } } // Cost: prefer status.json, fall back to NDJSON log parsing let cost = status?.cost || 0; if (cost === 0) { cost = extractCostFromNdjson(mid); } const totalTasks = slices.reduce((sum, s) => sum + s.total, 0); const doneTasks = slices.reduce((sum, s) => sum + s.done, 0); const doneSlices = slices.filter((s) => s.status === "complete").length; const totalSlices = slices.length; // Current unit from auto.lock (more accurate than status.json currentUnit) const currentUnit = lock?.unitId || status?.currentUnit || null; const unitType = lock?.unitType || null; const unitStarted = lock?.unitStartedAt ? new Date(lock.unitStartedAt).getTime() : null; // If no lock and worker is dead, show nothing (not a misleading "START" label) const _showUnit = currentUnit || (alive ? null : null); const elapsed = status?.startedAt ? Date.now() - status.startedAt : lock?.startedAt ? Date.now() - new Date(lock.startedAt).getTime() : 0; workers.push({ mid, pid, alive, state: alive ? 
"running" : status?.state || "dead", cost, heartbeatAge, health: healthColor(heartbeatAge, alive), currentUnit, unitType, unitElapsed: unitStarted ? Date.now() - unitStarted : 0, elapsed, totalTasks, doneTasks, totalSlices, doneSlices, slices, notifications, errors, }); } return workers; } function render(workers) { const buf = []; const w = COLS; // ── Header ── buf.push(""); const title = " SF Parallel Monitor "; const titlePad = Math.max(0, Math.floor((w - title.length) / 2)); buf.push( `${" ".repeat(titlePad)}${BOLD}${BG.blue}${FG.white}${title}${RESET}`, ); const now = new Date().toLocaleTimeString(); const totalCost = workers.reduce((s, w) => s + w.cost, 0); const aliveCount = workers.filter((w) => w.alive).length; const healTag = HEAL_MODE ? ` │ ${FG.green}⚕ heal${RESET}${DIM}` : ""; buf.push( `${DIM} ${now} │ ${aliveCount}/${workers.length} alive │ Total: ${RESET}${BOLD}${formatCost(totalCost)}${RESET}${DIM} │ Refresh: ${INTERVAL_SEC}s${healTag}${RESET}`, ); buf.push(`${DIM}${"─".repeat(w)}${RESET}`); // ── Worker Panels ── if (workers.length === 0) { buf.push(""); buf.push(` ${FG.yellow}No workers found in ${PARALLEL_DIR}/${RESET}`); buf.push(` ${DIM}Waiting for .sf/parallel/*.status.json files...${RESET}`); } else { for (const wk of workers) { buf.push(""); // Worker header: milestone ID + health + state const icon = healthIcon(wk.health); const hc = FG[wk.health]; const stateLabel = wk.alive ? wk.state === "running" ? `${FG.green}RUNNING${RESET}` : `${FG.yellow}${wk.state.toUpperCase()}${RESET}` : `${FG.red}${BOLD}DEAD${RESET}`; const heartbeatText = wk.heartbeatAge === Infinity ? 
"never" : formatDuration(wk.heartbeatAge) + " ago"; buf.push( ` ${hc}${icon}${RESET} ${BOLD}${wk.mid}${RESET} ${stateLabel} ${DIM}PID ${wk.pid || "?"}${RESET} ${DIM}│${RESET} ${DIM}elapsed${RESET} ${formatDuration(wk.elapsed)} ${DIM}│${RESET} ${DIM}cost${RESET} ${BOLD}${formatCost(wk.cost)}${RESET} ${DIM}│${RESET} ${DIM}heartbeat${RESET} ${hc}${heartbeatText}${RESET}`, ); // Current unit if (wk.currentUnit) { const phaseColor = wk.unitType === "execute-task" ? FG.cyan : wk.unitType === "research-slice" ? FG.magenta : wk.unitType === "plan-slice" ? FG.blue : wk.unitType?.includes("complete") ? FG.green : FG.white; buf.push( ` ${DIM}▸${RESET} ${phaseColor}${unitTypeLabel(wk.unitType)}${RESET} ${wk.currentUnit} ${DIM}(${formatDuration(wk.unitElapsed)})${RESET}`, ); } else if (!wk.alive) { buf.push(` ${DIM}▸ ${FG.red}stopped${RESET}`); } else { buf.push(` ${DIM}▸ idle / between units${RESET}`); } // Slice progress grid if (wk.slices.length > 0) { const sliceChips = wk.slices.map((s) => { const pct = s.total > 0 ? s.done / s.total : 0; let color; if (s.status === "complete") color = FG.green; else if (pct > 0) color = FG.yellow; else color = FG.gray; const label = `${s.id}:${s.done}/${s.total}`; return `${color}${label}${RESET}`; }); buf.push(` ${DIM}slices${RESET} ${sliceChips.join(" ")}`); // Overall progress bar const bar = progressBar(wk.doneTasks, wk.totalTasks, 30); const pctStr = wk.totalTasks > 0 ? 
`${Math.round((wk.doneTasks / wk.totalTasks) * 100)}%` : "0%"; buf.push( ` ${DIM}tasks${RESET} ${FG.green}${bar}${RESET} ${wk.doneTasks}/${wk.totalTasks} ${DIM}(${pctStr})${RESET} ${DIM}│${RESET} ${DIM}slices done${RESET} ${wk.doneSlices}/${wk.totalSlices}`, ); } // Recent errors from this worker if (wk.errors.length > 0) { for (const err of wk.errors.slice(-2)) { buf.push(` ${FG.red}⚠ ${truncate(err.msg, w - 10)}${RESET}`); } } } } // ── Separator ── buf.push(""); buf.push(`${DIM}${"─".repeat(w)}${RESET}`); // ── Event Feed ── buf.push(` ${BOLD}Recent Events${RESET}`); // Collect new notification events from all workers for (const wk of workers) { for (const evt of wk.notifications) { if (!lastEventFeed.some((e) => e.msg === evt.msg && e.mid === evt.mid)) { lastEventFeed.push(evt); } } } // Also add recent task completions from the DB for (const wk of workers) { const completions = queryRecentCompletions(wk.mid); for (const evt of completions) { if (!lastEventFeed.some((e) => e.msg === evt.msg)) { lastEventFeed.push(evt); } } } // Sort by timestamp and keep last 10 lastEventFeed.sort((a, b) => a.ts - b.ts); lastEventFeed = lastEventFeed.slice(-10); if (lastEventFeed.length === 0) { buf.push(` ${DIM}No events yet...${RESET}`); } else { for (const evt of lastEventFeed.slice(-6)) { const midTag = `${FG.cyan}${evt.mid}${RESET}`; buf.push(` ${DIM}│${RESET} ${midTag} ${truncate(evt.msg, w - 12)}`); } } // ── Completion Check ── const allDone = workers.length > 0 && workers.every((w) => !w.alive); if (allDone) { buf.push(""); buf.push(`${DIM}${"─".repeat(w)}${RESET}`); buf.push(""); const doneMsg = " ALL WORKERS COMPLETE "; const donePad = Math.max(0, Math.floor((w - doneMsg.length) / 2)); buf.push( `${" ".repeat(donePad)}${BOLD}${BG.green}${FG.black}${doneMsg}${RESET}`, ); buf.push(""); for (const wk of workers) { buf.push( ` ${wk.mid} ${formatCost(wk.cost)} ${DIM}│${RESET} ${wk.doneSlices}/${wk.totalSlices} slices ${wk.doneTasks}/${wk.totalTasks} tasks ${DIM}│${RESET} 
${formatDuration(wk.elapsed)}`, ); } const totalCostFinal = workers.reduce((s, w) => s + w.cost, 0); buf.push(` ${BOLD}Total: ${formatCost(totalCostFinal)}${RESET}`); } // ── Footer ── buf.push(""); const healInfo = HEAL_MODE ? ` │ heal: ${HEAL_COOLDOWN_SEC}s cooldown, ${HEAL_MAX_RETRIES} max retries` : ""; buf.push( ` ${DIM}Ctrl+C to exit${allDone ? " (monitoring stopped)" : ""}${healInfo}${RESET}`, ); // Write to screen process.stdout.write(CLEAR_SCREEN); process.stdout.write(buf.join("\n") + "\n"); return allDone; } // ─── Main Loop ─────────────────────────────────────────────────────────────── function main() { process.stdout.write(HIDE_CURSOR); // Handle resize process.stdout.on("resize", () => { // COLS/ROWS are recalculated on next render }); // Graceful exit const cleanup = () => { process.stdout.write(SHOW_CURSOR); process.stdout.write(CLEAR_SCREEN); console.log("Monitor stopped."); process.exit(0); }; process.on("SIGINT", cleanup); process.on("SIGTERM", cleanup); // Initial render const workers = collectWorkerData(); const healEvents = healWorkers(workers); for (const evt of healEvents) lastEventFeed.push(evt); let done = render(workers); if (done || ONE_SHOT) { process.stdout.write(SHOW_CURSOR); return; } // Refresh loop const timer = setInterval(() => { try { const workers = collectWorkerData(); const healEvents = healWorkers(workers); for (const evt of healEvents) lastEventFeed.push(evt); done = render(workers); if (done) { clearInterval(timer); // Keep showing final state for 3 seconds then exit setTimeout(() => { process.stdout.write(SHOW_CURSOR); process.exit(0); }, 3000); } } catch (err) { // Don't crash the monitor on transient read errors process.stderr.write(`Monitor error: ${err.message}\n`); } }, INTERVAL_SEC * 1000); } main();