数据回填

2020 年,某在线教育平台启动订单系统拆分。旧订单系统是单体的一部分,新订单服务需要独立部署。但旧订单数据已经积累了 3 年,总量超过 2 亿条。

拆分的第一个问题是:这 2 亿条历史订单,怎么同步到新订单服务的数据库里?

这就是数据回填(Data Backfill)。

回填的本质是:在系统改造后,将历史数据补充到新结构中。和双写不同,双写解决的是「迁移期间的增量同步」,回填解决的是「历史数据的批量迁移」。

回填的难点在于:数据量大、时间窗口长、不能影响生产系统。2 亿条数据,如果全量查询一次性写入,会把数据库打爆;如果在业务高峰期执行,会影响正常业务。如何在保证数据正确性的同时,把回填对生产系统的影响降到最低?

回填的时机

回填通常在以下两个阶段执行:

阶段一:新旧系统并行期间

新旧系统并行运行期间,旧系统产生的增量数据会通过双写同步到新系统。但并行之前旧系统已有的历史数据,需要通过回填来处理。

┌─────────────────────┐      ┌─────────────────────┐
│     旧订单服务        │      │     新订单服务        │
│                      │      │                      │
│  ┌─────────────────┐ │      │                      │
│  │   历史订单数据    │ │      │                      │
│  │  (3年, 2亿条)   │─┼──────┼───→ 回填任务        │
│  └─────────────────┘ │      │                      │
│                      │      │                      │
│  ┌─────────────────┐ │      │  ┌─────────────────┐ │
│  │   增量订单数据    │─┼──────┼───→ 双写任务       │
│  └─────────────────┘ │      │  └─────────────────┘ │
└─────────────────────┘      └─────────────────────┘

阶段二:数据库迁移期间

如果是数据库迁移(如 Oracle → MySQL,分库分表),历史数据需要从源库迁移到目标库,这也是回填的一种场景。

回填的策略

全量回填 vs 增量回填

策略说明优点缺点适用场景
全量回填一次性将所有历史数据迁移到新系统简单直接,不遗漏数据数据量大时对源库压力大,回填时间长首次迁移、数据量可控的场景
增量回填先迁移历史快照,然后持续同步增量对源库压力小,可以分批执行实现复杂,需要处理增量同步数据量大、生产库不能长时间压力的场景

实际项目中,通常采用「全量 + 增量」的组合策略:

  1. 先做一次全量回填,把历史数据迁移过去
  2. 然后开启增量同步,持续同步增量数据
  3. 等增量追平后,停止增量任务,开始切流量

全量回填的实现

全量回填的核心是:分页查询 + 批量写入 + 断点续传。

分页查询

@Service
public class OrderBackfillService {

    @Autowired private OrderMapper oldOrderMapper;   // 源库(旧系统)
    @Autowired private OrderMapper newOrderMapper;   // 目标库(新系统)

    private static final int PAGE_SIZE = 1000;
    private static final long INITIAL_LAST_ID = 0;

    public BackfillResult fullBackfill() {
        long lastId = INITIAL_LAST_ID;
        int totalCount = 0;
        int failCount = 0;

        while (true) {
            // 1. 分页查询(按 ID 升序,避免数据偏移)
            List<Order> orders = oldOrderMapper.selectByPage(lastId, PAGE_SIZE);

            if (orders.isEmpty()) {
                break;
            }

            // 2. 批量写入新库
            try {
                BatchInsertResult result = batchInsert(orders);
                totalCount += result.getSuccessCount();
                failCount += result.getFailCount();

            } catch (Exception e) {
                log.error("批量写入失败,lastId={}, 数量={}", lastId, orders.size(), e);
                failCount += orders.size();
            }

            // 3. 更新游标
            lastId = orders.get(orders.size() - 1).getId();

            // 4. 记录进度(用于断点续传)
            saveProgress("order_backfill", lastId, totalCount, failCount);

            // 5. 休眠,避免对源库造成过大压力
            Thread.sleep(100);
        }

        return new BackfillResult(totalCount, failCount);
    }

    private BatchInsertResult batchInsert(List<Order> orders) {
        // 批量插入实现
        return newOrderMapper.batchInsert(orders);
    }
}
// Mapper 层实现
@Mapper
public interface OrderMapper {

