实时应用

实时应用开发完全指南:WebSocket、消息队列与实时通信

深入学习 WebSocket、Server-Sent Events、消息队列等实时通信技术,构建高效的实时应用

17 分钟阅读
#WebSocket #实时通信 #消息队列 #Socket.io

📖 文章概述

实时通信是现代应用的核心需求。本文讲解 WebSocket、Server-Sent Events、消息队列等技术,以及如何构建高效的实时应用。


🎯 实时通信核心概念

通信模型演进

1. HTTP Polling(轮询)
客户端 ──→ 服务器 ──→ 查询
  ↓                      检查更新
  等待 → 重复请求

问题:浪费带宽、延迟高、服务器压力大

2. HTTP Long Polling(长轮询)
客户端 ──→ 服务器 ──→ 等待...
           ↓
           有数据时返回

问题:仍然需要多个连接、复杂性高

3. WebSocket(双向连接)
客户端 ←──→ 服务器 ← 持久化连接
     ↑                    ↓
   发送消息            推送消息

优点:低延迟、低开销、真正实时

实时通信技术对比

技术延迟开销难度浏览器支持
Polling全部
Long Polling全部
Server-Sent Events99%
WebSocket极低极低99%
消息队列N/A

🚀 WebSocket 完全指南

1. WebSocket 基础

// 客户端 WebSocket
const socket = new WebSocket('ws://localhost:8080')

// 连接打开
socket.addEventListener('open', () => {
  console.log('WebSocket 连接已打开')
  socket.send('Hello Server!')
})

// 接收消息
socket.addEventListener('message', (event) => {
  console.log('收到消息:', event.data)
})

// 连接关闭
socket.addEventListener('close', () => {
  console.log('WebSocket 连接已关闭')
  // 重新连接逻辑
  setTimeout(() => {
    reconnectWebSocket()
  }, 3000)
})

// 错误处理
socket.addEventListener('error', (event) => {
  console.error('WebSocket 错误:', event)
})

2. Node.js WebSocket 服务器

// 使用 ws 库(轻量级)
import WebSocket from 'ws'

const wss = new WebSocket.Server({ port: 8080 })

wss.on('connection', (ws) => {
  console.log('客户端已连接')
  
  // 发送初始消息
  ws.send(JSON.stringify({
    type: 'welcome',
    message: 'Welcome to WebSocket server'
  }))
  
  // 接收消息
  ws.on('message', (data) => {
    const message = JSON.parse(data)
    console.log('收到消息:', message)
    
    // 广播给所有客户端
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({
          type: 'broadcast',
          data: message
        }))
      }
    })
  })
  
  // 客户端断开连接
  ws.on('close', () => {
    console.log('客户端已断开连接')
  })
  
  // 错误处理
  ws.on('error', (error) => {
    console.error('WebSocket 错误:', error)
  })
})

3. Socket.io(生产级方案)

// 服务器端
import io from 'socket.io'

const server = io(3000, {
  cors: { origin: '*' }
})

server.on('connection', (socket) => {
  console.log('客户端已连接:', socket.id)
  
  // 发送消息给该客户端
  socket.emit('welcome', {
    message: 'Welcome!',
    socketId: socket.id
  })
  
  // 接收消息
  socket.on('chat message', (msg) => {
    console.log('消息:', msg)
    
    // 发送给所有客户端
    server.emit('chat message', {
      from: socket.id,
      message: msg,
      timestamp: Date.now()
    })
  })
  
  // 房间管理
  socket.on('join room', (roomId) => {
    socket.join(roomId)
    
    // 通知房间内其他成员
    socket.to(roomId).emit('user joined', {
      userId: socket.id,
      message: '新用户加入了房间'
    })
  })
  
  // 发送给指定房间
  socket.on('room message', (roomId, msg) => {
    server.to(roomId).emit('room message', {
      from: socket.id,
      message: msg
    })
  })
  
  // 断开连接
  socket.on('disconnect', () => {
    console.log('客户端已断开连接:', socket.id)
  })
})

4. Socket.io 客户端

