AI 应用

AI/大模型集成完全指南:OpenAI、DeepSeek 与智能应用开发

深入学习如何集成大语言模型,实现 Prompt 工程、RAG 系统、流式响应与多模态处理

18 分钟阅读
#大模型 #OpenAI #DeepSeek #Prompt Engineering #RAG

📖 文章概述

大语言模型已成为应用开发的核心组件。本文讲解如何集成 OpenAI/DeepSeek API、实现 Prompt 工程、构建 RAG 系统,以及处理流式响应。


🎯 大模型基础架构

大模型与应用的集成流程

用户输入
   ↓
[应用层] → 预处理、验证、历史管理
   ↓
[Prompt 工程] → 上下文注入、示例、角色设定
   ↓
[API 调用层] → OpenAI/DeepSeek/本地模型
   ↓
[大模型处理] → Token 生成、采样、解码
   ↓
[流式返回] → 增量更新、实时展示
   ↓
[应用处理] → 结构化提取、存储、反馈
   ↓
用户界面

常见集成架构对比

架构延迟成本定制度使用场景
API 调用中等快速原型、简单应用
本地部署私密数据、离线场景
混合模式关键+非关键任务分离
多模型中等任务自适应选择

🔌 OpenAI API 集成

1. 基础对话实现

// 使用 OpenAI SDK
import OpenAI from 'openai'

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY
})

async function simpleChat(messages) {
  const completion = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages,
    temperature: 0.7,  // 创意程度 0-2
    max_tokens: 2000,  // 最大长度
    top_p: 0.95        // 核采样
  })
  
  return completion.choices[0].message.content
}

// 使用
const messages = [
  { role: 'system', content: 'You are a helpful assistant.' },
  { role: 'user', content: 'Explain quantum computing in simple terms.' }
]

const response = await simpleChat(messages)
console.log(response)

2. 流式响应处理

async function streamChat(messages) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages,
    stream: true,  // 启用流式传输
    temperature: 0.7
  })
  
  // 处理流式数据
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
    process.stdout.write(content)
  }
}

// 在 Express 中使用 SSE
app.get('/api/chat-stream', async (req, res) => {
  const messages = [
    { role: 'user', content: req.query.message }
  ]
  
  // 设置 SSE 响应头
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  
  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4-turbo',
      messages: messages,
      stream: true
    })
    
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      if (content) {
        res.write(`data: ${JSON.stringify({ content })}\n\n`)
      }
    }
    
    res.write('data: [DONE]\n\n')
    res.end()
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`)
    res.end()
  }
})

// 前端接收
const eventSource = new EventSource('/api/chat-stream?message=Hello')
let fullResponse = ''

eventSource.addEventListener('message', (event) => {
  if (event.data === '[DONE]') {
    eventSource.close()
    console.log('完整响应:', fullResponse)
    return
  }
  
  const data = JSON.parse(event.data)
  fullResponse += data.content
  updateUI(data.content)  // 实时更新UI
})

3. Function Calling(工具调用)

// 定义可用的工具/函数
const tools = [
  {
    type: 'function',
    function: {
      name: 'get_weather',
      description: '获取指定城市的天气',
      parameters: {
        type: 'object',
        properties: {
          city: {
            type: 'string',
            description: '城市名称'
          }
        },
        required: ['city']
      }
    }
  },
  {
    type: 'function',
    function: {
      name: 'search_web',
      description: '在网上搜索信息',
      parameters: {
        type: 'object',
        properties: {
          query: {
            type: 'string',
            description: '搜索查询'
          }
        },
        required: ['query']
      }
    }
  }
]

async function chatWithFunctions(messages) {
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages,
    tools: tools
  })
  
  const assistantMessage = response.choices[0].message
  
  // 如果模型选择调用函数
  if (assistantMessage.tool_calls) {
    for (const toolCall of assistantMessage.tool_calls) {
      console.log(`调用函数: ${toolCall.function.name}`)
      console.log(`参数: ${toolCall.function.arguments}`)
      
      // 这里执行实际的函数
      const result = await executeTool(
        toolCall.function.name,
        JSON.parse(toolCall.function.arguments)
      )
      
      // 将结果返回给模型
      messages.push(assistantMessage)
      messages.push({
        role: 'tool',
        tool_call_id: toolCall.id,
        content: JSON.stringify(result)
      })
      
      // 再次调用模型以获得最终响应
      const finalResponse = await openai.chat.completions.create({
        model: 'gpt-4-turbo',
        messages: messages
      })
      
      return finalResponse.choices[0].message.content
    }
  }
  
  return assistantMessage.content
}

// 实际工具实现
async function executeTool(toolName, params) {
  if (toolName === 'get_weather') {
    return { city: params.city, temp: 25, condition: '晴' }
  } else if (toolName === 'search_web') {
    return { results: ['结果1', '结果2', '结果3'] }
  }
}

// 使用
const messages = [
  { 
    role: 'user', 
    content: '北京天气怎样?然后帮我搜索相关天气预报' 
  }
]

const result = await chatWithFunctions(messages)
console.log(result)

🧠 Prompt 工程最佳实践

4. Prompt 设计模式

// 模式 1: 角色 + 任务 + 约束
function createSystemPrompt(role, task, constraints) {
  return `你是一个${role}。
