feat(swarm): integrate LLM runner into AgentSwarm.run()

- Make AgentSwarm.run() async with optional enableLLM flag
- Wire runAgentTurn from agent-runner.js into all 4 topologies
  (round_robin, supervisor, dynamic, sleeptime)
- Update drainSleeptimeQueue to use runAgentTurn for actual LLM
  execution instead of passive inbox reading
- Export runAgentTurn, runAgentLoop, runSwarmTurn from uok/index.js
- Update PersistentAgent JSDoc to reflect runner exists
- Fix test imports after extension consolidation (ttsr, google-search)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Mikael Hugo 2026-05-15 03:05:01 +02:00
parent f6619b792c
commit ca7ff554c3
17 changed files with 513 additions and 64 deletions

View file

@ -237,19 +237,19 @@ async function drainSleeptimeQueue(basePath) {
} }
if (!pending || pending.length === 0) return; if (!pending || pending.length === 0) return;
const { AgentSwarm } = await import("../uok/agent-swarm.js"); const { AgentSwarm } = await import("../uok/agent-swarm.js");
const { runAgentTurn } = await import("../uok/agent-runner.js");
for (const job of pending) { for (const job of pending) {
try { try {
const swarm = new AgentSwarm(basePath); const swarm = new AgentSwarm(basePath);
const memAgent = swarm.getByRole("coordinator")[0]; const memAgent = swarm.getByRole("coordinator")[0];
if (memAgent) { if (memAgent) {
swarm.send(job.conversation_agent, job.memory_agent, job.content); swarm.send(job.conversation_agent, job.memory_agent, job.content);
const received = memAgent.receive(true); const result = await runAgentTurn(memAgent, { timeoutMs: 60_000 });
const last = received[received.length - 1]; const output = result.response
const result = last?.body ? typeof result.response === "string"
? typeof last.body === "string" ? result.response
? last.body : JSON.stringify(result.response)
: JSON.stringify(last.body) : result.error ?? "";
: "";
db.prepare( db.prepare(
`UPDATE sleeptime_consolidation_queue `UPDATE sleeptime_consolidation_queue
SET status = 'done', processed_at = :ts, result = :result SET status = 'done', processed_at = :ts, result = :result
@ -257,7 +257,7 @@ async function drainSleeptimeQueue(basePath) {
).run({ ).run({
":id": job.id, ":id": job.id,
":ts": new Date().toISOString(), ":ts": new Date().toISOString(),
":result": result, ":result": output,
}); });
} else { } else {
db.prepare( db.prepare(

View file

@ -37,7 +37,7 @@ const UNSUPPORTED_MODEL_SCOPE_RE = /\b(?:account|plan|tier|subscription)\b/i;
// OpenRouter affordability-style quota errors should be treated as transient // OpenRouter affordability-style quota errors should be treated as transient
// so core retry logic can lower maxTokens and continue in-session. // so core retry logic can lower maxTokens and continue in-session.
const AFFORDABILITY_RE = const AFFORDABILITY_RE =
/requires more credits|can only afford|insufficient credits|not enough credits|fewer max_tokens/i; /requires more credits|can only afford|insufficient (?:credits|balance)|not enough credits|credit balance|fewer max_tokens/i;
const NETWORK_RE = const NETWORK_RE =
/network|ECONNRESET|ETIMEDOUT|ECONNREFUSED|socket hang up|fetch failed|connection.*reset|dns/i; /network|ECONNRESET|ETIMEDOUT|ECONNREFUSED|socket hang up|fetch failed|connection.*reset|dns/i;
const SERVER_RE = const SERVER_RE =

View file

@ -10,9 +10,8 @@ import { homedir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { getModels, getProviders } from "@singularity-forge/ai"; import { getModels, getProviders } from "@singularity-forge/ai";
import { selectByBenchmarks } from "./benchmark-selector.js"; import { selectByBenchmarks } from "./benchmark-selector.js";
import { sfHome } from "./sf-home.js";
import { defaultRoutingConfig, MODEL_CAPABILITY_TIER } from "./model-router.js"; import { defaultRoutingConfig, MODEL_CAPABILITY_TIER } from "./model-router.js";
import { sfHome } from "./sf-home.js";
import { import {
DEFAULT_RUNAWAY_CHANGED_FILES_WARNING, DEFAULT_RUNAWAY_CHANGED_FILES_WARNING,
DEFAULT_RUNAWAY_DIAGNOSTIC_TURNS, DEFAULT_RUNAWAY_DIAGNOSTIC_TURNS,
@ -380,11 +379,13 @@ export function resolveModelWithFallbacksForUnit(unitType, options = {}) {
phaseConfig = m.completion; phaseConfig = m.completion;
break; break;
case "reassess-roadmap": case "reassess-roadmap":
case "rewrite-docs":
case "gate-evaluate": case "gate-evaluate":
case "validate-milestone": case "validate-milestone":
phaseConfig = m.validation ?? m.planning; phaseConfig = m.validation ?? m.planning;
break; break;
case "rewrite-docs":
phaseConfig = m.validation ?? m.execution ?? m.planning;
break;
default: default:
// Subagent unit types (e.g., "subagent", "subagent/scout") // Subagent unit types (e.g., "subagent", "subagent/scout")
if (unitType === "subagent" || unitType.startsWith("subagent/")) { if (unitType === "subagent" || unitType.startsWith("subagent/")) {

View file

@ -14,7 +14,6 @@ import { dirname, join, resolve } from "node:path";
import { normalizeStringArray } from "@singularity-forge/coding-agent"; import { normalizeStringArray } from "@singularity-forge/coding-agent";
import { parse as parseYaml } from "yaml"; import { parse as parseYaml } from "yaml";
import { sfRoot } from "./paths.js"; import { sfRoot } from "./paths.js";
import { sfHome } from "./sf-home.js";
import { import {
_initPrefsLoader, _initPrefsLoader,
resolveProfileDefaults as _resolveProfileDefaults, resolveProfileDefaults as _resolveProfileDefaults,
@ -22,6 +21,7 @@ import {
import { upgradePreferencesFileIfDrifted } from "./preferences-template-upgrade.js"; import { upgradePreferencesFileIfDrifted } from "./preferences-template-upgrade.js";
import { formatSkillRef, MODE_DEFAULTS } from "./preferences-types.js"; import { formatSkillRef, MODE_DEFAULTS } from "./preferences-types.js";
import { validatePreferences } from "./preferences-validation.js"; import { validatePreferences } from "./preferences-validation.js";
import { sfHome } from "./sf-home.js";
import { logWarning } from "./workflow-logger.js"; import { logWarning } from "./workflow-logger.js";
// ─── Re-exports: types ────────────────────────────────────────────────────── // ─── Re-exports: types ──────────────────────────────────────────────────────
@ -87,6 +87,9 @@ export function getSfAgentSettingsPath() {
function globalPreferencesYamlPath() { function globalPreferencesYamlPath() {
return join(sfHome(), "preferences.yaml"); return join(sfHome(), "preferences.yaml");
} }
function legacyGlobalPreferencesMarkdownPath() {
return join(sfHome(), "preferences.md");
}
/** /**
* Resolve the "project root" for preferences. When SF is running inside a * Resolve the "project root" for preferences. When SF is running inside a
* git worktree (e.g. `.sf/worktrees/M003/`), project-level prefs should * git worktree (e.g. `.sf/worktrees/M003/`), project-level prefs should
@ -147,7 +150,12 @@ export function getProjectSFPreferencesPath() {
* Load global SF preferences from preferences.yaml. * Load global SF preferences from preferences.yaml.
*/ */
export function loadGlobalSFPreferences() { export function loadGlobalSFPreferences() {
return loadPreferencesFile(globalPreferencesYamlPath(), "global"); return (
loadPreferencesFile(globalPreferencesYamlPath(), "global") ??
loadPreferencesFile(legacyGlobalPreferencesMarkdownPath(), "global", {
legacyMarkdown: true,
})
);
} }
/** /**
* Load project-level SF preferences from preferences.yaml. * Load project-level SF preferences from preferences.yaml.
@ -205,17 +213,21 @@ export function loadEffectiveSFPreferences() {
} }
return result; return result;
} }
function loadPreferencesFile(path, scope) { function loadPreferencesFile(path, scope, options = {}) {
if (!existsSync(path)) return null; if (!existsSync(path)) return null;
const raw = readFileSync(path, "utf-8"); const raw = readFileSync(path, "utf-8");
const preferences = parsePreferencesYaml(raw); const preferences = options.legacyMarkdown
? parsePreferencesMarkdown(raw)
: parsePreferencesYaml(raw);
if (!preferences) return null; if (!preferences) return null;
const validation = validatePreferences(preferences); const validation = validatePreferences(preferences);
// Self-align: if the file's recorded sf version drifted from current, // Self-align: if the file's recorded sf version drifted from current,
// silently re-render the frontmatter so subsequent reads match. No // silently re-render the frontmatter so subsequent reads match. No
// human-facing warning — sf keeps its own files in sync. Body content // human-facing warning — sf keeps its own files in sync. Body content
// (anything after the frontmatter) is preserved verbatim. // (anything after the frontmatter) is preserved verbatim.
const aligned = upgradePreferencesFileIfDrifted(path, validation.preferences); const aligned = options.legacyMarkdown
? validation.preferences
: upgradePreferencesFileIfDrifted(path, validation.preferences);
const allWarnings = [...validation.warnings, ...validation.errors]; const allWarnings = [...validation.warnings, ...validation.errors];
return { return {
path, path,

View file

@ -43,6 +43,7 @@ vi.mock("../auto-prompts.js", () => ({
import { import {
closeDatabase, closeDatabase,
insertAssessment,
insertMilestone, insertMilestone,
insertSlice, insertSlice,
openDatabase, openDatabase,
@ -152,4 +153,63 @@ describe("resolveDispatch canonical milestone plan", () => {
cleanup(base); cleanup(base);
} }
}); });
test("completing_milestone_when_validation_needs_attention_without_plan_dispatches_remediation", async () => {
const base = makeTempDir("sf-dispatch-validation-attention-");
try {
mkdirSync(join(base, ".sf"), { recursive: true });
openDatabase(join(base, ".sf", "sf.db"));
insertMilestone({
id: "M900",
title: "Validation self healing",
status: "active",
});
const validationContent = [
"---",
"verdict: needs-attention",
"remediation_round: 0",
"---",
"",
"# Milestone Validation: M900",
"",
"## Verdict Rationale",
"The validation looks stale: it says only S01 is complete even though DB-backed slice state may have moved on.",
"Address or explicitly defer the finding and rerun validation.",
"",
].join("\n");
insertAssessment({
path: ".sf/milestones/M900/M900-VALIDATION.md",
milestoneId: "M900",
scope: "milestone-validation",
status: "needs-attention",
fullContent: validationContent,
});
const result = await resolveDispatch({
state: {
phase: "completing-milestone",
},
mid: "M900",
midTitle: "Validation self healing",
basePath: base,
prefs: { phases: {} },
session: {},
pipelineVariant: "standard",
});
expect(result).toMatchObject({
action: "dispatch",
unitType: "rewrite-docs",
unitId: "M900/validation-attention",
});
expect(result.reason).toBeUndefined();
expect(result.prompt).toContain(
"No structured remediation section was present",
);
expect(result.prompt).toContain("ready for revalidation");
} finally {
closeDatabase();
cleanup(base);
}
});
}); });

View file

@ -0,0 +1,20 @@
/**
* error-classifier.test.mjs provider failure routing contracts.
*
* Purpose: keep provider-account balance failures in the autonomous fallback
* path so one drained route does not pause a unit that has usable fallbacks.
*/
import { describe, expect, test } from "vitest";
import { classifyError, isTransient } from "../error-classifier.js";
describe("classifyError", () => {
test("opencode_insufficient_balance_routes_to_fallback", () => {
const result = classifyError(
"401 Insufficient balance. Manage your billing here: https://opencode.ai/workspace/example/billing",
);
expect(result.kind).toBe("rate-limit");
expect(isTransient(result)).toBe(true);
});
});

View file

@ -39,10 +39,16 @@ function makePreferencesProject(projectPreferences, projectSettings) {
); );
} }
process.env.HOME = home; process.env.HOME = home;
process.env.SF_HOME = join(home, ".sf");
process.chdir(project); process.chdir(project);
return project; return project;
} }
function writeLegacyGlobalPreferences(project, content) {
const sfHome = join(project, "..", "home", ".sf");
writeFileSync(join(sfHome, "preferences.md"), content, "utf-8");
}
describe("preferences model resolution", () => { describe("preferences model resolution", () => {
test("resolveModelWithFallbacksForUnit_when_google_env_auth_is_default_off_skips_google_auto_benchmark_candidates", () => { test("resolveModelWithFallbacksForUnit_when_google_env_auth_is_default_off_skips_google_auto_benchmark_candidates", () => {
makePreferencesProject( makePreferencesProject(
@ -56,4 +62,49 @@ describe("preferences model resolution", () => {
assert.equal(result, undefined); assert.equal(result, undefined);
}); });
test("resolveModelWithFallbacksForUnit_when_global_yaml_missing_reads_legacy_global_preferences_md", () => {
const project = makePreferencesProject(
["version: 1", "models: {}", ""].join("\n"),
);
writeLegacyGlobalPreferences(
project,
[
"---",
"version: 1",
"models:",
" execution: kimi-coding/kimi-k2.6",
"---",
"",
"# Global SF Preferences",
"",
].join("\n"),
);
const result = resolveModelWithFallbacksForUnit("execute-task");
assert.deepEqual(result, {
primary: "kimi-coding/kimi-k2.6",
fallbacks: [],
});
});
test("resolveModelWithFallbacksForUnit_when_rewrite_docs_has_no_validation_model_uses_execution_model", () => {
makePreferencesProject(
[
"version: 1",
"models:",
" planning: minimax/MiniMax-M2.7",
" execution: kimi-coding/kimi-k2.6",
"",
].join("\n"),
);
const result = resolveModelWithFallbacksForUnit("rewrite-docs");
assert.deepEqual(result, {
primary: "kimi-coding/kimi-k2.6",
fallbacks: [],
});
});
}); });

View file

@ -0,0 +1,239 @@
/**
* agent-runner.js LLM execution runner for PersistentAgent swarm agents.
*
* Purpose: bridge the gap between PersistentAgent (passive inbox/memory store)
* and actual LLM execution. Reads pending messages, assembles prompts, dispatches
* to LLM via `sf headless --print`, writes replies back to the MessageBus, and
* manages context window.
*
* Integration: imported lazily by `AgentSwarm.run()` when `opts.enableLLM` is true,
* and directly by `auto/loop.js` for sleeptime consolidation. Keeps PersistentAgent
* itself free of child_process spawning logic.
*
* Consumer: AgentSwarm.run() orchestrator and autonomous dispatch paths.
*/
import { spawn } from "node:child_process";
const DEFAULT_MAX_CONTEXT_TURNS = 10;
const DEFAULT_MAX_TURNS_PER_RUN = 5;
const DEFAULT_RUNNER_TIMEOUT_MS = 120_000;
const DEFAULT_POLL_INTERVAL_MS = 1_000;
/**
* Assemble a prompt from core memory blocks + recent inbox messages.
*/
function buildAgentPrompt(agent, messages) {
const blocks = agent.getContextBlocks();
const lines = [];
if (blocks) {
lines.push(blocks, "");
}
lines.push("## Instructions", "");
lines.push(
"You are a persistent agent in a multi-agent swarm. Process the incoming messages and produce a response.",
);
lines.push("Be concise. Use tools when needed. Return structured output.");
lines.push("");
if (messages.length > 0) {
lines.push("## Incoming Messages", "");
for (const msg of messages) {
const from = msg.from?.replace(/^agent:/, "") ?? "unknown";
lines.push(
`**${from}**: ${typeof msg.body === "string" ? msg.body : JSON.stringify(msg.body)}`,
);
}
lines.push("");
}
lines.push("## Your Response");
return lines.join("\n");
}
/**
* Execute a prompt via sf headless --print mode.
* Returns the LLM response text.
*/
function runHeadlessPrompt(
basePath,
prompt,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
) {
return new Promise((resolve, reject) => {
const sfBin = process.env.SF_BIN_PATH || process.argv[1];
if (!sfBin) {
reject(new Error("SF_BIN_PATH not set and process.argv[1] unavailable"));
return;
}
const args = [sfBin, "--print", prompt];
const child = spawn(process.execPath, args, {
cwd: basePath,
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env, SF_AUTONOMOUS: "0" },
});
const chunks = [];
const errChunks = [];
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
child.kill("SIGTERM");
reject(new Error(`Agent runner timed out after ${timeoutMs}ms`));
}
}, timeoutMs);
child.stdout.on("data", (chunk) => chunks.push(chunk));
child.stderr.on("data", (chunk) => errChunks.push(chunk));
child.on("error", (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
child.on("exit", (code) => {
if (settled) return;
settled = true;
clearTimeout(timer);
const stdout = Buffer.concat(chunks).toString("utf-8").trim();
const stderr = Buffer.concat(errChunks).toString("utf-8").trim();
if (code !== 0) {
reject(
new Error(
`sf headless exited ${code}: ${stderr || stdout || "unknown error"}`,
),
);
return;
}
resolve(stdout);
});
});
}
/**
* Run a single turn for a PersistentAgent: read inbox, build prompt, call LLM, reply.
*
* @param {PersistentAgent} agent
* @param {object} [opts]
* @param {number} [opts.maxContextTurns=10]
* @param {number} [opts.timeoutMs=120000]
* @returns {Promise<{turnsProcessed: number, response: string|null}>}
*/
export async function runAgentTurn(agent, opts = {}) {
const {
maxContextTurns = DEFAULT_MAX_CONTEXT_TURNS,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
} = opts;
const messages = agent.receive(true);
if (messages.length === 0) {
return { turnsProcessed: 0, response: null };
}
// Mark messages as read before processing so a crash doesn't re-process them
for (const msg of messages) {
agent.markRead(msg.id);
}
// Limit context window
const contextMessages = messages.slice(-maxContextTurns);
const prompt = buildAgentPrompt(agent, contextMessages);
let response;
try {
response = await runHeadlessPrompt(agent._basePath, prompt, timeoutMs);
} catch (err) {
// On failure, write error back to bus so sender knows
agent._bus.send(
`agent:${agent.identity.name}`,
messages[0]?.from ?? "agent:coordinator",
{ error: err.message, original: messages[0]?.body },
{ replyTo: messages[0]?.id, type: "error" },
);
return { turnsProcessed: 0, response: null, error: err.message };
}
// Reply to the most recent message (or broadcast if no clear target)
const target = messages[messages.length - 1]?.from ?? `agent:coordinator`;
const replyId = agent._bus.send(
`agent:${agent.identity.name}`,
target,
response,
{ replyTo: messages[messages.length - 1]?.id, type: "response" },
);
return { turnsProcessed: messages.length, response, replyId };
}
/**
* Continuously poll and run an agent until no more unread messages or max turns reached.
*
* @param {PersistentAgent} agent
* @param {object} [opts]
* @param {number} [opts.maxTurns=5]
* @param {number} [opts.pollIntervalMs=1000]
* @param {number} [opts.timeoutMs=120000]
* @returns {Promise<{totalTurns: number, stoppedReason: string}>}
*/
export async function runAgentLoop(agent, opts = {}) {
const {
maxTurns = DEFAULT_MAX_TURNS_PER_RUN,
pollIntervalMs = DEFAULT_POLL_INTERVAL_MS,
timeoutMs = DEFAULT_RUNNER_TIMEOUT_MS,
} = opts;
let totalTurns = 0;
const startTime = Date.now();
while (totalTurns < maxTurns) {
if (Date.now() - startTime > timeoutMs) {
return { totalTurns, stoppedReason: "timeout" };
}
const result = await runAgentTurn(agent, {
timeoutMs: Math.min(timeoutMs, 60_000),
});
if (result.turnsProcessed === 0) {
return { totalTurns, stoppedReason: "no_messages" };
}
totalTurns += result.turnsProcessed;
// Brief pause between turns to avoid hammering the LLM
if (totalTurns < maxTurns) {
await new Promise((r) => setTimeout(r, pollIntervalMs));
}
}
return { totalTurns, stoppedReason: "max_turns" };
}
/**
* Run a swarm turn: each agent in the swarm processes its inbox once.
*
* @param {AgentSwarm} swarm
* @param {object} [opts]
* @returns {Promise<Array<{agentName: string, result: object}>>}
*/
export async function runSwarmTurn(swarm, opts = {}) {
const agents = swarm.getAll();
const results = [];
for (const agent of agents) {
const result = await runAgentTurn(agent, opts);
results.push({ agentName: agent.identity.name, result });
}
return results;
}

View file

@ -413,19 +413,25 @@ export class AgentSwarm {
* topology (round_robin, supervisor, dynamic, sleeptime) and return the full * topology (round_robin, supervisor, dynamic, sleeptime) and return the full
* turn history with termination status. * turn history with termination status.
* *
* When `opts.enableLLM` is true, each agent turn is backed by an actual LLM
* call via the agent runner. Otherwise the loop is a pure message router
* (useful for tests and lightweight orchestration).
*
* Consumer: autonomous dispatch, eval harness, and multi-agent task runners. * Consumer: autonomous dispatch, eval harness, and multi-agent task runners.
* *
* @param {string} initialMessage * @param {string} initialMessage
* @param {object} [opts] * @param {object} [opts]
* @param {string} [opts.from] - agent name to send as * @param {string} [opts.from] - agent name to send as
* @param {number} [opts.maxTurns] - override swarm default * @param {number} [opts.maxTurns] - override swarm default
* @param {number} [opts.timeout] - timeout in ms (informational; not enforced at socket level) * @param {number} [opts.timeout] - per-turn timeout in ms (default 120000)
* @returns {{ turns: Array<{agent: string, message: string}>, terminated: boolean, reason: string }} * @param {boolean} [opts.enableLLM] - run LLM-backed turns (default false)
* @returns {Promise<{ turns: Array<{agent: string, message: string}>, terminated: boolean, reason: string }>}
*/ */
run(initialMessage, opts = {}) { async run(initialMessage, opts = {}) {
const maxTurns = opts.maxTurns ?? this._maxTurns; const maxTurns = opts.maxTurns ?? this._maxTurns;
const fromName = opts.from ?? "external"; const fromName = opts.from ?? "external";
const terminationToken = this._terminationToken; const terminationToken = this._terminationToken;
const enableLLM = opts.enableLLM ?? false;
const turns = []; const turns = [];
let terminated = false; let terminated = false;
@ -436,6 +442,13 @@ export class AgentSwarm {
return { turns, terminated: false, reason: "no_agents" }; return { turns, terminated: false, reason: "no_agents" };
} }
/** @type {((agent: import("./persistent-agent.js").PersistentAgent, opts?: object) => Promise<{response?: string}>) | null} */
let runAgentTurn = null;
if (enableLLM) {
const runner = await import("./agent-runner.js");
runAgentTurn = runner.runAgentTurn;
}
switch (this._managerType) { switch (this._managerType) {
case ManagerType.ROUND_ROBIN: { case ManagerType.ROUND_ROBIN: {
let message = initialMessage; let message = initialMessage;
@ -443,9 +456,18 @@ export class AgentSwarm {
const agent = agents[i % agents.length]; const agent = agents[i % agents.length];
const agentName = agent.identity.name; const agentName = agent.identity.name;
this.send(fromName, agentName, message); this.send(fromName, agentName, message);
const received = agent.receive(false);
const last = received[received.length - 1]; let reply;
const reply = _bodyToString(last?.body, message); if (runAgentTurn) {
const result = await runAgentTurn(agent, {
timeoutMs: opts.timeout,
});
reply = result.response ?? message;
} else {
const received = agent.receive(false);
const last = received[received.length - 1];
reply = _bodyToString(last?.body, message);
}
turns.push({ agent: agentName, message: reply }); turns.push({ agent: agentName, message: reply });
if (reply.includes(terminationToken)) { if (reply.includes(terminationToken)) {
terminated = true; terminated = true;
@ -465,9 +487,17 @@ export class AgentSwarm {
let message = initialMessage; let message = initialMessage;
for (let i = 0; i < maxTurns; i++) { for (let i = 0; i < maxTurns; i++) {
const received = supervisor.receive(true); let reply;
const last = received[received.length - 1]; if (runAgentTurn) {
const reply = _bodyToString(last?.body, message); const result = await runAgentTurn(supervisor, {
timeoutMs: opts.timeout,
});
reply = result.response ?? message;
} else {
const received = supervisor.receive(true);
const last = received[received.length - 1];
reply = _bodyToString(last?.body, message);
}
turns.push({ agent: supervisorName, message: reply }); turns.push({ agent: supervisorName, message: reply });
if (reply.includes(terminationToken)) { if (reply.includes(terminationToken)) {
terminated = true; terminated = true;
@ -481,9 +511,18 @@ export class AgentSwarm {
const worker = workers[i % workers.length]; const worker = workers[i % workers.length];
const workerName = worker.identity.name; const workerName = worker.identity.name;
this.send(supervisorName, workerName, reply); this.send(supervisorName, workerName, reply);
const workerMessages = worker.receive(true);
const workerLast = workerMessages[workerMessages.length - 1]; let workerReply;
const workerReply = _bodyToString(workerLast?.body, reply); if (runAgentTurn) {
const result = await runAgentTurn(worker, {
timeoutMs: opts.timeout,
});
workerReply = result.response ?? reply;
} else {
const workerMessages = worker.receive(true);
const workerLast = workerMessages[workerMessages.length - 1];
workerReply = _bodyToString(workerLast?.body, reply);
}
turns.push({ agent: workerName, message: workerReply }); turns.push({ agent: workerName, message: workerReply });
if (workerReply.includes(terminationToken)) { if (workerReply.includes(terminationToken)) {
terminated = true; terminated = true;
@ -508,9 +547,18 @@ export class AgentSwarm {
for (const agent of agents) { for (const agent of agents) {
const agentName = agent.identity.name; const agentName = agent.identity.name;
this.send(fromName, agentName, message); this.send(fromName, agentName, message);
const received = agent.receive(false);
const last = received[received.length - 1]; let reply;
const reply = _bodyToString(last?.body, message); if (runAgentTurn) {
const result = await runAgentTurn(agent, {
timeoutMs: opts.timeout,
});
reply = result.response ?? message;
} else {
const received = agent.receive(false);
const last = received[received.length - 1];
reply = _bodyToString(last?.body, message);
}
turns.push({ agent: agentName, message: reply }); turns.push({ agent: agentName, message: reply });
if (reply.includes(terminationToken)) { if (reply.includes(terminationToken)) {
terminated = true; terminated = true;
@ -539,9 +587,17 @@ export class AgentSwarm {
this.send(fromName, convoName, message); this.send(fromName, convoName, message);
for (let i = 0; i < maxTurns; i++) { for (let i = 0; i < maxTurns; i++) {
const received = convoAgent.receive(true); let reply;
const last = received[received.length - 1]; if (runAgentTurn) {
const reply = _bodyToString(last?.body, message); const result = await runAgentTurn(convoAgent, {
timeoutMs: opts.timeout,
});
reply = result.response ?? message;
} else {
const received = convoAgent.receive(true);
const last = received[received.length - 1];
reply = _bodyToString(last?.body, message);
}
turns.push({ agent: convoName, message: reply }); turns.push({ agent: convoName, message: reply });
if (reply.includes(terminationToken)) { if (reply.includes(terminationToken)) {
terminated = true; terminated = true;

View file

@ -357,6 +357,24 @@ export function extractValidationAttentionPlan(validationContent) {
if (tracking?.[1]?.trim()) return tracking[1].trim(); if (tracking?.[1]?.trim()) return tracking[1].trim();
return null; return null;
} }
function buildFallbackValidationAttentionPlan(validationContent) {
const excerpt = validationContent.trim().slice(0, 12_000);
return [
"No structured remediation section was present in the validation artifact.",
"",
"Use the full validation artifact as the attention source:",
"",
"1. Inspect the DB-backed milestone, slice, task, summary, UAT, and validation state.",
"2. Identify whether the `needs-attention` rationale describes a real open defect, stale validation drift, or a follow-up that should be deferred.",
"3. If it is stale drift, update the current validation/remediation evidence so the next validation run evaluates the DB-backed state.",
"4. If it is a real defect, apply the smallest project or SF-state change that addresses it.",
"5. If it cannot be completed now, explicitly defer it with the concrete reason, owner/artifact, and revalidation evidence.",
"",
"## Validation Artifact Excerpt",
"",
excerpt || "(validation artifact was empty)",
].join("\n");
}
function validationAttentionMarkerPath(basePath, mid) { function validationAttentionMarkerPath(basePath, mid) {
return join( return join(
sfRoot(basePath), sfRoot(basePath),
@ -406,8 +424,8 @@ function validationAttentionRuntimePath(basePath, mid) {
); );
} }
function hasActiveValidationAttentionMarker(basePath, mid) { function hasActiveValidationAttentionMarker(basePath, mid) {
const markerPath = validationAttentionMarkerPath(basePath, mid); const marker = readValidationAttentionMarker(basePath, mid);
if (!existsSync(markerPath)) return false; if (!marker) return false;
if (existsSync(validationAttentionRuntimePath(basePath, mid))) return true; if (existsSync(validationAttentionRuntimePath(basePath, mid))) return true;
logWarning( logWarning(
"dispatch", "dispatch",
@ -1560,11 +1578,9 @@ export const DISPATCH_RULES = [
if (verdict && verdict !== "pass") { if (verdict && verdict !== "pass") {
if (verdict === "needs-attention") { if (verdict === "needs-attention") {
const attentionPlan = const attentionPlan =
extractValidationAttentionPlan(validationContent); extractValidationAttentionPlan(validationContent) ??
if ( buildFallbackValidationAttentionPlan(validationContent);
attentionPlan && if (!hasActiveValidationAttentionMarker(basePath, mid)) {
!hasActiveValidationAttentionMarker(basePath, mid)
) {
try { try {
writeValidationAttentionMarker(basePath, mid, { writeValidationAttentionMarker(basePath, mid, {
milestoneId: mid, milestoneId: mid,
@ -1914,8 +1930,8 @@ export const DISPATCH_RULES = [
}, },
]; ];
import { getRegistry, hasRegistry } from "../rule-registry.js";
import { getErrorMessage } from "../error-utils.js"; import { getErrorMessage } from "../error-utils.js";
import { getRegistry, hasRegistry } from "../rule-registry.js";
// ─── Dispatch Envelope Emission ─────────────────────────────────────────── // ─── Dispatch Envelope Emission ───────────────────────────────────────────
/** /**

View file

@ -23,6 +23,11 @@ export {
USER_SKILL_DIR, USER_SKILL_DIR,
validateSkillFrontmatter, validateSkillFrontmatter,
} from "../skills/index.js"; } from "../skills/index.js";
export {
runAgentLoop,
runAgentTurn,
runSwarmTurn,
} from "./agent-runner.js";
// ─── Agent Swarm ─────────────────────────────────────────────────────────── // ─── Agent Swarm ───────────────────────────────────────────────────────────
export { AgentSwarm, ManagerType } from "./agent-swarm.js"; export { AgentSwarm, ManagerType } from "./agent-swarm.js";
export { export {

View file

@ -8,27 +8,16 @@
* Consumer: AgentSwarm orchestrator, swarm role agents (CoordinatorAgent, WorkerAgent etc), * Consumer: AgentSwarm orchestrator, swarm role agents (CoordinatorAgent, WorkerAgent etc),
* and direct use in multi-agent dispatch flows. * and direct use in multi-agent dispatch flows.
* *
* ## Current state * ## Runner
* This module implements the **container** half of a persistent agent: identity, inbox, * The `agent-runner.js` module (sibling to this file) implements the LLM execution half:
* memory blocks, and message routing. It does NOT implement the **runner** half.
*
* The missing piece is an LLM execution runner that:
* 1. Reads pending messages from this agent's inbox (`receive(true)`) * 1. Reads pending messages from this agent's inbox (`receive(true)`)
* 2. Assembles a prompt from core memory blocks + inbox messages * 2. Assembles a prompt from core memory blocks + inbox messages
* 3. Dispatches to SF headless (`node dist/loader.js headless --print <prompt>`) * 3. Dispatches to SF headless (`node dist/loader.js headless --print <prompt>`)
* 4. Writes the LLM response back into the bus as a reply * 4. Writes the LLM response back into the bus as a reply
* 5. Updates memory blocks (eviction, summarization) when context grows large * 5. Limits context window to the most recent N turns
* *
* Until the runner exists, `PersistentAgent` is a passive store. The autonomous loop * `AgentSwarm.run()` optionally enables LLM-backed turns via `opts.enableLLM`.
* uses it this way for sleeptime memory consolidation (caller sends + immediately reads * When disabled, the swarm acts as a pure message router (useful for tests).
* inbox). `SwarmDispatchLayer` also only enqueues messages nothing processes them.
*
* When building the runner, key design decisions to make:
* - Context window management: how many inbox turns to include before summarizing
* - Memory eviction: which core blocks are injected, which are summarized to archival
* - Turn limits: max rounds before the runner yields and re-queues
* - Concurrency: one runner per agent name (enforce via DB lock or process mutex)
* - Error handling: failed LLM calls should leave the message as unread, not drop it
* *
* See: Codex `codex-rs/core/src/agent/control.rs` for the reference implementation of * See: Codex `codex-rs/core/src/agent/control.rs` for the reference implementation of
* typed parallel subagents (explorer/worker roles) with forked rollout history. * typed parallel subagents (explorer/worker roles) with forked rollout history.

View file

@ -1,6 +1,6 @@
import assert from "node:assert/strict"; import assert from "node:assert/strict";
import { afterEach, test } from "vitest"; import { afterEach, test } from "vitest";
import googleSearchExtension from "../resources/extensions/google-search/index.js"; import { googleSearchExtension } from "../resources/extensions/search-the-web/tool-google-search.js";
function createMockPI() { function createMockPI() {
const handlers: any[] = []; const handlers: any[] = [];

View file

@ -8,7 +8,7 @@
import assert from "node:assert/strict"; import assert from "node:assert/strict";
import { afterEach, test, vi } from "vitest"; import { afterEach, test, vi } from "vitest";
import googleSearchExtension from "../resources/extensions/google-search/index.js"; import { googleSearchExtension } from "../resources/extensions/search-the-web/tool-google-search.js";
const cliCoreMock = vi.hoisted(() => ({ const cliCoreMock = vi.hoisted(() => ({
lastRequest: undefined as any, lastRequest: undefined as any,

View file

@ -11,7 +11,7 @@
import assert from "node:assert/strict"; import assert from "node:assert/strict";
import { afterEach, test } from "vitest"; import { afterEach, test } from "vitest";
import searchExtension from "../resources/extensions/search-the-web/index.ts"; import searchExtension from "../resources/extensions/search-the-web/index.js";
import { import {
registerSearchTool, registerSearchTool,
resetSearchLoopGuardState, resetSearchLoopGuardState,

View file

@ -10,7 +10,7 @@ import {
type Rule, type Rule,
TtsrManager, TtsrManager,
type TtsrMatchContext, type TtsrMatchContext,
} from "../../src/resources/extensions/ttsr/index.js"; } from "../../src/resources/extensions/guardrails/ttsr-manager.js";
// ─── Helpers ───────────────────────────────────────────────────────────────── // ─── Helpers ─────────────────────────────────────────────────────────────────

View file

@ -9,7 +9,7 @@ import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { afterEach, test } from "vitest"; import { afterEach, test } from "vitest";
import { loadRules } from "../../src/resources/extensions/ttsr/index.js"; import { loadRules } from "../../src/resources/extensions/guardrails/ttsr-rule-loader.js";
// ─── Helpers ───────────────────────────────────────────────────────────────── // ─── Helpers ─────────────────────────────────────────────────────────────────