📖 文章概述
从单体到微服务、从服务发现到可观测性，本文讲解如何构建和治理大规模微服务系统。
🎯 微服务架构演进
微服务系统设计
单体应用
↓
微服务拆分
├─ 按业务域划分
├─ 按能力划分
└─ 混合划分
↓
服务治理挑战
├─ 服务发现
├─ 负载均衡
├─ 故障处理
└─ 可观测性
↓
生产级系统
├─ 自动化部署
├─ 性能监控
├─ 故障恢复
└─ 成本优化
微服务通信模式对比
| 模式 | 延迟 | 可靠性 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| REST | 中 | 中 | 低 | 简单查询、CRUD |
| gRPC | 低 | 高 | 中 | 高性能通信 |
| 消息队列 | 高 | 高 | 中 | 异步解耦 |
| 事件溯源 | 中 | 高 | 高 | 审计、实时流 |
🔌 服务发现和注册
1. 基于 Consul 的服务发现
// Consul client for the local agent (HTTP API on port 8500).
// The `consul` npm package exports its client class as the module itself (default
// export under ESM); there is no named `ConsulClient` export, so importing one fails.
import Consul from 'consul'
const consul = new Consul({
  host: 'localhost',
  port: 8500
})
/**
 * Registers this user-service instance with the local Consul agent.
 *
 * The registration carries an HTTP health check (GET http://localhost:3001/health,
 * every 10s with a 5s timeout) plus version/environment metadata.
 * @returns {Promise<void>} resolves once the agent accepted the registration
 */
async function registerService() {
  const healthCheck = {
    http: 'http://localhost:3001/health',
    interval: '10s',
    timeout: '5s'
  }
  const metadata = { version: '1.0.0', environment: 'production' }

  const registration = {
    id: 'user-service-1',
    name: 'user-service',
    address: 'localhost',
    port: 3001,
    tags: ['backend', 'users'],
    check: healthCheck,
    meta: metadata
  }

  await consul.agent.service.register(registration)
  console.log('服务已注册:', registration.name)
}
/**
 * Looks up the healthy instances of a service through Consul's health API.
 *
 * @param {string} serviceName - Consul service name, e.g. 'user-service'
 * @returns {Promise<Array<{id: string, address: string, port: number, meta: object}>>}
 *   one entry per instance whose health checks are passing
 */
async function discoverService(serviceName) {
  const entries = await consul.health.service({
    service: serviceName,
    passing: true // only return instances whose checks are passing
  })
  return entries.map(({ Node, Service }) => ({
    id: Service.ID,
    // Consul leaves Service.Address empty when the instance was registered without an
    // explicit address; the node's address is the one to connect to in that case.
    address: Service.Address || Node?.Address,
    port: Service.Port,
    meta: Service.Meta
  }))
}
/**
 * Removes a service instance from the local Consul agent.
 * @param {string} serviceId - the id the instance was registered under
 * @returns {Promise<void>}
 */
async function deregisterService(serviceId) {
  const agentServices = consul.agent.service
  await agentServices.deregister(serviceId)
  console.log('服务已注销:', serviceId)
}
// Liveness endpoint polled by the Consul HTTP check configured in registerService().
// Responds with a small JSON status document: fixed status, current time, process uptime.
app.get('/health', (req, res) => {
  const payload = {
    status: 'healthy',
    timestamp: new Date(),
    uptime: process.uptime()
  }
  res.json(payload)
})
/**
 * Starts the HTTP server on port 3001, registers it with Consul, and deregisters it on SIGTERM.
 *
 * The server is registered only after it is listening, so the Consul health check never
 * probes a port that is not bound yet. If registration fails, the server is closed again
 * and the error is rethrown.
 * @returns {Promise<import('http').Server>} the listening server
 */
