feat(WAI): 托管号码给trip planner Agent

dev/voice
Lei OT 4 months ago
parent 38e61ed4d1
commit 3d5b8dba73

@ -13,3 +13,5 @@ WEBHOOK_URL='https://p9axztuwd7x8a7.mycht.cn/whatsapp_server/v2/individual_waweb
SERVER_DOMAIN='https://wai-server-qq4qmtq7wc9he4.mycht.cn' SERVER_DOMAIN='https://wai-server-qq4qmtq7wc9he4.mycht.cn'
SERVER_NAME='liaoyijun_01' SERVER_NAME='liaoyijun_01'
AGENT_PHONE_NO='8613317835586'

@ -8,6 +8,7 @@ const serverNodes = Array.from({ length: 2 }, (_, i) => ({
const config = { const config = {
whatsappBaileys: serverNodes, whatsappBaileys: serverNodes,
webhook: process.env.WEBHOOK_URL, webhook: process.env.WEBHOOK_URL,
agentHosted: process.env.AGENT_PHONE_NO,
}; };
module.exports = config; module.exports = config;

@ -228,12 +228,12 @@ const setupMessageHandler = async () => {
const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record }); const readyUpsert = omitEmpty({ ...timeFields, ...pusher, ...waiContentFieldsToDB, ...record });
const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, ...msgOrigin }, targetUpsert); const msgRow = await upsertOutboundMessage({ ...upsertFields, ...readyUpsert, ...typeField, ...msgOrigin }, targetUpsert);
whatsappEvents.emit(`user:${eventName}`, msgRow);
if (_type !== 'unresolvable') { if (_type !== 'unresolvable') {
// 把内容加上, 否则前端没显示 // 把内容加上, 否则前端没显示
await callWebhook(webhookBodyFill(webhookBody, { ...msgRow, status: msgRow.msg_status || '' })); await callWebhook(webhookBodyFill(webhookBody, { ...msgRow, status: msgRow.msg_status || '' }));
} }
whatsappEvents.emit(`user:${eventName}`, msgRow);
} catch (error) { } catch (error) {
logger.error({ messageData, error }, 'error call webhook'); logger.error({ messageData, error }, 'error call webhook');
} }

