📖 文章概述
实时通信是现代应用的核心需求。本文讲解 WebSocket、Server-Sent Events、消息队列等技术,以及如何构建高效的实时应用。
🎯 实时通信核心概念
通信模型演进
1. HTTP Polling(轮询)
客户端 ──→ 服务器 ──→ 查询
↓ 检查更新
等待 → 重复请求
问题:浪费带宽、延迟高、服务器压力大
2. HTTP Long Polling(长轮询)
客户端 ──→ 服务器 ──→ 等待...
↓
有数据时返回
问题:仍然需要多个连接、复杂性高
3. WebSocket(双向连接)
客户端 ←──→ 服务器 ← 持久化连接
↑ ↓
发送消息 推送消息
优点:低延迟、低开销、真正实时
实时通信技术对比
| 技术 | 延迟 | 开销 | 难度 | 浏览器支持 |
|---|---|---|---|---|
| Polling | 高 | 高 | 低 | 全部 |
| Long Polling | 中 | 中 | 中 | 全部 |
| Server-Sent Events | 低 | 低 | 低 | 99% |
| WebSocket | 极低 | 极低 | 中 | 99% |
| 消息队列 | 中 | 低 | 高 | N/A |
🚀 WebSocket 完全指南
1. WebSocket 基础
// 客户端 WebSocket
const socket = new WebSocket('ws://localhost:8080')
// 连接打开
socket.addEventListener('open', () => {
console.log('WebSocket 连接已打开')
socket.send('Hello Server!')
})
// 接收消息
socket.addEventListener('message', (event) => {
console.log('收到消息:', event.data)
})
// 连接关闭
socket.addEventListener('close', () => {
console.log('WebSocket 连接已关闭')
// 重新连接逻辑
setTimeout(() => {
reconnectWebSocket()
}, 3000)
})
// 错误处理
socket.addEventListener('error', (event) => {
console.error('WebSocket 错误:', event)
})
2. Node.js WebSocket 服务器
// 使用 ws 库(轻量级)
import WebSocket from 'ws'
const wss = new WebSocket.Server({ port: 8080 })
wss.on('connection', (ws) => {
console.log('客户端已连接')
// 发送初始消息
ws.send(JSON.stringify({
type: 'welcome',
message: 'Welcome to WebSocket server'
}))
// 接收消息
ws.on('message', (data) => {
const message = JSON.parse(data)
console.log('收到消息:', message)
// 广播给所有客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
type: 'broadcast',
data: message
}))
}
})
})
// 客户端断开连接
ws.on('close', () => {
console.log('客户端已断开连接')
})
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket 错误:', error)
})
})
3. Socket.io(生产级方案)
// 服务器端
import io from 'socket.io'
const server = io(3000, {
cors: { origin: '*' }
})
server.on('connection', (socket) => {
console.log('客户端已连接:', socket.id)
// 发送消息给该客户端
socket.emit('welcome', {
message: 'Welcome!',
socketId: socket.id
})
// 接收消息
socket.on('chat message', (msg) => {
console.log('消息:', msg)
// 发送给所有客户端
server.emit('chat message', {
from: socket.id,
message: msg,
timestamp: Date.now()
})
})
// 房间管理
socket.on('join room', (roomId) => {
socket.join(roomId)
// 通知房间内其他成员
socket.to(roomId).emit('user joined', {
userId: socket.id,
message: '新用户加入了房间'
})
})
// 发送给指定房间
socket.on('room message', (roomId, msg) => {
server.to(roomId).emit('room message', {
from: socket.id,
message: msg
})
})
// 断开连接
socket.on('disconnect', () => {
console.log('客户端已断开连接:', socket.id)
})
})
4. Socket.io 客户端
// 浏览器客户端
import io from 'socket.io-client'
const socket = io('http://localhost:3000', {
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 5
})
// 监听连接
socket.on('connect', () => {
console.log('已连接到服务器')
})
// 接收欢迎消息
socket.on('welcome', (data) => {
console.log('欢迎消息:', data)
})
// 接收聊天消息
socket.on('chat message', (data) => {
console.log('新消息:', data)
addMessageToUI(data)
})
// 加入房间
function joinRoom(roomId) {
socket.emit('join room', roomId)
}
// 发送消息
function sendMessage(message) {
socket.emit('chat message', message)
}
// 发送房间消息
function sendRoomMessage(roomId, message) {
socket.emit('room message', roomId, message)
}
// 监听连接失败
socket.on('connect_error', (error) => {
console.error('连接错误:', error)
})
// 监听断开连接
socket.on('disconnect', () => {
console.log('连接已断开')
})
📡 Server-Sent Events (SSE)
5. SSE 实现
// 服务器端(Express)
app.get('/events', (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
// 发送初始消息
res.write('data: Connected to server\n\n')
// 定期发送消息
const interval = setInterval(() => {
const message = {
timestamp: new Date(),
data: Math.random()
}
res.write(`data: ${JSON.stringify(message)}\n\n`)
}, 1000)
// 客户端断开时清理
req.on('close', () => {
clearInterval(interval)
res.end()
})
})
// 客户端
const eventSource = new EventSource('/events')
// 接收消息
eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data)
console.log('收到数据:', data)
})
// 错误处理
eventSource.addEventListener('error', (error) => {
console.error('SSE 错误:', error)
eventSource.close()
})
// 关闭连接
function closeEventSource() {
eventSource.close()
}
SSE vs WebSocket
SSE 优点:
- 使用标准 HTTP
- 自动重连
- 简单易用
SSE 缺点:
- 单向通信(服务器→客户端)
- 不适合高频双向通信
WebSocket 优点:
- 双向通信
- 低延迟
- 高效
WebSocket 缺点:
- 需要特殊库
- 需要处理连接管理
🔄 消息队列实时应用
6. Redis Pub/Sub
// 发布者
import redis from 'redis'
const publisher = redis.createClient()
// 发布消息
function publishMessage(channel, message) {
publisher.publish(channel, JSON.stringify(message))
}
// 订阅者
const subscriber = redis.createClient()
subscriber.subscribe('notifications', (message) => {
const data = JSON.parse(message)
console.log('收到通知:', data)
// 将消息推送给所有连接的客户端
broadcastToClients(data)
})
// 在 WebSocket 服务器中使用
server.on('connection', (socket) => {
// 订阅某个频道
socket.on('subscribe', (channel) => {
const sub = redis.createClient()
sub.subscribe(channel, (message) => {
socket.emit('message', JSON.parse(message))
})
// 保存订阅以便断开时清理
socket.subscriptions = socket.subscriptions || []
socket.subscriptions.push(sub)
})
// 断开时关闭订阅
socket.on('disconnect', () => {
if (socket.subscriptions) {
socket.subscriptions.forEach(sub => sub.unsubscribe())
}
})
})
7. RabbitMQ 消息队列
// 使用 amqplib(RabbitMQ 客户端)
import amqp from 'amqplib'
// 生产者
async function sendMessage(queue, message) {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
// 声明队列
await channel.assertQueue(queue, { durable: true })
// 发送消息
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true // 消息持久化
})
console.log('消息已发送:', message)
}
// 消费者
async function consumeMessages(queue) {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
// 声明队列
await channel.assertQueue(queue, { durable: true })
// 设置并发数
channel.prefetch(1)
// 消费消息
channel.consume(queue, (msg) => {
if (msg) {
const message = JSON.parse(msg.content.toString())
console.log('处理消息:', message)
// 处理消息
processMessage(message).then(() => {
// 确认消息
channel.ack(msg)
})
}
})
}
// 在 Express 中整合
app.post('/api/messages', async (req, res) => {
try {
await sendMessage('notifications', req.body)
res.json({ success: true })
} catch (error) {
res.status(500).json({ error: error.message })
}
})
🎯 实战:实时聊天应用
8. 完整实时聊天系统
// server.js
import express from 'express'
import io from 'socket.io'
import http from 'http'
const app = express()
const server = http.createServer(app)
const socket_io = io(server, {
cors: { origin: '*' }
})
// 在内存中存储用户(实际应用应使用数据库)
const users = new Map()
const rooms = new Map()
socket_io.on('connection', (socket) => {
console.log('用户已连接:', socket.id)
// 用户加入
socket.on('join', (data) => {
const { username, roomId } = data
// 保存用户信息
users.set(socket.id, {
id: socket.id,
username,
roomId
})
// 加入房间
socket.join(roomId)
// 通知房间内的其他用户
socket.to(roomId).emit('user:joined', {
username,
userId: socket.id,
timestamp: Date.now()
})
// 发送房间内的用户列表
const roomUsers = Array.from(users.values())
.filter(u => u.roomId === roomId)
socket.emit('users:list', roomUsers)
})
// 接收聊天消息
socket.on('message:send', (data) => {
const user = users.get(socket.id)
if (!user) return
const message = {
id: Math.random().toString(36),
userId: socket.id,
username: user.username,
content: data.content,
timestamp: Date.now()
}
// 广播给房间内的所有用户
socket_io.to(user.roomId).emit('message:new', message)
})
// 输入状态
socket.on('typing:start', () => {
const user = users.get(socket.id)
if (!user) return
socket.to(user.roomId).emit('user:typing', {
userId: socket.id,
username: user.username
})
})
socket.on('typing:stop', () => {
const user = users.get(socket.id)
if (!user) return
socket.to(user.roomId).emit('user:stopped-typing', {
userId: socket.id
})
})
// 用户断开连接
socket.on('disconnect', () => {
const user = users.get(socket.id)
if (user) {
users.delete(socket.id)
socket_io.to(user.roomId).emit('user:left', {
username: user.username,
userId: socket.id
})
}
})
})
server.listen(3000, () => {
console.log('聊天服务器运行在 http://localhost:3000')
})
9. 前端 Vue 聊天组件
<template>
<div class="chat-container">
<!-- 用户列表 -->
<aside class="sidebar">
<div class="room-info">
<h2>{{ roomId }}</h2>
<p>用户数: {{ users.length }}</p>
</div>
<div class="users-list">
<div
v-for="user in users"
:key="user.id"
class="user-item"
:class="{ typing: typingUsers.has(user.id) }"
>
{{ user.username }}
<span v-if="typingUsers.has(user.id)" class="typing-indicator">
输入中...
</span>
</div>
</div>
</aside>
<!-- 聊天区域 -->
<div class="chat-area">
<!-- 消息列表 -->
<div class="messages" ref="messagesContainer">
<div
v-for="message in messages"
:key="message.id"
class="message"
:class="{ 'own-message': message.userId === currentUserId }"
>
<div class="message-header">
<strong>{{ message.username }}</strong>
<span class="timestamp">
{{ formatTime(message.timestamp) }}
</span>
</div>
<div class="message-content">{{ message.content }}</div>
</div>
</div>
<!-- 输入区域 -->
<div class="input-area">
<input
v-model="inputMessage"
type="text"
placeholder="输入消息..."
@keyup.enter="sendMessage"
@input="handleTyping"
/>
<button @click="sendMessage">发送</button>
</div>
</div>
</div>
</template>
<script setup lang="ts">
import { ref, reactive, computed, watch, nextTick } from 'vue'
import { io } from 'socket.io-client'
interface User {
id: string
username: string
roomId: string
}
interface Message {
id: string
userId: string
username: string
content: string
timestamp: number
}
const props = defineProps({
username: String,
roomId: String
})
const socket = io('http://localhost:3000')
const messages = reactive<Message[]>([])
const users = reactive<User[]>([])
const typingUsers = ref(new Set<string>())
const inputMessage = ref('')
const currentUserId = ref('')
const messagesContainer = ref<HTMLElement>()
let typingTimeout: NodeJS.Timeout
// 加入房间
socket.on('connect', () => {
currentUserId.value = socket.id!
socket.emit('join', {
username: props.username,
roomId: props.roomId
})
})
// 接收用户列表
socket.on('users:list', (userList) => {
users.splice(0, users.length, ...userList)
})
// 新用户加入
socket.on('user:joined', (user) => {
console.log(`${user.username} 加入了房间`)
})
// 新消息
socket.on('message:new', (message: Message) => {
messages.push(message)
// 自动滚动到最新消息
nextTick(() => {
if (messagesContainer.value) {
messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight
}
})
})
// 用户输入
socket.on('user:typing', (data) => {
typingUsers.value.add(data.userId)
})
socket.on('user:stopped-typing', (data) => {
typingUsers.value.delete(data.userId)
})
// 用户离开
socket.on('user:left', (user) => {
console.log(`${user.username} 离开了房间`)
users.splice(users.findIndex(u => u.id === user.userId), 1)
})
// 发送消息
const sendMessage = () => {
if (!inputMessage.value.trim()) return
socket.emit('message:send', {
content: inputMessage.value
})
inputMessage.value = ''
socket.emit('typing:stop')
}
// 输入状态处理
const handleTyping = () => {
clearTimeout(typingTimeout)
socket.emit('typing:start')
typingTimeout = setTimeout(() => {
socket.emit('typing:stop')
}, 2000)
}
// 时间格式化
const formatTime = (timestamp: number) => {
return new Date(timestamp).toLocaleTimeString()
}
// 清理
onUnmounted(() => {
socket.disconnect()
})
</script>
<style scoped>
.chat-container {
display: flex;
height: 100vh;
gap: 1px;
background: #f0f0f0;
}
.sidebar {
width: 250px;
background: white;
border-right: 1px solid #ddd;
overflow-y: auto;
}
.room-info {
padding: 15px;
border-bottom: 1px solid #ddd;
}
.users-list {
padding: 10px;
}
.user-item {
padding: 8px;
margin: 5px 0;
background: #f5f5f5;
border-radius: 4px;
transition: background 0.3s;
}
.user-item.typing {
background: #e3f2fd;
}
.typing-indicator {
font-size: 0.8em;
color: #999;
margin-left: 10px;
}
.chat-area {
flex: 1;
display: flex;
flex-direction: column;
background: white;
}
.messages {
flex: 1;
overflow-y: auto;
padding: 15px;
}
.message {
margin: 10px 0;
padding: 10px;
background: #f5f5f5;
border-radius: 8px;
max-width: 70%;
}
.message.own-message {
margin-left: auto;
background: #e3f2fd;
}
.message-header {
display: flex;
justify-content: space-between;
margin-bottom: 5px;
font-size: 0.9em;
}
.timestamp {
color: #999;
}
.message-content {
word-wrap: break-word;
}
.input-area {
display: flex;
padding: 15px;
gap: 10px;
border-top: 1px solid #ddd;
}
.input-area input {
flex: 1;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
}
.input-area button {
padding: 8px 20px;
background: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.input-area button:hover {
background: #0056b3;
}
</style>
🐛 常见问题解决
问题 1:连接断开重连
const socket = io('http://localhost:3000', {
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 5,
transports: ['websocket', 'polling'] // 降级支持
})
socket.on('disconnect', () => {
console.log('连接已断开,将自动重连...')
})
socket.on('connect', () => {
console.log('连接已恢复')
// 重新加载数据
reloadData()
})
问题 2:消息丢失
// 使用消息队列确保可靠传递
const queue: Message[] = []
socket.on('disconnect', () => {
// 保存未发送的消息
localStorage.setItem('pendingMessages', JSON.stringify(queue))
})
socket.on('connect', () => {
// 恢复并重发未发送的消息
const pending = JSON.parse(localStorage.getItem('pendingMessages') || '[]')
pending.forEach(msg => {
socket.emit('message:send', msg)
})
localStorage.removeItem('pendingMessages')
})
问题 3:扩展性问题
// 使用消息队列(Redis Pub/Sub)实现多服务器支持
// 服务器 1 和服务器 2 都订阅相同频道
const subscriber = redis.createClient()
subscriber.subscribe('chat-messages')
socket_io.on('connection', (socket) => {
socket.on('message:send', (message) => {
// 发布到 Redis,所有服务器都能接收
publisher.publish('chat-messages', JSON.stringify(message))
})
})
// 接收来自 Redis 的消息
subscriber.on('message', (channel, message) => {
const data = JSON.parse(message)
// 广播给当前服务器的所有客户端
socket_io.emit('message:new', data)
})
🎓 最佳实践
DO ✅
// 1. 实现自动重连
const socket = io(url, { reconnection: true })
// 2. 使用心跳检测连接
setInterval(() => {
socket.emit('ping')
}, 30000)
// 3. 区分消息优先级
socket.emit('message:send', {
content: msg,
priority: 'high'
})
// 4. 限制消息频率
const throttle = (fn, delay) => {
let timeout
return () => {
clearTimeout(timeout)
timeout = setTimeout(fn, delay)
}
}
socket.on('input', throttle(() => {
socket.emit('typing')
}, 500))
// 5. 使用认证
socket.on('connect', () => {
socket.emit('authenticate', { token })
})
DON'T ❌
// 1. 不要忽视错误处理
// socket.emit('message') // ❌ 无错误处理
socket.emit('message', {}, (error) => {
if (error) console.error(error)
})
// 2. 不要频繁创建新连接
// new WebSocket(url) // 每次都创建新连接
const socket = io(url) // 单一实例
// 3. 不要在 disconnect 时立即清理
// ❌ socket.off('message')
// ✅ 等待重连失败后再清理
// 4. 不要发送过大的消息
// socket.emit('message', largeObject)
// ✅ 分块发送或压缩
📚 扩展资源
总结
实时应用开发的关键要素:
- 选择正确的技术:WebSocket(双向)、SSE(单向)、消息队列(可靠性)
- 连接管理:自动重连、心跳检测、状态恢复
- 消息处理:去重、排序、优先级、限流
- 扩展性:使用消息队列支持多服务器
- 性能优化:压缩、分块、缓存
- 用户体验:离线提示、输入状态、错误恢复
掌握实时通信技术,构建响应式、沉浸式的应用!