海量数据处理

2018 年双十一,阿里数据平台的单日数据增量突破了 1PB(1024TB)。这个数字是什么概念?如果用普通 8TB 硬盘存储,需要 128 块硬盘串联起来才能装下;如果用 100Mbps 的网络传输,需要 2.4 年才能传完。

但这还不是最难的——更难的是:这些数据不仅需要存储,还需要实时查询。双十一大屏上的 GMV 数字,每一秒都在变化;商家后台的实时销售看板,需要秒级刷新;风控系统需要在用户下单的瞬间,判断这笔订单是否存在欺诈风险。

这背后是一套完整的数据平台架构,涵盖从数据采集、存储、计算到查询的全链路。本篇文章以一个中型电商公司(约 500 万日活用户)的数据平台为例,讲解如何从零构建海量数据处理系统。

数据特点:为什么普通方案不够用

一个具体的数字

假设你的电商平台每天产生以下数据:

数据来源日增量单条大小存储时长总存储量
用户行为日志500GB500B90 天45TB
订单数据10GB2KB2 年7.3TB
商品数据500MB10KB全量 500 万条50GB
用户数据1GB2KB全量 5000 万条100GB
日志系统200GB1KB7 天1.4TB
合计~710GB/天--~53.5TB

这个规模看起来不大,但如果加上以下需求:

  • 实时查询:用户在某个页面的停留时长、点击热力图,需要秒级响应
  • 历史分析:对比去年双十一和今年双十一的 GMV 趋势,需要跨年查询
  • 高并发:运营人员在白天高峰期同时使用 BI 系统,要求 p99 < 3s
  • 低成本:数据平台不直接产生收入,成本控制是重要命题

MySQL 单机在 1 亿行数据时已经开始吃力,更别说 53TB。普通 OLTP 数据库根本无法满足这类需求。

数据平台的两个核心问题

问题一:写入和查询是两种不同的工作负载。

数据写入的特点是:顺序追加、大流量、批量操作。数据查询的特点是:随机读取、复杂条件、聚合分析。这两种 workload 放在同一个数据库里,互相干扰。

解决方案:分离写入和查询路径。用专门的写入系统接收数据,用专门的查询引擎响应查询。

问题二:历史数据和实时数据的口径需要一致。

分析师今天算的「双十一 DAU」和昨天算的「双十一 DAU」必须口径一致。如果实时计算用 Flink,历史计算用 Hive,两套系统的时间窗口定义不同,就会产生数据不一致。

解决方案:建立统一的数仓分层,所有计算逻辑都在同一层实现,用视图或物化视图屏蔽底层差异。

数据分层架构

数据分层(Data Warehouse)是工业界几十年沉淀下来的最佳实践。它的核心思想是:不同阶段的数据解决不同的问题,用清晰的边界降低耦合。

数据源(MySQL Binlog / App 埋点 / 日志)


ODS 层(原始数据层):未经处理的原始数据


DWD 层(明细数据层):清洗、去重、业务过程定义


DWS 层(汇总数据层):按主题汇总、公共指标聚合


ADS 层(应用数据层):面向业务的直接查询

ODS 层:原始数据层

ODS 层(Operational Data Store,直译为「操作数据层」,但业内通常理解为「原始数据层」)是整个数仓的最底层,数据几乎不做任何加工,直接原样存储

-- ODS 层建表:用户行为日志原始表
CREATE TABLE ods.user_behavior_log (
    -- 原始字段,直接从 Kafka topic 的 Avro schema 映射
    log_id     STRING,
    event_type STRING,        -- click, view, add_cart, purchase
    user_id    STRING,
    device_id  STRING,
    ip         STRING,
    url        STRING,
    referrer   STRING,
    user_agent STRING,
    timestamp  BIGINT,         -- 毫秒时间戳
    params     MAP<STRING, STRING>,  -- 动态扩展字段
    -- 审计字段
    dt         STRING,         -- 分区字段:2024-01-15
    hp_time    TIMESTAMP      -- 数据进入数仓的时间
) PARTITIONED BY (dt STRING)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression' = 'SNAPPY');

ODS 层的核心原则

  • 保留全量原始数据:即使后续清洗出错,也能从 ODS 重刷
  • 按时间分区:方便按日期删除历史数据,控制存储成本
  • 列式存储:Parquet 或 ORC,减少读取数据量

