消息积压处理策略
早上九点,运营同事发现用户下单后迟迟收不到发货通知。查监控发现,消息队列的消费延迟从平时的几百毫秒飙到了十几分钟。再看消费者日志,全部在等待数据库连接。双十一的噩梦,似乎又要重演了。
消息积压,是消息队列运维中最常见的故障之一。
消息积压的原因
消息积压的本质是生产速度 > 消费速度。具体原因通常有以下几类:
消费者宕机或异常
消费者进程崩溃、OOM、连接池耗尽……任何导致消费者停止消费的因素,都会造成积压。
消费者处理能力不足
代码逻辑问题导致单条消息处理时间过长,或者消费者数量本身就不足以支撑当前的吞吐量。
流量突增
营销活动、大促、热点事件带来的瞬时流量,往往是平时的几十上百倍,消费能力跟不上就会积压。
下游系统故障
消费者依赖的数据库、外部服务响应变慢甚至超时,导致消费 RT 暴增,吞吐量骤降。
flowchart LR
Producer --> Topic["消息持续生产"]
Topic --> |生产速度 > 消费速度| Lag["消费 Lag 增长"]
Lag --> Overflow["消息积压"]
Overflow --> Alert["告警触发"]
积压监控:消费 Lag
消费 Lag 是衡量消息积压的核心指标,表示「还有多少消息未被消费」。Lag 越大,积压越严重。
Kafka 消费 Lag 监控
// 计算消费 Lag
public long calculateLag(String consumerGroup, String topic, int partitionId) {
// 获取分区最新消息的 offset
long latestOffset = kafkaAdmin.getEndOffsets(topic, partitionId).get(partitionId);
// 获取消费者提交的 offset
OffsetAndMetadata committed = consumer.committed(
new TopicPartition(topic, partitionId)
);
long committedOffset = committed != null ? committed.offset() : 0;
// Lag = 最新消息 offset - 已提交 offset
return latestOffset - committedOffset;
}
告警阈值设置
正常: Lag < 1000
关注: 1000 <= Lag < 10000
告警: Lag >= 10000
经验之谈:告警阈值应该根据业务容忍度设置。如果业务可以接受 5 分钟延迟,阈值可以设大一些;如果要求实时,阈值应该设得很小甚至接近零。
紧急处理措施
消息已经积压了,如何快速处理?
扩容消费者
最直接的办法是增加消费者数量。注意:Kafka 的并行度由分区数决定,消费者数量不能超过分区数。
// 动态调整消费者并发数(Spring Kafka 示例)
@KafkaListener(topics = "orders", concurrency = "6")
public void handleOrder(ConsumerRecord<String, String> record) {
// 每个并发实例处理一个分区
}
跳过积压消息
如果积压太久,或者部分消息已经过期,可以考虑跳过:
// 跳过旧消息,从最新位置开始消费(谨慎使用!)
consumer.seekToEnd(topicPartition);
// 或者跳过到指定时间戳
consumer.seekToTimestamp(topicPartition, targetTimestamp);
警告:跳过消息会丢失数据。只适用于消息本身已经无效(如过期的优惠券发放)或明确知道后果的场景。
临时停写
如果积压无法消化,紧急情况下可以暂停上游生产,给消费者喘息的时间。这会损失部分消息,需要评估是否可以接受。
预防措施
消费者隔离
按业务重要性分级,核心业务和边缘业务使用不同的消费者组。核心业务积压时,优先保障;边缘业务可以降级处理。
限流保护
消费者端配置限流,避免瞬时流量冲垮下游系统:
RateLimiter limiter = RateLimiter.create(1000); // 每秒最多处理 1000 条
@KafkaListener(topics = "orders")
public void handleOrder(ConsumerRecord<String, String> record) {
limiter.acquire(); // 限流
processOrder(record);
}
消费能力评估
上线前评估消费能力,确保消费者数量与分区数匹配:
监控告警
完善的监控体系是预防积压的关键:
// 关键监控指标
// 1. 消费 Lag(已消费 offset 与生产 offset 的差值)
// 2. 消费吞吐量(msg/s)
// 3. 消费延迟(消息产生到被消费的时间)
// 4. 消费者心跳健康状态
处理决策流程
当收到消息积压告警时,按以下顺序排查和决策:
flowchart TD
A["积压告警"] --> B{"消费者是否存活?"}
B --> |否| C["重启消费者"]
B --> |是| D{"Lag 是否持续增长?"}
D --> |否| E["观察 + 保持监控"]
D --> |是| F{"消费者数量 < 分区数?"}
F --> |是| G["扩容消费者"]
F --> |否| H{"下游系统是否正常?"}
H --> |否| I["排查下游故障"]
H --> |是| J["优化消费逻辑"]
I --> K["恢复后扩容"]
J --> G
消息积压是故障,也是改进的机会。每次积压后复盘,分析根因:是流量预估不足、消费逻辑效率低、还是系统容量不够?从根本上解决问题,才能避免下一次积压。