字节推荐架构
2012 年 3 月,张一鸣在北京知春路的一间民宅里,创办了字节跳动的第一个产品:内涵段子。那时候没有人能想到,这家公司会在十年后成为全球最具影响力的内容平台——今日头条、抖音、TikTok,字节的产品覆盖了全球数十亿用户。
张一鸣的核心判断是:「未来的内容分发,一定是算法说了算,而不是编辑。用户不需要告诉他想要什么,算法会比用户自己更懂他。」
这个判断在二十年后看是惊人的准确——TikTok 的推荐算法,让全球用户在不知不觉中刷了几十个小时的视频,而完全不觉得无聊。
公司画像
字节跳动(ByteDance)是全球最大的内容平台之一,旗下产品包括今日头条(中文资讯)、抖音(中文短视频)、TikTok(海外短视频)、西瓜视频(长视频)等。截至 2024 年,TikTok 全球月活用户超过 15 亿,抖音日活用户超过 7 亿。
理解字节技术挑战的关键,在于它的推荐系统特性:
- 实时反馈循环:用户每划过一条视频,都在给算法提供训练样本。算法在用户使用过程中实时更新,用户下次打开时推荐就已经更新了。
- 内容冷启动:每天有数百万新视频发布,这些视频没有历史数据,推荐系统必须在极短时间内判断它们的价值。
- 规模极端化:字节的推荐系统每天处理超过 1 万亿次特征计算,单次推荐请求需要在 100ms 内完成。
- 多目标优化:推荐不仅要提升点击率,还要优化播放时长、互动率、完播率等多个目标。
架构演进时间线
第一阶段:今日头条的起步(2012-2015)
推荐系统的核心问题
今日头条的推荐系统,核心是解决一个经典问题:给定用户 U 和内容 C,预测用户 U 对内容 C 的偏好分数 P。
早期采用简单的协同过滤算法:
# 基于用户的协同过滤(简化版)
def recommend_for_user(user_id, n=20):
# 1. 找到与该用户相似的用户
similar_users = find_similar_users(user_id, top_k=100)
# 2. 获取相似用户喜欢的文章
candidate_articles = []
for u in similar_users:
articles = get_user_liked_articles(u)
for article in articles:
# 加权:相似度 × 用户对文章的评分
score = user_similarity[user_id][u] * article_score[u][article]
candidate_articles.append((article, score))
# 3. 按分数排序,取 Top N
candidate_articles.sort(key=lambda x: x[1], reverse=True)
return candidate_articles[:n]
但协同过滤的局限性很快暴露:
- 冷启动:新用户没有历史行为,无法计算相似度
- 稀疏性:用户-文章矩阵极度稀疏,大多数用户只看过极少数文章
- 内容特征缺失:协同过滤只看用户行为,不看文章内容本身
特征工程的重要性
头条工程师很快发现:推荐系统的效果,80% 取决于特征工程,20% 取决于模型选择。
早期特征体系:
# 用户特征
user_features = {
"user_id": "123456",
"age": 28,
"gender": "male",
"city": "beijing",
"device": "iphone",
"interest_categories": ["科技", "体育", "财经"], # 从历史行为推断
"reading_history": ["article_1", "article_2", ...], # 最近 N 天看过
"active_hours": [9, 10, 11, 14, 15, 20, 21], # 活跃时段
}
# 文章特征
article_features = {
"article_id": "789",
"author_id": "author_456",
"category": "科技",
"tags": ["AI", "大模型", "ChatGPT"],
"publish_time": "2024-03-15 10:00",
"word_count": 2000,
"has_video": False,
"has_images": True,
"title_length": 25,
}
第二阶段:规模化与 Lambda 架构(2015-2017)
Lambda 架构:实时与离线的融合
到 2016 年,今日头条的用户超过 1 亿,每天的行为数据超过 100TB。离线批处理(每天重新训练一次模型)已经无法满足需求——用户今天的行为,要到明天才能反映到推荐结果上。
字节引入了 Lambda 架构——同时维护实时和离线两条数据流:
- 离线层:每天凌晨跑全量数据,训练模型,生成「天级」推荐候选集
- 实时层:每小时/分钟增量更新,处理用户的最新行为
# Lambda 架构:双流融合
class LambdaRecommender:
def recommend(self, user_id, context):
# 1. 离线候选:从天级全量候选集中召回
offline_candidates = self.offline_index.get(
user_id, top_k=1000)
# 2. 实时召回:从实时行为队列中召回
# 用户最近 1 小时的行为更「热」
realtime_candidates = self.realtime_index.get(
user_id, top_k=100, recency_weight=0.3)
# 3. 合并候选集
all_candidates = merge_and_dedup(
offline_candidates, realtime_candidates)
# 4. 精排:模型打分
scores = self.ranker.predict(user_id, all_candidates)
# 5. 取 Top N
return sorted(scores, key=lambda x: x.score, reverse=True)[:20]
# 实时索引:基于 Kafka 事件实时更新
class RealtimeIndexUpdater:
def on_user_action(self, event):
"""
event: {
"user_id": "123",
"article_id": "789",
"action": "read",
"duration": 120, # 阅读时长(秒)
"timestamp": 1710000000
}
"""
# 1. 更新用户实时特征
self.update_user_features(event)
# 2. 更新文章的实时统计(播放量、互动率)
self.update_article_stats(event)
# 3. 实时召回索引增量更新
if event['action'] == 'read' and event['duration'] > 30:
# 阅读超过 30 秒,视为正向行为
self.realtime_index.add_positive(
event['user_id'], event['article_id'])
A/B 测试平台
字节是全球最早将 A/B 测试系统化的公司之一。2016 年,字节上线了覆盖全产品的 A/B 测试平台,支撑了推荐算法的快速迭代。
# A/B 测试框架
class ABTest:
def get_experiment_group(self, user_id, experiment_name):
"""
哈希用户 ID,确保同一用户始终进入同一实验组
"""
hash_key = f"{user_id}:{experiment_name}:salt_2024"
hash_value = hashlib.md5(hash_key.encode()).hexdigest()
bucket = int(hash_value, 16) % 1000 # 0-999
experiment = self.experiment_config[experiment_name]
# 10% 用户进入实验组
if bucket < experiment['treatment_ratio'] * 1000:
return 'treatment'
else:
return 'control'
def run_recommend_experiment(self, user_id):
group = self.get_experiment_group(user_id, "recommend_v3")
if group == 'treatment':
# 新算法:使用 DeepFM 模型
return self.deepfm_ranker.recommend(user_id)
else:
# 旧算法:使用 Logistic Regression
return self.lr_ranker.recommend(user_id)
第三阶段:Flink 实时计算(2017-2019)
Flink 在字节的应用
2017 年,字节引入了 Apache Flink 作为实时计算的核心引擎。Flink 的流式计算模型,非常适合字节的场景——用户行为是永不停歇的数据流,需要实时处理。
// Flink 作业:实时计算用户特征和文章统计
public class UserFeatureJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1000); // 1000 个并发实例
// 1. 从 Kafka 消费用户行为事件
DataStream<UserAction> actions = env
.addSource(new KafkaSource<>("user-actions"))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<UserAction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.getTimestamp())
);
// 2. 实时更新用户特征(滑动窗口)
DataStream<UserFeatures> userFeatures = actions
.keyBy(UserAction::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.process(new UserFeatureAggregator());
// 3. 实时更新文章统计
DataStream<ArticleStats> articleStats = actions
.keyBy(UserAction::getArticleId)
.window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5)))
.process(new ArticleStatsAggregator());
// 4. 写入在线特征存储(Redis / Feature Store)
userFeatures.addSink(new FeatureStoreSink("user_features"));
articleStats.addSink(new FeatureStoreSink("article_stats"));
env.execute("UserFeatureJob");
}
}
// 窗口聚合:5 分钟窗口内用户的阅读偏好
public class UserFeatureAggregator
extends KeyedProcessFunction<String, UserAction, UserFeatures> {
// 状态:窗口内的行为计数
private ValueState<Integer> readCountState;
private ValueState<Set<String>> categoryState;
private ValueState<Double> avgDurationState;
@Override
public void processElement(UserAction action, Context ctx,
Collector<UserFeatures> out) throws Exception {
// 累加阅读次数
Integer count = readCountState.value();
readCountState.update(count == null ? 1 : count + 1);
// 更新阅读类目分布
Set<String> categories = categoryState.value();
if (categories == null) categories = new HashSet<>();
categories.add(action.getCategory());
// 累加阅读时长,计算平均值
Double totalDuration = avgDurationState.value();
Integer totalCount = readCountState.value();
Double newAvg = totalDuration == null
? action.getDuration()
: (totalDuration + action.getDuration()) / totalCount;
avgDurationState.update(newAvg);
// 输出特征快照
UserFeatures features = new UserFeatures();
features.setUserId(action.getUserId());
features.setReadCount(count);
features.setTopCategories(categories);
features.setAvgDuration(newAvg);
features.setWindowEnd(ctx.timestamp());
out.collect(features);
}
}
第四阶段:深度学习推荐模型(2019-2022)
DeepFM:点击率预估模型
2019 年,字节的推荐模型从 Logistic Regression 升级到 DeepFM,这是一个结合了因子分解机(FM)和深度学习的混合模型。
DeepFM 的核心思想:同时学习低阶特征交叉(FM 部分)和高阶特征交叉(Deep 部分)。
# DeepFM 模型(PyTorch 实现)
import torch
import torch.nn as nn
class DeepFM(nn.Module):
def __init__(self, feature_dims, embed_dim=16, hidden_dims=[256, 128, 64]):
super().__init__()
# 1. 特征嵌入层
self.embeddings = nn.ModuleList([
nn.Embedding(dim, embed_dim) for dim in feature_dims
])
# 2. FM 部分:一阶特征 + 二阶交叉
self.first_order = nn.ModuleList([
nn.Embedding(dim, 1) for dim in feature_dims
])
# 3. Deep 部分:高阶特征交叉
layers = []
input_dim = len(feature_dims) * embed_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(input_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
input_dim = hidden_dim
self.deep = nn.Sequential(*layers)
# 4. 输出层
self.output_layer = nn.Linear(hidden_dims[-1] + len(feature_dims) + 1, 1)
def forward(self, feature_ids, feature_values):
# feature_ids: [batch, num_features] - 特征 ID
# feature_values: [batch, num_features] - 特征值
# FM 一阶项
first_orders = [self.first_order[i](feature_ids[:, i])
for i in range(len(self.embeddings))]
first_order_sum = sum(first_orders) # [batch, 1]
# FM 二阶项:embedding 的点积和
embeddings = [self.embeddings[i](feature_ids[:, i])
for i in range(len(self.embeddings))]
# sum-of-square - square-of-sum,优化计算
sum_square = sum(embeddings) ** 2
square_sum = sum([e ** 2 for e in embeddings])
second_order = (sum_square - square_sum).sum(dim=-1, keepdim=True) * 0.5
# Deep 部分
deep_input = torch.cat(embeddings, dim=-1)
deep_output = self.deep(deep_input)
# 合并 + 输出
concat = torch.cat([first_order_sum, second_order, deep_output], dim=-1)
output = torch.sigmoid(self.output_layer(concat))
return output
在线学习:分钟级模型更新
字节的推荐系统支持在线学习——用户行为数据可以在几分钟内影响模型参数,而不需要等待天级别的离线训练。
# 在线学习:FTRL 实时更新模型参数
class OnlineLearning:
def __init__(self, learning_rate=0.01):
self.lr = learning_rate
self.model = {} # feature_id -> weight
def update(self, features, label, pred):
"""
features: Dict[feature_id, feature_value]
label: 1 (click) or 0 (no click)
pred: 模型预测值
"""
error = label - pred # 预测误差
for feature_id, feature_value in features.items():
# FTRL 更新公式
if feature_id not in self.model:
self.model[feature_id] = 0.0
grad = error * feature_value # 梯度
self.model[feature_id] += self.lr * grad
# 推送到在线模型服务
model_service.update_feature(feature_id, self.model[feature_id])
def predict(self, features):
return sum(
self.model.get(fid, 0.0) * fval
for fid, fval in features.items()
)
架构启示
启示一:推荐系统的本质是特征工程
字节的实践经验表明,推荐系统的效果,80% 取决于特征,20% 取决于模型。
好的特征需要:
- 覆盖率:特征要能覆盖大多数用户和内容,冷启动要用全局统计填充
- 时效性:用户的兴趣会变化,特征要能反映最新状态
- 可解释性:特征的含义要清晰,方便分析和调优
建议:在优化模型之前,先审视特征体系是否完整。特征对了,简单的 LR 模型也能有不错的效果。
启示二:实时性是推荐系统的核心竞争力
传统的离线批处理模型,每天只能更新一次。字节的在线学习可以在几分钟内捕捉用户兴趣变化。
但实时性是有代价的:
- 工程复杂度急剧上升(实时特征、实时模型更新、实时监控)
- 容易引入噪声(新数据可能导致模型抖动)
- 运维成本高(Flink 作业的稳定性保障)
建议:核心特征(如用户类目偏好)优先实时化,非核心特征可以保持天级更新。
启示三:A/B 测试是算法迭代的基础设施
字节每天运行上千个 A/B 实验,这需要一套完整的基础设施:
- 流量分割:保证实验组和对照组的流量均匀
- 指标监控:实时监控实验组和对照组的各项指标
- 自动分析:判断实验是否显著,以及是否有副作用
建议:A/B 测试要从产品初期就建立,而不是等产品做大后再补。
术语表
总结
字节的技术演进,始终围绕一个核心命题:如何在极端规模下,让推荐算法比用户自己更懂用户。
演进脉络:
- 2012-2015:协同过滤起步,验证推荐算法价值
- 2015-2017:Lambda 架构,ABTest 平台,特征工程体系
- 2017-2019:Flink 实时计算,在线学习,分钟级模型更新
- 2019-2022:DeepFM 深度学习,多目标优化,Feature Store
- 2022-至今:多模态大模型,AIGC 内容生成,智能化推荐
核心技术亮点:
- Lambda 架构:实时与离线双流融合,平衡延迟和准确性
- Flink 实时计算:1000 并发实例,分钟级特征更新
- DeepFM 推荐模型:低阶 + 高阶特征交叉,点击率预估精度大幅提升
- 在线学习:用户行为在几分钟内影响模型参数
对普通项目的启发:
- 推荐系统效果 80% 取决于特征,先把特征工程做扎实
- 实时性有代价,按优先级逐步升级,不要一步到位
- A/B 测试是算法迭代的基础设施,要从初期就建立
- 多目标优化要找准核心指标,避免目标冲突