消息可靠性与确认机制
双十一零点秒杀,用户下单成功看到「库存已锁定」的提示。但几秒后,系统提示「库存不足,订单已取消」。用户懵了:我明明抢到了,怎么又没了?
这个问题的根源,很可能在于消息可靠性与确认机制的设计缺陷。
三种投递语义
消息队列的可靠性通常用三种投递语义(Delivery Semantics)来描述,它们构成了一个经典的不可能三角。
At Most Once(最多一次)
消息可能丢失,但绝不会重复投递。发送后不等待确认,失败就丢弃。
Producer → Kafka → Consumer
↓ (可能丢失)
[消息丢失]
实现方式:生产者发送后立即返回,不等待 Broker 确认;消费者使用自动提交 offset,收到消息就认为已处理。
适用场景:某些日志采集场景,丢失几条无所谓,重要的是不要重复(比如统计 UV)。
At Least Once(至少一次)
消息绝不会丢失,但可能重复投递。发送后等待确认,失败就重试。
Producer → Kafka → Consumer → ACK → 提交 offset
↓ (重复)
[消息可能被重复消费]
实现方式:生产者配置重试机制,等待 Broker 确认;消费者手动提交 offset,处理成功后才提交。
适用场景:大多数业务场景,如订单处理、支付通知。重复消费可以通过业务幂等来兜底。
Exactly Once(恰好一次)
消息既不丢失,也不重复。这是最理想的状态,但实现代价最高。
Producer → Kafka → Consumer → 幂等处理 → ACK
↓ (恰好一次)
[每条消息只被处理一次]
实现方式:Kafka 事务、幂等生产者、消费者端幂等处理。实现复杂,后面会有专题讨论。
适用场景:支付交易、账户扣款等对精确性要求极高的场景。
生产者确认机制
Kafka 生产者的 acks 配置决定了需要多少副本确认后才认为消息发送成功。
// 高可靠配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("acks", "all"); // 等待所有 ISR 副本确认
props.put("retries", Integer.MAX_VALUE); // 重试次数足够大
props.put("enable.idempotence", true); // 开启幂等生产者
配置警示:acks=all 配合 retries 配置时,如果 min.insync.replicas < 2,在 broker 数量不足时会导致写入失败。需要同时配置副本数和 ISR 最小副本数。
消费者手动确认
自动提交 offset 简单,但存在消息丢失风险:消息拉取成功但处理失败,offset 已经提交,下次不会重新消费。
手动确认是生产环境的最佳实践:
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
// 处理成功,手动提交 offset
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
} catch (Exception e) {
// 处理失败,记录日志,等待重试
log.error("处理消息失败", e);
}
}
}
批量手动提交
单条提交效率低,可以批量处理后统一提交:
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record);
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// 批量提交所有已处理消息的 offset
if (!offsets.isEmpty()) {
consumer.commitSync(offsets);
offsets.clear();
}
}
幂等处理:兜底重试
At Least Once 语义下,消息可能重复。幂等处理是应对重复消息的标准方案。
数据库唯一索引
利用数据库唯一索引防止重复写入:
CREATE UNIQUE INDEX idx_order_id ON orders(order_id);
INSERT INTO orders (order_id, amount, status)
VALUES ('ORDER_123', 100, 'CREATED')
ON CONFLICT (order_id) DO NOTHING;
分布式锁
对同一业务 key 加锁,防止并发重复处理:
public void processMessage(Message msg) {
String key = msg.getKey();
// 尝试获取分布式锁
String lockKey = "process:" + key;
if (!redis.setnx(lockKey, "1", 30)) {
return; // 已被其他实例处理
}
try {
doProcess(msg);
} finally {
redis.del(lockKey);
}
}
经验之谈:幂等处理不是消息队列的责任,而是业务层的义务。无论使用哪种投递语义,业务代码都应该假设「消息可能被重复投递」,做好幂等保护。
可靠性配置检查清单
确保消息不丢失,至少检查以下几点:
每一条配置都是一道防线。配置得越多,可靠性越强,但性能代价也越大。根据业务对可靠性的要求,合理取舍。