你的任务是${task}。
约束条件:
${constraints.map(c => `- ${c}`).join('\n')}`
}

const systemPrompt = createSystemPrompt(
  '技术文档编写专家',
  '为开发者编写清晰准确的技术文档',
  [
    '使用中文',
    '示例代码必须是可运行的',
    '避免模糊的表述',
    '包含最佳实践'
  ]
)

// 模式 2: Few-shot 学习(提供示例)
const fewShotMessages = [
  {
    role: 'user',
    content: '将这个任务分解为步骤:构建一个 Web 应用'
  },
  {
    role: 'assistant',
    content: `1. 需求分析
2. 技术选型
3. 架构设计
4. 开发实现
5. 测试验证
6. 部署上线`
  },
  {
    role: 'user',
    content: '将这个任务分解为步骤:实现一个支付系统'
  }
  // 模型会遵循相同的结构回答
]

// 模式 3: Chain of Thought(思维链)
async function chainOfThoughtReasoning(problem) {
  const messages = [
    {
      role: 'user',
      content: `请一步步思考这个问题:${problem}
      
请先分析问题,然后展示你的推理过程,最后得出结论。`
    }
  ]
  
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages,
    temperature: 1.0  // CoT 时用更高温度获得多样性
  })
  
  return response.choices[0].message.content
}

// 模式 4: 结构化输出
async function structuredOutput(query) {
  const messages = [
    {
      role: 'user',
      content: `${query}
      
请以 JSON 格式返回结果,包含以下字段:
- title: 标题
- summary: 摘要
- steps: 步骤数组
- tips: 提示数组`
    }
  ]
  
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages
  })
  
  try {
    return JSON.parse(response.choices[0].message.content)
  } catch {
    console.error('JSON 解析失败')
    return null
  }
}

5. Prompt 优化技巧

// ❌ 不好的 Prompt
'帮我写代码'

// ✅ 好的 Prompt
`你是一个经验丰富的 Node.js 开发者。
请编写一个 Express 中间件来验证 JWT token。
要求:
- 从 Authorization header 提取 token
- 验证签名和过期时间
- 返回用户信息或错误
- 包含完整的错误处理
- 代码应该是生产级别的`

// 优化 Prompt 的原则
const promptOptimizationPrinciples = {
  // 1. 明确角色
  role: 'You are a TypeScript expert',
  
  // 2. 清晰的任务
  task: 'Write a generic function that...',
  
  // 3. 指定格式
  format: 'Return the result as JSON',
  
  // 4. 设定约束
  constraints: [
    'Use async/await',
    'Include error handling',
    'Add JSDoc comments'
  ],
  
  // 5. 提供示例
  examples: 'Here is a similar example...',
  
  // 6. 明确评价标准
  criteria: 'The solution should be...'
}

// 完整优化示例
async function optimizedPrompt(userRequest) {
  const systemPrompt = `你是一个高级 Node.js 架构师。
用户会请求你设计系统架构。
你需要:
1. 分析需求
2. 提出多个方案
3. 对比优缺点
4. 推荐最优方案
5. 提供实现建议

回答格式:使用 Markdown,包含架构图(ASCII)`

  const userMessage = `${userRequest}

