From f26ec3a55d03a63fd7ef9f70064c89609e38f2bd Mon Sep 17 00:00:00 2001 From: Lex Christopherson Date: Fri, 27 Mar 2026 15:11:52 -0600 Subject: [PATCH] =?UTF-8?q?test:=20Built=20EventBridge=20orchestrator=20wi?= =?UTF-8?q?ring=20session=20events=20to=20Discord=20w=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "packages/daemon/src/event-bridge.ts" - "packages/daemon/src/event-bridge.test.ts" GSD-Task: S04/T03 --- packages/daemon/src/event-bridge.test.ts | 619 +++++++++++++++++++++++ packages/daemon/src/event-bridge.ts | 488 ++++++++++++++++++ 2 files changed, 1107 insertions(+) create mode 100644 packages/daemon/src/event-bridge.test.ts create mode 100644 packages/daemon/src/event-bridge.ts diff --git a/packages/daemon/src/event-bridge.test.ts b/packages/daemon/src/event-bridge.test.ts new file mode 100644 index 000000000..8516b9dc4 --- /dev/null +++ b/packages/daemon/src/event-bridge.test.ts @@ -0,0 +1,619 @@ +/** + * event-bridge.test.ts — Tests for EventBridge orchestrator. + * + * Uses mock SessionManager (EventEmitter), mock ChannelManager, + * mock Discord Client, and mock Logger to test event wiring, + * blocker handling, conversation relay, and cleanup. + */ + +import { describe, it, mock } from 'node:test'; +import assert from 'node:assert/strict'; +import { EventEmitter } from 'node:events'; +import { EventBridge } from './event-bridge.js'; +import type { EventBridgeOptions, BridgeClient } from './event-bridge.js'; +import type { PendingBlocker, ManagedSession, DaemonConfig, SessionStatus } from './types.js'; +import type { SdkAgentEvent, RpcClient, RpcExtensionUIRequest } from '@gsd-build/rpc-client'; + +// --------------------------------------------------------------------------- +// Mock factories +// --------------------------------------------------------------------------- + +function createMockLogger() { + return { + debug: mock.fn(() => {}), + info: mock.fn(() => {}), + warn: mock.fn(() => {}), + error: mock.fn(() => {}), + }; +} + +function createMockChannelManager() { + const sentMessages: unknown[] = []; + const mockChannel = { + id: 'ch-123', + send: mock.fn(async (_payload: unknown) => { + sentMessages.push(_payload); + return { id: 'msg-1' }; + }), + createMessageComponentCollector: mock.fn((_opts?: unknown) => { + const collector = new EventEmitter() as EventEmitter & { stop: (reason?: string) => void }; + collector.stop = (reason?: string) => collector.emit('end', [], reason ?? 'manual'); + return collector; + }), + }; + return { + createProjectChannel: mock.fn(async (_dir: string) => mockChannel), + _channel: mockChannel, + _sentMessages: sentMessages, + }; +} + +function createMockClient(): BridgeClient & EventEmitter { + const emitter = new EventEmitter(); + const dmSendFn = mock.fn(async () => ({})); + const fetchFn = mock.fn(async (_id: string) => ({ send: dmSendFn })); + (emitter as unknown as Record).users = { fetch: fetchFn }; + return Object.assign(emitter, { + users: { fetch: fetchFn }, + _dmSend: dmSendFn, + }) as unknown as BridgeClient & EventEmitter; +} + +function createMockSessionManager() { + const sm = new EventEmitter() as EventEmitter & { + getSession: ReturnType; + resolveBlocker: ReturnType; + }; + sm.getSession = mock.fn((_id: string) => undefined as ManagedSession | undefined); + sm.resolveBlocker = mock.fn(async (_sid: string, _resp: string) => {}); + return sm; +} + +function createMockSession(overrides?: Partial): ManagedSession { + return { + sessionId: 'sess-1', + projectDir: '/test/project', + projectName: 'project', + status: 'running' as SessionStatus, + client: { + steer: mock.fn(async (_msg: string) => {}), + prompt: mock.fn(async () => ({})), + } as unknown as RpcClient, + events: [], + pendingBlocker: null, + cost: { totalCost: 0, tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 } }, + startTime: Date.now(), + ...overrides, + }; +} + +const DEFAULT_CONFIG: DaemonConfig = { + discord: { + token: 'test-token', + guild_id: 'guild-1', + owner_id: 'owner-1', + dm_on_blocker: false, + }, + projects: { scan_roots: [] }, + log: { file: '/tmp/test.log', level: 'debug', max_size_mb: 10 }, +}; + +function buildBridge(overrides?: Partial) { + const sessionManager = createMockSessionManager(); + const channelManager = createMockChannelManager(); + const client = createMockClient(); + const logger = createMockLogger(); + + const opts: EventBridgeOptions = { + sessionManager: sessionManager as unknown as EventBridgeOptions['sessionManager'], + channelManager: channelManager as unknown as EventBridgeOptions['channelManager'], + client, + config: DEFAULT_CONFIG, + logger: logger as unknown as EventBridgeOptions['logger'], + ownerId: 'owner-1', + ...overrides, + }; + + const bridge = new EventBridge(opts); + return { bridge, sessionManager, channelManager, client, logger }; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- +const tick = () => new Promise((r) => setTimeout(r, 30)); + +function mockFn(obj: unknown): { mock: { callCount(): number; calls: Array<{ arguments: unknown[]; result?: unknown }> } } { + return obj as { mock: { callCount(): number; calls: Array<{ arguments: unknown[]; result?: unknown }> } }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('EventBridge', () => { + describe('lifecycle', () => { + it('start() subscribes to session manager events and messageCreate', () => { + const { bridge, sessionManager, client } = buildBridge(); + bridge.start(); + assert.ok(sessionManager.listenerCount('session:started') > 0); + assert.ok(sessionManager.listenerCount('session:event') > 0); + assert.ok(sessionManager.listenerCount('session:blocked') > 0); + assert.ok(sessionManager.listenerCount('session:completed') > 0); + assert.ok(sessionManager.listenerCount('session:error') > 0); + assert.ok(client.listenerCount('messageCreate') > 0); + }); + + it('stop() unsubscribes from all events and clears mappings', async () => { + const { bridge, sessionManager, client } = buildBridge(); + bridge.start(); + await bridge.stop(); + assert.equal(sessionManager.listenerCount('session:started'), 0); + assert.equal(sessionManager.listenerCount('session:event'), 0); + assert.equal(sessionManager.listenerCount('session:blocked'), 0); + assert.equal(sessionManager.listenerCount('session:completed'), 0); + assert.equal(sessionManager.listenerCount('session:error'), 0); + assert.equal(client.listenerCount('messageCreate'), 0); + }); + + it('start() is idempotent', () => { + const { bridge, sessionManager } = buildBridge(); + bridge.start(); + bridge.start(); + assert.equal(sessionManager.listenerCount('session:started'), 1); + }); + + it('getVerbosityManager() returns a VerbosityManager', () => { + const { bridge } = buildBridge(); + const vm = bridge.getVerbosityManager(); + assert.ok(vm); + assert.equal(typeof vm.shouldShow, 'function'); + }); + }); + + describe('session:started → channel creation + welcome embed', () => { + it('creates channel and batcher', async () => { + const { bridge, sessionManager, channelManager } = buildBridge(); + bridge.start(); + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + assert.equal(mockFn(channelManager.createProjectChannel).mock.callCount(), 1); + }); + + it('logs error and skips when channel creation fails', async () => { + const failingCm = { + createProjectChannel: mock.fn(async () => { throw new Error('API error'); }), + }; + const { bridge, sessionManager, logger } = buildBridge({ + channelManager: failingCm as unknown as EventBridgeOptions['channelManager'], + }); + bridge.start(); + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + assert.ok(mockFn(logger.error).mock.callCount() > 0); + }); + }); + + describe('session:event → format + verbosity filter + enqueue', () => { + it('formats event and enqueues to batcher (no errors)', async () => { + const { bridge, sessionManager, logger } = buildBridge(); + bridge.start(); + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + sessionManager.emit('session:event', { + sessionId: 'sess-1', projectDir: '/test/project', + event: { type: 'tool_execution_start', name: 'read' } as SdkAgentEvent, + }); + await tick(); + // No errors + assert.equal(mockFn(logger.error).mock.callCount(), 0); + }); + + it('filters events based on verbosity', async () => { + const { bridge, sessionManager, channelManager, logger } = buildBridge(); + bridge.start(); + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + // Set quiet mode + bridge.getVerbosityManager().setLevel('ch-123', 'quiet'); + + // cost_update filtered in quiet + sessionManager.emit('session:event', { + sessionId: 'sess-1', projectDir: '/test/project', + event: { type: 'cost_update', cumulativeCost: 1.5 } as SdkAgentEvent, + }); + await tick(); + // tool_execution_start filtered in quiet + sessionManager.emit('session:event', { + sessionId: 'sess-1', projectDir: '/test/project', + event: { type: 'tool_execution_start', name: 'read' } as SdkAgentEvent, + }); + await tick(); + assert.equal(mockFn(logger.error).mock.callCount(), 0); + }); + }); + + describe('session:blocked → blocker embed + buttons + optional DM', () => { + it('sends blocker embed and creates collector for confirm', async () => { + const { bridge, sessionManager, channelManager } = buildBridge(); + bridge.start(); + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const blocker: PendingBlocker = { + id: 'blocker-1', method: 'confirm', message: 'Continue?', + event: { id: 'blocker-1', method: 'confirm', message: 'Continue?' } as RpcExtensionUIRequest, + }; + sessionManager.emit('session:blocked', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker, + }); + await tick(); + assert.ok(mockFn(channelManager._channel.createMessageComponentCollector).mock.callCount() > 0); + }); + + it('sends DM when dm_on_blocker is configured', async () => { + const config: DaemonConfig = { + ...DEFAULT_CONFIG, + discord: { ...DEFAULT_CONFIG.discord!, dm_on_blocker: true }, + }; + const client = createMockClient(); + const { bridge, sessionManager } = buildBridge({ config, client }); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const blocker: PendingBlocker = { + id: 'blocker-1', method: 'input', message: 'Enter API key', + event: { id: 'blocker-1', method: 'input' } as RpcExtensionUIRequest, + }; + sessionManager.emit('session:blocked', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker, + }); + await tick(); + + const usersFetch = (client as unknown as Record).users.fetch; + assert.equal(mockFn(usersFetch).mock.callCount(), 1); + }); + + it('does not send DM when dm_on_blocker is false', async () => { + const client = createMockClient(); + const { bridge, sessionManager } = buildBridge({ client }); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const blocker: PendingBlocker = { + id: 'blocker-1', method: 'input', message: 'Enter value', + event: { id: 'blocker-1', method: 'input' } as RpcExtensionUIRequest, + }; + sessionManager.emit('session:blocked', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker, + }); + await tick(); + + const usersFetch = (client as unknown as Record).users.fetch; + assert.equal(mockFn(usersFetch).mock.callCount(), 0); + }); + }); + + describe('button collector → resolveBlocker', () => { + it('resolves blocker on button click from authorized user', async () => { + const { bridge, sessionManager, channelManager } = buildBridge(); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const blocker: PendingBlocker = { + id: 'blocker-1', method: 'confirm', message: 'Confirm?', + event: { id: 'blocker-1', method: 'confirm' } as RpcExtensionUIRequest, + }; + sessionManager.emit('session:blocked', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker, + }); + await tick(); + + const collectorCalls = mockFn(channelManager._channel.createMessageComponentCollector).mock.calls; + assert.ok(collectorCalls.length > 0); + const collector = collectorCalls[0]!.result as EventEmitter; + + const mockInteraction = { + customId: 'blocker:blocker-1:confirm:true', + user: { id: 'owner-1' }, + update: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }; + collector.emit('collect', mockInteraction); + await tick(); + + assert.equal(mockFn(sessionManager.resolveBlocker).mock.callCount(), 1); + const args = mockFn(sessionManager.resolveBlocker).mock.calls[0]!.arguments; + assert.equal(args[0], 'sess-1'); + assert.equal(args[1], 'true'); + }); + + it('rejects button click from unauthorized user', async () => { + const { bridge, sessionManager, channelManager } = buildBridge(); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const blocker: PendingBlocker = { + id: 'blocker-1', method: 'confirm', message: 'Confirm?', + event: { id: 'blocker-1', method: 'confirm' } as RpcExtensionUIRequest, + }; + sessionManager.emit('session:blocked', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker, + }); + await tick(); + + const collectorCalls = mockFn(channelManager._channel.createMessageComponentCollector).mock.calls; + const collector = collectorCalls[0]!.result as EventEmitter; + + const mockInteraction = { + customId: 'blocker:blocker-1:confirm:true', + user: { id: 'stranger-99' }, + update: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }; + collector.emit('collect', mockInteraction); + await tick(); + + assert.equal(mockFn(sessionManager.resolveBlocker).mock.callCount(), 0); + assert.equal(mockFn(mockInteraction.reply).mock.callCount(), 1); + }); + + it('posts error when resolveBlocker throws', async () => { + const { bridge, sessionManager, channelManager } = buildBridge(); + sessionManager.resolveBlocker = mock.fn(async () => { throw new Error('No pending blocker'); }); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const blocker: PendingBlocker = { + id: 'blocker-1', method: 'confirm', message: 'Confirm?', + event: { id: 'blocker-1', method: 'confirm' } as RpcExtensionUIRequest, + }; + sessionManager.emit('session:blocked', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker, + }); + await tick(); + + const collectorCalls = mockFn(channelManager._channel.createMessageComponentCollector).mock.calls; + const collector = collectorCalls[0]!.result as EventEmitter; + + const mockInteraction = { + customId: 'blocker:blocker-1:confirm:true', + user: { id: 'owner-1' }, + update: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }; + collector.emit('collect', mockInteraction); + await tick(); + + assert.equal(mockFn(mockInteraction.reply).mock.callCount(), 1); + const replyArg = mockFn(mockInteraction.reply).mock.calls[0]!.arguments[0] as Record; + assert.ok(String(replyArg.content).includes('Failed to resolve')); + }); + }); + + describe('messageCreate relay', () => { + it('relays message to session steer when no pending blocker', async () => { + const session = createMockSession(); + const { bridge, sessionManager, client } = buildBridge(); + sessionManager.getSession = mock.fn(() => session); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const msg = { + author: { id: 'owner-1', bot: false }, + channelId: 'ch-123', + content: 'check the test results', + react: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }; + client.emit('messageCreate', msg); + await tick(); + + assert.equal(mockFn(session.client.steer).mock.callCount(), 1); + assert.equal(mockFn(session.client.steer).mock.calls[0]!.arguments[0], 'check the test results'); + }); + + it('resolves blocker via relay for input method', async () => { + const blocker: PendingBlocker = { + id: 'blocker-2', method: 'input', message: 'Enter value', + event: { id: 'blocker-2', method: 'input' } as RpcExtensionUIRequest, + }; + const session = createMockSession({ pendingBlocker: blocker, status: 'blocked' }); + const { bridge, sessionManager, client } = buildBridge(); + sessionManager.getSession = mock.fn(() => session); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const msg = { + author: { id: 'owner-1', bot: false }, + channelId: 'ch-123', + content: 'my-api-key-value', + react: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }; + client.emit('messageCreate', msg); + await tick(); + + assert.equal(mockFn(sessionManager.resolveBlocker).mock.callCount(), 1); + assert.equal(mockFn(sessionManager.resolveBlocker).mock.calls[0]!.arguments[1], 'my-api-key-value'); + }); + + it('ignores bot messages', async () => { + const session = createMockSession(); + const { bridge, sessionManager, client } = buildBridge(); + sessionManager.getSession = mock.fn(() => session); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + client.emit('messageCreate', { + author: { id: 'bot-1', bot: true }, + channelId: 'ch-123', + content: 'automated', + react: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }); + await tick(); + + assert.equal(mockFn(session.client.steer).mock.callCount(), 0); + }); + + it('ignores messages in non-project channels', async () => { + const session = createMockSession(); + const { bridge, sessionManager, client } = buildBridge(); + sessionManager.getSession = mock.fn(() => session); + bridge.start(); + + client.emit('messageCreate', { + author: { id: 'owner-1', bot: false }, + channelId: 'random-ch-999', + content: 'hello', + react: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }); + await tick(); + + assert.equal(mockFn(session.client.steer).mock.callCount(), 0); + }); + + it('ignores messages from unauthorized users', async () => { + const session = createMockSession(); + const { bridge, sessionManager, client } = buildBridge(); + sessionManager.getSession = mock.fn(() => session); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + client.emit('messageCreate', { + author: { id: 'stranger-99', bot: false }, + channelId: 'ch-123', + content: 'hack the planet', + react: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }); + await tick(); + + assert.equal(mockFn(session.client.steer).mock.callCount(), 0); + }); + + it('posts error when steer fails', async () => { + const session = createMockSession(); + (session.client as unknown as Record).steer = mock.fn(async () => { + throw new Error('session dead'); + }); + const { bridge, sessionManager, client } = buildBridge(); + sessionManager.getSession = mock.fn(() => session); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + const msg = { + author: { id: 'owner-1', bot: false }, + channelId: 'ch-123', + content: 'try this', + react: mock.fn(async () => {}), + reply: mock.fn(async () => {}), + }; + client.emit('messageCreate', msg); + await tick(); + + assert.equal(mockFn(msg.reply).mock.callCount(), 1); + }); + }); + + describe('session:completed → cleanup', () => { + it('posts completion embed and cleans up', async () => { + const { bridge, sessionManager, logger } = buildBridge(); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + sessionManager.emit('session:completed', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + // After cleanup, events for this session are silently ignored + sessionManager.emit('session:event', { + sessionId: 'sess-1', projectDir: '/test/project', + event: { type: 'tool_execution_start', name: 'read' } as SdkAgentEvent, + }); + await tick(); + assert.equal(mockFn(logger.error).mock.callCount(), 0); + }); + }); + + describe('session:error → cleanup', () => { + it('posts error embed and cleans up', async () => { + const { bridge, sessionManager, logger } = buildBridge(); + bridge.start(); + + sessionManager.emit('session:started', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', + }); + await tick(); + + sessionManager.emit('session:error', { + sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', error: 'Process crashed', + }); + await tick(); + + const infoCalls = mockFn(logger.info).mock.calls; + assert.ok( + infoCalls.some((c) => String(c.arguments[0]).includes('session error')), + ); + }); + }); +}); diff --git a/packages/daemon/src/event-bridge.ts b/packages/daemon/src/event-bridge.ts new file mode 100644 index 000000000..22221afca --- /dev/null +++ b/packages/daemon/src/event-bridge.ts @@ -0,0 +1,488 @@ +/** + * event-bridge.ts — Orchestrator wiring SessionManager events through + * formatter → batcher → Discord channels. + * + * Handles: + * - Session lifecycle → Discord channel creation and cleanup + * - Event streaming → format + verbosity filter + batcher + * - Blocker resolution → interactive buttons + text relay + * - Conversation relay → Discord messages forwarded to GSD sessions + * - DM backup → owner gets DM on blocker when dm_on_blocker configured + */ + +import type { Client, Message, TextChannel, MessageComponentInteraction } from 'discord.js'; +import { EmbedBuilder, ComponentType } from 'discord.js'; +import type { SdkAgentEvent } from '@gsd-build/rpc-client'; +import type { Logger } from './logger.js'; +import type { DaemonConfig, PendingBlocker } from './types.js'; +import type { SessionManager } from './session-manager.js'; +import type { ChannelManager } from './channel-manager.js'; +import { MessageBatcher } from './message-batcher.js'; +import { VerbosityManager } from './verbosity.js'; +import { + formatEvent, + formatBlocker, + formatSessionStarted, + formatError, + formatCompletion, +} from './event-formatter.js'; +import { isAuthorized } from './discord-bot.js'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** Minimal interface for a Discord client — extracted for testability. */ +export interface BridgeClient { + on(event: 'messageCreate', listener: (message: Message) => void): void; + off(event: 'messageCreate', listener: (message: Message) => void): void; + users: { fetch(id: string): Promise<{ send(opts: unknown): Promise }> }; +} + +/** Options for creating an EventBridge. */ +export interface EventBridgeOptions { + sessionManager: SessionManager; + channelManager: ChannelManager; + client: BridgeClient; + config: DaemonConfig; + logger: Logger; + ownerId: string; +} + +// --------------------------------------------------------------------------- +// Collector timeout +// --------------------------------------------------------------------------- + +const BLOCKER_COLLECTOR_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24 hours + +// --------------------------------------------------------------------------- +// EventBridge +// --------------------------------------------------------------------------- + +export class EventBridge { + private readonly sessionManager: SessionManager; + private readonly channelManager: ChannelManager; + private readonly client: BridgeClient; + private readonly config: DaemonConfig; + private readonly logger: Logger; + private readonly ownerId: string; + + /** sessionId → channelId */ + private readonly sessionToChannel = new Map(); + /** channelId → sessionId */ + private readonly channelToSession = new Map(); + /** sessionId → MessageBatcher */ + private readonly batchers = new Map(); + /** sessionId → TextChannel (cached for send operations) */ + private readonly channels = new Map(); + + private readonly verbosity = new VerbosityManager(); + + /** Bound event handlers for cleanup */ + private boundHandlers: { + started: (...args: unknown[]) => void; + event: (...args: unknown[]) => void; + blocked: (...args: unknown[]) => void; + completed: (...args: unknown[]) => void; + error: (...args: unknown[]) => void; + messageCreate: (msg: Message) => void; + } | null = null; + + constructor(opts: EventBridgeOptions) { + this.sessionManager = opts.sessionManager; + this.channelManager = opts.channelManager; + this.client = opts.client; + this.config = opts.config; + this.logger = opts.logger; + this.ownerId = opts.ownerId; + } + + // ----------------------------------------------------------------------- + // Lifecycle + // ----------------------------------------------------------------------- + + /** Subscribe to SessionManager events and Discord messageCreate. */ + start(): void { + if (this.boundHandlers) return; // already started + + this.boundHandlers = { + started: (data: unknown) => { + void this.onSessionStarted(data as SessionStartedPayload); + }, + event: (data: unknown) => { + void this.onSessionEvent(data as SessionEventPayload); + }, + blocked: (data: unknown) => { + void this.onSessionBlocked(data as SessionBlockedPayload); + }, + completed: (data: unknown) => { + void this.onSessionCompleted(data as SessionCompletedPayload); + }, + error: (data: unknown) => { + void this.onSessionError(data as SessionErrorPayload); + }, + messageCreate: (msg: Message) => { + void this.handleMessageCreate(msg); + }, + }; + + this.sessionManager.on('session:started', this.boundHandlers.started); + this.sessionManager.on('session:event', this.boundHandlers.event); + this.sessionManager.on('session:blocked', this.boundHandlers.blocked); + this.sessionManager.on('session:completed', this.boundHandlers.completed); + this.sessionManager.on('session:error', this.boundHandlers.error); + this.client.on('messageCreate', this.boundHandlers.messageCreate); + + this.logger.info('event bridge started'); + } + + /** Unsubscribe from all events, destroy batchers, clear mappings. */ + async stop(): Promise { + if (this.boundHandlers) { + this.sessionManager.off('session:started', this.boundHandlers.started); + this.sessionManager.off('session:event', this.boundHandlers.event); + this.sessionManager.off('session:blocked', this.boundHandlers.blocked); + this.sessionManager.off('session:completed', this.boundHandlers.completed); + this.sessionManager.off('session:error', this.boundHandlers.error); + this.client.off('messageCreate', this.boundHandlers.messageCreate); + this.boundHandlers = null; + } + + // Destroy all batchers + const destroyPromises: Promise[] = []; + for (const batcher of this.batchers.values()) { + destroyPromises.push(batcher.destroy()); + } + await Promise.allSettled(destroyPromises); + + this.batchers.clear(); + this.sessionToChannel.clear(); + this.channelToSession.clear(); + this.channels.clear(); + + this.logger.info('event bridge stopped'); + } + + /** Expose the verbosity manager for slash-command integration. */ + getVerbosityManager(): VerbosityManager { + return this.verbosity; + } + + // ----------------------------------------------------------------------- + // SessionManager event handlers + // ----------------------------------------------------------------------- + + private async onSessionStarted(data: SessionStartedPayload): Promise { + const { sessionId, projectDir, projectName } = data; + + try { + const channel = await this.channelManager.createProjectChannel(projectDir); + + // Create batcher with channel.send as the send function + const batcher = new MessageBatcher( + async (payload) => { + await channel.send(payload as Parameters[0]); + }, + this.logger, + ); + batcher.start(); + + // Register bidirectional mapping + this.sessionToChannel.set(sessionId, channel.id); + this.channelToSession.set(channel.id, sessionId); + this.batchers.set(sessionId, batcher); + this.channels.set(sessionId, channel); + + // Post welcome embed + const welcome = formatSessionStarted(projectName); + batcher.enqueue(welcome); + + this.logger.info('bridge: session channel created', { + sessionId, + channelId: channel.id, + projectName, + }); + } catch (err) { + // Failure mode: log error, skip streaming for this session + this.logger.error('bridge: channel creation failed', { + sessionId, + projectDir, + error: err instanceof Error ? err.message : String(err), + }); + } + } + + private async onSessionEvent(data: SessionEventPayload): Promise { + const { sessionId, event } = data; + const channelId = this.sessionToChannel.get(sessionId); + if (!channelId) return; // no channel for this session + + // Verbosity filter + const eventType = (event as Record).type as string; + if (!this.verbosity.shouldShow(channelId, eventType)) return; + + const formatted = formatEvent(event, this.ownerId); + const batcher = this.batchers.get(sessionId); + if (batcher) { + batcher.enqueue(formatted); + } + } + + private async onSessionBlocked(data: SessionBlockedPayload): Promise { + const { sessionId, projectName, blocker } = data; + const channel = this.channels.get(sessionId); + if (!channel) return; + + const formatted = formatBlocker(blocker, this.ownerId); + + // Send immediately (bypasses batching for blockers) + const batcher = this.batchers.get(sessionId); + if (batcher) { + await batcher.enqueueImmediate(formatted); + } + + // For select/confirm methods, set up button collector + if (blocker.method === 'select' || blocker.method === 'confirm') { + this.createButtonCollector(sessionId, channel, blocker); + } + + // DM backup + if (this.config.discord?.dm_on_blocker) { + await this.sendBlockerDM(sessionId, projectName, blocker); + } + } + + private async onSessionCompleted(data: SessionCompletedPayload): Promise { + const { sessionId, projectName } = data; + const batcher = this.batchers.get(sessionId); + if (!batcher) return; + + const completion = formatCompletion({ + type: 'execution_complete', + status: 'completed', + } as SdkAgentEvent); + + // Flush through batcher then cleanup + batcher.enqueue(completion); + await this.cleanupSession(sessionId); + + this.logger.info('bridge: session completed', { sessionId, projectName }); + } + + private async onSessionError(data: SessionErrorPayload): Promise { + const { sessionId, projectName, error } = data; + const batcher = this.batchers.get(sessionId); + if (!batcher) return; + + const errorEmbed = formatError(sessionId, error); + batcher.enqueue(errorEmbed); + await this.cleanupSession(sessionId); + + this.logger.info('bridge: session error', { sessionId, projectName, error }); + } + + // ----------------------------------------------------------------------- + // Blocker resolution — button collector + // ----------------------------------------------------------------------- + + private createButtonCollector( + sessionId: string, + channel: TextChannel, + blocker: PendingBlocker, + ): void { + // Create a message collector on the channel for button interactions + // We use createMessageComponentCollector on the channel + try { + const collector = channel.createMessageComponentCollector({ + componentType: ComponentType.Button, + time: BLOCKER_COLLECTOR_TIMEOUT_MS, + filter: (interaction: MessageComponentInteraction) => { + return interaction.customId.startsWith(`blocker:${blocker.id}:`); + }, + }); + + collector.on('collect', async (interaction: MessageComponentInteraction) => { + // Auth guard + if (!isAuthorized(interaction.user.id, this.ownerId)) { + await interaction.reply({ + content: '⛔ Only the project owner can respond to blockers.', + ephemeral: true, + }).catch(() => {}); + return; + } + + // Parse customId: blocker:{id}:{method}:{value} + const parts = interaction.customId.split(':'); + const value = parts[3] ?? ''; + + try { + await this.sessionManager.resolveBlocker(sessionId, value); + await interaction.update({ + content: `✅ Blocker resolved with: ${value}`, + components: [], + }).catch(() => {}); + collector.stop('resolved'); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + this.logger.error('bridge: blocker resolve failed', { sessionId, error: errMsg }); + await interaction.reply({ + content: `❌ Failed to resolve blocker: ${errMsg}`, + ephemeral: true, + }).catch(() => {}); + } + }); + + collector.on('end', (_collected, reason) => { + if (reason === 'time') { + // Timeout: edit to show expired + this.logger.info('bridge: blocker collector timed out', { sessionId, blockerId: blocker.id }); + // Post a new message indicating expiry — editing original may fail + const batcher = this.batchers.get(sessionId); + if (batcher) { + batcher.enqueue({ + content: `⏰ Blocker response timed out after 24h. Re-posting...`, + embed: new EmbedBuilder() + .setColor(0xf1c40f) + .setTitle('⏰ Blocker Expired') + .setDescription(blocker.message) + .setTimestamp(), + }); + } + } + }); + } catch (err) { + this.logger.error('bridge: collector creation failed', { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } + } + + // ----------------------------------------------------------------------- + // DM backup + // ----------------------------------------------------------------------- + + private async sendBlockerDM( + sessionId: string, + projectName: string, + blocker: PendingBlocker, + ): Promise { + try { + const user = await this.client.users.fetch(this.ownerId); + await user.send({ + content: `⚠️ **Blocker** in **${projectName}** — ${blocker.message}\n\nRespond in the project channel.`, + }); + this.logger.debug('bridge: DM sent for blocker', { sessionId, blockerId: blocker.id }); + } catch (err) { + // DM failure is non-fatal — channel message is the primary path + this.logger.warn('bridge: DM send failed', { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } + } + + // ----------------------------------------------------------------------- + // Conversation relay — Discord → GSD + // ----------------------------------------------------------------------- + + private async handleMessageCreate(message: Message): Promise { + // Filter: bot messages + if (message.author.bot) return; + + // Filter: must be in a project channel + const sessionId = this.channelToSession.get(message.channelId); + if (!sessionId) return; + + // Filter: must be authorized + if (!isAuthorized(message.author.id, this.ownerId)) return; + + const session = this.sessionManager.getSession(sessionId); + if (!session) return; + + // If session has a pending blocker with input/editor method, resolve it + if (session.pendingBlocker && (session.pendingBlocker.method === 'input' || session.pendingBlocker.method === 'editor')) { + try { + await this.sessionManager.resolveBlocker(sessionId, message.content); + await message.react('✅').catch(() => {}); + this.logger.info('bridge: blocker resolved via relay', { + sessionId, + method: session.pendingBlocker.method, + }); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + this.logger.error('bridge: relay blocker resolve failed', { sessionId, error: errMsg }); + await message.reply(`❌ Failed to resolve blocker: ${errMsg}`).catch(() => {}); + } + return; + } + + // Otherwise, steer the session with the message content + if (session.status === 'running') { + try { + await session.client.steer(message.content); + await message.react('📨').catch(() => {}); + this.logger.info('bridge: message relayed to session', { sessionId }); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + this.logger.error('bridge: steer failed', { sessionId, error: errMsg }); + await message.reply(`❌ Failed to relay message: ${errMsg}`).catch(() => {}); + } + } + } + + // ----------------------------------------------------------------------- + // Cleanup + // ----------------------------------------------------------------------- + + private async cleanupSession(sessionId: string): Promise { + const batcher = this.batchers.get(sessionId); + if (batcher) { + await batcher.destroy(); + this.batchers.delete(sessionId); + } + + const channelId = this.sessionToChannel.get(sessionId); + if (channelId) { + this.channelToSession.delete(channelId); + } + this.sessionToChannel.delete(sessionId); + this.channels.delete(sessionId); + } +} + +// --------------------------------------------------------------------------- +// Internal event payload types (matching SessionManager emissions) +// --------------------------------------------------------------------------- + +interface SessionStartedPayload { + sessionId: string; + projectDir: string; + projectName: string; +} + +interface SessionEventPayload { + sessionId: string; + projectDir: string; + event: SdkAgentEvent; +} + +interface SessionBlockedPayload { + sessionId: string; + projectDir: string; + projectName: string; + blocker: PendingBlocker; +} + +interface SessionCompletedPayload { + sessionId: string; + projectDir: string; + projectName: string; +} + +interface SessionErrorPayload { + sessionId: string; + projectDir: string; + projectName: string; + error: string; +}