diff --git a/wai-server/api/channels/channel.controller.js b/wai-server/api/channels/channel.controller.js index 71d0967..2a75cd8 100644 --- a/wai-server/api/channels/channel.controller.js +++ b/wai-server/api/channels/channel.controller.js @@ -1,4 +1,4 @@ -const { sessionService } = require('../../core'); // Import from core/index.js +const { sessionStore } = require('../../core'); // Import from core/index.js const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js const { getConnection } = require('../../services/connections.service'); const { objectMapper, isEmpty } = require('../../utils/commons.util'); @@ -12,7 +12,7 @@ exports.newConnect = async ctx => { if (!isEmpty(findSession)) { const { sesson_id: sessionId, status, ...connection } = findSession[0]; // if (['open', 'connecting'].includes(status)) { - // const sock = sessionService.getSession(sessionId); + // const sock = sessionStore.getSession(sessionId); // if (!isEmpty(sock)) { // return sock; // } @@ -20,11 +20,11 @@ exports.newConnect = async ctx => { // return { qr: '', phone, sessionId, ...connection }; // 返回已登录 } const whatsApp1 = await createWhatsApp(phone); - const existsSession = sessionService.getSession(phone); + const existsSession = sessionStore.getSession(phone); let qr; // if (isEmpty(existsSession)) { qr = await whatsApp1.start(); - const { sessionId } = sessionService.createSession(phone, whatsApp1); + const { sessionId } = sessionStore.createSession(phone, whatsApp1); // } ctx.assert(whatsApp1, 503, 'No available connections'); return { qr, phone, sessionId: phone, domainName }; @@ -38,3 +38,7 @@ exports.getAll = async () => { const findConnection = await getConnection({}); return findConnection; }; + +exports.getSessions = async ctx => { + return Array.from(sessionStore.sessions); +}; diff --git a/wai-server/api/channels/channel.routes.js b/wai-server/api/channels/channel.routes.js index a6c6c11..1597164 100644 --- a/wai-server/api/channels/channel.routes.js +++ b/wai-server/api/channels/channel.routes.js @@ -13,7 +13,8 @@ module.exports = Router => { // .get('/qrcode0', callController.getIn) // .get('/qrcode', controller.qrcode) // .get('/:id', controller.getOne) - .get('/', controller.getAll); + .get('/', controller.getAll) + .get('/sessions', controller.getSessions); // .post('/test', controller.testSend); // .post('/', controller.createOne) // router.post('/get-in', callController.getIn); diff --git a/wai-server/api/messages/message.controller.js b/wai-server/api/messages/message.controller.js index 3408173..f80d686 100644 --- a/wai-server/api/messages/message.controller.js +++ b/wai-server/api/messages/message.controller.js @@ -1,7 +1,8 @@ 'use strict'; const generateId = require('../../utils/generateId.util'); -const { sessionService } = require('../../core'); +const { sessionStore } = require('../../core'); +const { objectMapper, pick } = require('../../utils/commons.util'); const { upsertOutboundMessage } = require('../../services/outbound_messages.service'); function sleep(ms) { @@ -15,20 +16,31 @@ exports.sendText = async ctx => { ctx.assert(from, 400, 'From and message are required'); return; } - const wsToSend = sessionService.getSession(from); + const wsToSend = sessionStore.getSession(from); // console.log('find wsToSend', wsToSend) if (!wsToSend) { ctx.assert(wsToSend, 400, 'Session not found'); // 404 return; } - // return wsToSend; try { - wsToSend.sendTextMessage(to, content, actionId).then(sockMsg => { - const { key } = sockMsg; - upsertOutboundMessage(null, { ...ctx.request.body, id: key.id || '' }); + return wsToSend.sendTextMessage(to, content).then(({ messageId }) => { + // const messageId = generateId(); + + const _data = ctx.request.body; + const defaultR = { direction: 'outbound' }; + const r1 = pick(_data, ['actionId', 'msgtype', 'externalId']); + r1.id = messageId; + r1.wamid = messageId; + r1.msg_status = 'accepted'; + r1.createTime = Date.now(); + const record = objectMapper(_data, { from: 'froms', to: 'tos' }, false); + const byType = _data.msgtype === 'text' ? { text_body: _data.msgcontent.body, text_preview_url: _data.msgcontent.preview_url } : {}; + const toUpsert = { ...defaultR, ...r1, ...record, ...byType, message_origin: JSON.stringify(_data) }; + upsertOutboundMessage(null, { ...toUpsert }); + return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' }; }); // const sockMsg = await wsToSend.sendTextMessage(to, content, actionId); - return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' }; + // return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' }; } catch (error) { console.error('Error sending message:', error); ctx.assert(null, 500, 'Failed to send message'); diff --git a/wai-server/core/handler/sessionStore.js b/wai-server/core/handler/sessionStore.js index bbcf06d..7a33944 100644 --- a/wai-server/core/handler/sessionStore.js +++ b/wai-server/core/handler/sessionStore.js @@ -18,8 +18,12 @@ module.exports = () => { return null; }; - const removeSession = ws => { - sessions.delete(ws); + const removeSession = wsChannelId => { + for (const [ws, storedSessionId] of sessions) { + if (ws.channelId === wsChannelId) { + sessions.delete(ws); + } + } }; return { diff --git a/wai-server/core/handler/whatsappHandler.js b/wai-server/core/handler/whatsappHandler.js index ab45d24..ea6c209 100644 --- a/wai-server/core/handler/whatsappHandler.js +++ b/wai-server/core/handler/whatsappHandler.js @@ -3,11 +3,11 @@ const { domain, name: domainName } = require('../../config').server; const whatsappEvents = require('../emitter'); const { callWebhook } = require('../webhook'); const { addConnection, updateConnection, addCurrentConnection, resetConnection } = require('../../services/connections.service'); -const { objectMapper } = require('../../utils/commons.util'); -const { sessionService } = require('..'); +const { objectMapper, pick } = require('../../utils/commons.util'); +const { sessionStore } = require('..'); const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service'); -const logger = console; +const logger = require('../../utils/logger.util'); const connectionEventNames = ['connection:connect', 'connection:open', 'connection:close']; const messageEventNames = ['message:received', 'message:updated']; @@ -16,7 +16,8 @@ const eventTypeMapped = { 'message:received': 'wai.message.received', 'message:updated': 'wai.message.updated', }; -const timeField = { sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' }; +const timeField = { pending: 'createTime', sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' }; +const statusMapped = { pending: 'accepted', sent: 'sent', delivered: 'delivered', read: 'read' } const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' }; /** @@ -30,11 +31,13 @@ const webhookBodyBuilder = (messageData, messageType) => { webhooksource: 'wai', createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8 domainName, + conversationid: messageData?.externalId || '', waiMessage: { ...messageData, ...(messageData.updateTime ? { [timeField[messageData.status]]: messageData.updateTime } : {}), wamid: messageData.id, - direction: directionField[messageType], + // direction: directionField[messageType], + status: statusMapped?.[messageData.status] || '', // externalId: '-1', // todo: externalId: `-${messageData.externalId || 1}`, // debug: 测试: 是负值 }, @@ -65,8 +68,9 @@ const setupConnectionHandler = () => { { ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }), service_type: 'baileys', + closetime: null, }, - { connect_domain: domain }, + { connect_domain: domain, connect_name: domainName }, ); } catch (error) { logger.error({ connectionData, error }, 'error add connection'); @@ -75,7 +79,7 @@ const setupConnectionHandler = () => { logger.info(`Setting up event ${'connection:close'}`); whatsappEvents.on('connection:close', async connectionData => { try { - sessionService.removeSession(connectionData.sesson_id); + sessionStore.removeSession(connectionData.channelId); await updateConnection( { ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }), @@ -95,13 +99,29 @@ const setupMessageHandler = () => { messageEventNames.forEach(eventName => { logger.info(`Setting up event ${eventName}`); whatsappEvents.on(eventName, async messageData => { + if (messageData.status === 'pending') { + logger.info('message pending', messageData); + return false; + } try { + const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString(); + const m = await getOutboundMessage({ id: messageData.id }); + const bixFields = pick(m, ['actionId', 'externalId', 'message_origin']); + logger.info('message evt NOT pending \n', m, bixFields); - const webhookBody = webhookBodyBuilder({ actionId: m.actionId || '', externalId: m.externalId || '', ...messageData }, eventName); + const webhookBody = webhookBodyBuilder({ ...messageData, ...bixFields, savedMsg: m }, eventName); const { waiMessage } = webhookBody; - const upsert = await upsertOutboundMessage(m.sn || null, { ...waiMessage, msg_status: waiMessage.status }); + const timeFields = pick(waiMessage, [...Object.values(timeField), 'createTime', 'updateTime']); + // timeFields.msgtime = m?.msgtime || now; + // timeFields.createTime = messageData?.createTime || m?.createTime || now; + const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']); + upsertFields.evt_id = webhookBody.id; + const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' }; + const record = objectMapper(waiMessage, { from: 'froms', to: 'tos', status: 'msg_status', type: 'msgtype' }, false); + const contentFields = waiMessage.type === 'text' ? { text_body: waiMessage.text.body } : {}; + await upsertOutboundMessage(m.sn || null, { ...timeFields, ...upsertFields, ...pusher, ...contentFields, ...record, message_origin: bixFields?.message_origin || JSON.stringify(messageData) }); // console.log('upsert=========================', upsert); await callWebhook(webhookBody); } catch (error) { diff --git a/wai-server/core/index.js b/wai-server/core/index.js index 0b05c41..654be6f 100644 --- a/wai-server/core/index.js +++ b/wai-server/core/index.js @@ -1,11 +1,11 @@ // core/index.js -const sessionServicesI = require('./handler/sessionStore'); +const sessionStoreI = require('./handler/sessionStore'); // Create the instances here -const sessionService = sessionServicesI(); +const sessionStore = sessionStoreI(); const createWhatsApp = require('./baileys/index'); module.exports = { - sessionService, + sessionStore, createWhatsApp, }; diff --git a/wai-server/core/webhook/index.js b/wai-server/core/webhook/index.js index 63c349c..96ab20e 100644 --- a/wai-server/core/webhook/index.js +++ b/wai-server/core/webhook/index.js @@ -30,7 +30,7 @@ async function callWebhook(messageData) { await axios.post(webhookUrl, messageData); logger.info(JSON.stringify({ webhookUrl: webhookUrl, messageData }, undefined, 2), 'Webhook called successfully'); } catch (error) { - logger.error({ webhookUrl: webhookUrl, messageData, error: error.message }, 'Error calling webhook'); + logger.error(JSON.stringify({ webhookUrl: webhookUrl, messageData, error: error.message }, undefined, 2), 'Error calling webhook'); } } diff --git a/wai-server/models/outbound_messages.js b/wai-server/models/outbound_messages.js index 2ce89f8..f4d3cf7 100644 --- a/wai-server/models/outbound_messages.js +++ b/wai-server/models/outbound_messages.js @@ -78,7 +78,6 @@ module.exports = function(sequelize, DataTypes) { createTime: { type: DataTypes.DATE, allowNull: true, - defaultValue: Sequelize.Sequelize.literal('CURRENT_TIMESTAMP'), }, updateTime: { type: DataTypes.DATE, diff --git a/wai-server/server.js b/wai-server/server.js index 8e3f3a5..7f060b5 100644 --- a/wai-server/server.js +++ b/wai-server/server.js @@ -29,6 +29,7 @@ if (isDevelopment) { * Pass to our server instance middlewares */ server + .use(logger) .use(multer.any()) .use(bodyParser) .use(helmet) diff --git a/wai-server/services/outbound_messages.service.js b/wai-server/services/outbound_messages.service.js index 469a896..923576f 100644 --- a/wai-server/services/outbound_messages.service.js +++ b/wai-server/services/outbound_messages.service.js @@ -14,20 +14,29 @@ const getOutboundMessage = async msg => { const r = await OutboundModelModel.findOne({ where: msg, }); - return r; + return r || {}; }; /** * MySQL - Implemented with ON DUPLICATE KEY UPDATE */ const upsertOutboundMessage = async (key, data) => { - const defaultR = { direction: 'outbound' }; - const r1 = pick(data, ['actionId', 'msgtype', 'externalId', 'id']); - const record = objectMapper(data, { from: 'froms', to: 'tos' }, false); - const byType = data.msgtype === 'text' ? { text_body: data.msgcontent.body, text_preview_url: data.msgcontent.preview_url } : {}; - const toUpsert = { ...defaultR, ...r1, ...record, ...byType, sn: key, message_origin: JSON.stringify(data) }; - const [instance, created] = await OutboundModelModel.upsert({ ...toUpsert }, { returning: true }); - console.info('upsertOutboundMessage', { instance, created }); + // let instance, created; + // if (key) { + // instance = await OutboundModelModel.create(data); + // created = true; + // } else { + // const [rows] = await OutboundModelModel.update(data, { where: { sn: key } }); + // } + // console.log(rows); + const [instance, created] = await OutboundModelModel.findOrCreate({ where: { sn: key }, defaults: { ...data, sn: key } }); + if (!created) { + await instance.update({ ...data, sn: key }); + const savedI = await instance.save(); // reload + console.info('update OutboundMessage --- 2\n', savedI.toJSON()); + return savedI.toJSON(); + } + console.info('insert OutboundMessage\n', instance.toJSON(), created); return instance.toJSON(); }; diff --git a/wai-server/utils/logger.util.js b/wai-server/utils/logger.util.js new file mode 100644 index 0000000..faabae6 --- /dev/null +++ b/wai-server/utils/logger.util.js @@ -0,0 +1,4 @@ +const log4js = require('log4js'); + +const logger = log4js.getLogger(); +module.exports = logger;