From 05abf869123cc4e70450ca0b36f3ef9004616756 Mon Sep 17 00:00:00 2001 From: Lex Christopherson Date: Fri, 27 Mar 2026 15:04:55 -0600 Subject: [PATCH] =?UTF-8?q?test:=20Built=20rate-limit-aware=20MessageBatch?= =?UTF-8?q?er=20with=20timer/capacity=20flush,=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - "packages/daemon/src/message-batcher.ts" - "packages/daemon/src/message-batcher.test.ts" GSD-Task: S04/T02 --- packages/daemon/src/message-batcher.test.ts | 308 ++++++++++++++++++++ packages/daemon/src/message-batcher.ts | 210 +++++++++++++ 2 files changed, 518 insertions(+) create mode 100644 packages/daemon/src/message-batcher.test.ts create mode 100644 packages/daemon/src/message-batcher.ts diff --git a/packages/daemon/src/message-batcher.test.ts b/packages/daemon/src/message-batcher.test.ts new file mode 100644 index 000000000..70c682ea4 --- /dev/null +++ b/packages/daemon/src/message-batcher.test.ts @@ -0,0 +1,308 @@ +import { describe, it, beforeEach, afterEach, mock } from 'node:test'; +import assert from 'node:assert/strict'; +import { MessageBatcher } from './message-batcher.js'; +import type { SendPayload, BatcherLogger } from './message-batcher.js'; +import type { FormattedEvent } from './types.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Create a minimal FormattedEvent for testing. */ +function fakeEvent(content: string, hasEmbed = false): FormattedEvent { + const fe: FormattedEvent = { content }; + if (hasEmbed) { + // Minimal mock embed — just needs to be truthy and pass through + fe.embed = { data: { title: content } } as any; + } + return fe; +} + +/** Create a tracking send function. */ +function createSend() { + const calls: SendPayload[] = []; + const fn = mock.fn(async (payload: SendPayload) => { + calls.push(payload); + }); + return { fn, calls }; +} + +/** Create a logger that captures error/warn calls. */ +function createLogger() { + const errors: string[] = []; + const warns: string[] = []; + const debugs: string[] = []; + const logger: BatcherLogger = { + error(msg: string) { errors.push(msg); }, + warn(msg: string) { warns.push(msg); }, + debug(msg: string) { debugs.push(msg); }, + }; + return { logger, errors, warns, debugs }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('MessageBatcher', () => { + describe('enqueue + capacity flush', () => { + it('flushes when buffer reaches maxBatchSize', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 3, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('a')); + batcher.enqueue(fakeEvent('b')); + assert.equal(calls.length, 0, 'should not flush yet'); + + batcher.enqueue(fakeEvent('c')); // hits capacity + // flush is async — give it a tick + await new Promise((r) => setTimeout(r, 10)); + + assert.equal(calls.length, 1, 'should have flushed once'); + assert.equal(calls[0].content, 'a\nb\nc'); + assert.equal(batcher.pending, 0); + + await batcher.destroy(); + }); + + it('combines embeds into a single send call', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 2, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('a', true)); + batcher.enqueue(fakeEvent('b', true)); // triggers flush + await new Promise((r) => setTimeout(r, 10)); + + assert.equal(calls.length, 1); + assert.equal(calls[0].embeds.length, 2); + assert.equal(calls[0].content, 'a\nb'); + + await batcher.destroy(); + }); + }); + + describe('enqueueImmediate', () => { + it('flushes pending buffer then sends immediately', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 10, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('buffered-1')); + batcher.enqueue(fakeEvent('buffered-2')); + + await batcher.enqueueImmediate(fakeEvent('blocker!')); + + // First call: the pending buffer flush + // Second call: the immediate event + assert.equal(calls.length, 2, 'should have two send calls'); + assert.equal(calls[0].content, 'buffered-1\nbuffered-2'); + assert.equal(calls[1].content, 'blocker!'); + assert.equal(batcher.pending, 0); + + await batcher.destroy(); + }); + + it('sends immediately when buffer is empty', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 10, flushIntervalMs: 60_000 }); + + await batcher.enqueueImmediate(fakeEvent('urgent')); + + assert.equal(calls.length, 1); + assert.equal(calls[0].content, 'urgent'); + + await batcher.destroy(); + }); + }); + + describe('timer-based flush', () => { + it('flushes on interval', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 50 }); + batcher.start(); + + batcher.enqueue(fakeEvent('timed-1')); + batcher.enqueue(fakeEvent('timed-2')); + + // Wait longer than flushIntervalMs + await new Promise((r) => setTimeout(r, 120)); + + assert.ok(calls.length >= 1, 'timer should have triggered at least one flush'); + assert.equal(calls[0].content, 'timed-1\ntimed-2'); + assert.equal(batcher.pending, 0); + + await batcher.destroy(); + }); + + it('stop prevents further timer flushes', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 30 }); + batcher.start(); + batcher.stop(); + + batcher.enqueue(fakeEvent('orphan')); + await new Promise((r) => setTimeout(r, 80)); + + assert.equal(calls.length, 0, 'no flush after stop'); + // Cleanup without triggering flush timer + batcher.stop(); // idempotent + // Manually drain for cleanup + await batcher.destroy(); + }); + }); + + describe('destroy', () => { + it('flushes remaining buffer on destroy', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('leftover-1')); + batcher.enqueue(fakeEvent('leftover-2')); + + await batcher.destroy(); + + assert.equal(calls.length, 1); + assert.equal(calls[0].content, 'leftover-1\nleftover-2'); + }); + + it('is idempotent — second destroy is no-op', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('once')); + await batcher.destroy(); + await batcher.destroy(); // second call + + assert.equal(calls.length, 1, 'only flushed once'); + }); + + it('enqueue after destroy is silently ignored', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 2, flushIntervalMs: 60_000 }); + await batcher.destroy(); + + batcher.enqueue(fakeEvent('post-destroy')); + await new Promise((r) => setTimeout(r, 10)); + + assert.equal(calls.length, 0, 'no sends after destroy'); + }); + }); + + describe('empty buffer', () => { + it('flush of empty buffer is no-op', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 }); + batcher.start(); + + // Force a timer tick with an empty buffer + await new Promise((r) => setTimeout(r, 10)); + await batcher.destroy(); + + // Only the destroy-triggered flush, which should also be a no-op + assert.equal(calls.length, 0, 'no sends for empty buffer'); + }); + }); + + describe('single-item flush', () => { + it('handles a single item in buffer at destroy', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('solo')); + await batcher.destroy(); + + assert.equal(calls.length, 1); + assert.equal(calls[0].content, 'solo'); + assert.equal(calls[0].embeds.length, 0); + assert.equal(calls[0].components.length, 0); + }); + }); + + describe('error handling', () => { + it('logs error and continues when send throws', async () => { + let attempt = 0; + const sendFn = async () => { + attempt++; + throw new Error('Discord rate limit'); + }; + const { logger, errors, warns } = createLogger(); + const batcher = new MessageBatcher(sendFn, logger, { maxBatchSize: 2, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('x')); + batcher.enqueue(fakeEvent('y')); // triggers flush + // Wait for flush + retry + await new Promise((r) => setTimeout(r, 1500)); + + assert.ok(errors.length >= 1, 'should have logged an error'); + assert.ok(warns.length >= 1, 'should have logged a warning on retry failure'); + assert.equal(batcher.pending, 0, 'buffer cleared even on error'); + + // Batcher should still be alive — enqueue more + batcher.enqueue(fakeEvent('after-error')); + assert.equal(batcher.pending, 1, 'can still enqueue after error'); + + await batcher.destroy(); + }); + + it('succeeds on retry if first attempt fails', async () => { + let attempt = 0; + const calls: SendPayload[] = []; + const sendFn = async (payload: SendPayload) => { + attempt++; + if (attempt === 1) throw new Error('transient'); + calls.push(payload); + }; + const { logger, errors } = createLogger(); + const batcher = new MessageBatcher(sendFn, logger, { maxBatchSize: 2, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('retry-me')); + batcher.enqueue(fakeEvent('retry-too')); + // Wait for flush + retry delay + await new Promise((r) => setTimeout(r, 1500)); + + assert.equal(errors.length, 1, 'logged one error on first attempt'); + assert.equal(calls.length, 1, 'retry succeeded'); + assert.equal(calls[0].content, 'retry-me\nretry-too'); + + await batcher.destroy(); + }); + }); + + describe('buffer at exactly capacity', () => { + it('flushes at exactly maxBatchSize', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 4, flushIntervalMs: 60_000 }); + + batcher.enqueue(fakeEvent('1')); + batcher.enqueue(fakeEvent('2')); + batcher.enqueue(fakeEvent('3')); + assert.equal(calls.length, 0, 'not flushed at 3/4'); + + batcher.enqueue(fakeEvent('4')); // exactly at capacity + await new Promise((r) => setTimeout(r, 10)); + + assert.equal(calls.length, 1); + assert.equal(calls[0].content, '1\n2\n3\n4'); + + await batcher.destroy(); + }); + }); + + describe('components handling', () => { + it('uses components from the last event that has them', async () => { + const { fn, calls } = createSend(); + const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 3, flushIntervalMs: 60_000 }); + + const fakeRow = { type: 'ActionRow', components: [] }; + batcher.enqueue(fakeEvent('no-components')); + batcher.enqueue({ content: 'with-components', components: [fakeRow] } as any); + batcher.enqueue(fakeEvent('also-no-components')); // triggers flush + + await new Promise((r) => setTimeout(r, 10)); + + assert.equal(calls.length, 1); + assert.deepEqual(calls[0].components, [fakeRow]); + + await batcher.destroy(); + }); + }); +}); diff --git a/packages/daemon/src/message-batcher.ts b/packages/daemon/src/message-batcher.ts new file mode 100644 index 000000000..fb09cedae --- /dev/null +++ b/packages/daemon/src/message-batcher.ts @@ -0,0 +1,210 @@ +/** + * message-batcher.ts — Rate-limit-aware message batcher for Discord. + * + * Accumulates FormattedEvent payloads and flushes them to a Discord channel + * respecting the 5 msg/5s rate limit. Supports: + * - Timer-based periodic flush (default 1.5s) + * - Capacity-based flush when buffer hits maxBatchSize + * - Immediate priority flush for blockers (bypasses batching) + * - Combining multiple embeds into a single send() call + * - Error isolation: send() failures are logged, never crash the batcher + */ + +import type { FormattedEvent } from './types.js'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** Payload passed to the send callback — matches Discord TextChannel.send() shape. */ +export interface SendPayload { + content: string; + embeds: unknown[]; + components: unknown[]; +} + +/** Send callback abstraction. Returns void or a promise. */ +export type SendFn = (payload: SendPayload) => Promise | void; + +/** Logger interface — just needs error/warn/debug. */ +export interface BatcherLogger { + error(msg: string, data?: Record): void; + warn(msg: string, data?: Record): void; + debug(msg: string, data?: Record): void; +} + +/** MessageBatcher configuration options. */ +export interface BatcherOptions { + /** Interval between timed flushes in ms. Default: 1500 */ + flushIntervalMs?: number; + /** Max events before triggering an immediate capacity flush. Default: 4 */ + maxBatchSize?: number; +} + +// --------------------------------------------------------------------------- +// Default no-op logger +// --------------------------------------------------------------------------- + +const noopLogger: BatcherLogger = { + error() {}, + warn() {}, + debug() {}, +}; + +// --------------------------------------------------------------------------- +// MessageBatcher +// --------------------------------------------------------------------------- + +export class MessageBatcher { + private readonly send: SendFn; + private readonly logger: BatcherLogger; + private readonly flushIntervalMs: number; + private readonly maxBatchSize: number; + + private buffer: FormattedEvent[] = []; + private timer: ReturnType | null = null; + private flushing = false; + private destroyed = false; + + constructor(send: SendFn, logger?: BatcherLogger, options?: BatcherOptions) { + this.send = send; + this.logger = logger ?? noopLogger; + this.flushIntervalMs = options?.flushIntervalMs ?? 1500; + this.maxBatchSize = options?.maxBatchSize ?? 4; + } + + // ----------------------------------------------------------------------- + // Public API + // ----------------------------------------------------------------------- + + /** Start the periodic flush timer. */ + start(): void { + if (this.timer) return; // already running + this.timer = setInterval(() => { + void this.flush(); + }, this.flushIntervalMs); + // Don't hold the process open for the timer + if (this.timer && typeof this.timer === 'object' && 'unref' in this.timer) { + this.timer.unref(); + } + this.logger.debug('Batcher started', { flushIntervalMs: this.flushIntervalMs }); + } + + /** Stop the periodic flush timer without flushing. */ + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + this.logger.debug('Batcher stopped'); + } + + /** Flush remaining buffer and stop. Safe to call multiple times. */ + async destroy(): Promise { + if (this.destroyed) return; + this.destroyed = true; + this.stop(); + await this.flush(); + this.logger.debug('Batcher destroyed'); + } + + /** + * Enqueue a formatted event for batched sending. + * Triggers an immediate capacity flush if buffer reaches maxBatchSize. + */ + enqueue(formatted: FormattedEvent): void { + if (this.destroyed) return; + this.buffer.push(formatted); + if (this.buffer.length >= this.maxBatchSize) { + void this.flush(); + } + } + + /** + * Immediately send a high-priority event (e.g. blocker). + * Flushes any pending buffer first, then sends the priority event alone. + */ + async enqueueImmediate(formatted: FormattedEvent): Promise { + if (this.destroyed) return; + // Flush pending buffer first so ordering is preserved + await this.flush(); + // Send the priority event immediately, alone + await this.doSend([formatted]); + } + + /** Current number of events in the buffer (for testing/diagnostics). */ + get pending(): number { + return this.buffer.length; + } + + // ----------------------------------------------------------------------- + // Internal + // ----------------------------------------------------------------------- + + /** + * Flush the current buffer as a single Discord message. + * Multiple embeds are combined into one send() call (Discord supports up to 10). + * No-op if buffer is empty. + */ + private async flush(): Promise { + if (this.buffer.length === 0) return; + if (this.flushing) return; // prevent re-entrant flush + + this.flushing = true; + const batch = this.buffer.splice(0); // take all + try { + await this.doSend(batch); + } finally { + this.flushing = false; + } + } + + /** + * Build a SendPayload from a batch of FormattedEvents and invoke the send callback. + * Catches and logs errors — never throws. + */ + private async doSend(batch: FormattedEvent[]): Promise { + if (batch.length === 0) return; + + // Combine content lines + const content = batch.map((e) => e.content).join('\n'); + + // Collect all embeds (Discord allows up to 10 per message) + const embeds: unknown[] = []; + for (const e of batch) { + if (e.embed) embeds.push(e.embed); + } + + // Collect all component rows (only from the last event with components — + // Discord only supports one set of components per message) + let components: unknown[] = []; + for (const e of batch) { + if (e.components && e.components.length > 0) { + components = e.components; + } + } + + const payload: SendPayload = { content, embeds, components }; + + try { + await this.send(payload); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.logger.error('Batcher send failed', { error: message, batchSize: batch.length }); + + // Retry once after a short delay + try { + await new Promise((r) => setTimeout(r, 1000)); + await this.send(payload); + this.logger.debug('Batcher retry succeeded'); + } catch (retryErr) { + const retryMessage = retryErr instanceof Error ? retryErr.message : String(retryErr); + this.logger.warn('Batcher retry also failed, dropping batch', { + error: retryMessage, + batchSize: batch.length, + }); + // Drop the batch — don't re-enqueue to prevent infinite loops + } + } + } +}