海量数据处理
2018 年双十一,阿里数据平台的单日数据增量突破了 1PB(1024TB)。这个数字是什么概念?如果用普通 8TB 硬盘存储,需要 128 块硬盘串联起来才能装下;如果用 100Mbps 的网络传输,需要 2.4 年才能传完。
但这还不是最难的——更难的是:这些数据不仅需要存储,还需要实时查询。双十一大屏上的 GMV 数字,每一秒都在变化;商家后台的实时销售看板,需要秒级刷新;风控系统需要在用户下单的瞬间,判断这笔订单是否存在欺诈风险。
这背后是一套完整的数据平台架构,涵盖从数据采集、存储、计算到查询的全链路。本篇文章以一个中型电商公司(约 500 万日活用户)的数据平台为例,讲解如何从零构建海量数据处理系统。
数据特点:为什么普通方案不够用
一个具体的数字
假设你的电商平台每天产生以下数据:
这个规模看起来不大,但如果加上以下需求:
- 实时查询:用户在某个页面的停留时长、点击热力图,需要秒级响应
- 历史分析:对比去年双十一和今年双十一的 GMV 趋势,需要跨年查询
- 高并发:运营人员在白天高峰期同时使用 BI 系统,要求 p99 < 3s
- 低成本:数据平台不直接产生收入,成本控制是重要命题
MySQL 单机在 1 亿行数据时已经开始吃力,更别说 53TB。普通 OLTP 数据库根本无法满足这类需求。
数据平台的两个核心问题
问题一:写入和查询是两种不同的工作负载。
数据写入的特点是:顺序追加、大流量、批量操作。数据查询的特点是:随机读取、复杂条件、聚合分析。这两种 workload 放在同一个数据库里,互相干扰。
解决方案:分离写入和查询路径。用专门的写入系统接收数据,用专门的查询引擎响应查询。
问题二:历史数据和实时数据的口径需要一致。
分析师今天算的「双十一 DAU」和昨天算的「双十一 DAU」必须口径一致。如果实时计算用 Flink,历史计算用 Hive,两套系统的时间窗口定义不同,就会产生数据不一致。
解决方案:建立统一的数仓分层,所有计算逻辑都在同一层实现,用视图或物化视图屏蔽底层差异。
数据分层架构
数据分层(Data Warehouse)是工业界几十年沉淀下来的最佳实践。它的核心思想是:不同阶段的数据解决不同的问题,用清晰的边界降低耦合。
数据源(MySQL Binlog / App 埋点 / 日志)
│
▼
ODS 层(原始数据层):未经处理的原始数据
│
▼
DWD 层(明细数据层):清洗、去重、业务过程定义
│
▼
DWS 层(汇总数据层):按主题汇总、公共指标聚合
│
▼
ADS 层(应用数据层):面向业务的直接查询
ODS 层:原始数据层
ODS 层(Operational Data Store,直译为「操作数据层」,但业内通常理解为「原始数据层」)是整个数仓的最底层,数据几乎不做任何加工,直接原样存储。
-- ODS 层建表:用户行为日志原始表
CREATE TABLE ods.user_behavior_log (
-- 原始字段,直接从 Kafka topic 的 Avro schema 映射
log_id STRING,
event_type STRING, -- click, view, add_cart, purchase
user_id STRING,
device_id STRING,
ip STRING,
url STRING,
referrer STRING,
user_agent STRING,
timestamp BIGINT, -- 毫秒时间戳
params MAP<STRING, STRING>, -- 动态扩展字段
-- 审计字段
dt STRING, -- 分区字段:2024-01-15
hp_time TIMESTAMP -- 数据进入数仓的时间
) PARTITIONED BY (dt STRING)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression' = 'SNAPPY');
ODS 层的核心原则:
- 保留全量原始数据:即使后续清洗出错,也能从 ODS 重刷
- 按时间分区:方便按日期删除历史数据,控制存储成本
- 列式存储:Parquet 或 ORC,减少读取数据量
DWD 层:明细数据层
DWD 层(Dimensional Data Warehouse,也称「明细数据层」)是整个数仓的核心层,负责把原始数据加工成业务可直接使用的明细数据。
DWD 层需要完成的工作:
- 数据清洗:去除异常数据(爬虫流量、测试数据、系统内部流量)
- 去重:同一用户的重复事件去重
- 业务过程定义:什么叫「一次有效的点击」?什么叫「一次成功下单」?这些定义在 DWD 层统一
- 关联维表:把用户行为和用户属性(地域、等级、注册时间)关联起来
-- DWD 层建表:用户行为明细表(已清洗)
CREATE TABLE dwd.user_behavior_detail (
-- 核心维度
user_id STRING,
user_type STRING, -- new / old
user_level STRING, -- v1-v8 会员等级
province STRING, -- 用户归属省份
city STRING, -- 用户归属城市
-- 行为事实
event_type STRING, -- click / view / add_cart / purchase / refund
product_id STRING, -- 商品 ID(非购买事件为空)
product_name STRING,
category_id STRING,
category_name STRING,
brand_id STRING,
brand_name STRING,
shop_id STRING,
shop_name STRING,
price DECIMAL(12,2), -- 商品价格(购买事件)
quantity INT, -- 购买数量(购买事件)
-- 时间维度
event_time TIMESTAMP, -- 事件发生时间
event_date STRING, -- 事件日期:2024-01-15
-- 地域维度(从用户维表关联)
ip STRING,
ip_province STRING,
ip_city STRING,
-- 来源维度
source STRING, -- app / h5 / pc / mini_program
channel STRING, -- 应用市场 / 分享链接 / 搜索
utm_source STRING,
utm_medium STRING,
utm_campaign STRING
) PARTITIONED BY (dt STRING)
STORED AS PARQUET;
-- DWD 层数据清洗任务(每日凌晨调度)
INSERT OVERWRITE TABLE dwd.user_behavior_detail PARTITION (dt = '${bizdate}')
SELECT
-- 基础信息
user_id,
CASE WHEN u.first_order_time IS NOT NULL THEN 'old' ELSE 'new' END as user_type,
COALESCE(u.level, 'v1') as user_level,
COALESCE(u.province, '未知') as province,
COALESCE(u.city, '未知') as city,
-- 行为类型标准化
CASE
WHEN event_type = 'product_click' THEN 'click'
WHEN event_type = 'sku_view' THEN 'view'
WHEN event_type = 'add_to_cart' THEN 'add_cart'
WHEN event_type = 'order_submit' THEN 'purchase'
WHEN event_type = 'order_refund' THEN 'refund'
ELSE event_type
END as event_type,
-- 商品信息
product_id,
product_name,
category_id,
category_name,
brand_id,
brand_name,
shop_id,
shop_name,
CAST(price AS DECIMAL(12,2)) as price,
CAST(quantity AS INT) as quantity,
-- 时间和地域
FROM_UNIXTIME(timestamp / 1000) as event_time,
'${bizdate}' as event_date,
ip,
ip_to_province(ip) as ip_province,
ip_to_city(ip) as ip_city,
-- 来源信息
source,
channel,
utm_source,
utm_medium,
utm_campaign
FROM ods.user_behavior_log
WHERE dt = '${bizdate}'
-- 过滤爬虫流量
AND NOT is_crawler(user_agent, ip)
-- 过滤测试用户
AND user_id NOT IN (${test_user_ids})
-- 过滤内部流量
AND ip NOT IN (${internal_ip_ranges})
-- 过滤异常数据
AND user_id IS NOT NULL
AND LENGTH(user_id) > 0;
DWS 层:汇总数据层
DWS 层(Data Warehouse Summary,也称「汇总数据层」)负责按业务主题做轻度汇总,把明细数据的粒度往上提一层,减少 ADS 层的查询压力。
-- DWS 层建表:用户粒度日活汇总
CREATE TABLE dws.user_daily_summary (
user_id STRING,
dt STRING, -- 日期
-- 行为汇总
pv BIGINT, -- 页面浏览次数
click_cnt BIGINT, -- 点击次数
cart_cnt BIGINT, -- 加购次数
order_cnt BIGINT, -- 下单次数
order_amount DECIMAL(14,2), -- 下单金额
pay_cnt BIGINT, -- 支付次数
pay_amount DECIMAL(14,2), -- 支付金额
-- 商品维度
product_cnt BIGINT, -- 访问商品数
category_cnt BIGINT, -- 访问类目数
-- 时间特征
first_event_time TIMESTAMP, -- 当天首次行为时间
last_event_time TIMESTAMP, -- 当天末次行为时间
duration_seconds BIGINT -- 活跃时长(秒)
) PARTITIONED BY (dt STRING)
STORED AS PARQUET;
-- DWS 层计算任务
INSERT OVERWRITE TABLE dws.user_daily_summary PARTITION (dt = '${bizdate}')
SELECT
user_id,
'${bizdate}' as dt,
COUNT(1) as pv,
SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) as click_cnt,
SUM(CASE WHEN event_type = 'add_cart' THEN 1 ELSE 0 END) as cart_cnt,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as order_cnt,
SUM(CASE WHEN event_type = 'purchase' THEN price ELSE 0 END) as order_amount,
SUM(CASE WHEN is_paid THEN 1 ELSE 0 END) as pay_cnt,
SUM(CASE WHEN is_paid THEN price ELSE 0 END) as pay_amount,
COUNT(DISTINCT product_id) as product_cnt,
COUNT(DISTINCT category_id) as category_cnt,
MIN(event_time) as first_event_time,
MAX(event_time) as last_event_time,
UNIX_TIMESTAMP(MAX(event_time)) - UNIX_TIMESTAMP(MIN(event_time)) as duration_seconds
FROM dwd.user_behavior_detail
WHERE dt = '${bizdate}'
AND user_id IS NOT NULL
GROUP BY user_id;
ADS 层:应用数据层
ADS 层(Application Data Service,也称「应用数据层」)直接面向业务查询,是 BI 系统和数据接口的底层数据来源。
-- ADS 层:运营大盘实时数据(供大屏使用)
CREATE TABLE ads.operation_dashboard (
dt STRING,
-- 核心指标
dau BIGINT, -- 日活用户数
new_user_cnt BIGINT, -- 新增用户数
order_cnt BIGINT, -- 订单数
order_amount DECIMAL(16,2),-- GMV
pay_rate DECIMAL(6,4), -- 支付转化率
avg_order_price DECIMAL(10,2),-- 客单价
-- 商品维度
top_category STRING, -- GMV 最高的类目
top_product_id STRING, -- GMV 最高的商品
-- 时间戳
update_time TIMESTAMP
) STORED AS PARQUET
TBLPROPERTIES ('parquet.compression' = 'SNAPPY');
存储选型
数据平台的存储选型,不是选「哪个数据库最好」,而是选「哪个数据库最适合这个场景」。
四类存储引擎的定位
ClickHouse 为什么这么快
ClickHouse 是近年来大数据领域最火热的 OLAP 引擎,由俄罗斯搜索巨头 Yandex 开源。它的核心设计哲学是:用列式存储 + 向量化执行 + 数据压缩,让查询尽可能靠近磁盘带宽而不是 CPU。
-- ClickHouse 建表:用户行为分析表
CREATE TABLE ch.user_behaviors (
user_id UInt32,
event_type Enum8('click'=1, 'view'=2, 'cart'=3, 'purchase'=4, 'refund'=5),
product_id UInt32,
category_id UInt32,
brand_id UInt32,
price Decimal(10,2),
quantity UInt16,
province FixedString(24),
city FixedString(24),
source UInt8, -- 1=app 2=h5 3=pc 4=小程序
channel String,
event_time DateTime,
dt Date
) ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (dt, event_type, user_id) -- 排序键:决定数据如何物理排列
SETTINGS index_granularity = 8192; -- 索引粒度:每隔 8192 行建一个稀疏索引
-- 查询示例:计算各渠道的 GMV(ClickHouse 秒级返回上亿行数据)
SELECT
channel,
count() as cnt,
sum(price * quantity) as gmv
FROM user_behaviors
WHERE dt >= '2024-01-01' AND dt <= '2024-01-15'
AND event_type = 'purchase'
GROUP BY channel
ORDER BY gmv DESC
LIMIT 100;
ClickHouse 快的原因:
列式存储:只需要读取查询涉及的列,不需要读取整行数据。比如查询所有渠道的 GMV,只需要读取 channel 和 price 两列,不需要读取 user_id、city 等无关列。
向量化执行:一次性处理一批数据(通常 1024 行),而不是一行一行处理。现代 CPU 的 SIMD 指令可以在单条指令里处理多个数据,减少指令数和分支预测失败。
数据压缩:列式存储的数据重复度高(如 event_type 列只有 5 种值),压缩后磁盘读取量大幅减少。压缩比通常在 5-10 倍,意味着读取速度提升 5-10 倍。
稀疏索引:每 8192 行建一个稀疏索引,查询时先通过稀疏索引过滤不相关的数据块,再读取数据。避免全表扫描。
实时数据写入:Kafka + Flink
实时数据的采集和处理链路如下:
App/后端 → 埋点 SDK → Kafka → Flink 实时计算 → ClickHouse/Doris
// Flink 实时计算:实时 DAU 和 GMV
public class RealTimeMetricsJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 Checkpoint:Flink 任务失败后可以从最近快照恢复
env.enableCheckpointing(60000L); // 每 60 秒做一次 Checkpoint
env.getCheckpointConfig().setCheckpointStorage("hdfs://ns1/flink/checkpoints");
// Kafka 数据源
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"user-behavior-events",
new SimpleStringSchema(),
kafkaProps
);
kafkaSource.setStartFromEarliest();
DataStream<String> rawStream = env.addSource(kafkaSource);
// 解析 JSON
DataStream<UserEvent> events = rawStream
.map(json -> JSON.parseObject(json, UserEvent.class))
.filter(e -> e.getUserId() != null && e.getEventTime() != null);
// ===== 实时 DAU =====
DataStream<WindowResult> dauStats = events
.keyBy(e -> e.getDt()) // 按日期 keyBy
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new DauAggregator());
// ===== 实时 GMV =====
DataStream<WindowResult> gmvStats = events
.filter(e -> "purchase".equals(e.getEventType()))
.keyBy(e -> e.getDt())
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new GmvAggregator());
// ===== 实时类目 GMV 排行榜 =====
DataStream<CategoryRank> categoryRank = events
.filter(e -> "purchase".equals(e.getEventType()))
.keyBy(e -> e.getCategoryId())
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new CategoryGmvAggregator());
// 输出到 ClickHouse(JDBC Connector)
dauStats.addSink(new ClickHouseJdbcSink(
"jdbc:clickhouse://localhost:8123/default",
"INSERT INTO ads.realtime_dau (dt, window_start, window_end, dau) VALUES(?,?,?,?)"));
gmvStats.addSink(new ClickHouseJdbcSink(
"jdbc:clickhouse://localhost:8123/default",
"INSERT INTO ads.realtime_gmv (dt, window_start, window_end, order_cnt, gmv) VALUES(?,?,?,?,?)"));
categoryRank.addSink(new ClickHouseJdbcSink(
"jdbc:clickhouse://localhost:8123/default",
"INSERT INTO ads.category_gmv_rank (dt, category_id, category_name, rank, gmv) VALUES(?,?,?,?,?)"));
env.execute("Real-time Metrics Job");
}
}
// DAU 聚合器
public class DauAggregator
implements AggregateFunction<UserEvent, Set<String>, WindowResult> {
@Override
public Set<String> createAccumulator() {
return new HashSet<>(); // 用 Set 自动去重
}
@Override
public Set<String> add(UserEvent event, Set<String> accumulator) {
accumulator.add(event.getUserId()); // Set 去重
return accumulator;
}
@Override
public WindowResult getResult(Set<String> accumulator) {
return new WindowResult(accumulator.size());
}
@Override
public Set<String> merge(Set<String> a, Set<String> b) {
a.addAll(b);
return a;
}
}
// GMV 聚合器
public class GmvAggregator
implements AggregateFunction<UserEvent, GmvAccumulator, WindowResult> {
@Override
public GmvAccumulator createAccumulator() {
return new GmvAccumulator(0, BigDecimal.ZERO);
}
@Override
public GmvAccumulator add(UserEvent event, GmvAccumulator acc) {
acc.orderCnt++;
acc.gmv = acc.gmv.add(event.getPrice().multiply(
BigDecimal.valueOf(event.getQuantity())));
return acc;
}
@Override
public WindowResult getResult(GmvAccumulator acc) {
return new WindowResult(acc.orderCnt, acc.gmv);
}
@Override
public GmvAccumulator merge(GmvAccumulator a, GmvAccumulator b) {
return new GmvAccumulator(
a.orderCnt + b.orderCnt,
a.gmv.add(b.gmv)
);
}
}
数据治理:冷热分层与成本控制
数据平台最大的隐性成本是存储成本。每天 710GB 的数据,如果全量保留两年,需要约 518TB 存储。按云存储 0.12 元/GB/月计算,一年的存储成本高达 75 万元。
冷热分层策略
冷热分层(Tiered Storage)是降低存储成本的核心手段。核心思想是:把访问频率高的数据放在快速存储(SSD),把访问频率低的数据放在低成本存储(HDD / 对象存储)。
# 数据生命周期管理配置
lifecycle:
- table: dwd.user_behavior_detail
retention:
hot: 30d # 最近 30 天:SSD,Parquet + Zstd 压缩
warm: 90d # 30-90 天:普通 HDD,Parquet + LZ4 压缩
cold: 365d # 90 天 - 1 年:对象存储(阿里云 OSS / AWS S3),ORC + LZ4
archive: 730d # 1-2 年:归档存储,仅在需要时恢复
- table: ods.user_behavior_log
retention:
hot: 7d # ODS 层只保留 7 天
cold: 0d # 7 天后直接删除(原始日志可以从 Kafka 重建)
ClickHouse 冷热分层实现
-- ClickHouse 冷热分层存储(需要 ClickHouse Cloud 或自建配置)
-- 语法示例(ClickHouse 22.8+)
CREATE TABLE user_behaviors (
user_id UInt32,
event_type String,
product_id UInt32,
event_time DateTime,
dt Date
) ENGINE = MergeTree()
ORDER BY (dt, event_type, user_id)
TTL dt + INTERVAL 30 DAY -- 30 天后自动移动到冷存储
SETTINGS storage_policy = 'tiered'; -- tiered: 热 + 冷分层
-- 查看存储使用情况
SELECT
table,
sum(rows) as total_rows,
sum(bytes_on_disk) as total_bytes,
sum(bytes_on_disk) / 1024 / 1024 / 1024 as size_gb
FROM system.parts
WHERE table LIKE 'dwd.%'
GROUP BY table
ORDER BY size_gb DESC;
真实踩坑案例
踩坑一:数据倾斜导致 Flink 任务OOM
某公司的 Flink 任务在处理用户行为数据时,TaskManager 频繁 OOM。排查发现:大量用户的 user_id 为空,被归类到同一个 bucket 中,导致某个 Flink 分区的数据量是其他分区的 100 倍。
教训:Flink 的 keyBy 如果 key 分布不均匀,会造成数据倾斜。在 keyBy 之前,需要先过滤掉异常 key,或者用随机分区替代 keyBy。
踩坑二:Hive 小文件过多
某公司的 Hive 任务每天产生数十万个 10MB 以下的小文件,导致 HDFS NameNode 内存压力增大,后续任务启动变慢。
原因:Flink 写入 Kafka 的数据被切分成很多小批次,每个批次触发一个 Hive 分区的写入,产生了大量小文件。
解法:在 Flink 到 Hive 的写入中间加一层 Kafka 到 Hive 的离线同步任务,使用 distribute by rand() 合并小文件。
踩坑三:ClickHouse Join 爆内存
某分析师写了一条跨 3 张表的 Join 查询,结果 ClickHouse 直接报 Memory limit exceeded 并崩溃。
原因:ClickHouse 的 JOIN 默认会把右表全部加载到内存,如果右表很大,内存直接爆。
解法:
- 改写 SQL,用子查询替代 JOIN,让小表先做聚合
- 使用
GLOBAL IN 替代 JOIN,ClickHouse 会把数据广播到所有节点
- 调整
max_memory_usage 参数,分配更多内存
-- 错误的写法:直接 JOIN
SELECT a.user_id, b.user_name, c.order_cnt
FROM dwd.user a
JOIN dim.user_info b ON a.user_id = b.user_id
JOIN ads.user_orders c ON a.user_id = c.user_id -- [!code error]
-- 正确的写法:子查询 + GLOBAL IN
SELECT user_id, user_name, order_cnt
FROM dwd.user a
ANY LEFT JOIN (
SELECT user_id, user_name FROM dim.user_info
) b GLOBAL IN (SELECT user_id, user_name FROM dim.user_info)
ON a.user_id = b.user_id
ANY LEFT JOIN (
SELECT user_id, any(order_cnt) as order_cnt
FROM ads.user_orders GROUP BY user_id
) c ON a.user_id = c.user_id;
术语表
总结
海量数据处理的核心是在成本和性能之间找到动态平衡点。
完整数据链路:
App/后端 → 埋点 → Kafka → Flink 实时计算 → ClickHouse/Doris(实时)
↓
ODS(全量原始数据)→ DWD(清洗)→ DWS(汇总)→ ADS(应用)
存储选型的决策树:
- 需要高速 OLAP 查询(BI 报表)→ ClickHouse
- 需要全文检索(日志分析)→ Elasticsearch
- 需要 KV 快速读写(画像特征)→ HBase / Redis
- 需要离线批处理(ETL 报表)→ Hive / Spark
成本控制的三个手段:
- 冷热分层存储:热数据用 SSD,冷数据用对象存储
- 数据压缩:列式存储 + 适当压缩算法,减少磁盘占用
- 生命周期管理:定期清理过期数据,控制存储总量
真正让数据平台产生价值的,不是用了多少 TB 的存储,而是「数据能不能用起来」。再好的数仓架构,如果没有人用、没有业务价值,就是成本而不是投资。