Compare commits
2 Commits
client-sid
...
temp/pr-49
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e6fe0b662d | ||
|
|
1c1a467ebb |
@@ -1150,6 +1150,136 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("inbound debouncing", () => {
|
||||
it("coalesces text-only then attachment webhook events by messageId", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const account = createMockAccount({ dmPolicy: "open" });
|
||||
const config: OpenClawConfig = {};
|
||||
const core = createMockRuntime();
|
||||
|
||||
// Use a timing-aware debouncer test double that respects debounceMs/buildKey/shouldDebounce.
|
||||
core.channel.debounce.createInboundDebouncer = vi.fn((params: any) => {
|
||||
type Item = any;
|
||||
const buckets = new Map<string, { items: Item[]; timer: ReturnType<typeof setTimeout> | null }>();
|
||||
|
||||
const flush = async (key: string) => {
|
||||
const bucket = buckets.get(key);
|
||||
if (!bucket) return;
|
||||
if (bucket.timer) {
|
||||
clearTimeout(bucket.timer);
|
||||
bucket.timer = null;
|
||||
}
|
||||
const items = bucket.items;
|
||||
bucket.items = [];
|
||||
if (items.length > 0) {
|
||||
try {
|
||||
await params.onFlush(items);
|
||||
} catch (err) {
|
||||
params.onError?.(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
enqueue: async (item: Item) => {
|
||||
if (params.shouldDebounce && !params.shouldDebounce(item)) {
|
||||
await params.onFlush([item]);
|
||||
return;
|
||||
}
|
||||
|
||||
const key = params.buildKey(item);
|
||||
const existing = buckets.get(key);
|
||||
const bucket = existing ?? { items: [], timer: null };
|
||||
bucket.items.push(item);
|
||||
if (bucket.timer) clearTimeout(bucket.timer);
|
||||
bucket.timer = setTimeout(async () => {
|
||||
await flush(key);
|
||||
}, params.debounceMs);
|
||||
buckets.set(key, bucket);
|
||||
},
|
||||
flushKey: vi.fn(async (key: string) => {
|
||||
await flush(key);
|
||||
}),
|
||||
};
|
||||
}) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"];
|
||||
|
||||
setBlueBubblesRuntime(core);
|
||||
|
||||
unregister = registerBlueBubblesWebhookTarget({
|
||||
account,
|
||||
config,
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
});
|
||||
|
||||
const messageId = "race-msg-1";
|
||||
const chatGuid = "iMessage;-;+15551234567";
|
||||
|
||||
const payloadA = {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "hello",
|
||||
handle: { address: "+15551234567" },
|
||||
isGroup: false,
|
||||
isFromMe: false,
|
||||
guid: messageId,
|
||||
chatGuid,
|
||||
date: Date.now(),
|
||||
},
|
||||
};
|
||||
|
||||
const payloadB = {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "hello",
|
||||
handle: { address: "+15551234567" },
|
||||
isGroup: false,
|
||||
isFromMe: false,
|
||||
guid: messageId,
|
||||
chatGuid,
|
||||
attachments: [
|
||||
{
|
||||
guid: "att-1",
|
||||
mimeType: "image/jpeg",
|
||||
totalBytes: 1024,
|
||||
},
|
||||
],
|
||||
date: Date.now(),
|
||||
},
|
||||
};
|
||||
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", payloadA),
|
||||
createMockResponse(),
|
||||
);
|
||||
|
||||
// Simulate the real-world delay where the attachment-bearing webhook arrives shortly after.
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", payloadB),
|
||||
createMockResponse(),
|
||||
);
|
||||
|
||||
// Not flushed yet; still within the debounce window.
|
||||
expect(mockDispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled();
|
||||
|
||||
// After the debounce window, the combined message should be processed exactly once.
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
|
||||
expect(mockDispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
|
||||
const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0];
|
||||
expect(callArgs.ctx.MediaPaths).toEqual(["/tmp/test-media.jpg"]);
|
||||
expect(callArgs.ctx.Body).toContain("hello");
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("reply metadata", () => {
|
||||
it("surfaces reply fields in ctx when provided", async () => {
|
||||
const account = createMockAccount({ dmPolicy: "open" });
|
||||
|
||||
@@ -264,7 +264,7 @@ type BlueBubblesDebounceEntry = {
|
||||
* This helps combine URL text + link preview balloon messages that BlueBubbles
|
||||
* sends as separate webhook events when no explicit inbound debounce config exists.
|
||||
*/
|
||||
const DEFAULT_INBOUND_DEBOUNCE_MS = 350;
|
||||
const DEFAULT_INBOUND_DEBOUNCE_MS = 500;
|
||||
|
||||
/**
|
||||
* Combines multiple debounced messages into a single message for processing.
|
||||
@@ -363,7 +363,23 @@ function getOrCreateDebouncer(target: WebhookTarget) {
|
||||
debounceMs: resolveBlueBubblesDebounceMs(config, core),
|
||||
buildKey: (entry) => {
|
||||
const msg = entry.message;
|
||||
// Build key from account + chat + sender to coalesce messages from same source
|
||||
// Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the
|
||||
// same message (e.g., text-only then text+attachment).
|
||||
//
|
||||
// For balloons (URL previews, stickers, etc), BlueBubbles often uses a different
|
||||
// messageId than the originating text. When present, key by associatedMessageGuid
|
||||
// to keep text + balloon coalescing working.
|
||||
const balloonBundleId = msg.balloonBundleId?.trim();
|
||||
const associatedMessageGuid = msg.associatedMessageGuid?.trim();
|
||||
if (balloonBundleId && associatedMessageGuid) {
|
||||
return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`;
|
||||
}
|
||||
|
||||
const messageId = msg.messageId?.trim();
|
||||
if (messageId) {
|
||||
return `bluebubbles:${account.accountId}:msg:${messageId}`;
|
||||
}
|
||||
|
||||
const chatKey =
|
||||
msg.chatGuid?.trim() ??
|
||||
msg.chatIdentifier?.trim() ??
|
||||
@@ -372,13 +388,12 @@ function getOrCreateDebouncer(target: WebhookTarget) {
|
||||
},
|
||||
shouldDebounce: (entry) => {
|
||||
const msg = entry.message;
|
||||
// Skip debouncing for messages with attachments - process immediately
|
||||
if (msg.attachments && msg.attachments.length > 0) return false;
|
||||
// Skip debouncing for from-me messages (they're just cached, not processed)
|
||||
if (msg.fromMe) return false;
|
||||
// Skip debouncing for control commands - process immediately
|
||||
if (core.channel.text.hasControlCommand(msg.text, config)) return false;
|
||||
// Debounce normal text messages and URL balloon messages
|
||||
// Debounce all other messages to coalesce rapid-fire webhook events
|
||||
// (e.g., text+image arriving as separate webhooks for the same messageId)
|
||||
return true;
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
|
||||
Reference in New Issue
Block a user