保存消息, webhook

dev/supplier-email-drawer
Lei OT 6 months ago
parent 5bd8122c50
commit 891db24af7

@ -1,4 +1,4 @@
const { sessionService } = require('../../core'); // Import from core/index.js
const { sessionStore } = require('../../core'); // Import from core/index.js
const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js
const { getConnection } = require('../../services/connections.service');
const { objectMapper, isEmpty } = require('../../utils/commons.util');
@ -12,7 +12,7 @@ exports.newConnect = async ctx => {
if (!isEmpty(findSession)) {
const { sesson_id: sessionId, status, ...connection } = findSession[0];
// if (['open', 'connecting'].includes(status)) {
// const sock = sessionService.getSession(sessionId);
// const sock = sessionStore.getSession(sessionId);
// if (!isEmpty(sock)) {
// return sock;
// }
@ -20,11 +20,11 @@ exports.newConnect = async ctx => {
// return { qr: '', phone, sessionId, ...connection }; // 返回已登录
}
const whatsApp1 = await createWhatsApp(phone);
const existsSession = sessionService.getSession(phone);
const existsSession = sessionStore.getSession(phone);
let qr;
// if (isEmpty(existsSession)) {
qr = await whatsApp1.start();
const { sessionId } = sessionService.createSession(phone, whatsApp1);
const { sessionId } = sessionStore.createSession(phone, whatsApp1);
// }
ctx.assert(whatsApp1, 503, 'No available connections');
return { qr, phone, sessionId: phone, domainName };
@ -38,3 +38,7 @@ exports.getAll = async () => {
const findConnection = await getConnection({});
return findConnection;
};
exports.getSessions = async ctx => {
return Array.from(sessionStore.sessions);
};

@ -13,7 +13,8 @@ module.exports = Router => {
// .get('/qrcode0', callController.getIn)
// .get('/qrcode', controller.qrcode)
// .get('/:id', controller.getOne)
.get('/', controller.getAll);
.get('/', controller.getAll)
.get('/sessions', controller.getSessions);
// .post('/test', controller.testSend);
// .post('/', controller.createOne)
// router.post('/get-in', callController.getIn);

@ -1,7 +1,8 @@
'use strict';
const generateId = require('../../utils/generateId.util');
const { sessionService } = require('../../core');
const { sessionStore } = require('../../core');
const { objectMapper, pick } = require('../../utils/commons.util');
const { upsertOutboundMessage } = require('../../services/outbound_messages.service');
function sleep(ms) {
@ -15,20 +16,31 @@ exports.sendText = async ctx => {
ctx.assert(from, 400, 'From and message are required');
return;
}
const wsToSend = sessionService.getSession(from);
const wsToSend = sessionStore.getSession(from);
// console.log('find wsToSend', wsToSend)
if (!wsToSend) {
ctx.assert(wsToSend, 400, 'Session not found'); // 404
return;
}
// return wsToSend;
try {
wsToSend.sendTextMessage(to, content, actionId).then(sockMsg => {
const { key } = sockMsg;
upsertOutboundMessage(null, { ...ctx.request.body, id: key.id || '' });
return wsToSend.sendTextMessage(to, content).then(({ messageId }) => {
// const messageId = generateId();
const _data = ctx.request.body;
const defaultR = { direction: 'outbound' };
const r1 = pick(_data, ['actionId', 'msgtype', 'externalId']);
r1.id = messageId;
r1.wamid = messageId;
r1.msg_status = 'accepted';
r1.createTime = Date.now();
const record = objectMapper(_data, { from: 'froms', to: 'tos' }, false);
const byType = _data.msgtype === 'text' ? { text_body: _data.msgcontent.body, text_preview_url: _data.msgcontent.preview_url } : {};
const toUpsert = { ...defaultR, ...r1, ...record, ...byType, message_origin: JSON.stringify(_data) };
upsertOutboundMessage(null, { ...toUpsert });
return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' };
});
// const sockMsg = await wsToSend.sendTextMessage(to, content, actionId);
return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' };
// return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' };
} catch (error) {
console.error('Error sending message:', error);
ctx.assert(null, 500, 'Failed to send message');

@ -18,8 +18,12 @@ module.exports = () => {
return null;
};
const removeSession = ws => {
sessions.delete(ws);
const removeSession = wsChannelId => {
for (const [ws, storedSessionId] of sessions) {
if (ws.channelId === wsChannelId) {
sessions.delete(ws);
}
}
};
return {

@ -3,11 +3,11 @@ const { domain, name: domainName } = require('../../config').server;
const whatsappEvents = require('../emitter');
const { callWebhook } = require('../webhook');
const { addConnection, updateConnection, addCurrentConnection, resetConnection } = require('../../services/connections.service');
const { objectMapper } = require('../../utils/commons.util');
const { sessionService } = require('..');
const { objectMapper, pick } = require('../../utils/commons.util');
const { sessionStore } = require('..');
const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service');
const logger = console;
const logger = require('../../utils/logger.util');
const connectionEventNames = ['connection:connect', 'connection:open', 'connection:close'];
const messageEventNames = ['message:received', 'message:updated'];
@ -16,7 +16,8 @@ const eventTypeMapped = {
'message:received': 'wai.message.received',
'message:updated': 'wai.message.updated',
};
const timeField = { sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' };
const timeField = { pending: 'createTime', sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' };
const statusMapped = { pending: 'accepted', sent: 'sent', delivered: 'delivered', read: 'read' }
const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' };
/**
@ -30,11 +31,13 @@ const webhookBodyBuilder = (messageData, messageType) => {
webhooksource: 'wai',
createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8
domainName,
conversationid: messageData?.externalId || '',
waiMessage: {
...messageData,
...(messageData.updateTime ? { [timeField[messageData.status]]: messageData.updateTime } : {}),
wamid: messageData.id,
direction: directionField[messageType],
// direction: directionField[messageType],
status: statusMapped?.[messageData.status] || '',
// externalId: '-1', // todo:
externalId: `-${messageData.externalId || 1}`, // debug: 测试: 是负值
},
@ -65,8 +68,9 @@ const setupConnectionHandler = () => {
{
...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }),
service_type: 'baileys',
closetime: null,
},
{ connect_domain: domain },
{ connect_domain: domain, connect_name: domainName },
);
} catch (error) {
logger.error({ connectionData, error }, 'error add connection');
@ -75,7 +79,7 @@ const setupConnectionHandler = () => {
logger.info(`Setting up event ${'connection:close'}`);
whatsappEvents.on('connection:close', async connectionData => {
try {
sessionService.removeSession(connectionData.sesson_id);
sessionStore.removeSession(connectionData.channelId);
await updateConnection(
{
...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }),
@ -95,13 +99,29 @@ const setupMessageHandler = () => {
messageEventNames.forEach(eventName => {
logger.info(`Setting up event ${eventName}`);
whatsappEvents.on(eventName, async messageData => {
if (messageData.status === 'pending') {
logger.info('message pending', messageData);
return false;
}
try {
const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString();
const m = await getOutboundMessage({ id: messageData.id });
const bixFields = pick(m, ['actionId', 'externalId', 'message_origin']);
logger.info('message evt NOT pending \n', m, bixFields);
const webhookBody = webhookBodyBuilder({ actionId: m.actionId || '', externalId: m.externalId || '', ...messageData }, eventName);
const webhookBody = webhookBodyBuilder({ ...messageData, ...bixFields, savedMsg: m }, eventName);
const { waiMessage } = webhookBody;
const upsert = await upsertOutboundMessage(m.sn || null, { ...waiMessage, msg_status: waiMessage.status });
const timeFields = pick(waiMessage, [...Object.values(timeField), 'createTime', 'updateTime']);
// timeFields.msgtime = m?.msgtime || now;
// timeFields.createTime = messageData?.createTime || m?.createTime || now;
const upsertFields = pick(waiMessage, ['direction', 'wamid', 'id', 'status']);
upsertFields.evt_id = webhookBody.id;
const pusher = { customerProfile_id: waiMessage.customerProfile?.id || '', customerProfile_name: waiMessage.customerProfile?.name || '' };
const record = objectMapper(waiMessage, { from: 'froms', to: 'tos', status: 'msg_status', type: 'msgtype' }, false);
const contentFields = waiMessage.type === 'text' ? { text_body: waiMessage.text.body } : {};
await upsertOutboundMessage(m.sn || null, { ...timeFields, ...upsertFields, ...pusher, ...contentFields, ...record, message_origin: bixFields?.message_origin || JSON.stringify(messageData) });
// console.log('upsert=========================', upsert);
await callWebhook(webhookBody);
} catch (error) {

@ -1,11 +1,11 @@
// core/index.js
const sessionServicesI = require('./handler/sessionStore');
const sessionStoreI = require('./handler/sessionStore');
// Create the instances here
const sessionService = sessionServicesI();
const sessionStore = sessionStoreI();
const createWhatsApp = require('./baileys/index');
module.exports = {
sessionService,
sessionStore,
createWhatsApp,
};

@ -30,7 +30,7 @@ async function callWebhook(messageData) {
await axios.post(webhookUrl, messageData);
logger.info(JSON.stringify({ webhookUrl: webhookUrl, messageData }, undefined, 2), 'Webhook called successfully');
} catch (error) {
logger.error({ webhookUrl: webhookUrl, messageData, error: error.message }, 'Error calling webhook');
logger.error(JSON.stringify({ webhookUrl: webhookUrl, messageData, error: error.message }, undefined, 2), 'Error calling webhook');
}
}

@ -78,7 +78,6 @@ module.exports = function(sequelize, DataTypes) {
createTime: {
type: DataTypes.DATE,
allowNull: true,
defaultValue: Sequelize.Sequelize.literal('CURRENT_TIMESTAMP'),
},
updateTime: {
type: DataTypes.DATE,

@ -29,6 +29,7 @@ if (isDevelopment) {
* Pass to our server instance middlewares
*/
server
.use(logger)
.use(multer.any())
.use(bodyParser)
.use(helmet)

@ -14,20 +14,29 @@ const getOutboundMessage = async msg => {
const r = await OutboundModelModel.findOne({
where: msg,
});
return r;
return r || {};
};
/**
* MySQL - Implemented with ON DUPLICATE KEY UPDATE
*/
const upsertOutboundMessage = async (key, data) => {
const defaultR = { direction: 'outbound' };
const r1 = pick(data, ['actionId', 'msgtype', 'externalId', 'id']);
const record = objectMapper(data, { from: 'froms', to: 'tos' }, false);
const byType = data.msgtype === 'text' ? { text_body: data.msgcontent.body, text_preview_url: data.msgcontent.preview_url } : {};
const toUpsert = { ...defaultR, ...r1, ...record, ...byType, sn: key, message_origin: JSON.stringify(data) };
const [instance, created] = await OutboundModelModel.upsert({ ...toUpsert }, { returning: true });
console.info('upsertOutboundMessage', { instance, created });
// let instance, created;
// if (key) {
// instance = await OutboundModelModel.create(data);
// created = true;
// } else {
// const [rows] = await OutboundModelModel.update(data, { where: { sn: key } });
// }
// console.log(rows);
const [instance, created] = await OutboundModelModel.findOrCreate({ where: { sn: key }, defaults: { ...data, sn: key } });
if (!created) {
await instance.update({ ...data, sn: key });
const savedI = await instance.save(); // reload
console.info('update OutboundMessage --- 2\n', savedI.toJSON());
return savedI.toJSON();
}
console.info('insert OutboundMessage\n', instance.toJSON(), created);
return instance.toJSON();
};

@ -0,0 +1,4 @@
const log4js = require('log4js');
const logger = log4js.getLogger();
module.exports = logger;
Loading…
Cancel
Save