// Source: Global-sales/wai-server/core/handler/whatsappHandler.js
'use strict';
const generateId = require('../../utils/generateId.util');
const { domain, name: domainName } = require('../../config').server;
const whatsappEvents = require('../emitter');
const { callWebhook } = require('../webhook');
const { sessionStore } = require('..');
const { createWhatsApp } = require('../../core/baileys');
const { updateConnection, addCurrentConnection, resetConnection, getConnection } = require('../../services/connections.service');
// Message persistence, shared utilities, logging and message helpers.
const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service');
const { objectMapper, pick, isEmpty } = require('../../utils/commons.util');
const { logger, getUserLogger } = require('../../utils/logger.util');
const { DbData, waiContentToDB, uploadMediaFile } = require('../../helper/wai.msg.helper');
// Lookup tables shared by the handlers below.
// Connection lifecycle event names.
const connectionEventNames = [
  'connection:connect',
  'connection:open',
  'connection:close',
];

// Message event names.
const messageEventNames = [
  'message:received',
  'message:updated',
];

// Internal event name -> `type` value of the webhook body.
const eventTypeMapped = {
  'message:received': 'wai.message.received',
  'message:updated': 'wai.message.updated',
  'creds:update': 'wai.creds.update',
};

// Message status -> name of the time field that receives `updateTime`.
const timeField = {
  saved: 'createTime',
  pending: 'createTime',
  sent: 'sendTime',
  delivered: 'deliverTime',
  read: 'readTime',
  failed: 'updateTime',
};

// Message status -> status value reported in the webhook body.
const statusMapped = {
  saved: 'accepted',
  pending: 'accepted',
  sent: 'sent',
  delivered: 'delivered',
  read: 'read',
  failed: 'failed',
};

// Message event name -> message direction.
const directionField = {
  'message:received': 'inbound',
  'message:updated': 'outbound',
};

// Direction -> prefix of the unique message id.
const directionPrefix = {
  inbound: 'in_',
  outbound: 'out_',
};

// Direction -> message field holding the phone number used in the unique message id.
const directionIdUserPrefix = {
  inbound: 'to',
  outbound: 'from',
};
/**
 * Build the unique id of a message: `<in_|out_><phone without '+'>_<message id>`.
 *
 * The phone number is read from `to` for inbound and from `from` for outbound
 * messages (see `directionIdUserPrefix`).
 *
 * @param {Object} msg - Message carrying `id`, `direction` and the phone field of that direction.
 * @returns {string|undefined} The unique id, or `undefined` when the id, the direction
 *   or the phone number is missing or the direction is unknown, so that callers can
 *   fall back to another id instead of crashing.
 */
const uniqueMsgId = msg => {
  const direction = msg?.direction;
  const prefix = directionPrefix[direction];
  const phone = msg?.[directionIdUserPrefix[direction]];
  if (!msg?.id || !prefix || phone == null) {
    return undefined;
  }
  // Only the first '+' (the international prefix) is stripped.
  return `${prefix}${String(phone).replace('+', '')}_${msg.id}`;
};
/**
 * Build the webhook payload for a WhatsApp event.
 *
 * @param {Object} messageData - Event payload; all of its fields are copied into `waiMessage`.
 * @param {string} messageType - Internal event name, looked up in `eventTypeMapped`.
 * @returns {Object} webhookBody
 */
