📖 文章概述
大语言模型已成为应用开发的核心组件。本文讲解如何集成 OpenAI/DeepSeek API、实现 Prompt 工程、构建 RAG 系统,以及处理流式响应。
🎯 大模型基础架构
大模型与应用的集成流程
用户输入
↓
[应用层] → 预处理、验证、历史管理
↓
[Prompt 工程] → 上下文注入、示例、角色设定
↓
[API 调用层] → OpenAI/DeepSeek/本地模型
↓
[大模型处理] → Token 生成、采样、解码
↓
[流式返回] → 增量更新、实时展示
↓
[应用处理] → 结构化提取、存储、反馈
↓
用户界面
常见集成架构对比
| 架构 | 延迟 | 成本 | 定制度 | 使用场景 |
|---|---|---|---|---|
| API 调用 | 中等 | 高 | 低 | 快速原型、简单应用 |
| 本地部署 | 低 | 中 | 高 | 私密数据、离线场景 |
| 混合模式 | 低 | 中 | 中 | 关键+非关键任务分离 |
| 多模型 | 中等 | 中 | 中 | 任务自适应选择 |
🔌 OpenAI API 集成
1. 基础对话实现
// 使用 OpenAI SDK
import OpenAI from 'openai'
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY
})
async function simpleChat(messages) {
const completion = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages,
temperature: 0.7, // 创意程度 0-2
max_tokens: 2000, // 最大长度
top_p: 0.95 // 核采样
})
return completion.choices[0].message.content
}
// 使用
const messages = [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'Explain quantum computing in simple terms.' }
]
const response = await simpleChat(messages)
console.log(response)
2. 流式响应处理
async function streamChat(messages) {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages,
stream: true, // 启用流式传输
temperature: 0.7
})
// 处理流式数据
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
process.stdout.write(content)
}
}
// 在 Express 中使用 SSE
app.get('/api/chat-stream', async (req, res) => {
const messages = [
{ role: 'user', content: req.query.message }
]
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages,
stream: true
})
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
res.write(`data: ${JSON.stringify({ content })}\n\n`)
}
}
res.write('data: [DONE]\n\n')
res.end()
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`)
res.end()
}
})
// 前端接收
const eventSource = new EventSource('/api/chat-stream?message=Hello')
let fullResponse = ''
eventSource.addEventListener('message', (event) => {
if (event.data === '[DONE]') {
eventSource.close()
console.log('完整响应:', fullResponse)
return
}
const data = JSON.parse(event.data)
fullResponse += data.content
updateUI(data.content) // 实时更新UI
})
3. Function Calling(工具调用)
// 定义可用的工具/函数
const tools = [
{
type: 'function',
function: {
name: 'get_weather',
description: '获取指定城市的天气',
parameters: {
type: 'object',
properties: {
city: {
type: 'string',
description: '城市名称'
}
},
required: ['city']
}
}
},
{
type: 'function',
function: {
name: 'search_web',
description: '在网上搜索信息',
parameters: {
type: 'object',
properties: {
query: {
type: 'string',
description: '搜索查询'
}
},
required: ['query']
}
}
}
]
async function chatWithFunctions(messages) {
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages,
tools: tools
})
const assistantMessage = response.choices[0].message
// 如果模型选择调用函数
if (assistantMessage.tool_calls) {
for (const toolCall of assistantMessage.tool_calls) {
console.log(`调用函数: ${toolCall.function.name}`)
console.log(`参数: ${toolCall.function.arguments}`)
// 这里执行实际的函数
const result = await executeTool(
toolCall.function.name,
JSON.parse(toolCall.function.arguments)
)
// 将结果返回给模型
messages.push(assistantMessage)
messages.push({
role: 'tool',
tool_call_id: toolCall.id,
content: JSON.stringify(result)
})
// 再次调用模型以获得最终响应
const finalResponse = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages
})
return finalResponse.choices[0].message.content
}
}
return assistantMessage.content
}
// 实际工具实现
async function executeTool(toolName, params) {
if (toolName === 'get_weather') {
return { city: params.city, temp: 25, condition: '晴' }
} else if (toolName === 'search_web') {
return { results: ['结果1', '结果2', '结果3'] }
}
}
// 使用
const messages = [
{
role: 'user',
content: '北京天气怎样?然后帮我搜索相关天气预报'
}
]
const result = await chatWithFunctions(messages)
console.log(result)
🧠 Prompt 工程最佳实践
4. Prompt 设计模式
// 模式 1: 角色 + 任务 + 约束
function createSystemPrompt(role, task, constraints) {
return `你是一个${role}。
你的任务是${task}。
约束条件:
${constraints.map(c => `- ${c}`).join('\n')}`
}
const systemPrompt = createSystemPrompt(
'技术文档编写专家',
'为开发者编写清晰准确的技术文档',
[
'使用中文',
'示例代码必须是可运行的',
'避免模糊的表述',
'包含最佳实践'
]
)
// 模式 2: Few-shot 学习(提供示例)
const fewShotMessages = [
{
role: 'user',
content: '将这个任务分解为步骤:构建一个 Web 应用'
},
{
role: 'assistant',
content: `1. 需求分析
2. 技术选型
3. 架构设计
4. 开发实现
5. 测试验证
6. 部署上线`
},
{
role: 'user',
content: '将这个任务分解为步骤:实现一个支付系统'
}
// 模型会遵循相同的结构回答
]
// 模式 3: Chain of Thought(思维链)
async function chainOfThoughtReasoning(problem) {
const messages = [
{
role: 'user',
content: `请一步步思考这个问题:${problem}
请先分析问题,然后展示你的推理过程,最后得出结论。`
}
]
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages,
temperature: 1.0 // CoT 时用更高温度获得多样性
})
return response.choices[0].message.content
}
// 模式 4: 结构化输出
async function structuredOutput(query) {
const messages = [
{
role: 'user',
content: `${query}
请以 JSON 格式返回结果,包含以下字段:
- title: 标题
- summary: 摘要
- steps: 步骤数组
- tips: 提示数组`
}
]
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages
})
try {
return JSON.parse(response.choices[0].message.content)
} catch {
console.error('JSON 解析失败')
return null
}
}
5. Prompt 优化技巧
// ❌ 不好的 Prompt
'帮我写代码'
// ✅ 好的 Prompt
`你是一个经验丰富的 Node.js 开发者。
请编写一个 Express 中间件来验证 JWT token。
要求:
- 从 Authorization header 提取 token
- 验证签名和过期时间
- 返回用户信息或错误
- 包含完整的错误处理
- 代码应该是生产级别的`
// 优化 Prompt 的原则
const promptOptimizationPrinciples = {
// 1. 明确角色
role: 'You are a TypeScript expert',
// 2. 清晰的任务
task: 'Write a generic function that...',
// 3. 指定格式
format: 'Return the result as JSON',
// 4. 设定约束
constraints: [
'Use async/await',
'Include error handling',
'Add JSDoc comments'
],
// 5. 提供示例
examples: 'Here is a similar example...',
// 6. 明确评价标准
criteria: 'The solution should be...'
}
// 完整优化示例
async function optimizedPrompt(userRequest) {
const systemPrompt = `你是一个高级 Node.js 架构师。
用户会请求你设计系统架构。
你需要:
1. 分析需求
2. 提出多个方案
3. 对比优缺点
4. 推荐最优方案
5. 提供实现建议
回答格式:使用 Markdown,包含架构图(ASCII)`
const userMessage = `${userRequest}
请考虑以下因素:
- 可扩展性
- 性能
- 维护成本
- 学习曲线`
const messages = [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userMessage }
]
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages,
temperature: 0.7,
max_tokens: 3000
})
return response.choices[0].message.content
}
🔍 RAG 系统(检索增强生成)
6. RAG 实现流程
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitters'
import { OpenAIEmbeddings } from 'langchain/embeddings/openai'
import { PineconeStore } from 'langchain/vectorstores/pinecone'
import { PineconeClient } from '@pinecone-database/pinecone'
// 初始化
const pinecone = new PineconeClient()
await pinecone.init({
apiKey: process.env.PINECONE_API_KEY,
environment: process.env.PINECONE_ENVIRONMENT
})
const embeddings = new OpenAIEmbeddings({
openAIApiKey: process.env.OPENAI_API_KEY
})
// 1. 文档分割
async function prepareDocuments(documents) {
const splitter = new RecursiveCharacterTextSplitter({
chunkSize: 1000,
chunkOverlap: 200
})
const docs = []
for (const doc of documents) {
const splits = await splitter.splitText(doc.content)
docs.push(...splits.map(text => ({
content: text,
source: doc.source,
metadata: doc.metadata
})))
}
return docs
}
// 2. 生成 Embeddings 并存储
async function indexDocuments(documents) {
// 批量生成 embeddings
const embeddings = []
const batchSize = 100
for (let i = 0; i < documents.length; i += batchSize) {
const batch = documents.slice(i, i + batchSize)
const batchEmbeddings = await generateBatchEmbeddings(batch)
embeddings.push(...batchEmbeddings)
}
// 上传到 Pinecone
const vectorStore = await PineconeStore.fromDocuments(
documents,
embeddings,
{
pineconeIndex: pinecone.Index('documents'),
namespace: 'production'
}
)
return vectorStore
}
// 3. 相似度搜索
async function searchDocuments(query, k = 3) {
const vectorStore = await PineconeStore.fromExistingIndex(
new OpenAIEmbeddings(),
{ pineconeIndex: pinecone.Index('documents') }
)
const results = await vectorStore.similaritySearch(query, k)
return results.map(result => ({
content: result.pageContent,
score: result.metadata.score,
source: result.metadata.source
}))
}
// 4. RAG 生成
async function ragGenerate(userQuery) {
// 获取相关文档
const relevantDocs = await searchDocuments(userQuery, 5)
// 构建上下文
const context = relevantDocs
.map((doc, i) => `[文档 ${i+1}]\n${doc.content}`)
.join('\n\n')
// 生成提示
const systemPrompt = `你是一个有帮助的助手。
根据提供的文档回答问题。
如果文档中没有相关信息,明确说明。
始终引用你使用的文档。`
const userPrompt = `根据以下文档回答问题:
${context}
问题:${userQuery}`
// 调用模型
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userPrompt }
],
temperature: 0.7
})
return {
answer: response.choices[0].message.content,
sources: relevantDocs.map(d => d.source)
}
}
// 使用 RAG
async function main() {
// 准备文档
const documents = [
{
content: '技术文档内容...',
source: 'tech-docs.md',
metadata: { type: 'documentation' }
},
// 更多文档
]
// 索引
await indexDocuments(await prepareDocuments(documents))
// 查询
const result = await ragGenerate('如何实现 API 认证?')
console.log('答案:', result.answer)
console.log('来源:', result.sources)
}
🚀 DeepSeek API 集成
7. DeepSeek 实现
import axios from 'axios'
class DeepSeekClient {
constructor(apiKey) {
this.apiKey = apiKey
this.baseURL = 'https://api.deepseek.com/v1'
}
async chat(messages, options = {}) {
try {
const response = await axios.post(
`${this.baseURL}/chat/completions`,
{
model: options.model || 'deepseek-chat',
messages: messages,
temperature: options.temperature || 0.7,
max_tokens: options.maxTokens || 2000,
stream: options.stream || false
},
{
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
}
}
)
return response.data.choices[0].message.content
} catch (error) {
console.error('DeepSeek API 错误:', error.message)
throw error
}
}
async *streamChat(messages, options = {}) {
try {
const response = await axios.post(
`${this.baseURL}/chat/completions`,
{
model: options.model || 'deepseek-chat',
messages: messages,
temperature: options.temperature || 0.7,
stream: true
},
{
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
responseType: 'stream'
}
)
for await (const chunk of response.data) {
const lines = chunk.toString().split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6))
if (data.choices[0]?.delta?.content) {
yield data.choices[0].delta.content
}
} catch (e) {
// 忽略解析错误
}
}
}
}
} catch (error) {
console.error('DeepSeek 流式 API 错误:', error.message)
throw error
}
}
}
// 使用
const deepseek = new DeepSeekClient(process.env.DEEPSEEK_API_KEY)
// 普通调用
const response = await deepseek.chat([
{ role: 'user', content: '用中文解释量子计算' }
])
console.log(response)
// 流式调用
let fullResponse = ''
for await (const chunk of deepseek.streamChat([
{ role: 'user', content: '编写一个快速排序算法' }
])) {
process.stdout.write(chunk)
fullResponse += chunk
}
🎨 多模态处理
8. 图像分析
import fs from 'fs'
import path from 'path'
// 使用 URL 图片
async function analyzeImageFromUrl(imageUrl) {
const response = await openai.chat.completions.create({
model: 'gpt-4-vision',
messages: [
{
role: 'user',
content: [
{ type: 'text', text: '请详细描述这张图片的内容' },
{
type: 'image_url',
image_url: {
url: imageUrl,
detail: 'high' // high, low, auto
}
}
]
}
]
})
return response.choices[0].message.content
}
// 使用本地图片
async function analyzeImageFromFile(imagePath) {
const imageBuffer = fs.readFileSync(imagePath)
const base64Image = imageBuffer.toString('base64')
const mimeType = getMimeType(imagePath)
const response = await openai.chat.completions.create({
model: 'gpt-4-vision',
messages: [
{
role: 'user',
content: [
{ type: 'text', text: '这个代码的问题是什么?' },
{
type: 'image_url',
image_url: {
url: `data:${mimeType};base64,${base64Image}`
}
}
]
}
]
})
return response.choices[0].message.content
}
// 获取 MIME 类型
function getMimeType(filePath) {
const ext = path.extname(filePath).toLowerCase()
const mimeTypes = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.webp': 'image/webp'
}
return mimeTypes[ext] || 'image/jpeg'
}
// 使用
const description = await analyzeImageFromUrl(
'https://example.com/screenshot.png'
)
console.log(description)
const codeAnalysis = await analyzeImageFromFile(
'./screenshots/error.png'
)
console.log(codeAnalysis)
⚙️ 生产级最佳实践
9. 错误处理和重试
class RobustOpenAIClient {
constructor(apiKey, options = {}) {
this.apiKey = apiKey
this.maxRetries = options.maxRetries || 3
this.retryDelay = options.retryDelay || 1000
}
async callWithRetry(fn, context = '') {
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await fn()
} catch (error) {
// 可重试的错误
if (this.isRetryableError(error)) {
if (attempt < this.maxRetries) {
const delay = this.retryDelay * Math.pow(2, attempt - 1)
console.log(
`${context} 重试 ${attempt}/${this.maxRetries},` +
`等待 ${delay}ms...`
)
await new Promise(r => setTimeout(r, delay))
continue
}
}
throw error
}
}
}
isRetryableError(error) {
// 速率限制
if (error.status === 429) return true
// 服务器错误
if (error.status >= 500) return true
// 网络错误
if (error.code === 'ECONNRESET') return true
if (error.code === 'ETIMEDOUT') return true
return false
}
async chat(messages, options = {}) {
return this.callWithRetry(async () => {
return await openai.chat.completions.create({
model: options.model || 'gpt-4-turbo',
messages: messages,
timeout: 30000, // 30 秒超时
...options
})
}, '聊天请求')
}
}
// 使用
const client = new RobustOpenAIClient(
process.env.OPENAI_API_KEY,
{ maxRetries: 5 }
)
const response = await client.chat([
{ role: 'user', content: 'Hello' }
])
10. Token 计数和成本管理
import { encoding_for_model } from 'js-tiktoken'
const enc = encoding_for_model('gpt-4-turbo')
function countTokens(text) {
return enc.encode(text).length
}
function estimateCost(inputTokens, outputTokens) {
const inputCost = inputTokens * 0.00003 // $0.03 per 1K tokens
const outputCost = outputTokens * 0.0006 // $0.60 per 1K tokens
return inputCost + outputCost
}
async function chatWithCostTracking(messages) {
// 计算输入 tokens
const inputText = messages.map(m => m.content).join('\n')
const inputTokens = countTokens(inputText)
console.log(`📊 输入 tokens: ${inputTokens}`)
// 调用 API
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages
})
// 使用响应中的 token 计数
const outputTokens = response.usage.completion_tokens
const totalTokens = response.usage.total_tokens
const cost = estimateCost(inputTokens, outputTokens)
console.log(`📊 输出 tokens: ${outputTokens}`)
console.log(`📊 总 tokens: ${totalTokens}`)
console.log(`💰 预估成本: $${cost.toFixed(6)}`)
return {
content: response.choices[0].message.content,
tokens: { input: inputTokens, output: outputTokens },
cost: cost
}
}
🎓 最佳实践
DO ✅
// 1. 缓存 API 响应
const cache = new Map()
async function chatWithCache(key, messages) {
if (cache.has(key)) {
return cache.get(key)
}
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages
})
cache.set(key, response.choices[0].message.content)
return response.choices[0].message.content
}
// 2. 流式处理大响应
async function processLargeResponse(stream) {
let buffer = ''
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
buffer += content
// 逐句处理而不是等待完整响应
const sentences = buffer.split(/[。!?\n]/)
for (let i = 0; i < sentences.length - 1; i++) {
await processSentence(sentences[i])
}
buffer = sentences[sentences.length - 1]
}
}
// 3. 设置 token 限制防止成本爆炸
const safeOptions = {
max_tokens: 2000,
temperature: 0.7,
top_p: 0.9
}
// 4. 记录所有 API 调用
const apiCallLog = []
async function trackedChat(messages) {
const startTime = Date.now()
try {
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: messages
})
apiCallLog.push({
timestamp: new Date(),
model: 'gpt-4-turbo',
inputTokens: response.usage.prompt_tokens,
outputTokens: response.usage.completion_tokens,
duration: Date.now() - startTime,
success: true
})
return response.choices[0].message.content
} catch (error) {
apiCallLog.push({
timestamp: new Date(),
model: 'gpt-4-turbo',
error: error.message,
duration: Date.now() - startTime,
success: false
})
throw error
}
}
DON'T ❌
// 1. 不要忽视 token 计数
// ❌ 无限制地调用 API
await openai.chat.completions.create({ messages })
// ✅ 设置限制
await openai.chat.completions.create({
messages,
max_tokens: 2000 // 防止成本爆炸
})
// 2. 不要同步等待流式响应
// ❌ 等待完整响应
const fullText = await streamResponse()
// ✅ 实时处理流式数据
for await (const chunk of stream) {
updateUI(chunk.content)
}
// 3. 不要在 Prompt 中硬编码敏感信息
// ❌
const prompt = `API_KEY: ${process.env.SECRET_KEY}`
// ✅
const prompt = `请使用提供的认证token`
// 4. 不要忽视错误处理
// ❌ 未处理的 Promise rejection
openai.chat.completions.create({ messages })
// ✅ 完整的错误处理
try {
await openai.chat.completions.create({ messages })
} catch (error) {
if (error.status === 429) {
// 处理速率限制
} else if (error.status === 500) {
// 处理服务器错误
}
}
📚 总结
- 基础集成: API 调用、流式响应、工具调用
- Prompt 工程: 角色设定、Few-shot、链式思维、结构化输出
- RAG 系统: 文档分割、embedding、相似度搜索
- 多模态: 图像分析、视频处理
- 生产优化: 错误重试、成本管理、性能监控
掌握大模型集成,构建智能应用的关键!