From a3e68df3178ecd594bb7c68a254f04e73e1f64e3 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Tue, 31 Dec 2024 09:24:08 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BD=BF=E7=94=A8=20emitter=20?= =?UTF-8?q?=E5=8F=91=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wai-server/core/baileys/index-use-emitter.js | 361 +++++++++++++++++++ 1 file changed, 361 insertions(+) create mode 100644 wai-server/core/baileys/index-use-emitter.js diff --git a/wai-server/core/baileys/index-use-emitter.js b/wai-server/core/baileys/index-use-emitter.js new file mode 100644 index 0000000..2ddd646 --- /dev/null +++ b/wai-server/core/baileys/index-use-emitter.js @@ -0,0 +1,361 @@ +const { + makeWASocket, + Browsers, + DisconnectReason, + fetchLatestBaileysVersion, + makeCacheableSignalKeyStore, + makeInMemoryStore, + useMultiFileAuthState, + downloadMediaMessage, + isJidUser +} = require('@whiskeysockets/baileys'); +const { writeFile } = require('fs/promises'); +const waEmitter = require('../emitter'); +const serverConfig = require('../../config').server; + +const { formatPhoneNumber, parsePhoneNumber, formatStatus, formatTimestamp } = require('./helper'); +const generateId = require('../../utils/generateId.util'); +const NodeCache = require('node-cache'); +const P = require('pino'); + +waEmitter.on('message:updated', event => { + console.info('msg:evt:updated', event); +}); +waEmitter.on('message:received', event => { + console.info('msg:evt:received', event); +}); +waEmitter.on('connection:open', event => { + console.info('con:evt.open', event); +}); +waEmitter.on('connection:close', event => { + console.info('con:evt.close', event); +}); + +const createWhatsApp = async phone => { + + let qrCode = null; + const channelId = generateId(); + const whatsAppNo = phone; + // 储存键值对 msgId-externalId + // TODO 什么时候清理旧的? + const msgIdMap = new Map(); + + const connectedLisenter = []; + const fireConnected = (whatsAppNo, waInstance) => { + for (let index = 0; index < connectedLisenter.length; index++) { + const lisenter = connectedLisenter[index]; + lisenter(whatsAppNo, waInstance); + } + } + + const onConnected = (lisenter) => { + connectedLisenter.push(lisenter); + } + + const handleMessagesUpsert = async upsert => { + console.info('messages.upsert: ', JSON.stringify(upsert, undefined, 2)); + + if (upsert.type === 'notify') { + for (const msg of upsert.messages) { + + // 没有类型的消息,先忽略 + if (!msg.message) { + continue; + } + + const messageType = Object.keys(msg.message)[0]; + console.log('messageType', messageType); + + const fromWhatsAppNo = parsePhoneNumber(msg.key.remoteJid); + + if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) { + const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text; + + if (text.indexOf('「自~测」') > -1) { + sendTextMessage(fromWhatsAppNo, '这是自测消息:' + new Date().toString()); + } + + const externalId = msgIdMap.get(msg.key.id); + if (msg.key.fromMe) { + waEmitter.emit('message:updated', { + id: msg.key.id, + externalId, + status: formatStatus(msg.status), + direction: 'outbound', + from: whatsAppNo, + to: fromWhatsAppNo, + type: 'text', + text: { + body: text, + }, + conversation: { + type: isJidUser(msg.key.remoteJid) ? 'individual' : 'group', + }, + customerProfile: { + id: parsePhoneNumber(msg.key.participant), + name: msg.pushName, + }, + whatsAppNo, + fromMe: msg.key.fromMe, + eventSource: 'WA-01-HK.messages.upsert.notify', + updateTime: formatTimestamp(msg.messageTimestamp), + }); + } else { + waEmitter.emit('message:received', { + id: msg.key.id, + externalId, + status: '', + direction: 'inbound', + from: fromWhatsAppNo, + to: whatsAppNo, + type: 'text', + text: { + body: text, + }, + conversation: { + type: isJidUser(msg.key.remoteJid) ? 'individual' : 'group', + }, + customerProfile: { + id: parsePhoneNumber(msg.key.participant), + name: msg.pushName, + }, + whatsAppNo, + fromMe: msg.key.fromMe, + eventSource: 'WA-01-HK.messages.upsert.notify', + createTime: formatTimestamp(msg.messageTimestamp), + }); + } + } + } + } else if (upsert.type === 'append') { + for (const msg of upsert.messages) { + if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) { + const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text; + const fromWhatsAppNo = parsePhoneNumber(msg.key.remoteJid); + + const externalId = msgIdMap.get(msg.key.id); + if (msg.key.fromMe) { + waEmitter.emit('message:updated', { + id: msg.key.id, + externalId, + status: formatStatus(msg.status), + direction: 'outbound', + from: whatsAppNo, + to: fromWhatsAppNo, + type: 'text', + text: { + body: text, + }, + conversation: { + type: isJidUser(msg.key.remoteJid) ? 'individual' : 'group', + }, + customerProfile: { + id: parsePhoneNumber(msg.participant), + name: msg.pushName, + }, + whatsAppNo, + fromMe: msg.key.fromMe, + eventSource: 'WA-01-HK.messages.upsert.append', + updateTime: formatTimestamp(msg.messageTimestamp), + }); + } + } + } + } + } + + const handleMessagesUpdate = async messageUpdate => { + console.info('messages.update: ', JSON.stringify(messageUpdate, undefined, 2)); + + for (const msg of messageUpdate) { + + const externalId = msgIdMap.get(msg.key.id); + waEmitter.emit('message:updated', { + id: msg.key.id, + externalId, + status: formatStatus(msg.update?.status), + direction: msg.key.fromMe ? 'outbound' : 'inbound', + from: msg.key.fromMe ? whatsAppNo : parsePhoneNumber(msg.key.remoteJid), + to: msg.key.fromMe ? parsePhoneNumber(msg.key.remoteJid) : whatsAppNo, + conversation: { + type: isJidUser(msg.key.remoteJid) ? 'individual' : 'group', + }, + customerProfile: { + id: parsePhoneNumber(msg.key.participant), + name: msg.pushName, + }, + whatsAppNo, + fromMe: msg.key.fromMe, + eventSource: 'WA-01-HK.messages.updated', + updateTime: formatTimestamp(new Date().getTime() / 1000), + }); + } + } + + const start = async () => { + + const logger = P({ timestamp: () => `,"time":"${new Date().toJSON()}"` }, P.destination('./wa-logs-' + phone + '_' + channelId + '.txt')); + logger.level = 'trace'; + const msgRetryCounterCache = new NodeCache(); + const storeFilename = './baileys_auth_info/baileys_store_' + phone + '_' + channelId + '.json' + const store = makeInMemoryStore({ logger }); + store?.readFromFile(storeFilename); + // save every 10s + setInterval(() => { + store?.writeToFile(storeFilename); + }, 10_000); + const { state, saveCreds } = await useMultiFileAuthState('baileys_auth_info/' + phone + '_' + channelId); + // fetch latest version of WA Web + const { version, isLatest } = await fetchLatestBaileysVersion(); + + const waSocket = makeWASocket({ + version, + logger, + auth: { + creds: state.creds, + keys: makeCacheableSignalKeyStore(state.keys, logger), + }, + // connectTimeoutMs: 1000*60*10, + // defaultQueryTimeoutMs: 1000*60*1, + // keepAliveIntervalMs: 1000*60*60, + //retryRequestDelayMs: 1000*25, + // https://github.com/WhiskeySockets/Baileys/blob/31bc8ab/src/Utils/generics.ts#L21 + // https://github.com/WhiskeySockets/Baileys/blob/31bc8ab4e2c825c0d774875701ed07e20d05bdb6/WAProto/WAProto.proto + //browser: Browsers.ubuntu('IOS_PHONE'),//Browsers.macOS('SAFARI'),//Browsers.ubuntu('IOS_PHONE'),//Browsers.baileys('WEAR_OS'),// + msgRetryCounterCache, + generateHighQualityLinkPreview: false, + syncFullHistory: false, + }); + + store?.bind(waSocket.ev); + + + const sendTextMessage = async (number, content, externalId) => { + const jid = formatPhoneNumber(number); + + return new Promise(() => { + waSocket.sendMessage(jid, { text: content }) + .then(msg => { + msgIdMap.set(msg.key.id, externalId); + }) + .catch(ex => { + waEmitter.emit('message:updated', { + id: generateId(), + externalId, + status: 'failed', + direction: 'outbound', + from: whatsAppNo, + to: number, + error: `发送文本消息出错 ` + ex, + eventSource: 'WA-01-HK.sendMessage.promise.catch', + updateTime: formatTimestamp(new Date().getTime() / 1000), + }); + }); + }); + }; + + const sendImageMessage = async (number, imageUrl) => { + const jid = formatPhoneNumber(number); + try { + const msgInfo = await waSocket.sendMessage(jid, { + image: { url: imageUrl }, + }); + return { + messageId: msgInfo?.key?.id ?? generateId() + }; + } catch (ex) { + waEmitter.emit('message.error', { + messge: `[${whatsAppNo}->${number}]发送图片消息出错`, + from: whatsAppNo, + to: number, + error: ex + }) + console.error(`[${whatsAppNo}->${number}]发送图片消息出错: `, ex); + } + }; + + waSocket.ev.process( + // events is a map for event name => event data + async(events) => { + // something about the connection changed + // maybe it closed, or we received all offline message or connection opened + if(events['connection.update']) { + const update = events['connection.update'] + const { connection, qr, lastDisconnect } = update + + if (connection === 'close') { + if((lastDisconnect?.error)?.output?.statusCode !== DisconnectReason.loggedOut) { + start(); + } else { + waEmitter.emit('connection:close', { + whatsAppNo, channelId, + eventSource: 'WA-01-HK.connection.update.close', + status: 'offline', + }); + } + } else if (connection === 'open') { + connectionStatus = 'open'; + + waEmitter.on('request.send.message', event => { + console.info('request.send.message', event); + waSocket.sendMessage( + event.to + '@s.whatsapp.net', + { text: 'request.send.message: ' + event.content + new Date().toString()}) + }); + + fireConnected(whatsAppNo, { + socket: waSocket, + sendTextMessage, + sendImageMessage, + }); + waEmitter.emit('connection:open', { + status: 'open', whatsAppNo, channelId, + eventSource: 'WA-01-HK.connection.update.open'}); + //waSocket.sendMessage('8613317835586@s.whatsapp.net', { text: 'connection:open 发消息:' + new Date().toString()}) + } else if (qr !== undefined) { + // WebSocket 创建成功等待扫码,如果没有扫码会更新 qr + // 第一次一分钟,后面是 20 秒更新一次 + if (qrCode === null) { + qrCode = qr; + console.info('qr: ', qr) + + waEmitter.emit('connection:qr', { + createTimestamp: Date.now(), + status: 'offline', + version: '0.111111', + channelId: channelId, + phone: phone, + qrCode: qr, + socket: waSocket + }) + } else { + // 第一次二维码时效后退出,不需要等待更新二维码 + waSocket.logout(() => '二维码已过期'); + } + } + } + + if (events['creds.update']) { + await saveCreds() + } + } + ) + + waSocket.ev.on('creds.update', async () => await saveCreds()); + waSocket.ev.on('messages.upsert', handleMessagesUpsert); + waSocket.ev.on('messages.update', handleMessagesUpdate); + }; + console.info('serverConfig: ', serverConfig); + return { + createTimestamp: Date.now(), + status: 'offline', + version: 'no-promise-version', + channelId: channelId, + phone: phone, + start,onConnected + }; +}; + +module.exports = { + createWhatsApp, +};