diff --git a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs index a44dc1906..97bfe700b 100644 --- a/src/resources/extensions/sf/tests/uok-message-bus.test.mjs +++ b/src/resources/extensions/sf/tests/uok-message-bus.test.mjs @@ -143,6 +143,30 @@ test("messageBus_compact_removes_old_messages", () => { assert.equal(inbox.list().length, 0); }); +test("messageBus_send_when_threshold_reached_auto_compacts_old_messages", () => { + const root = makeProject(); + const bus = new MessageBus(root, { + retentionDays: 1, + autoCompactThreshold: 1, + }); + insertUokMessage({ + id: "msg-old-auto", + from: "agent-a", + to: "agent-b", + body: "old", + sentAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(), + deliveredAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(), + metadata: {}, + }); + + bus.send("agent-a", "agent-b", "fresh"); + + const inbox = bus.getInbox("agent-b"); + inbox.refresh(); + const bodies = inbox.list().map((m) => m.body); + assert.deepEqual(bodies, ["fresh"]); +}); + test("messageBus_getConversation_filters_by_pair", () => { const root = makeProject(); const bus = new MessageBus(root); @@ -214,6 +238,32 @@ test("getUokMessageBusMetrics_counts_messages_and_unread", () => { assert.equal(m2.totalUnread, 2); }); +test("messageBus_autoCompact_triggers_when_threshold_exceeded", () => { + const root = makeProject(); + const bus = new MessageBus(root, { + autoCompactThreshold: 5, + retentionDays: 1, + }); + // Insert old messages directly to bypass auto-compact on send + for (let i = 0; i < 4; i++) { + insertUokMessage({ + id: `msg-old-${i}`, + from: "agent-a", + to: "agent-b", + body: `old-${i}`, + sentAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(), + metadata: {}, + }); + } + let m = getUokMessageBusMetrics(); + assert.equal(m.totalMessages, 4); + + // Sending one more message should trigger auto-compact (5 >= threshold) + bus.send("agent-a", "agent-b", "new"); + m = getUokMessageBusMetrics(); + assert.equal(m.totalMessages, 1); // old messages compacted, only new remains +}); + test("getUokMessageBusMetrics_ignores_reads_by_non_recipient_for_unread_count", () => { const root = makeProject(); const bus = new MessageBus(root); diff --git a/src/resources/extensions/sf/uok/message-bus.js b/src/resources/extensions/sf/uok/message-bus.js index 804c11eb5..e80d72b5c 100644 --- a/src/resources/extensions/sf/uok/message-bus.js +++ b/src/resources/extensions/sf/uok/message-bus.js @@ -16,6 +16,7 @@ import { sfRoot } from "../paths.js"; import { compactUokMessages, getUokConversation, + getUokMessageBusMetrics, getUokMessageReadIds, getUokMessagesForAgent, insertUokMessage, @@ -110,15 +111,27 @@ export class AgentInbox { } } +const DEFAULT_AUTO_COMPACT_THRESHOLD = 10_000; + export class MessageBus { constructor(basePath, options = {}) { this.basePath = basePath; this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS; this.maxInboxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE; + this.autoCompactThreshold = + options.autoCompactThreshold ?? DEFAULT_AUTO_COMPACT_THRESHOLD; this.inboxes = new Map(); ensureDb(basePath); } + _maybeAutoCompact() { + if (this.autoCompactThreshold <= 0) return; + const metrics = getUokMessageBusMetrics(); + if (metrics.totalMessages >= this.autoCompactThreshold) { + compactUokMessages(this.retentionDays); + } + } + _getOrCreateInbox(agentId) { if (!this.inboxes.has(agentId)) { this.inboxes.set( @@ -150,6 +163,7 @@ export class MessageBus { if (!alreadyHas) { targetInbox.receive(message); } + this._maybeAutoCompact(); return message.id; } @@ -177,6 +191,7 @@ export class MessageBus { insertUokMessage(message); const targetInbox = this._getOrCreateInbox(to); targetInbox.refresh(); + this._maybeAutoCompact(); return message.id; }