AI 架构

智能应用架构完全指南:LLM 工作流、Agent 系统与多模态处理

深入学习如何构建复杂的 AI 应用架构,包括工作流编排、自主 Agent、多模态融合等技术

19 分钟阅读
#LLM 工作流 #Agent #多模态 #应用架构

📖 文章概述

从简单的聊天机器人到复杂的多 Agent 系统,本文讲解如何设计和实现企业级智能应用架构。


🎯 智能应用架构演进

架构层级

第 1 层: 单 Agent 应用
├─ 直接调用 LLM
└─ 简单 Prompt 工程

第 2 层: 工作流应用
├─ 多步骤 Prompt 链
├─ 条件分支
└─ 外部工具调用

第 3 层: Multi-Agent 系统
├─ 多个专业 Agent
├─ 任务分解和协调
├─ 动态流程编排
└─ 记忆和学习

第 4 层: 企业级系统
├─ 分布式 Agent 网络
├─ 知识库集成
├─ 安全和合规
└─ 监控和优化

应用架构对比

维度简单应用工作流系统Multi-Agent企业系统
复杂度极高
可维护性
灵活性
成本
实现时间很慢

🔗 LLM 工作流编排

1. 基础工作流

import { LLMChain } from 'langchain/chains'
import { OpenAI } from 'langchain/llms/openai'
import { PromptTemplate } from 'langchain/prompts'

// 单步工作流
async function singleStepWorkflow() {
  const model = new OpenAI({
    temperature: 0.7,
    openAIApiKey: process.env.OPENAI_API_KEY
  })
  
  const template = `你是一个创意文案撰写专家。
根据产品信息生成吸引人的广告文案。

产品: {product}
目标人群: {audience}

请生成一个 50 字以内的广告文案:`

  const prompt = new PromptTemplate({
    template: template,
    inputVariables: ['product', 'audience']
  })
  
  const chain = new LLMChain({ llm: model, prompt: prompt })
  
  const result = await chain.call({
    product: '智能健身手环',
    audience: '年轻上班族'
  })
  
  return result.text
}

// 多步工作流 - 分析 → 生成 → 审核
async function multiStepWorkflow() {
  const model = new OpenAI({ temperature: 0.7 })
  
  // 步骤 1: 分析用户需求
  const analyzePrompt = new PromptTemplate({
    template: `分析用户需求:{request}
    
提取关键信息:`,
    inputVariables: ['request']
  })
  
  const analyzeChain = new LLMChain({
    llm: model,
    prompt: analyzePrompt
  })
  
  const analysis = await analyzeChain.call({
    request: '我需要一个能追踪我健身进度的应用'
  })
  
  // 步骤 2: 基于分析生成方案
  const generatePrompt = new PromptTemplate({
    template: `基于以下分析:{analysis}
    
生成详细的技术方案:`,
    inputVariables: ['analysis']
  })
  
  const generateChain = new LLMChain({
    llm: model,
    prompt: generatePrompt
  })
  
  const solution = await generateChain.call({
    analysis: analysis.text
  })
  
  // 步骤 3: 审核方案
  const reviewPrompt = new PromptTemplate({
    template: `审核以下技术方案是否可行:{solution}
    
列出潜在问题和改进建议:`,
    inputVariables: ['solution']
  })
  
  const reviewChain = new LLMChain({
    llm: model,
    prompt: reviewPrompt
  })
  
  const review = await reviewChain.call({
    solution: solution.text
  })
  
  return {
    analysis: analysis.text,
    solution: solution.text,
    review: review.text
  }
}

2. 条件分支和路由

class WorkflowRouter {
  constructor(model) {
    this.model = model
  }
  
