From 25d52e75e744f63fb427027c5c09563475b43cd9 Mon Sep 17 00:00:00 2001 From: Lei OT Date: Thu, 26 Dec 2024 16:19:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF,=20webhook?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wai-server/api/channels/channel.controller.js | 27 ++++++++------- wai-server/api/messages/message.controller.js | 7 ++-- wai-server/core/services/whatsappHandler.js | 33 ++++++++++++++----- wai-server/models/outbound_messages.js | 1 + wai-server/services/connections.service.js | 4 +-- .../services/outbound_messages.service.js | 26 ++++++++++++--- 6 files changed, 65 insertions(+), 33 deletions(-) diff --git a/wai-server/api/channels/channel.controller.js b/wai-server/api/channels/channel.controller.js index e8d9f0b..71d0967 100644 --- a/wai-server/api/channels/channel.controller.js +++ b/wai-server/api/channels/channel.controller.js @@ -2,22 +2,23 @@ const { sessionService } = require('../../core'); // Import from core/index.js const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js const { getConnection } = require('../../services/connections.service'); const { objectMapper, isEmpty } = require('../../utils/commons.util'); -const { domain } = require('../../config').server; +const { domain, name: domainName } = require('../../config').server; exports.newConnect = async ctx => { try { const { phone } = ctx.query; - // const findSession = await getConnection({ sesson_id: phone, connect_domain: domain }); + const findSession = await getConnection({ sesson_id: phone, status: ['open', 'connecting'] }); // connect_domain: domain // // todo: 只有一条 - // if (!isEmpty(findSession)) { - // const { sesson_id: sessonId, status } = findSession[0]; - // if (['open', 'connecting'].includes(status)) { - // const sock = sessionService.getSession(sessonId); - // if (!isEmpty(sock)) { - // return sock; - // } - // } - // } + if (!isEmpty(findSession)) { + const { sesson_id: sessionId, status, ...connection } = findSession[0]; + // if (['open', 'connecting'].includes(status)) { + // const sock = sessionService.getSession(sessionId); + // if (!isEmpty(sock)) { + // return sock; + // } + // } + // return { qr: '', phone, sessionId, ...connection }; // 返回已登录 + } const whatsApp1 = await createWhatsApp(phone); const existsSession = sessionService.getSession(phone); let qr; @@ -26,7 +27,7 @@ exports.newConnect = async ctx => { const { sessionId } = sessionService.createSession(phone, whatsApp1); // } ctx.assert(whatsApp1, 503, 'No available connections'); - return { qr, phone, sessionId: phone }; + return { qr, phone, sessionId: phone, domainName }; } catch (error) { console.error('create connection error', error); ctx.assert(null, 500, 'Failed to create connection or generate QR code.'); @@ -36,6 +37,4 @@ exports.newConnect = async ctx => { exports.getAll = async () => { const findConnection = await getConnection({}); return findConnection; - // const sessions = sessionService.sessions; - // return Array.from(sessions); }; diff --git a/wai-server/api/messages/message.controller.js b/wai-server/api/messages/message.controller.js index 7d4efbf..c534f67 100644 --- a/wai-server/api/messages/message.controller.js +++ b/wai-server/api/messages/message.controller.js @@ -2,7 +2,7 @@ const generateId = require('../../utils/generateId.util'); const { sessionService } = require('../../core'); -// const { upsertOutboundMessage } = require('../../services/outbound_messages.service'); +const { upsertOutboundMessage } = require('../../services/outbound_messages.service'); function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); @@ -23,8 +23,9 @@ exports.sendText = async ctx => { } // return wsToSend; try { - wsToSend.sendTextMessage(to, content); - // await upsertOutboundMessage(); + const sockMsg = wsToSend.sendTextMessage(to, content); + console.log(JSON.stringify(sockMsg, undefined, 2)); + await upsertOutboundMessage(null, { ...ctx.request.body, id: sockMsg.key }); return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' }; } catch (error) { console.error('Error sending message:', error); diff --git a/wai-server/core/services/whatsappHandler.js b/wai-server/core/services/whatsappHandler.js index 0b993a2..4dd7804 100644 --- a/wai-server/core/services/whatsappHandler.js +++ b/wai-server/core/services/whatsappHandler.js @@ -1,10 +1,11 @@ const generateId = require('../../utils/generateId.util'); -const { domain } = require('../../config').server; +const { domain, name: domainName } = require('../../config').server; const whatsappEvents = require('../emitter'); const { callWebhook } = require('../webhook'); const { addConnection, updateConnection, addCurrentConnection, resetConnection } = require('../../services/connections.service'); const { objectMapper } = require('../../utils/commons.util'); const { sessionService } = require('..'); +const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service'); const logger = console; @@ -17,6 +18,10 @@ const eventTypeMapped = { }; const timeField = { sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' }; const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' }; + +/** + * @returns {Object} webhookBody + */ const webhookBodyBuilder = (messageData, messageType) => { const message = { id: `evt_${generateId().replace(/-/g, '')}`, @@ -24,12 +29,13 @@ const webhookBodyBuilder = (messageData, messageType) => { apiVersion: 'v2', webhooksource: 'wai', createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8 + domainName, waiMessage: { ...messageData, ...(messageData.updateTime ? { [timeField[messageData.status]]: messageData.updateTime } : {}), wamid: messageData.id, direction: directionField[messageType], - externalId: '-1', // todo: + // externalId: '-1', // todo: }, }; return message; @@ -42,7 +48,7 @@ const setupConnectionHandler = () => { try { // find Or create await addCurrentConnection({ - ...objectMapper(connectionData, { phone: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id', createTimestamp: 'createtime' }), + ...objectMapper(connectionData, { phone: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id', createTimestamp: 'createtime' }, false), service_type: 'baileys', status: 'connecting', }); @@ -69,10 +75,13 @@ const setupConnectionHandler = () => { whatsappEvents.on('connection:close', async connectionData => { try { sessionService.removeSession(connectionData.sesson_id); - await updateConnection({ - ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }), - service_type: 'baileys', - }); + await updateConnection( + { + ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }), + service_type: 'baileys', + }, + { connect_domain: domain }, + ); // todo: 通知前端: 重新扫码 } catch (error) { logger.error({ connectionData, error }, 'error add connection'); @@ -86,8 +95,14 @@ const setupMessageHandler = () => { logger.info(`Setting up event ${eventName}`); whatsappEvents.on(eventName, async messageData => { try { - const x = webhookBodyBuilder(messageData, eventName); - await callWebhook(x); + const m = await getOutboundMessage({ id: messageData.id }); + + const webhookBody = webhookBodyBuilder({ ...messageData, actionId: m.actionId, externalId: m.externalId }, eventName); + const { waiMessage } = webhookBody; + + const upsert = await upsertOutboundMessage(m.sn, { ...waiMessage, msg_status: waiMessage.status }); + // console.log('upsert=========================', upsert); + await callWebhook(webhookBody); } catch (error) { logger.error({ messageData, error }, 'error call webhook'); } diff --git a/wai-server/models/outbound_messages.js b/wai-server/models/outbound_messages.js index d7e13d9..2ce89f8 100644 --- a/wai-server/models/outbound_messages.js +++ b/wai-server/models/outbound_messages.js @@ -32,6 +32,7 @@ module.exports = function(sequelize, DataTypes) { msgtime: { type: DataTypes.DATE, allowNull: true, + defaultValue: Sequelize.Sequelize.literal('CURRENT_TIMESTAMP'), }, id: { type: DataTypes.STRING(100), diff --git a/wai-server/services/connections.service.js b/wai-server/services/connections.service.js index 4ec1196..98ff92b 100644 --- a/wai-server/services/connections.service.js +++ b/wai-server/services/connections.service.js @@ -20,7 +20,7 @@ const addConnection = async data => { const addCurrentConnection = async data => { const [r, createdId] = await ConnectionsModel.findOrCreate({ where: { connect_domain: domain, connect_name: name, sesson_id: data.sesson_id }, - defaults: { ...data, connect_domain: domain, connect_name: name }, + defaults: { ...data, connect_domain: domain, connect_name: name, closetime: null }, }); return r; }; @@ -39,7 +39,7 @@ const updateConnection = async (data, where = {}) => { }; const getConnection = async data => { - const r = await ConnectionsModel.findAll({ where: data }); + const r = await ConnectionsModel.findAll({ where: data, raw: true }); return r; }; diff --git a/wai-server/services/outbound_messages.service.js b/wai-server/services/outbound_messages.service.js index f149e9b..f2f721e 100644 --- a/wai-server/services/outbound_messages.service.js +++ b/wai-server/services/outbound_messages.service.js @@ -2,6 +2,7 @@ const db = require('../config').database; const { domain, name } = require('../config').server; +const { objectMapper, pick } = require('../utils/commons.util'); const initModels = require('../models/init-models'); const Sequelize = db.sequelize; @@ -9,10 +10,25 @@ const models = initModels(Sequelize); const OutboundModelModel = models.outbound_messages; -const upsertOutboundMessage = async data => { - // const r = await OutboundModelModel.create({ ...data }); - const [instance, created] = await OutboundModelModel.upsert({ ...data }, { returning: true }); - return created; +const getOutboundMessage = async msg => { + const r = await OutboundModelModel.findOne({ + where: msg, + }); + return r.toJSON(); }; -module.exports = { upsertOutboundMessage }; +/** + * MySQL - Implemented with ON DUPLICATE KEY UPDATE + */ +const upsertOutboundMessage = async (key, data) => { + const defaultR = { direction: 'outbound' }; + const r1 = pick(data, ['actionId', 'msgtype', 'externalId', 'id']); + const record = objectMapper(data, { from: 'froms', to: 'tos' }, false); + const byType = data.msgtype === 'text' ? { text_body: data.msgcontent.body, text_preview_url: data.msgcontent.preview_url } : {}; + const toUpsert = { ...defaultR, ...r1, ...record, ...byType, sn: key, message_origin: JSON.stringify(data) }; + const [instance, created] = await OutboundModelModel.upsert({ ...toUpsert }, { returning: true }); + console.info('upsertOutboundMessage', { instance, created }); + return instance.toJSON(); +}; + +module.exports = { getOutboundMessage, upsertOutboundMessage };