diff --git a/.gitignore b/.gitignore index 5954f1e..f371348 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,5 @@ tmp /package-lock.json **/LexicalEditor0 + +*.zip diff --git a/wai-server/.env.development b/wai-server/.env.development index ccdd5c1..2a84488 100644 --- a/wai-server/.env.development +++ b/wai-server/.env.development @@ -8,3 +8,5 @@ DB_DATABASE='whatsapp_individual' DB_PORT=3306 NODE_ENV='development' + +WEBHOOK_URL='https://p9axztuwd7x8a7.mycht.cn/whatsapp_server/wawebhook' diff --git a/wai-server/api/channels/channel.controller.js b/wai-server/api/channels/channel.controller.js index 27b0e64..f07a5ae 100644 --- a/wai-server/api/channels/channel.controller.js +++ b/wai-server/api/channels/channel.controller.js @@ -1,8 +1,6 @@ -const { websocketService, websocketManager } = require('../../core'); // Import from core/index.js +const { sessionService } = require('../../core'); // Import from core/index.js const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js const generateId = require('../../utils/generateId.util'); -// const { createConnection, getConnection } = require('../../core/whatsapp/connection'); -// const { getAvailableConnection } = require('../../core/whatsapp/sessionStore'); const waInstance = { wa: null, @@ -20,7 +18,7 @@ exports.newConnect = async ctx => { const qr = await whatsApp1.start(); waInstance.wa = whatsApp1; ctx.assert(whatsApp1, 503, 'No available connections'); - const { sessionId } = websocketService.createSession(phone, whatsApp1); + const { sessionId } = sessionService.createSession(phone, whatsApp1); return { qr }; } catch (error) { console.error('create connection error', error); @@ -38,7 +36,7 @@ exports.testSend = async ctx => { }; exports.getAll = async ctx => { - const sessions = websocketService.sessions; + const sessions = sessionService.sessions; return Array.from(sessions); }; @@ -57,17 +55,14 @@ exports.getIn = async ctx => { // }; // checkConnections(); // }); - - const availableWs = websocketManager.getAvailableConnection(); - - if (!availableWs) { - // ctx.status = 503; - // ctx.body = { message: 'No available connections' }; - ctx.assert(availableWs, 503, 'No available connections'); - return; - } - // return availableWs; - - const { sessionId, url } = websocketService.createSession(availableWs); - return { sessionId, url, message: 'Connection established' }; // availableWs + // const availableWs = websocketManager.getAvailableConnection(); + // if (!availableWs) { + // // ctx.status = 503; + // // ctx.body = { message: 'No available connections' }; + // ctx.assert(availableWs, 503, 'No available connections'); + // return; + // } + // // return availableWs; + // const { sessionId, url } = websocketService.createSession(availableWs); + // return { sessionId, url, message: 'Connection established' }; // availableWs }; diff --git a/wai-server/api/messages/message.controller.js b/wai-server/api/messages/message.controller.js index 533d21e..62cdfd4 100644 --- a/wai-server/api/messages/message.controller.js +++ b/wai-server/api/messages/message.controller.js @@ -1,7 +1,7 @@ 'use strict'; const generateId = require('../../utils/generateId.util'); -const { websocketService, websocketManager, createWhatsApp } = require('../../core'); +const { sessionService } = require('../../core'); function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); @@ -15,20 +15,20 @@ exports.sendText111 = async ctx => { return body; }; -exports.sendText = ctx => { +exports.sendText = async ctx => { const { from, to, content } = ctx.request.body; if (!from || !content) { ctx.assert(from, 400, 'From and message are required'); return; } - const wsToSend = websocketService.getSession(from); + const wsToSend = sessionService.getSession(from); if (!wsToSend) { ctx.assert(wsToSend, 400, 'Session not found'); // 404 return; } // return wsToSend; try { - wsToSend.sendTextMessage(to, content); + await wsToSend.sendTextMessage(to, content); return { wsToSend, ret: 'Message sent successfully' }; } catch (error) { console.error('Error sending message:', error); diff --git a/wai-server/config/components/whatsapp.baileys.config.js b/wai-server/config/components/whatsapp.baileys.config.js index b031af3..cb89b12 100644 --- a/wai-server/config/components/whatsapp.baileys.config.js +++ b/wai-server/config/components/whatsapp.baileys.config.js @@ -7,6 +7,7 @@ const serverNodes = Array.from({ length: 2 }, (_, i) => ({ const config = { whatsappBaileys: serverNodes, + webhook: process.env.WEBHOOK_URL, }; module.exports = config; diff --git a/wai-server/core/emitter/index.js b/wai-server/core/emitter/index.js new file mode 100644 index 0000000..126cf22 --- /dev/null +++ b/wai-server/core/emitter/index.js @@ -0,0 +1,11 @@ +const EventEmitter = require('events'); +module.exports = new EventEmitter(); + +/** + * 监听事件 + * whatsapp EventEmitter: + * - connection:removed + * - connection:added + * - message:received + * - message:updated + */ diff --git a/wai-server/core/index.js b/wai-server/core/index.js index 2272204..18e85f8 100644 --- a/wai-server/core/index.js +++ b/wai-server/core/index.js @@ -1,14 +1,11 @@ // core/index.js -const websocketServicesI = require('./websocket/services/session'); -const websocketConnectionI = require('./websocket/connection'); +const sessionServicesI = require('./services/session'); // Create the instances here -const websocketService = websocketServicesI(); -const websocketManager = websocketConnectionI(websocketService); +const sessionService = sessionServicesI(); const createWhatsApp = require('./baileys/index'); module.exports = { - websocketService, - websocketManager, + sessionService, createWhatsApp, }; diff --git a/wai-server/core/services/messageHandler.js b/wai-server/core/services/messageHandler.js new file mode 100644 index 0000000..99d4c9b --- /dev/null +++ b/wai-server/core/services/messageHandler.js @@ -0,0 +1,16 @@ +const whatsappEvents = require('../emitter'); +const { callWebhook } = require('../webhook'); + +const logger = console; + +function setupMessageHandler() { + whatsappEvents.on('message:received', async messageData => { + try { + await callWebhook(messageData); + } catch (error) { + logger.error({ messageData, error }, 'error call webhook'); + } + }); +} + +module.exports = { setupMessageHandler }; diff --git a/wai-server/core/websocket/services/session.js b/wai-server/core/services/session.js similarity index 75% rename from wai-server/core/websocket/services/session.js rename to wai-server/core/services/session.js index ecdcfb7..eb73af6 100644 --- a/wai-server/core/websocket/services/session.js +++ b/wai-server/core/services/session.js @@ -1,4 +1,8 @@ const generateId = require('../../../utils/generateId.util'); +const whatsappEvents = require('../emitter'); + +whatsappEvents.on('connection:added', ({ sock, sessionId }) => {}); +whatsappEvents.on('connection:removed', ({ sessionId }) => {}); module.exports = () => { const sessions = new Map(); diff --git a/wai-server/core/webhook/index.js b/wai-server/core/webhook/index.js new file mode 100644 index 0000000..dabfebe --- /dev/null +++ b/wai-server/core/webhook/index.js @@ -0,0 +1,37 @@ +const axios = require('axios'); +const axiosRetry = require('axios-retry'); +const logger = console; +const webhookUrl = require('../../config').webhook; + +// 5s, 10s, 20s, 30s, 1m, 15m, 30m, 1h +const retryDelays = [5000, 10000, 20000, 30000, 300000, 900000, 1800000, 3600000]; +axiosRetry(axios, { + retries: 8, // Number of retries + retryDelay: retryCount => { + const delayIndex = Math.min(retryCount - 1, retryDelays.length - 1); + const delay = retryDelays[delayIndex]; + logger.warn(`retry attempt: ${retryCount}, delay: ${delay / 1000}s`); + return delay; + }, + retryCondition: error => { + return error.response?.status !== 200; // Retry only on non-200 status codes + }, + onRetry: (retryCount, error, requestConfig) => { + logger.warn({ retryCount, error, requestConfig }, `Retrying webhook call, attempt ${retryCount}`); + }, +}); + +async function callWebhook(messageData) { + try { + if (!webhookUrl) { + logger.info('no webhook url provided'); + return; + } + await axios.post(webhookUrl, messageData); + logger.info({ webhookUrl: webhookUrl, messageData }, 'Webhook called successfully'); + } catch (error) { + logger.error({ webhookUrl: webhookUrl, messageData, error }, 'Error calling webhook'); + } +} + +module.exports = { callWebhook }; diff --git a/wai-server/core/websocket/connection.js b/wai-server/core/websocket/connection.js deleted file mode 100644 index 3961253..0000000 --- a/wai-server/core/websocket/connection.js +++ /dev/null @@ -1,60 +0,0 @@ -const WebSocket = require('ws'); -const websocketServerNodes = require('../../config').websockets; - -module.exports = callService => { - let connections = []; - - const connect = () => { - websocketServerNodes.forEach((node, index) => { - createConnection(node, index); - }); - }; - - const createConnection = (node, index) => { - try { - const ws = new WebSocket(node.host, [node.protocol]); - ws.on('open', () => { - console.log(`Connected to WebSocket ${index}: ${node.protocol} ${node.host}`); - // connections.push(ws); - connections = [...connections, ws]; - console.log('Current connections:', connections.length); - }); - - ws.on('close', () => { - console.log(`WebSocket ${index} disconnected. Reconnecting...`); - connections.splice(connections.indexOf(ws), 1); - setTimeout(() => createConnection(node, index), 3000); - }); - - ws.on('error', error => { - console.error(`WebSocket ${index} error:`, error); - }); - - ws.on('message', message => { - console.log(`Received from WebSocket ${index}: ${message}`); - }); - } catch (error) { - console.error(`Error connecting to WebSocket ${index}:`, error); - } - }; - - const getAvailableConnection = () => { - // console.log('Available connections length:', connections.length); - for (const ws of connections) { - // console.log('Session has ws:', callService.sessions.has(ws)); - if (!callService.sessions.has(ws)) { - // console.log('Found available ws:', ws); // Add this line - return ws; - } - } - console.log('No available connections found.'); - return null; - }; - const getConnections = () => connections; // Add this getter - - return { - connect, - getAvailableConnection, - getConnections, - }; -}; diff --git a/wai-server/index.js b/wai-server/index.js index 80779a8..1452edc 100644 --- a/wai-server/index.js +++ b/wai-server/index.js @@ -5,12 +5,17 @@ const server = require('./server'); const { port } = require('./config').server; +const { setupMessageHandler } = require('./core/services/messageHandler'); + async function bootstrap() { /** * Add external services init as async operations (db, redis, etc...) * e.g. * await sequelize.authenticate() */ + + setupMessageHandler(); + return http.createServer(server.callback()).listen(port, '0.0.0.0'); } diff --git a/wai-server/package-lock.json b/wai-server/package-lock.json index 55ade97..c499411 100644 --- a/wai-server/package-lock.json +++ b/wai-server/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "@koa/cors": "2.2.3", "@whiskeysockets/baileys": "^6.7.9", + "axios-retry": "^4.5.0", "dotenv": "8.0.0", "joi": "14.3.1", "koa": "2.7.0", @@ -998,6 +999,28 @@ "proxy-from-env": "^1.1.0" } }, + "node_modules/axios-retry": { + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/axios-retry/-/axios-retry-4.5.0.tgz", + "integrity": "sha512-aR99oXhpEDGo0UuAlYcn2iGRds30k366Zfa05XWScR9QaQD4JYiP3/1Qt1u7YlefUOK+cn0CcwoL1oefavQUlQ==", + "dependencies": { + "is-retry-allowed": "^2.2.0" + }, + "peerDependencies": { + "axios": "0.x || 1.x" + } + }, + "node_modules/axios-retry/node_modules/is-retry-allowed": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/is-retry-allowed/-/is-retry-allowed-2.2.0.tgz", + "integrity": "sha512-XVm7LOeLpTW4jV19QSH38vkswxoLud8sQ57YwJVTPWdiaI9I8keEhGFpBlslyVsgdQy4Opg8QOLb8YRgsyZiQg==", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/balanced-match": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", diff --git a/wai-server/package.json b/wai-server/package.json index d8a8463..d5681ae 100644 --- a/wai-server/package.json +++ b/wai-server/package.json @@ -27,6 +27,8 @@ }, "dependencies": { "@koa/cors": "2.2.3", + "@whiskeysockets/baileys": "^6.7.9", + "axios-retry": "^4.5.0", "dotenv": "8.0.0", "joi": "14.3.1", "koa": "2.7.0", @@ -39,7 +41,6 @@ "mysql2": "^3.11.5", "sequelize": "^6.37.5", "uuid": "3.3.2", - "ws": "^8.18.0", - "@whiskeysockets/baileys": "^6.7.9" + "ws": "^8.18.0" } }