消息队列在微服务中的应用
微服务架构下,服务之间如何通信?同步 HTTP 调用简单直接,但耦合紧、容错差。异步消息通信则实现了真正的松耦合——服务 A 只需要知道自己「发出了什么事件」,不需要知道谁在处理。
异步事件驱动
传统的同步调用:服务 A 调用服务 B,必须等待 B 返回才能继续。服务 B 挂了,A 也跟着失败。
事件驱动架构:用消息队列解耦。服务 A 发布事件,服务 B 订阅处理。A 不需要知道 B 的存在,通信变成了单向的「通知」。
flowchart LR
subgraph Sync["同步调用"]
A1["服务 A"] --> B1["服务 B"]
B1 --> A1
end
subgraph Async["异步事件"]
A2["服务 A"] --> MQ["消息队列"]
MQ --> B2["服务 B"]
end
事件驱动的好处
松耦合:服务之间不直接依赖,通过事件间接通信。
独立演进:服务可以独立开发、部署、扩展。
容错性强:消息队列提供缓冲,短暂的服务不可用不会导致消息丢失。
可扩展:新增消费者不影响现有服务。
事件驱动的问题
最终一致性:服务之间不再强一致,需要接受最终一致。
调试困难:跨服务的调用链路不直观,问题排查更复杂。
事务边界:跨服务的一致性需要 Saga 等模式来保证。
Saga 分布式事务
微服务架构下,每个服务有独立的数据库,本地事务无法覆盖跨服务操作。Saga 模式用「补偿」代替「回滚」,用一系列本地事务实现分布式事务。
Saga 的两种编排方式
编排式 Saga(Choreography):各服务通过事件相互通信,没有中央协调者。
sequenceDiagram
Order->>Payment: 支付 100 元
Payment-->>Order: 支付成功
Order->>Inventory: 扣减库存
Inventory-->>Order: 库存已扣
Order->>Shipping: 创建发货单
Shipping-->>Order: 发货单已创建
编排式 Saga 的补偿:
支付成功 → 库存扣减失败 → 补偿:退款
支付成功 → 库存扣减成功 → 发货失败 → 补偿:库存回滚 + 退款
编排式 Saga 问题:服务间循环依赖风险,事务边界不清晰。
编排式 Saga(Orchestration):中央协调者(Saga Orchestrator)管理整个事务流程。
sequenceDiagram
Orchestrator->>OrderService: 创建订单
OrderService-->>Orchestrator: 订单已创建
Orchestrator->>PaymentService: 支付
PaymentService-->>Orchestrator: 支付成功
Orchestrator->>InventoryService: 扣减库存
InventoryService-->>Orchestrator: 库存已扣
Orchestrator->>ShippingService: 创建发货单
Saga 实现示例
// Saga 编排器
public class OrderSagaOrchestrator {
@Autowired
private OrderService orderService;
@Autowired
private PaymentService paymentService;
@Autowired
private InventoryService inventoryService;
public void executeOrderSaga(OrderRequest request) {
try {
// Step 1: 创建订单
Order order = orderService.createOrder(request);
// Step 2: 支付
PaymentResult payment = paymentService.pay(order);
// Step 3: 扣减库存
inventoryService.reserve(order);
// Step 4: 完成订单
orderService.confirm(order);
} catch (PaymentException e) {
// 补偿:取消订单
orderService.cancel(order);
} catch (InventoryException e) {
// 补偿:退款
paymentService.refund(order.getId());
orderService.cancel(order);
}
}
}
Saga vs 2PC
数据同步与 CDC
微服务提倡「数据库 per 服务」,但有些场景需要跨服务共享数据。CDC(Change Data Capture,变更数据捕获)提供了一种优雅的解决方案。
CDC 工作原理
数据库 → Binlog/Redo Log → CDC 工具 → 消息队列 → 下游服务
CDC 工具监听数据库变更日志,将变更事件发布到消息队列,下游服务消费后更新自己的数据副本。
flowchart LR
DB["MySQL"] --> Binlog["Binlog"]
Binlog --> Debezium["Debezium"]
Debezium --> Kafka["Kafka"]
Kafka --> Service1["用户服务"]
Kafka --> Service2["订单服务"]
Kafka --> Analytics["分析服务"]
Debezium 示例
// Debezium 配置
public class DebeziumConfig {
@Bean
public io.debezium.config.Configuration debeziumConfig() {
return io.debezium.config.Configuration.create()
.with("name", "mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("database.hostname", "mysql")
.with("database.port", 3306)
.with("database.user", "debezium")
.with("database.password", "password")
.with("database.server.id", 184054)
.with("database.include.list", "mydb")
.with("table.include.list", "mydb.orders")
.with("topic.prefix", "dbz")
.build();
}
}
CDC 的价值
- 低延迟:直接读取 binlog,延迟可控制在毫秒级
- 无损捕获:不依赖业务代码,数据库变更一定被捕获
- 异构同步:同一个变更事件可以同时发给多个下游
最终一致性保障
事件驱动架构下,系统处于「最终一致」状态。如何保障最终一致性?
原则一:事件幂等
消费者必须能处理重复事件。
@KafkaListener(topics = "order-events")
public void handleOrderEvent(ConsumerRecord<String, OrderEvent> record) {
OrderEvent event = record.value();
// 业务层幂等处理
if (orderService.isProcessed(event.getId())) {
return;
}
orderService.processEvent(event);
}
原则二:补偿机制
定义好补偿事件(Compensating Event),当正向操作失败时执行补偿。
下单成功 → 支付失败 → 发送「订单取消」事件 → 各服务回滚
原则三:状态机流转
用状态机约束事件的合法流转,防止非法事件导致状态错乱。
// 订单状态机
public enum OrderStatus {
CREATED, // 订单创建
PAID, // 已支付
INVENTORY_RESERVED, // 库存已预留
SHIPPED, // 已发货
COMPLETED, // 已完成
CANCELLED // 已取消
}
// 合法的状态流转
public boolean canTransition(OrderStatus from, OrderStatus to) {
return switch (from) {
case CREATED -> to == PAID || to == CANCELLED;
case PAID -> to == INVENTORY_RESERVED || to == CANCELLED;
case INVENTORY_RESERVED -> to == SHIPPED || to == CANCELLED;
case SHIPPED -> to == COMPLETED;
default -> false;
};
}
原则四:超时检测
定期检测超时未完成的事务,主动补偿。
@Scheduled(fixedRate = 60000)
public void checkTimeoutOrders() {
List<Order> timeoutOrders = orderService.findTimeoutOrders();
for (Order order : timeoutOrders) {
// 发送超时补偿事件
eventPublisher.publish(new OrderTimeoutEvent(order.getId()));
}
}
经验之谈:事件驱动架构的核心挑战不是技术,而是组织。上下游服务由不同团队维护,事件的契约(Schema)一旦发布就难以变更。在发布事件之前,花时间设计好事件格式,考虑好未来可能的扩展。