perf(wai): 使用队列处理消息事件

dev/supplier-email-drawer
Lei OT 6 months ago
parent 1cd09cdfdd
commit e4ecab76d8

@ -0,0 +1,68 @@
'use strict';
const { default: pRetry } = require('@fullstax/p-retry');
/**
* Creates an async queue processor.
* 不同 itemId 的数据并发处理
* 每个 itemId 的数据有序地仅处理一个, 先进先出
* @param {Function} processItemFn - The function to process each item in the queue.
* @param {Object} [options] - Options for the queue processor.
* @param {Object} [options.retryOptions] - Options for retrying failed items.
* @param {number} [options.retryOptions.retries] - Number of retries for failed items.
* @returns {Object} - An object with an enqueue method to add items tothe queue.
*/
async function createAsyncQueueProcessor(processItemFn, options = {}) {
if (typeof processItemFn !== 'function') {
throw new Error('processItemFn must be a function');
}
const { retryOptions = { retries: 0 } } = options;
const queues = new Map();
const processing = new Map();
async function processQueue(itemId) {
if (processing.get(itemId)) {
return; // processing
}
processing.set(itemId, true);
try {
const queue = queues.get(itemId);
if (!queue) return;
while (queue.length > 0) {
const item = queue.shift();
try {
if (retryOptions.retries > 0) {
await pRetry(async () => {
await processItemFn(item);
}, retryOptions);
} else {
await processItemFn(item);
}
} catch (itemError) {
console.error(`Error processing item ${JSON.stringify(item)}:\n`, itemError);
// Handle error (e.g., retry, log, DLQ)
}
}
queues.delete(itemId); // Clean up empty queue
} finally {
processing.set(itemId, false);
}
}
function enqueue(itemId, itemData) {
if (!queues.has(itemId)) {
queues.set(itemId, []);
}
queues.get(itemId).push(itemData);
processQueue(itemId);
}
return { enqueue };
}
module.exports = createAsyncQueueProcessor;

@ -3,6 +3,7 @@
const generateId = require('../../utils/generateId.util'); const generateId = require('../../utils/generateId.util');
const { domain, name: domainName } = require('../../config').server; const { domain, name: domainName } = require('../../config').server;
const whatsappEvents = require('../emitter'); const whatsappEvents = require('../emitter');
const createAsyncQueueProcessor = require('../emitter/queueProcessor');
const { callWebhook } = require('../webhook'); const { callWebhook } = require('../webhook');
const { sessionStore } = require('..'); const { sessionStore } = require('..');
const { createWhatsApp } = require('../../core/baileys'); const { createWhatsApp } = require('../../core/baileys');
@ -141,51 +142,60 @@ const setupCredsHandler = () => {
* * pending -> saved -> sent(*) -> delivered -> read * * pending -> saved -> sent(*) -> delivered -> read
* * saved -> pending -> sent(*) -> delivered -> read * * saved -> pending -> sent(*) -> delivered -> read
*/ */
const setupMessageHandler = () => { const setupMessageHandler = async () => {
messageEventNames.forEach(eventName => { const messageListner = async ({ eventName, messageData }) => {
whatsappEvents.on(eventName, async messageData => { // if (messageData.status === 'pending') {
// if (messageData.status === 'pending') { // logger.info('message pending', messageData);
// logger.info('message pending', messageData); // return false;
// return false; // }
// } const { from, to, whatsAppNo } = messageData;
const { from, to, whatsAppNo } = messageData; const _whatsAppNo = whatsAppNo || from || to;
const _whatsAppNo = whatsAppNo || from || to; getUserLogger(_whatsAppNo).info({ eventName, messageData });
getUserLogger(_whatsAppNo).info({ eventName, messageData }); try {
try { const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString();
const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString();
const savedId = uniqueMsgId(messageData);
const savedId = uniqueMsgId(messageData); const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId };
const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId }; const savedMsg = await getOutboundMessage(targetUpsert);
const savedMsg = await getOutboundMessage(targetUpsert); const bixFields = pick(savedMsg, ['actionId', 'externalId']);
const bixFields = pick(savedMsg, ['actionId', 'externalId']); const savedTimeFields = pick(savedMsg, Object.values(timeField));
const savedTimeFields = pick(savedMsg, Object.values(timeField)); logger.info('message evt\n', eventName, messageData, savedMsg);
logger.info('message evt\n', eventName, messageData, savedMsg); const _type = messageData?.type || savedMsg?.msgtype || 'unresolvable';
const _type = messageData?.type || savedMsg?.msgtype || 'text'; const typeField = { msgtype: _type }; // fix: type 空
const typeField = { msgtype: _type }; // fix: type 空
const webhookBody = webhookBodyBuilder({ ...savedTimeFields, ...messageData, ...bixFields, ...typeField }, eventName);
const webhookBody = webhookBodyBuilder({ ...savedTimeFields, ...messageData, ...bixFields, ...typeField }, eventName); const { waiMessage } = webhookBody;
const { waiMessage } = webhookBody;
const timeFields = pick(waiMessage, Object.values(timeField));
const timeFields = pick(waiMessage, Object.values(timeField)); const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']);
const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']); upsertFields.evt_id = webhookBody.id;
upsertFields.evt_id = webhookBody.id; const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' };
const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' }; const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false);
const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false); const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {};
const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {}; if (isEmpty(savedMsg.IVADS_link) && ['image', 'sticker', 'audio', 'video', 'document'].includes(waiMessage.type)) {
if (isEmpty(savedMsg.IVADS_link) && ['image', 'sticker', 'audio', 'video', 'document'].includes(waiMessage.type)) { // 存储文件
// 存储文件 const filePath = messageData[messageData.type].filePath;
const filePath = messageData[messageData.type].filePath; const webLink = await uploadMediaFile(filePath);
const webLink = await uploadMediaFile(filePath); waiContentFieldsToDB.IVADS_link = webLink;
waiContentFieldsToDB.IVADS_link = webLink;
}
const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record });
const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, message_origin: savedMsg?.message_origin || JSON.stringify(messageData) }, targetUpsert);
// 把内容加上, 否则前端没显示
await callWebhook(webhookBodyFill(webhookBody, msgRow));
} catch (error) {
logger.error({ messageData, error }, 'error call webhook');
} }
const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record });
const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, message_origin: savedMsg?.message_origin || JSON.stringify(messageData) }, targetUpsert);
// 把内容加上, 否则前端没显示
await callWebhook(webhookBodyFill(webhookBody, msgRow));
} catch (error) {
logger.error({ messageData, error }, 'error call webhook');
}
};
const queueProcessor = await createAsyncQueueProcessor(messageListner, {
retryOptions: { retries: 1, minTimeout: 1000 },
});
messageEventNames.forEach(eventName => {
// whatsappEvents.on(eventName, async messageData => await messageListner(messageData, eventName));
whatsappEvents.on(eventName, messageData => {
const savedId = uniqueMsgId(messageData);
queueProcessor.enqueue(savedId, { eventName, messageData });
}); });
}); });
}; };