    // 分页查询(按 ID 升序)
    @Select("SELECT * FROM orders WHERE id > #{lastId} ORDER BY id ASC LIMIT #{pageSize}")
    List<Order> selectByPage(@Param("lastId") long lastId, @Param("pageSize") int pageSize);

    // 批量插入
    @Insert("<script>" +
        "INSERT INTO orders (id, order_no, user_id, amount, status, create_time, update_time) " +
        "VALUES " +
        "<foreach collection='orders' item='item' separator=','>" +
        "(#{item.id}, #{item.orderNo}, #{item.userId}, #{item.amount}, " +
        "#{item.status}, #{item.createTime}, #{item.updateTime})" +
        "</foreach>" +
        "</script>")
    int batchInsert(@Param("orders") List<Order> orders);
}

断点续传

回填任务如果中途失败(如数据库连接超时),需要能够从上次中断的位置继续,而不是从头开始。断点续传通过记录进度来实现:

@Service
public class BackfillProgressService {

    @Autowired private ConfigMapper configMapper;

    public void saveProgress(String taskName, long lastId, int successCount, int failCount) {
        ConfigRecord record = new ConfigRecord();
        record.setConfigKey("backfill_progress_" + taskName);
        record.setConfigValue(JSON.toJSONString(new Progress(lastId, successCount, failCount)));
        configMapper.insertOrUpdate(record);
    }

    public Progress loadProgress(String taskName) {
        ConfigRecord record = configMapper.findByKey("backfill_progress_" + taskName);
        if (record == null) {
            return new Progress(0, 0, 0);
        }
        return JSON.parseObject(record.getConfigValue(), Progress.class);
    }

    @Data
    public static class Progress {
        private long lastId;
        private int successCount;
        private int failCount;

        public Progress() {}
        public Progress(long lastId, int successCount, int failCount) {
            this.lastId = lastId;
            this.successCount = successCount;
            this.failCount = failCount;
        }
    }
}

回填任务启动时,先从进度表中加载上次执行的位置:

public BackfillResult fullBackfill() {
    // 加载断点
    BackfillProgressService.Progress progress = progressService.loadProgress("order_backfill");
    long lastId = progress.getLastId();
    int totalCount = progress.getSuccessCount();

    log.info("从断点继续回填,lastId={}, 已完成数量={}", lastId, totalCount);

    // 继续回填...
}

增量回填的实现

增量回填的核心是:基于时间戳或版本号的增量同步。

基于时间戳的增量同步

@Service
public class IncrementalBackfillService {

    @Autowired private OrderMapper oldOrderMapper;
    @Autowired private OrderMapper newOrderMapper;
    @Autowired private BackfillProgressService progressService;

    private static final int SYNC_INTERVAL_SECONDS = 60;  // 每次同步 60 秒内的增量

    // 增量同步定时任务
    @Scheduled(fixedDelay = 5000)  // 每 5 秒执行一次
    public void incrementalSync() {
        // 1. 加载同步水位
        long watermark = progressService.getWatermark("order_incremental");
        long now = System.currentTimeMillis();

        // 2. 查询增量数据
        List<Order> incrementalOrders = oldOrderMapper.selectByCreateTimeRange(
            watermark, now);

        if (incrementalOrders.isEmpty()) {
            return;
        }

        // 3. 批量写入新库
        int successCount = 0;
        for (Order order : incrementalOrders) {
            try {
                newOrderMapper.insertOrUpdate(order);
                successCount++;
            } catch (Exception e) {
                log.error("增量同步失败,orderId={}", order.getId(), e);
            }
        }

        // 4. 更新水位(取本次同步的最大时间戳)
        long newWatermark = incrementalOrders.stream()
            .mapToLong(Order::getCreateTime)
            .max()
            .orElse(watermark);
        progressService.setWatermark("order_incremental", newWatermark);

        log.info("增量同步完成,数量={}, 新水位={}", successCount, newWatermark);
    }
}

基于 Binlog 的增量同步

对于 MySQL,可以直接解析 Binlog 来获取增量数据,避免对源库加字段:

@Service
public class BinlogSyncService {

