#漏桶算法实现
漏桶算法是另一种经典限流算法。与令牌桶不同,漏桶以固定速率输出,不管请求来得有多快。
漏桶的核心思想:有一个桶,请求像水一样流入桶中,桶底有一个洞,水以固定速率从洞里漏出去。当桶满了,新来的水就会溢出(被拒绝)。
#漏桶 vs 令牌桶
| 特性 | 漏桶 | 令牌桶 |
|---|---|---|
| 输出速率 | 固定速率 | 可变(有令牌就输出) |
| 突发流量 | 不支持,会被整形 | 支持,可一次性取多个令牌 |
| 队列行为 | 有队列,请求排队等待 | 无队列,直接拒绝或阻塞 |
| 实现复杂度 | 中等 | 中等 |
| 适用场景 | 需要平滑输出的场景 | 需要允许瞬时突发的场景 |
flowchart LR
subgraph 令牌桶
A["请求"] --> B["检查令牌"]
B --> |"有令牌| C["放行"]
B --> |"无令牌| D["拒绝"]
E["令牌补充"] --> F["令牌桶"]
end
subgraph 漏桶
G["请求"] --> H["进入队列"]
H --> |"队列| I["漏桶"]
I --> |"固定速率| J["放行"]
H --> |"队列满| K["拒绝"]
end选择建议:
- 需要平滑输出(如 API 限流保护下游):用漏桶
- 需要允许突发(如用户请求):用令牌桶
#漏桶原理
#核心变量
| 变量 | 说明 |
|---|---|
capacity | 桶的容量(最大队列长度) |
rate | 漏出速率(每秒处理多少请求) |
level | 当前桶中的请求数 |
lastLeakTime | 上次漏出的时间 |
#核心公式
当前水位 = min(capacity, level + (now - lastLeakTime) × rate)#漏桶实现
#基础实现
LeakyBucket.java
public class LeakyBucket {
private final long capacity; // 桶容量(最大队列长度)
private final double rate; // 漏出速率(每秒处理请求数)
private volatile long level; // 当前桶中的请求数
private volatile long lastLeakTime; // 上次漏出时间
public LeakyBucket(long capacity, double rate) {
this.capacity = capacity;
this.rate = rate;
this.level = 0;
this.lastLeakTime = System.nanoTime();
}
public synchronized boolean tryAdd() {
leak(); // 先漏掉一些
if (level < capacity) {
level++; // 加入一个请求
return true;
}
return false; // 桶满,拒绝
}
public synchronized boolean tryAdd(long timeoutMs) throws InterruptedException {
long deadline = System.currentTimeMillis() + timeoutMs;
while (level >= capacity) {
long waitTime = calculateWaitTime();
if (waitTime > 0) {
long waitUntil = Math.min(waitTime, deadline - System.currentTimeMillis());
if (waitUntil <= 0) {
return false; // 超时
}
wait(waitUntil);
}
leak();
}
level++;
return true;
}
private void leak() {
long now = System.nanoTime();
long elapsed = now - lastLeakTime;
if (elapsed > 0) {
// 计算应该漏掉的请求数
double leaked = (elapsed / 1_000_000_000.0) * rate;
level = Math.max(0, level - (long) leaked);
lastLeakTime = now;
}
}
private long calculateWaitTime() {
if (level < capacity) {
return 0;
}
// 需要等待多久才能漏掉一个请求
return (long) ((1.0 / rate) * 1_000);
}
public long getCurrentLevel() {
leak(); // 先漏掉一些再返回
return level;
}
}#无锁实现
LockFreeLeakyBucket.java
public class LockFreeLeakyBucket {
private final long capacity;
private final double rate;
private volatile long level; // 当前水位
private volatile long lastLeakTime; // 上次漏水时间
private static final AtomicLongFieldUpdater<LockFreeLeakyBucket> LEVEL_UPDATER =
AtomicLongFieldUpdater.newUpdater(LockFreeLeakyBucket.class, "level");
public LockFreeLeakyBucket(long capacity, double rate) {
this.capacity = capacity;
this.rate = rate;
this.level = 0;
this.lastLeakTime = System.nanoTime();
}
public boolean tryAcquire() {
while (true) {
long currentLevel = level;
leak();
if (currentLevel < capacity) {
if (LEVEL_UPDATER.compareAndSet(this, currentLevel, currentLevel + 1)) {
return true;
}
// CAS 失败,重试
continue;
}
return false;
}
}
private void leak() {
long now = System.nanoTime();
long lastTime = lastLeakTime;
long elapsed = now - lastTime;
if (elapsed > 0) {
long toLeak = (long) ((elapsed / 1_000_000_000.0) * rate);
if (toLeak > 0) {
long newLevel = Math.max(0, level - toLeak);
level = newLevel;
lastLeakTime = now;
}
}
}
}#漏桶的队列行为
漏桶内置队列,请求会排队等待:
LeakyBucketWithQueue.java
public class LeakyBucketWithQueue {
private final long capacity;
private final double rate;
private final Queue<Request> queue;
private volatile long lastLeakTime;
private final ScheduledExecutorService scheduler;
public LeakyBucketWithQueue(long capacity, double rate) {
this.capacity = capacity;
this.rate = rate;
this.queue = new ConcurrentLinkedQueue<>();
this.lastLeakTime = System.currentTimeMillis();
this.scheduler = Executors.newSingleThreadScheduledExecutor();
// 启动漏水线程
scheduler.scheduleAtFixedRate(this::leak, 0, (long) (1000 / rate), TimeUnit.MILLISECONDS);
}
public CompletableFuture<Result> submit(Request request) {
CompletableFuture<Result> future = new CompletableFuture<>();
if (queue.size() >= capacity) {
future.completeExceptionally(new RateLimitException("队列已满"));
return future;
}
queue.offer(new QueuedRequest(request, future));
return future;
}
private void leak() {
long now = System.currentTimeMillis();
long elapsed = now - lastLeakTime;
if (elapsed >= (1000 / rate)) {
QueuedRequest request = queue.poll();
if (request != null) {
try {
Result result = process(request.getRequest());
request.getFuture().complete(result);
} catch (Exception e) {
request.getFuture().completeExceptionally(e);
}
}
lastLeakTime = now;
}
}
public void shutdown() {
scheduler.shutdown();
}
}#Redis 漏桶实现
leaky_bucket.lua
-- Redis 漏桶 Lua 脚本
-- KEYS[1]: 限流 key
-- ARGV[1]: 桶容量
-- ARGV[2]: 漏出速率(每秒)
-- ARGV[3]: 当前时间戳(毫秒)
-- ARGV[4]: 请求需要的空间
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- 获取当前状态
local bucket = redis.call('HMGET', key, 'level', 'lastLeakTime')
local level = tonumber(bucket[1]) or 0
local lastLeakTime = tonumber(bucket[2]) or now
-- 计算应该漏掉多少
local elapsed = (now - lastLeakTime) / 1000.0 -- 转换为秒
local leaked = elapsed * rate
level = math.max(0, level - leaked)
-- 检查是否有足够空间
if level + requested <= capacity then
level = level + requested
redis.call('HMSET', key, 'level', level, 'lastLeakTime', now)
redis.call('EXPIRE', key, 60)
return 1 -- 成功
else
redis.call('HMSET', key, 'level', level, 'lastLeakTime', now)
redis.call('EXPIRE', key, 60)
return 0 -- 失败,桶满
endRedisLeakyBucket.java
public class RedisLeakyBucket {
private final RedisTemplate<String, String> redisTemplate;
private final String luaScript;
public RedisLeakyBucket(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
this.luaScript = loadScript("leaky_bucket.lua");
}
/**
* 尝试加入漏桶
* @param key 限流 key
* @param capacity 桶容量
* @param rate 漏出速率(每秒)
* @return 是否成功
*/
public boolean tryAdd(String key, long capacity, double rate) {
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
List.of(key),
capacity,
rate,
System.currentTimeMillis(),
1
);
return result != null && result == 1;
}
/**
* 获取当前队列长度
*/
public long getCurrentLevel(String key) {
Object level = redisTemplate.opsForHash().get(key, "level");
return level != null ? Long.parseLong(level.toString()) : 0;
}
}#漏桶 vs 令牌桶选择
flowchart TD
A["需要限流"] --> B{"需要平滑输出?"}
B -->|"是| C["漏桶"]
B -->|"否| D{"需要突发?"}
D -->|"是| E["令牌桶"]
D -->|"否| F["两者皆可"]
C --> G["保护下游\nAPI 限流"]
E --> H["允许瞬时突发\n用户请求限流"]
F --> I["根据偏好选择"]| 场景 | 推荐算法 | 原因 |
|---|---|---|
| API 限流,保护下游服务 | 漏桶 | 必须平滑输出 |
| 用户请求限流 | 令牌桶 | 允许适当突发 |
| 秒杀场景 | 令牌桶 | 需要处理瞬时高峰 |
| 消息队列消费 | 漏桶 | 固定速率消费 |
#本章总结
核心要点:
- 漏桶以固定速率输出:不管请求来得有多快,输出始终平滑
- 漏桶内置队列:请求会排队等待,直到被处理
- 桶满则拒绝:超出容量的请求直接拒绝
- 适合保护下游:需要平滑输出的场景优先选择漏桶
- Redis 实现用 Lua 脚本:保证原子性