'use strict';

const generateId = require('../../utils/generateId.util');
const { domain, name: domainName } = require('../../config').server;
const whatsappEvents = require('../emitter');
const { callWebhook } = require('../webhook');
const {
  addConnection,
  updateConnection,
  addCurrentConnection,
  resetConnection,
} = require('../../services/connections.service');
const { objectMapper, pick } = require('../../utils/commons.util');
const { sessionStore } = require('..');
const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service');
const logger = require('../../utils/logger.util');
const { DbData } = require('../../helper/wai.msg.helper');

const connectionEventNames = ['connection:connect', 'connection:open', 'connection:close'];
const messageEventNames = ['message:received', 'message:updated'];

// Internal emitter event name -> `type` value sent in the webhook envelope.
const eventTypeMapped = {
  'message:received': 'wai.message.received',
  'message:updated': 'wai.message.updated',
  'creds:update': 'wai.creds.update',
};

// Message status -> name of the timestamp field that receives `updateTime`.
const timeField = {
  saved: 'createTime',
  pending: 'createTime',
  sent: 'sendTime',
  delivered: 'deliverTime',
  read: 'readTime',
  failed: 'updateTime',
};

// Message status -> status value exposed in the webhook payload.
const statusMapped = {
  saved: 'accepted',
  pending: 'accepted',
  sent: 'sent',
  delivered: 'delivered',
  read: 'read',
  failed: 'failed',
};

const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' };
const directionPrefix = { inbound: 'in_', outbound: 'out_' };

/**
 * Build the webhook envelope for an event.
 *
 * `waiMessage` is assembled from empty defaults, then the raw payload, then a
 * status-specific timestamp, and finally the normalised `id`, `wamid`,
 * `status` and `externalId` fields (later entries win).
 *
 * @param {Object} messageData - Event payload; a nullish value is treated as `{}`.
 * @param {string} messageType - Emitter event name, translated through `eventTypeMapped`.
 * @returns {Object} webhookBody
 */
const webhookBodyBuilder = (messageData, messageType) => {
  // The original mixed `messageData?.x` with bare `messageData.x`; normalise
  // once so a nullish payload cannot throw half-way through.
  const data = messageData || {};
  const defaultContent = {
    id: '',
    from: '',
    to: '',
    externalId: '',
    type: '',
    direction: '',
    status: '',
  };

  // Only stamp a status timestamp when the status maps to a known field;
  // otherwise the computed key would be the literal string "undefined".
  const statusTimeKey = timeField[data.status];
  const statusTime = data.updateTime && typeof statusTimeKey === 'string'
    ? { [statusTimeKey]: data.updateTime }
    : {};

  const message = {
    id: `evt_${generateId().replace(/-/g, '')}`,
    type: eventTypeMapped[messageType],
    apiVersion: 'v2',
    webhooksource: 'wai',
    // GMT +8. NOTE(review): this shifts the clock by 8h, but toISOString()
    // still emits a trailing "Z" — confirm that consumers expect this.
    createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(),
    domainName,
    conversationid: data.externalId || '',
    whatsAppNo: data.whatsAppNo || '',
    waiMessage: {
      ...defaultContent,
      ...data,
      ...statusTime,
      // Prefix the id with the direction when both are known; fall back to
      // the raw id, then to a freshly generated one.
      id: data.id && data.direction
        ? `${directionPrefix[data.direction]}${data.id}`
        : (data.id || generateId()),
      wamid: data.id || '',
      status: statusMapped[data.status] || data.status || '',
      externalId: data.externalId || '',
    },
  };
  return message;
};

/**
 * Copy the fields produced by `DbData(messageData)` onto `webhookBody.waiMessage`.
 *
 * @param {Object} webhookBody - Envelope from `webhookBodyBuilder`; mutated in place.
 * @param {Object} messageData - Value handed to `DbData` (the upserted row at the call site).
 * @returns {Object} The same `webhookBody` instance.
 */
const webhookBodyFill = (webhookBody, messageData) => {
  const dbFields = DbData(messageData);
  Object.assign(webhookBody.waiMessage, dbFields);
  return webhookBody;
};

/**
 * WhatsApp connection events.
 *
 * Registers listeners for `connection:connect`, `connection:open` and
 * `connection:close`. Each listener catches and logs its own errors.
 */
