test: Built EventBridge orchestrator wiring session events to Discord w…

- "packages/daemon/src/event-bridge.ts"
- "packages/daemon/src/event-bridge.test.ts"

GSD-Task: S04/T03
This commit is contained in:
Lex Christopherson 2026-03-27 15:11:52 -06:00
parent 05abf86912
commit f26ec3a55d
2 changed files with 1107 additions and 0 deletions

View file

@ -0,0 +1,619 @@
/**
* event-bridge.test.ts Tests for EventBridge orchestrator.
*
* Uses mock SessionManager (EventEmitter), mock ChannelManager,
* mock Discord Client, and mock Logger to test event wiring,
* blocker handling, conversation relay, and cleanup.
*/
import { describe, it, mock } from 'node:test';
import assert from 'node:assert/strict';
import { EventEmitter } from 'node:events';
import { EventBridge } from './event-bridge.js';
import type { EventBridgeOptions, BridgeClient } from './event-bridge.js';
import type { PendingBlocker, ManagedSession, DaemonConfig, SessionStatus } from './types.js';
import type { SdkAgentEvent, RpcClient, RpcExtensionUIRequest } from '@gsd-build/rpc-client';
// ---------------------------------------------------------------------------
// Mock factories
// ---------------------------------------------------------------------------
function createMockLogger() {
return {
debug: mock.fn(() => {}),
info: mock.fn(() => {}),
warn: mock.fn(() => {}),
error: mock.fn(() => {}),
};
}
function createMockChannelManager() {
const sentMessages: unknown[] = [];
const mockChannel = {
id: 'ch-123',
send: mock.fn(async (_payload: unknown) => {
sentMessages.push(_payload);
return { id: 'msg-1' };
}),
createMessageComponentCollector: mock.fn((_opts?: unknown) => {
const collector = new EventEmitter() as EventEmitter & { stop: (reason?: string) => void };
collector.stop = (reason?: string) => collector.emit('end', [], reason ?? 'manual');
return collector;
}),
};
return {
createProjectChannel: mock.fn(async (_dir: string) => mockChannel),
_channel: mockChannel,
_sentMessages: sentMessages,
};
}
function createMockClient(): BridgeClient & EventEmitter {
const emitter = new EventEmitter();
const dmSendFn = mock.fn(async () => ({}));
const fetchFn = mock.fn(async (_id: string) => ({ send: dmSendFn }));
(emitter as unknown as Record<string, unknown>).users = { fetch: fetchFn };
return Object.assign(emitter, {
users: { fetch: fetchFn },
_dmSend: dmSendFn,
}) as unknown as BridgeClient & EventEmitter;
}
function createMockSessionManager() {
const sm = new EventEmitter() as EventEmitter & {
getSession: ReturnType<typeof mock.fn>;
resolveBlocker: ReturnType<typeof mock.fn>;
};
sm.getSession = mock.fn((_id: string) => undefined as ManagedSession | undefined);
sm.resolveBlocker = mock.fn(async (_sid: string, _resp: string) => {});
return sm;
}
function createMockSession(overrides?: Partial<ManagedSession>): ManagedSession {
return {
sessionId: 'sess-1',
projectDir: '/test/project',
projectName: 'project',
status: 'running' as SessionStatus,
client: {
steer: mock.fn(async (_msg: string) => {}),
prompt: mock.fn(async () => ({})),
} as unknown as RpcClient,
events: [],
pendingBlocker: null,
cost: { totalCost: 0, tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 } },
startTime: Date.now(),
...overrides,
};
}
const DEFAULT_CONFIG: DaemonConfig = {
discord: {
token: 'test-token',
guild_id: 'guild-1',
owner_id: 'owner-1',
dm_on_blocker: false,
},
projects: { scan_roots: [] },
log: { file: '/tmp/test.log', level: 'debug', max_size_mb: 10 },
};
function buildBridge(overrides?: Partial<EventBridgeOptions>) {
const sessionManager = createMockSessionManager();
const channelManager = createMockChannelManager();
const client = createMockClient();
const logger = createMockLogger();
const opts: EventBridgeOptions = {
sessionManager: sessionManager as unknown as EventBridgeOptions['sessionManager'],
channelManager: channelManager as unknown as EventBridgeOptions['channelManager'],
client,
config: DEFAULT_CONFIG,
logger: logger as unknown as EventBridgeOptions['logger'],
ownerId: 'owner-1',
...overrides,
};
const bridge = new EventBridge(opts);
return { bridge, sessionManager, channelManager, client, logger };
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
const tick = () => new Promise<void>((r) => setTimeout(r, 30));
function mockFn(obj: unknown): { mock: { callCount(): number; calls: Array<{ arguments: unknown[]; result?: unknown }> } } {
return obj as { mock: { callCount(): number; calls: Array<{ arguments: unknown[]; result?: unknown }> } };
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('EventBridge', () => {
describe('lifecycle', () => {
it('start() subscribes to session manager events and messageCreate', () => {
const { bridge, sessionManager, client } = buildBridge();
bridge.start();
assert.ok(sessionManager.listenerCount('session:started') > 0);
assert.ok(sessionManager.listenerCount('session:event') > 0);
assert.ok(sessionManager.listenerCount('session:blocked') > 0);
assert.ok(sessionManager.listenerCount('session:completed') > 0);
assert.ok(sessionManager.listenerCount('session:error') > 0);
assert.ok(client.listenerCount('messageCreate') > 0);
});
it('stop() unsubscribes from all events and clears mappings', async () => {
const { bridge, sessionManager, client } = buildBridge();
bridge.start();
await bridge.stop();
assert.equal(sessionManager.listenerCount('session:started'), 0);
assert.equal(sessionManager.listenerCount('session:event'), 0);
assert.equal(sessionManager.listenerCount('session:blocked'), 0);
assert.equal(sessionManager.listenerCount('session:completed'), 0);
assert.equal(sessionManager.listenerCount('session:error'), 0);
assert.equal(client.listenerCount('messageCreate'), 0);
});
it('start() is idempotent', () => {
const { bridge, sessionManager } = buildBridge();
bridge.start();
bridge.start();
assert.equal(sessionManager.listenerCount('session:started'), 1);
});
it('getVerbosityManager() returns a VerbosityManager', () => {
const { bridge } = buildBridge();
const vm = bridge.getVerbosityManager();
assert.ok(vm);
assert.equal(typeof vm.shouldShow, 'function');
});
});
describe('session:started → channel creation + welcome embed', () => {
it('creates channel and batcher', async () => {
const { bridge, sessionManager, channelManager } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
assert.equal(mockFn(channelManager.createProjectChannel).mock.callCount(), 1);
});
it('logs error and skips when channel creation fails', async () => {
const failingCm = {
createProjectChannel: mock.fn(async () => { throw new Error('API error'); }),
};
const { bridge, sessionManager, logger } = buildBridge({
channelManager: failingCm as unknown as EventBridgeOptions['channelManager'],
});
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
assert.ok(mockFn(logger.error).mock.callCount() > 0);
});
});
describe('session:event → format + verbosity filter + enqueue', () => {
it('formats event and enqueues to batcher (no errors)', async () => {
const { bridge, sessionManager, logger } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
sessionManager.emit('session:event', {
sessionId: 'sess-1', projectDir: '/test/project',
event: { type: 'tool_execution_start', name: 'read' } as SdkAgentEvent,
});
await tick();
// No errors
assert.equal(mockFn(logger.error).mock.callCount(), 0);
});
it('filters events based on verbosity', async () => {
const { bridge, sessionManager, channelManager, logger } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
// Set quiet mode
bridge.getVerbosityManager().setLevel('ch-123', 'quiet');
// cost_update filtered in quiet
sessionManager.emit('session:event', {
sessionId: 'sess-1', projectDir: '/test/project',
event: { type: 'cost_update', cumulativeCost: 1.5 } as SdkAgentEvent,
});
await tick();
// tool_execution_start filtered in quiet
sessionManager.emit('session:event', {
sessionId: 'sess-1', projectDir: '/test/project',
event: { type: 'tool_execution_start', name: 'read' } as SdkAgentEvent,
});
await tick();
assert.equal(mockFn(logger.error).mock.callCount(), 0);
});
});
describe('session:blocked → blocker embed + buttons + optional DM', () => {
it('sends blocker embed and creates collector for confirm', async () => {
const { bridge, sessionManager, channelManager } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const blocker: PendingBlocker = {
id: 'blocker-1', method: 'confirm', message: 'Continue?',
event: { id: 'blocker-1', method: 'confirm', message: 'Continue?' } as RpcExtensionUIRequest,
};
sessionManager.emit('session:blocked', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker,
});
await tick();
assert.ok(mockFn(channelManager._channel.createMessageComponentCollector).mock.callCount() > 0);
});
it('sends DM when dm_on_blocker is configured', async () => {
const config: DaemonConfig = {
...DEFAULT_CONFIG,
discord: { ...DEFAULT_CONFIG.discord!, dm_on_blocker: true },
};
const client = createMockClient();
const { bridge, sessionManager } = buildBridge({ config, client });
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const blocker: PendingBlocker = {
id: 'blocker-1', method: 'input', message: 'Enter API key',
event: { id: 'blocker-1', method: 'input' } as RpcExtensionUIRequest,
};
sessionManager.emit('session:blocked', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker,
});
await tick();
const usersFetch = (client as unknown as Record<string, { fetch: unknown }>).users.fetch;
assert.equal(mockFn(usersFetch).mock.callCount(), 1);
});
it('does not send DM when dm_on_blocker is false', async () => {
const client = createMockClient();
const { bridge, sessionManager } = buildBridge({ client });
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const blocker: PendingBlocker = {
id: 'blocker-1', method: 'input', message: 'Enter value',
event: { id: 'blocker-1', method: 'input' } as RpcExtensionUIRequest,
};
sessionManager.emit('session:blocked', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker,
});
await tick();
const usersFetch = (client as unknown as Record<string, { fetch: unknown }>).users.fetch;
assert.equal(mockFn(usersFetch).mock.callCount(), 0);
});
});
describe('button collector → resolveBlocker', () => {
it('resolves blocker on button click from authorized user', async () => {
const { bridge, sessionManager, channelManager } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const blocker: PendingBlocker = {
id: 'blocker-1', method: 'confirm', message: 'Confirm?',
event: { id: 'blocker-1', method: 'confirm' } as RpcExtensionUIRequest,
};
sessionManager.emit('session:blocked', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker,
});
await tick();
const collectorCalls = mockFn(channelManager._channel.createMessageComponentCollector).mock.calls;
assert.ok(collectorCalls.length > 0);
const collector = collectorCalls[0]!.result as EventEmitter;
const mockInteraction = {
customId: 'blocker:blocker-1:confirm:true',
user: { id: 'owner-1' },
update: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
};
collector.emit('collect', mockInteraction);
await tick();
assert.equal(mockFn(sessionManager.resolveBlocker).mock.callCount(), 1);
const args = mockFn(sessionManager.resolveBlocker).mock.calls[0]!.arguments;
assert.equal(args[0], 'sess-1');
assert.equal(args[1], 'true');
});
it('rejects button click from unauthorized user', async () => {
const { bridge, sessionManager, channelManager } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const blocker: PendingBlocker = {
id: 'blocker-1', method: 'confirm', message: 'Confirm?',
event: { id: 'blocker-1', method: 'confirm' } as RpcExtensionUIRequest,
};
sessionManager.emit('session:blocked', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker,
});
await tick();
const collectorCalls = mockFn(channelManager._channel.createMessageComponentCollector).mock.calls;
const collector = collectorCalls[0]!.result as EventEmitter;
const mockInteraction = {
customId: 'blocker:blocker-1:confirm:true',
user: { id: 'stranger-99' },
update: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
};
collector.emit('collect', mockInteraction);
await tick();
assert.equal(mockFn(sessionManager.resolveBlocker).mock.callCount(), 0);
assert.equal(mockFn(mockInteraction.reply).mock.callCount(), 1);
});
it('posts error when resolveBlocker throws', async () => {
const { bridge, sessionManager, channelManager } = buildBridge();
sessionManager.resolveBlocker = mock.fn(async () => { throw new Error('No pending blocker'); });
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const blocker: PendingBlocker = {
id: 'blocker-1', method: 'confirm', message: 'Confirm?',
event: { id: 'blocker-1', method: 'confirm' } as RpcExtensionUIRequest,
};
sessionManager.emit('session:blocked', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', blocker,
});
await tick();
const collectorCalls = mockFn(channelManager._channel.createMessageComponentCollector).mock.calls;
const collector = collectorCalls[0]!.result as EventEmitter;
const mockInteraction = {
customId: 'blocker:blocker-1:confirm:true',
user: { id: 'owner-1' },
update: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
};
collector.emit('collect', mockInteraction);
await tick();
assert.equal(mockFn(mockInteraction.reply).mock.callCount(), 1);
const replyArg = mockFn(mockInteraction.reply).mock.calls[0]!.arguments[0] as Record<string, unknown>;
assert.ok(String(replyArg.content).includes('Failed to resolve'));
});
});
describe('messageCreate relay', () => {
it('relays message to session steer when no pending blocker', async () => {
const session = createMockSession();
const { bridge, sessionManager, client } = buildBridge();
sessionManager.getSession = mock.fn(() => session);
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const msg = {
author: { id: 'owner-1', bot: false },
channelId: 'ch-123',
content: 'check the test results',
react: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
};
client.emit('messageCreate', msg);
await tick();
assert.equal(mockFn(session.client.steer).mock.callCount(), 1);
assert.equal(mockFn(session.client.steer).mock.calls[0]!.arguments[0], 'check the test results');
});
it('resolves blocker via relay for input method', async () => {
const blocker: PendingBlocker = {
id: 'blocker-2', method: 'input', message: 'Enter value',
event: { id: 'blocker-2', method: 'input' } as RpcExtensionUIRequest,
};
const session = createMockSession({ pendingBlocker: blocker, status: 'blocked' });
const { bridge, sessionManager, client } = buildBridge();
sessionManager.getSession = mock.fn(() => session);
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const msg = {
author: { id: 'owner-1', bot: false },
channelId: 'ch-123',
content: 'my-api-key-value',
react: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
};
client.emit('messageCreate', msg);
await tick();
assert.equal(mockFn(sessionManager.resolveBlocker).mock.callCount(), 1);
assert.equal(mockFn(sessionManager.resolveBlocker).mock.calls[0]!.arguments[1], 'my-api-key-value');
});
it('ignores bot messages', async () => {
const session = createMockSession();
const { bridge, sessionManager, client } = buildBridge();
sessionManager.getSession = mock.fn(() => session);
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
client.emit('messageCreate', {
author: { id: 'bot-1', bot: true },
channelId: 'ch-123',
content: 'automated',
react: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
});
await tick();
assert.equal(mockFn(session.client.steer).mock.callCount(), 0);
});
it('ignores messages in non-project channels', async () => {
const session = createMockSession();
const { bridge, sessionManager, client } = buildBridge();
sessionManager.getSession = mock.fn(() => session);
bridge.start();
client.emit('messageCreate', {
author: { id: 'owner-1', bot: false },
channelId: 'random-ch-999',
content: 'hello',
react: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
});
await tick();
assert.equal(mockFn(session.client.steer).mock.callCount(), 0);
});
it('ignores messages from unauthorized users', async () => {
const session = createMockSession();
const { bridge, sessionManager, client } = buildBridge();
sessionManager.getSession = mock.fn(() => session);
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
client.emit('messageCreate', {
author: { id: 'stranger-99', bot: false },
channelId: 'ch-123',
content: 'hack the planet',
react: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
});
await tick();
assert.equal(mockFn(session.client.steer).mock.callCount(), 0);
});
it('posts error when steer fails', async () => {
const session = createMockSession();
(session.client as unknown as Record<string, unknown>).steer = mock.fn(async () => {
throw new Error('session dead');
});
const { bridge, sessionManager, client } = buildBridge();
sessionManager.getSession = mock.fn(() => session);
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
const msg = {
author: { id: 'owner-1', bot: false },
channelId: 'ch-123',
content: 'try this',
react: mock.fn(async () => {}),
reply: mock.fn(async () => {}),
};
client.emit('messageCreate', msg);
await tick();
assert.equal(mockFn(msg.reply).mock.callCount(), 1);
});
});
describe('session:completed → cleanup', () => {
it('posts completion embed and cleans up', async () => {
const { bridge, sessionManager, logger } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
sessionManager.emit('session:completed', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
// After cleanup, events for this session are silently ignored
sessionManager.emit('session:event', {
sessionId: 'sess-1', projectDir: '/test/project',
event: { type: 'tool_execution_start', name: 'read' } as SdkAgentEvent,
});
await tick();
assert.equal(mockFn(logger.error).mock.callCount(), 0);
});
});
describe('session:error → cleanup', () => {
it('posts error embed and cleans up', async () => {
const { bridge, sessionManager, logger } = buildBridge();
bridge.start();
sessionManager.emit('session:started', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project',
});
await tick();
sessionManager.emit('session:error', {
sessionId: 'sess-1', projectDir: '/test/project', projectName: 'my-project', error: 'Process crashed',
});
await tick();
const infoCalls = mockFn(logger.info).mock.calls;
assert.ok(
infoCalls.some((c) => String(c.arguments[0]).includes('session error')),
);
});
});
});