// 浏览器客户端
import io from 'socket.io-client'

const socket = io('http://localhost:3000', {
  reconnection: true,
  reconnectionDelay: 1000,
  reconnectionDelayMax: 5000,
  reconnectionAttempts: 5
})

// 监听连接
socket.on('connect', () => {
  console.log('已连接到服务器')
})

// 接收欢迎消息
socket.on('welcome', (data) => {
  console.log('欢迎消息:', data)
})

// 接收聊天消息
socket.on('chat message', (data) => {
  console.log('新消息:', data)
  addMessageToUI(data)
})

// 加入房间
function joinRoom(roomId) {
  socket.emit('join room', roomId)
}

// 发送消息
function sendMessage(message) {
  socket.emit('chat message', message)
}

// 发送房间消息
function sendRoomMessage(roomId, message) {
  socket.emit('room message', roomId, message)
}

// 监听连接失败
socket.on('connect_error', (error) => {
  console.error('连接错误:', error)
})

// 监听断开连接
socket.on('disconnect', () => {
  console.log('连接已断开')
})

📡 Server-Sent Events (SSE)

5. SSE 实现

// 服务器端(Express)
app.get('/events', (req, res) => {
  // 设置 SSE 响应头
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  
  // 发送初始消息
  res.write('data: Connected to server\n\n')
  
  // 定期发送消息
  const interval = setInterval(() => {
    const message = {
      timestamp: new Date(),
      data: Math.random()
    }
    res.write(`data: ${JSON.stringify(message)}\n\n`)
  }, 1000)
  
  // 客户端断开时清理
  req.on('close', () => {
    clearInterval(interval)
    res.end()
  })
})

// 客户端
const eventSource = new EventSource('/events')

// 接收消息
eventSource.addEventListener('message', (event) => {
  const data = JSON.parse(event.data)
  console.log('收到数据:', data)
})

// 错误处理
eventSource.addEventListener('error', (error) => {
  console.error('SSE 错误:', error)
  eventSource.close()
})

// 关闭连接
function closeEventSource() {
  eventSource.close()
}

SSE vs WebSocket

SSE 优点:
- 使用标准 HTTP
- 自动重连
- 简单易用

SSE 缺点:
- 单向通信(服务器→客户端)
- 不适合高频双向通信

WebSocket 优点:
- 双向通信
- 低延迟
- 高效

WebSocket 缺点:
- 需要特殊库
- 需要处理连接管理

🔄 消息队列实时应用

6. Redis Pub/Sub

// 发布者
import redis from 'redis'

const publisher = redis.createClient()

// 发布消息
function publishMessage(channel, message) {
  publisher.publish(channel, JSON.stringify(message))
}

// 订阅者
const subscriber = redis.createClient()

subscriber.subscribe('notifications', (message) => {
  const data = JSON.parse(message)
  console.log('收到通知:', data)
  
  // 将消息推送给所有连接的客户端
  broadcastToClients(data)
})

// 在 WebSocket 服务器中使用
server.on('connection', (socket) => {
  // 订阅某个频道
  socket.on('subscribe', (channel) => {
    const sub = redis.createClient()
    sub.subscribe(channel, (message) => {
      socket.emit('message', JSON.parse(message))
    })
    
    // 保存订阅以便断开时清理
    socket.subscriptions = socket.subscriptions || []
    socket.subscriptions.push(sub)
  })
  
  // 断开时关闭订阅
  socket.on('disconnect', () => {
    if (socket.subscriptions) {
      socket.subscriptions.forEach(sub => sub.unsubscribe())
    }
  })
})

7. RabbitMQ 消息队列

// 使用 amqplib(RabbitMQ 客户端)
import amqp from 'amqplib'

// 生产者
async function sendMessage(queue, message) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  
  // 声明队列
  await channel.assertQueue(queue, { durable: true })
  
  // 发送消息
  channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
    persistent: true  // 消息持久化
  })
  
  console.log('消息已发送:', message)
}

