微服务

微服务治理完全指南:服务网格、负载均衡、链路追踪与可观测性

深入学习微服务架构的关键技术:服务通信、故障隔离、监控告警、性能优化

18 分钟阅读
#微服务 #服务网格 #负载均衡 #可观测性

📖 文章概述

从单体到微服务,从服务发现到可观测性,本文讲解如何构建和治理大规模微服务系统。


🎯 微服务架构演进

微服务系统设计

单体应用
   ↓
微服务拆分
   ├─ 按业务域划分
   ├─ 按能力划分
   └─ 混合划分
   ↓
服务治理挑战
   ├─ 服务发现
   ├─ 负载均衡
   ├─ 故障处理
   └─ 可观测性
   ↓
生产级系统
   ├─ 自动化部署
   ├─ 性能监控
   ├─ 故障恢复
   └─ 成本优化

微服务通信模式对比

模式延迟可靠性复杂度适用场景
REST简单查询、CRUD
gRPC高性能通信
消息队列异步解耦
事件溯源审计、实时流

🔌 服务发现和注册

1. 基于 Consul 的服务发现

import { ConsulClient } from 'consul'

const consul = new ConsulClient({
  host: 'localhost',
  port: 8500
})

// 服务注册
async function registerService() {
  const serviceConfig = {
    id: 'user-service-1',
    name: 'user-service',
    address: 'localhost',
    port: 3001,
    tags: ['backend', 'users'],
    check: {
      http: 'http://localhost:3001/health',
      interval: '10s',
      timeout: '5s'
    },
    meta: {
      version: '1.0.0',
      environment: 'production'
    }
  }
  
  await consul.agent.service.register(serviceConfig)
  console.log('服务已注册:', serviceConfig.name)
}

// 服务发现
async function discoverService(serviceName) {
  const services = await consul.health.service({
    service: serviceName,
    passing: true  // 仅返回健康的实例
  })
  
  return services.map(entry => ({
    id: entry.Service.ID,
    address: entry.Service.Address,
    port: entry.Service.Port,
    meta: entry.Service.Meta
  }))
}

// 服务注销
async function deregisterService(serviceId) {
  await consul.agent.service.deregister(serviceId)
  console.log('服务已注销:', serviceId)
}

// 健康检查端点
app.get('/health', (req, res) => {
  res.json({
    status: 'healthy',
    timestamp: new Date(),
    uptime: process.uptime()
  })
})

// 使用
async function setupService() {
  await registerService()
  
  // 服务上线时注册
  const server = app.listen(3001)
  
  // 服务关闭时注销
  process.on('SIGTERM', async () => {
    await deregisterService('user-service-1')
    server.close()
  })
}

2. Kubernetes 原生服务发现

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
        version: v1
    spec:
      containers:
      - name: user-service
        image: myregistry/user-service:1.0.0
        ports:
        - containerPort: 3001
        livenessProbe:
          httpGet:
            path: /health
            port: 3001
          initialDelaySeconds: 10
          periodSeconds: 5
        readinessProbe:
          httpGet:
            path: /ready
            port: 3001
          initialDelaySeconds: 5
          periodSeconds: 10
        env:
        - name: SERVICE_NAME
          value: user-service
        - name: LOG_LEVEL
          value: info

---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  type: ClusterIP
  ports:
  - port: 80
    targetPort: 3001
  selector:
    app: user-service

⚖️ 负载均衡和流量管理

3. 轮询和加权负载均衡

class LoadBalancer {
  constructor() {
    this.services = []
    this.currentIndex = 0
  }
  
  // 添加服务实例
  addService(host, port, weight = 1) {
    for (let i = 0; i < weight; i++) {
      this.services.push({ host, port })
    }
  }
  
  // 轮询算法
  roundRobin() {
    if (this.services.length === 0) {
      throw new Error('没有可用的服务')
    }
    
    const service = this.services[this.currentIndex]
    this.currentIndex = (this.currentIndex + 1) % this.services.length
    
    return service
  }
  
  // 最少连接
  leastConnections() {
    if (this.services.length === 0) {
      throw new Error('没有可用的服务')
    }
    
    // 这需要跟踪每个服务的连接数
    return this.services.reduce((min, service) => 
      (service.connections || 0) < (min.connections || 0) ? service : min
    )
  }
  
  // IP Hash(会话亲和)
  ipHash(clientIp) {
    const hash = clientIp
      .split('.')
      .reduce((acc, num) => acc + parseInt(num), 0)
    
    return this.services[hash % this.services.length]
  }
}

// 使用
const lb = new LoadBalancer()
lb.addService('host1', 3001, 2)  // 权重 2
lb.addService('host2', 3001, 1)  // 权重 1
lb.addService('host3', 3001, 1)  // 权重 1

// HTTP 代理中使用
import http from 'http'
import httpProxy from 'http-proxy'

const proxy = httpProxy.createProxyServer({})