    public void startBinlogSync() {
        // 使用 Canal 或 Debezium 解析 Binlog
        CanalClient canalClient = CanalClient.builder()
            .destination("example")
            .zookeeperAddress("127.0.0.1:2181")
            .build();

        canalClient.subscribe(".*\\..*");  // 订阅所有表

        canalClient.addMessageListener((entry) -> {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                    // 处理新增
                    handleInsert(rowData);
                } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                    // 处理更新
                    handleUpdate(rowData);
                } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                    // 处理删除
                    handleDelete(rowData);
                }
            }
        });

        canalClient.start();
    }
}

回填的风险与应对

风险一:数据量过大导致数据库压力

回填任务本质上是对源库的大规模查询。如果不加控制,可能把源库打爆,影响正常业务。

应对策略:

  1. 限速查询:每批查询后休眠一段时间,降低对源库的 QPS
  2. 只读副本:如果源库有只读副本,回填任务从只读副本读取,避免影响主库
  3. 错峰执行:在业务低峰期执行回填(如凌晨 2-6 点)
// 限速查询示例
while (true) {
    List<Order> orders = oldOrderMapper.selectByPage(lastId, PAGE_SIZE);

    if (orders.isEmpty()) {
        break;
    }

    // 批量写入...

    lastId = orders.get(orders.size() - 1).getId();

    // 限速:每处理 1000 条休眠 100ms
    // 相当于每秒处理 10000 条,对源库的压力可控
    if (totalCount % 1000 == 0) {
        Thread.sleep(100);
    }
}

风险二:回填期间数据被修改

回填过程中,如果源数据被修改(如用户修改了收货地址),回填的是修改前的快照,会覆盖掉新的修改。

应对策略:

  1. 基于版本号乐观锁:写入时检查版本号,如果目标库的数据版本更新,则跳过覆盖
  2. 基于时间戳的 Last-Write-Wins:保留最新修改的数据
  3. 双写 + 回填组合:回填完成后,只依赖双写同步增量数据
// 乐观锁处理示例
public void backfillOrder(Order order) {
    // 先查询目标库是否已有该订单
    Order existingOrder = newOrderMapper.findById(order.getId());

    if (existingOrder == null) {
        // 目标库没有,直接插入
        newOrderMapper.insert(order);
    } else {
        // 目标库已有,比较更新时间
        if (order.getUpdateTime().isAfter(existingOrder.getUpdateTime())) {
            // 源数据更新,覆盖目标数据
            newOrderMapper.update(order);
        } else {
            // 目标数据更新或相同,跳过
            log.debug("跳过更新的数据,orderId={}", order.getId());
        }
    }
}

回填的监控与校验

回填完成后,必须验证数据的正确性。校验通常分为三个层次:

1. 数量校验

@Service
public class BackfillValidator {

    @Autowired private OrderMapper oldOrderMapper;
    @Autowired private OrderMapper newOrderMapper;

    public ValidationReport validate() {
        ValidationReport report = new ValidationReport();

        // 1. 数量对比
        long oldCount = oldOrderMapper.count();
        long newCount = newOrderMapper.count();

        report.setOldCount(oldCount);
        report.setNewCount(newCount);
        report.setCountDiff(oldCount - newCount);

        if (oldCount != newCount) {
            report.setStatus("FAILED");
            alertService.alert("回填数量不一致:源库={}, 目标库={}",
                oldCount, newCount);
        } else {
            report.setStatus("PASSED");
        }

        return report;
    }
}

2. 抽样校验

全量对比太慢,通常采用抽样对比的方式:

public ValidationReport sampleValidate(int sampleSize) {
    ValidationReport report = new ValidationReport();
    List<Order> samples = oldOrderMapper.selectRandomSamples(sampleSize);

    int mismatchCount = 0;
    for (Order oldOrder : samples) {
        Order newOrder = newOrderMapper.findById(oldOrder.getId());

        if (newOrder == null) {
            mismatchCount++;
            log.warn("目标库缺失数据,orderId={}", oldOrder.getId());
        } else if (!equals(oldOrder, newOrder)) {
            mismatchCount++;
            log.warn("数据不一致,orderId={}", oldOrder.getId());
        }
    }

    report.setSampleSize(sampleSize);
    report.setMismatchCount(mismatchCount);
    report.setMismatchRate((double) mismatchCount / sampleSize);

    if (mismatchCount > 0) {
        alertService.alert("回填数据抽样校验失败,不一致数量={}/{}",
            mismatchCount, sampleSize);
    }

    return report;
}