// 消费者
async function consumeMessages(queue) {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  
  // 声明队列
  await channel.assertQueue(queue, { durable: true })
  
  // 设置并发数
  channel.prefetch(1)
  
  // 消费消息
  channel.consume(queue, (msg) => {
    if (msg) {
      const message = JSON.parse(msg.content.toString())
      console.log('处理消息:', message)
      
      // 处理消息
      processMessage(message).then(() => {
        // 确认消息
        channel.ack(msg)
      })
    }
  })
}

// 在 Express 中整合
app.post('/api/messages', async (req, res) => {
  try {
    await sendMessage('notifications', req.body)
    res.json({ success: true })
  } catch (error) {
    res.status(500).json({ error: error.message })
  }
})

🎯 实战:实时聊天应用

8. 完整实时聊天系统

// server.js
import express from 'express'
import io from 'socket.io'
import http from 'http'

const app = express()
const server = http.createServer(app)
const socket_io = io(server, {
  cors: { origin: '*' }
})

// 在内存中存储用户(实际应用应使用数据库)
const users = new Map()
const rooms = new Map()

socket_io.on('connection', (socket) => {
  console.log('用户已连接:', socket.id)
  
  // 用户加入
  socket.on('join', (data) => {
    const { username, roomId } = data
    
    // 保存用户信息
    users.set(socket.id, {
      id: socket.id,
      username,
      roomId
    })
    
    // 加入房间
    socket.join(roomId)
    
    // 通知房间内的其他用户
    socket.to(roomId).emit('user:joined', {
      username,
      userId: socket.id,
      timestamp: Date.now()
    })
    
    // 发送房间内的用户列表
    const roomUsers = Array.from(users.values())
      .filter(u => u.roomId === roomId)
    
    socket.emit('users:list', roomUsers)
  })
  
  // 接收聊天消息
  socket.on('message:send', (data) => {
    const user = users.get(socket.id)
    if (!user) return
    
    const message = {
      id: Math.random().toString(36),
      userId: socket.id,
      username: user.username,
      content: data.content,
      timestamp: Date.now()
    }
    
    // 广播给房间内的所有用户
    socket_io.to(user.roomId).emit('message:new', message)
  })
  
  // 输入状态
  socket.on('typing:start', () => {
    const user = users.get(socket.id)
    if (!user) return
    
    socket.to(user.roomId).emit('user:typing', {
      userId: socket.id,
      username: user.username
    })
  })
  
  socket.on('typing:stop', () => {
    const user = users.get(socket.id)
    if (!user) return
    
    socket.to(user.roomId).emit('user:stopped-typing', {
      userId: socket.id
    })
  })
  
  // 用户断开连接
  socket.on('disconnect', () => {
    const user = users.get(socket.id)
    if (user) {
      users.delete(socket.id)
      
      socket_io.to(user.roomId).emit('user:left', {
        username: user.username,
        userId: socket.id
      })
    }
  })
})

server.listen(3000, () => {
  console.log('聊天服务器运行在 http://localhost:3000')
})

9. 前端 Vue 聊天组件

<template>
  <div class="chat-container">
    <!-- 用户列表 -->
    <aside class="sidebar">
      <div class="room-info">
        <h2>{{ roomId }}</h2>
        <p>用户数: {{ users.length }}</p>
      </div>
      
      <div class="users-list">
        <div
          v-for="user in users"
          :key="user.id"
          class="user-item"
          :class="{ typing: typingUsers.has(user.id) }"
        >
          {{ user.username }}
          <span v-if="typingUsers.has(user.id)" class="typing-indicator">
            输入中...
          </span>
        </div>
      </div>
    </aside>
    
    <!-- 聊天区域 -->
    <div class="chat-area">
      <!-- 消息列表 -->
      <div class="messages" ref="messagesContainer">
        <div
          v-for="message in messages"
          :key="message.id"
          class="message"
          :class="{ 'own-message': message.userId === currentUserId }"
        >
          <div class="message-header">
            <strong>{{ message.username }}</strong>
            <span class="timestamp">
              {{ formatTime(message.timestamp) }}
            </span>
          </div>
          <div class="message-content">{{ message.content }}</div>
        </div>
      </div>
      
      <!-- 输入区域 -->
      <div class="input-area">
        <input
          v-model="inputMessage"
          type="text"
          placeholder="输入消息..."
          @keyup.enter="sendMessage"
          @input="handleTyping"
        />
        <button @click="sendMessage">发送</button>
      </div>
    </div>
  </div>
