云原生

Serverless 架构完全指南:AWS Lambda、函数即服务与成本优化

深入学习 Serverless 架构,掌握函数部署、冷启动优化、成本管理等核心技术

16 分钟阅读
#Serverless #AWS Lambda #函数计算 #云原生

📖 文章概述

Serverless 改变了应用架构方式。本文讲解如何使用 Lambda、优化冷启动、管理成本,以及构建生产级 Serverless 应用。


🎯 Serverless 架构对比

计算模型演进

物理服务器(自管理)
   ↓ 复杂性高、成本高、维护负担重
虚拟机/IaaS(基础设施即服务)
   ↓ 仍需管理操作系统、中等复杂度
容器/PaaS(平台即服务)
   ↓ 按应用计费、不管理基础设施
Serverless/FaaS(函数即服务)
   ↓ 按调用计费、自动扩展、完全托管

部署模型对比

模型部署单位成本模式冷启动状态管理适用场景
服务器应用固定N/A本地传统应用
容器镜像按资源时间分布式微服务
Serverless函数按调用次数+执行时间毫秒-秒外部事件驱动

🚀 AWS Lambda 快速开始

1. Lambda 函数创建和部署

// handler.js - Lambda 处理器
export const handler = async (event, context) => {
  console.log('收到事件:', event)
  
  // Lambda 执行上下文信息
  console.log('函数名:', context.functionName)
  console.log('请求 ID:', context.awsRequestId)
  console.log('剩余时间:', context.getRemainingTimeInMillis())
  
  try {
    const result = {
      statusCode: 200,
      body: JSON.stringify({
        message: 'Hello from Lambda!',
        input: event
      })
    }
    
    return result
  } catch (error) {
    return {
      statusCode: 500,
      body: JSON.stringify({ error: error.message })
    }
  }
}

// 本地测试
import { handler } from './handler.js'

const mockEvent = {
  path: '/api/users',
  method: 'GET',
  queryStringParameters: { id: '123' }
}

const mockContext = {
  functionName: 'my-function',
  awsRequestId: 'test-123',
  getRemainingTimeInMillis: () => 30000
}

await handler(mockEvent, mockContext)

2. Serverless Framework 部署

# serverless.yml
service: my-api

provider:
  name: aws
  runtime: nodejs18.x
  region: us-east-1
  environment:
    STAGE: ${self:provider.stage}
    DB_HOST: ${ssm:/db/host~true}
    DB_PASSWORD: ${ssm:/db/password~true}
  iamRoleStatements:
    - Effect: Allow
      Action:
        - s3:GetObject
        - s3:PutObject
      Resource: "arn:aws:s3:::my-bucket/*"
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
      Resource: "arn:aws:dynamodb:*:*:table/Users"

functions:
  # HTTP API
  getUser:
    handler: handlers/getUser.handler
    timeout: 30
    memorySize: 256
    environment:
      TABLE_NAME: Users
    events:
      - http:
          path: users/{id}
          method: get
          cors: true
  
  # 异步处理
  processOrder:
    handler: handlers/processOrder.handler
    timeout: 300
    memorySize: 1024
    events:
      - sns:
          arn: arn:aws:sns:us-east-1:123456789:OrderPlaced
          topicName: OrderPlaced
  
  # 定时任务
  cleanupExpiredSessions:
    handler: handlers/cleanup.handler
    timeout: 60
    events:
      - schedule:
          rate: cron(0 */6 * * ? *)  # 每 6 小时

resources:
  Resources:
    UsersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: Users
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST

plugins:
  - serverless-plugin-tracing
  - serverless-offline

⚡ 性能优化

3. 冷启动优化

// 初始化在处理器外部(重用连接)
const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()

// 连接复用
let cachedConnection = null

async function getConnection() {
  if (cachedConnection) {
    return cachedConnection
  }
  
  cachedConnection = await createConnection()
  return cachedConnection
}

// ✅ 推荐:在处理器外部初始化
const connection = getConnection()