const webhookBodyBuilder = (messageData, messageType) => {
  const defaultContent = { id: '', from: '', to: '', externalId: '', type: '', direction: '', status: '' };
  const externalId = messageData.externalId || '';

  // Copy `updateTime` into the time field of the current status, but only for
  // statuses that have one: an unmapped status used to add a literal "undefined" key.
  const statusTimeKey = timeField[messageData.status];
  const statusTime = messageData.updateTime && typeof statusTimeKey === 'string' ? { [statusTimeKey]: messageData.updateTime } : {};

  // Inbound messages are reported with an empty status.
  const status = messageData.direction === 'inbound' ? '' : statusMapped[messageData.status] || messageData.status || '';

  return {
    id: `evt_${generateId().replace(/-/g, '')}`,
    type: eventTypeMapped[messageType],
    apiVersion: 'v2',
    webhooksource: 'wai',
    // Current time shifted by +8h (GMT+8); note the string still ends with "Z".
    createTime: new Date(Date.now() + 8 * 60 * 60 * 1000).toISOString(),
    domainName,
    conversationid: externalId,
    whatsAppNo: messageData.whatsAppNo || messageData.from || messageData.to || '',
    waiMessage: {
      ...defaultContent,
      ...messageData,
      ...statusTime,
      type: messageData.type || messageData.msgtype || '',
      // Prefer the direction-qualified id, then the raw id, then a generated one.
      id: uniqueMsgId(messageData) || messageData.id || generateId(),
      wamid: messageData.id || '',
      status,
      externalId,
    },
  };
};
/**
 * Merge the fields that `DbData` derives from a stored message row into
 * `webhookBody.waiMessage` (mutated in place).
 *
 * @param {Object} webhookBody - Body produced by `webhookBodyBuilder`.
 * @param {Object} messageData - Row handed to `DbData`.
 * @returns {Object} The same `webhookBody` instance.
 */
const webhookBodyFill = (webhookBody, messageData) => {
  const dbFields = DbData(messageData);
  const { waiMessage } = webhookBody;
  Object.assign(waiMessage, dbFields);
  return webhookBody;
};
/**
 * Subscribe to the WhatsApp connection events:
 * * connection:connect
 * * connection:open
 * * connection:close
 *
 * Every listener catches and logs its own failures, so a rejected listener
 * never escapes the event emitter.
 */
const setupConnectionHandler = () => {
  whatsappEvents.on('connection:connect', async connectionData => {
    try {
      getUserLogger(connectionData.phone).info({ msg: `连接https://web.whatsapp.com/`, connectionData });
      // find or create
      await addCurrentConnection({
        ...objectMapper(connectionData, { phone: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id', createTimestamp: 'createtime', version: 'version' }, false),
        service_type: 'baileys',
        status: 'connecting',
      });
    } catch (error) {
      logger.error({ connectionData, error }, 'error add connection');
    }
  });

  whatsappEvents.on('connection:open', async connectionData => {
    // TODO: update the instance
    try {
      getUserLogger(connectionData.whatsAppNo).info({ msg: `已登录`, connectionData });
      await updateConnection(
        {
          ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }),
          service_type: 'baileys',
          closetime: null,
        },
        { connect_domain: domain, connect_name: domainName },
      );
      const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: 'open' }, 'creds:update');
      await callWebhook(webhookBody);
    } catch (error) {
      // Previously logged the copy-pasted 'error add connection' message.
      logger.error({ connectionData, error }, 'error open connection');
    }
  });

  whatsappEvents.on('connection:close', async connectionData => {
    try {
      getUserLogger(connectionData.whatsAppNo).warn({ msg: `断开连接`, connectionData });
      sessionStore.removeSession(connectionData.channelId);
      await updateConnection(
        {
          ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }),
          service_type: 'baileys',
        },
        { connect_domain: domain, connect_name: domainName },
      );
      const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: 'offline' }, 'creds:update');
      await callWebhook(webhookBody);
    } catch (error) {
      logger.error({ connectionData, error }, 'error close connection');
    }
  });
};
/**
 * Listen for creds update events:
 * * creds:update
 *
 * Each event is forwarded as a webhook of type `creds:update` with an empty
 * `connection` field; failures are logged and swallowed.
 */
const setupCredsHandler = () => {
  whatsappEvents.on('creds:update', async creds => {
    try {
      // The logged msg text means "QR code".
      getUserLogger(creds.whatsAppNo).info({ msg: `二维码`, creds });
      const webhookBody = webhookBodyBuilder({ ...creds, to: creds.whatsAppNo, connection: '' }, 'creds:update');
      await callWebhook(webhookBody);
    } catch (error) {
      logger.error({ creds, error }, 'error update creds');
    }
  });
};
/**
 * Subscribe to the WhatsApp message events (every name in `messageEventNames`).
 *
 * Status order of a message, as noted by the original author:
 * * pending -> saved -> sent(*) -> delivered -> read
 * * saved -> pending -> sent(*) -> delivered -> read
 *
 * For each event the listener looks up the stored message row, builds the
 * webhook body, uploads the media file when the row has no `IVADS_link` yet,
 * upserts the row and finally posts the webhook enriched with the stored row.
 * Failures are logged and swallowed so that the listener never rejects.
 */
