漏桶算法实现

漏桶算法是另一种经典限流算法。与令牌桶不同,漏桶以固定速率输出,不管请求来得有多快。

漏桶的核心思想:有一个桶,请求像水一样流入桶中,桶底有一个洞,水以固定速率从洞里漏出去。当桶满了,新来的水就会溢出(被拒绝)。

漏桶 vs 令牌桶

特性漏桶令牌桶
输出速率固定速率可变(有令牌就输出)
突发流量不支持,会被整形支持,可一次性取多个令牌
队列行为有队列,请求排队等待无队列,直接拒绝或阻塞
实现复杂度中等中等
适用场景需要平滑输出的场景需要允许瞬时突发的场景
flowchart LR
    subgraph 令牌桶
        A["请求"] --> B["检查令牌"]
        B --> |"有令牌| C["放行"]
        B --> |"无令牌| D["拒绝"]
        E["令牌补充"] --> F["令牌桶"]
    end

    subgraph 漏桶
        G["请求"] --> H["进入队列"]
        H --> |"队列| I["漏桶"]
        I --> |"固定速率| J["放行"]
        H --> |"队列满| K["拒绝"]
    end

选择建议

  • 需要平滑输出(如 API 限流保护下游):用漏桶
  • 需要允许突发(如用户请求):用令牌桶

漏桶原理

核心变量

变量说明
capacity桶的容量(最大队列长度)
rate漏出速率(每秒处理多少请求)
level当前桶中的请求数
lastLeakTime上次漏出的时间

核心公式

当前水位 = min(capacity, level + (now - lastLeakTime) × rate)

漏桶实现

基础实现

LeakyBucket.java
public class LeakyBucket {

    private final long capacity;       // 桶容量(最大队列长度)
    private final double rate;         // 漏出速率(每秒处理请求数)
    private volatile long level;       // 当前桶中的请求数
    private volatile long lastLeakTime; // 上次漏出时间

    public LeakyBucket(long capacity, double rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.level = 0;
        this.lastLeakTime = System.nanoTime();
    }

    public synchronized boolean tryAdd() {
        leak(); // 先漏掉一些

        if (level < capacity) {
            level++; // 加入一个请求
            return true;
        }
        return false; // 桶满,拒绝
    }

    public synchronized boolean tryAdd(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;

        while (level >= capacity) {
            long waitTime = calculateWaitTime();
            if (waitTime > 0) {
                long waitUntil = Math.min(waitTime, deadline - System.currentTimeMillis());
                if (waitUntil <= 0) {
                    return false; // 超时
                }
                wait(waitUntil);
            }
            leak();
        }

        level++;
        return true;
    }

    private void leak() {
        long now = System.nanoTime();
        long elapsed = now - lastLeakTime;

        if (elapsed > 0) {
            // 计算应该漏掉的请求数
            double leaked = (elapsed / 1_000_000_000.0) * rate;
            level = Math.max(0, level - (long) leaked);
            lastLeakTime = now;
        }
    }

    private long calculateWaitTime() {
        if (level < capacity) {
            return 0;
        }
        // 需要等待多久才能漏掉一个请求
        return (long) ((1.0 / rate) * 1_000);
    }

    public long getCurrentLevel() {
        leak(); // 先漏掉一些再返回
        return level;
    }
}

无锁实现

LockFreeLeakyBucket.java
public class LockFreeLeakyBucket {

    private final long capacity;
    private final double rate;

    private volatile long level;          // 当前水位
    private volatile long lastLeakTime;  // 上次漏水时间

    private static final AtomicLongFieldUpdater<LockFreeLeakyBucket> LEVEL_UPDATER =
        AtomicLongFieldUpdater.newUpdater(LockFreeLeakyBucket.class, "level");

    public LockFreeLeakyBucket(long capacity, double rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.level = 0;
        this.lastLeakTime = System.nanoTime();
    }

    public boolean tryAcquire() {
        while (true) {
            long currentLevel = level;
            leak();

            if (currentLevel < capacity) {
                if (LEVEL_UPDATER.compareAndSet(this, currentLevel, currentLevel + 1)) {
                    return true;
                }
                // CAS 失败,重试
                continue;
            }
            return false;
        }
    }

    private void leak() {
        long now = System.nanoTime();
        long lastTime = lastLeakTime;
        long elapsed = now - lastTime;

        if (elapsed > 0) {
            long toLeak = (long) ((elapsed / 1_000_000_000.0) * rate);
            if (toLeak > 0) {
                long newLevel = Math.max(0, level - toLeak);
                level = newLevel;
                lastLeakTime = now;
            }
        }
    }
}