View file

@ -0,0 +1,488 @@
/**
* event-bridge.ts Orchestrator wiring SessionManager events through
* formatter batcher Discord channels.
*
* Handles:
* - Session lifecycle Discord channel creation and cleanup
* - Event streaming format + verbosity filter + batcher
* - Blocker resolution interactive buttons + text relay
* - Conversation relay Discord messages forwarded to GSD sessions
* - DM backup owner gets DM on blocker when dm_on_blocker configured
*/
import type { Client, Message, TextChannel, MessageComponentInteraction } from 'discord.js';
import { EmbedBuilder, ComponentType } from 'discord.js';
import type { SdkAgentEvent } from '@gsd-build/rpc-client';
import type { Logger } from './logger.js';
import type { DaemonConfig, PendingBlocker } from './types.js';
import type { SessionManager } from './session-manager.js';
import type { ChannelManager } from './channel-manager.js';
import { MessageBatcher } from './message-batcher.js';
import { VerbosityManager } from './verbosity.js';
import {
formatEvent,
formatBlocker,
formatSessionStarted,
formatError,
formatCompletion,
} from './event-formatter.js';
import { isAuthorized } from './discord-bot.js';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** Minimal interface for a Discord client — extracted for testability. */
export interface BridgeClient {
on(event: 'messageCreate', listener: (message: Message) => void): void;
off(event: 'messageCreate', listener: (message: Message) => void): void;
users: { fetch(id: string): Promise<{ send(opts: unknown): Promise<unknown> }> };
}
/** Options for creating an EventBridge. */
export interface EventBridgeOptions {
sessionManager: SessionManager;
channelManager: ChannelManager;
client: BridgeClient;
config: DaemonConfig;
logger: Logger;
ownerId: string;
}
// ---------------------------------------------------------------------------
// Collector timeout
// ---------------------------------------------------------------------------
const BLOCKER_COLLECTOR_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24 hours
// ---------------------------------------------------------------------------
// EventBridge
// ---------------------------------------------------------------------------
export class EventBridge {
private readonly sessionManager: SessionManager;
private readonly channelManager: ChannelManager;
private readonly client: BridgeClient;
private readonly config: DaemonConfig;
private readonly logger: Logger;
private readonly ownerId: string;
/** sessionId → channelId */
private readonly sessionToChannel = new Map<string, string>();
/** channelId → sessionId */
private readonly channelToSession = new Map<string, string>();
/** sessionId → MessageBatcher */
private readonly batchers = new Map<string, MessageBatcher>();
/** sessionId → TextChannel (cached for send operations) */
private readonly channels = new Map<string, TextChannel>();
private readonly verbosity = new VerbosityManager();
/** Bound event handlers for cleanup */
private boundHandlers: {
started: (...args: unknown[]) => void;
event: (...args: unknown[]) => void;
blocked: (...args: unknown[]) => void;
completed: (...args: unknown[]) => void;
error: (...args: unknown[]) => void;
messageCreate: (msg: Message) => void;
} | null = null;
constructor(opts: EventBridgeOptions) {
this.sessionManager = opts.sessionManager;
this.channelManager = opts.channelManager;
this.client = opts.client;
this.config = opts.config;
this.logger = opts.logger;
this.ownerId = opts.ownerId;
}
// -----------------------------------------------------------------------
// Lifecycle
// -----------------------------------------------------------------------
/** Subscribe to SessionManager events and Discord messageCreate. */
start(): void {
if (this.boundHandlers) return; // already started
this.boundHandlers = {
started: (data: unknown) => {
void this.onSessionStarted(data as SessionStartedPayload);
},
event: (data: unknown) => {
void this.onSessionEvent(data as SessionEventPayload);
},
blocked: (data: unknown) => {
void this.onSessionBlocked(data as SessionBlockedPayload);
},
completed: (data: unknown) => {
void this.onSessionCompleted(data as SessionCompletedPayload);
},
error: (data: unknown) => {
void this.onSessionError(data as SessionErrorPayload);
},
messageCreate: (msg: Message) => {
void this.handleMessageCreate(msg);
},
};
this.sessionManager.on('session:started', this.boundHandlers.started);
this.sessionManager.on('session:event', this.boundHandlers.event);
this.sessionManager.on('session:blocked', this.boundHandlers.blocked);
this.sessionManager.on('session:completed', this.boundHandlers.completed);
this.sessionManager.on('session:error', this.boundHandlers.error);
this.client.on('messageCreate', this.boundHandlers.messageCreate);
this.logger.info('event bridge started');
}
/** Unsubscribe from all events, destroy batchers, clear mappings. */
async stop(): Promise<void> {
if (this.boundHandlers) {
this.sessionManager.off('session:started', this.boundHandlers.started);
this.sessionManager.off('session:event', this.boundHandlers.event);
this.sessionManager.off('session:blocked', this.boundHandlers.blocked);
this.sessionManager.off('session:completed', this.boundHandlers.completed);
this.sessionManager.off('session:error', this.boundHandlers.error);
this.client.off('messageCreate', this.boundHandlers.messageCreate);
this.boundHandlers = null;
}
// Destroy all batchers
const destroyPromises: Promise<void>[] = [];
for (const batcher of this.batchers.values()) {
destroyPromises.push(batcher.destroy());
}
await Promise.allSettled(destroyPromises);
this.batchers.clear();
this.sessionToChannel.clear();
this.channelToSession.clear();
this.channels.clear();
this.logger.info('event bridge stopped');
}
/** Expose the verbosity manager for slash-command integration. */
getVerbosityManager(): VerbosityManager {
return this.verbosity;
}
// -----------------------------------------------------------------------
// SessionManager event handlers
// -----------------------------------------------------------------------
private async onSessionStarted(data: SessionStartedPayload): Promise<void> {
const { sessionId, projectDir, projectName } = data;
try {
const channel = await this.channelManager.createProjectChannel(projectDir);
// Create batcher with channel.send as the send function
const batcher = new MessageBatcher(
async (payload) => {
await channel.send(payload as Parameters<TextChannel['send']>[0]);
},
this.logger,
);
batcher.start();
// Register bidirectional mapping
this.sessionToChannel.set(sessionId, channel.id);
this.channelToSession.set(channel.id, sessionId);
this.batchers.set(sessionId, batcher);
this.channels.set(sessionId, channel);
// Post welcome embed
const welcome = formatSessionStarted(projectName);
batcher.enqueue(welcome);
this.logger.info('bridge: session channel created', {
sessionId,
channelId: channel.id,
projectName,
});
} catch (err) {
// Failure mode: log error, skip streaming for this session
this.logger.error('bridge: channel creation failed', {
sessionId,
projectDir,
error: err instanceof Error ? err.message : String(err),
});
}
}
private async onSessionEvent(data: SessionEventPayload): Promise<void> {
const { sessionId, event } = data;
const channelId = this.sessionToChannel.get(sessionId);
if (!channelId) return; // no channel for this session
// Verbosity filter
const eventType = (event as Record<string, unknown>).type as string;
if (!this.verbosity.shouldShow(channelId, eventType)) return;
const formatted = formatEvent(event, this.ownerId);
const batcher = this.batchers.get(sessionId);
if (batcher) {
batcher.enqueue(formatted);
}
}
private async onSessionBlocked(data: SessionBlockedPayload): Promise<void> {
const { sessionId, projectName, blocker } = data;
const channel = this.channels.get(sessionId);
if (!channel) return;
const formatted = formatBlocker(blocker, this.ownerId);
// Send immediately (bypasses batching for blockers)
const batcher = this.batchers.get(sessionId);
if (batcher) {
await batcher.enqueueImmediate(formatted);
}
// For select/confirm methods, set up button collector
if (blocker.method === 'select' || blocker.method === 'confirm') {
this.createButtonCollector(sessionId, channel, blocker);
}
// DM backup
if (this.config.discord?.dm_on_blocker) {
await this.sendBlockerDM(sessionId, projectName, blocker);
}
}
private async onSessionCompleted(data: SessionCompletedPayload): Promise<void> {
const { sessionId, projectName } = data;
const batcher = this.batchers.get(sessionId);
if (!batcher) return;
const completion = formatCompletion({
type: 'execution_complete',
status: 'completed',
} as SdkAgentEvent);
// Flush through batcher then cleanup
batcher.enqueue(completion);
await this.cleanupSession(sessionId);
this.logger.info('bridge: session completed', { sessionId, projectName });
}
private async onSessionError(data: SessionErrorPayload): Promise<void> {
const { sessionId, projectName, error } = data;
const batcher = this.batchers.get(sessionId);
if (!batcher) return;
const errorEmbed = formatError(sessionId, error);
batcher.enqueue(errorEmbed);
await this.cleanupSession(sessionId);
this.logger.info('bridge: session error', { sessionId, projectName, error });
}
// -----------------------------------------------------------------------
// Blocker resolution — button collector
// -----------------------------------------------------------------------
private createButtonCollector(
sessionId: string,
channel: TextChannel,
blocker: PendingBlocker,
): void {
// Create a message collector on the channel for button interactions
// We use createMessageComponentCollector on the channel
try {
const collector = channel.createMessageComponentCollector({
componentType: ComponentType.Button,
time: BLOCKER_COLLECTOR_TIMEOUT_MS,
filter: (interaction: MessageComponentInteraction) => {
return interaction.customId.startsWith(`blocker:${blocker.id}:`);
},
});
collector.on('collect', async (interaction: MessageComponentInteraction) => {
// Auth guard
if (!isAuthorized(interaction.user.id, this.ownerId)) {
await interaction.reply({
content: '⛔ Only the project owner can respond to blockers.',
ephemeral: true,
}).catch(() => {});
return;
}
// Parse customId: blocker:{id}:{method}:{value}
const parts = interaction.customId.split(':');
const value = parts[3] ?? '';
try {
await this.sessionManager.resolveBlocker(sessionId, value);
await interaction.update({
content: `✅ Blocker resolved with: ${value}`,
components: [],
}).catch(() => {});
collector.stop('resolved');
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
this.logger.error('bridge: blocker resolve failed', { sessionId, error: errMsg });
await interaction.reply({
content: `❌ Failed to resolve blocker: ${errMsg}`,
ephemeral: true,
}).catch(() => {});
}
});
collector.on('end', (_collected, reason) => {
if (reason === 'time') {
// Timeout: edit to show expired
this.logger.info('bridge: blocker collector timed out', { sessionId, blockerId: blocker.id });
// Post a new message indicating expiry — editing original may fail
const batcher = this.batchers.get(sessionId);
if (batcher) {
batcher.enqueue({
content: `⏰ Blocker response timed out after 24h. Re-posting...`,
embed: new EmbedBuilder()
.setColor(0xf1c40f)
.setTitle('⏰ Blocker Expired')
.setDescription(blocker.message)
.setTimestamp(),
});
}
}
});
} catch (err) {
this.logger.error('bridge: collector creation failed', {
sessionId,
error: err instanceof Error ? err.message : String(err),
});
}
}
// -----------------------------------------------------------------------
// DM backup
// -----------------------------------------------------------------------
private async sendBlockerDM(
sessionId: string,
projectName: string,
blocker: PendingBlocker,
): Promise<void> {
try {
const user = await this.client.users.fetch(this.ownerId);
await user.send({
content: `⚠️ **Blocker** in **${projectName}** — ${blocker.message}\n\nRespond in the project channel.`,
});
this.logger.debug('bridge: DM sent for blocker', { sessionId, blockerId: blocker.id });
} catch (err) {
// DM failure is non-fatal — channel message is the primary path
this.logger.warn('bridge: DM send failed', {
sessionId,
error: err instanceof Error ? err.message : String(err),
});
}
}
// -----------------------------------------------------------------------
// Conversation relay — Discord → GSD
// -----------------------------------------------------------------------
private async handleMessageCreate(message: Message): Promise<void> {
// Filter: bot messages
if (message.author.bot) return;
// Filter: must be in a project channel
const sessionId = this.channelToSession.get(message.channelId);
if (!sessionId) return;
// Filter: must be authorized
if (!isAuthorized(message.author.id, this.ownerId)) return;
const session = this.sessionManager.getSession(sessionId);
if (!session) return;
// If session has a pending blocker with input/editor method, resolve it
if (session.pendingBlocker && (session.pendingBlocker.method === 'input' || session.pendingBlocker.method === 'editor')) {
try {
await this.sessionManager.resolveBlocker(sessionId, message.content);
await message.react('✅').catch(() => {});
this.logger.info('bridge: blocker resolved via relay', {
sessionId,
method: session.pendingBlocker.method,
});
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
this.logger.error('bridge: relay blocker resolve failed', { sessionId, error: errMsg });
await message.reply(`❌ Failed to resolve blocker: ${errMsg}`).catch(() => {});
}
return;
}
// Otherwise, steer the session with the message content
if (session.status === 'running') {
try {
await session.client.steer(message.content);
await message.react('📨').catch(() => {});
this.logger.info('bridge: message relayed to session', { sessionId });
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
this.logger.error('bridge: steer failed', { sessionId, error: errMsg });
await message.reply(`❌ Failed to relay message: ${errMsg}`).catch(() => {});
}
}
}
// -----------------------------------------------------------------------
// Cleanup
// -----------------------------------------------------------------------
private async cleanupSession(sessionId: string): Promise<void> {
const batcher = this.batchers.get(sessionId);
if (batcher) {
await batcher.destroy();
this.batchers.delete(sessionId);
}
const channelId = this.sessionToChannel.get(sessionId);
if (channelId) {
this.channelToSession.delete(channelId);
}
this.sessionToChannel.delete(sessionId);
this.channels.delete(sessionId);
}
}
// ---------------------------------------------------------------------------
// Internal event payload types (matching SessionManager emissions)
// ---------------------------------------------------------------------------
interface SessionStartedPayload {
sessionId: string;
projectDir: string;
projectName: string;
}
interface SessionEventPayload {
sessionId: string;
projectDir: string;
event: SdkAgentEvent;
}
interface SessionBlockedPayload {
sessionId: string;
projectDir: string;
projectName: string;
blocker: PendingBlocker;
}
interface SessionCompletedPayload {
sessionId: string;
projectDir: string;
projectName: string;
}
interface SessionErrorPayload {
sessionId: string;
projectDir: string;
projectName: string;
error: string;
}