  // 路由示例:根据用户意图选择不同的处理流程
  async route(userInput) {
    // 第一步:识别意图
    const intentPrompt = new PromptTemplate({
      template: `分类用户请求的意图:
用户请求:{input}

意图分类(返回单一分类):
- question: 提问
- complaint: 投诉
- suggestion: 建议
- technical: 技术问题

意图:`,
      inputVariables: ['input']
    })
    
    const intentChain = new LLMChain({
      llm: this.model,
      prompt: intentPrompt
    })
    
    const intentResult = await intentChain.call({ input: userInput })
    const intent = intentResult.text.trim().split(':')[1].trim()
    
    // 第二步:根据意图路由
    switch (intent) {
      case 'question':
        return await this.handleQuestion(userInput)
      case 'complaint':
        return await this.handleComplaint(userInput)
      case 'suggestion':
        return await this.handleSuggestion(userInput)
      case 'technical':
        return await this.handleTechnical(userInput)
      default:
        return await this.handleDefault(userInput)
    }
  }
  
  async handleQuestion(input) {
    const prompt = new PromptTemplate({
      template: `回答用户的问题:{input}\n\n详细答案:`,
      inputVariables: ['input']
    })
    
    const chain = new LLMChain({ llm: this.model, prompt })
    return await chain.call({ input })
  }
  
  async handleComplaint(input) {
    const prompt = new PromptTemplate({
      template: `用户提出投诉:{input}\n\n请表示理解并提供解决方案:`,
      inputVariables: ['input']
    })
    
    const chain = new LLMChain({ llm: this.model, prompt })
    return await chain.call({ input })
  }
  
  async handleTechnical(input) {
    // 可能需要调用技术知识库
    const prompt = new PromptTemplate({
      template: `解决技术问题:{input}\n\n详细的技术解决方案:`,
      inputVariables: ['input']
    })
    
    const chain = new LLMChain({ llm: this.model, prompt })
    return await chain.call({ input })
  }
  
  async handleSuggestion(input) {
    const prompt = new PromptTemplate({
      template: `感谢用户的建议:{input}\n\n回复:`,
      inputVariables: ['input']
    })
    
    const chain = new LLMChain({ llm: this.model, prompt })
    return await chain.call({ input })
  }
  
  async handleDefault(input) {
    return '感谢你的输入,我正在处理你的请求...'
  }
}

🤖 自主 Agent 系统

3. 基础 Agent 实现

import { initializeAgentExecutorWithOptions } from 'langchain/agents'
import { Tool } from 'langchain/tools'
import { OpenAI } from 'langchain/llms/openai'

// 定义工具
class CalculatorTool extends Tool {
  name = 'calculator'
  description = '用于数学计算'
  
  async _call(input) {
    try {
      return eval(input).toString()
    } catch (e) {
      return `计算错误: ${e.message}`
    }
  }
}

class WebSearchTool extends Tool {
  name = 'web_search'
  description = '在网上搜索信息'
  
  async _call(query) {
    // 模拟搜索结果
    const results = {
      'AI': '人工智能是计算机科学的一个分支...',
      'Python': 'Python 是一种通用编程语言...',
      '深度学习': '深度学习是机器学习的子领域...'
    }
    
    return results[query] || `搜索不到关于 ${query} 的结果`
  }
}

class DatabaseQueryTool extends Tool {
  name = 'database_query'
  description = '查询数据库'
  
  async _call(query) {
    // 模拟数据库查询
    if (query.includes('用户')) {
      return '数据库中有 1000 个用户'
    }
    return '查询结果:空'
  }
}

// 创建 Agent
async function createAgent() {
  const model = new OpenAI({
    temperature: 0,
    openAIApiKey: process.env.OPENAI_API_KEY
  })
  
  const tools = [
    new CalculatorTool(),
    new WebSearchTool(),
    new DatabaseQueryTool()
  ]
  
  const executor = await initializeAgentExecutorWithOptions(tools, model, {
    agentType: 'zero-shot-react-description',
    verbose: true,
    maxIterations: 10,
    returnIntermediateSteps: true
  })
  
  return executor
}

// 使用 Agent
async function main() {
  const agent = await createAgent()
  
  const result = await agent.call({
    input: '今年 2023 年的平均气温比 2022 年高多少?'
  })
  
  console.log('Agent 思考过程:', result.intermediateSteps)
  console.log('最终答案:', result.output)
}

4. Multi-Agent 协作系统