DWD 层:明细数据层

DWD 层(Dimensional Data Warehouse,也称「明细数据层」)是整个数仓的核心层,负责把原始数据加工成业务可直接使用的明细数据。

DWD 层需要完成的工作:

  1. 数据清洗:去除异常数据(爬虫流量、测试数据、系统内部流量)
  2. 去重:同一用户的重复事件去重
  3. 业务过程定义:什么叫「一次有效的点击」?什么叫「一次成功下单」?这些定义在 DWD 层统一
  4. 关联维表:把用户行为和用户属性(地域、等级、注册时间)关联起来
-- DWD 层建表:用户行为明细表(已清洗)
CREATE TABLE dwd.user_behavior_detail (
    -- 核心维度
    user_id        STRING,
    user_type      STRING,      -- new / old
    user_level     STRING,      -- v1-v8 会员等级
    province       STRING,      -- 用户归属省份
    city           STRING,      -- 用户归属城市

    -- 行为事实
    event_type     STRING,      -- click / view / add_cart / purchase / refund
    product_id     STRING,      -- 商品 ID(非购买事件为空)
    product_name   STRING,
    category_id    STRING,
    category_name  STRING,
    brand_id       STRING,
    brand_name     STRING,
    shop_id        STRING,
    shop_name      STRING,
    price          DECIMAL(12,2),  -- 商品价格(购买事件)
    quantity       INT,          -- 购买数量(购买事件)

    -- 时间维度
    event_time     TIMESTAMP,   -- 事件发生时间
    event_date     STRING,      -- 事件日期:2024-01-15

    -- 地域维度(从用户维表关联)
    ip             STRING,
    ip_province    STRING,
    ip_city        STRING,

    -- 来源维度
    source         STRING,      -- app / h5 / pc / mini_program
    channel        STRING,      -- 应用市场 / 分享链接 / 搜索
    utm_source     STRING,
    utm_medium     STRING,
    utm_campaign   STRING
) PARTITIONED BY (dt STRING)
STORED AS PARQUET;

-- DWD 层数据清洗任务(每日凌晨调度)
INSERT OVERWRITE TABLE dwd.user_behavior_detail PARTITION (dt = '${bizdate}')
SELECT
    -- 基础信息
    user_id,
    CASE WHEN u.first_order_time IS NOT NULL THEN 'old' ELSE 'new' END as user_type,
    COALESCE(u.level, 'v1') as user_level,
    COALESCE(u.province, '未知') as province,
    COALESCE(u.city, '未知') as city,

    -- 行为类型标准化
    CASE
        WHEN event_type = 'product_click' THEN 'click'
        WHEN event_type = 'sku_view' THEN 'view'
        WHEN event_type = 'add_to_cart' THEN 'add_cart'
        WHEN event_type = 'order_submit' THEN 'purchase'
        WHEN event_type = 'order_refund' THEN 'refund'
        ELSE event_type
    END as event_type,

    -- 商品信息
    product_id,
    product_name,
    category_id,
    category_name,
    brand_id,
    brand_name,
    shop_id,
    shop_name,
    CAST(price AS DECIMAL(12,2)) as price,
    CAST(quantity AS INT) as quantity,

    -- 时间和地域
    FROM_UNIXTIME(timestamp / 1000) as event_time,
    '${bizdate}' as event_date,
    ip,
    ip_to_province(ip) as ip_province,
    ip_to_city(ip) as ip_city,

    -- 来源信息
    source,
    channel,
    utm_source,
    utm_medium,
    utm_campaign

FROM ods.user_behavior_log
WHERE dt = '${bizdate}'
  -- 过滤爬虫流量
  AND NOT is_crawler(user_agent, ip)
  -- 过滤测试用户
  AND user_id NOT IN (${test_user_ids})
  -- 过滤内部流量
  AND ip NOT IN (${internal_ip_ranges})
  -- 过滤异常数据
  AND user_id IS NOT NULL
  AND LENGTH(user_id) > 0;

DWS 层:汇总数据层

DWS 层(Data Warehouse Summary,也称「汇总数据层」)负责按业务主题做轻度汇总,把明细数据的粒度往上提一层,减少 ADS 层的查询压力。

