数据回填
2020 年,某在线教育平台启动订单系统拆分。旧订单系统是单体的一部分,新订单服务需要独立部署。但旧订单数据已经积累了 3 年,总量超过 2 亿条。
拆分的第一个问题是:这 2 亿条历史订单,怎么同步到新订单服务的数据库里?
这就是数据回填(Data Backfill)。
回填的本质是:在系统改造后,将历史数据补充到新结构中。和双写不同,双写解决的是「迁移期间的增量同步」,回填解决的是「历史数据的批量迁移」。
回填的难点在于:数据量大、时间窗口长、不能影响生产系统。2 亿条数据,如果全量查询一次性写入,会把数据库打爆;如果在业务高峰期执行,会影响正常业务。如何在保证数据正确性的同时,把回填对生产系统的影响降到最低?
回填的时机
回填通常在以下两个阶段执行:
阶段一:新旧系统并行期间
新旧系统并行运行期间,旧系统产生的增量数据会通过双写同步到新系统。但并行之前旧系统已有的历史数据,需要通过回填来处理。
┌─────────────────────┐ ┌─────────────────────┐
│ 旧订单服务 │ │ 新订单服务 │
│ │ │ │
│ ┌─────────────────┐ │ │ │
│ │ 历史订单数据 │ │ │ │
│ │ (3年, 2亿条) │─┼──────┼───→ 回填任务 │
│ └─────────────────┘ │ │ │
│ │ │ │
│ ┌─────────────────┐ │ │ ┌─────────────────┐ │
│ │ 增量订单数据 │─┼──────┼───→ 双写任务 │
│ └─────────────────┘ │ │ └─────────────────┘ │
└─────────────────────┘ └─────────────────────┘
阶段二:数据库迁移期间
如果是数据库迁移(如 Oracle → MySQL,分库分表),历史数据需要从源库迁移到目标库,这也是回填的一种场景。
回填的策略
全量回填 vs 增量回填
实际项目中,通常采用「全量 + 增量」的组合策略:
- 先做一次全量回填,把历史数据迁移过去
- 然后开启增量同步,持续同步增量数据
- 等增量追平后,停止增量任务,开始切流量
全量回填的实现
全量回填的核心是:分页查询 + 批量写入 + 断点续传。
分页查询
@Service
public class OrderBackfillService {
@Autowired private OrderMapper oldOrderMapper; // 源库(旧系统)
@Autowired private OrderMapper newOrderMapper; // 目标库(新系统)
private static final int PAGE_SIZE = 1000;
private static final long INITIAL_LAST_ID = 0;
public BackfillResult fullBackfill() {
long lastId = INITIAL_LAST_ID;
int totalCount = 0;
int failCount = 0;
while (true) {
// 1. 分页查询(按 ID 升序,避免数据偏移)
List<Order> orders = oldOrderMapper.selectByPage(lastId, PAGE_SIZE);
if (orders.isEmpty()) {
break;
}
// 2. 批量写入新库
try {
BatchInsertResult result = batchInsert(orders);
totalCount += result.getSuccessCount();
failCount += result.getFailCount();
} catch (Exception e) {
log.error("批量写入失败,lastId={}, 数量={}", lastId, orders.size(), e);
failCount += orders.size();
}
// 3. 更新游标
lastId = orders.get(orders.size() - 1).getId();
// 4. 记录进度(用于断点续传)
saveProgress("order_backfill", lastId, totalCount, failCount);
// 5. 休眠,避免对源库造成过大压力
Thread.sleep(100);
}
return new BackfillResult(totalCount, failCount);
}
private BatchInsertResult batchInsert(List<Order> orders) {
// 批量插入实现
return newOrderMapper.batchInsert(orders);
}
}
// Mapper 层实现
@Mapper
public interface OrderMapper {
// 分页查询(按 ID 升序)
@Select("SELECT * FROM orders WHERE id > #{lastId} ORDER BY id ASC LIMIT #{pageSize}")
List<Order> selectByPage(@Param("lastId") long lastId, @Param("pageSize") int pageSize);
// 批量插入
@Insert("<script>" +
"INSERT INTO orders (id, order_no, user_id, amount, status, create_time, update_time) " +
"VALUES " +
"<foreach collection='orders' item='item' separator=','>" +
"(#{item.id}, #{item.orderNo}, #{item.userId}, #{item.amount}, " +
"#{item.status}, #{item.createTime}, #{item.updateTime})" +
"</foreach>" +
"</script>")
int batchInsert(@Param("orders") List<Order> orders);
}
断点续传
回填任务如果中途失败(如数据库连接超时),需要能够从上次中断的位置继续,而不是从头开始。断点续传通过记录进度来实现:
@Service
public class BackfillProgressService {
@Autowired private ConfigMapper configMapper;
public void saveProgress(String taskName, long lastId, int successCount, int failCount) {
ConfigRecord record = new ConfigRecord();
record.setConfigKey("backfill_progress_" + taskName);
record.setConfigValue(JSON.toJSONString(new Progress(lastId, successCount, failCount)));
configMapper.insertOrUpdate(record);
}
public Progress loadProgress(String taskName) {
ConfigRecord record = configMapper.findByKey("backfill_progress_" + taskName);
if (record == null) {
return new Progress(0, 0, 0);
}
return JSON.parseObject(record.getConfigValue(), Progress.class);
}
@Data
public static class Progress {
private long lastId;
private int successCount;
private int failCount;
public Progress() {}
public Progress(long lastId, int successCount, int failCount) {
this.lastId = lastId;
this.successCount = successCount;
this.failCount = failCount;
}
}
}
回填任务启动时,先从进度表中加载上次执行的位置:
public BackfillResult fullBackfill() {
// 加载断点
BackfillProgressService.Progress progress = progressService.loadProgress("order_backfill");
long lastId = progress.getLastId();
int totalCount = progress.getSuccessCount();
log.info("从断点继续回填,lastId={}, 已完成数量={}", lastId, totalCount);
// 继续回填...
}
增量回填的实现
增量回填的核心是:基于时间戳或版本号的增量同步。
基于时间戳的增量同步
@Service
public class IncrementalBackfillService {
@Autowired private OrderMapper oldOrderMapper;
@Autowired private OrderMapper newOrderMapper;
@Autowired private BackfillProgressService progressService;
private static final int SYNC_INTERVAL_SECONDS = 60; // 每次同步 60 秒内的增量
// 增量同步定时任务
@Scheduled(fixedDelay = 5000) // 每 5 秒执行一次
public void incrementalSync() {
// 1. 加载同步水位
long watermark = progressService.getWatermark("order_incremental");
long now = System.currentTimeMillis();
// 2. 查询增量数据
List<Order> incrementalOrders = oldOrderMapper.selectByCreateTimeRange(
watermark, now);
if (incrementalOrders.isEmpty()) {
return;
}
// 3. 批量写入新库
int successCount = 0;
for (Order order : incrementalOrders) {
try {
newOrderMapper.insertOrUpdate(order);
successCount++;
} catch (Exception e) {
log.error("增量同步失败,orderId={}", order.getId(), e);
}
}
// 4. 更新水位(取本次同步的最大时间戳)
long newWatermark = incrementalOrders.stream()
.mapToLong(Order::getCreateTime)
.max()
.orElse(watermark);
progressService.setWatermark("order_incremental", newWatermark);
log.info("增量同步完成,数量={}, 新水位={}", successCount, newWatermark);
}
}
基于 Binlog 的增量同步
对于 MySQL,可以直接解析 Binlog 来获取增量数据,避免对源库加字段:
@Service
public class BinlogSyncService {
public void startBinlogSync() {
// 使用 Canal 或 Debezium 解析 Binlog
CanalClient canalClient = CanalClient.builder()
.destination("example")
.zookeeperAddress("127.0.0.1:2181")
.build();
canalClient.subscribe(".*\\..*"); // 订阅所有表
canalClient.addMessageListener((entry) -> {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
// 处理新增
handleInsert(rowData);
} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
// 处理更新
handleUpdate(rowData);
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
// 处理删除
handleDelete(rowData);
}
}
});
canalClient.start();
}
}
回填的风险与应对
风险一:数据量过大导致数据库压力
回填任务本质上是对源库的大规模查询。如果不加控制,可能把源库打爆,影响正常业务。
应对策略:
- 限速查询:每批查询后休眠一段时间,降低对源库的 QPS
- 只读副本:如果源库有只读副本,回填任务从只读副本读取,避免影响主库
- 错峰执行:在业务低峰期执行回填(如凌晨 2-6 点)
// 限速查询示例
while (true) {
List<Order> orders = oldOrderMapper.selectByPage(lastId, PAGE_SIZE);
if (orders.isEmpty()) {
break;
}
// 批量写入...
lastId = orders.get(orders.size() - 1).getId();
// 限速:每处理 1000 条休眠 100ms
// 相当于每秒处理 10000 条,对源库的压力可控
if (totalCount % 1000 == 0) {
Thread.sleep(100);
}
}
风险二:回填期间数据被修改
回填过程中,如果源数据被修改(如用户修改了收货地址),回填的是修改前的快照,会覆盖掉新的修改。
应对策略:
- 基于版本号乐观锁:写入时检查版本号,如果目标库的数据版本更新,则跳过覆盖
- 基于时间戳的 Last-Write-Wins:保留最新修改的数据
- 双写 + 回填组合:回填完成后,只依赖双写同步增量数据
// 乐观锁处理示例
public void backfillOrder(Order order) {
// 先查询目标库是否已有该订单
Order existingOrder = newOrderMapper.findById(order.getId());
if (existingOrder == null) {
// 目标库没有,直接插入
newOrderMapper.insert(order);
} else {
// 目标库已有,比较更新时间
if (order.getUpdateTime().isAfter(existingOrder.getUpdateTime())) {
// 源数据更新,覆盖目标数据
newOrderMapper.update(order);
} else {
// 目标数据更新或相同,跳过
log.debug("跳过更新的数据,orderId={}", order.getId());
}
}
}
回填的监控与校验
回填完成后,必须验证数据的正确性。校验通常分为三个层次:
1. 数量校验
@Service
public class BackfillValidator {
@Autowired private OrderMapper oldOrderMapper;
@Autowired private OrderMapper newOrderMapper;
public ValidationReport validate() {
ValidationReport report = new ValidationReport();
// 1. 数量对比
long oldCount = oldOrderMapper.count();
long newCount = newOrderMapper.count();
report.setOldCount(oldCount);
report.setNewCount(newCount);
report.setCountDiff(oldCount - newCount);
if (oldCount != newCount) {
report.setStatus("FAILED");
alertService.alert("回填数量不一致:源库={}, 目标库={}",
oldCount, newCount);
} else {
report.setStatus("PASSED");
}
return report;
}
}
2. 抽样校验
全量对比太慢,通常采用抽样对比的方式:
public ValidationReport sampleValidate(int sampleSize) {
ValidationReport report = new ValidationReport();
List<Order> samples = oldOrderMapper.selectRandomSamples(sampleSize);
int mismatchCount = 0;
for (Order oldOrder : samples) {
Order newOrder = newOrderMapper.findById(oldOrder.getId());
if (newOrder == null) {
mismatchCount++;
log.warn("目标库缺失数据,orderId={}", oldOrder.getId());
} else if (!equals(oldOrder, newOrder)) {
mismatchCount++;
log.warn("数据不一致,orderId={}", oldOrder.getId());
}
}
report.setSampleSize(sampleSize);
report.setMismatchCount(mismatchCount);
report.setMismatchRate((double) mismatchCount / sampleSize);
if (mismatchCount > 0) {
alertService.alert("回填数据抽样校验失败,不一致数量={}/{}",
mismatchCount, sampleSize);
}
return report;
}
3. 业务校验
某些核心业务逻辑的校验,如订单金额计算、状态流转等:
public void validateOrderBusinessLogic() {
// 抽样校验订单金额计算
List<Order> samples = newOrderMapper.selectRandomSamples(100);
for (Order order : samples) {
// 重新计算订单金额
BigDecimal calculatedAmount = orderItemMapper
.sumAmountByOrderId(order.getId());
if (order.getAmount().compareTo(calculatedAmount) != 0) {
alertService.alert("订单金额计算不一致,orderId={}, 记录金额={}, 计算金额={}",
order.getId(), order.getAmount(), calculatedAmount);
}
}
}
真实案例
真实案例:某电商平台订单系统拆分过程中如何回填 3 年历史订单
- 背景:订单系统从单体拆分,新订单服务需要承接 3 年的历史订单数据,总量约 2 亿条
- 方案:全量 + 增量组合,先全量回填 2 亿条历史订单,再开启增量同步追平双写期间的增量
- 执行策略:1. 从只读副本查询,避免影响主库;2. 每天凌晨 2-6 点执行,每次回填 500 万条;3. 分 4 天完成全量回填
- 问题:回填第 3 天发现,部分订单的收货地址在回填后被用户修改,新地址被旧数据覆盖
- 解决:增加乐观锁逻辑,比较更新时间,只覆盖旧数据
- 结果:最终数据一致率达到 99.999%,只有极少数因并发修改导致的数据差异
- 来源:内部技术复盘文档
总结
数据回填是迁移过程中的重要环节。它的核心挑战是:在保证数据正确性的同时,把对生产系统的影响降到最低。
全量回填适合数据量可控的场景,关键是分页查询 + 批量写入 + 断点续传。增量回填适合数据量大、生产库压力不能过高的场景,关键是设计合理的水位机制。实际项目中,通常采用「全量 + 增量」的组合策略,先快速迁移历史数据,再持续追平增量。
回填的风险主要集中在两个方面:源库压力和数据覆盖问题。限速查询、只读副本、乐观锁是常用的应对手段。回填完成后,必须通过数量校验、抽样校验和业务校验三重验证,确保数据正确性。
常见陷阱与反模式:
- 没有断点续传:回填任务中途失败后从头开始,导致浪费大量时间和资源。必须实现断点续传机制。
- 回填对源库压力过大:一次性查询全量数据,把数据库打爆。必须限速、错峰、或者从只读副本读取。
- 忽略增量数据:只做全量回填,不处理回填期间产生的增量数据。回填完成后要检查是否有数据遗漏。
思考题
问题 1:如果回填过程中发现数据有质量问题(如脏数据),应该如何处理?
参考答案
数据质量问题通常分为两类:1. 可自动修复的脏数据:如格式不统一、字段缺失等,可以在回填过程中通过清洗脚本自动修复;2. 不可自动修复的脏数据:如业务逻辑错误,需要人工判断和处理。建议在回填前先做数据质量评估,对脏数据进行分类并制定处理策略。对于严重的脏数据,可以先跳过,回填完成后再单独处理。
问题 2:如果回填的目标是分库分表的新库,如何保证数据均匀分布?
参考答案
分库分表的关键是选择合理的分片键。如果按用户 ID 分片,需要在回填前先计算每条数据应该写入哪个分片,避免数据倾斜。可以使用「预计算 + 批量路由」的方式:先扫描一遍数据,统计各分片的数据量,然后根据分片容量动态调整回填速率,确保分片间负载均衡。
问题 3:回填完成后,如何判断可以停止双写、切换流量?
参考答案
需要同时满足以下条件:1. 全量回填完成,所有历史数据都已迁移到新库;2. 增量同步追平,新旧库之间没有数据延迟;3. 数据一致性校验通过,不一致率在可接受范围内(如 < 0.01%);4. 新服务稳定性验证通过,在灰度流量下运行正常(通常观察 24-48 小时)。满足以上条件后,可以开始切流量,但双写任务应该保留一段时间(如 1 周),作为应急回退的保障。