class MultiAgentSystem {
  constructor() {
    this.agents = new Map()
    this.taskQueue = []
    this.memory = new Map()
  }
  
  // 注册 Agent
  registerAgent(agentId, agent, specialization) {
    this.agents.set(agentId, {
      agent: agent,
      specialization: specialization,
      status: 'idle',
      completedTasks: 0
    })
  }
  
  // 分解任务并分配给合适的 Agent
  async delegateTask(mainTask) {
    console.log(`主任务: ${mainTask}`)
    
    // 步骤 1: 分解任务
    const subtasks = await this.decomposeTasks(mainTask)
    console.log(`分解为 ${subtasks.length} 个子任务`)
    
    // 步骤 2: 分配给合适的 Agent
    const assignments = this.assignTasks(subtasks)
    
    // 步骤 3: 并行执行
    const results = await Promise.all(
      assignments.map(assignment => this.executeTask(assignment))
    )
    
    // 步骤 4: 合并结果
    return this.mergeResults(results)
  }
  
  // 任务分解
  async decomposeTasks(mainTask) {
    // 使用 LLM 分解任务
    const prompt = `将以下任务分解为 3-5 个具体的子任务:
任务:${mainTask}

子任务列表(每行一个):`
    
    // 模拟响应
    return [
      `收集关于 ${mainTask} 的背景信息`,
      `分析数据和趋势`,
      `生成建议方案`,
      `进行可行性评估`
    ]
  }
  
  // 任务分配 - 根据 Agent 的专长
  assignTasks(subtasks) {
    const specializations = {
      'data-analyst': ['分析数据', '收集信息', '统计'],
      'strategist': ['生成建议', '规划策略'],
      'evaluator': ['评估', '风险分析'],
      'writer': ['撰写', '报告生成']
    }
    
    return subtasks.map(task => {
      let assignedAgent = null
      
      for (const [agentId, keywords] of Object.entries(specializations)) {
        if (keywords.some(keyword => task.includes(keyword))) {
          assignedAgent = agentId
          break
        }
      }
      
      return {
        agentId: assignedAgent || 'default',
        task: task
      }
    })
  }
  
  // 执行任务
  async executeTask(assignment) {
    const { agentId, task } = assignment
    const agentInfo = this.agents.get(agentId)
    
    if (!agentInfo) {
      return { agentId, task, result: '错误:Agent 不存在' }
    }
    
    agentInfo.status = 'working'
    
    try {
      // 模拟 Agent 执行任务
      const result = await this.simulateAgentExecution(agentId, task)
      
      agentInfo.completedTasks++
      agentInfo.status = 'idle'
      
      // 存储结果到共享内存
      this.memory.set(`${agentId}:${task}`, result)
      
      return { agentId, task, result }
    } catch (error) {
      agentInfo.status = 'idle'
      return { agentId, task, result: `错误:${error.message}` }
    }
  }
  
  async simulateAgentExecution(agentId, task) {
    // 实际应用中这里会调用真实的 Agent
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(`[${agentId}] 完成任务:${task}`)
      }, 1000)
    })
  }
  
  // 合并结果
  mergeResults(results) {
    const summary = {
      totalTasks: results.length,
      completedTasks: results.filter(r => !r.result.includes('错误')).length,
      details: results,
      timestamp: new Date().toISOString()
    }
    
    return summary
  }
  
  // 获取系统状态
  getSystemStatus() {
    const status = {
      totalAgents: this.agents.size,
      agents: []
    }
    
    for (const [agentId, agentInfo] of this.agents) {
      status.agents.push({
        id: agentId,
        specialization: agentInfo.specialization,
        status: agentInfo.status,
        completedTasks: agentInfo.completedTasks
      })
    }
    
    return status
  }
}

// 使用 Multi-Agent 系统
async function main() {
  const system = new MultiAgentSystem()
  
  // 注册 Agent
  system.registerAgent('agent1', {}, 'data-analyst')
  system.registerAgent('agent2', {}, 'strategist')
  system.registerAgent('agent3', {}, 'evaluator')
  
  // 执行复杂任务
  const result = await system.delegateTask(
    '为新产品制定市场进入战略'
  )
  
  console.log('最终结果:', result)
  console.log('系统状态:', system.getSystemStatus())
}