漏桶的队列行为

漏桶内置队列,请求会排队等待:

LeakyBucketWithQueue.java
public class LeakyBucketWithQueue {

    private final long capacity;
    private final double rate;
    private final Queue<Request> queue;

    private volatile long lastLeakTime;
    private final ScheduledExecutorService scheduler;

    public LeakyBucketWithQueue(long capacity, double rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.queue = new ConcurrentLinkedQueue<>();
        this.lastLeakTime = System.currentTimeMillis();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();

        // 启动漏水线程
        scheduler.scheduleAtFixedRate(this::leak, 0, (long) (1000 / rate), TimeUnit.MILLISECONDS);
    }

    public CompletableFuture<Result> submit(Request request) {
        CompletableFuture<Result> future = new CompletableFuture<>();

        if (queue.size() >= capacity) {
            future.completeExceptionally(new RateLimitException("队列已满"));
            return future;
        }

        queue.offer(new QueuedRequest(request, future));

        return future;
    }

    private void leak() {
        long now = System.currentTimeMillis();
        long elapsed = now - lastLeakTime;

        if (elapsed >= (1000 / rate)) {
            QueuedRequest request = queue.poll();
            if (request != null) {
                try {
                    Result result = process(request.getRequest());
                    request.getFuture().complete(result);
                } catch (Exception e) {
                    request.getFuture().completeExceptionally(e);
                }
            }
            lastLeakTime = now;
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

Redis 漏桶实现

leaky_bucket.lua
-- Redis 漏桶 Lua 脚本
-- KEYS[1]: 限流 key
-- ARGV[1]: 桶容量
-- ARGV[2]: 漏出速率(每秒)
-- ARGV[3]: 当前时间戳(毫秒)
-- ARGV[4]: 请求需要的空间

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- 获取当前状态
local bucket = redis.call('HMGET', key, 'level', 'lastLeakTime')
local level = tonumber(bucket[1]) or 0
local lastLeakTime = tonumber(bucket[2]) or now

-- 计算应该漏掉多少
local elapsed = (now - lastLeakTime) / 1000.0  -- 转换为秒
local leaked = elapsed * rate
level = math.max(0, level - leaked)

-- 检查是否有足够空间
if level + requested <= capacity then
    level = level + requested
    redis.call('HMSET', key, 'level', level, 'lastLeakTime', now)
    redis.call('EXPIRE', key, 60)
    return 1  -- 成功
else
    redis.call('HMSET', key, 'level', level, 'lastLeakTime', now)
    redis.call('EXPIRE', key, 60)
    return 0  -- 失败,桶满
end
RedisLeakyBucket.java
public class RedisLeakyBucket {

    private final RedisTemplate<String, String> redisTemplate;
    private final String luaScript;

    public RedisLeakyBucket(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.luaScript = loadScript("leaky_bucket.lua");
    }

    /**
     * 尝试加入漏桶
     * @param key 限流 key
     * @param capacity 桶容量
     * @param rate 漏出速率(每秒)
     * @return 是否成功
     */
    public boolean tryAdd(String key, long capacity, double rate) {
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            List.of(key),
            capacity,
            rate,
            System.currentTimeMillis(),
            1
        );
        return result != null && result == 1;
    }

    /**
     * 获取当前队列长度
     */
    public long getCurrentLevel(String key) {
        Object level = redisTemplate.opsForHash().get(key, "level");
        return level != null ? Long.parseLong(level.toString()) : 0;
    }
}

漏桶 vs 令牌桶选择

flowchart TD
    A["需要限流"] --> B{"需要平滑输出?"}
    B -->|"是| C["漏桶"]
    B -->|"否| D{"需要突发?"}
    D -->|"是| E["令牌桶"]
    D -->|"否| F["两者皆可"]

    C --> G["保护下游\nAPI 限流"]
    E --> H["允许瞬时突发\n用户请求限流"]
    F --> I["根据偏好选择"]
场景推荐算法原因
API 限流,保护下游服务漏桶必须平滑输出
用户请求限流令牌桶允许适当突发
秒杀场景令牌桶需要处理瞬时高峰
消息队列消费漏桶固定速率消费

本章总结

核心要点

  1. 漏桶以固定速率输出:不管请求来得有多快,输出始终平滑
  2. 漏桶内置队列:请求会排队等待,直到被处理
  3. 桶满则拒绝:超出容量的请求直接拒绝
  4. 适合保护下游:需要平滑输出的场景优先选择漏桶
  5. Redis 实现用 Lua 脚本:保证原子性