-- DWS 层建表:用户粒度日活汇总
CREATE TABLE dws.user_daily_summary (
    user_id         STRING,
    dt              STRING,        -- 日期

    -- 行为汇总
    pv              BIGINT,        -- 页面浏览次数
    click_cnt       BIGINT,        -- 点击次数
    cart_cnt        BIGINT,        -- 加购次数
    order_cnt       BIGINT,        -- 下单次数
    order_amount    DECIMAL(14,2), -- 下单金额
    pay_cnt         BIGINT,        -- 支付次数
    pay_amount      DECIMAL(14,2), -- 支付金额

    -- 商品维度
    product_cnt     BIGINT,        -- 访问商品数
    category_cnt    BIGINT,        -- 访问类目数

    -- 时间特征
    first_event_time TIMESTAMP,    -- 当天首次行为时间
    last_event_time  TIMESTAMP,   -- 当天末次行为时间
    duration_seconds  BIGINT       -- 活跃时长(秒)
) PARTITIONED BY (dt STRING)
STORED AS PARQUET;

-- DWS 层计算任务
INSERT OVERWRITE TABLE dws.user_daily_summary PARTITION (dt = '${bizdate}')
SELECT
    user_id,
    '${bizdate}' as dt,

    COUNT(1) as pv,
    SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) as click_cnt,
    SUM(CASE WHEN event_type = 'add_cart' THEN 1 ELSE 0 END) as cart_cnt,
    SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as order_cnt,
    SUM(CASE WHEN event_type = 'purchase' THEN price ELSE 0 END) as order_amount,
    SUM(CASE WHEN is_paid THEN 1 ELSE 0 END) as pay_cnt,
    SUM(CASE WHEN is_paid THEN price ELSE 0 END) as pay_amount,

    COUNT(DISTINCT product_id) as product_cnt,
    COUNT(DISTINCT category_id) as category_cnt,

    MIN(event_time) as first_event_time,
    MAX(event_time) as last_event_time,
    UNIX_TIMESTAMP(MAX(event_time)) - UNIX_TIMESTAMP(MIN(event_time)) as duration_seconds

FROM dwd.user_behavior_detail
WHERE dt = '${bizdate}'
  AND user_id IS NOT NULL
GROUP BY user_id;

ADS 层:应用数据层

ADS 层(Application Data Service,也称「应用数据层」)直接面向业务查询,是 BI 系统和数据接口的底层数据来源。

-- ADS 层:运营大盘实时数据(供大屏使用)
CREATE TABLE ads.operation_dashboard (
    dt              STRING,
    -- 核心指标
    dau             BIGINT,       -- 日活用户数
    new_user_cnt    BIGINT,       -- 新增用户数
    order_cnt       BIGINT,       -- 订单数
    order_amount    DECIMAL(16,2),-- GMV
    pay_rate        DECIMAL(6,4), -- 支付转化率
    avg_order_price DECIMAL(10,2),-- 客单价

    -- 商品维度
    top_category    STRING,       -- GMV 最高的类目
    top_product_id  STRING,       -- GMV 最高的商品

    -- 时间戳
    update_time     TIMESTAMP
) STORED AS PARQUET
TBLPROPERTIES ('parquet.compression' = 'SNAPPY');

存储选型

数据平台的存储选型,不是选「哪个数据库最好」,而是选「哪个数据库最适合这个场景」。

四类存储引擎的定位

存储引擎数据量级查询特点适用场景不适用场景
Hive(HDFS)TB-PB复杂分析、批量扫描离线数据仓库、ETL 批处理实时查询、点查
ClickHouseTB-PB高速 OLAP、实时聚合BI 报表、实时分析大屏高并发点查、事务更新
ElasticsearchTB 级全文检索、复杂过滤日志分析、用户行为检索聚合计算、Join 查询
HBasePB 级KV 快速读写画像数据、实时推荐特征复杂分析、Scan 查询

ClickHouse 为什么这么快

ClickHouse 是近年来大数据领域最火热的 OLAP 引擎,由俄罗斯搜索巨头 Yandex 开源。它的核心设计哲学是:用列式存储 + 向量化执行 + 数据压缩,让查询尽可能靠近磁盘带宽而不是 CPU

