Work Queue 工作队列模式

晚上 8 点,电商平台的秒杀活动开始了。瞬时间,订单创建请求从平时的 1000 QPS 暴涨到 10 万 QPS。你的服务能承受吗?如果订单处理是同步的,数据库、缓存、外部支付接口都会被瞬间打爆。

但聪明的架构师不会让这种事发生。他们把订单请求先接收下来,扔进一个队列,然后按照系统能承受的速度慢慢处理。这就像三峡大坝的作用——不是让洪水直接冲向下游,而是在库容范围内调节流量。

这就是 Work Queue 工作队列模式的核心思想。

工作队列模式的核心思想

Work Queue 模式将任务的提交执行解耦。任务发起者不需要等待任务完成,只需要将任务描述扔进队列;worker 从队列中获取任务,按照自己的节奏执行。

flowchart LR
    subgraph Producer["生产者"]
        P["业务服务"]
    end

    subgraph Queue["消息队列"]
        Q["任务队列"]
    end

    subgraph Consumer["消费者"]
        W1["Worker 1"]
        W2["Worker 2"]
        W3["Worker N"]
    end

    P -->|"提交任务"| Q
    Q -->|"拉取任务"| W1
    Q -->|"拉取任务"| W2
    Q -->|"拉取任务"| W3

这种模式带来了几个关键优势:

  1. 削峰填谷:瞬时高峰被队列吸收,worker 平稳处理
  2. 异步处理:发起者不需要等待任务完成
  3. 负载均衡:多个 worker 分担任务处理
  4. 解耦:生产者和消费者独立演进,互不感知

分布式任务队列

Kafka:高性能日志流

Kafka 适合高吞吐量的日志类任务,如日志收集、事件处理、数据管道。

public class KafkaTaskProducer {
    private final KafkaProducer<String, TaskMessage> producer;
    private final String taskTopic = "work-queue-tasks";

    public void submitTask(Task task) {
        TaskMessage message = TaskMessage.newBuilder()
            .setTaskId(UUID.randomUUID().toString())
            .setTaskType(task.getType())
            .setPayload(task.getPayload())
            .setPriority(task.getPriority())
            .setSubmitTime(System.currentTimeMillis())
            .build();

        // 按任务类型分区,保证同类型任务有序处理
        producer.send(new ProducerRecord<>(
            taskTopic,
            task.getType(),  // partition key
            message
        ));
    }
}

public class KafkaTaskConsumer {
    private final KafkaConsumer<String, TaskMessage> consumer;

    public void processTasks() {
        while (running) {
            ConsumerRecords<String, TaskMessage> records = consumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, TaskMessage> record : records) {
                TaskMessage task = record.value();
                try {
                    processTask(task);
                    // 手动提交 offset,保证 exactly-once
                    consumer.commitSync();
                } catch (Exception e) {
                    // 任务失败,进入重试队列
                    retryQueue.offer(task);
                }
            }
        }
    }
}

RabbitMQ:灵活的路由

RabbitMQ 适合需要复杂路由、多租户隔离、任务优先级的场景。

public class RabbitTaskProducer {
    private final RabbitTemplate template;

    public void submitTask(Task task) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-task-type", task.getType());
        headers.put("x-priority", task.getPriority());
        headers.put("x-retry-count", 0);

        template.convertAndSend(
            "work-exchange",    // 交换机
            "task." + task.getType(),  // 路由键
            task.getPayload(),
            m -> {
                m.getMessageProperties().setHeaders(headers);
                // 消息 TTL,过期后进入死信队列
                m.getMessageProperties().setExpiration("3600000");
                return m;
            }
        );
    }
}

public class RabbitTaskConsumer {
    @RabbitListener(queues = "work-queue", concurrency = "3-10")
    public void processTask(TaskMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理任务
            doProcess(message.getPayload());

            // 手动确认
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 任务失败,消息重新入队
            channel.basicNack(tag, false, true);
        }
    }
}

