test: Built rate-limit-aware MessageBatcher with timer/capacity flush,…

- "packages/daemon/src/message-batcher.ts"
- "packages/daemon/src/message-batcher.test.ts"

GSD-Task: S04/T02
Lex Christopherson 2026-03-27 15:04:55 -06:00
parent 4c8bbca46f
commit 05abf86912
2 changed files with 518 additions and 0 deletions

packages/daemon/src/message-batcher.test.ts

@@ -0,0 +1,308 @@
import { describe, it, mock } from 'node:test';
import assert from 'node:assert/strict';
import { MessageBatcher } from './message-batcher.js';
import type { SendPayload, BatcherLogger } from './message-batcher.js';
import type { FormattedEvent } from './types.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Create a minimal FormattedEvent for testing. */
function fakeEvent(content: string, hasEmbed = false): FormattedEvent {
  const fe: FormattedEvent = { content };
  if (hasEmbed) {
    // Minimal mock embed — just needs to be truthy and pass through
    fe.embed = { data: { title: content } } as any;
  }
  return fe;
}

/** Create a tracking send function. */
function createSend() {
  const calls: SendPayload[] = [];
  const fn = mock.fn(async (payload: SendPayload) => {
    calls.push(payload);
  });
  return { fn, calls };
}

/** Create a logger that captures error/warn calls. */
function createLogger() {
  const errors: string[] = [];
  const warns: string[] = [];
  const debugs: string[] = [];
  const logger: BatcherLogger = {
    error(msg: string) { errors.push(msg); },
    warn(msg: string) { warns.push(msg); },
    debug(msg: string) { debugs.push(msg); },
  };
  return { logger, errors, warns, debugs };
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('MessageBatcher', () => {
  describe('enqueue + capacity flush', () => {
    it('flushes when buffer reaches maxBatchSize', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 3, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('a'));
      batcher.enqueue(fakeEvent('b'));
      assert.equal(calls.length, 0, 'should not flush yet');
      batcher.enqueue(fakeEvent('c')); // hits capacity
      // flush is async — give it a tick
      await new Promise((r) => setTimeout(r, 10));
      assert.equal(calls.length, 1, 'should have flushed once');
      assert.equal(calls[0].content, 'a\nb\nc');
      assert.equal(batcher.pending, 0);
      await batcher.destroy();
    });

    it('combines embeds into a single send call', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 2, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('a', true));
      batcher.enqueue(fakeEvent('b', true)); // triggers flush
      await new Promise((r) => setTimeout(r, 10));
      assert.equal(calls.length, 1);
      assert.equal(calls[0].embeds.length, 2);
      assert.equal(calls[0].content, 'a\nb');
      await batcher.destroy();
    });
  });
  describe('enqueueImmediate', () => {
    it('flushes pending buffer then sends immediately', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 10, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('buffered-1'));
      batcher.enqueue(fakeEvent('buffered-2'));
      await batcher.enqueueImmediate(fakeEvent('blocker!'));
      // First call: the pending buffer flush
      // Second call: the immediate event
      assert.equal(calls.length, 2, 'should have two send calls');
      assert.equal(calls[0].content, 'buffered-1\nbuffered-2');
      assert.equal(calls[1].content, 'blocker!');
      assert.equal(batcher.pending, 0);
      await batcher.destroy();
    });

    it('sends immediately when buffer is empty', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 10, flushIntervalMs: 60_000 });
      await batcher.enqueueImmediate(fakeEvent('urgent'));
      assert.equal(calls.length, 1);
      assert.equal(calls[0].content, 'urgent');
      await batcher.destroy();
    });
  });
  describe('timer-based flush', () => {
    it('flushes on interval', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 50 });
      batcher.start();
      batcher.enqueue(fakeEvent('timed-1'));
      batcher.enqueue(fakeEvent('timed-2'));
      // Wait longer than flushIntervalMs
      await new Promise((r) => setTimeout(r, 120));
      assert.ok(calls.length >= 1, 'timer should have triggered at least one flush');
      assert.equal(calls[0].content, 'timed-1\ntimed-2');
      assert.equal(batcher.pending, 0);
      await batcher.destroy();
    });

    it('stop prevents further timer flushes', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 30 });
      batcher.start();
      batcher.stop();
      batcher.enqueue(fakeEvent('orphan'));
      await new Promise((r) => setTimeout(r, 80));
      assert.equal(calls.length, 0, 'no flush after stop');
      // Cleanup without triggering flush timer
      batcher.stop(); // idempotent
      // Manually drain for cleanup
      await batcher.destroy();
    });
  });
  describe('destroy', () => {
    it('flushes remaining buffer on destroy', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('leftover-1'));
      batcher.enqueue(fakeEvent('leftover-2'));
      await batcher.destroy();
      assert.equal(calls.length, 1);
      assert.equal(calls[0].content, 'leftover-1\nleftover-2');
    });

    it('is idempotent — second destroy is no-op', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('once'));
      await batcher.destroy();
      await batcher.destroy(); // second call
      assert.equal(calls.length, 1, 'only flushed once');
    });

    it('enqueue after destroy is silently ignored', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 2, flushIntervalMs: 60_000 });
      await batcher.destroy();
      batcher.enqueue(fakeEvent('post-destroy'));
      await new Promise((r) => setTimeout(r, 10));
      assert.equal(calls.length, 0, 'no sends after destroy');
    });
  });
  describe('empty buffer', () => {
    it('flush of empty buffer is no-op', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 });
      batcher.start();
      // Give the event loop a tick (the 60s interval never fires in this window)
      await new Promise((r) => setTimeout(r, 10));
      await batcher.destroy();
      // Only the destroy-triggered flush runs, and it should also be a no-op
      assert.equal(calls.length, 0, 'no sends for empty buffer');
    });
  });

  describe('single-item flush', () => {
    it('handles a single item in buffer at destroy', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 100, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('solo'));
      await batcher.destroy();
      assert.equal(calls.length, 1);
      assert.equal(calls[0].content, 'solo');
      assert.equal(calls[0].embeds.length, 0);
      assert.equal(calls[0].components.length, 0);
    });
  });
  describe('error handling', () => {
    it('logs error and continues when send throws', async () => {
      const sendFn = async () => {
        throw new Error('Discord rate limit');
      };
      const { logger, errors, warns } = createLogger();
      const batcher = new MessageBatcher(sendFn, logger, { maxBatchSize: 2, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('x'));
      batcher.enqueue(fakeEvent('y')); // triggers flush
      // Wait for flush + retry (the retry fires after a 1s delay)
      await new Promise((r) => setTimeout(r, 1500));
      assert.ok(errors.length >= 1, 'should have logged an error');
      assert.ok(warns.length >= 1, 'should have logged a warning on retry failure');
      assert.equal(batcher.pending, 0, 'buffer cleared even on error');
      // Batcher should still be alive — enqueue more
      batcher.enqueue(fakeEvent('after-error'));
      assert.equal(batcher.pending, 1, 'can still enqueue after error');
      await batcher.destroy();
    });

    it('succeeds on retry if first attempt fails', async () => {
      let attempt = 0;
      const calls: SendPayload[] = [];
      const sendFn = async (payload: SendPayload) => {
        attempt++;
        if (attempt === 1) throw new Error('transient');
        calls.push(payload);
      };
      const { logger, errors } = createLogger();
      const batcher = new MessageBatcher(sendFn, logger, { maxBatchSize: 2, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('retry-me'));
      batcher.enqueue(fakeEvent('retry-too'));
      // Wait for flush + retry delay
      await new Promise((r) => setTimeout(r, 1500));
      assert.equal(errors.length, 1, 'logged one error on first attempt');
      assert.equal(calls.length, 1, 'retry succeeded');
      assert.equal(calls[0].content, 'retry-me\nretry-too');
      await batcher.destroy();
    });
  });
  describe('buffer at exactly capacity', () => {
    it('flushes at exactly maxBatchSize', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 4, flushIntervalMs: 60_000 });
      batcher.enqueue(fakeEvent('1'));
      batcher.enqueue(fakeEvent('2'));
      batcher.enqueue(fakeEvent('3'));
      assert.equal(calls.length, 0, 'not flushed at 3/4');
      batcher.enqueue(fakeEvent('4')); // exactly at capacity
      await new Promise((r) => setTimeout(r, 10));
      assert.equal(calls.length, 1);
      assert.equal(calls[0].content, '1\n2\n3\n4');
      await batcher.destroy();
    });
  });

  describe('components handling', () => {
    it('uses components from the last event that has them', async () => {
      const { fn, calls } = createSend();
      const batcher = new MessageBatcher(fn, undefined, { maxBatchSize: 3, flushIntervalMs: 60_000 });
      const fakeRow = { type: 'ActionRow', components: [] };
      batcher.enqueue(fakeEvent('no-components'));
      batcher.enqueue({ content: 'with-components', components: [fakeRow] } as any);
      batcher.enqueue(fakeEvent('also-no-components')); // triggers flush
      await new Promise((r) => setTimeout(r, 10));
      assert.equal(calls.length, 1);
      assert.deepEqual(calls[0].components, [fakeRow]);
      await batcher.destroy();
    });
  });
});

packages/daemon/src/message-batcher.ts

@@ -0,0 +1,210 @@
/**
 * message-batcher.ts: Rate-limit-aware message batcher for Discord.
 *
 * Accumulates FormattedEvent payloads and flushes them to a Discord channel
 * respecting the 5 msg/5s rate limit. Supports:
 * - Timer-based periodic flush (default 1.5s)
 * - Capacity-based flush when buffer hits maxBatchSize
 * - Immediate priority flush for blockers (bypasses batching)
 * - Combining multiple embeds into a single send() call
 * - Error isolation: send() failures are logged, never crash the batcher
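 *
 * Minimal usage sketch (illustrative only; `channel`, `formattedEvent`, and
 * `urgentEvent` are placeholders, not part of this module; `channel` stands
 * in for a discord.js TextChannel):
 *
 * @example
 *   const batcher = new MessageBatcher(async (payload) => {
 *     await channel.send({
 *       content: payload.content,
 *       embeds: payload.embeds,
 *       components: payload.components,
 *     });
 *   });
 *   batcher.start();                             // periodic timer flushes
 *   batcher.enqueue(formattedEvent);             // batched send
 *   await batcher.enqueueImmediate(urgentEvent); // bypasses batching
 *   await batcher.destroy();                     // flush remainder and stop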
 */
import type { FormattedEvent } from './types.js';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** Payload passed to the send callback — matches Discord TextChannel.send() shape. */
export interface SendPayload {
  content: string;
  embeds: unknown[];
  components: unknown[];
}

/** Send callback abstraction. Returns void or a promise. */
export type SendFn = (payload: SendPayload) => Promise<void> | void;

/** Logger interface — just needs error/warn/debug. */
export interface BatcherLogger {
  error(msg: string, data?: Record<string, unknown>): void;
  warn(msg: string, data?: Record<string, unknown>): void;
  debug(msg: string, data?: Record<string, unknown>): void;
}

/** MessageBatcher configuration options. */
export interface BatcherOptions {
  /** Interval between timed flushes in ms. Default: 1500 */
  flushIntervalMs?: number;
  /** Max events before triggering an immediate capacity flush. Default: 4 */
  maxBatchSize?: number;
}
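
// With the defaults (1.5s interval, capacity 4), timer-driven flushes send at
// most one message per 1.5s, i.e. roughly 3.3 per 5s window, staying under the
// 5 msg/5s limit noted above. Capacity flushes can add to that, so the
// defaults are a safety margin rather than a hard rate-limit guarantee.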
// ---------------------------------------------------------------------------
// Default no-op logger
// ---------------------------------------------------------------------------
const noopLogger: BatcherLogger = {
  error() {},
  warn() {},
  debug() {},
};
// ---------------------------------------------------------------------------
// MessageBatcher
// ---------------------------------------------------------------------------
export class MessageBatcher {
  private readonly send: SendFn;
  private readonly logger: BatcherLogger;
  private readonly flushIntervalMs: number;
  private readonly maxBatchSize: number;

  private buffer: FormattedEvent[] = [];
  private timer: ReturnType<typeof setInterval> | null = null;
  private flushing = false;
  private destroyed = false;

  constructor(send: SendFn, logger?: BatcherLogger, options?: BatcherOptions) {
    this.send = send;
    this.logger = logger ?? noopLogger;
    this.flushIntervalMs = options?.flushIntervalMs ?? 1500;
    this.maxBatchSize = options?.maxBatchSize ?? 4;
  }
  // -----------------------------------------------------------------------
  // Public API
  // -----------------------------------------------------------------------

  /** Start the periodic flush timer. */
  start(): void {
    if (this.timer) return; // already running
    this.timer = setInterval(() => {
      void this.flush();
    }, this.flushIntervalMs);
    // Don't hold the process open for the timer
    if (this.timer && typeof this.timer === 'object' && 'unref' in this.timer) {
      this.timer.unref();
    }
    this.logger.debug('Batcher started', { flushIntervalMs: this.flushIntervalMs });
  }

  /** Stop the periodic flush timer without flushing. */
  stop(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    this.logger.debug('Batcher stopped');
  }

  /** Flush remaining buffer and stop. Safe to call multiple times. */
  async destroy(): Promise<void> {
    if (this.destroyed) return;
    this.destroyed = true;
    this.stop();
    await this.flush();
    this.logger.debug('Batcher destroyed');
  }

  /**
   * Enqueue a formatted event for batched sending.
   * Triggers an immediate capacity flush if buffer reaches maxBatchSize.
   */
  enqueue(formatted: FormattedEvent): void {
    if (this.destroyed) return;
    this.buffer.push(formatted);
    if (this.buffer.length >= this.maxBatchSize) {
      void this.flush();
    }
  }

  /**
   * Immediately send a high-priority event (e.g. blocker).
   * Flushes any pending buffer first, then sends the priority event alone.
   */
  async enqueueImmediate(formatted: FormattedEvent): Promise<void> {
    if (this.destroyed) return;
    // Flush pending buffer first so ordering is preserved
    await this.flush();
    // Send the priority event immediately, alone
    await this.doSend([formatted]);
  }

  /** Current number of events in the buffer (for testing/diagnostics). */
  get pending(): number {
    return this.buffer.length;
  }
  // -----------------------------------------------------------------------
  // Internal
  // -----------------------------------------------------------------------

  /**
   * Flush the current buffer as a single Discord message.
   * Multiple embeds are combined into one send() call (Discord supports up to 10).
   * No-op if buffer is empty.
   */
  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    if (this.flushing) return; // prevent re-entrant flush
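    // Anything enqueued while a flush is in-flight stays in the buffer and
    // goes out on the next timer tick or capacity flush.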
    this.flushing = true;
    const batch = this.buffer.splice(0); // take all
    try {
      await this.doSend(batch);
    } finally {
      this.flushing = false;
    }
  }
  /**
   * Build a SendPayload from a batch of FormattedEvents and invoke the send callback.
   * Catches and logs errors; never throws.
   */
  private async doSend(batch: FormattedEvent[]): Promise<void> {
    if (batch.length === 0) return;

    // Combine content lines
    const content = batch.map((e) => e.content).join('\n');

    // Collect all embeds (Discord allows up to 10 per message)
    const embeds: unknown[] = [];
    for (const e of batch) {
      if (e.embed) embeds.push(e.embed);
    }

    // Collect component rows (only from the last event with components —
    // Discord only supports one set of components per message)
    let components: unknown[] = [];
    for (const e of batch) {
      if (e.components && e.components.length > 0) {
        components = e.components;
      }
    }

    const payload: SendPayload = { content, embeds, components };
    try {
      await this.send(payload);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.logger.error('Batcher send failed', { error: message, batchSize: batch.length });
      // Retry once after a short delay
      try {
        await new Promise((r) => setTimeout(r, 1000));
        await this.send(payload);
        this.logger.debug('Batcher retry succeeded');
      } catch (retryErr) {
        const retryMessage = retryErr instanceof Error ? retryErr.message : String(retryErr);
        this.logger.warn('Batcher retry also failed, dropping batch', {
          error: retryMessage,
          batchSize: batch.length,
        });
        // Drop the batch — don't re-enqueue to prevent infinite loops
      }
    }
  }
}