-- ClickHouse 建表:用户行为分析表
CREATE TABLE ch.user_behaviors (
    user_id       UInt32,
    event_type    Enum8('click'=1, 'view'=2, 'cart'=3, 'purchase'=4, 'refund'=5),
    product_id    UInt32,
    category_id   UInt32,
    brand_id      UInt32,
    price         Decimal(10,2),
    quantity       UInt16,
    province      FixedString(24),
    city          FixedString(24),
    source        UInt8,     -- 1=app 2=h5 3=pc 4=小程序
    channel       String,
    event_time    DateTime,
    dt            Date
) ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (dt, event_type, user_id)  -- 排序键:决定数据如何物理排列
SETTINGS index_granularity = 8192;  -- 索引粒度:每隔 8192 行建一个稀疏索引

-- 查询示例:计算各渠道的 GMV(ClickHouse 秒级返回上亿行数据)
SELECT
    channel,
    count() as cnt,
    sum(price * quantity) as gmv
FROM user_behaviors
WHERE dt >= '2024-01-01' AND dt <= '2024-01-15'
  AND event_type = 'purchase'
GROUP BY channel
ORDER BY gmv DESC
LIMIT 100;

ClickHouse 快的原因:

列式存储:只需要读取查询涉及的列,不需要读取整行数据。比如查询所有渠道的 GMV,只需要读取 channelprice 两列,不需要读取 user_idcity 等无关列。

向量化执行:一次性处理一批数据(通常 1024 行),而不是一行一行处理。现代 CPU 的 SIMD 指令可以在单条指令里处理多个数据,减少指令数和分支预测失败。

数据压缩:列式存储的数据重复度高(如 event_type 列只有 5 种值),压缩后磁盘读取量大幅减少。压缩比通常在 5-10 倍,意味着读取速度提升 5-10 倍。

稀疏索引:每 8192 行建一个稀疏索引,查询时先通过稀疏索引过滤不相关的数据块,再读取数据。避免全表扫描。

实时数据的采集和处理链路如下:

App/后端 → 埋点 SDK → Kafka → Flink 实时计算 → ClickHouse/Doris
// Flink 实时计算:实时 DAU 和 GMV
public class RealTimeMetricsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启 Checkpoint:Flink 任务失败后可以从最近快照恢复
        env.enableCheckpointing(60000L);  // 每 60 秒做一次 Checkpoint
        env.getCheckpointConfig().setCheckpointStorage("hdfs://ns1/flink/checkpoints");

        // Kafka 数据源
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
            "user-behavior-events",
            new SimpleStringSchema(),
            kafkaProps
        );
        kafkaSource.setStartFromEarliest();

        DataStream<String> rawStream = env.addSource(kafkaSource);

        // 解析 JSON
        DataStream<UserEvent> events = rawStream
            .map(json -> JSON.parseObject(json, UserEvent.class))
            .filter(e -> e.getUserId() != null && e.getEventTime() != null);

        // ===== 实时 DAU =====
        DataStream<WindowResult> dauStats = events
            .keyBy(e -> e.getDt())  // 按日期 keyBy
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new DauAggregator());

        // ===== 实时 GMV =====
        DataStream<WindowResult> gmvStats = events
            .filter(e -> "purchase".equals(e.getEventType()))
            .keyBy(e -> e.getDt())
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new GmvAggregator());

        // ===== 实时类目 GMV 排行榜 =====
        DataStream<CategoryRank> categoryRank = events
            .filter(e -> "purchase".equals(e.getEventType()))
            .keyBy(e -> e.getCategoryId())
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .aggregate(new CategoryGmvAggregator());

        // 输出到 ClickHouse(JDBC Connector)
        dauStats.addSink(new ClickHouseJdbcSink(
            "jdbc:clickhouse://localhost:8123/default",
            "INSERT INTO ads.realtime_dau (dt, window_start, window_end, dau) VALUES(?,?,?,?)"));

        gmvStats.addSink(new ClickHouseJdbcSink(
            "jdbc:clickhouse://localhost:8123/default",
            "INSERT INTO ads.realtime_gmv (dt, window_start, window_end, order_cnt, gmv) VALUES(?,?,?,?,?)"));

        categoryRank.addSink(new ClickHouseJdbcSink(
            "jdbc:clickhouse://localhost:8123/default",
            "INSERT INTO ads.category_gmv_rank (dt, category_id, category_name, rank, gmv) VALUES(?,?,?,?,?)"));

        env.execute("Real-time Metrics Job");
    }
}