RocketMQ:事务消息

RocketMQ 提供事务消息能力,适合需要「本地事务 + 消息」原子性的场景。

public class RocketTaskProducer {
    private final TransactionMQProducer producer;

    public void submitTaskWithTransaction(Task task, Runnable localTransaction) {
        Message message = new Message("task-topic", task.getType(), task.getPayload());

        producer.sendMessageInTransaction(message, (msg, arg) -> {
            // 执行本地事务
            localTransaction.run();
            // 返回事务状态
            return LocalTransactionState.COMMIT_MESSAGE;
        }, null);
    }
}

任务分片:Job 与 Task 的分离

当单个任务处理时间较长时,可以将任务拆分为多个分片并行处理。

flowchart TB
    subgraph Job["大型任务 Job"]
        J["处理 100 万条数据"]
    end

    subgraph Task["分片任务 Tasks"]
        T1["Task 1: 0-20万"]
        T2["Task 2: 20-40万"]
        T3["Task 3: 40-60万"]
        T4["Task 4: 60-80万"]
        T5["Task 5: 80-100万"]
    end

    subgraph Workers["并行执行"]
        W1["Worker 1"]
        W2["Worker 2"]
        W3["Worker 3"]
    end

    J -->|"拆分为 5 个分片"| T1
    J -->|"拆分为 5 个分片"| T2
    J -->|"拆分为 5 个分片"| T3
    J -->|"拆分为 5 个分片"| T4
    J -->|"拆分为 5 个分片"| T5

    T1 --> W1
    T2 --> W2
    T3 --> W3
    T4 --> W1
    T5 --> W2
public class JobSplitter {
    public List<Task> splitJob(Job job, int shardCount) {
        long totalRecords = job.getTotalRecords();
        long shardSize = (totalRecords + shardCount - 1) / shardCount;

        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < shardCount; i++) {
            long start = i * shardSize;
            long end = Math.min(start + shardSize, totalRecords);

            tasks.add(Task.builder()
                .jobId(job.getId())
                .shardId(i)
                .startRecord(start)
                .endRecord(end)
                .totalShards(shardCount)
                .build());
        }
        return tasks;
    }
}

任务持久化与重试

持久化策略

任务不能「丢了就丢了」,必须持久化到磁盘或可靠的存储中。

持久化级别说明适用场景
内存队列只存内存,宕机丢失开发测试
磁盘持久化消息落盘,宕机可恢复一般生产环境
多副本同步多节点同步写入关键任务

Kafka 默认将消息持久化到磁盘,并通过操作系统的 Page Cache 优化读写性能。对于更严格的持久化要求,可以配置 acks=all,等待所有 ISR(in-sync replica)确认。

重试机制

任务执行失败后,需要进入重试流程。常见重试策略:

public class RetryableTaskProcessor {
    private final int maxRetries = 3;
    private final Map<String, Integer> retryCount = new ConcurrentHashMap<>();

    public ProcessingResult process(Task task) {
        int currentRetry = retryCount.getOrDefault(task.getId(), 0);

        try {
            // 执行业务逻辑
            return doProcess(task);
        } catch (TransientException e) {
            // 瞬时错误,可重试
            if (currentRetry < maxRetries) {
                retryCount.put(task.getId(), currentRetry + 1);
                // 指数退避:1s, 2s, 4s
                long delay = (long) Math.pow(2, currentRetry) * 1000;
                scheduleRetry(task, delay);
                return ProcessingResult.RETRY_SCHEDULED;
            } else {
                // 超过重试次数,进入死信队列
                deadLetterQueue.offer(task);
                return ProcessingResult.DEAD_LETTER;
            }
        } catch (PermanentException e) {
            // 永久错误,不重试
            deadLetterQueue.offer(task);
            return ProcessingResult.DEAD_LETTER;
        }
    }
}

任务状态追踪

对于关键业务任务,需要追踪其完整生命周期。

