发送消息, webhook

dev/supplier-email-drawer
Lei OT 9 months ago
parent 03127e3910
commit 25d52e75e7

@ -2,22 +2,23 @@ const { sessionService } = require('../../core'); // Import from core/index.js
const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js
const { getConnection } = require('../../services/connections.service'); const { getConnection } = require('../../services/connections.service');
const { objectMapper, isEmpty } = require('../../utils/commons.util'); const { objectMapper, isEmpty } = require('../../utils/commons.util');
const { domain } = require('../../config').server; const { domain, name: domainName } = require('../../config').server;
exports.newConnect = async ctx => { exports.newConnect = async ctx => {
try { try {
const { phone } = ctx.query; const { phone } = ctx.query;
// const findSession = await getConnection({ sesson_id: phone, connect_domain: domain }); const findSession = await getConnection({ sesson_id: phone, status: ['open', 'connecting'] }); // connect_domain: domain
// // todo: 只有一条 // // todo: 只有一条
// if (!isEmpty(findSession)) { if (!isEmpty(findSession)) {
// const { sesson_id: sessonId, status } = findSession[0]; const { sesson_id: sessionId, status, ...connection } = findSession[0];
// if (['open', 'connecting'].includes(status)) { // if (['open', 'connecting'].includes(status)) {
// const sock = sessionService.getSession(sessonId); // const sock = sessionService.getSession(sessionId);
// if (!isEmpty(sock)) { // if (!isEmpty(sock)) {
// return sock; // return sock;
// } // }
// } // }
// } // return { qr: '', phone, sessionId, ...connection }; // 返回已登录
}
const whatsApp1 = await createWhatsApp(phone); const whatsApp1 = await createWhatsApp(phone);
const existsSession = sessionService.getSession(phone); const existsSession = sessionService.getSession(phone);
let qr; let qr;
@ -26,7 +27,7 @@ exports.newConnect = async ctx => {
const { sessionId } = sessionService.createSession(phone, whatsApp1); const { sessionId } = sessionService.createSession(phone, whatsApp1);
// } // }
ctx.assert(whatsApp1, 503, 'No available connections'); ctx.assert(whatsApp1, 503, 'No available connections');
return { qr, phone, sessionId: phone }; return { qr, phone, sessionId: phone, domainName };
} catch (error) { } catch (error) {
console.error('create connection error', error); console.error('create connection error', error);
ctx.assert(null, 500, 'Failed to create connection or generate QR code.'); ctx.assert(null, 500, 'Failed to create connection or generate QR code.');
@ -36,6 +37,4 @@ exports.newConnect = async ctx => {
exports.getAll = async () => { exports.getAll = async () => {
const findConnection = await getConnection({}); const findConnection = await getConnection({});
return findConnection; return findConnection;
// const sessions = sessionService.sessions;
// return Array.from(sessions);
}; };

@ -2,7 +2,7 @@
const generateId = require('../../utils/generateId.util'); const generateId = require('../../utils/generateId.util');
const { sessionService } = require('../../core'); const { sessionService } = require('../../core');
// const { upsertOutboundMessage } = require('../../services/outbound_messages.service'); const { upsertOutboundMessage } = require('../../services/outbound_messages.service');
function sleep(ms) { function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms)); return new Promise(resolve => setTimeout(resolve, ms));
@ -23,8 +23,9 @@ exports.sendText = async ctx => {
} }
// return wsToSend; // return wsToSend;
try { try {
wsToSend.sendTextMessage(to, content); const sockMsg = wsToSend.sendTextMessage(to, content);
// await upsertOutboundMessage(); console.log(JSON.stringify(sockMsg, undefined, 2));
await upsertOutboundMessage(null, { ...ctx.request.body, id: sockMsg.key });
return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' }; return 'Message sent successfully'; // { wsToSend, ret: 'Message sent successfully' };
} catch (error) { } catch (error) {
console.error('Error sending message:', error); console.error('Error sending message:', error);

@ -1,10 +1,11 @@
const generateId = require('../../utils/generateId.util'); const generateId = require('../../utils/generateId.util');
const { domain } = require('../../config').server; const { domain, name: domainName } = require('../../config').server;
const whatsappEvents = require('../emitter'); const whatsappEvents = require('../emitter');
const { callWebhook } = require('../webhook'); const { callWebhook } = require('../webhook');
const { addConnection, updateConnection, addCurrentConnection, resetConnection } = require('../../services/connections.service'); const { addConnection, updateConnection, addCurrentConnection, resetConnection } = require('../../services/connections.service');
const { objectMapper } = require('../../utils/commons.util'); const { objectMapper } = require('../../utils/commons.util');
const { sessionService } = require('..'); const { sessionService } = require('..');
const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service');
const logger = console; const logger = console;
@ -17,6 +18,10 @@ const eventTypeMapped = {
}; };
const timeField = { sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' }; const timeField = { sent: 'sendTime', delivered: 'deliverTime', read: 'readTime' };
const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' }; const directionField = { 'message:received': 'inbound', 'message:updated': 'outbound' };
/**
* @returns {Object} webhookBody
*/
const webhookBodyBuilder = (messageData, messageType) => { const webhookBodyBuilder = (messageData, messageType) => {
const message = { const message = {
id: `evt_${generateId().replace(/-/g, '')}`, id: `evt_${generateId().replace(/-/g, '')}`,
@ -24,12 +29,13 @@ const webhookBodyBuilder = (messageData, messageType) => {
apiVersion: 'v2', apiVersion: 'v2',
webhooksource: 'wai', webhooksource: 'wai',
createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8 createTime: new Date(new Date().getTime() + 8 * 60 * 60 * 1000).toISOString(), // GMT +8
domainName,
waiMessage: { waiMessage: {
...messageData, ...messageData,
...(messageData.updateTime ? { [timeField[messageData.status]]: messageData.updateTime } : {}), ...(messageData.updateTime ? { [timeField[messageData.status]]: messageData.updateTime } : {}),
wamid: messageData.id, wamid: messageData.id,
direction: directionField[messageType], direction: directionField[messageType],
externalId: '-1', // todo: // externalId: '-1', // todo:
}, },
}; };
return message; return message;
@ -42,7 +48,7 @@ const setupConnectionHandler = () => {
try { try {
// find Or create // find Or create
await addCurrentConnection({ await addCurrentConnection({
...objectMapper(connectionData, { phone: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id', createTimestamp: 'createtime' }), ...objectMapper(connectionData, { phone: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id', createTimestamp: 'createtime' }, false),
service_type: 'baileys', service_type: 'baileys',
status: 'connecting', status: 'connecting',
}); });
@ -69,10 +75,13 @@ const setupConnectionHandler = () => {
whatsappEvents.on('connection:close', async connectionData => { whatsappEvents.on('connection:close', async connectionData => {
try { try {
sessionService.removeSession(connectionData.sesson_id); sessionService.removeSession(connectionData.sesson_id);
await updateConnection({ await updateConnection(
...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }), {
service_type: 'baileys', ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }, { key: 'sesson_id' }], channelId: 'channel_id' }),
}); service_type: 'baileys',
},
{ connect_domain: domain },
);
// todo: 通知前端: 重新扫码 // todo: 通知前端: 重新扫码
} catch (error) { } catch (error) {
logger.error({ connectionData, error }, 'error add connection'); logger.error({ connectionData, error }, 'error add connection');
@ -86,8 +95,14 @@ const setupMessageHandler = () => {
logger.info(`Setting up event ${eventName}`); logger.info(`Setting up event ${eventName}`);
whatsappEvents.on(eventName, async messageData => { whatsappEvents.on(eventName, async messageData => {
try { try {
const x = webhookBodyBuilder(messageData, eventName); const m = await getOutboundMessage({ id: messageData.id });
await callWebhook(x);
const webhookBody = webhookBodyBuilder({ ...messageData, actionId: m.actionId, externalId: m.externalId }, eventName);
const { waiMessage } = webhookBody;
const upsert = await upsertOutboundMessage(m.sn, { ...waiMessage, msg_status: waiMessage.status });
// console.log('upsert=========================', upsert);
await callWebhook(webhookBody);
} catch (error) { } catch (error) {
logger.error({ messageData, error }, 'error call webhook'); logger.error({ messageData, error }, 'error call webhook');
} }

@ -32,6 +32,7 @@ module.exports = function(sequelize, DataTypes) {
msgtime: { msgtime: {
type: DataTypes.DATE, type: DataTypes.DATE,
allowNull: true, allowNull: true,
defaultValue: Sequelize.Sequelize.literal('CURRENT_TIMESTAMP'),
}, },
id: { id: {
type: DataTypes.STRING(100), type: DataTypes.STRING(100),

@ -20,7 +20,7 @@ const addConnection = async data => {
const addCurrentConnection = async data => { const addCurrentConnection = async data => {
const [r, createdId] = await ConnectionsModel.findOrCreate({ const [r, createdId] = await ConnectionsModel.findOrCreate({
where: { connect_domain: domain, connect_name: name, sesson_id: data.sesson_id }, where: { connect_domain: domain, connect_name: name, sesson_id: data.sesson_id },
defaults: { ...data, connect_domain: domain, connect_name: name }, defaults: { ...data, connect_domain: domain, connect_name: name, closetime: null },
}); });
return r; return r;
}; };
@ -39,7 +39,7 @@ const updateConnection = async (data, where = {}) => {
}; };
const getConnection = async data => { const getConnection = async data => {
const r = await ConnectionsModel.findAll({ where: data }); const r = await ConnectionsModel.findAll({ where: data, raw: true });
return r; return r;
}; };

@ -2,6 +2,7 @@
const db = require('../config').database; const db = require('../config').database;
const { domain, name } = require('../config').server; const { domain, name } = require('../config').server;
const { objectMapper, pick } = require('../utils/commons.util');
const initModels = require('../models/init-models'); const initModels = require('../models/init-models');
const Sequelize = db.sequelize; const Sequelize = db.sequelize;
@ -9,10 +10,25 @@ const models = initModels(Sequelize);
const OutboundModelModel = models.outbound_messages; const OutboundModelModel = models.outbound_messages;
const upsertOutboundMessage = async data => { const getOutboundMessage = async msg => {
// const r = await OutboundModelModel.create({ ...data }); const r = await OutboundModelModel.findOne({
const [instance, created] = await OutboundModelModel.upsert({ ...data }, { returning: true }); where: msg,
return created; });
return r.toJSON();
}; };
module.exports = { upsertOutboundMessage }; /**
* MySQL - Implemented with ON DUPLICATE KEY UPDATE
*/
const upsertOutboundMessage = async (key, data) => {
const defaultR = { direction: 'outbound' };
const r1 = pick(data, ['actionId', 'msgtype', 'externalId', 'id']);
const record = objectMapper(data, { from: 'froms', to: 'tos' }, false);
const byType = data.msgtype === 'text' ? { text_body: data.msgcontent.body, text_preview_url: data.msgcontent.preview_url } : {};
const toUpsert = { ...defaultR, ...r1, ...record, ...byType, sn: key, message_origin: JSON.stringify(data) };
const [instance, created] = await OutboundModelModel.upsert({ ...toUpsert }, { returning: true });
console.info('upsertOutboundMessage', { instance, created });
return instance.toJSON();
};
module.exports = { getOutboundMessage, upsertOutboundMessage };

Loading…
Cancel
Save