async function setupService() {
  // Wait for 'listening' (or reject on e.g. EADDRINUSE) before announcing the instance.
  const server = await new Promise((resolve, reject) => {
    const srv = app.listen(3001, () => {
      srv.off('error', reject)
      resolve(srv)
    })
    srv.once('error', reject)
  })

  try {
    await registerService()
  } catch (error) {
    // Don't leave an unregistered server running.
    server.close()
    throw error
  }

  process.on('SIGTERM', async () => {
    try {
      await deregisterService('user-service-1')
    } catch (error) {
      // An unreachable Consul agent must not prevent the server from shutting down.
      console.error('服务注销失败:', error)
    } finally {
      server.close()
    }
  })

  return server
}
2. Kubernetes 原生服务发现
# deployment.yaml
# Three replicas of user-service plus a ClusterIP Service in front of them.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
        version: v1
    spec:
      containers:
        - name: user-service
          image: myregistry/user-service:1.0.0
          ports:
            - containerPort: 3001
          # Restart the container when /health stops answering.
          livenessProbe:
            httpGet:
              path: /health
              port: 3001
            initialDelaySeconds: 10
            periodSeconds: 5
          # Only route traffic once /ready succeeds; the app must expose this endpoint
          # (separately from /health), otherwise pods never become Ready.
          readinessProbe:
            httpGet:
              path: /ready
              port: 3001
            initialDelaySeconds: 5
            periodSeconds: 10
          env:
            - name: SERVICE_NAME
              value: user-service
            - name: LOG_LEVEL
              value: info
---
# Stable in-cluster name (user-service:80) forwarding to container port 3001 of every matching pod.
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  type: ClusterIP
  ports:
    - port: 80
      targetPort: 3001
  selector:
    app: user-service
⚖️ 负载均衡和流量管理
3. 负载均衡算法：轮询、加权、最少连接与 IP Hash
/**
 * Minimal client-side load balancer over a static list of backend instances.
 *
 * Weights are implemented by inserting the same instance entry `weight` times, so
 * round robin and IP hash pick it proportionally more often. All slots of one
 * instance share a single entry object, and therefore a single `connections` counter.
 */
class LoadBalancer {
  constructor() {
    this.services = []
    this.currentIndex = 0
  }

  /**
   * Adds a backend instance.
   * @param {string} host
   * @param {number} port
   * @param {number} [weight=1] - number of slots the instance occupies in the rotation
   */
  addService(host, port, weight = 1) {
    // One shared entry per instance, so leastConnections() sees one counter per backend
    // rather than one per weighted slot.
    const entry = { host, port, connections: 0 }
    for (let i = 0; i < weight; i++) {
      this.services.push(entry)
    }
  }

  /**
   * Round robin over all slots.
   * @returns {{host: string, port: number, connections: number}}
   * @throws {Error} when no instance has been added
   */
  roundRobin() {
    if (this.services.length === 0) {
      throw new Error('没有可用的服务')
    }
    const service = this.services[this.currentIndex]
    this.currentIndex = (this.currentIndex + 1) % this.services.length
    return service
  }

  /**
   * Picks the instance with the fewest open connections (first one wins ties).
   * This class never updates `connections` itself; callers must increment it when
   * a request starts and decrement it when the request ends.
   * @throws {Error} when no instance has been added
   */
  leastConnections() {
    if (this.services.length === 0) {
      throw new Error('没有可用的服务')
    }
    return this.services.reduce((min, service) =>
      (service.connections ?? 0) < (min.connections ?? 0) ? service : min
    )
  }