请考虑以下因素:
- 可扩展性
- 性能
- 维护成本
- 学习曲线`

  const messages = [
    { role: 'system', content: systemPrompt },
    { role: 'user', content: userMessage }
  ]
  
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages,
    temperature: 0.7,
    max_tokens: 3000
  })
  
  return response.choices[0].message.content
}

🔍 RAG 系统(检索增强生成)

6. RAG 实现流程

import { RecursiveCharacterTextSplitter } from 'langchain/text_splitters'
import { OpenAIEmbeddings } from 'langchain/embeddings/openai'
import { PineconeStore } from 'langchain/vectorstores/pinecone'
import { PineconeClient } from '@pinecone-database/pinecone'

// 初始化
const pinecone = new PineconeClient()
await pinecone.init({
  apiKey: process.env.PINECONE_API_KEY,
  environment: process.env.PINECONE_ENVIRONMENT
})

const embeddings = new OpenAIEmbeddings({
  openAIApiKey: process.env.OPENAI_API_KEY
})

// 1. 文档分割
async function prepareDocuments(documents) {
  const splitter = new RecursiveCharacterTextSplitter({
    chunkSize: 1000,
    chunkOverlap: 200
  })
  
  const docs = []
  for (const doc of documents) {
    const splits = await splitter.splitText(doc.content)
    docs.push(...splits.map(text => ({
      content: text,
      source: doc.source,
      metadata: doc.metadata
    })))
  }
  
  return docs
}

// 2. 生成 Embeddings 并存储
async function indexDocuments(documents) {
  // 批量生成 embeddings
  const embeddings = []
  const batchSize = 100
  
  for (let i = 0; i < documents.length; i += batchSize) {
    const batch = documents.slice(i, i + batchSize)
    const batchEmbeddings = await generateBatchEmbeddings(batch)
    embeddings.push(...batchEmbeddings)
  }
  
  // 上传到 Pinecone
  const vectorStore = await PineconeStore.fromDocuments(
    documents,
    embeddings,
    {
      pineconeIndex: pinecone.Index('documents'),
      namespace: 'production'
    }
  )
  
  return vectorStore
}

// 3. 相似度搜索
async function searchDocuments(query, k = 3) {
  const vectorStore = await PineconeStore.fromExistingIndex(
    new OpenAIEmbeddings(),
    { pineconeIndex: pinecone.Index('documents') }
  )
  
  const results = await vectorStore.similaritySearch(query, k)
  
  return results.map(result => ({
    content: result.pageContent,
    score: result.metadata.score,
    source: result.metadata.source
  }))
}

// 4. RAG 生成
async function ragGenerate(userQuery) {
  // 获取相关文档
  const relevantDocs = await searchDocuments(userQuery, 5)
  
  // 构建上下文
  const context = relevantDocs
    .map((doc, i) => `[文档 ${i+1}]\n${doc.content}`)
    .join('\n\n')
  
  // 生成提示
  const systemPrompt = `你是一个有帮助的助手。
根据提供的文档回答问题。
如果文档中没有相关信息,明确说明。
始终引用你使用的文档。`

  const userPrompt = `根据以下文档回答问题:

${context}

