diff --git a/wai-server/core/emitter/queueProcessor.js b/wai-server/core/emitter/queueProcessor.js new file mode 100644 index 0000000..203549a --- /dev/null +++ b/wai-server/core/emitter/queueProcessor.js @@ -0,0 +1,68 @@ +'use strict'; + +const { default: pRetry } = require('@fullstax/p-retry'); + +/** + * Creates an async queue processor. + * 不同 itemId 的数据并发处理 + * 每个 itemId 的数据有序地仅处理一个, 先进先出 + * @param {Function} processItemFn - The function to process each item in the queue. + * @param {Object} [options] - Options for the queue processor. + * @param {Object} [options.retryOptions] - Options for retrying failed items. + * @param {number} [options.retryOptions.retries] - Number of retries for failed items. + * @returns {Object} - An object with an enqueue method to add items tothe queue. + */ +async function createAsyncQueueProcessor(processItemFn, options = {}) { + if (typeof processItemFn !== 'function') { + throw new Error('processItemFn must be a function'); + } + + const { retryOptions = { retries: 0 } } = options; + const queues = new Map(); + const processing = new Map(); + + async function processQueue(itemId) { + if (processing.get(itemId)) { + return; // processing + } + + processing.set(itemId, true); + + try { + const queue = queues.get(itemId); + if (!queue) return; + + while (queue.length > 0) { + const item = queue.shift(); + try { + if (retryOptions.retries > 0) { + await pRetry(async () => { + await processItemFn(item); + }, retryOptions); + } else { + await processItemFn(item); + } + } catch (itemError) { + console.error(`Error processing item ${JSON.stringify(item)}:\n`, itemError); + // Handle error (e.g., retry, log, DLQ) + } + } + queues.delete(itemId); // Clean up empty queue + } finally { + processing.set(itemId, false); + } + } + + function enqueue(itemId, itemData) { + if (!queues.has(itemId)) { + queues.set(itemId, []); + } + + queues.get(itemId).push(itemData); + processQueue(itemId); + } + + return { enqueue }; +} + +module.exports = createAsyncQueueProcessor; diff --git a/wai-server/core/handler/whatsappHandler.js b/wai-server/core/handler/whatsappHandler.js index 114cf82..0bf48bd 100644 --- a/wai-server/core/handler/whatsappHandler.js +++ b/wai-server/core/handler/whatsappHandler.js @@ -3,6 +3,7 @@ const generateId = require('../../utils/generateId.util'); const { domain, name: domainName } = require('../../config').server; const whatsappEvents = require('../emitter'); +const createAsyncQueueProcessor = require('../emitter/queueProcessor'); const { callWebhook } = require('../webhook'); const { sessionStore } = require('..'); const { createWhatsApp } = require('../../core/baileys'); @@ -141,51 +142,60 @@ const setupCredsHandler = () => { * * pending ⏰ -> saved ⏰ -> sent(*) ✔ -> delivered ✔✔ -> read ✅ * * saved -> pending -> sent(*) -> delivered -> read */ -const setupMessageHandler = () => { - messageEventNames.forEach(eventName => { - whatsappEvents.on(eventName, async messageData => { - // if (messageData.status === 'pending') { - // logger.info('message pending', messageData); - // return false; - // } - const { from, to, whatsAppNo } = messageData; - const _whatsAppNo = whatsAppNo || from || to; - getUserLogger(_whatsAppNo).info({ eventName, messageData }); - try { - const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString(); - - const savedId = uniqueMsgId(messageData); - const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId }; - const savedMsg = await getOutboundMessage(targetUpsert); - const bixFields = pick(savedMsg, ['actionId', 'externalId']); - const savedTimeFields = pick(savedMsg, Object.values(timeField)); - logger.info('message evt\n', eventName, messageData, savedMsg); - const _type = messageData?.type || savedMsg?.msgtype || 'text'; - const typeField = { msgtype: _type }; // fix: type 空 - - const webhookBody = webhookBodyBuilder({ ...savedTimeFields, ...messageData, ...bixFields, ...typeField }, eventName); - const { waiMessage } = webhookBody; - - const timeFields = pick(waiMessage, Object.values(timeField)); - const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']); - upsertFields.evt_id = webhookBody.id; - const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' }; - const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false); - const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {}; - if (isEmpty(savedMsg.IVADS_link) && ['image', 'sticker', 'audio', 'video', 'document'].includes(waiMessage.type)) { - // 存储文件 - const filePath = messageData[messageData.type].filePath; - const webLink = await uploadMediaFile(filePath); - waiContentFieldsToDB.IVADS_link = webLink; - } - - const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record }); - const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, message_origin: savedMsg?.message_origin || JSON.stringify(messageData) }, targetUpsert); - // 把内容加上, 否则前端没显示 - await callWebhook(webhookBodyFill(webhookBody, msgRow)); - } catch (error) { - logger.error({ messageData, error }, 'error call webhook'); +const setupMessageHandler = async () => { + const messageListner = async ({ eventName, messageData }) => { + // if (messageData.status === 'pending') { + // logger.info('message pending', messageData); + // return false; + // } + const { from, to, whatsAppNo } = messageData; + const _whatsAppNo = whatsAppNo || from || to; + getUserLogger(_whatsAppNo).info({ eventName, messageData }); + try { + const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString(); + + const savedId = uniqueMsgId(messageData); + const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId }; + const savedMsg = await getOutboundMessage(targetUpsert); + const bixFields = pick(savedMsg, ['actionId', 'externalId']); + const savedTimeFields = pick(savedMsg, Object.values(timeField)); + logger.info('message evt\n', eventName, messageData, savedMsg); + const _type = messageData?.type || savedMsg?.msgtype || 'unresolvable'; + const typeField = { msgtype: _type }; // fix: type 空 + + const webhookBody = webhookBodyBuilder({ ...savedTimeFields, ...messageData, ...bixFields, ...typeField }, eventName); + const { waiMessage } = webhookBody; + + const timeFields = pick(waiMessage, Object.values(timeField)); + const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']); + upsertFields.evt_id = webhookBody.id; + const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' }; + const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false); + const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {}; + if (isEmpty(savedMsg.IVADS_link) && ['image', 'sticker', 'audio', 'video', 'document'].includes(waiMessage.type)) { + // 存储文件 + const filePath = messageData[messageData.type].filePath; + const webLink = await uploadMediaFile(filePath); + waiContentFieldsToDB.IVADS_link = webLink; } + + const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record }); + const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, message_origin: savedMsg?.message_origin || JSON.stringify(messageData) }, targetUpsert); + // 把内容加上, 否则前端没显示 + await callWebhook(webhookBodyFill(webhookBody, msgRow)); + } catch (error) { + logger.error({ messageData, error }, 'error call webhook'); + } + }; + const queueProcessor = await createAsyncQueueProcessor(messageListner, { + retryOptions: { retries: 1, minTimeout: 1000 }, + }); + + messageEventNames.forEach(eventName => { + // whatsappEvents.on(eventName, async messageData => await messageListner(messageData, eventName)); + whatsappEvents.on(eventName, messageData => { + const savedId = uniqueMsgId(messageData); + queueProcessor.enqueue(savedId, { eventName, messageData }); }); }); }; diff --git a/wai-server/core/webhook/index.js b/wai-server/core/webhook/index.js index f1c41c5..8b0a07a 100644 --- a/wai-server/core/webhook/index.js +++ b/wai-server/core/webhook/index.js @@ -18,20 +18,20 @@ axiosRetry(axios, { return error.response?.status !== 200; // Retry only on non-200 status codes }, onRetry: (retryCount, error, requestConfig) => { - logger.warn({ retryCount, error: error.message, }, `Retrying webhook call, attempt ${retryCount}`); + logger.warn({ retryCount, error: error.message }, `Retrying webhook call, attempt ${retryCount}`); }, }); -async function callWebhook(messageData) { +async function callWebhook(webhookPayload) { try { if (!webhookUrl) { - logger.error('no webhook url provided\n', messageData); + logger.error('no webhook url provided\n', webhookPayload); return; } - getUserLogger(messageData.whatsAppNo).info({ webhookUrl, messageData }); - await axios.post(webhookUrl, messageData); + getUserLogger(webhookPayload.whatsAppNo).info({ webhookUrl, webhookPayload }); + await axios.post(webhookUrl, webhookPayload); } catch (error) { - logger.error(JSON.stringify({ webhookUrl, messageData, error: error.message }, undefined, 2), 'Error calling webhook'); + logger.error(JSON.stringify({ webhookUrl, webhookPayload, error: error.message }, undefined, 2), 'Error calling webhook'); } } diff --git a/wai-server/helper/wai.msg.helper.js b/wai-server/helper/wai.msg.helper.js index 3548913..2f34011 100644 --- a/wai-server/helper/wai.msg.helper.js +++ b/wai-server/helper/wai.msg.helper.js @@ -67,6 +67,15 @@ const mediaMsg = { }; const waiMsgTypeMapped = { + unresolvable: { + type: 'unresolvable', + contentToSend: msg => ({}), + waiContentToDB: msg => ({}), + dataToDB: msg => ({}), + DbData: row => ({ + type: 'unresolvable', + }), + }, text: { type: 'text', contentToSend: msg => ({ diff --git a/wai-server/package-lock.json b/wai-server/package-lock.json index 1c56f9b..71f6d0c 100644 --- a/wai-server/package-lock.json +++ b/wai-server/package-lock.json @@ -8,6 +8,7 @@ "name": "whatsapp-individual", "version": "0.1.0", "dependencies": { + "@fullstax/p-retry": "^6.2.0-patch.4", "@koa/cors": "2.2.3", "@koa/multer": "^3.0.2", "@whiskeysockets/baileys": "^6.7.9", @@ -523,6 +524,33 @@ "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, + "node_modules/@fullstax/is-network-error": { + "version": "1.1.0-patch.2", + "resolved": "https://registry.npmjs.org/@fullstax/is-network-error/-/is-network-error-1.1.0-patch.2.tgz", + "integrity": "sha512-hfOpo2XUngzQFCdkK8wz3vugi+HgmdzHkBZSv2epL/zaCP/nQ4/Ydz5raJ4bobxmRcAE8tydoykBJOhEbF4S5A==", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/@fullstax/p-retry": { + "version": "6.2.0-patch.4", + "resolved": "https://registry.npmjs.org/@fullstax/p-retry/-/p-retry-6.2.0-patch.4.tgz", + "integrity": "sha512-UCCxkDvUbCKitAkC0FK4uE3u4owBa5CzwISeQBMCrZJZcsqPdpt3iR2+ZXQOvZW4jJrfWYTJGUVysAba7nOtmw==", + "dependencies": { + "@fullstax/is-network-error": "^1.1.0-patch.2", + "@types/retry": "0.12.2", + "retry": "^0.13.1" + }, + "engines": { + "node": ">=16.17" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/@hapi/boom": { "version": "9.1.4", "resolved": "https://registry.npmmirror.com/@hapi/boom/-/boom-9.1.4.tgz", @@ -868,6 +896,11 @@ "integrity": "sha512-f5j5b/Gf71L+dbqxIpQ4Z2WlmI/mPJ0fOkGGmFgtb6sAu97EPczzbS3/tJKxmcYDj55OX6ssqwDAWOHIYDRDGA==", "dev": true }, + "node_modules/@types/retry": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz", + "integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==" + }, "node_modules/@types/validator": { "version": "13.12.2", "resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.12.2.tgz", @@ -8458,6 +8491,14 @@ "node": ">=0.12" } }, + "node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "engines": { + "node": ">= 4" + } + }, "node_modules/retry-as-promised": { "version": "7.0.4", "resolved": "https://registry.npmjs.org/retry-as-promised/-/retry-as-promised-7.0.4.tgz", diff --git a/wai-server/package.json b/wai-server/package.json index b978f8e..75c82c9 100644 --- a/wai-server/package.json +++ b/wai-server/package.json @@ -24,6 +24,7 @@ "prettier": "^3.4.2" }, "dependencies": { + "@fullstax/p-retry": "^6.2.0-patch.4", "@koa/cors": "2.2.3", "@koa/multer": "^3.0.2", "@whiskeysockets/baileys": "^6.7.9", diff --git a/wai-server/services/outbound_messages.service.js b/wai-server/services/outbound_messages.service.js index aba2f0a..78d0db4 100644 --- a/wai-server/services/outbound_messages.service.js +++ b/wai-server/services/outbound_messages.service.js @@ -39,10 +39,10 @@ const upsertOutboundMessage = async (data, where = {}) => { if (!created) { await instance.update({ ...data }, { where }); const savedI = await instance.save(); // reload - console.info('update OutboundMessage --- 2\n', savedI.toJSON()); + // console.info('update OutboundMessage --- 2\n', savedI.toJSON()); return savedI.toJSON(); } - console.info('insert OutboundMessage\n', instance.toJSON(), created); + // console.info('insert OutboundMessage\n', instance.toJSON(), created); return instance.toJSON(); };