</template>

<script setup lang="ts">
import { ref, reactive, computed, watch, nextTick } from 'vue'
import { io } from 'socket.io-client'

interface User {
  id: string
  username: string
  roomId: string
}

interface Message {
  id: string
  userId: string
  username: string
  content: string
  timestamp: number
}

const props = defineProps({
  username: String,
  roomId: String
})

const socket = io('http://localhost:3000')
const messages = reactive<Message[]>([])
const users = reactive<User[]>([])
const typingUsers = ref(new Set<string>())
const inputMessage = ref('')
const currentUserId = ref('')
const messagesContainer = ref<HTMLElement>()
let typingTimeout: NodeJS.Timeout

// 加入房间
socket.on('connect', () => {
  currentUserId.value = socket.id!
  socket.emit('join', {
    username: props.username,
    roomId: props.roomId
  })
})

// 接收用户列表
socket.on('users:list', (userList) => {
  users.splice(0, users.length, ...userList)
})

// 新用户加入
socket.on('user:joined', (user) => {
  console.log(`${user.username} 加入了房间`)
})

// 新消息
socket.on('message:new', (message: Message) => {
  messages.push(message)
  
  // 自动滚动到最新消息
  nextTick(() => {
    if (messagesContainer.value) {
      messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight
    }
  })
})

// 用户输入
socket.on('user:typing', (data) => {
  typingUsers.value.add(data.userId)
})

socket.on('user:stopped-typing', (data) => {
  typingUsers.value.delete(data.userId)
})

// 用户离开
socket.on('user:left', (user) => {
  console.log(`${user.username} 离开了房间`)
  users.splice(users.findIndex(u => u.id === user.userId), 1)
})

// 发送消息
const sendMessage = () => {
  if (!inputMessage.value.trim()) return
  
  socket.emit('message:send', {
    content: inputMessage.value
  })
  
  inputMessage.value = ''
  socket.emit('typing:stop')
}

// 输入状态处理
const handleTyping = () => {
  clearTimeout(typingTimeout)
  
  socket.emit('typing:start')
  
  typingTimeout = setTimeout(() => {
    socket.emit('typing:stop')
  }, 2000)
}

// 时间格式化
const formatTime = (timestamp: number) => {
  return new Date(timestamp).toLocaleTimeString()
}

// 清理
onUnmounted(() => {
  socket.disconnect()
})
</script>

<style scoped>
.chat-container {
  display: flex;
  height: 100vh;
  gap: 1px;
  background: #f0f0f0;
}

.sidebar {
  width: 250px;
  background: white;
  border-right: 1px solid #ddd;
  overflow-y: auto;
}

.room-info {
  padding: 15px;
  border-bottom: 1px solid #ddd;
}

.users-list {
  padding: 10px;
}

.user-item {
  padding: 8px;
  margin: 5px 0;
  background: #f5f5f5;
  border-radius: 4px;
  transition: background 0.3s;
}

.user-item.typing {
  background: #e3f2fd;
}

.typing-indicator {
  font-size: 0.8em;
  color: #999;
  margin-left: 10px;
}

.chat-area {
  flex: 1;
  display: flex;
  flex-direction: column;
  background: white;
}

.messages {
  flex: 1;
  overflow-y: auto;
  padding: 15px;
}

.message {
  margin: 10px 0;
  padding: 10px;
  background: #f5f5f5;
  border-radius: 8px;
  max-width: 70%;
}

.message.own-message {
  margin-left: auto;
  background: #e3f2fd;
}

.message-header {
  display: flex;
  justify-content: space-between;
  margin-bottom: 5px;
  font-size: 0.9em;
}