@ -18,20 +18,20 @@ axiosRetry(axios, {
return error.response?.status !== 200; // Retry only on non-200 status codes return error.response?.status !== 200; // Retry only on non-200 status codes
}, },
onRetry: (retryCount, error, requestConfig) => { onRetry: (retryCount, error, requestConfig) => {
logger.warn({ retryCount, error: error.message, }, `Retrying webhook call, attempt ${retryCount}`); logger.warn({ retryCount, error: error.message }, `Retrying webhook call, attempt ${retryCount}`);
}, },
}); });
async function callWebhook(messageData) { async function callWebhook(webhookPayload) {
try { try {
if (!webhookUrl) { if (!webhookUrl) {
logger.error('no webhook url provided\n', messageData); logger.error('no webhook url provided\n', webhookPayload);
return; return;
} }
getUserLogger(messageData.whatsAppNo).info({ webhookUrl, messageData }); getUserLogger(webhookPayload.whatsAppNo).info({ webhookUrl, webhookPayload });
await axios.post(webhookUrl, messageData); await axios.post(webhookUrl, webhookPayload);
} catch (error) { } catch (error) {
logger.error(JSON.stringify({ webhookUrl, messageData, error: error.message }, undefined, 2), 'Error calling webhook'); logger.error(JSON.stringify({ webhookUrl, webhookPayload, error: error.message }, undefined, 2), 'Error calling webhook');
} }
} }

