emit webhook

dev/supplier-email-drawer
Lei OT 6 months ago
parent 8300b5f957
commit c5f413f543

2
.gitignore vendored

@ -29,3 +29,5 @@ tmp
/package-lock.json
**/LexicalEditor0
*.zip

@ -8,3 +8,5 @@ DB_DATABASE='whatsapp_individual'
DB_PORT=3306
NODE_ENV='development'
WEBHOOK_URL='https://p9axztuwd7x8a7.mycht.cn/whatsapp_server/wawebhook'

@ -1,8 +1,6 @@
const { websocketService, websocketManager } = require('../../core'); // Import from core/index.js
const { sessionService } = require('../../core'); // Import from core/index.js
const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js
const generateId = require('../../utils/generateId.util');
// const { createConnection, getConnection } = require('../../core/whatsapp/connection');
// const { getAvailableConnection } = require('../../core/whatsapp/sessionStore');
const waInstance = {
wa: null,
@ -20,7 +18,7 @@ exports.newConnect = async ctx => {
const qr = await whatsApp1.start();
waInstance.wa = whatsApp1;
ctx.assert(whatsApp1, 503, 'No available connections');
const { sessionId } = websocketService.createSession(phone, whatsApp1);
const { sessionId } = sessionService.createSession(phone, whatsApp1);
return { qr };
} catch (error) {
console.error('create connection error', error);
@ -38,7 +36,7 @@ exports.testSend = async ctx => {
};
exports.getAll = async ctx => {
const sessions = websocketService.sessions;
const sessions = sessionService.sessions;
return Array.from(sessions);
};
@ -57,17 +55,14 @@ exports.getIn = async ctx => {
// };
// checkConnections();
// });
const availableWs = websocketManager.getAvailableConnection();
if (!availableWs) {
// ctx.status = 503;
// ctx.body = { message: 'No available connections' };
ctx.assert(availableWs, 503, 'No available connections');
return;
}
// return availableWs;
const { sessionId, url } = websocketService.createSession(availableWs);
return { sessionId, url, message: 'Connection established' }; // availableWs
// const availableWs = websocketManager.getAvailableConnection();
// if (!availableWs) {
// // ctx.status = 503;
// // ctx.body = { message: 'No available connections' };
// ctx.assert(availableWs, 503, 'No available connections');
// return;
// }
// // return availableWs;
// const { sessionId, url } = websocketService.createSession(availableWs);
// return { sessionId, url, message: 'Connection established' }; // availableWs
};

@ -1,7 +1,7 @@
'use strict';
const generateId = require('../../utils/generateId.util');
const { websocketService, websocketManager, createWhatsApp } = require('../../core');
const { sessionService } = require('../../core');
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
@ -15,20 +15,20 @@ exports.sendText111 = async ctx => {
return body;
};
exports.sendText = ctx => {
exports.sendText = async ctx => {
const { from, to, content } = ctx.request.body;
if (!from || !content) {
ctx.assert(from, 400, 'From and message are required');
return;
}
const wsToSend = websocketService.getSession(from);
const wsToSend = sessionService.getSession(from);
if (!wsToSend) {
ctx.assert(wsToSend, 400, 'Session not found'); // 404
return;
}
// return wsToSend;
try {
wsToSend.sendTextMessage(to, content);
await wsToSend.sendTextMessage(to, content);
return { wsToSend, ret: 'Message sent successfully' };
} catch (error) {
console.error('Error sending message:', error);

@ -7,6 +7,7 @@ const serverNodes = Array.from({ length: 2 }, (_, i) => ({
const config = {
whatsappBaileys: serverNodes,
webhook: process.env.WEBHOOK_URL,
};
module.exports = config;

@ -0,0 +1,11 @@
const EventEmitter = require('events');
module.exports = new EventEmitter();
/**
* 监听事件
* whatsapp EventEmitter:
* - connection:removed
* - connection:added
* - message:received
* - message:updated
*/

@ -1,14 +1,11 @@
// core/index.js
const websocketServicesI = require('./websocket/services/session');
const websocketConnectionI = require('./websocket/connection');
const sessionServicesI = require('./services/session');
// Create the instances here
const websocketService = websocketServicesI();
const websocketManager = websocketConnectionI(websocketService);
const sessionService = sessionServicesI();
const createWhatsApp = require('./baileys/index');
module.exports = {
websocketService,
websocketManager,
sessionService,
createWhatsApp,
};

@ -0,0 +1,16 @@
const whatsappEvents = require('../emitter');
const { callWebhook } = require('../webhook');
const logger = console;
function setupMessageHandler() {
whatsappEvents.on('message:received', async messageData => {
try {
await callWebhook(messageData);
} catch (error) {
logger.error({ messageData, error }, 'error call webhook');
}
});
}
module.exports = { setupMessageHandler };

@ -1,4 +1,8 @@
const generateId = require('../../../utils/generateId.util');
const whatsappEvents = require('../emitter');
whatsappEvents.on('connection:added', ({ sock, sessionId }) => {});
whatsappEvents.on('connection:removed', ({ sessionId }) => {});
module.exports = () => {
const sessions = new Map();

@ -0,0 +1,37 @@
const axios = require('axios');
const axiosRetry = require('axios-retry');
const logger = console;
const webhookUrl = require('../../config').webhook;
// 5s, 10s, 20s, 30s, 1m, 15m, 30m, 1h
const retryDelays = [5000, 10000, 20000, 30000, 300000, 900000, 1800000, 3600000];
axiosRetry(axios, {
retries: 8, // Number of retries
retryDelay: retryCount => {
const delayIndex = Math.min(retryCount - 1, retryDelays.length - 1);
const delay = retryDelays[delayIndex];
logger.warn(`retry attempt: ${retryCount}, delay: ${delay / 1000}s`);
return delay;
},
retryCondition: error => {
return error.response?.status !== 200; // Retry only on non-200 status codes
},
onRetry: (retryCount, error, requestConfig) => {
logger.warn({ retryCount, error, requestConfig }, `Retrying webhook call, attempt ${retryCount}`);
},
});
async function callWebhook(messageData) {
try {
if (!webhookUrl) {
logger.info('no webhook url provided');
return;
}
await axios.post(webhookUrl, messageData);
logger.info({ webhookUrl: webhookUrl, messageData }, 'Webhook called successfully');
} catch (error) {
logger.error({ webhookUrl: webhookUrl, messageData, error }, 'Error calling webhook');
}
}
module.exports = { callWebhook };

@ -1,60 +0,0 @@
const WebSocket = require('ws');
const websocketServerNodes = require('../../config').websockets;
module.exports = callService => {
let connections = [];
const connect = () => {
websocketServerNodes.forEach((node, index) => {
createConnection(node, index);
});
};
const createConnection = (node, index) => {
try {
const ws = new WebSocket(node.host, [node.protocol]);
ws.on('open', () => {
console.log(`Connected to WebSocket ${index}: ${node.protocol} ${node.host}`);
// connections.push(ws);
connections = [...connections, ws];
console.log('Current connections:', connections.length);
});
ws.on('close', () => {
console.log(`WebSocket ${index} disconnected. Reconnecting...`);
connections.splice(connections.indexOf(ws), 1);
setTimeout(() => createConnection(node, index), 3000);
});
ws.on('error', error => {
console.error(`WebSocket ${index} error:`, error);
});
ws.on('message', message => {
console.log(`Received from WebSocket ${index}: ${message}`);
});
} catch (error) {
console.error(`Error connecting to WebSocket ${index}:`, error);
}
};
const getAvailableConnection = () => {
// console.log('Available connections length:', connections.length);
for (const ws of connections) {
// console.log('Session has ws:', callService.sessions.has(ws));
if (!callService.sessions.has(ws)) {
// console.log('Found available ws:', ws); // Add this line
return ws;
}
}
console.log('No available connections found.');
return null;
};
const getConnections = () => connections; // Add this getter
return {
connect,
getAvailableConnection,
getConnections,
};
};

@ -5,12 +5,17 @@ const server = require('./server');
const { port } = require('./config').server;
const { setupMessageHandler } = require('./core/services/messageHandler');
async function bootstrap() {
/**
* Add external services init as async operations (db, redis, etc...)
* e.g.
* await sequelize.authenticate()
*/
setupMessageHandler();
return http.createServer(server.callback()).listen(port, '0.0.0.0');
}

@ -10,6 +10,7 @@
"dependencies": {
"@koa/cors": "2.2.3",
"@whiskeysockets/baileys": "^6.7.9",
"axios-retry": "^4.5.0",
"dotenv": "8.0.0",
"joi": "14.3.1",
"koa": "2.7.0",
@ -998,6 +999,28 @@
"proxy-from-env": "^1.1.0"
}
},
"node_modules/axios-retry": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/axios-retry/-/axios-retry-4.5.0.tgz",
"integrity": "sha512-aR99oXhpEDGo0UuAlYcn2iGRds30k366Zfa05XWScR9QaQD4JYiP3/1Qt1u7YlefUOK+cn0CcwoL1oefavQUlQ==",
"dependencies": {
"is-retry-allowed": "^2.2.0"
},
"peerDependencies": {
"axios": "0.x || 1.x"
}
},
"node_modules/axios-retry/node_modules/is-retry-allowed": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/is-retry-allowed/-/is-retry-allowed-2.2.0.tgz",
"integrity": "sha512-XVm7LOeLpTW4jV19QSH38vkswxoLud8sQ57YwJVTPWdiaI9I8keEhGFpBlslyVsgdQy4Opg8QOLb8YRgsyZiQg==",
"engines": {
"node": ">=10"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/balanced-match": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz",

@ -27,6 +27,8 @@
},
"dependencies": {
"@koa/cors": "2.2.3",
"@whiskeysockets/baileys": "^6.7.9",
"axios-retry": "^4.5.0",
"dotenv": "8.0.0",
"joi": "14.3.1",
"koa": "2.7.0",
@ -39,7 +41,6 @@
"mysql2": "^3.11.5",
"sequelize": "^6.37.5",
"uuid": "3.3.2",
"ws": "^8.18.0",
"@whiskeysockets/baileys": "^6.7.9"
"ws": "^8.18.0"
}
}

Loading…
Cancel
Save