📖 文章概述
从简单的聊天机器人到复杂的多 Agent 系统,本文讲解如何设计和实现企业级智能应用架构。
🎯 智能应用架构演进
架构层级
第 1 层: 单 Agent 应用
├─ 直接调用 LLM
└─ 简单 Prompt 工程
第 2 层: 工作流应用
├─ 多步骤 Prompt 链
├─ 条件分支
└─ 外部工具调用
第 3 层: Multi-Agent 系统
├─ 多个专业 Agent
├─ 任务分解和协调
├─ 动态流程编排
└─ 记忆和学习
第 4 层: 企业级系统
├─ 分布式 Agent 网络
├─ 知识库集成
├─ 安全和合规
└─ 监控和优化
应用架构对比
| 维度 | 简单应用 | 工作流系统 | Multi-Agent | 企业系统 |
|---|---|---|---|---|
| 复杂度 | 低 | 中 | 高 | 极高 |
| 可维护性 | 高 | 中 | 低 | 中 |
| 灵活性 | 低 | 中 | 高 | 高 |
| 成本 | 低 | 中 | 中 | 高 |
| 实现时间 | 快 | 中 | 慢 | 很慢 |
🔗 LLM 工作流编排
1. 基础工作流
import { LLMChain } from 'langchain/chains'
import { OpenAI } from 'langchain/llms/openai'
import { PromptTemplate } from 'langchain/prompts'
// 单步工作流
async function singleStepWorkflow() {
const model = new OpenAI({
temperature: 0.7,
openAIApiKey: process.env.OPENAI_API_KEY
})
const template = `你是一个创意文案撰写专家。
根据产品信息生成吸引人的广告文案。
产品: {product}
目标人群: {audience}
请生成一个 50 字以内的广告文案:`
const prompt = new PromptTemplate({
template: template,
inputVariables: ['product', 'audience']
})
const chain = new LLMChain({ llm: model, prompt: prompt })
const result = await chain.call({
product: '智能健身手环',
audience: '年轻上班族'
})
return result.text
}
// 多步工作流 - 分析 → 生成 → 审核
async function multiStepWorkflow() {
const model = new OpenAI({ temperature: 0.7 })
// 步骤 1: 分析用户需求
const analyzePrompt = new PromptTemplate({
template: `分析用户需求:{request}
提取关键信息:`,
inputVariables: ['request']
})
const analyzeChain = new LLMChain({
llm: model,
prompt: analyzePrompt
})
const analysis = await analyzeChain.call({
request: '我需要一个能追踪我健身进度的应用'
})
// 步骤 2: 基于分析生成方案
const generatePrompt = new PromptTemplate({
template: `基于以下分析:{analysis}
生成详细的技术方案:`,
inputVariables: ['analysis']
})
const generateChain = new LLMChain({
llm: model,
prompt: generatePrompt
})
const solution = await generateChain.call({
analysis: analysis.text
})
// 步骤 3: 审核方案
const reviewPrompt = new PromptTemplate({
template: `审核以下技术方案是否可行:{solution}
列出潜在问题和改进建议:`,
inputVariables: ['solution']
})
const reviewChain = new LLMChain({
llm: model,
prompt: reviewPrompt
})
const review = await reviewChain.call({
solution: solution.text
})
return {
analysis: analysis.text,
solution: solution.text,
review: review.text
}
}
2. 条件分支和路由
class WorkflowRouter {
constructor(model) {
this.model = model
}
// 路由示例:根据用户意图选择不同的处理流程
async route(userInput) {
// 第一步:识别意图
const intentPrompt = new PromptTemplate({
template: `分类用户请求的意图:
用户请求:{input}
意图分类(返回单一分类):
- question: 提问
- complaint: 投诉
- suggestion: 建议
- technical: 技术问题
意图:`,
inputVariables: ['input']
})
const intentChain = new LLMChain({
llm: this.model,
prompt: intentPrompt
})
const intentResult = await intentChain.call({ input: userInput })
const intent = intentResult.text.trim().split(':')[1].trim()
// 第二步:根据意图路由
switch (intent) {
case 'question':
return await this.handleQuestion(userInput)
case 'complaint':
return await this.handleComplaint(userInput)
case 'suggestion':
return await this.handleSuggestion(userInput)
case 'technical':
return await this.handleTechnical(userInput)
default:
return await this.handleDefault(userInput)
}
}
async handleQuestion(input) {
const prompt = new PromptTemplate({
template: `回答用户的问题:{input}\n\n详细答案:`,
inputVariables: ['input']
})
const chain = new LLMChain({ llm: this.model, prompt })
return await chain.call({ input })
}
async handleComplaint(input) {
const prompt = new PromptTemplate({
template: `用户提出投诉:{input}\n\n请表示理解并提供解决方案:`,
inputVariables: ['input']
})
const chain = new LLMChain({ llm: this.model, prompt })
return await chain.call({ input })
}
async handleTechnical(input) {
// 可能需要调用技术知识库
const prompt = new PromptTemplate({
template: `解决技术问题:{input}\n\n详细的技术解决方案:`,
inputVariables: ['input']
})
const chain = new LLMChain({ llm: this.model, prompt })
return await chain.call({ input })
}
async handleSuggestion(input) {
const prompt = new PromptTemplate({
template: `感谢用户的建议:{input}\n\n回复:`,
inputVariables: ['input']
})
const chain = new LLMChain({ llm: this.model, prompt })
return await chain.call({ input })
}
async handleDefault(input) {
return '感谢你的输入,我正在处理你的请求...'
}
}
🤖 自主 Agent 系统
3. 基础 Agent 实现
import { initializeAgentExecutorWithOptions } from 'langchain/agents'
import { Tool } from 'langchain/tools'
import { OpenAI } from 'langchain/llms/openai'
// 定义工具
class CalculatorTool extends Tool {
name = 'calculator'
description = '用于数学计算'
async _call(input) {
try {
return eval(input).toString()
} catch (e) {
return `计算错误: ${e.message}`
}
}
}
class WebSearchTool extends Tool {
name = 'web_search'
description = '在网上搜索信息'
async _call(query) {
// 模拟搜索结果
const results = {
'AI': '人工智能是计算机科学的一个分支...',
'Python': 'Python 是一种通用编程语言...',
'深度学习': '深度学习是机器学习的子领域...'
}
return results[query] || `搜索不到关于 ${query} 的结果`
}
}
class DatabaseQueryTool extends Tool {
name = 'database_query'
description = '查询数据库'
async _call(query) {
// 模拟数据库查询
if (query.includes('用户')) {
return '数据库中有 1000 个用户'
}
return '查询结果:空'
}
}
// 创建 Agent
async function createAgent() {
const model = new OpenAI({
temperature: 0,
openAIApiKey: process.env.OPENAI_API_KEY
})
const tools = [
new CalculatorTool(),
new WebSearchTool(),
new DatabaseQueryTool()
]
const executor = await initializeAgentExecutorWithOptions(tools, model, {
agentType: 'zero-shot-react-description',
verbose: true,
maxIterations: 10,
returnIntermediateSteps: true
})
return executor
}
// 使用 Agent
async function main() {
const agent = await createAgent()
const result = await agent.call({
input: '今年 2023 年的平均气温比 2022 年高多少?'
})
console.log('Agent 思考过程:', result.intermediateSteps)
console.log('最终答案:', result.output)
}
4. Multi-Agent 协作系统
class MultiAgentSystem {
constructor() {
this.agents = new Map()
this.taskQueue = []
this.memory = new Map()
}
// 注册 Agent
registerAgent(agentId, agent, specialization) {
this.agents.set(agentId, {
agent: agent,
specialization: specialization,
status: 'idle',
completedTasks: 0
})
}
// 分解任务并分配给合适的 Agent
async delegateTask(mainTask) {
console.log(`主任务: ${mainTask}`)
// 步骤 1: 分解任务
const subtasks = await this.decomposeTasks(mainTask)
console.log(`分解为 ${subtasks.length} 个子任务`)
// 步骤 2: 分配给合适的 Agent
const assignments = this.assignTasks(subtasks)
// 步骤 3: 并行执行
const results = await Promise.all(
assignments.map(assignment => this.executeTask(assignment))
)
// 步骤 4: 合并结果
return this.mergeResults(results)
}
// 任务分解
async decomposeTasks(mainTask) {
// 使用 LLM 分解任务
const prompt = `将以下任务分解为 3-5 个具体的子任务:
任务:${mainTask}
子任务列表(每行一个):`
// 模拟响应
return [
`收集关于 ${mainTask} 的背景信息`,
`分析数据和趋势`,
`生成建议方案`,
`进行可行性评估`
]
}
// 任务分配 - 根据 Agent 的专长
assignTasks(subtasks) {
const specializations = {
'data-analyst': ['分析数据', '收集信息', '统计'],
'strategist': ['生成建议', '规划策略'],
'evaluator': ['评估', '风险分析'],
'writer': ['撰写', '报告生成']
}
return subtasks.map(task => {
let assignedAgent = null
for (const [agentId, keywords] of Object.entries(specializations)) {
if (keywords.some(keyword => task.includes(keyword))) {
assignedAgent = agentId
break
}
}
return {
agentId: assignedAgent || 'default',
task: task
}
})
}
// 执行任务
async executeTask(assignment) {
const { agentId, task } = assignment
const agentInfo = this.agents.get(agentId)
if (!agentInfo) {
return { agentId, task, result: '错误:Agent 不存在' }
}
agentInfo.status = 'working'
try {
// 模拟 Agent 执行任务
const result = await this.simulateAgentExecution(agentId, task)
agentInfo.completedTasks++
agentInfo.status = 'idle'
// 存储结果到共享内存
this.memory.set(`${agentId}:${task}`, result)
return { agentId, task, result }
} catch (error) {
agentInfo.status = 'idle'
return { agentId, task, result: `错误:${error.message}` }
}
}
async simulateAgentExecution(agentId, task) {
// 实际应用中这里会调用真实的 Agent
return new Promise(resolve => {
setTimeout(() => {
resolve(`[${agentId}] 完成任务:${task}`)
}, 1000)
})
}
// 合并结果
mergeResults(results) {
const summary = {
totalTasks: results.length,
completedTasks: results.filter(r => !r.result.includes('错误')).length,
details: results,
timestamp: new Date().toISOString()
}
return summary
}
// 获取系统状态
getSystemStatus() {
const status = {
totalAgents: this.agents.size,
agents: []
}
for (const [agentId, agentInfo] of this.agents) {
status.agents.push({
id: agentId,
specialization: agentInfo.specialization,
status: agentInfo.status,
completedTasks: agentInfo.completedTasks
})
}
return status
}
}
// 使用 Multi-Agent 系统
async function main() {
const system = new MultiAgentSystem()
// 注册 Agent
system.registerAgent('agent1', {}, 'data-analyst')
system.registerAgent('agent2', {}, 'strategist')
system.registerAgent('agent3', {}, 'evaluator')
// 执行复杂任务
const result = await system.delegateTask(
'为新产品制定市场进入战略'
)
console.log('最终结果:', result)
console.log('系统状态:', system.getSystemStatus())
}
🧠 记忆和上下文管理
5. 对话记忆实现
class ConversationMemory {
constructor(maxMessages = 10) {
this.messages = []
this.maxMessages = maxMessages
this.summary = ''
}
// 添加消息
addMessage(role, content, metadata = {}) {
this.messages.push({
role,
content,
timestamp: new Date(),
metadata
})
// 维持最大消息数
if (this.messages.length > this.maxMessages) {
this.summarizeOldMessages()
}
}
// 总结旧消息
async summarizeOldMessages() {
if (this.messages.length <= this.maxMessages) return
const oldMessages = this.messages.slice(0, this.messages.length - this.maxMessages)
// 使用 LLM 总结
const summaryPrompt = `总结以下对话的要点:
${oldMessages.map(m => `${m.role}: ${m.content}`).join('\n')}
总结:`
// 模拟 LLM 响应
this.summary += `\n旧对话总结:${oldMessages.length} 条消息...`
// 删除旧消息,保留最新的
this.messages = this.messages.slice(-this.maxMessages)
}
// 获取上下文
getContext() {
let context = ''
if (this.summary) {
context += `之前对话总结:\n${this.summary}\n\n`
}
context += '最近对话:\n'
for (const msg of this.messages) {
context += `${msg.role}: ${msg.content}\n`
}
return context
}
// 清空记忆
clear() {
this.messages = []
this.summary = ''
}
}
// 使用
const memory = new ConversationMemory()
async function chat(userInput) {
memory.addMessage('user', userInput)
const context = memory.getContext()
const response = await callLLM(`
${context}
用户: ${userInput}
助手:`)
memory.addMessage('assistant', response)
return response
}
🎨 多模态处理管道
6. 多模态融合
class MultimodalPipeline {
constructor() {
this.textModel = null
this.imageModel = null
this.audioModel = null
this.fusionModel = null
}
// 处理文本
async processText(text) {
return {
type: 'text',
content: text,
embedding: await this.getTextEmbedding(text),
sentiment: await this.analyzeSentiment(text),
keywords: await this.extractKeywords(text)
}
}
// 处理图像
async processImage(imageUrl) {
return {
type: 'image',
url: imageUrl,
description: await this.describeImage(imageUrl),
objects: await this.detectObjects(imageUrl),
embedding: await this.getImageEmbedding(imageUrl)
}
}
// 处理音频
async processAudio(audioUrl) {
return {
type: 'audio',
url: audioUrl,
transcript: await this.transcribeAudio(audioUrl),
emotion: await this.analyzeEmotion(audioUrl),
summary: await this.summarizeAudio(audioUrl)
}
}
// 融合多模态信息
async fuse(textData, imageData, audioData) {
const fused = {
timestamp: new Date(),
modalities: {
text: textData,
image: imageData,
audio: audioData
},
analysis: {}
}
// 交叉模态分析
fused.analysis.consistency = await this.checkModalatyConsistency(fused)
fused.analysis.sentiment = this.fuseSentiments(textData, audioData)
fused.analysis.topics = this.fueseTopics(textData, imageData)
// 生成统一的理解
fused.understanding = await this.generateUnifiedUnderstanding(fused)
return fused
}
async checkModalatyConsistency(fused) {
// 检查各种模态的一致性
return {
textImageConsistency: 0.85,
textAudioConsistency: 0.92,
imageAudioConsistency: 0.78
}
}
fuseSentiments(textData, audioData) {
// 融合文本和音频的情感分析
return (textData.sentiment + audioData.emotion) / 2
}
fueseTopics(textData, imageData) {
// 融合文本关键词和图像对象
return [...textData.keywords, ...imageData.objects]
}
async generateUnifiedUnderstanding(fused) {
// 基于所有模态生成统一的理解
const prompt = `基于以下多模态信息生成统一的理解:
文本: ${fused.modalities.text.content}
图像描述: ${fused.modalities.image.description}
音频转文本: ${fused.modalities.audio.transcript}
统一理解:`
return await callLLM(prompt)
}
// 模拟各种处理方法
async getTextEmbedding(text) {
return new Array(1536).fill(0.1)
}
async analyzeSentiment(text) {
return 0.7 // 0-1 之间
}
async extractKeywords(text) {
return ['关键词1', '关键词2', '关键词3']
}
async describeImage(imageUrl) {
return '这是一张展示...'
}
async detectObjects(imageUrl) {
return ['对象1', '对象2', '对象3']
}
async getImageEmbedding(imageUrl) {
return new Array(1536).fill(0.15)
}
async transcribeAudio(audioUrl) {
return '音频转录文本...'
}
async analyzeEmotion(audioUrl) {
return 0.8
}
async summarizeAudio(audioUrl) {
return '音频摘要...'
}
}
// 使用
async function processMultimodalContent() {
const pipeline = new MultimodalPipeline()
const textData = await pipeline.processText('这是一个很好的产品演示')
const imageData = await pipeline.processImage('https://example.com/demo.jpg')
const audioData = await pipeline.processAudio('https://example.com/demo.mp3')
const fused = await pipeline.fuse(textData, imageData, audioData)
console.log('融合结果:', fused)
}
🎓 生产级最佳实践
7. 错误恢复和监控
class RobustAgentSystem {
constructor() {
this.metrics = {
totalRequests: 0,
successfulRequests: 0,
failedRequests: 0,
averageLatency: 0
}
this.errorLog = []
}
// 执行任务(带重试和监控)
async executeWithMonitoring(task, maxRetries = 3) {
const startTime = Date.now()
let lastError = null
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
this.metrics.totalRequests++
const result = await task()
this.metrics.successfulRequests++
const latency = Date.now() - startTime
this.updateAverageLatency(latency)
console.log(`✅ 任务成功(第 ${attempt} 次尝试,${latency}ms)`)
return result
} catch (error) {
lastError = error
this.logError(error, task, attempt)
if (attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000 // 指数退避
console.log(`⚠️ 重试中... (${delay}ms 后)`)
await new Promise(r => setTimeout(r, delay))
}
}
}
this.metrics.failedRequests++
throw lastError
}
logError(error, task, attempt) {
this.errorLog.push({
timestamp: new Date(),
error: error.message,
attempt: attempt,
taskName: task.name || 'unknown'
})
}
updateAverageLatency(newLatency) {
const n = this.metrics.successfulRequests
this.metrics.averageLatency =
(this.metrics.averageLatency * (n - 1) + newLatency) / n
}
getMetrics() {
return {
...this.metrics,
successRate: (
this.metrics.successfulRequests / this.metrics.totalRequests * 100
).toFixed(2) + '%',
errorCount: this.errorLog.length
}
}
}
📚 总结
关键要点:
- 工作流: 链式调用、条件分支、工具集成
- Agent: 自主决策、工具使用、错误恢复
- Multi-Agent: 任务分解、并行执行、结果合并
- 多模态: 文本、图像、音频的融合理解
- 生产级: 监控、错误处理、性能优化
构建智能应用的核心就是合理的架构设计!