  /**
   * Session affinity: the same client IP always maps to the same slot (while the pool is unchanged).
   * @param {string} clientIp - IPv4 or IPv6 address (any string is accepted)
   * @throws {Error} when no instance has been added
   */
  ipHash(clientIp) {
    if (this.services.length === 0) {
      throw new Error('没有可用的服务')
    }
    // 32-bit FNV-1a over the whole string: works for IPv6 as well as IPv4, and unlike
    // summing the octets it does not send e.g. 10.0.0.1 and 10.0.1.0 to the same slot.
    const key = String(clientIp)
    let hash = 0x811c9dc5
    for (let i = 0; i < key.length; i++) {
      hash ^= key.charCodeAt(i)
      hash = Math.imul(hash, 0x01000193) >>> 0
    }
    return this.services[hash % this.services.length]
  }
}
// Example pool: host1 takes two of every four round-robin slots, host2 and host3 one each.
const lb = new LoadBalancer()
for (const [host, port, weight] of [
  ['host1', 3001, 2],
  ['host2', 3001, 1],
  ['host3', 3001, 1]
]) {
  lb.addService(host, port, weight)
}
// HTTP 代理中使用
import http from 'http'
import httpProxy from 'http-proxy'
// Reverse proxy on :8080 that forwards each request to the next backend chosen by `lb`.
const proxy = httpProxy.createProxyServer({})
http.createServer((req, res) => {
  let service
  try {
    service = lb.roundRobin()
  } catch {
    // Empty pool: answer 503 instead of letting the throw escape the request
    // listener and crash the whole process.
    res.writeHead(503)
    res.end('Service Unavailable')
    return
  }
  proxy.web(req, res, { target: `http://${service.host}:${service.port}` }, (error) => {
    // Without an error handler http-proxy rethrows backend/connection errors, so one
    // unreachable backend would otherwise take the proxy down.
    console.error('代理失败:', error)
    if (!res.headersSent) {
      res.writeHead(502)
    }
    res.end('Bad Gateway')
  })
}).listen(8080)
4. 服务网格(Istio)配置
# VirtualService - routing rules
# Requests whose path starts with /v2/ are split 90% to subset v1 and 10% to subset v2
# (canary); every other request goes to v1.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
    - user-service
  http:
    - match:
        - uri:
            prefix: /v2/
      route:
        - destination:
            host: user-service
            subset: v2
          weight: 10
        - destination:
            host: user-service
            subset: v1
          weight: 90 # canary: 90% v1, 10% v2
    - route:
        - destination:
            host: user-service
            subset: v1
---
# DestinationRule - connection-pool limits, outlier ejection and the v1/v2 subsets
# (no explicit loadBalancer algorithm is set here).
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service
spec:
  host: user-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 100
        http2MaxRequests: 100
    # Eject an instance for 30s after 5 consecutive 5xx responses (evaluated every 30s).
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
  subsets:
    - name: v1
      labels:
        version: v1
    - name: v2
      labels:
        version: v2
🔄 服务通信和断路器
5. 断路器模式
/**
 * Circuit breaker with three states:
 * - CLOSED: calls pass through; `failureThreshold` consecutive failures open it.
 * - OPEN: calls fail fast until more than `timeout` ms have passed since the last failure.
 * - HALF_OPEN: trial calls pass through; `successThreshold` successes close it,
 *   and any single failure re-opens it.
 */
class CircuitBreaker {
  /**
   * @param {object} [options]
   * @param {number} [options.failureThreshold=5] - consecutive failures that trip the breaker
   * @param {number} [options.successThreshold=2] - HALF_OPEN successes needed to close again
   * @param {number} [options.timeout=60000] - ms to stay OPEN before allowing a trial call
   */
  constructor(options = {}) {
    this.state = 'CLOSED' // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0
    this.successCount = 0
    this.lastFailureTime = null
    // `??` so that explicit 0 values (e.g. timeout: 0) are honoured.
    this.failureThreshold = options.failureThreshold ?? 5
    this.successThreshold = options.successThreshold ?? 2
    this.timeout = options.timeout ?? 60000 // 1 minute
  }

