消息可靠性与确认机制

双十一零点秒杀,用户下单成功看到「库存已锁定」的提示。但几秒后,系统提示「库存不足,订单已取消」。用户懵了:我明明抢到了,怎么又没了?

这个问题的根源,很可能在于消息可靠性与确认机制的设计缺陷。

三种投递语义

消息队列的可靠性通常用三种投递语义(Delivery Semantics)来描述,它们构成了一个经典的不可能三角。

At Most Once(最多一次)

消息可能丢失,但绝不会重复投递。发送后不等待确认,失败就丢弃。

Producer → Kafka → Consumer
           ↓ (可能丢失)
        [消息丢失]

实现方式:生产者发送后立即返回,不等待 Broker 确认;消费者使用自动提交 offset,收到消息就认为已处理。

适用场景:某些日志采集场景,丢失几条无所谓,重要的是不要重复(比如统计 UV)。

At Least Once(至少一次)

消息绝不会丢失,但可能重复投递。发送后等待确认,失败就重试。

Producer → Kafka → Consumer → ACK → 提交 offset
                ↓ (重复)
        [消息可能被重复消费]

实现方式:生产者配置重试机制,等待 Broker 确认;消费者手动提交 offset,处理成功后才提交。

适用场景:大多数业务场景,如订单处理、支付通知。重复消费可以通过业务幂等来兜底。

Exactly Once(恰好一次)

消息既不丢失,也不重复。这是最理想的状态,但实现代价最高。

Producer → Kafka → Consumer → 幂等处理 → ACK
                ↓ (恰好一次)
        [每条消息只被处理一次]

实现方式:Kafka 事务、幂等生产者、消费者端幂等处理。实现复杂,后面会有专题讨论。

适用场景:支付交易、账户扣款等对精确性要求极高的场景。

生产者确认机制

Kafka 生产者的 acks 配置决定了需要多少副本确认后才认为消息发送成功。

acks 配置含义可靠性性能
acks=0不等待任何确认低:leader 宕机可能丢消息最高
acks=1等待 leader 副本确认中:leader 宕机可能丢消息
acks=all / -1等待 ISR 所有副本确认高:需要 broker 配合
// 高可靠配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("acks", "all");           // 等待所有 ISR 副本确认
props.put("retries", Integer.MAX_VALUE);  // 重试次数足够大
props.put("enable.idempotence", true);    // 开启幂等生产者

配置警示acks=all 配合 retries 配置时,如果 min.insync.replicas < 2,在 broker 数量不足时会导致写入失败。需要同时配置副本数和 ISR 最小副本数。

消费者手动确认

自动提交 offset 简单,但存在消息丢失风险:消息拉取成功但处理失败,offset 已经提交,下次不会重新消费。

手动确认是生产环境的最佳实践:

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            processMessage(record);
            // 处理成功,手动提交 offset
            consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)
            ));
        } catch (Exception e) {
            // 处理失败,记录日志,等待重试
            log.error("处理消息失败", e);
        }
    }
}

批量手动提交

单条提交效率低,可以批量处理后统一提交:

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record);
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }
    
    // 批量提交所有已处理消息的 offset
    if (!offsets.isEmpty()) {
        consumer.commitSync(offsets);
        offsets.clear();
    }
}

幂等处理:兜底重试

At Least Once 语义下,消息可能重复。幂等处理是应对重复消息的标准方案。

数据库唯一索引

利用数据库唯一索引防止重复写入:

CREATE UNIQUE INDEX idx_order_id ON orders(order_id);

INSERT INTO orders (order_id, amount, status) 
VALUES ('ORDER_123', 100, 'CREATED')
ON CONFLICT (order_id) DO NOTHING;

分布式锁

对同一业务 key 加锁,防止并发重复处理:

public void processMessage(Message msg) {
    String key = msg.getKey();
    
    // 尝试获取分布式锁
    String lockKey = "process:" + key;
    if (!redis.setnx(lockKey, "1", 30)) {
        return; // 已被其他实例处理
    }
    
    try {
        doProcess(msg);
    } finally {
        redis.del(lockKey);
    }
}

经验之谈:幂等处理不是消息队列的责任,而是业务层的义务。无论使用哪种投递语义,业务代码都应该假设「消息可能被重复投递」,做好幂等保护。

可靠性配置检查清单

确保消息不丢失,至少检查以下几点:

  • 生产者配置 acks=all 和足够大的 retries
  • 生产者配置 enable.idempotence=true
  • Topic 配置 replication.factor >= 3
  • Broker 配置 min.insync.replicas >= 2
  • 消费者使用手动提交 offset
  • 业务代码实现幂等处理

每一条配置都是一道防线。配置得越多,可靠性越强,但性能代价也越大。根据业务对可靠性的要求,合理取舍。