.timestamp {
  color: #999;
}

.message-content {
  word-wrap: break-word;
}

.input-area {
  display: flex;
  padding: 15px;
  gap: 10px;
  border-top: 1px solid #ddd;
}

.input-area input {
  flex: 1;
  padding: 8px;
  border: 1px solid #ddd;
  border-radius: 4px;
}

.input-area button {
  padding: 8px 20px;
  background: #007bff;
  color: white;
  border: none;
  border-radius: 4px;
  cursor: pointer;
}

.input-area button:hover {
  background: #0056b3;
}
</style>

🐛 常见问题解决

问题 1:连接断开重连

const socket = io('http://localhost:3000', {
  reconnection: true,
  reconnectionDelay: 1000,
  reconnectionDelayMax: 5000,
  reconnectionAttempts: 5,
  transports: ['websocket', 'polling']  // 降级支持
})

socket.on('disconnect', () => {
  console.log('连接已断开,将自动重连...')
})

socket.on('connect', () => {
  console.log('连接已恢复')
  // 重新加载数据
  reloadData()
})

问题 2:消息丢失

// 使用消息队列确保可靠传递
const queue: Message[] = []

socket.on('disconnect', () => {
  // 保存未发送的消息
  localStorage.setItem('pendingMessages', JSON.stringify(queue))
})

socket.on('connect', () => {
  // 恢复并重发未发送的消息
  const pending = JSON.parse(localStorage.getItem('pendingMessages') || '[]')
  pending.forEach(msg => {
    socket.emit('message:send', msg)
  })
  localStorage.removeItem('pendingMessages')
})

问题 3:扩展性问题

// 使用消息队列(Redis Pub/Sub)实现多服务器支持

// 服务器 1 和服务器 2 都订阅相同频道
const subscriber = redis.createClient()
subscriber.subscribe('chat-messages')

socket_io.on('connection', (socket) => {
  socket.on('message:send', (message) => {
    // 发布到 Redis,所有服务器都能接收
    publisher.publish('chat-messages', JSON.stringify(message))
  })
})

// 接收来自 Redis 的消息
subscriber.on('message', (channel, message) => {
  const data = JSON.parse(message)
  // 广播给当前服务器的所有客户端
  socket_io.emit('message:new', data)
})

🎓 最佳实践

DO ✅

// 1. 实现自动重连
const socket = io(url, { reconnection: true })

// 2. 使用心跳检测连接
setInterval(() => {
  socket.emit('ping')
}, 30000)

// 3. 区分消息优先级
socket.emit('message:send', {
  content: msg,
  priority: 'high'
})

// 4. 限制消息频率
const throttle = (fn, delay) => {
  let timeout
  return () => {
    clearTimeout(timeout)
    timeout = setTimeout(fn, delay)
  }
}

socket.on('input', throttle(() => {
  socket.emit('typing')
}, 500))

// 5. 使用认证
socket.on('connect', () => {
  socket.emit('authenticate', { token })
})

DON'T ❌

// 1. 不要忽视错误处理
// socket.emit('message')  // ❌ 无错误处理

socket.emit('message', {}, (error) => {
  if (error) console.error(error)
})

// 2. 不要频繁创建新连接
// new WebSocket(url)  // 每次都创建新连接

const socket = io(url)  // 单一实例

// 3. 不要在 disconnect 时立即清理
// ❌ socket.off('message')

// ✅ 等待重连失败后再清理

// 4. 不要发送过大的消息
// socket.emit('message', largeObject)

// ✅ 分块发送或压缩

📚 扩展资源


总结

实时应用开发的关键要素:

  1. 选择正确的技术:WebSocket(双向)、SSE(单向)、消息队列(可靠性)
  2. 连接管理:自动重连、心跳检测、状态恢复
  3. 消息处理:去重、排序、优先级、限流
  4. 扩展性:使用消息队列支持多服务器
  5. 性能优化:压缩、分块、缓存
  6. 用户体验:离线提示、输入状态、错误恢复

掌握实时通信技术,构建响应式、沉浸式的应用!