  /**
   * Runs `fn` through the breaker.
   * @param {() => Promise<*>} fn
   * @returns {Promise<*>} the result of `fn`
   * @throws {Error} immediately while OPEN; otherwise rethrows whatever `fn` throws
   */
  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = 'HALF_OPEN'
        this.successCount = 0
      } else {
        throw new Error('断路器打开,服务不可用')
      }
    }
    try {
      const result = await fn()
      this.onSuccess()
      return result
    } catch (error) {
      this.onFailure()
      throw error
    }
  }

  onSuccess() {
    this.failureCount = 0
    if (this.state === 'HALF_OPEN') {
      this.successCount++
      if (this.successCount >= this.successThreshold) {
        this.state = 'CLOSED'
        console.log('✅ 断路器闭合,服务恢复')
      }
    }
  }

  onFailure() {
    this.failureCount++
    this.lastFailureTime = Date.now()
    // A failed trial call means the dependency is still unhealthy, so re-open immediately.
    // Otherwise an earlier HALF_OPEN success (which resets failureCount) would let
    // failureThreshold more calls through.
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN'
      console.log('🔴 断路器打开,服务熔断')
    }
  }

  /** @returns {{state: string, failureCount: number, successCount: number}} snapshot for monitoring */
  getState() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount
    }
  }
}
// Breaker guarding calls to user-service: it trips after 5 consecutive failures and
// lets a trial call through again once 30 s have passed since the last failure.
const userServiceBreaker = new CircuitBreaker({ timeout: 30000, failureThreshold: 5 })
/**
 * Fetches a user from user-service through the shared circuit breaker.
 *
 * Any failure (breaker open, network error, non-2xx status, invalid JSON body) is
 * logged and replaced by a degraded placeholder object.
 * @param {string|number} userId
 * @returns {Promise<object>} the user JSON, or `{ id: userId, status: 'cached' }` as fallback
 */
async function callUserService(userId) {
  try {
    return await userServiceBreaker.execute(async () => {
      // Encode the id so values such as "a/b" or "x?y" cannot change the request path.
      const response = await fetch(`http://user-service/users/${encodeURIComponent(userId)}`)
      if (!response.ok) throw new Error('用户服务错误')
      return response.json()
    })
  } catch (error) {
    console.error('调用失败:', error)
    // Degraded fallback: a static placeholder, not data actually read from a cache.
    return { id: userId, status: 'cached' }
  }
}
6. 重试和超时策略
/**
 * Runs an async operation with a per-attempt timeout and exponential-backoff retries.
 */
class ResilientClient {
  /**
   * @param {object} [options]
   * @param {number} [options.maxRetries=3] - total number of attempts, including the first
   * @param {number} [options.timeout=5000] - ms allowed per attempt
   * @param {number} [options.backoffMultiplier=2] - growth factor of the delay between attempts
   * @param {number} [options.baseDelay=1000] - ms to wait before the second attempt
   */
  constructor(options = {}) {
    this.maxRetries = options.maxRetries || 3
    this.timeout = options.timeout || 5000
    this.backoffMultiplier = options.backoffMultiplier || 2
    this.baseDelay = options.baseDelay ?? 1000
  }