@ -67,6 +67,15 @@ const mediaMsg = {
}; };
const waiMsgTypeMapped = { const waiMsgTypeMapped = {
unresolvable: {
type: 'unresolvable',
contentToSend: msg => ({}),
waiContentToDB: msg => ({}),
dataToDB: msg => ({}),
DbData: row => ({
type: 'unresolvable',
}),
},
text: { text: {
type: 'text', type: 'text',
contentToSend: msg => ({ contentToSend: msg => ({

@ -8,6 +8,7 @@
"name": "whatsapp-individual", "name": "whatsapp-individual",
"version": "0.1.0", "version": "0.1.0",
"dependencies": { "dependencies": {
"@fullstax/p-retry": "^6.2.0-patch.4",
"@koa/cors": "2.2.3", "@koa/cors": "2.2.3",
"@koa/multer": "^3.0.2", "@koa/multer": "^3.0.2",
"@whiskeysockets/baileys": "^6.7.9", "@whiskeysockets/baileys": "^6.7.9",
@ -523,6 +524,33 @@
"node": "^12.22.0 || ^14.17.0 || >=16.0.0" "node": "^12.22.0 || ^14.17.0 || >=16.0.0"
} }
}, },
"node_modules/@fullstax/is-network-error": {
"version": "1.1.0-patch.2",
"resolved": "https://registry.npmjs.org/@fullstax/is-network-error/-/is-network-error-1.1.0-patch.2.tgz",
"integrity": "sha512-hfOpo2XUngzQFCdkK8wz3vugi+HgmdzHkBZSv2epL/zaCP/nQ4/Ydz5raJ4bobxmRcAE8tydoykBJOhEbF4S5A==",
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/@fullstax/p-retry": {
"version": "6.2.0-patch.4",
"resolved": "https://registry.npmjs.org/@fullstax/p-retry/-/p-retry-6.2.0-patch.4.tgz",
"integrity": "sha512-UCCxkDvUbCKitAkC0FK4uE3u4owBa5CzwISeQBMCrZJZcsqPdpt3iR2+ZXQOvZW4jJrfWYTJGUVysAba7nOtmw==",
"dependencies": {
"@fullstax/is-network-error": "^1.1.0-patch.2",
"@types/retry": "0.12.2",
"retry": "^0.13.1"
},
"engines": {
"node": ">=16.17"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/@hapi/boom": { "node_modules/@hapi/boom": {
"version": "9.1.4", "version": "9.1.4",
"resolved": "https://registry.npmmirror.com/@hapi/boom/-/boom-9.1.4.tgz", "resolved": "https://registry.npmmirror.com/@hapi/boom/-/boom-9.1.4.tgz",
@ -868,6 +896,11 @@
"integrity": "sha512-f5j5b/Gf71L+dbqxIpQ4Z2WlmI/mPJ0fOkGGmFgtb6sAu97EPczzbS3/tJKxmcYDj55OX6ssqwDAWOHIYDRDGA==", "integrity": "sha512-f5j5b/Gf71L+dbqxIpQ4Z2WlmI/mPJ0fOkGGmFgtb6sAu97EPczzbS3/tJKxmcYDj55OX6ssqwDAWOHIYDRDGA==",
"dev": true "dev": true
}, },
"node_modules/@types/retry": {
"version": "0.12.2",
"resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz",
"integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow=="
},
"node_modules/@types/validator": { "node_modules/@types/validator": {
"version": "13.12.2", "version": "13.12.2",
"resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.12.2.tgz", "resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.12.2.tgz",
@ -8458,6 +8491,14 @@
"node": ">=0.12" "node": ">=0.12"
} }
}, },
"node_modules/retry": {
"version": "0.13.1",
"resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz",
"integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==",
"engines": {
"node": ">= 4"
}
},
"node_modules/retry-as-promised": { "node_modules/retry-as-promised": {
"version": "7.0.4", "version": "7.0.4",
"resolved": "https://registry.npmjs.org/retry-as-promised/-/retry-as-promised-7.0.4.tgz", "resolved": "https://registry.npmjs.org/retry-as-promised/-/retry-as-promised-7.0.4.tgz",

@ -24,6 +24,7 @@
"prettier": "^3.4.2" "prettier": "^3.4.2"
}, },
"dependencies": { "dependencies": {
"@fullstax/p-retry": "^6.2.0-patch.4",
"@koa/cors": "2.2.3", "@koa/cors": "2.2.3",
"@koa/multer": "^3.0.2", "@koa/multer": "^3.0.2",
"@whiskeysockets/baileys": "^6.7.9", "@whiskeysockets/baileys": "^6.7.9",

@ -39,10 +39,10 @@ const upsertOutboundMessage = async (data, where = {}) => {
if (!created) { if (!created) {
await instance.update({ ...data }, { where }); await instance.update({ ...data }, { where });
const savedI = await instance.save(); // reload const savedI = await instance.save(); // reload
console.info('update OutboundMessage --- 2\n', savedI.toJSON()); // console.info('update OutboundMessage --- 2\n', savedI.toJSON());
return savedI.toJSON(); return savedI.toJSON();
} }
console.info('insert OutboundMessage\n', instance.toJSON(), created); // console.info('insert OutboundMessage\n', instance.toJSON(), created);
return instance.toJSON(); return instance.toJSON();
}; };

Loading…
Cancel
Save