wip: M005 daemon — orchestrator, event bridge, formatter, batcher improvements (#2929)

Saves in-progress daemon work from M005-m138xe that was sitting uncommitted.
Includes orchestrator expansion, event bridge/formatter enhancements,
message batcher tweaks, and discord bot additions.
This commit is contained in:
TÂCHES 2026-03-27 20:22:30 -06:00 committed by GitHub
parent 1783559610
commit efe61c2fcc
8 changed files with 175 additions and 19 deletions

View file

@ -82,7 +82,7 @@ export class Daemon {
channelManager,
scanProjects: () => this.scanProjects(),
config: {
model: this.config.discord.orchestrator?.model ?? 'claude-sonnet-4-5-20250929',
model: this.config.discord.orchestrator?.model ?? 'claude-haiku-4-5-20251001',
max_tokens: this.config.discord.orchestrator?.max_tokens ?? 1024,
control_channel_id: this.config.discord.control_channel_id,
},

View file

@ -129,6 +129,17 @@ export class DiscordBot {
this.handleInteraction(interaction);
});
// Debug: log all incoming messages at debug level
client.on('messageCreate', (msg) => {
this.logger.debug('raw messageCreate', {
authorId: msg.author.id,
authorBot: msg.author.bot,
channelId: msg.channelId,
contentLength: msg.content.length,
hasContent: msg.content.length > 0,
});
});
// Reconnection observability — structured logging for all shard lifecycle events (R027)
client.on('shardError', (error) => {
this.logger.error('discord shard error', { error: error.message });

View file

@ -417,17 +417,23 @@ export class EventBridge {
return;
}
// Otherwise, steer the session with the message content
if (session.status === 'running') {
try {
// Otherwise, relay the message to the GSD session
// Use steer() when running (injects mid-turn), prompt() otherwise (starts new turn)
try {
if (session.status === 'running') {
await session.client.steer(message.content);
await message.react('📨').catch(() => {});
this.logger.info('bridge: message relayed to session', { sessionId });
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
this.logger.error('bridge: steer failed', { sessionId, error: errMsg });
await message.reply(`❌ Failed to relay message: ${errMsg}`).catch(() => {});
} else {
await session.client.prompt(message.content);
}
await message.react('📨').catch(() => {});
this.logger.info('bridge: message relayed to session', {
sessionId,
method: session.status === 'running' ? 'steer' : 'prompt',
});
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
this.logger.error('bridge: relay failed', { sessionId, error: errMsg });
await message.reply(`❌ Failed to relay message: ${errMsg}`).catch(() => {});
}
}

View file

@ -102,14 +102,42 @@ export function formatToolEnd(event: SdkAgentEvent): FormattedEvent {
export function formatMessage(event: SdkAgentEvent): FormattedEvent {
// Extract text from content blocks or message field
let text = '';
// Try content array first (most common for agent messages)
if (Array.isArray(event.content)) {
const blocks = event.content as Array<{ type?: string; text?: string }>;
text = blocks
.filter((b) => b.type === 'text' && typeof b.text === 'string')
.map((b) => b.text!)
.join('\n');
} else {
text = str(event.message || event.text || event.content);
}
// Try message field — could be string, object with content array, or object with text
if (!text && event.message != null) {
if (typeof event.message === 'string') {
text = event.message;
} else if (typeof event.message === 'object') {
const msg = event.message as Record<string, unknown>;
if (Array.isArray(msg.content)) {
const blocks = msg.content as Array<{ type?: string; text?: string }>;
text = blocks
.filter((b) => b.type === 'text' && typeof b.text === 'string')
.map((b) => b.text!)
.join('\n');
} else if (typeof msg.text === 'string') {
text = msg.text;
} else if (typeof msg.content === 'string') {
text = msg.content;
}
}
}
// Fallback to text or content as plain strings
if (!text) {
text = typeof event.text === 'string' ? event.text : '';
}
if (!text && typeof event.content === 'string') {
text = event.content;
}
if (!text) {

View file

@ -65,7 +65,7 @@ describe('MessageBatcher', () => {
await batcher.destroy();
});
it('combines embeds into a single send call', async () => {
it('skips embeds for batched messages (only content)', async () => {
const { fn, calls } = createSend();
const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 2, flushIntervalMs: 60_000 });
@ -74,7 +74,7 @@ describe('MessageBatcher', () => {
await new Promise((r) => setTimeout(r, 10));
assert.equal(calls.length, 1);
assert.equal(calls[0].embeds.length, 2);
assert.equal(calls[0].embeds.length, 0, 'batched sends skip embeds to avoid duplication');
assert.equal(calls[0].content, 'a\nb');
await batcher.destroy();

View file

@ -162,6 +162,10 @@ export class MessageBatcher {
/**
* Build a SendPayload from a batch of FormattedEvents and invoke the send callback.
* Catches and logs errors never throws.
*
* For batched messages (2+ events), we send content-only to avoid duplication
* between content text and embed descriptions, and to stay under Discord's
* 10-embed limit. Single-event sends include the embed for rich formatting.
*/
private async doSend(batch: FormattedEvent[]): Promise<void> {
if (batch.length === 0) return;
@ -169,10 +173,12 @@ export class MessageBatcher {
// Combine content lines
const content = batch.map((e) => e.content).join('\n');
// Collect all embeds (Discord allows up to 10 per message)
// For single events, include the embed for rich formatting.
// For batches, skip embeds — the content lines are self-descriptive and
// embeds would duplicate the information + risk hitting Discord's 10-embed cap.
const embeds: unknown[] = [];
for (const e of batch) {
if (e.embed) embeds.push(e.embed);
if (batch.length === 1 && batch[0].embed) {
embeds.push(batch[0].embed);
}
// Collect all component rows (only from the last event with components —

View file

@ -201,6 +201,7 @@ function makeMessage(overrides: Partial<{
send: async (content: string) => {
sentMessages.push(content);
},
sendTyping: async () => {},
},
sentMessages,
};

View file

@ -12,6 +12,9 @@
*/
import { z } from 'zod';
import { readFileSync, writeFileSync, chmodSync } from 'node:fs';
import { join } from 'node:path';
import { homedir } from 'node:os';
import type Anthropic from '@anthropic-ai/sdk';
import type {
MessageParam,
@ -26,6 +29,93 @@ import type { ChannelManager } from './channel-manager.js';
import type { ProjectInfo, ManagedSession } from './types.js';
import type { Logger } from './logger.js';
// ---------------------------------------------------------------------------
// OAuth token resolution — reads GSD's auth.json, refreshes if expired
// ---------------------------------------------------------------------------
interface OAuthCredentials {
type: 'oauth';
refresh: string;
access: string;
expires: number;
}
const TOKEN_URL = 'https://platform.claude.com/v1/oauth/token';
const CLIENT_ID = atob('OWQxYzI1MGEtZTYxYi00NGQ5LTg4ZWQtNTk0NGQxOTYyZjVl');
/**
* Read the Anthropic OAuth access token from GSD's auth.json.
* If expired, refresh it and write the new credentials back.
* Falls back to ANTHROPIC_API_KEY env var if no OAuth credential exists.
*/
async function resolveAnthropicApiKey(logger?: Logger): Promise<string> {
// Try env var first (explicit override)
if (process.env.ANTHROPIC_API_KEY) {
return process.env.ANTHROPIC_API_KEY;
}
const authPath = join(homedir(), '.gsd', 'agent', 'auth.json');
let authData: Record<string, unknown>;
try {
authData = JSON.parse(readFileSync(authPath, 'utf-8'));
} catch {
throw new Error(
'No Anthropic auth found. Run `gsd login` to authenticate, or set ANTHROPIC_API_KEY.',
);
}
const cred = authData.anthropic as OAuthCredentials | undefined;
if (!cred || cred.type !== 'oauth' || !cred.access) {
throw new Error(
'No Anthropic OAuth credential in auth.json. Run `gsd login` to authenticate.',
);
}
// If token is still valid, use it
if (Date.now() < cred.expires) {
return cred.access;
}
// Token expired — refresh it
logger?.info('orchestrator: refreshing Anthropic OAuth token');
const response = await fetch(TOKEN_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
grant_type: 'refresh_token',
client_id: CLIENT_ID,
refresh_token: cred.refresh,
}),
signal: AbortSignal.timeout(30_000),
});
if (!response.ok) {
const error = await response.text();
throw new Error(`Anthropic token refresh failed: ${error}`);
}
const data = (await response.json()) as {
access_token: string;
refresh_token: string;
expires_in: number;
};
const newCred: OAuthCredentials = {
type: 'oauth',
refresh: data.refresh_token,
access: data.access_token,
expires: Date.now() + data.expires_in * 1000 - 5 * 60 * 1000,
};
// Write back to auth.json
authData.anthropic = newCred;
writeFileSync(authPath, JSON.stringify(authData, null, 2), 'utf-8');
chmodSync(authPath, 0o600);
logger?.info('orchestrator: Anthropic OAuth token refreshed');
return newCred.access;
}
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
@ -164,11 +254,13 @@ export class Orchestrator {
/**
* Lazily initialise the Anthropic client. Dynamic import handles K007 module resolution.
* Resolves auth from GSD's OAuth credentials (auth.json), refreshing if needed.
*/
private async getClient(): Promise<Anthropic> {
if (this.client) return this.client;
const apiKey = await resolveAnthropicApiKey(this.deps.logger);
const { default: AnthropicSDK } = await import('@anthropic-ai/sdk');
this.client = new AnthropicSDK();
this.client = new AnthropicSDK({ apiKey });
return this.client;
}
@ -204,6 +296,9 @@ export class Orchestrator {
this.history.push({ role: 'user', content });
try {
// Show typing indicator while processing
await message.channel.sendTyping().catch(() => {});
const responseText = await this.runAgentLoop();
// Send response to Discord
@ -215,6 +310,12 @@ export class Orchestrator {
});
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
// Invalidate cached client on auth errors so next call re-resolves OAuth token
if (errorMsg.includes('authentication') || errorMsg.includes('apiKey') || errorMsg.includes('authToken') || errorMsg.includes('401')) {
this.client = null;
}
this.deps.logger.error('orchestrator error', {
error: errorMsg,
userId: message.author.id,
@ -436,5 +537,8 @@ export interface DiscordMessageLike {
author: { id: string; bot: boolean };
channelId: string;
content: string;
channel: { send: (content: string) => Promise<unknown> };
channel: {
send: (content: string) => Promise<unknown>;
sendTyping: () => Promise<unknown>;
};
}