export const handler = async (event, context) => {
  // 使用缓存的连接
  const db = await connection
  
  const user = await db.get({
    TableName: 'Users',
    Key: { id: event.pathParameters.id }
  }).promise()
  
  return {
    statusCode: 200,
    body: JSON.stringify(user)
  }
}

// 冷启动监控
function recordColdStart(context) {
  const isColdStart = !process.env.LAMBDA_EXECUTION_CONTEXT
  
  console.log(`冷启动: ${isColdStart ? '是' : '否'}`)
  
  if (isColdStart) {
    console.log(`冷启动耗时: ${Date.now() - process.env.START_TIME}ms`)
  }
  
  process.env.LAMBDA_EXECUTION_CONTEXT = context.awsRequestId
}

4. 内存和超时优化

// 监控内存使用
function getMemoryUsage() {
  const usage = process.memoryUsage()
  return {
    heapUsed: Math.round(usage.heapUsed / 1024 / 1024),  // MB
    heapTotal: Math.round(usage.heapTotal / 1024 / 1024),
    external: Math.round(usage.external / 1024 / 1024),
    rss: Math.round(usage.rss / 1024 / 1024)
  }
}

export const handler = async (event, context) => {
  const startMem = getMemoryUsage()
  const startTime = Date.now()
  
  try {
    // 业务逻辑
    const result = await processData(event)
    
    const endMem = getMemoryUsage()
    const duration = Date.now() - startTime
    
    console.log({
      duration,
      memoryUsed: endMem.heapUsed - startMem.heapUsed,
      maxMemoryConfigured: context.memoryLimitInMB,
      remainingTime: context.getRemainingTimeInMillis()
    })
    
    return {
      statusCode: 200,
      body: JSON.stringify(result)
    }
  } catch (error) {
    console.error('错误:', error)
    
    // 如果即将超时,立即返回
    if (context.getRemainingTimeInMillis() < 5000) {
      return {
        statusCode: 503,
        body: JSON.stringify({ error: '函数执行超时' })
      }
    }
    
    throw error
  }
}

💰 成本优化

5. 成本计算和优化

// 成本计算
class ServerlessCostCalculator {
  constructor(options = {}) {
    // AWS Lambda 定价(美国东部)
    this.invokePrice = 0.0000002  // 每次调用
    this.gbSecondPrice = 0.0000166667  // 每 GB·秒
    this.freeInvocations = 1000000  // 每月免费调用数
    this.freeGbSeconds = 400000  // 每月免费 GB·秒
  }
  
  calculateCost(monthlyInvocations, avgMemoryMb, avgDurationMs) {
    // 计算总 GB·秒
    const durationSeconds = avgDurationMs / 1000
    const gbSeconds = (avgMemoryMb / 1024) * durationSeconds * monthlyInvocations
    
    // 计算超过免费额度的部分
    const billableInvocations = Math.max(0, monthlyInvocations - this.freeInvocations)
    const billableGbSeconds = Math.max(0, gbSeconds - this.freeGbSeconds)
    
    // 计算费用
    const invocationCost = billableInvocations * this.invokePrice
    const gbSecondCost = billableGbSeconds * this.gbSecondPrice
    
    return {
      monthlyInvocations,
      avgMemoryMb,
      avgDurationMs,
      totalGbSeconds: gbSeconds,
      invocationCost: invocationCost.toFixed(4),
      gbSecondCost: gbSecondCost.toFixed(4),
      totalCost: (invocationCost + gbSecondCost).toFixed(4),
      estimatedMonthlyUSD: (invocationCost + gbSecondCost).toFixed(2)
    }
  }
  
  // 优化建议
  getOptimizations(invokeRate, memoryConfig, duration) {
    const recommendations = []
    
    // 内存优化
    if (memoryConfig > 1024) {
      recommendations.push({
        type: 'memory',
        suggestion: '考虑减少内存配置',
        potential_savings: `$${(memoryConfig / 1024 * 0.001).toFixed(2)}/月`
      })
    }
    
    // 并发优化
    if (invokeRate < 100) {
      recommendations.push({
        type: 'concurrency',
        suggestion: '预留并发度设置较低,可以考虑提高以减少冷启动'
      })
    }
    
    // 执行时间优化
    if (duration > 30000) {
      recommendations.push({
        type: 'duration',
        suggestion: '考虑优化代码减少执行时间',
        potential_savings: `${Math.round((duration - 30000) / 1000)} 秒/调用`
      })
    }
    
    return recommendations
  }
}