🧠 记忆和上下文管理

5. 对话记忆实现

class ConversationMemory {
  constructor(maxMessages = 10) {
    this.messages = []
    this.maxMessages = maxMessages
    this.summary = ''
  }
  
  // 添加消息
  addMessage(role, content, metadata = {}) {
    this.messages.push({
      role,
      content,
      timestamp: new Date(),
      metadata
    })
    
    // 维持最大消息数
    if (this.messages.length > this.maxMessages) {
      this.summarizeOldMessages()
    }
  }
  
  // 总结旧消息
  async summarizeOldMessages() {
    if (this.messages.length <= this.maxMessages) return
    
    const oldMessages = this.messages.slice(0, this.messages.length - this.maxMessages)
    
    // 使用 LLM 总结
    const summaryPrompt = `总结以下对话的要点:
${oldMessages.map(m => `${m.role}: ${m.content}`).join('\n')}

总结:`
    
    // 模拟 LLM 响应
    this.summary += `\n旧对话总结:${oldMessages.length} 条消息...`
    
    // 删除旧消息,保留最新的
    this.messages = this.messages.slice(-this.maxMessages)
  }
  
  // 获取上下文
  getContext() {
    let context = ''
    
    if (this.summary) {
      context += `之前对话总结:\n${this.summary}\n\n`
    }
    
    context += '最近对话:\n'
    for (const msg of this.messages) {
      context += `${msg.role}: ${msg.content}\n`
    }
    
    return context
  }
  
  // 清空记忆
  clear() {
    this.messages = []
    this.summary = ''
  }
}

// 使用
const memory = new ConversationMemory()

async function chat(userInput) {
  memory.addMessage('user', userInput)
  
  const context = memory.getContext()
  
  const response = await callLLM(`
${context}

用户: ${userInput}
助手:`)
  
  memory.addMessage('assistant', response)
  
  return response
}

🎨 多模态处理管道

6. 多模态融合

class MultimodalPipeline {
  constructor() {
    this.textModel = null
    this.imageModel = null
    this.audioModel = null
    this.fusionModel = null
  }
  
  // 处理文本
  async processText(text) {
    return {
      type: 'text',
      content: text,
      embedding: await this.getTextEmbedding(text),
      sentiment: await this.analyzeSentiment(text),
      keywords: await this.extractKeywords(text)
    }
  }
  
  // 处理图像
  async processImage(imageUrl) {
    return {
      type: 'image',
      url: imageUrl,
      description: await this.describeImage(imageUrl),
      objects: await this.detectObjects(imageUrl),
      embedding: await this.getImageEmbedding(imageUrl)
    }
  }
  
  // 处理音频
  async processAudio(audioUrl) {
    return {
      type: 'audio',
      url: audioUrl,
      transcript: await this.transcribeAudio(audioUrl),
      emotion: await this.analyzeEmotion(audioUrl),
      summary: await this.summarizeAudio(audioUrl)
    }
  }
  
  // 融合多模态信息
  async fuse(textData, imageData, audioData) {
    const fused = {
      timestamp: new Date(),
      modalities: {
        text: textData,
        image: imageData,
        audio: audioData
      },
      analysis: {}
    }
    
    // 交叉模态分析
    fused.analysis.consistency = await this.checkModalatyConsistency(fused)
    fused.analysis.sentiment = this.fuseSentiments(textData, audioData)
    fused.analysis.topics = this.fueseTopics(textData, imageData)
    
    // 生成统一的理解
    fused.understanding = await this.generateUnifiedUnderstanding(fused)
    
    return fused
  }
  
  async checkModalatyConsistency(fused) {
    // 检查各种模态的一致性
    return {
      textImageConsistency: 0.85,
      textAudioConsistency: 0.92,
      imageAudioConsistency: 0.78
    }
  }
  
  fuseSentiments(textData, audioData) {
    // 融合文本和音频的情感分析
    return (textData.sentiment + audioData.emotion) / 2
  }
  