const setupMessageHandler = () => {
  const mediaTypes = ['image', 'sticker', 'audio', 'video', 'document'];

  messageEventNames.forEach(eventName => {
    whatsappEvents.on(eventName, async messageData => {
      try {
        // Kept inside the try: a malformed payload must be logged, not rejected.
        const { from, to, whatsAppNo } = messageData;
        getUserLogger(whatsAppNo || from || to).info({ eventName, messageData });

        const savedId = uniqueMsgId(messageData);
        const targetUpsert = messageData.externalId ? { actionId: messageData.externalId } : { id: savedId };
        const savedMsg = await getOutboundMessage(targetUpsert);
        const bixFields = pick(savedMsg, ['actionId', 'externalId']);
        logger.info('message evt\n', eventName, messageData, savedMsg);

        // fix: the event may carry no type; fall back to the stored one, then to text.
        const typeField = { msgtype: messageData.type || savedMsg?.msgtype || 'text' };
        const webhookBody = webhookBodyBuilder({ ...messageData, ...bixFields, ...typeField }, eventName);
        const { waiMessage } = webhookBody;

        const timeFields = pick(waiMessage, [...Object.values(timeField), 'createTime', 'updateTime']);
        const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']);
        upsertFields.evt_id = webhookBody.id;
        const customerProfileFields = {
          customerProfile_id: waiMessage.customerProfile?.id || '',
          customerProfile_name: waiMessage.customerProfile?.name || '',
        };
        const record = objectMapper(waiMessage, { from: 'from', to: 'to', status: 'msg_status', type: 'msgtype' }, false);
        const waiContentFieldsToDB = messageData.type ? waiContentToDB(messageData) : {};

        // `savedMsg` may be missing (it is read with `?.` everywhere else), and the
        // type may come from the stored row, in which case the event has no media part.
        if (isEmpty(savedMsg?.IVADS_link) && mediaTypes.includes(waiMessage.type)) {
          const filePath = messageData[waiMessage.type]?.filePath;
          if (filePath) {
            // store the file
            waiContentFieldsToDB.IVADS_link = await uploadMediaFile(filePath);
          }
        }

        const msgRow = await upsertOutboundMessage(
          {
            ...timeFields,
            ...upsertFields,
            ...customerProfileFields,
            ...waiContentFieldsToDB,
            ...record,
            ...typeField,
            message_origin: savedMsg?.message_origin || JSON.stringify(messageData),
          },
          targetUpsert,
        );

        // Attach the stored content, otherwise the front end shows nothing.
        await callWebhook(webhookBodyFill(webhookBody, msgRow));
      } catch (error) {
        logger.error({ messageData, error }, 'error call webhook');
      }
    });
  });
};
/**
 * Register every WhatsApp event listener of this module: connection events
 * first, then creds events, then message events.
 */
function setupWhatsappHandler() {
  const registrations = [setupConnectionHandler, setupCredsHandler, setupMessageHandler];
  registrations.forEach(register => register());
}
/**
 * Log out: all connections of the current service.
 *
 * Delegates to `resetConnection()` and resolves once it has completed.
 *
 * @returns {Promise<void>}
 */
async function resetCurrentConnections() {
  await resetConnection();
}
/**
 * Log in: all connections of the current service.
 *
 * Reads the connections stored for this `domain` / `domainName`, then, one
 * connection at a time, creates a WhatsApp client for the phone number
 * (`wa_id` without its first '+'), starts it and registers it in the session
 * store under that phone number.
 *
 * @returns {Promise<void>}
 */
async function loginCurrentConnections() {
  const ownConnections = { connect_domain: domain, connect_name: domainName };
  const connections = await getConnection(ownConnections);

  for (const { wa_id: waId } of connections) {
    const phone = waId.replace('+', '');
    const client = await createWhatsApp(phone);
    client.start();
    sessionStore.createSession(phone, client);
  }
}
// Public API: listener registration plus the bulk logout / login helpers.
module.exports = { setupWhatsappHandler, resetCurrentConnections, loginCurrentConnections };