http.createServer((req, res) => {
  const service = lb.roundRobin()
  proxy.web(req, res, {
    target: `http://${service.host}:${service.port}`
  })
}).listen(8080)

4. 服务网格(Istio)配置

# VirtualService - 路由规则
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
  - user-service
  http:
  - match:
    - uri:
        prefix: /v2/
    route:
    - destination:
        host: user-service
        subset: v2
      weight: 10
    - destination:
        host: user-service
        subset: v1
      weight: 90  # 灰度发布:90% v1,10% v2
  - route:
    - destination:
        host: user-service
        subset: v1

---
# DestinationRule - 负载均衡策略
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service
spec:
  host: user-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 100
        http2MaxRequests: 100
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2

🔄 服务通信和断路器

5. 断路器模式

class CircuitBreaker {
  constructor(options = {}) {
    this.state = 'CLOSED'  // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0
    this.successCount = 0
    this.lastFailureTime = null
    
    this.failureThreshold = options.failureThreshold || 5
    this.successThreshold = options.successThreshold || 2
    this.timeout = options.timeout || 60000  // 1 分钟
  }
  
  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = 'HALF_OPEN'
        this.successCount = 0
      } else {
        throw new Error('断路器打开,服务不可用')
      }
    }
    
    try {
      const result = await fn()
      this.onSuccess()
      return result
    } catch (error) {
      this.onFailure()
      throw error
    }
  }
  
  onSuccess() {
    this.failureCount = 0
    
    if (this.state === 'HALF_OPEN') {
      this.successCount++
      if (this.successCount >= this.successThreshold) {
        this.state = 'CLOSED'
        console.log('✅ 断路器闭合,服务恢复')
      }
    }
  }
  
  onFailure() {
    this.failureCount++
    this.lastFailureTime = Date.now()
    
    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN'
      console.log('🔴 断路器打开,服务熔断')
    }
  }
  
  getState() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount
    }
  }
}

// 使用断路器
const userServiceBreaker = new CircuitBreaker({
  failureThreshold: 5,
  timeout: 30000
})

async function callUserService(userId) {
  try {
    return await userServiceBreaker.execute(async () => {
      const response = await fetch(`http://user-service/users/${userId}`)
      if (!response.ok) throw new Error('用户服务错误')
      return response.json()
    })
  } catch (error) {
    console.error('调用失败:', error)
    // 返回缓存的默认值
    return { id: userId, status: 'cached' }
  }
}

6. 重试和超时策略

class ResilientClient {
  constructor(options = {}) {
    this.maxRetries = options.maxRetries || 3
    this.timeout = options.timeout || 5000
    this.backoffMultiplier = options.backoffMultiplier || 2
  }
  
  async callWithRetry(fn, operationName = 'operation') {
    let lastError
    
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.callWithTimeout(fn)
      } catch (error) {
        lastError = error
        
        if (attempt < this.maxRetries) {
          const delay = Math.pow(this.backoffMultiplier, attempt - 1) * 1000
          console.log(
            `⚠️ ${operationName} 失败 (${attempt}/${this.maxRetries}),` +
            `${delay}ms 后重试...`
          )
          await this.sleep(delay)
        }
      }
    }
    
    throw lastError
  }
  
  async callWithTimeout(fn) {
    return Promise.race([
      fn(),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('操作超时')), this.timeout)
      )
    ])
  }
  
  sleep(ms) {
    return new Promise(r => setTimeout(r, ms))
  }
}

// 使用
const client = new ResilientClient({
  maxRetries: 3,
  timeout: 5000,
  backoffMultiplier: 2
})

const userData = await client.callWithRetry(
  () => fetch('http://api.example.com/users/123'),
  '获取用户数据'
)

🔍 可观测性:日志、指标和追踪

7. 结构化日志

import winston from 'winston'

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
})

// 结构化日志记录
function logServiceCall(serviceName, requestId, metadata) {
  logger.info({
    timestamp: new Date().toISOString(),
    requestId: requestId,
    serviceName: serviceName,
    level: 'INFO',
    message: `调用 ${serviceName}`,
    ...metadata  // 扩展字段
  })
}

// 使用中间件
app.use((req, res, next) => {
  const requestId = req.headers['x-request-id'] || generateId()
  req.requestId = requestId
  
  const startTime = Date.now()
  
  res.on('finish', () => {
    const duration = Date.now() - startTime
    
    logger.info({
      requestId,
      timestamp: new Date().toISOString(),
      method: req.method,
      path: req.path,
      statusCode: res.statusCode,
      duration: duration,
      userAgent: req.get('user-agent')
    })
  })
  
  next()
})

8. 分布式追踪(Jaeger)

import { initTracer } from 'jaeger-client'

const tracer = initTracer({
  serviceName: 'user-service',
  sampler: {
    type: 'const',
    param: 1
  },
  reporter: {
    host: 'localhost',
    port: 6831
  }
})