// 使用成本计算器
const calculator = new ServerlessCostCalculator()

const cost = calculator.calculateCost(
  100000000,  // 每月 1 亿次调用
  512,        // 平均内存 512 MB
  1500        // 平均执行时间 1.5 秒
)

console.log('成本分析:', cost)
console.log('优化建议:', calculator.getOptimizations(100000000, 512, 1500))

6. 预留并发度管理

import AWS from 'aws-sdk'

const lambda = new AWS.Lambda()

// 设置预留并发度
async function setReservedConcurrency(functionName, concurrency) {
  try {
    const result = await lambda.putFunctionConcurrency({
      FunctionName: functionName,
      ReservedConcurrentExecutions: concurrency
    }).promise()
    
    console.log(`已为 ${functionName} 设置预留并发度: ${concurrency}`)
    return result
  } catch (error) {
    console.error('设置失败:', error)
  }
}

// 监控并发使用情况
async function getConcurrencyMetrics(functionName) {
  const cloudwatch = new AWS.CloudWatch()
  
  const metrics = await cloudwatch.getMetricStatistics({
    Namespace: 'AWS/Lambda',
    MetricName: 'ConcurrentExecutions',
    Dimensions: [
      {
        Name: 'FunctionName',
        Value: functionName
      }
    ],
    StartTime: new Date(Date.now() - 3600000),  // 过去 1 小时
    EndTime: new Date(),
    Period: 300,  // 5 分钟
    Statistics: ['Maximum', 'Average']
  }).promise()
  
  return metrics.Datapoints
}

// 自动扩展预留并发度
async function autoScaleReservedConcurrency(functionName, targetUtilization = 0.7) {
  try {
    const metrics = await getConcurrencyMetrics(functionName)
    
    if (metrics.length === 0) {
      console.log('暂无指标数据')
      return
    }
    
    const maxConcurrency = Math.max(...metrics.map(m => m.Maximum))
    const recommendedConcurrency = Math.ceil(maxConcurrency / targetUtilization)
    
    console.log(`当前最高并发: ${maxConcurrency}`)
    console.log(`推荐预留并发度: ${recommendedConcurrency}`)
    
    await setReservedConcurrency(functionName, recommendedConcurrency)
  } catch (error) {
    console.error('自动扩展失败:', error)
  }
}

🏗️ 事件驱动架构

7. 多事件源处理

// S3 事件
export const s3Handler = async (event) => {
  for (const record of event.Records) {
    const bucket = record.s3.bucket.name
    const key = decodeURIComponent(record.s3.object.key)
    
    console.log(`处理 s3://${bucket}/${key}`)
    
    // 处理文件
    await processFile(bucket, key)
  }
}

// DynamoDB Streams 事件
export const dynamodbHandler = async (event) => {
  for (const record of event.Records) {
    const eventName = record.eventName
    const keys = record.dynamodb.Keys
    const newImage = record.dynamodb.NewImage
    const oldImage = record.dynamodb.OldImage
    
    if (eventName === 'INSERT') {
      console.log('新记录插入:', newImage)
    } else if (eventName === 'MODIFY') {
      console.log('记录修改:', newImage)
    } else if (eventName === 'REMOVE') {
      console.log('记录删除:', oldImage)
    }
  }
}

// SQS 事件
export const sqsHandler = async (event) => {
  const failedMessageIds = []
  
  for (const record of event.Records) {
    try {
      const message = JSON.parse(record.body)
      await processMessage(message)
    } catch (error) {
      console.error('处理失败:', error)
      failedMessageIds.push(record.messageId)
    }
  }
  
  // 返回失败的消息 ID,SQS 会重新处理
  return {
    batchItemFailures: failedMessageIds.map(id => ({
      itemId: id
    }))
  }
}

// API Gateway 事件
export const apiHandler = async (event) => {
  const { path, httpMethod, body, queryStringParameters } = event
  
  if (path === '/users' && httpMethod === 'GET') {
    const users = await getUsers(queryStringParameters)
    return {
      statusCode: 200,
      body: JSON.stringify(users)
    }
  } else if (path === '/users' && httpMethod === 'POST') {
    const user = JSON.parse(body)
    const created = await createUser(user)
    return {
      statusCode: 201,
      body: JSON.stringify(created)
    }
  }
  
  return {
    statusCode: 404,
    body: JSON.stringify({ error: 'Not Found' })
  }
}

🔐 安全和最佳实践

8. 环境变量和密钥管理

import AWS from 'aws-sdk'

const secretsManager = new AWS.SecretsManager()

// 获取密钥(推荐用于敏感信息)
let cachedSecret = null

async function getSecret(secretName) {
  if (cachedSecret) {
    return cachedSecret
  }
  
  try {
    const data = await secretsManager.getSecretValue({
      SecretId: secretName
    }).promise()
    
    cachedSecret = JSON.parse(data.SecretString)
    return cachedSecret
  } catch (error) {
    console.error('获取密钥失败:', error)
    throw error
  }
}

// 使用密钥
export const handler = async (event) => {
  const dbPassword = await getSecret('prod/db/password')
  
  // 使用密钥
  await connectToDatabase(dbPassword)
}

// 环境变量(非敏感信息)
const env = {
  stage: process.env.STAGE,
  table_name: process.env.TABLE_NAME,
  log_level: process.env.LOG_LEVEL
}

9. 日志和监控

// 结构化日志
class Logger {
  static log(level, message, context = {}) {
    console.log(JSON.stringify({
      timestamp: new Date().toISOString(),
      level,
      message,
      awsRequestId: context.awsRequestId,
      functionName: context.functionName,
      ...context
    }))
  }
  
  static info(message, context = {}) {
    this.log('INFO', message, context)
  }
  
  static error(message, error, context = {}) {
    this.log('ERROR', message, {
      ...context,
      error: error.message,
      stack: error.stack
    })
  }
}

export const handler = async (event, context) => {
  Logger.info('收到请求', {
    awsRequestId: context.awsRequestId,
    functionName: context.functionName,
    eventPath: event.path
  })
  
  try {
    const result = await process(event)
    Logger.info('处理成功', { result })
    return result
  } catch (error) {
    Logger.error('处理失败', error, {
      awsRequestId: context.awsRequestId
    })
    throw error
  }
}

🎓 最佳实践总结

DO ✅

// 1. 使用环境变量配置
process.env.STAGE
process.env.TABLE_NAME

// 2. 实现幂等性
const idempotencyKey = event.requestId
await cache.setIfNotExists(idempotencyKey, result)

// 3. 快速返回
export const handler = async (event) => {
  // 异步任务发送到队列
  await sqs.sendMessage(event)
  return { statusCode: 202 }
}

// 4. 明确的错误处理
try {
  // 业务逻辑
} catch (error) {
  if (error.code === 'ResourceNotFound') {
    return { statusCode: 404 }
  }
  throw error
}

DON'T ❌

// 1. 不要在处理器内部创建连接
// ❌
export const handler = async () => {
  const conn = await createConnection()
}

// ✅ 在处理器外部创建
const conn = await createConnection()
export const handler = async () => {
  // 使用 conn
}

// 2. 不要忽视超时
// ❌ 没有检查
await longRunningTask()

// ✅ 检查剩余时间
if (context.getRemainingTimeInMillis() < 10000) {
  throw new Error('接近超时')
}

// 3. 不要返回大对象
// ❌ 可能超过 6MB 限制
return { data: hugeArray }

// ✅ 分页返回
return { data: hugeArray.slice(0, 100) }

📚 总结

Serverless 核心要点:

  • 简单部署: 无需管理基础设施
  • 自动扩展: 按需处理并发
  • 按量计费: 只为实际使用付费
  • 冷启动: 优化初始化和依赖
  • 事件驱动: 与 AWS 生态集成
  • 成本优化: 监控和预留并发度

Serverless 是云原生应用的未来!