  fueseTopics(textData, imageData) {
    // 融合文本关键词和图像对象
    return [...textData.keywords, ...imageData.objects]
  }
  
  async generateUnifiedUnderstanding(fused) {
    // 基于所有模态生成统一的理解
    const prompt = `基于以下多模态信息生成统一的理解:
文本: ${fused.modalities.text.content}
图像描述: ${fused.modalities.image.description}
音频转文本: ${fused.modalities.audio.transcript}

统一理解:`
    
    return await callLLM(prompt)
  }
  
  // 模拟各种处理方法
  async getTextEmbedding(text) {
    return new Array(1536).fill(0.1)
  }
  
  async analyzeSentiment(text) {
    return 0.7  // 0-1 之间
  }
  
  async extractKeywords(text) {
    return ['关键词1', '关键词2', '关键词3']
  }
  
  async describeImage(imageUrl) {
    return '这是一张展示...'
  }
  
  async detectObjects(imageUrl) {
    return ['对象1', '对象2', '对象3']
  }
  
  async getImageEmbedding(imageUrl) {
    return new Array(1536).fill(0.15)
  }
  
  async transcribeAudio(audioUrl) {
    return '音频转录文本...'
  }
  
  async analyzeEmotion(audioUrl) {
    return 0.8
  }
  
  async summarizeAudio(audioUrl) {
    return '音频摘要...'
  }
}

// 使用
async function processMultimodalContent() {
  const pipeline = new MultimodalPipeline()
  
  const textData = await pipeline.processText('这是一个很好的产品演示')
  const imageData = await pipeline.processImage('https://example.com/demo.jpg')
  const audioData = await pipeline.processAudio('https://example.com/demo.mp3')
  
  const fused = await pipeline.fuse(textData, imageData, audioData)
  
  console.log('融合结果:', fused)
}

🎓 生产级最佳实践

7. 错误恢复和监控

class RobustAgentSystem {
  constructor() {
    this.metrics = {
      totalRequests: 0,
      successfulRequests: 0,
      failedRequests: 0,
      averageLatency: 0
    }
    this.errorLog = []
  }
  
  // 执行任务(带重试和监控)
  async executeWithMonitoring(task, maxRetries = 3) {
    const startTime = Date.now()
    let lastError = null
    
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        this.metrics.totalRequests++
        
        const result = await task()
        
        this.metrics.successfulRequests++
        const latency = Date.now() - startTime
        this.updateAverageLatency(latency)
        
        console.log(`✅ 任务成功(第 ${attempt} 次尝试,${latency}ms)`)
        return result
        
      } catch (error) {
        lastError = error
        this.logError(error, task, attempt)
        
        if (attempt < maxRetries) {
          const delay = Math.pow(2, attempt) * 1000  // 指数退避
          console.log(`⚠️ 重试中... (${delay}ms 后)`)
          await new Promise(r => setTimeout(r, delay))
        }
      }
    }
    
    this.metrics.failedRequests++
    throw lastError
  }
  
  logError(error, task, attempt) {
    this.errorLog.push({
      timestamp: new Date(),
      error: error.message,
      attempt: attempt,
      taskName: task.name || 'unknown'
    })
  }
  
  updateAverageLatency(newLatency) {
    const n = this.metrics.successfulRequests
    this.metrics.averageLatency = 
      (this.metrics.averageLatency * (n - 1) + newLatency) / n
  }
  
  getMetrics() {
    return {
      ...this.metrics,
      successRate: (
        this.metrics.successfulRequests / this.metrics.totalRequests * 100
      ).toFixed(2) + '%',
      errorCount: this.errorLog.length
    }
  }
}

📚 总结

关键要点:

  • 工作流: 链式调用、条件分支、工具集成
  • Agent: 自主决策、工具使用、错误恢复
  • Multi-Agent: 任务分解、并行执行、结果合并
  • 多模态: 文本、图像、音频的融合理解
  • 生产级: 监控、错误处理、性能优化

构建智能应用的核心就是合理的架构设计!