// DAU 聚合器
public class DauAggregator
    implements AggregateFunction<UserEvent, Set<String>, WindowResult> {

    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();  // 用 Set 自动去重
    }

    @Override
    public Set<String> add(UserEvent event, Set<String> accumulator) {
        accumulator.add(event.getUserId());  // Set 去重
        return accumulator;
    }

    @Override
    public WindowResult getResult(Set<String> accumulator) {
        return new WindowResult(accumulator.size());
    }

    @Override
    public Set<String> merge(Set<String> a, Set<String> b) {
        a.addAll(b);
        return a;
    }
}

// GMV 聚合器
public class GmvAggregator
    implements AggregateFunction<UserEvent, GmvAccumulator, WindowResult> {

    @Override
    public GmvAccumulator createAccumulator() {
        return new GmvAccumulator(0, BigDecimal.ZERO);
    }

    @Override
    public GmvAccumulator add(UserEvent event, GmvAccumulator acc) {
        acc.orderCnt++;
        acc.gmv = acc.gmv.add(event.getPrice().multiply(
            BigDecimal.valueOf(event.getQuantity())));
        return acc;
    }

    @Override
    public WindowResult getResult(GmvAccumulator acc) {
        return new WindowResult(acc.orderCnt, acc.gmv);
    }

    @Override
    public GmvAccumulator merge(GmvAccumulator a, GmvAccumulator b) {
        return new GmvAccumulator(
            a.orderCnt + b.orderCnt,
            a.gmv.add(b.gmv)
        );
    }
}

数据治理:冷热分层与成本控制

数据平台最大的隐性成本是存储成本。每天 710GB 的数据,如果全量保留两年,需要约 518TB 存储。按云存储 0.12 元/GB/月计算,一年的存储成本高达 75 万元。

冷热分层策略

冷热分层(Tiered Storage)是降低存储成本的核心手段。核心思想是:把访问频率高的数据放在快速存储(SSD),把访问频率低的数据放在低成本存储(HDD / 对象存储)

# 数据生命周期管理配置
lifecycle:
  - table: dwd.user_behavior_detail
    retention:
      hot: 30d    # 最近 30 天:SSD,Parquet + Zstd 压缩
      warm: 90d   # 30-90 天:普通 HDD,Parquet + LZ4 压缩
      cold: 365d  # 90 天 - 1 年:对象存储(阿里云 OSS / AWS S3),ORC + LZ4
      archive: 730d # 1-2 年:归档存储,仅在需要时恢复

  - table: ods.user_behavior_log
    retention:
      hot: 7d    # ODS 层只保留 7 天
      cold: 0d   # 7 天后直接删除(原始日志可以从 Kafka 重建)

ClickHouse 冷热分层实现

-- ClickHouse 冷热分层存储(需要 ClickHouse Cloud 或自建配置)
-- 语法示例(ClickHouse 22.8+)
CREATE TABLE user_behaviors (
    user_id     UInt32,
    event_type  String,
    product_id  UInt32,
    event_time  DateTime,
    dt          Date
) ENGINE = MergeTree()
ORDER BY (dt, event_type, user_id)
TTL dt + INTERVAL 30 DAY  -- 30 天后自动移动到冷存储
SETTINGS storage_policy = 'tiered';  -- tiered: 热 + 冷分层

-- 查看存储使用情况
SELECT
    table,
    sum(rows) as total_rows,
    sum(bytes_on_disk) as total_bytes,
    sum(bytes_on_disk) / 1024 / 1024 / 1024 as size_gb
FROM system.parts
WHERE table LIKE 'dwd.%'
GROUP BY table
ORDER BY size_gb DESC;

真实踩坑案例

某公司的 Flink 任务在处理用户行为数据时,TaskManager 频繁 OOM。排查发现:大量用户的 user_id 为空,被归类到同一个 bucket 中,导致某个 Flink 分区的数据量是其他分区的 100 倍。

教训:Flink 的 keyBy 如果 key 分布不均匀,会造成数据倾斜。在 keyBy 之前,需要先过滤掉异常 key,或者用随机分区替代 keyBy。

踩坑二:Hive 小文件过多

某公司的 Hive 任务每天产生数十万个 10MB 以下的小文件,导致 HDFS NameNode 内存压力增大,后续任务启动变慢。

原因:Flink 写入 Kafka 的数据被切分成很多小批次,每个批次触发一个 Hive 分区的写入,产生了大量小文件。

