fix: auto-compact uok message bus
This commit is contained in:
parent
5bc3895586
commit
cd5926a17a
2 changed files with 65 additions and 0 deletions
|
|
@ -143,6 +143,30 @@ test("messageBus_compact_removes_old_messages", () => {
|
|||
assert.equal(inbox.list().length, 0);
|
||||
});
|
||||
|
||||
test("messageBus_send_when_threshold_reached_auto_compacts_old_messages", () => {
|
||||
const root = makeProject();
|
||||
const bus = new MessageBus(root, {
|
||||
retentionDays: 1,
|
||||
autoCompactThreshold: 1,
|
||||
});
|
||||
insertUokMessage({
|
||||
id: "msg-old-auto",
|
||||
from: "agent-a",
|
||||
to: "agent-b",
|
||||
body: "old",
|
||||
sentAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(),
|
||||
deliveredAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(),
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
bus.send("agent-a", "agent-b", "fresh");
|
||||
|
||||
const inbox = bus.getInbox("agent-b");
|
||||
inbox.refresh();
|
||||
const bodies = inbox.list().map((m) => m.body);
|
||||
assert.deepEqual(bodies, ["fresh"]);
|
||||
});
|
||||
|
||||
test("messageBus_getConversation_filters_by_pair", () => {
|
||||
const root = makeProject();
|
||||
const bus = new MessageBus(root);
|
||||
|
|
@ -214,6 +238,32 @@ test("getUokMessageBusMetrics_counts_messages_and_unread", () => {
|
|||
assert.equal(m2.totalUnread, 2);
|
||||
});
|
||||
|
||||
test("messageBus_autoCompact_triggers_when_threshold_exceeded", () => {
|
||||
const root = makeProject();
|
||||
const bus = new MessageBus(root, {
|
||||
autoCompactThreshold: 5,
|
||||
retentionDays: 1,
|
||||
});
|
||||
// Insert old messages directly to bypass auto-compact on send
|
||||
for (let i = 0; i < 4; i++) {
|
||||
insertUokMessage({
|
||||
id: `msg-old-${i}`,
|
||||
from: "agent-a",
|
||||
to: "agent-b",
|
||||
body: `old-${i}`,
|
||||
sentAt: new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString(),
|
||||
metadata: {},
|
||||
});
|
||||
}
|
||||
let m = getUokMessageBusMetrics();
|
||||
assert.equal(m.totalMessages, 4);
|
||||
|
||||
// Sending one more message should trigger auto-compact (5 >= threshold)
|
||||
bus.send("agent-a", "agent-b", "new");
|
||||
m = getUokMessageBusMetrics();
|
||||
assert.equal(m.totalMessages, 1); // old messages compacted, only new remains
|
||||
});
|
||||
|
||||
test("getUokMessageBusMetrics_ignores_reads_by_non_recipient_for_unread_count", () => {
|
||||
const root = makeProject();
|
||||
const bus = new MessageBus(root);
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import { sfRoot } from "../paths.js";
|
|||
import {
|
||||
compactUokMessages,
|
||||
getUokConversation,
|
||||
getUokMessageBusMetrics,
|
||||
getUokMessageReadIds,
|
||||
getUokMessagesForAgent,
|
||||
insertUokMessage,
|
||||
|
|
@ -110,15 +111,27 @@ export class AgentInbox {
|
|||
}
|
||||
}
|
||||
|
||||
const DEFAULT_AUTO_COMPACT_THRESHOLD = 10_000;
|
||||
|
||||
export class MessageBus {
|
||||
constructor(basePath, options = {}) {
|
||||
this.basePath = basePath;
|
||||
this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS;
|
||||
this.maxInboxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE;
|
||||
this.autoCompactThreshold =
|
||||
options.autoCompactThreshold ?? DEFAULT_AUTO_COMPACT_THRESHOLD;
|
||||
this.inboxes = new Map();
|
||||
ensureDb(basePath);
|
||||
}
|
||||
|
||||
_maybeAutoCompact() {
|
||||
if (this.autoCompactThreshold <= 0) return;
|
||||
const metrics = getUokMessageBusMetrics();
|
||||
if (metrics.totalMessages >= this.autoCompactThreshold) {
|
||||
compactUokMessages(this.retentionDays);
|
||||
}
|
||||
}
|
||||
|
||||
_getOrCreateInbox(agentId) {
|
||||
if (!this.inboxes.has(agentId)) {
|
||||
this.inboxes.set(
|
||||
|
|
@ -150,6 +163,7 @@ export class MessageBus {
|
|||
if (!alreadyHas) {
|
||||
targetInbox.receive(message);
|
||||
}
|
||||
this._maybeAutoCompact();
|
||||
|
||||
return message.id;
|
||||
}
|
||||
|
|
@ -177,6 +191,7 @@ export class MessageBus {
|
|||
insertUokMessage(message);
|
||||
const targetInbox = this._getOrCreateInbox(to);
|
||||
targetInbox.refresh();
|
||||
this._maybeAutoCompact();
|
||||
return message.id;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue