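Event-Driven Architecture

Your e-commerce system has just been through a flash sale. It was designed with elaborate rate-limiting and queuing logic, yet the database connection pool was exhausted and the Redis cluster was overloaded. In the postmortem you discover that the real peak was only 1,000 QPS, far below the designed limit.

What went wrong?

"In a system built on synchronous calls, one slow component makes everything slow." The core idea of event-driven architecture is to separate "real-time response" from "asynchronous processing" so that the system can absorb traffic peaks gracefully.

The Essence of Event-Driven Architecture

Event-Driven Architecture (EDA) is a software design paradigm in which components communicate by producing and consuming events rather than by calling each other's functions directly.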
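
To make the contrast with direct function calls concrete, here is a minimal in-process sketch. `EventBus`, `subscribe`, and `publish` are illustrative names rather than any library's API, and a real broker would deliver events asynchronously and across process boundaries.

```typescript
// Minimal in-process event bus; all names are illustrative, not a real library API.
type Handler<T> = (payload: T) => void;

class EventBus {
  private handlers = new Map<string, Handler<any>[]>();

  // Register a consumer for one event type
  subscribe<T>(type: string, handler: Handler<T>): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Deliver the event to every consumer registered for its type
  publish<T>(type: string, payload: T): number {
    const list = this.handlers.get(type) ?? [];
    for (const handler of list) handler(payload);
    return list.length;
  }
}

const bus = new EventBus();
const log: string[] = [];

// Consumers are added without touching the producer
bus.subscribe<{ orderId: string }>('order.created', (e) => log.push(`email:${e.orderId}`));
bus.subscribe<{ orderId: string }>('order.created', (e) => log.push(`inventory:${e.orderId}`));

// The producer only knows the event name, not who consumes it
const delivered = bus.publish('order.created', { orderId: 'o-1' });
console.log(delivered, log);
```

Adding a third consumer means one more `subscribe` call; the publishing code does not change, which is the decoupling the definition above describes.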

flowchart LR
    subgraph Producer["Event Producers"]
        API[API Gateway]
        DB[(Database)]
        IoT[IoT Devices]
    end

    subgraph Broker["Event Bus"]
        SNS[SNS]
        SQS[SQS]
        EventBridge[EventBridge]
        Kafka[Kafka/Kinesis]
    end

    subgraph Consumer["Event Consumers"]
        Lambda[Lambda]
        ECS[ECS/Fargate]
        K8s[K8s Pods]
    end

    API & DB & IoT --> |Produce events| Broker
    Broker --> |Route events| Lambda & ECS & K8s

Core Patterns

Pattern 1: Publish-Subscribe (Pub/Sub)

sequenceDiagram
    participant P as Producer
    participant Topic as Topic
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2
    participant S3 as Subscriber 3

    P->>Topic: Publish event
    Note over Topic: order.created
    Topic->>S1: Async push
    Topic->>S2: Async push
    Topic->>S3: Async push
    Note over S1: Send email notification
    Note over S2: Update inventory
    Note over S3: Update recommendation system
pub-sub/producer.ts
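import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';

// Assumed order shape; the original snippet does not define Order
interface Order {
  id: string;
  customerId: string;
  customerEmail: string;
  totalAmount: number;
  items: unknown[];
}

const eb = new EventBridgeClient({});

export const publishOrderCreated = async (order: Order) => {
  const result = await eb.send(new PutEventsCommand({
    Entries: [{
      EventBusName: 'default',
      Source: 'com.ecommerce.orders',
      DetailType: 'order.created',
      Time: new Date(),
      Detail: JSON.stringify({
        orderId: order.id,
        customerId: order.customerId,
        // The email consumer reads this field, so the event has to carry it
        customerEmail: order.customerEmail,
        totalAmount: order.totalAmount,
        items: order.items,
      }),
    }],
  }));

  // PutEvents can fail for individual entries without throwing, so check the count
  if (result.FailedEntryCount) {
    throw new Error(`Failed to publish order.created for order ${order.id}`);
  }

  return result;
};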
pub-sub/consumer-email.ts
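import type { EventBridgeEvent } from 'aws-lambda';

// Shape of the detail this handler expects; the producer must include customerEmail
interface OrderCreatedDetail {
  orderId: string;
  customerEmail: string;
  totalAmount: number;
}

// sendEmail is an application-provided helper (assumed to exist)
export const handler = async (
  event: EventBridgeEvent<'order.created', OrderCreatedDetail>,
) => {
  // EventBridge delivers `detail` to Lambda as an already-parsed object,
  // so calling JSON.parse on it would fail
  const detail = event.detail;

  console.log(`Processing order.created for order: ${detail.orderId}`);

  // Send the confirmation email
  await sendEmail({
    to: detail.customerEmail,
    subject: `Order Confirmed: #${detail.orderId}`,
    body: `Your order totaling $${detail.totalAmount} has been confirmed.`,
  });

  return { statusCode: 200 };
};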
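
On EventBridge, each subscriber receives events through a rule whose event pattern filters on fields such as `source` and `detail-type`. The sketch below models that routing locally under a strong simplification: only exact-value matching on two top-level fields is supported, whereas the real pattern language is considerably richer. `Rule`, `matches`, `route`, and the target names are hypothetical.

```typescript
// Simplified local model of rule-based routing. Only exact-value matching on
// top-level fields is modeled; EventBridge's real pattern language is richer.
interface BusEvent {
  source: string;
  'detail-type': string;
  detail: Record<string, unknown>;
}

// A pattern lists the accepted values for each field it cares about
type Pattern = Partial<Record<'source' | 'detail-type', string[]>>;

interface Rule {
  name: string;
  pattern: Pattern;
  target: string; // hypothetical consumer name
}

// An event matches when every field named in the pattern has an accepted value
function matches(pattern: Pattern, event: BusEvent): boolean {
  return (Object.keys(pattern) as Array<keyof Pattern>).every((field) =>
    (pattern[field] ?? []).includes(event[field]),
  );
}

// Return the targets of every rule whose pattern matches the event
function route(rules: Rule[], event: BusEvent): string[] {
  return rules.filter((r) => matches(r.pattern, event)).map((r) => r.target);
}

const rules: Rule[] = [
  { name: 'email', pattern: { source: ['com.ecommerce.orders'], 'detail-type': ['order.created'] }, target: 'consumer-email' },
  { name: 'inventory', pattern: { 'detail-type': ['order.created'] }, target: 'consumer-inventory' },
  { name: 'refunds', pattern: { 'detail-type': ['order.refunded'] }, target: 'consumer-refunds' },
];

const targets = route(rules, {
  source: 'com.ecommerce.orders',
  'detail-type': 'order.created',
  detail: { orderId: 'o-1' },
});
console.log(targets);
```

One published event reaches two of the three targets here, and the producer never learns which ones.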

Pattern 2: Message Queue

flowchart TB
    subgraph In["Inbound"]
        API[API request]
    end

    subgraph Queue["Message Queue"]
        Q[SQS FIFO]
    end

    subgraph Workers["Parallel Processing"]
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker 3]
    end

    API --> |Send message| Q
    Q --> |Poll| W1 & W2 & W3

    style Q fill:#74c0fc
    style W1 fill:#63e6be
    style W2 fill:#63e6be
    style W3 fill:#63e6be
queue/producer.ts
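import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

const sqs = new SQSClient({});

export const enqueueImageProcessing = async (imageId: string, userId: string) => {
  await sqs.send(new SendMessageCommand({
    QueueUrl: process.env.QUEUE_URL!,
    MessageBody: JSON.stringify({
      imageId,
      userId,
      operation: 'resize',
      timestamp: Date.now(),
    }),
    // FIFO queues require a MessageGroupId. Messages within one group are
    // processed strictly in order, so a single shared group would serialize all
    // work. Grouping by user keeps per-user ordering and still allows parallelism.
    MessageGroupId: userId,
    // Deduplication: a stable ID lets SQS drop resends of the same request within
    // its 5-minute deduplication window. Including Date.now() would defeat this.
    MessageDeduplicationId: `img-${imageId}-resize`,
  }));
};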
queue/consumer.ts
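import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import type { SQSEvent } from 'aws-lambda';

const sqs = new SQSClient({});

// processImage is the application's own image-processing routine (assumed to exist)
export const handler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    const message = JSON.parse(record.body);

    try {
      await processImage(message);
      // Delete the message as soon as it succeeds, so that a later failure in
      // the same batch does not cause this message to be processed again
      await sqs.send(new DeleteMessageCommand({
        QueueUrl: process.env.QUEUE_URL!,
        ReceiptHandle: record.receiptHandle,
      }));
    } catch (error) {
      // On failure the message is not deleted; it becomes visible again
      // once the visibility timeout expires
      console.error(`Failed to process image ${message.imageId}:`, error);
      throw error;
    }
  }
};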
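
The retry behaviour in the consumer depends on the queue's visibility timeout: a received message is hidden rather than removed, and it reappears unless the consumer deletes it in time. The sketch below is a simplified in-memory model of that mechanism, not the SQS API. `VisibilityQueue` and its methods are illustrative names, and time is a plain number supplied by the caller so the example stays deterministic.

```typescript
// In-memory model of receive/delete with a visibility timeout.
interface StoredMessage {
  id: number;
  body: string;
  visibleAt: number;    // hidden from receivers until this time
  receiveCount: number;
}

class VisibilityQueue {
  private messages: StoredMessage[] = [];
  private nextId = 1;
  private visibilityTimeout: number;

  constructor(visibilityTimeout: number) {
    this.visibilityTimeout = visibilityTimeout;
  }

  send(body: string): void {
    this.messages.push({ id: this.nextId++, body, visibleAt: 0, receiveCount: 0 });
  }

  // Hand out the first visible message and hide it for the timeout period
  receive(now: number): StoredMessage | undefined {
    const msg = this.messages.find((m) => m.visibleAt <= now);
    if (!msg) return undefined;
    msg.visibleAt = now + this.visibilityTimeout;
    msg.receiveCount += 1;
    return msg;
  }

  // A consumer deletes the message only after processing succeeded
  delete(id: number): void {
    this.messages = this.messages.filter((m) => m.id !== id);
  }
}

const queue = new VisibilityQueue(30);
queue.send('resize image-1');

const first = queue.receive(0);         // worker A takes the message, then crashes
const hidden = queue.receive(10);       // still in flight: nothing to receive
const retry = queue.receive(31);        // timeout elapsed: the message is visible again
if (retry) queue.delete(retry.id);      // worker B succeeds and deletes it
const afterDelete = queue.receive(100); // nothing left
console.log(first?.body, hidden, retry?.receiveCount, afterDelete);
```

The timeout therefore needs to be longer than the slowest expected processing time; otherwise a message that is still being worked on is handed to a second worker.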

Pattern 3: Event Sourcing

flowchart LR
    subgraph Commands["Commands"]
        CMD1[Create order]
        CMD2[Add item]
        CMD3[Pay]
        CMD4[Cancel order]
    end

    subgraph Events["Event Store"]
        Store[(Event log)]
    end

    subgraph Projections["Materialized Views"]
        V1[Order status]
        V2[Inventory]
        V3[Loyalty points]
    end

    CMD1 & CMD2 & CMD3 & CMD4 --> |After validation| Store
    Store --> |Replay| V1 & V2 & V3
event-sourcing/aggregate.ts
// Assumed minimal item shape; the original snippet does not define OrderItem
interface OrderItem {
  price: number;
}

interface Event {
  type: string;
  payload: any;
  timestamp: number;
  aggregateId: string;
}

class OrderAggregate {
  // Events recorded by commands on this instance and not yet persisted
  private events: Event[] = [];
  private state = {
    orderId: '',
    status: 'pending',
    items: [] as OrderItem[],
    totalAmount: 0,
  };

  createOrder(orderId: string, items: OrderItem[]) {
    this.record('ORDER_CREATED', orderId, { orderId, items });
  }

  addItem(item: OrderItem) {
    this.record('ITEM_ADDED', this.state.orderId, { item });
  }

  pay(paymentId: string) {
    if (this.state.status !== 'pending') {
      throw new Error('Order cannot be paid');
    }
    this.record('ORDER_PAID', this.state.orderId, { paymentId });
  }

  getEvents() {
    return [...this.events];
  }

  // Rebuild state by replaying events through the same apply() used by commands
  static fromEvents(events: Event[]): OrderAggregate {
    const aggregate = new OrderAggregate();
    for (const event of events) {
      aggregate.apply(event);
    }
    return aggregate;
  }

  // Commands never change state directly: they record an event and apply it
  private record(type: string, aggregateId: string, payload: any) {
    const event: Event = { type, payload, timestamp: Date.now(), aggregateId };
    this.apply(event);
    this.events.push(event);
  }

  // The single place where events change state
  private apply(event: Event) {
    switch (event.type) {
      case 'ORDER_CREATED':
        this.state.orderId = event.aggregateId;
        // Copy the array so later additions do not mutate the stored event payload
        this.state.items = [...event.payload.items];
        this.state.totalAmount = this.state.items.reduce((sum, item) => sum + item.price, 0);
        this.state.status = 'pending';
        break;
      case 'ITEM_ADDED':
        this.state.items.push(event.payload.item);
        this.state.totalAmount += event.payload.item.price;
        break;
      case 'ORDER_PAID':
        this.state.status = 'paid';
        break;
    }
  }
}
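
The diagram also shows materialized views being built by replaying the event log. A projection can be sketched as a fold over the events. The event types below mirror the aggregate's, while `OrderView` and `projectOrders` are hypothetical names introduced for illustration.

```typescript
// Event shapes mirror the aggregate's events; the projection is an illustrative sketch.
interface OrderEvent {
  type: 'ORDER_CREATED' | 'ITEM_ADDED' | 'ORDER_PAID';
  aggregateId: string;
  payload: any;
}

interface OrderView {
  status: 'pending' | 'paid';
  itemCount: number;
  totalAmount: number;
}

// Fold the event log into a read model keyed by order id
function projectOrders(events: OrderEvent[]): Map<string, OrderView> {
  const views = new Map<string, OrderView>();
  for (const event of events) {
    const view: OrderView =
      views.get(event.aggregateId) ?? { status: 'pending', itemCount: 0, totalAmount: 0 };
    switch (event.type) {
      case 'ORDER_CREATED': {
        const items = event.payload.items as Array<{ price: number }>;
        view.itemCount = items.length;
        view.totalAmount = items.reduce((sum, item) => sum + item.price, 0);
        break;
      }
      case 'ITEM_ADDED':
        view.itemCount += 1;
        view.totalAmount += event.payload.item.price;
        break;
      case 'ORDER_PAID':
        view.status = 'paid';
        break;
    }
    views.set(event.aggregateId, view);
  }
  return views;
}

const eventLog: OrderEvent[] = [
  { type: 'ORDER_CREATED', aggregateId: 'o-1', payload: { orderId: 'o-1', items: [{ price: 20 }, { price: 5 }] } },
  { type: 'ITEM_ADDED', aggregateId: 'o-1', payload: { item: { price: 10 } } },
  { type: 'ORDER_PAID', aggregateId: 'o-1', payload: { paymentId: 'p-1' } },
  { type: 'ORDER_CREATED', aggregateId: 'o-2', payload: { orderId: 'o-2', items: [] } },
];

const views = projectOrders(eventLog);
console.log(views.get('o-1'), views.get('o-2'));
```

Because the view is derived entirely from the log, it can be discarded and rebuilt at any time, or a new view can be added later and populated from historical events.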

Pattern 4: Saga

sequenceDiagram
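    participant Orch as Saga Orchestrator
    participant S1 as Inventory Service
    participant S2 as Payment Service
    participant S3 as Shipping Service

    Orch->>S1: Reserve inventory
    S1-->>Orch: Inventory reserved
    Orch->>S2: Charge payment
    S2-->>Orch: Payment succeeded
    Orch->>S3: Create shipment
    S3-->>Orch: Shipment created
    Note over Orch: Order complete

    Note over Orch,S2: If payment fails
    Orch->>S1: Release inventory
    S1-->>Orch: Inventory released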
saga/orchestrator.ts
// State shared by every step, so a compensation can use results
// (such as paymentId) produced by an earlier execute call
interface SagaContext {
  orderId: string;
  customerId: string;
  items: unknown[];
  totalAmount: number;
  address: string;
  paymentId?: string;
  trackingId?: string;
}

interface SagaStep {
  name: string;
  execute: (ctx: SagaContext) => Promise<void>;
  compensate: (ctx: SagaContext) => Promise<void>;
}

class OrderSaga {
  private steps: SagaStep[] = [];

  addStep(step: SagaStep): this {
    this.steps.push(step);
    return this;
  }

  async execute(ctx: SagaContext): Promise<SagaContext> {
    const executed: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        await step.execute(ctx);
        executed.push(step);
      }
      return ctx;
    } catch (error) {
      // Roll back the steps that already ran, most recent first
      console.error('Saga failed, compensating...');
      for (const step of executed.reverse()) {
        try {
          await step.compensate(ctx);
        } catch (compensateError) {
          console.error(`Compensate failed for ${step.name}:`, compensateError);
        }
      }
      throw error;
    }
  }
}

// Define the saga. inventoryService, paymentService, and shippingService are
// the application's own service clients (assumed to exist).
const orderSaga = new OrderSaga()
  .addStep({
    name: 'reserve-inventory',
    execute: async (ctx) => {
      await inventoryService.reserve(ctx.orderId, ctx.items);
    },
    compensate: async (ctx) => {
      await inventoryService.release(ctx.orderId);
    },
  })
  .addStep({
    name: 'process-payment',
    execute: async (ctx) => {
      const result = await paymentService.charge(ctx.customerId, ctx.totalAmount);
      ctx.paymentId = result.id;
    },
    compensate: async (ctx) => {
      if (ctx.paymentId) await paymentService.refund(ctx.paymentId);
    },
  })
  .addStep({
    name: 'create-shipment',
    execute: async (ctx) => {
      const result = await shippingService.create(ctx.orderId, ctx.address);
      ctx.trackingId = result.id;
    },
    compensate: async (ctx) => {
      if (ctx.trackingId) await shippingService.cancel(ctx.trackingId);
    },
  });
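
To see the compensation order in isolation, the following sketch runs a three-step saga whose last step fails. It is deliberately synchronous and uses stub steps so the control flow is easy to trace; real steps would be asynchronous service calls. `runSaga`, `step`, and `trace` are illustrative names.

```typescript
interface Step {
  name: string;
  execute: () => void;
  compensate: () => void;
}

// Run steps in order; on failure, undo the completed ones in reverse order
function runSaga(steps: Step[]): { ok: boolean; failedStep?: string } {
  const done: Step[] = [];
  for (const current of steps) {
    try {
      current.execute();
      done.push(current);
    } catch {
      for (const prev of [...done].reverse()) prev.compensate();
      return { ok: false, failedStep: current.name };
    }
  }
  return { ok: true };
}

const trace: string[] = [];

// Build a stub step that records what happened instead of calling a service
const step = (name: string, fail = false): Step => ({
  name,
  execute: () => {
    if (fail) throw new Error(`${name} failed`);
    trace.push(`do:${name}`);
  },
  compensate: () => {
    trace.push(`undo:${name}`);
  },
});

const outcome = runSaga([
  step('reserve-inventory'),
  step('process-payment'),
  step('create-shipment', true), // simulate a shipping failure
]);
console.log(outcome, trace);
```

The payment is refunded before the inventory is released, the reverse of the order in which they were executed. The failed step itself is not compensated because it never completed.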

Serverless Event Sources

Event Sources Supported by AWS Lambda

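| Event source | Trigger | Common use cases |
| --- | --- | --- |
| API Gateway | HTTP request | REST/GraphQL APIs |
| S3 | Object created/modified | File processing, CDN triggers |
| DynamoDB Streams | Data changes | CDC, real-time processing |
| Kinesis | Streaming data | Log analytics, real-time BI |
| SQS | Queue messages | Asynchronous task processing |
| SNS | Topic publication | Broadcast notifications |
| EventBridge | Rule match | Cross-service orchestration |
| CloudWatch Events | Scheduled/system events | Scheduled jobs |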

EventBridge Schema Registry

eventbridge/schema.ts
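import { SchemasClient, DescribeSchemaCommand } from '@aws-sdk/client-schemas';
import Ajv from 'ajv-draft-04';

const schemas = new SchemasClient({});
const ajv = new Ajv();

// Validate an event's format. The Schema Registry stores and versions schemas
// but does not validate events itself, so this sketch fetches the schema and
// checks the event with Ajv. Assumes the schema is registered as JSONSchemaDraft4.
export const validateEvent = async (event: unknown) => {
  const { Content } = await schemas.send(new DescribeSchemaCommand({
    RegistryName: 'com.ecommerce',
    SchemaName: 'orders',
  }));
  if (!Content) {
    throw new Error('Schema com.ecommerce/orders has no content');
  }

  const validate = ajv.compile(JSON.parse(Content));
  return validate(event);
};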

Error Handling Strategies

Retries and Dead-Letter Queues

dlq-config.yaml
Resources:
  MyQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: my-processor-queue
      VisibilityTimeout: 30
      MessageRetentionPeriod: 1209600  # 14 days
      RedrivePolicy:
        # After 3 failed receives, the message is moved to the dead-letter queue
        maxReceiveCount: 3
        deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn

  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: my-processor-dlq
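
The effect of `maxReceiveCount: 3` can be traced with a small in-memory model: a message that keeps failing is received three times and is then moved aside instead of being delivered a fourth time. This is a simplified sketch rather than the SQS API. It ignores the visibility timeout, and `QueueWithDlq` and `drain` are hypothetical names.

```typescript
interface Msg {
  body: string;
  receiveCount: number;
}

class QueueWithDlq {
  private main: Msg[] = [];
  readonly deadLetters: Msg[] = [];
  private maxReceiveCount: number;

  constructor(maxReceiveCount: number) {
    this.maxReceiveCount = maxReceiveCount;
  }

  send(body: string): void {
    this.main.push({ body, receiveCount: 0 });
  }

  // Receive the next message; one that has already been received
  // maxReceiveCount times is moved to the dead-letter queue instead
  receive(): Msg | undefined {
    while (this.main.length > 0) {
      const msg = this.main[0];
      if (msg.receiveCount >= this.maxReceiveCount) {
        this.deadLetters.push(this.main.shift()!);
        continue;
      }
      msg.receiveCount += 1;
      return msg;
    }
    return undefined;
  }

  delete(msg: Msg): void {
    this.main = this.main.filter((m) => m !== msg);
  }
}

// Keep receiving until the queue is empty; returns the number of handler attempts
function drain(queue: QueueWithDlq, handler: (body: string) => void): number {
  let attempts = 0;
  for (let msg = queue.receive(); msg; msg = queue.receive()) {
    attempts += 1;
    try {
      handler(msg.body);
      queue.delete(msg); // success: remove the message
    } catch {
      // failure: leave the message in the queue so it is received again
    }
  }
  return attempts;
}

const q = new QueueWithDlq(3);
q.send('bad-json'); // a "poison" message that can never be processed
q.send('ok');
const attempts = drain(q, (body) => {
  if (body === 'bad-json') throw new Error('cannot parse');
});
console.log(attempts, q.deadLetters.map((m) => m.body));
```

Without the dead-letter queue, the poison message would be retried until its retention period ran out, wasting capacity and hiding the failure.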

Idempotent Processing

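import type { SQSEvent } from 'aws-lambda';
import Redis from 'ioredis';

// Assumes an ioredis client and a REDIS_URL environment variable;
// processOrder is the application's own handler (assumed to exist)
const redis = new Redis(process.env.REDIS_URL!);

export const handler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    const { orderId, idempotencyKey } = JSON.parse(record.body);
    const key = `processed:${idempotencyKey}`;

    // Claim the key atomically (SET ... NX) with a 24-hour expiry. A separate
    // exists-then-set check would let two concurrent deliveries both pass.
    const claimed = await redis.set(key, '1', 'EX', 86400, 'NX');

    if (claimed !== 'OK') {
      console.log(`Skipping duplicate: ${idempotencyKey}`);
      continue;
    }

    try {
      await processOrder(orderId);
    } catch (error) {
      // Release the claim so that the retry is not skipped as a duplicate
      await redis.del(key);
      throw error;
    }
  }
};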
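
The idempotency check can be exercised without Redis or SQS. In the sketch below an in-memory `Set` stands in for the shared store, and `makeIdempotent` is a hypothetical helper that wraps any handler so each idempotency key is processed at most once.

```typescript
interface OrderMessage {
  orderId: string;
  idempotencyKey: string;
}

// Wrap a handler so that each idempotency key is processed at most once.
// The Set stands in for a shared store such as Redis or DynamoDB.
function makeIdempotent(work: (msg: OrderMessage) => void) {
  const seen = new Set<string>();
  return (msg: OrderMessage): 'processed' | 'skipped' => {
    if (seen.has(msg.idempotencyKey)) return 'skipped';
    seen.add(msg.idempotencyKey); // claim the key before doing the work
    try {
      work(msg);
      return 'processed';
    } catch (error) {
      seen.delete(msg.idempotencyKey); // release the claim so a retry can run
      throw error;
    }
  };
}

const charged: string[] = [];
const handle = makeIdempotent((msg) => {
  charged.push(msg.orderId);
});

const results = [
  handle({ orderId: 'o-1', idempotencyKey: 'k-1' }),
  handle({ orderId: 'o-1', idempotencyKey: 'k-1' }), // redelivery of the same message
  handle({ orderId: 'o-2', idempotencyKey: 'k-2' }),
];
console.log(results, charged);
```

The redelivered message is acknowledged but causes no second side effect, which is what makes at-least-once delivery safe to build on.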

Monitoring and Observability

Distributed Tracing

lib/tracing.ts
import type { Context } from 'aws-lambda';

// processEvent is the application's own business logic (assumed to exist)
export const handler = async (event: any, context: Context) => {
  // Reuse the upstream trace ID if there is one; otherwise start from this request's ID
  const traceId = event.headers?.['x-trace-id'] || context.awsRequestId;

  // Record the start time so the duration can be logged
  const startTime = Date.now();

  try {
    const result = await processEvent(event, traceId);

    // Log success
    console.log(JSON.stringify({
      event: 'processing_complete',
      traceId,
      duration: Date.now() - startTime,
      status: 'success',
    }));

    return result;
  } catch (error) {
    // Log failure; a caught value is not guaranteed to be an Error
    console.error(JSON.stringify({
      event: 'processing_failed',
      traceId,
      duration: Date.now() - startTime,
      status: 'error',
      error: error instanceof Error ? error.message : String(error),
    }));
    throw error;
  }
};
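
A trace ID is only useful if every event derived from the first one carries it forward. One way to do that is to put it in the event envelope. The sketch below is an assumption about how such an envelope might look: `Envelope`, `newEvent`, `follow`, and the `causationId` field are illustrative and do not come from the handler above.

```typescript
interface Envelope<T> {
  id: string;
  type: string;
  traceId: string;       // shared by every event in one business flow
  causationId?: string;  // id of the event that directly caused this one
  payload: T;
}

let counter = 0;
const nextId = (): string => `evt-${++counter}`; // deterministic ids for the example

// Start a new flow with a given trace id
function newEvent<T>(type: string, payload: T, traceId: string): Envelope<T> {
  return { id: nextId(), type, traceId, payload };
}

// Create a follow-up event that inherits the parent's trace id
function follow<T>(parent: Envelope<unknown>, type: string, payload: T): Envelope<T> {
  return { id: nextId(), type, traceId: parent.traceId, causationId: parent.id, payload };
}

const created = newEvent('order.created', { orderId: 'o-1' }, 'trace-abc');
const reserved = follow(created, 'inventory.reserved', { orderId: 'o-1' });
const paid = follow(reserved, 'payment.charged', { amount: 35 });

console.log([created, reserved, paid].map((e) => `${e.type} trace=${e.traceId} cause=${e.causationId ?? '-'}`));
```

Filtering logs by `traceId` then returns the whole flow, and the `causationId` chain shows which event triggered which.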

CloudWatch Dashboard

{
  "widgets": [
    {
      "type": "metric",
      "properties": {
        "title": "Event Processing",
        "metrics": [
          ["Serverless/Events", "EventsProcessed", "Service", "order-service"],
          [".", "EventsFailed", ".", "."],
          [".", "ProcessingLatency", ".", "."]
        ]
      }
    },
    {
      "type": "log",
      "properties": {
        "title": "Recent Errors",
        "query": "fields @timestamp, @message | filter status = 'error' | sort @timestamp desc | limit 20"
      }
    }
  ]
}
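
The dashboard reads custom metrics from the `Serverless/Events` namespace, so something has to publish them. The source does not say how. One option, assumed here, is the CloudWatch Embedded Metric Format (EMF), in which a function writes a structured JSON log line and CloudWatch extracts the metrics from it. `emfLine` is a hypothetical helper, and the timestamp is passed in to keep the example deterministic.

```typescript
type MetricUnit = 'Count' | 'Milliseconds';

// Build one EMF log line. Namespace, dimension, and metric names follow the
// dashboard definition; using EMF at all is an assumption of this sketch.
function emfLine(
  service: string,
  metrics: Record<string, { value: number; unit: MetricUnit }>,
  timestamp: number,
): string {
  const names = Object.keys(metrics);
  const doc: Record<string, unknown> = {
    _aws: {
      Timestamp: timestamp, // milliseconds since the epoch
      CloudWatchMetrics: [{
        Namespace: 'Serverless/Events',
        Dimensions: [['Service']],
        Metrics: names.map((name) => ({ Name: name, Unit: metrics[name].unit })),
      }],
    },
    Service: service, // value of the Service dimension
  };
  // Metric values live at the top level, keyed by metric name
  for (const name of names) doc[name] = metrics[name].value;
  return JSON.stringify(doc);
}

const line = emfLine('order-service', {
  EventsProcessed: { value: 1, unit: 'Count' },
  ProcessingLatency: { value: 42, unit: 'Milliseconds' },
}, 1700000000000);

// Inside Lambda, printing this line to stdout is how EMF metrics are emitted
console.log(line);
```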

Trade-off Matrix

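| Pattern | Strengths | Weaknesses | Best suited for |
| --- | --- | --- | --- |
| Pub/Sub | Decoupling, broadcast | Risk of message loss | Notifications, analytics |
| Queue | Reliable processing, backpressure | Added latency | Asynchronous tasks |
| Event Sourcing | Complete history, auditability | High complexity | Finance, orders |
| Saga | Distributed transactions | Complex compensation logic | Cross-service workflows |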

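Further Thoughts

Event-driven architecture is not a silver bullet. It solves many problems, but it also introduces new complexity:

  1. Eventual consistency: message delays mean the system's state may be inconsistent at any given moment
  2. Harder debugging: the call chain of an asynchronous flow is not obvious
  3. Message ordering: if ordering must be guaranteed, the choice of messaging systems narrows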
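
On the third point: when the broker cannot guarantee order, one common workaround is for the producer to attach a sequence number and for the consumer to apply events strictly in sequence. The sketch below assumes sequence numbers start at 1 and have no permanent gaps. `OrderedConsumer` is a hypothetical name, and the approach is one option rather than a recommendation from the source.

```typescript
interface SequencedEvent {
  seq: number;
  type: string;
}

// Applies events strictly in sequence order, buffering any that arrive early
class OrderedConsumer {
  private nextSeq = 1;
  private pending = new Map<number, SequencedEvent>();
  readonly applied: string[] = [];

  receive(event: SequencedEvent): void {
    if (event.seq < this.nextSeq) return; // duplicate of an already-applied event
    this.pending.set(event.seq, event);

    // Drain every buffered event that is now contiguous with what was applied
    let next = this.pending.get(this.nextSeq);
    while (next) {
      this.applied.push(next.type);
      this.pending.delete(this.nextSeq);
      this.nextSeq += 1;
      next = this.pending.get(this.nextSeq);
    }
  }
}

const consumer = new OrderedConsumer();
consumer.receive({ seq: 2, type: 'ORDER_PAID' });    // arrives early, buffered
consumer.receive({ seq: 1, type: 'ORDER_CREATED' }); // unblocks seq 2
consumer.receive({ seq: 1, type: 'ORDER_CREATED' }); // duplicate, ignored
consumer.receive({ seq: 3, type: 'ORDER_SHIPPED' });
console.log(consumer.applied);
```

The cost is extra state in every consumer and a stall whenever an event is lost, which is part of why ordering requirements tend to push the decision back to the choice of broker.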

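Before choosing event-driven architecture, ask yourself: does my business really need asynchrony? For most CRUD operations, synchronous calls are probably simpler. Consider going event-driven only when the need for performance, reliability, or decoupling outweighs the need for consistency.

Another direction to explore: how do you design a good event? A good event should be:

  • Self-contained (it carries all the information consumers need)
  • Semantically clear (its name states exactly what happened)
  • Versioned (the event format may evolve over time)
  • Immutable (once produced, an event is never modified)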
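
These properties can be reflected directly in the event's type. The sketch below shows a versioned, frozen event together with an "upcaster" that converts an old version to the current one, so consumers handle only the latest shape. The field names, the version numbers, and the `USD` default are assumptions made for illustration.

```typescript
// Illustrative event shapes; the fields are assumptions, not a standard.
interface OrderCreatedV1 {
  version: 1;
  type: 'order.created';
  orderId: string;
  amount: number;
}

interface OrderCreatedV2 {
  version: 2;
  type: 'order.created';
  orderId: string;
  totalAmount: number; // renamed from `amount` in version 1
  currency: string;    // added in version 2
}

type OrderCreated = OrderCreatedV1 | OrderCreatedV2;

// Upgrade older versions so consumers only handle the latest shape.
// The stored event is left untouched; a new frozen object is returned.
function upcast(event: OrderCreated): OrderCreatedV2 {
  if (event.version === 2) return event;
  return Object.freeze({
    version: 2 as const,
    type: event.type,
    orderId: event.orderId,
    totalAmount: event.amount,
    currency: 'USD', // assumed default for events written before the field existed
  });
}

const original: OrderCreatedV1 = { version: 1, type: 'order.created', orderId: 'o-1', amount: 35 };
const stored = Object.freeze(original); // immutable once produced
const latest = upcast(stored);

console.log(latest, Object.isFrozen(stored), Object.isFrozen(latest));
```

Keeping the version inside the event lets old and new producers coexist during a rollout, and freezing makes accidental mutation fail loudly in strict mode instead of silently rewriting history.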