#分布式限流(Redis + Lua)
单机限流只能保护单机,无法保护整个分布式系统。
当系统有多台服务器时,每台服务器都有自己的限流器。但每个限流器只知道本机的请求量,不知道整个系统的请求量。这就是分布式限流要解决的问题。
#为什么需要分布式限流
flowchart LR
subgraph 单机限流的问题
A["用户"] --> B["服务器 1\n(限流 100QPS)"]
A --> C["服务器 2\n(限流 100QPS)"]
A --> D["服务器 3\n(限流 100QPS)"]
Note over B,C,D: 每个限流器只知道本机请求量
Note over B,C,D: 总请求量 = 300,限流失效!
end
subgraph 分布式限流
E["用户"] --> F["服务器 1"]
E --> G["服务器 2"]
E --> H["服务器 3"]
F & G & H --> I["Redis\n(统一限流)"]
I --> |"超过阈值| J["拒绝请求"]
end#Redis 限流的核心模式
#模式一:计数器模式
最简单的分布式限流:
counter_limit.lua
-- 简单计数器限流
-- KEYS[1]: 限流 key
-- ARGV[1]: 时间窗口(秒)
-- ARGV[2]: 最大请求数
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return 0
else
return 1
end#模式二:滑动日志模式
最精确的限流,记录每个请求的时间戳:
sliding_log_limit.lua
-- 滑动日志限流
-- KEYS[1]: 限流 key
-- ARGV[1]: 时间窗口(毫秒)
-- ARGV[2]: 最大请求数
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 删除窗口外的日志
local windowStart = now - window
redis.call('ZREMRANGEBYSCORE', key, '-inf', windowStart)
-- 获取当前请求数
local currentCount = redis.call('ZCARD', key)
if currentCount < limit then
-- 添加当前请求
redis.call('ZADD', key, now, now .. '-' .. math.random())
redis.call('PEXPIRE', key, window)
return 1
else
return 0
end#模式三:令牌桶模式
支持突发流量的分布式限流:
token_bucket_limit.lua
-- 令牌桶分布式限流
-- KEYS[1]: 限流 key
-- ARGV[1]: 桶容量
-- ARGV[2]: 补充速率(每秒)
-- ARGV[3]: 请求令牌数
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refillRate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
-- 获取当前状态
local bucket = redis.call('HMGET', key, 'tokens', 'lastRefill')
local tokens = tonumber(bucket[1]) or capacity
local lastRefill = tonumber(bucket[2]) or now
-- 计算应该补充的令牌数
local elapsed = (now - lastRefill) / 1000.0 -- 转换为秒
tokens = math.min(capacity, tokens + elapsed * refillRate)
-- 检查是否足够
if tokens >= requested then
tokens = tokens - requested
redis.call('HMSET', key, 'tokens', tokens, 'lastRefill', now)
redis.call('PEXPIRE', key, 60)
return 1
else
redis.call('HMSET', key, 'tokens', tokens, 'lastRefill', now)
redis.call('PEXPIRE', key, 60)
return 0
end#多维度限流
实际生产环境中,限流往往需要多维度:
MultiDimensionRateLimiter.java
@Service
public class MultiDimensionRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
// Lua 脚本缓存
private final Map<String, RedisScript<Long>> scriptCache = new ConcurrentHashMap<>();
public MultiDimensionRateLimiter(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 多维度限流检查
*/
public RateLimitResult checkRateLimit(RateLimitRequest request) {
String[] keys = {
"limit:user:" + request.getUserId(), // 用户维度
"limit:ip:" + request.getIp(), // IP 维度
"limit:api:" + request.getApi(), // API 维度
"limit:global" // 全局限流
};
int[] limits = {
100, // 用户:每秒 100 请求
200, // IP:每秒 200 请求
500, // API:每秒 500 请求
10000 // 全局:每秒 10000 请求
};
for (int i = 0; i < keys.length; i++) {
boolean allowed = tryAcquire(keys[i], limits[i], 1);
if (!allowed) {
return RateLimitResult.rejected(keys[i], limits[i]);
}
}
return RateLimitResult.allowed();
}
private boolean tryAcquire(String key, int limit, int count) {
String script = loadScript("counter_limit.lua");
RedisScript<Long> redisScript = scriptCache.computeIfAbsent(script,
s -> new DefaultRedisScript<>(s, Long.class));
Long result = redisTemplate.execute(
redisScript,
List.of(key),
1, // 窗口大小(秒)
limit
);
return result != null && result == 1;
}
}#集群限流
当 Redis 是集群模式时,需要特殊处理:
#Redis Cluster 下的限流
ClusterRateLimiter.java
public class ClusterRateLimiter {
private final RedisClusterTemplate clusterTemplate;
/**
* 使用哈希槽分片
* 优点:不同维度的限流可以聚合计算
* 缺点:实现复杂
*/
public boolean checkRateLimit(String userId, String api) {
// 将用户和 API 组合成同一个 key,确保同一用户同一 API 的请求打到同一个 slot
String combinedKey = userId + ":" + api;
// 计算 slot
int slot = slot(combinedKey);
// 使用 tag 路由到正确的节点
String keyWithTag = "{" + combinedKey + "}";
return tryAcquire(keyWithTag, 100, 1);
}
private int slot(String key) {
// CRC16 哈希槽算法
return Math.abs(CRC16.calculate(key) % 16384);
}
}#Redisson 分布式限流
Redisson 提供了开箱即用的分布式限流实现:
RedissonRateLimiter.java
@Service
public class RedissonRateLimiter {
private final RedissonClient redissonClient;
private final RRateLimiter userLimiter;
private final RRateLimiter ipLimiter;
private final RRateLimiter globalLimiter;
public RedissonRateLimiter(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
// 用户维度:每秒 100 个许可
this.userLimiter = redissonClient.getRateLimiter("limit:user:");
this.userLimiter.trySetRate(RateIntervalUnit.SECONDS, 100, 1, RateLimitParameters.Direction.BOTH);
// IP 维度:每秒 200 个许可
this.ipLimiter = redissonClient.getRateLimiter("limit:ip:");
this.ipLimiter.trySetRate(RateIntervalUnit.SECONDS, 200, 1, RateLimitParameters.Direction.BOTH);
// 全局维度:每秒 10000 个许可
this.globalLimiter = redissonClient.getRateLimiter("limit:global:");
this.globalLimiter.trySetRate(RateIntervalUnit.SECONDS, 10000, 1, RateLimitParameters.Direction.BOTH);
}
public boolean tryAcquire(String userId, String ip) {
// 获取所有维度的许可
boolean userAllowed = userLimiter.tryAcquire(1, userId);
boolean ipAllowed = ipLimiter.tryAcquire(1, ip);
boolean globalAllowed = globalLimiter.tryAcquire(1, "global");
return userAllowed && ipAllowed && globalAllowed;
}
public RateLimitResult tryAcquireWithResult(String userId, String ip) {
boolean userAllowed = userLimiter.tryAcquire(1, userId);
boolean ipAllowed = ipLimiter.tryAcquire(1, ip);
boolean globalAllowed = globalLimiter.tryAcquire(1, "global");
if (userAllowed && ipAllowed && globalAllowed) {
return RateLimitResult.allowed();
}
List<String> rejected = new ArrayList<>();
if (!userAllowed) rejected.add("user");
if (!ipAllowed) rejected.add("ip");
if (!globalAllowed) rejected.add("global");
return RateLimitResult.rejected(rejected);
}
}#限流 Key 的设计
合理的 Key 设计对于限流效果至关重要:
# 限流 Key 设计规范
key_patterns:
# 用户维度
limit:user:{userId}
# 说明:每个用户独立的限流计数器
# IP 维度
limit:ip:{ip}
# 说明:每个 IP 独立的限流计数器
# 接口维度
limit:api:{api}
# 说明:每个接口独立的限流计数器
# 用户+接口维度(更精细)
limit:user:{userId}:api:{api}
# 说明:特定用户对特定接口的限流
# 全局限流
limit:global
# 说明:整个系统的全局限流
# Key 过期时间
# 建议设置为限流窗口的 2 倍,避免边界问题
expiry:
second_level: 2 # 1 秒窗口,过期 2 秒
minute_level: 120 # 1 分钟窗口,过期 2 分钟#分布式限流的挑战
#挑战一:性能
Redis 限流会增加每次请求的延迟:
# 性能影响分析
延迟:
- 无 Redis 限流: 1-2ms
- 本地 Redis: 2-5ms
- 远程 Redis: 5-10ms
# 优化策略
优化:
- "本地限流 + Redis 校正"
- "批量请求,减少 Redis 调用"
- "异步写入,本地计数"#挑战二:一致性
分布式限流的一致性问题:
| 问题 | 解决方案 |
|---|---|
| Redis 单点故障 | Redis Cluster / Sentinel |
| 网络分区 | 降级为单机限流 |
| 数据漂移 | 使用滑动窗口而非固定窗口 |
#挑战三:多 Redis 实例
当有多个 Redis 实例时:
MultiRedisRateLimiter.java
public class MultiRedisRateLimiter {
private final List<RedisTemplate<String, String>> redisTemplates;
/**
* 使用一致性哈希选择 Redis 实例
*/
public boolean tryAcquire(String key, int limit) {
// 一致性哈希选择节点
int index = consistentHash(key);
RedisTemplate<String, String> redis = redisTemplates.get(index);
String script = loadScript("counter_limit.lua");
Long result = redis.execute(
new DefaultRedisScript<>(script, Long.class),
List.of(key),
1, limit
);
return result != null && result == 1;
}
}#监控与告警
分布式限流监控
# 限流关键指标
metrics:
- name: rate_limit_check_total
type: counter
labels: [dimension, result]
description: "限流检查次数 (allowed/rejected)"
- name: rate_limit_rejection_rate
type: gauge
labels: [dimension]
description: "限流拒绝率"
- name: redis_operation_duration
type: histogram
description: "Redis 操作耗时"
# 告警规则
alerts:
- name: HighRejectionRate
condition: rejection_rate > 0.1
severity: warning
message: "限流拒绝率超过 10%"
- name: RedisLatencyHigh
condition: p99_latency > 10ms
severity: warning
message: "Redis 操作延迟过高"#本章总结
核心要点:
- 分布式限流需要中心化的存储:Redis 是最常用的选择
- Lua 脚本保证限流操作的原子性:避免竞态条件
- 滑动日志是最精确的限流算法:但内存开销大
- 令牌桶支持突发流量:适合大多数场景
- 多维度限流更精细:用户、IP、接口、全局层层防护