@ -0,0 +1,120 @@
'use strict';
const generateId = require('../../utils/generateId.util');
const plannerEvents = require('../emitter');
// const createAsyncQueueProcessor = require('../emitter/queueProcessor');
const { getAgentSession, createAgentSession } = require('../../services/agent_sessions.service');
// const { getOutboundMessage, upsertOutboundMessage } = require('../../services/outbound_messages.service');
const { isEmpty } = require('../../utils/commons.util');
const { logger, getUserLogger } = require('../../utils/logger.util');
const { callAgentRounds } = require('../vendor/bailian.aliyun');
const { sendMessage } = require('../handler/whatsappHandler');
const userMessageEventNames = ['user:message:received'];
const agentMessageEventNames = ['agent:message:received'];
// const AGENT_PHONE_NO = ['8613557032060'];
const AGENT_PHONE_NO = require('../../config').agentHosted.split(',');
/**
* 接收客人的回复
* - 发送给agent
*/
const setupUserMessageHandler = async () => {
const messageListner = async ({ eventName, msgRow }) => {
const { from, to, direction } = msgRow;
const _whatsAppNo = to;
const _contact = from;
if (!AGENT_PHONE_NO.includes(_whatsAppNo) || direction !== 'inbound') {
return false;
}
getUserLogger(_whatsAppNo).info({ eventName, msgRow });
try {
const agentSession = await getAgentSession({ hosted: _whatsAppNo, contact: _contact });
// if (isEmpty(agentSession) || isEmpty(savedMsg?.text_body)) {
// 只处理文本
if (isEmpty(msgRow?.text_body)) {
return false;
}
// contact -> agent
const agentPost = {
agent_session_id: agentSession?.agent_session_id || '',
content: msgRow.text_body,
};
const agentRes = await callAgentRounds(agentPost);
plannerEvents.emit('agent:message:received', { msgRow, agentSession, agentRes });
// #
} catch (error) {
logger.error({ messageData: msgRow, error }, 'error call agent');
}
};
userMessageEventNames.forEach(eventName => {
plannerEvents.on(eventName, async msgRow => messageListner({ eventName, msgRow }));
});
};
const setupAgentMessageHandler = async () => {
const messageListner = async ({ eventName, payload }) => {
const { msgRow, agentSession, agentRes } = payload;
const { from, to, whatsAppNo, direction } = msgRow;
const _whatsAppNo = whatsAppNo || to;
const _contact = from;
const { output: agentMsg } = agentRes;
const { session_id: agentSessionId } = agentMsg;
getUserLogger(_whatsAppNo).info({ eventName, agentRes });
try {
const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString();
// 号码直接托管
// - todo: 按会话托管时另外处理
if (isEmpty(agentSession)) {
await createAgentSession({
agent_session_id: agentSessionId,
hosted: _whatsAppNo,
contact: _contact,
agent_id: agentRes.agent.id,
agent_name: agentRes.agent.name,
model_id: agentRes.usage.models[0].model_id,
});
}
// agent -> contact
const msgReady = { from: _whatsAppNo, to: _contact, msgcontent: { body: agentMsg.text }, msgtype: 'text', actionId: `.${generateId()}.${agentSessionId}` };
await sendMessage(msgReady);
// #
} catch (error) {
logger.error({ msgRow, agentRes, error }, 'error call agent');
}
};
agentMessageEventNames.forEach(eventName => {
plannerEvents.on(eventName, async payload => messageListner({ eventName, payload }));
});
};
async function startAgentSession(messageData) {
// const now = new Date(new Date().getTime() + 60 * 60 * 1000).toISOString();
// const { from, to, whatsAppNo, direction } = messageData;
// const _whatsAppNo = whatsAppNo || to;
// const _contact = from;
// // -> agent
// const agentRes = await callAgentFirst({
// // agent_session_id: agentSession.agent_session_id,
// content: messageData.text_body,
// });
// const { text: agentMsg, session_id: agentSessionId } = agentRes;
// await createAgentSession({ agent_session_id: agentSessionId, hosted: _whatsAppNo, contact: _contact, created_at: now });
// const msgReady = { from: _whatsAppNo, to: _contact, body: agentMsg, msgtype: 'text', actionId: `.${generateId()}` };
// await sendMessage(msgReady);
// //
}
function setupAgentHandler() {
setupAgentMessageHandler();
setupUserMessageHandler();
}
module.exports = { setupAgentHandler, startAgentSession };

@ -0,0 +1,69 @@
'use strict';
const axios = require('axios');
const { logger, getUserLogger } = require('../../utils/logger.util');
const config = {
publicUrl: 'https://dashscope.aliyuncs.com/api/v1/apps/{appId}/completion',
apiKey: 'sk-7d059313bd8943318b91cf4c25d91470', // process.env.ALIYUN_API_KEY,
agent: {
id: '3abf8008c77b4ccabb6ed37c6f42a882',
name: 'sales-7',
},
};
/**
* ! system prompt 将会覆盖 agent的system prompt
*/
async function callAgentFirst(payload) {
const url = config.publicUrl.replace('{appId}', config.agent.id);
try {
if (!url) {
logger.error('no agent url provided\n', payload);
return;
}
const body = {
input: {
// prompt: payload.content,
messages: [
// { role: 'system', content: `Start contacting the guest named ${payload.customer_name}, here is the guest's reservation request information: ${payload.background}` },
// { role: 'assistant', content: payload.hello },
{ role: 'user', content: payload.content },
],
},
// parameters: {
// messages: '',
// },
};
const { output } = await axios.post(url, body, { headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${config.apiKey}` } });
return output;
} catch (error) {
logger.error(JSON.stringify({ url, payload, error: error.message }, undefined, 2), 'Error calling agent');
}
}
/**
* @returns {object} agent response { session_id, text }
*/
async function callAgentRounds(payload) {
const url = config.publicUrl.replace('{appId}', config.agent.id);
try {
if (!url) {
logger.error('no agent url provided\n', payload);
return;
}
// getUserLogger(payload.whatsAppNo).info({ url, payload });
const body = {
input: { prompt: payload.content, session_id: payload.agent_session_id },
// parameters: {
// messages: '',
// },
};
const res = await axios.post(url, body, { headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${config.apiKey}` } });
return { ...res.data, agent: config.agent };
} catch (error) {
logger.error(JSON.stringify({ url, payload, error: error.message }, undefined, 2), 'Error calling agent');
}
}
module.exports = { callAgentFirst, callAgentRounds };