stateDiagram-v2
    [*] --> Pending: 提交任务
    Pending --> Processing: Worker 领取
    Processing --> Completed: 执行成功
    Processing --> Failed: 执行失败
    Processing --> Retry: 需要重试
    Retry --> Processing: 重试执行
    Retry --> DeadLetter: 重试耗尽
    Failed --> Retry: 自动重试
    DeadLetter --> [*]
    Completed --> [*]
public class TaskStateTracker {
    private final RedisTemplate<String, String> redis;

    public void updateTaskState(String taskId, TaskState state, String detail) {
        String key = "task:state:" + taskId;

        // 使用 Redis 事务保证原子性
        redis.execute(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                operations.opsForHash().put(key, "state", state.name());
                operations.opsForHash().put(key, "updatedAt", String.valueOf(System.currentTimeMillis()));
                operations.opsForHash().put(key, "detail", detail);

                // 记录状态变更历史
                operations.opsForList().rightPush("task:history:" + taskId,
                    state.name() + ":" + System.currentTimeMillis());

                return operations.exec();
            }
        });
    }

    public TaskStatus getTaskStatus(String taskId) {
        String key = "task:state:" + taskId;
        Map<Object, Object> state = redis.opsForHash().entries(key);

        return TaskStatus.builder()
            .taskId(taskId)
            .state(TaskState.valueOf((String) state.get("state")))
            .detail((String) state.get("detail"))
            .updatedAt(Long.parseLong((String) state.get("updatedAt")))
            .history(getTaskHistory(taskId))
            .build();
    }
}

分布式 Work Queue vs 本地线程池

维度分布式 Work Queue本地线程池
任务范围跨进程、跨机器单进程内
故障恢复任务持久化,worker 重启后可继续worker 重启后任务丢失
扩展性可以增加 worker 机器受单机资源限制
延迟有队列和网络开销纯内存,无额外开销
复杂度需要队列、序列化、网络等简单
适用场景异步任务、跨服务协作并发处理、批量计算
选型建议

如果任务处理是CPU 密集型且在单机内完成,用本地线程池更简单高效;如果任务是IO 密集型或需要跨服务协作,分布式 Work Queue 是更好的选择。很多系统会同时使用两者:本地线程池负责计算,分布式队列负责分发。

思考题

问题 1:如果消息队列本身故障了,任务会丢失吗?如何保证不丢失?

参考答案

如果消息队列故障,任务是否会丢失取决于持久化配置和队列架构。对于 Kafka,可以通过以下配置保证不丢失:replication.factor >= 3min.insync.replicas >= 2acks=all。对于 RabbitMQ,需要使用镜像队列或仲裁队列(Quorum Queue)。此外,生产者在发送消息时应该使用同步发送并检查返回码,失败时重试或持久化到本地。但要注意:没有绝对不丢失的系统,需要在可靠性、成本、性能之间做权衡。

问题 2:Worker 数量如何确定?是不是越多越好?

参考答案

Worker 数量取决于多个因素:任务类型(CPU 密集还是 IO 密集)、下游服务能力(数据库连接池、API 限流)、队列积压情况。CPU 密集型任务的 Worker 数一般设置为 CPU 核心数;IO 密集型任务可以多一些(因为大部分时间在等待)。但 Worker 过多可能导致资源竞争、下游服务过载。推荐做法是:从小数量开始,通过监控队列积压和消费延迟,动态调整 Worker 数量。

问题 3:如何处理「重复消费」问题?

参考答案

分布式队列在某些情况下(如网络抖动、consumer 重启)可能重复投递消息。幂等性处理是关键:1)业务层面设计幂等接口(如唯一订单号);2)使用数据库唯一索引防止重复写入;3)Redis Set 记录已处理的 taskId,处理前先检查;4)消息中携带唯一 ID,消费者根据 ID 做去重。对于 Kafka,可以使用事务或 Exactly-Once 语义(需要下游支持事务)。