// 创建跨度
async function getUser(userId, parentSpan = null) {
  const span = tracer.startSpan('getUser', {
    childOf: parentSpan
  })
  
  try {
    span.setTag('userId', userId)
    
    // 调用数据库
    const dbSpan = tracer.startSpan('db.query', {
      childOf: span
    })
    
    const user = await db.users.findById(userId)
    
    dbSpan.finish()
    
    return user
  } finally {
    span.finish()
  }
}

// 中间件集成
app.use((req, res, next) => {
  const span = tracer.startSpan(req.path, {
    tags: {
      'span.kind': 'server',
      'http.method': req.method,
      'http.url': req.url
    }
  })
  
  req.span = span
  
  res.on('finish', () => {
    span.setTag('http.status_code', res.statusCode)
    span.finish()
  })
  
  next()
})

9. 指标收集(Prometheus)

import prometheus from 'prom-client'

// 定义指标
const httpRequestDuration = new prometheus.Histogram({
  name: 'http_request_duration_seconds',
  help: 'HTTP 请求耗时(秒)',
  labelNames: ['method', 'route', 'status_code'],
  buckets: [0.1, 0.5, 1, 2, 5]
})

const activeConnections = new prometheus.Gauge({
  name: 'active_connections',
  help: '活跃连接数',
  labelNames: ['service']
})

const requestsTotal = new prometheus.Counter({
  name: 'http_requests_total',
  help: '总请求数',
  labelNames: ['method', 'route', 'status_code']
})

// 中间件
app.use((req, res, next) => {
  activeConnections.inc({ service: 'user-service' })
  
  const startTime = Date.now()
  
  res.on('finish', () => {
    const duration = (Date.now() - startTime) / 1000
    
    httpRequestDuration.observe(
      { method: req.method, route: req.route?.path, status_code: res.statusCode },
      duration
    )
    
    requestsTotal.inc({
      method: req.method,
      route: req.route?.path,
      status_code: res.statusCode
    })
    
    activeConnections.dec({ service: 'user-service' })
  })
  
  next()
})

// 暴露指标端点
app.get('/metrics', (req, res) => {
  res.set('Content-Type', prometheus.register.contentType)
  res.end(prometheus.register.metrics())
})

⚠️ 监控和告警

10. 告警规则配置

# prometheus-rules.yml
groups:
- name: service-alerts
  rules:
  # 高错误率告警
  - alert: HighErrorRate
    expr: |
      rate(http_requests_total{status_code=~"5.."}[5m]) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "{{ $labels.service }} 错误率过高"
      description: "过去 5 分钟错误率 {{ $value | humanizePercentage }}"

  # 响应时间过长
  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, http_request_duration_seconds_bucket) > 1
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "{{ $labels.service }} 响应时间过长"
      description: "P95 延迟 {{ $value | humanizeDuration }}"

  # 内存使用过高
  - alert: HighMemoryUsage
    expr: |
      process_resident_memory_bytes / 1024 / 1024 / 1024 > 2
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "内存使用过高"
      description: "当前内存使用 {{ $value | humanize }}GB"

  # 服务不可用
  - alert: ServiceDown
    expr: |
      up{job="user-service"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "{{ $labels.instance }} 服务离线"
      description: "{{ $labels.service }} 无法联系"

🎓 最佳实践

DO ✅

// 1. 使用幂等性确保消息不重复处理
async function processMessage(message) {
  const idempotencyKey = message.id
  
  // 检查是否已处理
  const processed = await cache.get(`processed:${idempotencyKey}`)
  if (processed) {
    return { status: 'already_processed' }
  }
  
  // 处理消息
  const result = await doWork(message)
  
  // 标记为已处理
  await cache.set(`processed:${idempotencyKey}`, true, 24 * 3600)
  
  return result
}

// 2. 使用优雅关闭
app.listen(3000)

process.on('SIGTERM', async () => {
  console.log('收到 SIGTERM,开始优雅关闭...')
  
  // 停止接收新请求
  server.close(() => {
    console.log('HTTP 服务已关闭')
  })
  
  // 等待现有请求完成(最多 30 秒)
  setTimeout(() => {
    console.log('强制退出')
    process.exit(1)
  }, 30000)
})

// 3. 实现健康检查
app.get('/health', (req, res) => {
  res.json({
    status: 'healthy',
    checks: {
      database: await checkDatabase(),
      cache: await checkCache(),
      externalService: await checkExternalService()
    }
  })
})

📚 总结

微服务治理核心:

  • 服务发现: Consul/Kubernetes/Eureka
  • 负载均衡: 轮询、权重、IP Hash、服务网格
  • 故障处理: 断路器、重试、超时
  • 可观测性: 日志、指标、追踪
  • 监控告警: 规则配置、实时通知

构建高可用的微服务系统就是不断优化这些方面!