解法:在 Flink 到 Hive 的写入中间加一层 Kafka 到 Hive 的离线同步任务,使用 distribute by rand() 合并小文件。

踩坑三:ClickHouse Join 爆内存

某分析师写了一条跨 3 张表的 Join 查询,结果 ClickHouse 直接报 Memory limit exceeded 并崩溃。

原因:ClickHouse 的 JOIN 默认会把右表全部加载到内存,如果右表很大,内存直接爆。

解法

  1. 改写 SQL,用子查询替代 JOIN,让小表先做聚合
  2. 使用 GLOBAL IN 替代 JOIN,ClickHouse 会把数据广播到所有节点
  3. 调整 max_memory_usage 参数,分配更多内存
-- 错误的写法:直接 JOIN
SELECT a.user_id, b.user_name, c.order_cnt
FROM dwd.user a
JOIN dim.user_info b ON a.user_id = b.user_id
JOIN ads.user_orders c ON a.user_id = c.user_id  -- [!code error]

-- 正确的写法:子查询 + GLOBAL IN
SELECT user_id, user_name, order_cnt
FROM dwd.user a
ANY LEFT JOIN (
    SELECT user_id, user_name FROM dim.user_info
) b GLOBAL IN (SELECT user_id, user_name FROM dim.user_info)  
ON a.user_id = b.user_id
ANY LEFT JOIN (
    SELECT user_id, any(order_cnt) as order_cnt
    FROM ads.user_orders GROUP BY user_id
) c ON a.user_id = c.user_id;

术语表

术语类型说明
ODS(Operational Data Store)数仓分层原始数据层,数据几乎不做处理,保留全量原始日志
DWD(Data Warehouse Detail)数仓分层明细数据层,完成数据清洗、业务过程定义、维度关联
DWS(Data Warehouse Summary)数仓分层汇总数据层,按主题做轻度聚合,生成公共指标
ADS(Application Data Service)数仓分层应用数据层,直接面向业务查询
Parquet / ORC存储格式列式存储格式,Parquet 由 Twitter/Cloudera 开发,ORC 由 Hive 开发,均支持列裁剪和压缩
ClickHouse引擎名Yandex 开源的高速 OLAP 数据库,擅长海量数据的聚合分析查询
Flink框架名Apache 顶级项目,分布式流处理引擎,支持有状态的流式计算和 exactly-once 语义
Kafka框架名Apache 顶级项目,分布式消息队列,用于实时数据采集和解耦
ETL / ELT技术名词Extract-Transform-Load / Extract-Load-Transform,数据抽取、转换、加载的过程
数据倾斜故障类型数据在分区/分片间分布不均匀,导致部分节点负载过重
向量化执行(Vectorization)技术名词一次性处理一批数据(通常 1024 行),利用 CPU SIMD 指令提升计算吞吐量
物化视图技术名词预计算并存储查询结果,加速复杂聚合查询,ClickHouse 的 SummingMergeTree 是物化视图的一种
Checkpoint技术名词Flink 的故障恢复机制,定期将算子状态快照保存到持久存储,任务失败后从最近 Checkpoint 恢复
小文件问题故障类型大量小文件会增加 HDFS NameNode 内存压力,也会降低读取吞吐量

总结

海量数据处理的核心是在成本和性能之间找到动态平衡点

完整数据链路

App/后端 → 埋点 → Kafka → Flink 实时计算 → ClickHouse/Doris(实时)

                     ODS(全量原始数据)→ DWD(清洗)→ DWS(汇总)→ ADS(应用)

存储选型的决策树

  • 需要高速 OLAP 查询(BI 报表)→ ClickHouse
  • 需要全文检索(日志分析)→ Elasticsearch
  • 需要 KV 快速读写(画像特征)→ HBase / Redis
  • 需要离线批处理(ETL 报表)→ Hive / Spark

成本控制的三个手段

  • 冷热分层存储:热数据用 SSD,冷数据用对象存储
  • 数据压缩:列式存储 + 适当压缩算法,减少磁盘占用
  • 生命周期管理:定期清理过期数据,控制存储总量

真正让数据平台产生价值的,不是用了多少 TB 的存储,而是「数据能不能用起来」。再好的数仓架构,如果没有人用、没有业务价值,就是成本而不是投资。