@ -6,6 +6,7 @@ const server = require('./server');
const { port } = require('./config').server; const { port } = require('./config').server;
const { setupWhatsappHandler, resetCurrentConnections, loginCurrentConnections } = require('./core/handler/whatsappHandler'); const { setupWhatsappHandler, resetCurrentConnections, loginCurrentConnections } = require('./core/handler/whatsappHandler');
const { setupAgentHandler } = require('./core/tripplanner');
async function bootstrap() { async function bootstrap() {
/** /**
@ -15,6 +16,7 @@ async function bootstrap() {
*/ */
setupWhatsappHandler(); setupWhatsappHandler();
setupAgentHandler();
return http.createServer(server.callback()).listen(port, '0.0.0.0'); return http.createServer(server.callback()).listen(port, '0.0.0.0');
} }

@ -0,0 +1,61 @@
const Sequelize = require('sequelize');
module.exports = function(sequelize, DataTypes) {
return sequelize.define('agent_sessions', {
sn: {
autoIncrement: true,
type: DataTypes.BIGINT,
allowNull: false,
primaryKey: true
},
agent_session_id: {
type: DataTypes.STRING(100),
allowNull: true
},
contact: {
type: DataTypes.STRING(100),
allowNull: true,
comment: "联系人"
},
hosted: {
type: DataTypes.STRING(100),
allowNull: true,
comment: "托管的号码"
},
agent_id: {
type: DataTypes.STRING(100),
allowNull: true
},
agent_name: {
type: DataTypes.STRING(100),
allowNull: true
},
model_id: {
type: DataTypes.STRING(100),
allowNull: true
},
createtime: {
type: DataTypes.DATE,
allowNull: true,
defaultValue: Sequelize.Sequelize.literal('CURRENT_TIMESTAMP')
},
updatetime: {
type: DataTypes.DATE,
allowNull: true,
defaultValue: Sequelize.Sequelize.literal('CURRENT_TIMESTAMP')
}
}, {
sequelize,
tableName: 'agent_sessions',
timestamps: false,
indexes: [
{
name: "PRIMARY",
unique: true,
using: "BTREE",
fields: [
{ name: "sn" },
]
},
]
});
};

@ -1,16 +1,19 @@
var DataTypes = require('sequelize').DataTypes; var DataTypes = require('sequelize').DataTypes;
var _agent_sessions = require("./agent_sessions");
var _connections = require('./connections'); var _connections = require('./connections');
var _outbound_messages = require("./outbound_messages"); var _outbound_messages = require("./outbound_messages");
var _request_logs = require('./request_logs'); var _request_logs = require('./request_logs');
var _webhook_logs = require('./webhook_logs'); var _webhook_logs = require('./webhook_logs');
function initModels(sequelize) { function initModels(sequelize) {
var agent_sessions = _agent_sessions(sequelize, DataTypes);
var connections = _connections(sequelize, DataTypes); var connections = _connections(sequelize, DataTypes);
var outbound_messages = _outbound_messages(sequelize, DataTypes); var outbound_messages = _outbound_messages(sequelize, DataTypes);
var request_logs = _request_logs(sequelize, DataTypes); var request_logs = _request_logs(sequelize, DataTypes);
var webhook_logs = _webhook_logs(sequelize, DataTypes); var webhook_logs = _webhook_logs(sequelize, DataTypes);
return { return {
agent_sessions,
connections, connections,
outbound_messages, outbound_messages,
request_logs, request_logs,

@ -0,0 +1,25 @@
'use strict';
const db = require('../config').database;
const { domain, name } = require('../config').server;
const { objectMapper, pick, isEmpty } = require('../utils/commons.util');
const initModels = require('../models/init-models');
const Sequelize = db.sequelize;
const models = initModels(Sequelize);
const AgentSessionsModelModel = models.agent_sessions;
const getAgentSession = async where => {
const r = await AgentSessionsModelModel.findOne({
where,
});
return r?.toJSON() || {};
};
const createAgentSession = async data => {
const r = await AgentSessionsModelModel.create(data);
return r;
};
module.exports = { getAgentSession, createAgentSession };
Loading…
Cancel
Save