const setupConnectionHandler = () => {
  whatsappEvents.on('connection:connect', async connectionData => {
    try {
      // Find or create.
      // NOTE(review): the 'sesson_id' spelling is kept as-is; presumably it
      // matches the stored field name — verify before renaming.
      await addCurrentConnection({
        ...objectMapper(
          connectionData,
          {
            phone: [{ key: 'wa_id' }, { key: 'sesson_id' }],
            channelId: 'channel_id',
            createTimestamp: 'createtime',
            version: 'version',
          },
          false,
        ),
        service_type: 'baileys',
        status: 'connecting',
      });
    } catch (error) {
      logger.error({ connectionData, error }, 'error add connection');
    }
  });

  whatsappEvents.on('connection:open', async connectionData => {
    logger.info('event connection:open', connectionData);
    // TODO: update instance
    try {
      await updateConnection(
        {
          ...objectMapper(connectionData, {
            whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }],
            channelId: 'channel_id',
          }),
          service_type: 'baileys',
          closetime: null,
        },
        { connect_domain: domain, connect_name: domainName },
      );
      const webhookBody = webhookBodyBuilder(
        { ...connectionData, to: connectionData.whatsAppNo, connection: 'open' },
        'creds:update',
      );
      await callWebhook(webhookBody);
    } catch (error) {
      // Message was a copy-paste of the connect handler's; name this handler.
      logger.error({ connectionData, error }, 'error open connection');
    }
  });

  whatsappEvents.on('connection:close', async connectionData => {
    logger.info('event connection:close', connectionData);
    try {
      sessionStore.removeSession(connectionData.channelId);
      await updateConnection(
        {
          ...objectMapper(connectionData, {
            whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }],
            channelId: 'channel_id',
          }),
          service_type: 'baileys',
        },
        { connect_domain: domain, connect_name: domainName, channel_id: connectionData.channelId },
      );
      // TODO: notify the front end: rescan the QR code
      const webhookBody = webhookBodyBuilder(
        { ...connectionData, to: connectionData.whatsAppNo, connection: 'offline' },
        'creds:update',
      );
      await callWebhook(webhookBody);
    } catch (error) {
      // Message was a copy-paste of the connect handler's; name this handler.
      logger.error({ connectionData, error }, 'error close connection');
    }
  });
};

/**
 * Listen for Creds update events and forward each one to the webhook.
 */
const setupCredsHandler = () => {
  whatsappEvents.on('creds:update', async creds => {
    logger.info('creds:update', creds);
    try {
      const webhookBody = webhookBodyBuilder(
        { ...creds, to: creds.whatsAppNo, connection: '' },
        'creds:update',
      );
      await callWebhook(webhookBody);
    } catch (error) {
      logger.error({ creds, error }, 'error update creds');
    }
  });
};

/**
 * Handle one message event: look up the stored message, build the webhook
 * body, upsert the row, then call the webhook with the row's data merged in.
 * Errors are caught and logged, so the returned promise never rejects.
 *
 * @param {string} eventName - One of `messageEventNames`.
 * @param {Object} messageData - Payload emitted with the event.
 * @returns {Promise<void>}
 */
const handleMessageEvent = async (eventName, messageData) => {
  try {
    // NOTE(review): when `direction` is missing this yields an id starting
    // with "undefined"; left unchanged because stored rows may rely on it.
    const savedId = `${directionPrefix[messageData.direction]}${messageData.id}`;
    const targetUpsert = messageData.externalId
      ? { actionId: messageData.externalId }
      : { id: savedId };
    const savedMsg = await getOutboundMessage(targetUpsert);
    const bixFields = pick(savedMsg, ['actionId', 'externalId']);
    logger.info('message evt\n', eventName, messageData, savedMsg);

    // Fix for empty type: fall back to the stored msgtype, then to 'text'.
    const typeField = { msgtype: messageData?.type || savedMsg?.msgtype || 'text' };
    const webhookBody = webhookBodyBuilder({ ...messageData, ...bixFields, ...typeField }, eventName);
    const { waiMessage } = webhookBody;

    const timeFields = pick(waiMessage, [...Object.values(timeField), 'createTime', 'updateTime']);
    const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']);
    upsertFields.evt_id = webhookBody.id;
    const pusher = {
      customerProfile_id: waiMessage.customerProfile?.id || '',
      customerProfile_name: waiMessage.customerProfile?.name || '',
    };
    const record = objectMapper(
      waiMessage,
      { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' },
      false,
    );

    // A 'text' event without a `text` object used to throw here and drop the
    // whole event; skip the column instead so the upsert and webhook still run.
    const textBody = waiMessage.type === 'text' ? waiMessage.text?.body : undefined;
    const contentFields = textBody !== undefined ? { text_body: textBody } : {};

    // TODO: only text messages can be received for now; add other types later.
    const msgRow = await upsertOutboundMessage(
      {
        ...timeFields,
        ...upsertFields,
        ...pusher,
        ...contentFields,
        ...record,
        ...typeField,
        message_origin: savedMsg?.message_origin || JSON.stringify(messageData),
      },
      targetUpsert,
    );

    // TODO: add the content, otherwise the front end shows nothing.
    await callWebhook(webhookBodyFill(webhookBody, msgRow));
  } catch (error) {
    logger.error({ messageData, error }, 'error call webhook');
  }
};

/**
 * WhatsApp message events.
 * pending ⏰ -> saved ⏰ -> sent(*) ✔ -> delivered ✔✔ -> read ✅
 * saved -> pending -> sent(*) -> delivered -> read
 *
 * Registers `handleMessageEvent` for every name in `messageEventNames`.
 */
const setupMessageHandler = () => {
  messageEventNames.forEach(eventName => {
    whatsappEvents.on(eventName, messageData => handleMessageEvent(eventName, messageData));
  });
};

/**
 * Register the connection, creds and message listeners on the WhatsApp emitter.
 */
function setupWhatsappHandler() {
  setupConnectionHandler();
  setupCredsHandler();
  setupMessageHandler();
}

/**
 * Log out: all connections of the current service.
 * Delegates to `resetConnection()` with no arguments.
 *
 * @returns {Promise<void>}
 */
async function resetCurrentConnection() {
  await resetConnection();
}

module.exports = { setupWhatsappHandler, resetCurrentConnection };