'use strict'; const generateId = require('../../utils/generateId.util'); const { domain, name: domainName } = require('../../config').server; const whatsappEvents = require('../emitter'); const createAsyncQueueProcessor = require('../emitter/queueProcessor'); const { callWebhook } = require('../webhook'); const { sessionStore } = require('..'); const { createWhatsApp } = require('../../core/baileys'); const { updateConnection, addCurrentConnection, resetConnection, getConnection } = require('../../services/connections.service'); const { getOutboundMessage, upsertOutboundMessage, createOutboundMessage } = require('../../services/outbound_messages.service'); const { objectMapper, pick, isEmpty, omitEmpty } = require('../../utils/commons.util'); const { logger, getUserLogger } = require('../../utils/logger.util'); const { DbData, waiContentToDB, uploadMediaFile, ctxToSendBuilder, ctxToDB } = require('../../helper/wai.msg.helper'); const connectionEventNames = ['connection:connect', 'connection:open', 'connection:close']; const messageEventNames = ['message:received', 'message:updated']; const eventTypeMapped = { 'message:received': 'wai.message.received', 'message:updated': 'wai.message.updated', 'creds:update': 'wai.creds.update', }; const timeField = { saved: 'createTime', pending: 'createTime', sent: 'sendTime', delivered: 'deliverTime', read: 'readTime', failed: 'updateTime', error: 'updateTime' }; const statusMapped = { saved: 'accepted', pending: 'accepted', sent: 'sent', delivered: 'delivered', read: 'read', failed: 'failed', error: 'failed' }; function getCorrectOrderStatus(currentStatus, newStatus) { const statusOrder = ['accepted', 'failed', 'sent', 'delivered', 'read']; if (!currentStatus || typeof currentStatus !== 'string') { currentStatus = null; // default 'pending' } if (!newStatus || typeof newStatus !== 'string') { return currentStatus; } if (!statusOrder.includes(newStatus)) { // console.warn(`Unknown status: ${newStatus}`); return currentStatus; } if (!currentStatus) { return newStatus; } if (!statusOrder.includes(currentStatus)) { // console.warn(`Unknown status: ${currentStatus}`); return newStatus; } const currentIndex = statusOrder.indexOf(currentStatus); const newIndex = statusOrder.indexOf(newStatus); if (newIndex > currentIndex) { return newStatus; } else if (newIndex === currentIndex) { return currentStatus; } else { // Optionally handle cases where the new status is "behind" the current one // Options: // 1. Ignore the new status (most common): return currentStatus; // 2. Allow "downgrades" (use with caution, may indicate errors): // return newStatus; // 3. Throw an error or log a warning: // console.error(`Invalid status transition: ${currentStatus} -> ${newStatus}`); // return currentStatus; // Or throw an error } } const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' }; const directionPrefix = { inbound: 'in_', outbound: 'out_' }; const directionIdUserPrefix = { inbound: 'to', outbound: 'from' }; const uniqueMsgId = msg => (msg.id && msg.direction ? `${directionPrefix[msg.direction]}${msg[directionIdUserPrefix[msg.direction]].replace('+', '')}_${msg.id}` : undefined); /** * @returns {Object} webhookBody */ const webhookBodyBuilder = (messageData, messageType) => { const defaultContent = { id: '', from: '', to: '', externalId: '', type: '', direction: '', status: '' }; const outboundStatus = statusMapped?.[messageData.status] || messageData.status || ''; const status = messageData.direction === 'inbound' ? '' : outboundStatus; const errors = messageData.status === 'error' ? { errorMessage: '未知错误', errorCode: '' } : {}; const message = { id: `evt_${generateId().replace(/-/g, '')}`, type: eventTypeMapped[messageType], apiVersion: 'v2', webhooksource: 'wai', createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8 domainName, conversationid: messageData?.externalId || '', whatsAppNo: messageData?.whatsAppNo || messageData.from || messageData.to || '', waiMessage: { ...defaultContent, ...messageData, ...(messageData.updateTime && !isEmpty(status) ? { [timeField[messageData.status]]: messageData.updateTime } : {}), type: messageData.type || messageData.msgtype || '', id: uniqueMsgId(messageData) || messageData.id || generateId(), wamid: messageData.id || '', // direction: directionField[messageType], status, ...errors, externalId: messageData?.externalId || '', }, }; return message; }; const webhookBodyFill = (webhookBody, savedData) => { const DBDataObj = DbData(savedData); Object.assign(webhookBody.waiMessage, DBDataObj, pick(savedData, ['status'])); return webhookBody; }; /** * WhatsApp 连接事件 * * connection:connect * * connection:open * * connection:close */ const setupConnectionHandler = () => { whatsappEvents.on('connection:connect', async connectionData => { try { getUserLogger(connectionData.phone).info({ msg: `连接https://web.whatsapp.com/`, connectionData }); // find Or create await addCurrentConnection({ ...objectMapper(connectionData, { phone: [{ key: 'wa_id' }], channelId: 'channel_id', createTimestamp: 'createtime', version: 'version' }, false), service_type: 'baileys', status: 'connecting', }); } catch (error) { logger.error({ connectionData, error }, 'error add connection'); } }); whatsappEvents.on('connection:open', async connectionData => { // todo: 更新实例 try { getUserLogger(connectionData.whatsAppNo).info({ msg: `已登录`, connectionData }); await updateConnection( { ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }], channelId: 'channel_id' }), service_type: 'baileys', closetime: null, }, { connect_domain: domain, connect_name: domainName }, ); const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: 'open' }, 'creds:update'); await callWebhook(webhookBody); } catch (error) { logger.error({ connectionData, error }, 'error add connection'); } }); whatsappEvents.on('connection:close', async connectionData => { try { getUserLogger(connectionData.whatsAppNo).warn({ msg: `断开连接`, connectionData }); sessionStore.removeSession(connectionData.channelId); await updateConnection( { ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }], channelId: 'channel_id' }), service_type: 'baileys', }, { connect_domain: domain, connect_name: domainName }, ); const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: 'offline' }, 'creds:update'); await callWebhook(webhookBody); } catch (error) { logger.error({ connectionData, error }, 'error close connection'); } }); whatsappEvents.on('connection:update', async connectionData => { try { getUserLogger(connectionData.whatsAppNo).warn({ msg: 'connection:update', connectionData }); await updateConnection( { ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }], channelId: 'channel_id' }), service_type: 'baileys', }, { connect_domain: domain, connect_name: domainName }, ); const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: connectionData.status }, 'creds:update'); await callWebhook(webhookBody); } catch (error) { logger.error({ connectionData, error }, 'error close connection'); } }); }; /** * 监听 Creds 更新事件 * * creds:update */ const setupCredsHandler = () => { whatsappEvents.on('creds:update', async creds => { try { getUserLogger(creds.whatsAppNo).info({ msg: `二维码`, creds }); const webhookBody = webhookBodyBuilder({ ...creds, to: creds.whatsAppNo, connection: '' }, 'creds:update'); await callWebhook(webhookBody); } catch (error) { logger.error({ creds, error }, 'error update creds'); } }); }; /** * WhatsApp 消息事件 * * pending ⏰ -> saved ⏰ -> sent(*) ✔ -> delivered ✔✔ -> read ✅ * * saved -> pending -> sent(*) -> delivered -> read */ const setupMessageHandler = async () => { const messageListner = async ({ eventName, messageData }) => { // if (messageData.status === 'pending') { // logger.info('message pending', messageData); // return false; // } const { from, to, whatsAppNo } = messageData; const _whatsAppNo = whatsAppNo || from || to; getUserLogger(_whatsAppNo).info({ eventName, messageData }); try { const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString(); const savedId = uniqueMsgId(messageData); const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId }; const savedMsg = await getOutboundMessage(targetUpsert); const bixFields = pick(savedMsg, ['actionId', 'externalId']); const savedTimeFields = pick(savedMsg, Object.values(timeField)); logger.info('message evt\n', eventName, messageData, savedMsg); const _type = messageData?.type || savedMsg?.msgtype || 'unresolvable'; const typeField = { msgtype: _type }; // fix: type 空 const webhookBody = webhookBodyBuilder({ ...savedTimeFields, ...messageData, ...bixFields, ...typeField }, eventName); const { waiMessage } = webhookBody; const timeFields = pick(waiMessage, Object.values(timeField)); const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']); upsertFields.evt_id = webhookBody.id; const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' }; const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false); const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {}; // 是否空数据. 存储有数据的事件原文 const upsertMsgOrigin = !isEmpty(omitEmpty(waiContentFieldsToDB)) || isEmpty(savedMsg); const msgOrigin = upsertMsgOrigin ? { message_origin: JSON.stringify(messageData) } : {}; if (isEmpty(savedMsg.IVADS_link) && ['image', 'sticker', 'audio', 'video', 'document'].includes(messageData.type)) { // 存储文件 const filePath = messageData[messageData.type].filePath; const webLink = await uploadMediaFile(filePath); waiContentFieldsToDB.IVADS_link = webLink; } const currenctStatus = getCorrectOrderStatus(savedMsg.msg_status, waiMessage.status); record.msg_status = currenctStatus; delete upsertFields.status; const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record }); const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, ...msgOrigin }, targetUpsert); if (_type !== 'unresolvable') { // 把内容加上, 否则前端没显示 await callWebhook(webhookBodyFill(webhookBody, { ...msgRow, status: msgRow.msg_status || '' })); } // 只对新消息, 同步的历史消息不再处理 if (messageData.eventSource.includes('notify')) { whatsappEvents.emit(`user:${eventName}`, msgRow); } } catch (error) { logger.error({ messageData, error }, 'error call webhook'); } }; const queueProcessor = await createAsyncQueueProcessor(messageListner, { retryOptions: { retries: 1, minTimeout: 1000 }, }); messageEventNames.forEach(eventName => { // whatsappEvents.on(eventName, async messageData => await messageListner(messageData, eventName)); whatsappEvents.on(eventName, messageData => { const savedId = uniqueMsgId(messageData); queueProcessor.enqueue(savedId, { eventName, messageData }); }); }); }; function setupWhatsappHandler() { setupConnectionHandler(); setupCredsHandler(); setupMessageHandler(); } /** * 发送消息 * @param {*} _data { from, to, msgtype, msgcontent: { body, image, }, actionId } */ async function sendMessage(_data) { const { from } = _data; const toUpsert = ctxToDB(_data); await createOutboundMessage({ ...toUpsert }); const messagePayload = ctxToSendBuilder(_data); whatsappEvents.emit('request.' + from + '.send.message', messagePayload); } /** * 登出: 当前服务的所有连接 */ async function resetCurrentConnections() { await resetConnection(); } /** * 登录: 当前服务的所有连接 */ async function loginCurrentConnections() { const currents = await getConnection({ connect_domain: domain, connect_name: domainName, status: ['open', 'hosted'] }); for (const user of currents) { const { wa_id: waId } = user; const phone = waId.replace('+', ''); const whatsApp1 = await createWhatsApp(phone); whatsApp1.start(); sessionStore.createSession(phone, whatsApp1); } } module.exports = { setupWhatsappHandler, resetCurrentConnections, loginCurrentConnections, sendMessage };