  /**
   * Calls `fn` up to `maxRetries` times. Before attempt k+1 it waits
   * baseDelay * backoffMultiplier^(k-1) ms.
   * @param {(signal: AbortSignal) => Promise<*>} fn - receives a signal that aborts when the attempt times out
   * @param {string} [operationName='operation'] - label used in retry log lines
   * @returns {Promise<*>} the first successful result
   * @throws the error of the last attempt
   */
  async callWithRetry(fn, operationName = 'operation') {
    let lastError
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.callWithTimeout(fn)
      } catch (error) {
        lastError = error
        if (attempt < this.maxRetries) {
          const delay = Math.pow(this.backoffMultiplier, attempt - 1) * this.baseDelay
          console.log(
            `⚠️ ${operationName} 失败 (${attempt}/${this.maxRetries}),` +
            `${delay}ms 后重试...`
          )
          await this.sleep(delay)
        }
      }
    }
    throw lastError
  }

  /**
   * Runs one attempt of `fn`, rejecting with '操作超时' after `this.timeout` ms.
   * On timeout the AbortSignal passed to `fn` is aborted so the work (e.g. a fetch)
   * can actually stop. The timer is always cleared, so a finished call leaves no
   * pending timer that keeps the process alive.
   * @param {(signal: AbortSignal) => Promise<*>} fn
   */
  async callWithTimeout(fn) {
    const controller = new AbortController()
    let timer
    const timeoutPromise = new Promise((_, reject) => {
      timer = setTimeout(() => {
        // Reject first so the race settles with the timeout error rather than an AbortError.
        reject(new Error('操作超时'))
        controller.abort()
      }, this.timeout)
    })
    try {
      return await Promise.race([fn(controller.signal), timeoutPromise])
    } finally {
      clearTimeout(timer)
    }
  }

  sleep(ms) {
    return new Promise(r => setTimeout(r, ms))
  }
}
// Fetch user 123 with up to 3 attempts, 5 s per attempt and backoff between attempts.
const client = new ResilientClient({
  maxRetries: 3,
  timeout: 5000,
  backoffMultiplier: 2
})
const userData = await client.callWithRetry(async (signal) => {
  const response = await fetch('http://api.example.com/users/123', { signal })
  // fetch only rejects on network errors; turn HTTP error statuses into failures so they are retried too.
  if (!response.ok) throw new Error(`HTTP ${response.status}`)
  return response.json()
}, '获取用户数据')
🔍 可观测性:日志、指标和追踪
7. 结构化日志
import winston from 'winston'
// JSON logger: entries at `info` and above go to combined.log, and error-level
// entries are additionally written to error.log.
const logger = winston.createLogger({
  format: winston.format.json(),
  level: 'info',
  transports: [
    new winston.transports.File({ level: 'error', filename: 'error.log' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
})
/**
 * Writes one structured log entry for an outgoing call to another service.
 *
 * The level is not put in the entry: winston sets `info.level` itself when
 * logger.info() is called, so a hand-written level field would be overwritten.
 * @param {string} serviceName - the service being called
 * @param {string} requestId - correlation id of the current request
 * @param {object} [metadata] - extra fields; they extend the entry but cannot
 *   override timestamp, requestId, serviceName or message
 */
function logServiceCall(serviceName, requestId, metadata) {
  logger.info({
    ...metadata,
    timestamp: new Date().toISOString(),
    requestId,
    serviceName,
    message: `调用 ${serviceName}`
  })
}
// Access-log middleware: tags each request with a correlation id and writes one
// structured entry when the response has been sent.
app.use((req, res, next) => {
  // Reuse the caller's id so log lines from every hop of one request can be joined.
  const requestId = req.headers['x-request-id'] || generateId()
  req.requestId = requestId
  // Echo the id back so clients can quote it when reporting problems.
  res.setHeader('x-request-id', requestId)
  const startTime = Date.now()
  res.on('finish', () => {
    logger.info({
      // winston wraps an object without `message` inside `message`; giving it one
      // keeps these fields at the top level of the JSON line.
      message: 'HTTP 请求',
      requestId,
      timestamp: new Date().toISOString(),
      method: req.method,
      path: req.path,
      statusCode: res.statusCode,
      duration: Date.now() - startTime,
      userAgent: req.get('user-agent')
    })
  })
  next()
})
8. 分布式追踪(Jaeger)
import { initTracer } from 'jaeger-client'
// Jaeger tracer for this service.
// NOTE: jaeger-client is deprecated upstream in favour of OpenTelemetry; kept to match the example.
const tracer = initTracer({
  serviceName: 'user-service',
  // A const sampler with param 1 records every trace: fine for demos, usually too much for production.
  sampler: {
    type: 'const',
    param: 1
  },
  // jaeger-client reads agentHost/agentPort; plain host/port keys are silently ignored.
  // The Node client emits binary Thrift, which the agent accepts on UDP 6832
  // (6831 is the compact-Thrift port used by other clients).
  reporter: {
    agentHost: 'localhost',
    agentPort: 6832
  }
})
/**
 * Loads a user by id, recording a `getUser` span and a child `db.query` span.
 *
 * Both spans are always finished, and are tagged `error: true` when the query throws.
 * @param {*} userId
 * @param {object|null} [parentSpan=null] - caller's span or span context; null starts a new trace
 * @returns {Promise<*>} whatever db.users.findById resolves to
 * @throws rethrows the database error after tagging the spans
 */
async function getUser(userId, parentSpan = null) {
  const span = tracer.startSpan('getUser', { childOf: parentSpan })
  try {
    span.setTag('userId', userId)
    const dbSpan = tracer.startSpan('db.query', { childOf: span })
    try {
      return await db.users.findById(userId)
    } catch (error) {
      dbSpan.setTag('error', true)
      throw error
    } finally {
      // Close the child span on every path; a failed query must not leak it.
      dbSpan.finish()
    }
  } catch (error) {
    span.setTag('error', true)
    throw error
  } finally {
    span.finish()
  }
}
// Server-side tracing middleware: one span per HTTP request. If the incoming headers
// carry trace context, the span continues the caller's trace.
app.use((req, res, next) => {
  // 'http_headers' is opentracing's FORMAT_HTTP_HEADERS. Without this extract every
  // service starts a new trace, so traces are never linked across services.
  const parentContext = tracer.extract('http_headers', req.headers)
  const span = tracer.startSpan(req.path, {
    childOf: parentContext,
    tags: {
      'span.kind': 'server',
      'http.method': req.method,
      'http.url': req.url
    }
  })
  req.span = span
  res.on('finish', () => {
    span.setTag('http.status_code', res.statusCode)
    if (res.statusCode >= 500) {
      span.setTag('error', true)
    }
    span.finish()
  })
  next()
})
9. 指标收集(Prometheus)
import prometheus from 'prom-client'
// Metric definitions, registered in prom-client's default registry.

// Request latency in seconds, bucketed from 100 ms up to 5 s.
const httpRequestDuration = new prometheus.Histogram({
  name: 'http_request_duration_seconds',
  help: 'HTTP 请求耗时(秒)',
  buckets: [0.1, 0.5, 1, 2, 5],
  labelNames: ['method', 'route', 'status_code']
})

// Gauge keyed by `service`; whatever increments/decrements it determines what "connection" means.
const activeConnections = new prometheus.Gauge({
  name: 'active_connections',
  help: '活跃连接数',
  labelNames: ['service']
})

// Monotonic request counter, labelled like the latency histogram.
const requestsTotal = new prometheus.Counter({
  name: 'http_requests_total',
  help: '总请求数',
  labelNames: ['method', 'route', 'status_code']
})
// Metrics middleware: latency and request count per completed response, plus a gauge
// of requests currently in flight.
app.use((req, res, next) => {
  activeConnections.inc({ service: 'user-service' })
  const startTime = Date.now()
  res.on('finish', () => {
    const labels = {
      method: req.method,
      // Route template (bounded cardinality); requests that matched no route share one value
      // instead of being labelled "undefined".
      route: req.route?.path ?? 'unmatched',
      status_code: res.statusCode
    }
    httpRequestDuration.observe(labels, (Date.now() - startTime) / 1000)
    requestsTotal.inc(labels)
  })
  // 'close' fires once per response, both after 'finish' and when the client aborts
  // (where 'finish' never fires), so the gauge cannot drift upward.
  res.once('close', () => {
    activeConnections.dec({ service: 'user-service' })
  })
  next()
})
// Prometheus scrape endpoint.
app.get('/metrics', async (req, res) => {
  try {
    // register.metrics() returns a Promise in prom-client >= 13 and must be awaited;
    // awaiting a plain string (older versions) works as well.
    const body = await prometheus.register.metrics()
    res.set('Content-Type', prometheus.register.contentType)
    res.end(body)
  } catch (error) {
    // Express 4 does not catch rejected async handlers, so answer explicitly.
    res.status(500).end(String(error))
  }
})
⚠️ 监控和告警
10. 告警规则配置
# prometheus-rules.yml
# The app's own metrics carry no `service` label, so the rules group by the `job`
# label that Prometheus attaches at scrape time.
groups:
  - name: service-alerts
    rules:
      # High error rate: share of 5xx responses over the last 5 minutes, per job.
      - alert: HighErrorRate
        expr: |
          sum by (job) (rate(http_requests_total{status_code=~"5.."}[5m]))
            / sum by (job) (rate(http_requests_total[5m]))
            > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "{{ $labels.job }} 错误率过高"
          description: "过去 5 分钟错误率 {{ $value | humanizePercentage }}"
      # Slow responses: P95 latency over the last 5 minutes, per job.
      # histogram_quantile needs per-bucket rates aggregated by `le`; raw cumulative
      # buckets would yield a quantile over the whole process lifetime.
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum by (job, le) (rate(http_request_duration_seconds_bucket[5m]))) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "{{ $labels.job }} 响应时间过长"
          description: "P95 延迟 {{ $value | humanizeDuration }}"
      # High memory: resident set size above 2 GiB for 10 minutes.
      - alert: HighMemoryUsage
        expr: |
          process_resident_memory_bytes / 1024 / 1024 / 1024 > 2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "内存使用过高"
          description: "当前内存使用 {{ $value | humanize }}GB"
      # Service down: Prometheus could not scrape the target for 1 minute.
      - alert: ServiceDown
        expr: |
          up{job="user-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "{{ $labels.instance }} 服务离线"
          description: "{{ $labels.job }} 无法联系"
🎓 最佳实践
DO ✅
// 1. Idempotent consumer: remember handled message ids so redeliveries are skipped.
// NOTE(review): the get-then-set below is not atomic. Two concurrent deliveries of the
// same id can both pass the check and both run doWork(). An atomic "set if absent" on
// the cache (if the client supports one) would close that gap.
async function processMessage(message) {
  const dedupKey = `processed:${message.id}`

  const alreadyDone = await cache.get(dedupKey)
  if (alreadyDone) {
    return { status: 'already_processed' }
  }

  const outcome = await doWork(message)
  // 24 * 3600: presumably a TTL in seconds (one day); depends on the cache client.
  await cache.set(dedupKey, true, 24 * 3600)
  return outcome
}
// 2. Graceful shutdown: on SIGTERM stop accepting connections, exit once the open
// ones have ended, and force-exit after 30 s at the latest.
const server = app.listen(3000)
process.on('SIGTERM', () => {
  console.log('收到 SIGTERM,开始优雅关闭...')
  // Stop accepting new connections; the callback runs once existing ones have ended.
  server.close((error) => {
    console.log('HTTP 服务已关闭')
    process.exit(error ? 1 : 0)
  })
  // Hard deadline for lingering requests. unref() stops this timer from keeping the
  // process alive on its own, so a clean shutdown never waits the full 30 s.
  setTimeout(() => {
    console.log('强制退出')
    process.exit(1)
  }, 30000).unref()
})
// 3. Health check that aggregates dependency checks.
// NOTE(review): assumes checkDatabase/checkCache/checkExternalService resolve to a truthy
// value when healthy and reject (or resolve falsy) when not. Confirm against their code.
// A failing dependency turns the response into a 503. Avoid using this endpoint as a
// Kubernetes livenessProbe, or an outage of a shared dependency restarts every pod.
app.get('/health', async (req, res) => {
  // `await` requires an async handler. The checks run in parallel, and allSettled keeps
  // one failing check from hiding the others. Promise.resolve().then() turns a
  // synchronous throw into a rejected outcome.
  const outcomes = await Promise.allSettled(
    [checkDatabase, checkCache, checkExternalService].map((check) => Promise.resolve().then(() => check()))
  )
  const healthy = outcomes.every(({ status, value }) => status === 'fulfilled' && Boolean(value))
  const report = ({ status, value, reason }) =>
    status === 'fulfilled' ? value : { error: String(reason) }
  const [database, cacheResult, externalService] = outcomes.map(report)
  res.status(healthy ? 200 : 503).json({
    status: healthy ? 'healthy' : 'unhealthy',
    checks: {
      database,
      cache: cacheResult,
      externalService
    }
  })
})
📚 总结
微服务治理核心:
- 服务发现: Consul/Kubernetes/Eureka
- 负载均衡: 轮询、权重、IP Hash、服务网格
- 故障处理: 断路器、重试、超时
- 可观测性: 日志、指标、追踪
- 监控告警: 规则配置、实时通知
构建高可用的微服务系统，就是在这些方面持续优化！