3. 业务校验

某些核心业务逻辑的校验,如订单金额计算、状态流转等:

public void validateOrderBusinessLogic() {
    // 抽样校验订单金额计算
    List<Order> samples = newOrderMapper.selectRandomSamples(100);

    for (Order order : samples) {
        // 重新计算订单金额
        BigDecimal calculatedAmount = orderItemMapper
            .sumAmountByOrderId(order.getId());

        if (order.getAmount().compareTo(calculatedAmount) != 0) {
            alertService.alert("订单金额计算不一致,orderId={}, 记录金额={}, 计算金额={}",
                order.getId(), order.getAmount(), calculatedAmount);
        }
    }
}

真实案例

真实案例:某电商平台订单系统拆分过程中如何回填 3 年历史订单

  • 背景:订单系统从单体拆分,新订单服务需要承接 3 年的历史订单数据,总量约 2 亿条
  • 方案:全量 + 增量组合,先全量回填 2 亿条历史订单,再开启增量同步追平双写期间的增量
  • 执行策略:1. 从只读副本查询,避免影响主库;2. 每天凌晨 2-6 点执行,每次回填 500 万条;3. 分 4 天完成全量回填
  • 问题:回填第 3 天发现,部分订单的收货地址在回填后被用户修改,新地址被旧数据覆盖
  • 解决:增加乐观锁逻辑,比较更新时间,只覆盖旧数据
  • 结果:最终数据一致率达到 99.999%,只有极少数因并发修改导致的数据差异
  • 来源:内部技术复盘文档

总结

数据回填是迁移过程中的重要环节。它的核心挑战是:在保证数据正确性的同时,把对生产系统的影响降到最低

全量回填适合数据量可控的场景,关键是分页查询 + 批量写入 + 断点续传。增量回填适合数据量大、生产库压力不能过高的场景,关键是设计合理的水位机制。实际项目中,通常采用「全量 + 增量」的组合策略,先快速迁移历史数据,再持续追平增量。

回填的风险主要集中在两个方面:源库压力和数据覆盖问题。限速查询、只读副本、乐观锁是常用的应对手段。回填完成后,必须通过数量校验、抽样校验和业务校验三重验证,确保数据正确性。


常见陷阱与反模式

  1. 没有断点续传:回填任务中途失败后从头开始,导致浪费大量时间和资源。必须实现断点续传机制。
  2. 回填对源库压力过大:一次性查询全量数据,把数据库打爆。必须限速、错峰、或者从只读副本读取。
  3. 忽略增量数据:只做全量回填,不处理回填期间产生的增量数据。回填完成后要检查是否有数据遗漏。

思考题

问题 1:如果回填过程中发现数据有质量问题(如脏数据),应该如何处理?

参考答案

数据质量问题通常分为两类:1. 可自动修复的脏数据:如格式不统一、字段缺失等,可以在回填过程中通过清洗脚本自动修复;2. 不可自动修复的脏数据:如业务逻辑错误,需要人工判断和处理。建议在回填前先做数据质量评估,对脏数据进行分类并制定处理策略。对于严重的脏数据,可以先跳过,回填完成后再单独处理。

问题 2:如果回填的目标是分库分表的新库,如何保证数据均匀分布?

参考答案

分库分表的关键是选择合理的分片键。如果按用户 ID 分片,需要在回填前先计算每条数据应该写入哪个分片,避免数据倾斜。可以使用「预计算 + 批量路由」的方式:先扫描一遍数据,统计各分片的数据量,然后根据分片容量动态调整回填速率,确保分片间负载均衡。

问题 3:回填完成后,如何判断可以停止双写、切换流量?

参考答案

需要同时满足以下条件:1. 全量回填完成,所有历史数据都已迁移到新库;2. 增量同步追平,新旧库之间没有数据延迟;3. 数据一致性校验通过,不一致率在可接受范围内(如 < 0.01%);4. 新服务稳定性验证通过,在灰度流量下运行正常(通常观察 24-48 小时)。满足以上条件后,可以开始切流量,但双写任务应该保留一段时间(如 1 周),作为应急回退的保障。