问题:${userQuery}`

  // 调用模型
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: [
      { role: 'system', content: systemPrompt },
      { role: 'user', content: userPrompt }
    ],
    temperature: 0.7
  })
  
  return {
    answer: response.choices[0].message.content,
    sources: relevantDocs.map(d => d.source)
  }
}

// 使用 RAG
async function main() {
  // 准备文档
  const documents = [
    {
      content: '技术文档内容...',
      source: 'tech-docs.md',
      metadata: { type: 'documentation' }
    },
    // 更多文档
  ]
  
  // 索引
  await indexDocuments(await prepareDocuments(documents))
  
  // 查询
  const result = await ragGenerate('如何实现 API 认证?')
  console.log('答案:', result.answer)
  console.log('来源:', result.sources)
}

🚀 DeepSeek API 集成

7. DeepSeek 实现

import axios from 'axios'

class DeepSeekClient {
  constructor(apiKey) {
    this.apiKey = apiKey
    this.baseURL = 'https://api.deepseek.com/v1'
  }
  
  async chat(messages, options = {}) {
    try {
      const response = await axios.post(
        `${this.baseURL}/chat/completions`,
        {
          model: options.model || 'deepseek-chat',
          messages: messages,
          temperature: options.temperature || 0.7,
          max_tokens: options.maxTokens || 2000,
          stream: options.stream || false
        },
        {
          headers: {
            'Authorization': `Bearer ${this.apiKey}`,
            'Content-Type': 'application/json'
          }
        }
      )
      
      return response.data.choices[0].message.content
    } catch (error) {
      console.error('DeepSeek API 错误:', error.message)
      throw error
    }
  }
  
  async *streamChat(messages, options = {}) {
    try {
      const response = await axios.post(
        `${this.baseURL}/chat/completions`,
        {
          model: options.model || 'deepseek-chat',
          messages: messages,
          temperature: options.temperature || 0.7,
          stream: true
        },
        {
          headers: {
            'Authorization': `Bearer ${this.apiKey}`,
            'Content-Type': 'application/json'
          },
          responseType: 'stream'
        }
      )
      
      for await (const chunk of response.data) {
        const lines = chunk.toString().split('\n')
        
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const data = JSON.parse(line.slice(6))
              if (data.choices[0]?.delta?.content) {
                yield data.choices[0].delta.content
              }
            } catch (e) {
              // 忽略解析错误
            }
          }
        }
      }
    } catch (error) {
      console.error('DeepSeek 流式 API 错误:', error.message)
      throw error
    }
  }
}

// 使用
const deepseek = new DeepSeekClient(process.env.DEEPSEEK_API_KEY)

// 普通调用
const response = await deepseek.chat([
  { role: 'user', content: '用中文解释量子计算' }
])
console.log(response)

// 流式调用
let fullResponse = ''
for await (const chunk of deepseek.streamChat([
  { role: 'user', content: '编写一个快速排序算法' }
])) {
  process.stdout.write(chunk)
  fullResponse += chunk
}

🎨 多模态处理

8. 图像分析

import fs from 'fs'
import path from 'path'

// 使用 URL 图片
async function analyzeImageFromUrl(imageUrl) {
  const response = await openai.chat.completions.create({
    model: 'gpt-4-vision',
    messages: [
      {
        role: 'user',
        content: [
          { type: 'text', text: '请详细描述这张图片的内容' },
          {
            type: 'image_url',
            image_url: {
              url: imageUrl,
              detail: 'high'  // high, low, auto
            }
          }
        ]
      }
    ]
  })
  
  return response.choices[0].message.content
}

// 使用本地图片
async function analyzeImageFromFile(imagePath) {
  const imageBuffer = fs.readFileSync(imagePath)
  const base64Image = imageBuffer.toString('base64')
  const mimeType = getMimeType(imagePath)
  
  const response = await openai.chat.completions.create({
    model: 'gpt-4-vision',
    messages: [
      {
        role: 'user',
        content: [
          { type: 'text', text: '这个代码的问题是什么?' },
          {
            type: 'image_url',
            image_url: {
              url: `data:${mimeType};base64,${base64Image}`
            }
          }
        ]
      }
    ]
  })
  
  return response.choices[0].message.content
}

// 获取 MIME 类型
function getMimeType(filePath) {
  const ext = path.extname(filePath).toLowerCase()
  const mimeTypes = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.webp': 'image/webp'
  }
  return mimeTypes[ext] || 'image/jpeg'
}

// 使用
const description = await analyzeImageFromUrl(
  'https://example.com/screenshot.png'
)
console.log(description)

const codeAnalysis = await analyzeImageFromFile(
  './screenshots/error.png'
)
console.log(codeAnalysis)

⚙️ 生产级最佳实践

9. 错误处理和重试

class RobustOpenAIClient {
  constructor(apiKey, options = {}) {
    this.apiKey = apiKey
    this.maxRetries = options.maxRetries || 3
    this.retryDelay = options.retryDelay || 1000
  }
  
  async callWithRetry(fn, context = '') {
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await fn()
      } catch (error) {
        // 可重试的错误
        if (this.isRetryableError(error)) {
          if (attempt < this.maxRetries) {
            const delay = this.retryDelay * Math.pow(2, attempt - 1)
            console.log(
              `${context} 重试 ${attempt}/${this.maxRetries},` +
              `等待 ${delay}ms...`
            )
            await new Promise(r => setTimeout(r, delay))
            continue
          }
        }
        
        throw error
      }
    }
  }
  
  isRetryableError(error) {
    // 速率限制
    if (error.status === 429) return true
    
    // 服务器错误
    if (error.status >= 500) return true
    
    // 网络错误
    if (error.code === 'ECONNRESET') return true
    if (error.code === 'ETIMEDOUT') return true
    
    return false
  }
  
  async chat(messages, options = {}) {
    return this.callWithRetry(async () => {
      return await openai.chat.completions.create({
        model: options.model || 'gpt-4-turbo',
        messages: messages,
        timeout: 30000,  // 30 秒超时
        ...options
      })
    }, '聊天请求')
  }
}

// 使用
const client = new RobustOpenAIClient(
  process.env.OPENAI_API_KEY,
  { maxRetries: 5 }
)

const response = await client.chat([
  { role: 'user', content: 'Hello' }
])

10. Token 计数和成本管理

import { encoding_for_model } from 'js-tiktoken'

const enc = encoding_for_model('gpt-4-turbo')

function countTokens(text) {
  return enc.encode(text).length
}

function estimateCost(inputTokens, outputTokens) {
  const inputCost = inputTokens * 0.00003   // $0.03 per 1K tokens
  const outputCost = outputTokens * 0.0006  // $0.60 per 1K tokens
  return inputCost + outputCost
}

async function chatWithCostTracking(messages) {
  // 计算输入 tokens
  const inputText = messages.map(m => m.content).join('\n')
  const inputTokens = countTokens(inputText)
  
  console.log(`📊 输入 tokens: ${inputTokens}`)
  
  // 调用 API
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages
  })
  
  // 使用响应中的 token 计数
  const outputTokens = response.usage.completion_tokens
  const totalTokens = response.usage.total_tokens
  
  const cost = estimateCost(inputTokens, outputTokens)
  
  console.log(`📊 输出 tokens: ${outputTokens}`)
  console.log(`📊 总 tokens: ${totalTokens}`)
  console.log(`💰 预估成本: $${cost.toFixed(6)}`)
  
  return {
    content: response.choices[0].message.content,
    tokens: { input: inputTokens, output: outputTokens },
    cost: cost
  }
}

🎓 最佳实践

DO ✅

// 1. 缓存 API 响应
const cache = new Map()

async function chatWithCache(key, messages) {
  if (cache.has(key)) {
    return cache.get(key)
  }
  
  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: messages
  })
  
  cache.set(key, response.choices[0].message.content)
  return response.choices[0].message.content
}

// 2. 流式处理大响应
async function processLargeResponse(stream) {
  let buffer = ''
  
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
    buffer += content
    
    // 逐句处理而不是等待完整响应
    const sentences = buffer.split(/[。!?\n]/)
    
    for (let i = 0; i < sentences.length - 1; i++) {
      await processSentence(sentences[i])
    }
    
    buffer = sentences[sentences.length - 1]
  }
}

// 3. 设置 token 限制防止成本爆炸
const safeOptions = {
  max_tokens: 2000,
  temperature: 0.7,
  top_p: 0.9
}

// 4. 记录所有 API 调用
const apiCallLog = []

async function trackedChat(messages) {
  const startTime = Date.now()
  
  try {
    const response = await openai.chat.completions.create({
      model: 'gpt-4-turbo',
      messages: messages
    })
    
    apiCallLog.push({
      timestamp: new Date(),
      model: 'gpt-4-turbo',
      inputTokens: response.usage.prompt_tokens,
      outputTokens: response.usage.completion_tokens,
      duration: Date.now() - startTime,
      success: true
    })
    
    return response.choices[0].message.content
  } catch (error) {
    apiCallLog.push({
      timestamp: new Date(),
      model: 'gpt-4-turbo',
      error: error.message,
      duration: Date.now() - startTime,
      success: false
    })
    
    throw error
  }
}

DON'T ❌

// 1. 不要忽视 token 计数
// ❌ 无限制地调用 API
await openai.chat.completions.create({ messages })

// ✅ 设置限制
await openai.chat.completions.create({
  messages,
  max_tokens: 2000  // 防止成本爆炸
})

// 2. 不要同步等待流式响应
// ❌ 等待完整响应
const fullText = await streamResponse()

// ✅ 实时处理流式数据
for await (const chunk of stream) {
  updateUI(chunk.content)
}

// 3. 不要在 Prompt 中硬编码敏感信息
// ❌
const prompt = `API_KEY: ${process.env.SECRET_KEY}`

// ✅
const prompt = `请使用提供的认证token`

// 4. 不要忽视错误处理
// ❌ 未处理的 Promise rejection
openai.chat.completions.create({ messages })

// ✅ 完整的错误处理
try {
  await openai.chat.completions.create({ messages })
} catch (error) {
  if (error.status === 429) {
    // 处理速率限制
  } else if (error.status === 500) {
    // 处理服务器错误
  }
}

📚 总结

  • 基础集成: API 调用、流式响应、工具调用
  • Prompt 工程: 角色设定、Few-shot、链式思维、结构化输出
  • RAG 系统: 文档分割、embedding、相似度搜索
  • 多模态: 图像分析、视频处理
  • 生产优化: 错误重试、成本管理、性能监控

掌握大模型集成,构建智能应用的关键!