From bb364e5cacd6c24874370126ebafb836ce112bb7 Mon Sep 17 00:00:00 2001 From: Lei OT Date: Fri, 7 Mar 2025 17:17:19 +0800 Subject: [PATCH] =?UTF-8?q?feat(WAI):=20=E8=AE=BE=E7=BD=AE=E5=8F=B7?= =?UTF-8?q?=E7=A0=81=E6=89=98=E7=AE=A1;=20=E6=89=98=E7=AE=A1=E8=B4=A6?= =?UTF-8?q?=E5=8F=B7,=20=E5=8D=95=E4=B8=AA=E4=BC=9A=E8=AF=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wai-server/api/channels/channel.controller.js | 27 ++++++++++++++++++- wai-server/api/channels/channel.routes.js | 1 + wai-server/core/handler/whatsappHandler.js | 23 ++++++++++++++-- wai-server/core/tripplanner/index.js | 26 +++++++++++++++--- wai-server/services/agent_sessions.service.js | 19 ++++++++++++- wai-server/services/connections.service.js | 2 +- 6 files changed, 90 insertions(+), 8 deletions(-) diff --git a/wai-server/api/channels/channel.controller.js b/wai-server/api/channels/channel.controller.js index 1b4817b..21ab354 100644 --- a/wai-server/api/channels/channel.controller.js +++ b/wai-server/api/channels/channel.controller.js @@ -3,6 +3,7 @@ const { sessionStore } = require('../../core'); // Import from core/index.js const { createWhatsApp } = require('../../core/baileys'); // Import from core/index.js const { getConnection } = require('../../services/connections.service'); +const { upsert: upsertAgentSession } = require('../../services/agent_sessions.service'); const { isEmpty } = require('../../utils/commons.util'); const { getUserLogger } = require('../../utils/logger.util'); const { domain } = require('../../config').server; @@ -53,10 +54,33 @@ const offline = async ctx => { } }; +const hosted = async ctx => { + const { phone, contact, hosted } = ctx.query; + ctx.assert(phone, 400, 'phone is required'); + const existsSession = sessionStore.getSession(phone); + ctx.assert(existsSession, 400, `WhatsApp ${phone} 已离线`); + try { + // hosting an USER + if (isEmpty(contact)) { + waEmitter.emit('connection:update', { phone, whatsAppNo: phone, status: hosted ? 'hosted' : 'open', channelId: existsSession.channelId }); + return ''; + } + // hosting a session + await upsertAgentSession({ + hosted: phone, + contact, + }); + return ''; + } catch (error) { + console.error('set hosted error', error); + ctx.assert(null, 500, 'Failed to set hosted'); + } +}; + /** * @deprecated */ -const setStatus = async (ctx) => { +const setStatus = async ctx => { const { phone, status } = ctx.request.body; const existsSession = sessionStore.getSession(phone); ctx.assert(existsSession, 400, `WhatsApp ${phone} 已离线`); @@ -76,5 +100,6 @@ module.exports = { getAll, getSessions, offline, + hosted, test, }; diff --git a/wai-server/api/channels/channel.routes.js b/wai-server/api/channels/channel.routes.js index 38db24d..3f0cb89 100644 --- a/wai-server/api/channels/channel.routes.js +++ b/wai-server/api/channels/channel.routes.js @@ -13,6 +13,7 @@ module.exports = Router => { // .get('/qrcode', controller.qrcode) .get('/sessions', controller.getSessions) .get('/offline', controller.offline) + .get('/hosted', controller.hosted) .get('/test', controller.test) .get('/', controller.getAll); diff --git a/wai-server/core/handler/whatsappHandler.js b/wai-server/core/handler/whatsappHandler.js index 309ef3f..cf7e11b 100644 --- a/wai-server/core/handler/whatsappHandler.js +++ b/wai-server/core/handler/whatsappHandler.js @@ -159,6 +159,22 @@ const setupConnectionHandler = () => { logger.error({ connectionData, error }, 'error close connection'); } }); + whatsappEvents.on('connection:update', async connectionData => { + try { + getUserLogger(connectionData.whatsAppNo).warn({ msg: 'connection:update', connectionData }); + await updateConnection( + { + ...objectMapper(connectionData, { whatsAppNo: [{ key: 'wa_id' }], channelId: 'channel_id' }), + service_type: 'baileys', + }, + { connect_domain: domain, connect_name: domainName }, + ); + const webhookBody = webhookBodyBuilder({ ...connectionData, to: connectionData.whatsAppNo, connection: connectionData.status }, 'creds:update'); + await callWebhook(webhookBody); + } catch (error) { + logger.error({ connectionData, error }, 'error close connection'); + } + }); }; /** @@ -233,7 +249,10 @@ const setupMessageHandler = async () => { await callWebhook(webhookBodyFill(webhookBody, { ...msgRow, status: msgRow.msg_status || '' })); } - whatsappEvents.emit(`user:${eventName}`, msgRow); + // 只对新消息, 同步的历史消息不再处理 + if (messageData.eventSource.includes('notify')) { + whatsappEvents.emit(`user:${eventName}`, msgRow); + } } catch (error) { logger.error({ messageData, error }, 'error call webhook'); } @@ -280,7 +299,7 @@ async function resetCurrentConnections() { * 登录: 当前服务的所有连接 */ async function loginCurrentConnections() { - const currents = await getConnection({ connect_domain: domain, connect_name: domainName, status: 'open' }); + const currents = await getConnection({ connect_domain: domain, connect_name: domainName, status: ['open', 'hosted'] }); for (const user of currents) { const { wa_id: waId } = user; const phone = waId.replace('+', ''); diff --git a/wai-server/core/tripplanner/index.js b/wai-server/core/tripplanner/index.js index 563a2c1..6574366 100644 --- a/wai-server/core/tripplanner/index.js +++ b/wai-server/core/tripplanner/index.js @@ -3,7 +3,8 @@ const generateId = require('../../utils/generateId.util'); const plannerEvents = require('../emitter'); // const createAsyncQueueProcessor = require('../emitter/queueProcessor'); -const { getAgentSession, createAgentSession } = require('../../services/agent_sessions.service'); +const { getConnection } = require('../../services/connections.service') +const { getAgentSession, createAgentSession, update: updateAgentSession } = require('../../services/agent_sessions.service'); // const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service'); const { isEmpty } = require('../../utils/commons.util'); const { logger, getUserLogger } = require('../../utils/logger.util'); @@ -25,10 +26,13 @@ const setupUserMessageHandler = async () => { const { from, to, direction } = msgRow; const _whatsAppNo = to; const _contact = from; - if (!AGENT_PHONE_NO.includes(_whatsAppNo) || direction !== 'inbound') { + getUserLogger(_whatsAppNo).info({ eventName, msgRow }); + + const hostedUser = await getConnection({ wa_id: _whatsAppNo, status: 'hosted' }); + if ((!AGENT_PHONE_NO.includes(_whatsAppNo) && isEmpty(hostedUser)) || direction !== 'inbound') { + // logger.info({ eventName, msgRow }, 'ignore message'); return false; } - getUserLogger(_whatsAppNo).info({ eventName, msgRow }); try { const agentSession = await getAgentSession({ hosted: _whatsAppNo, contact: _contact }); @@ -79,6 +83,22 @@ const setupAgentMessageHandler = async () => { agent_name: agentRes.agent.name, model_id: agentRes.usage.models[0].model_id, }); + } else { + // 更新 + await updateAgentSession( + { + agent_session_id: agentSessionId, + hosted: _whatsAppNo, + contact: _contact, + agent_id: agentRes.agent.id, + agent_name: agentRes.agent.name, + model_id: agentRes.usage.models[0].model_id, + }, + { + hosted: _whatsAppNo, + contact: _contact, + }, + ); } // agent -> contact const msgReady = { from: _whatsAppNo, to: _contact, msgcontent: { body: agentMsg.text }, msgtype: 'text', actionId: `.${generateId()}.${agentSessionId}` }; diff --git a/wai-server/services/agent_sessions.service.js b/wai-server/services/agent_sessions.service.js index e1605a6..1f3075d 100644 --- a/wai-server/services/agent_sessions.service.js +++ b/wai-server/services/agent_sessions.service.js @@ -22,4 +22,21 @@ const createAgentSession = async data => { return r; }; -module.exports = { getAgentSession, createAgentSession }; +const update = async (data, where) => { + // const r = await AgentSessionsModelModel.create(data); + const [rows] = await AgentSessionsModelModel.update(data, { where }); + return rows; +}; + +const upsert = async (data, where = {}) => { + const _where = isEmpty(where) ? data : where; + const [instance, created] = await AgentSessionsModelModel.findOrCreate({ where: _where, defaults: { ...data } }); + if (!created) { + await instance.update({ ...data }, { where }); + const savedI = await instance.save(); // reload + return savedI.toJSON(); + } + return instance.toJSON(); +}; + +module.exports = { getAgentSession, createAgentSession, update, upsert }; diff --git a/wai-server/services/connections.service.js b/wai-server/services/connections.service.js index b6238cc..9062275 100644 --- a/wai-server/services/connections.service.js +++ b/wai-server/services/connections.service.js @@ -40,7 +40,7 @@ const updateConnection = async (data, where = {}) => { const getConnection = async data => { const r = await ConnectionsModel.findAll({ where: data, raw: true }); - return r; + return r || []; }; const resetConnection = async () => {