|
|
|
'use strict';
|
|
|
|
|
|
|
|
const generateId = require('../../utils/generateId.util');
|
|
|
|
const { domain, name: domainName } = require('../../config').server;
|
|
|
|
const whatsappEvents = require('../emitter');
|
|
|
|
const createAsyncQueueProcessor = require('../emitter/queueProcessor');
|
|
|
|
const { callWebhook } = require('../webhook');
|
|
|
|
const { sessionStore } = require('..');
|
|
|
|
const { createWhatsApp } = require('../../core/baileys');
|
|
|
|
const { updateConnection, addCurrentConnection, resetConnection, getConnection } = require('../../services/connections.service');
|
|
|
|
const { getOutboundMessage, upsertOutboundMessage, createOutboundMessage } = require('../../services/outbound_messages.service');
|
|
|
|
const { objectMapper, pick, isEmpty, omitEmpty } = require('../../utils/commons.util');
|
|
|
|
const { logger, getUserLogger } = require('../../utils/logger.util');
|
|
|
|
const { DbData, waiContentToDB, uploadMediaFile, ctxToSendBuilder, ctxToDB } = require('../../helper/wai.msg.helper');
|
|
|
|
|
|
|
|
const connectionEventNames = ['connection:connect', 'connection:open', 'connection:close'];
|
|
|
|
const messageEventNames = ['message:received', 'message:updated'];
|
|
|
|
|
|
|
|
const eventTypeMapped = {
|
|
|
|
'message:received': 'wai.message.received',
|
|
|
|
'message:updated': 'wai.message.updated',
|
|
|
|
'creds:update': 'wai.creds.update',
|
|
|
|
};
|
|
|
|
const timeField = { saved: 'createTime', pending: 'createTime', sent: 'sendTime', delivered: 'deliverTime', read: 'readTime', failed: 'updateTime', error: 'updateTime' };
|
|
|
|
const statusMapped = { saved: 'accepted', pending: 'accepted', sent: 'sent', delivered: 'delivered', read: 'read', failed: 'failed', error: 'failed' };
|
|
|
|
function getCorrectOrderStatus(currentStatus, newStatus) {
|
|
|
|
const statusOrder = ['accepted', 'failed', 'sent', 'delivered', 'read'];
|
|
|
|
if (!currentStatus || typeof currentStatus !== 'string') {
|
|
|
|
currentStatus = null; // default 'pending'
|
|
|
|
}
|
|
|
|
if (!newStatus || typeof newStatus !== 'string') {
|
|
|
|
return currentStatus;
|
|
|
|
}
|
|
|
|
if (!statusOrder.includes(newStatus)) {
|
|
|
|
// console.warn(`Unknown status: ${newStatus}`);
|
|
|
|
return currentStatus;
|
|
|
|
}
|
|
|
|
if (!currentStatus) {
|
|
|
|
return newStatus;
|
|
|
|
}
|
|
|
|
if (!statusOrder.includes(currentStatus)) {
|
|
|
|
// console.warn(`Unknown status: ${currentStatus}`);
|
|
|
|
return newStatus;
|
|
|
|
}
|
|
|
|
const currentIndex = statusOrder.indexOf(currentStatus);
|
|
|
|
const newIndex = statusOrder.indexOf(newStatus);
|
|
|
|
if (newIndex > currentIndex) {
|
|
|
|
return newStatus;
|
|
|
|
} else if (newIndex === currentIndex) {
|
|
|
|
return currentStatus;
|
|
|
|
} else {
|
|
|
|
// Optionally handle cases where the new status is "behind" the current one
|
|
|
|
// Options:
|
|
|
|
// 1. Ignore the new status (most common):
|
|
|
|
return currentStatus;
|
|
|
|
// 2. Allow "downgrades" (use with caution, may indicate errors):
|
|
|
|
// return newStatus;
|
|
|
|
// 3. Throw an error or log a warning:
|
|
|
|
// console.error(`Invalid status transition: ${currentStatus} -> ${newStatus}`);
|
|
|
|
// return currentStatus; // Or throw an error
|
|
|
|
}
|
|
|
|
}
|
|
|
|
const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' };
|
|
|
|
const directionPrefix = { inbound: 'in_', outbound: 'out_' };
|
|
|
|
const directionIdUserPrefix = { inbound: 'to', outbound: 'from' };
|
|
|
|
|
|
|
|
const uniqueMsgId = msg => (msg.id && msg.direction ? `${directionPrefix[msg.direction]}${msg[directionIdUserPrefix[msg.direction]].replace('+', '')}_${msg.id}` : undefined);
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @returns {Object} webhookBody
|
|
|
|
*/
|
|
|
|
const webhookBodyBuilder = (messageData, messageType) => {
|
|
|
|
const defaultContent = { id: '', from: '', to: '', externalId: '', type: '', direction: '', status: '' };
|
|
|
|
const status = messageData.direction === 'inbound' ? '' : statusMapped?.[messageData.status] || messageData.status || '';
|
|
|
|
const errors = messageData.status === 'error' ? { errorMessage: '未知错误', errorCode: '' } : {};
|
|
|
|
const message = {
|
|
|
|
id: `evt_${generateId().replace(/-/g, '')}`,
|
|
|
|
type: eventTypeMapped[messageType],
|
|
|
|
apiVersion: 'v2',
|
|
|
|
webhooksource: 'wai',
|
|
|
|
createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8
|
|
|
|
domainName,
|
|
|
|
conversationid: messageData?.externalId || '',
|
|
|
|
whatsAppNo: messageData?.whatsAppNo || messageData.from || messageData.to || '',
|
|
|
|
waiMessage: {
|
|
|
|
...defaultContent,
|
|
|
|
...messageData,
|
|
|
|
...(messageData.updateTime && !isEmpty(status) ? { [timeField[messageData.status]]: messageData.updateTime } : {}),
|
|
|
|
type: messageData.type || messageData.msgtype || '',
|
|
|
|
id: uniqueMsgId(messageData) || messageData.id || generateId(),
|
|
|
|
wamid: messageData.id || '',
|
|
|
|
// direction: directionField[messageType],
|
|
|
|
status,
|
|
|
|
...errors,
|
|
|
|
externalId: messageData?.externalId || '',
|
|
|
|
},
|
|
|
|
};
|
|
|
|
return message;
|
|
|
|
};
|
|
|
|
|
|
|
|
const webhookBodyFill = (webhookBody, savedData) => {
|
|
|
|
const DBDataObj = DbData(savedData);
|
|
|
|
Object.assign(webhookBody.waiMessage, DBDataObj, pick(savedData, ['status']));
|
|
|
|
return webhookBody;
|
|
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
|
|
* WhatsApp 连接事件
|
|
|
|
* * connection:connect
|
|
|
|
* * connection:open
|
|
|
|
* * connection:close
|
|
|
|
*/
|
|
|
|
const setupConnectionHandler = () => {
|
|
|
|
whatsappEvents.on('connection:connect', async connectionData => {
|
|
|
|
try {
|
|
|
|
getUserLogger(connectionData.phone).info({ msg: `连接https://web.whatsapp.com/`, connectionData });
|
|
|
|
// find Or create
|
|
|
|
await addCurrentConnection({
|
|
|
|
...objectMapper(connectionData, { phone: [{ key: 'wa_id' }], channelId: 'channel_id', createTimestamp: 'createtime', version: 'version' }, false),
|
|
|
|
service_type: 'baileys',
|
|
|
|
status: 'connecting',
|
|
|
|
});
|
|
|
|
} catch (error) {
|
|
|
|
logger.error({ connectionData, error }, 'error add connection');
|
|
|
|
}
|
|
|
|
});
|
|
|
|
whatsappEvents.on('connection:open', async connectionData => {
|
|
|
|
// todo: 更新实例
|
|
|
|
try {
|
|
|
|
getUserLogger(connectionData.whatsAppNo).info({ msg: `已登录`, connectionData });
|
|
|
|
await updateConnection(
|
|
|
|
{
|
|
|
|
...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }], channelId: 'channel_id' }),
|
|
|
|
service_type: 'baileys',
|
|
|
|
closetime: null,
|
|
|
|
},
|
|
|
|
{ connect_domain: domain, connect_name: domainName },
|
|
|
|
);
|
|
|
|
const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: 'open' }, 'creds:update');
|
|
|
|
await callWebhook(webhookBody);
|
|
|
|
} catch (error) {
|
|
|
|
logger.error({ connectionData, error }, 'error add connection');
|
|
|
|
}
|
|
|
|
});
|
|
|
|
whatsappEvents.on('connection:close', async connectionData => {
|
|
|
|
try {
|
|
|
|
getUserLogger(connectionData.whatsAppNo).warn({ msg: `断开连接`, connectionData });
|
|
|
|
sessionStore.removeSession(connectionData.channelId);
|
|
|
|
await updateConnection(
|
|
|
|
{
|
|
|
|
...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }], channelId: 'channel_id' }),
|
|
|
|
service_type: 'baileys',
|
|
|
|
},
|
|
|
|
{ connect_domain: domain, connect_name: domainName },
|
|
|
|
);
|
|
|
|
const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: 'offline' }, 'creds:update');
|
|
|
|
await callWebhook(webhookBody);
|
|
|
|
} catch (error) {
|
|
|
|
logger.error({ connectionData, error }, 'error close connection');
|
|
|
|
}
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 监听 Creds 更新事件
|
|
|
|
* * creds:update
|
|
|
|
*/
|
|
|
|
const setupCredsHandler = () => {
|
|
|
|
whatsappEvents.on('creds:update', async creds => {
|
|
|
|
try {
|
|
|
|
getUserLogger(creds.whatsAppNo).info({ msg: `二维码`, creds });
|
|
|
|
const webhookBody = webhookBodyBuilder({ ...creds, to: creds.whatsAppNo, connection: '' }, 'creds:update');
|
|
|
|
await callWebhook(webhookBody);
|
|
|
|
} catch (error) {
|
|
|
|
logger.error({ creds, error }, 'error update creds');
|
|
|
|
}
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
|
|
* WhatsApp 消息事件
|
|
|
|
* * pending ⏰ -> saved ⏰ -> sent(*) ✔ -> delivered ✔✔ -> read ✅
|
|
|
|
* * saved -> pending -> sent(*) -> delivered -> read
|
|
|
|
*/
|
|
|
|
const setupMessageHandler = async () => {
|
|
|
|
const messageListner = async ({ eventName, messageData }) => {
|
|
|
|
// if (messageData.status === 'pending') {
|
|
|
|
// logger.info('message pending', messageData);
|
|
|
|
// return false;
|
|
|
|
// }
|
|
|
|
const { from, to, whatsAppNo } = messageData;
|
|
|
|
const _whatsAppNo = whatsAppNo || from || to;
|
|
|
|
getUserLogger(_whatsAppNo).info({ eventName, messageData });
|
|
|
|
try {
|
|
|
|
const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString();
|
|
|
|
|
|
|
|
const savedId = uniqueMsgId(messageData);
|
|
|
|
const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId };
|
|
|
|
const savedMsg = await getOutboundMessage(targetUpsert);
|
|
|
|
const bixFields = pick(savedMsg, ['actionId', 'externalId']);
|
|
|
|
const savedTimeFields = pick(savedMsg, Object.values(timeField));
|
|
|
|
logger.info('message evt\n', eventName, messageData, savedMsg);
|
|
|
|
const _type = messageData?.type || savedMsg?.msgtype || 'unresolvable';
|
|
|
|
const typeField = { msgtype: _type }; // fix: type 空
|
|
|
|
|
|
|
|
const webhookBody = webhookBodyBuilder({ ...savedTimeFields, ...messageData, ...bixFields, ...typeField }, eventName);
|
|
|
|
const { waiMessage } = webhookBody;
|
|
|
|
|
|
|
|
const timeFields = pick(waiMessage, Object.values(timeField));
|
|
|
|
const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']);
|
|
|
|
upsertFields.evt_id = webhookBody.id;
|
|
|
|
const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' };
|
|
|
|
const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false);
|
|
|
|
const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {};
|
|
|
|
// 是否空数据. 存储有数据的事件原文
|
|
|
|
const upsertMsgOrigin = !isEmpty(omitEmpty(waiContentFieldsToDB)) || isEmpty(savedMsg);
|
|
|
|
const msgOrigin = upsertMsgOrigin ? { message_origin: JSON.stringify(messageData) } : {};
|
|
|
|
if (isEmpty(savedMsg.IVADS_link) && ['image', 'sticker', 'audio', 'video', 'document'].includes(messageData.type)) {
|
|
|
|
// 存储文件
|
|
|
|
const filePath = messageData[messageData.type].filePath;
|
|
|
|
const webLink = await uploadMediaFile(filePath);
|
|
|
|
waiContentFieldsToDB.IVADS_link = webLink;
|
|
|
|
}
|
|
|
|
const currenctStatus = getCorrectOrderStatus(savedMsg.msg_status, waiMessage.status);
|
|
|
|
record.msg_status = currenctStatus;
|
|
|
|
delete upsertFields.status;
|
|
|
|
|
|
|
|
const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record });
|
|
|
|
const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, ...msgOrigin }, targetUpsert);
|
|
|
|
|
|
|
|
whatsappEvents.emit(`user:${eventName}`, msgRow);
|
|
|
|
|
|
|
|
if (_type !== 'unresolvable') {
|
|
|
|
// 把内容加上, 否则前端没显示
|
|
|
|
await callWebhook(webhookBodyFill(webhookBody, { ...msgRow, status: msgRow.msg_status || '' }));
|
|
|
|
}
|
|
|
|
} catch (error) {
|
|
|
|
logger.error({ messageData, error }, 'error call webhook');
|
|
|
|
}
|
|
|
|
};
|
|
|
|
const queueProcessor = await createAsyncQueueProcessor(messageListner, {
|
|
|
|
retryOptions: { retries: 1, minTimeout: 1000 },
|
|
|
|
});
|
|
|
|
|
|
|
|
messageEventNames.forEach(eventName => {
|
|
|
|
// whatsappEvents.on(eventName, async messageData => await messageListner(messageData, eventName));
|
|
|
|
whatsappEvents.on(eventName, messageData => {
|
|
|
|
const savedId = uniqueMsgId(messageData);
|
|
|
|
queueProcessor.enqueue(savedId, { eventName, messageData });
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
function setupWhatsappHandler() {
|
|
|
|
setupConnectionHandler();
|
|
|
|
setupCredsHandler();
|
|
|
|
setupMessageHandler();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 发送消息
|
|
|
|
* @param {*} _data { from, to, msgtype, msgcontent: { body, image, }, actionId }
|
|
|
|
*/
|
|
|
|
async function sendMessage(_data) {
|
|
|
|
const { from } = _data;
|
|
|
|
const toUpsert = ctxToDB(_data);
|
|
|
|
await createOutboundMessage({ ...toUpsert });
|
|
|
|
const messagePayload = ctxToSendBuilder(_data);
|
|
|
|
whatsappEvents.emit('request.' + from + '.send.message', messagePayload);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 登出: 当前服务的所有连接
|
|
|
|
*/
|
|
|
|
async function resetCurrentConnections() {
|
|
|
|
await resetConnection();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 登录: 当前服务的所有连接
|
|
|
|
*/
|
|
|
|
async function loginCurrentConnections() {
|
|
|
|
const currents = await getConnection({ connect_domain: domain, connect_name: domainName, status: 'open' });
|
|
|
|
for (const user of currents) {
|
|
|
|
const { wa_id: waId } = user;
|
|
|
|
const phone = waId.replace('+', '');
|
|
|
|
const whatsApp1 = await createWhatsApp(phone);
|
|
|
|
whatsApp1.start();
|
|
|
|
sessionStore.createSession(phone, whatsApp1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = { setupWhatsappHandler, resetCurrentConnections, loginCurrentConnections, sendMessage };
|