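Bulkhead Pattern Implementation

This section covers three ways to implement the bulkhead pattern: thread pool isolation, semaphore isolation, and distributed isolation, along with monitoring and tuning for each.

Comparison of the Three Approaches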

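| Approach | Isolation | Resource overhead | Best suited for | Complexity |
| --- | --- | --- | --- | --- |
| Thread pool isolation | Full | High (each pool's threads consume stack memory) | Asynchronous calls | |
| Semaphore isolation | Partial | Low (a counter only) | Synchronous calls | |
| Distributed isolation | Logical | High (requires central coordination) | Cross-service isolation | |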

Thread Pool Isolation

A Complete Thread Pool Isolation Service

ThreadPoolIsolationService.java
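import jakarta.annotation.PostConstruct;   // javax.annotation on Spring Boot 2.x
import jakarta.annotation.PreDestroy;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Supplier;

@Slf4j
@Service
public class ThreadPoolIsolationService {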

    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        // User service: high throughput requirements, so a somewhat larger pool
        pools.put("user-service", createPool(
            PoolConfig.builder()
                .name("user-pool")
                .coreSize(10)
                .maxSize(20)
                .queueCapacity(100)
                .keepAliveSeconds(60)
                .rejectedPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
                .build()
        ));

        // Payment service: critical, so a moderate pool with a short queue
        pools.put("payment-service", createPool(
            PoolConfig.builder()
                .name("payment-pool")
                .coreSize(5)
                .maxSize(10)
                .queueCapacity(20)
                .keepAliveSeconds(60)
                .rejectedPolicy(new AlertingRejectedPolicy("payment-pool"))
                .build()
        ));

        // Recommendation service: non-critical, so a small pool
        pools.put("recommendation-service", createPool(
            PoolConfig.builder()
                .name("rec-pool")
                .coreSize(2)
                .maxSize(5)
                .queueCapacity(10)
                .keepAliveSeconds(30)
                .rejectedPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
                .build()
        ));
    }

    private ExecutorService createPool(PoolConfig config) {
        return new ThreadPoolExecutor(
            config.getCoreSize(),
            config.getMaxSize(),
            config.getKeepAliveSeconds(),
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(config.getQueueCapacity()),
            new NamedThreadFactory(config.getName()),
            config.getRejectedPolicy()
        );
    }

    public <T> CompletableFuture<T> executeAsync(String service, Supplier<T> task) {
        ExecutorService pool = pools.get(service);
        if (pool == null) {
            throw new IllegalArgumentException("Unknown service: " + service);
        }
        return CompletableFuture.supplyAsync(task, pool);
    }

    public Map<String, PoolStats> getStats() {
        Map<String, PoolStats> stats = new HashMap<>();
        pools.forEach((name, pool) -> {
            if (pool instanceof ThreadPoolExecutor tpe) {
                stats.put(name, new PoolStats(
                    tpe.getActiveCount(),
                    tpe.getPoolSize(),
                    tpe.getQueue().size(),
                    tpe.getCompletedTaskCount(),
                    tpe.getLargestPoolSize()
                ));
            }
        });
        return stats;
    }

    @PreDestroy
    public void shutdown() {
        pools.forEach((name, pool) -> {
            log.info("Shutting down thread pool: {}", name);
            pool.shutdown();
            try {
                if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        });
    }
}

@Data
@Builder
class PoolConfig {
    String name;
    int coreSize;
    int maxSize;
    int queueCapacity;
    long keepAliveSeconds;
    RejectedExecutionHandler rejectedPolicy;
}

record PoolStats(
    int activeCount,
    int poolSize,
    int queueSize,
    long completedTaskCount,
    int largestPoolSize
) {}
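
The listing above references `NamedThreadFactory` and `AlertingRejectedPolicy` without defining them. The following is a minimal JDK-only sketch of what they might look like. Both implementations are assumptions, not the original classes, and the "alert" is only a counter plus a line on stderr where a real system would presumably call its alerting client.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: names threads "<prefix>-<n>" so they are easy to spot in thread dumps. */
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true); // assumption: pool threads should not block JVM exit
        return t;
    }
}

/** Hypothetical helper: counts rejections, reports them, then fails fast like AbortPolicy. */
class AlertingRejectedPolicy implements RejectedExecutionHandler {
    private final String poolName;
    private final AtomicLong rejectedCount = new AtomicLong();

    AlertingRejectedPolicy(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        long total = rejectedCount.incrementAndGet();
        // Placeholder for a real alert (metrics counter, pager, and so on).
        System.err.println("[ALERT] pool " + poolName + " rejected a task, total=" + total);
        throw new RejectedExecutionException("Pool " + poolName + " is saturated");
    }

    long getRejectedCount() {
        return rejectedCount.get();
    }
}
```

Throwing from the handler means a caller of `executeAsync` sees the rejection immediately instead of receiving a future that never completes, which seems the safer default for a payment path.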

Thread Pool with Monitoring

MonitoredThreadPoolExecutor.java
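import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

    private final String name;
    private final MeterRegistry meterRegistry;
    private final Timer executionTimer;
    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    public MonitoredThreadPoolExecutor(
            String name,
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            MeterRegistry meterRegistry) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.name = name;
        this.meterRegistry = meterRegistry;

        // Register gauges for the pool's current state
        Gauge.builder(name + ".active", this, ThreadPoolExecutor::getActiveCount)
            .register(meterRegistry);
        Gauge.builder(name + ".pool.size", this, ThreadPoolExecutor::getPoolSize)
            .register(meterRegistry);
        Gauge.builder(name + ".queue.size", this, e -> e.getQueue().size())
            .register(meterRegistry);

        // Timer for task execution time (the ".execution" suffix is an assumed naming choice)
        this.executionTimer = Timer.builder(name + ".execution").register(meterRegistry);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Record execution time; both hooks run on the worker thread that executed the task
        Long start = startNanos.get();
        if (start != null) {
            executionTimer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
            startNanos.remove();
        }
    }
}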

Semaphore Isolation

Semaphore-Based Concurrency Control

SemaphoreBulkheadService.java
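import jakarta.annotation.PostConstruct;   // javax.annotation on Spring Boot 2.x
import jakarta.annotation.PreDestroy;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

@Service
public class SemaphoreBulkheadService {

    // Maximum concurrent calls per service
    private static final Map<String, Integer> MAX_PERMITS = Map.of(
        "user-service", 10,             // at most 10 concurrent calls
        "payment-service", 5,           // critical service, stricter limit
        "recommendation-service", 3     // at most 3 concurrent calls
    );

    // One independent semaphore per service
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    // Shared thread pool that runs the tasks
    private final ExecutorService sharedPool = Executors.newFixedThreadPool(20);

    // How long a caller waits for a permit
    private static final long WAIT_TIMEOUT_MS = 100;

    @PostConstruct
    public void init() {
        MAX_PERMITS.forEach((service, permits) ->
            semaphores.put(service, new Semaphore(permits)));
    }

    public <T> T execute(String service, Callable<T> task) throws BulkheadException {
        Semaphore semaphore = semaphores.get(service);
        if (semaphore == null) {
            throw new IllegalArgumentException("Unknown service: " + service);
        }

        boolean acquired = false;
        Future<T> future = null;
        try {
            // Try to acquire a permit, waiting at most WAIT_TIMEOUT_MS
            acquired = semaphore.tryAcquire(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (!acquired) {
                throw new BulkheadException("Bulkhead is full: " + service);
            }

            // Run the task in the shared pool
            future = sharedPool.submit(task);
            return future.get(5, TimeUnit.SECONDS);  // task timeout

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BulkheadException("Bulkhead interrupted: " + service);
        } catch (TimeoutException e) {
            throw new BulkheadException("Bulkhead task timeout: " + service);
        } catch (ExecutionException e) {
            throw new BulkheadException("Bulkhead execution failed: " + service, e);
        } finally {
            if (future != null) {
                // Stops a timed-out task so it does not keep running after the
                // permit is returned; a no-op if the task already finished
                future.cancel(true);
            }
            if (acquired) {
                semaphore.release();
            }
        }
    }

    // Collect statistics for every bulkhead
    public Map<String, BulkheadStats> getStats() {
        Map<String, BulkheadStats> stats = new HashMap<>();
        semaphores.forEach((name, semaphore) -> {
            int available = semaphore.availablePermits();
            stats.put(name, new BulkheadStats(
                available,
                semaphore.getQueueLength(),
                MAX_PERMITS.get(name) - available  // permits currently in use
            ));
        });
        return stats;
    }

    @PreDestroy
    public void shutdown() {
        sharedPool.shutdown();
    }
}

record BulkheadStats(
    int availablePermits,
    int queueLength,
    int usedPermits
) {}

// Assumed definition: the exception type is used above but not shown in the original listing
class BulkheadException extends Exception {
    BulkheadException(String message) { super(message); }
    BulkheadException(String message, Throwable cause) { super(message, cause); }
}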
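
The service above submits the task to a shared pool after acquiring a permit. For purely synchronous calls the task can instead run on the caller's own thread, which is the lightest form of semaphore isolation. The sketch below assumes a hypothetical `CallerThreadBulkhead` class that fails fast when no permit is free. It takes a `Supplier` rather than a `Callable` only to keep the example free of checked exceptions.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical minimal bulkhead: limits concurrency but runs the task on the caller's thread. */
class CallerThreadBulkhead {
    private final String name;
    private final int maxConcurrent;
    private final Semaphore permits;

    CallerThreadBulkhead(String name, int maxConcurrent) {
        this.name = name;
        this.maxConcurrent = maxConcurrent;
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs the task if a permit is free; otherwise throws without waiting. */
    <T> T call(Supplier<T> task) {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("Bulkhead is full: " + name);
        }
        try {
            return task.get();
        } finally {
            permits.release(); // always return the permit, even if the task throws
        }
    }

    /** Number of permits currently held by running tasks. */
    int usedPermits() {
        return maxConcurrent - permits.availablePermits();
    }
}
```

The trade-off is that a task already running on the caller's thread cannot be timed out from outside, so this variant relies on the downstream client having its own timeouts.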

Resilience4j Semaphore Bulkhead

Resilience4jSemaphoreBulkhead.java
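import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.decorators.Decorators;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;

@Slf4j
@Service
public class Resilience4jSemaphoreBulkhead {

    private final BulkheadRegistry registry;

    public Resilience4jSemaphoreBulkhead() {
        // Default configuration shared by every bulkhead in this registry
        BulkheadConfig defaultConfig = BulkheadConfig.custom()
            .maxConcurrentCalls(10)                    // maximum concurrent calls
            .maxWaitDuration(Duration.ofMillis(100))   // how long to wait for a permit
            .build();

        this.registry = BulkheadRegistry.of(defaultConfig);
    }

    // Callable.call() declares Exception, so task failures propagate to the caller
    public <T> T executeWithBulkhead(String service, Callable<T> task) throws Exception {
        Bulkhead bulkhead = registry.bulkhead(service);

        return Decorators.ofCallable(task)
            .withBulkhead(bulkhead)
            .withFallback(
                List.of(BulkheadFullException.class),
                e -> {
                    log.warn("Bulkhead [{}] is full, returning fallback result", service);
                    return getFallbackResult(service);
                }
            )
            .decorate()
            .call();
    }

    // Placeholder: the original listing calls this method but does not show it.
    // Replace with a service-specific default value.
    private <T> T getFallbackResult(String service) {
        return null;
    }
}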

Distributed Isolation

A Redis-Based Distributed Bulkhead

distributed_bulkhead.lua
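-- Distributed bulkhead Lua script
-- KEYS[1]: bulkhead key (e.g., "bulkhead:payment-service")
-- ARGV[1]: maximum concurrent calls
-- ARGV[2]: expiry time (seconds)
-- Returns 1 if a permit was acquired, 0 if the bulkhead is full.
-- Redis converts only the first Lua return value, so a single integer is returned.

local key = KEYS[1]
local maxConcurrent = tonumber(ARGV[1])
local ttl = tonumber(ARGV[2])

-- Read the current number of in-flight calls
local current = tonumber(redis.call('GET', key) or '0')

if current >= maxConcurrent then
    -- Limit reached, reject
    return 0
end

-- Increment the counter and refresh the TTL so that a crashed client
-- cannot hold its permit forever
redis.call('INCR', key)
redis.call('EXPIRE', key, ttl)

return 1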

DistributedBulkheadService.java
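import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class DistributedBulkheadService {

    // Decrement only while the counter is positive, so that a release arriving
    // after the key has expired cannot push the counter below zero
    private static final RedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
        "local c = tonumber(redis.call('GET', KEYS[1]) or '0') "
            + "if c > 0 then return redis.call('DECR', KEYS[1]) end "
            + "return 0",
        Long.class);

    // Keys and arguments must be serialized as plain strings (as StringRedisTemplate does)
    private final RedisTemplate<String, String> redisTemplate;
    private final RedisScript<Long> acquireScript;

    public DistributedBulkheadService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;

        // Assumes distributed_bulkhead.lua sits at the root of the classpath
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setLocation(new ClassPathResource("distributed_bulkhead.lua"));
        script.setResultType(Long.class);
        this.acquireScript = script;
    }

    public boolean tryAcquire(String service, int maxConcurrent, int ttlSeconds) {
        String key = "bulkhead:" + service;

        Long result = redisTemplate.execute(
            acquireScript,
            List.of(key),
            String.valueOf(maxConcurrent),
            String.valueOf(ttlSeconds)
        );

        return result != null && result == 1;
    }

    public void release(String service) {
        String key = "bulkhead:" + service;
        redisTemplate.execute(RELEASE_SCRIPT, List.of(key));
    }
}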
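
`tryAcquire` and `release` have to be paired, otherwise a failed call leaks a permit until the TTL expires. The sketch below shows one way to wrap a call in `try`/`finally`. To stay runnable without Redis it uses a hypothetical `PermitStore` interface with an in-memory stand-in; in the setup described above, the Redis-backed service would sit behind that interface. `GuardedCall` and all other names here are illustrative assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical abstraction over the shared counter (Redis in the listing above). */
interface PermitStore {
    boolean tryAcquire(String service, int maxConcurrent);
    void release(String service);
}

/** In-memory stand-in so the example runs without Redis; it does not span processes. */
class InMemoryPermitStore implements PermitStore {
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    @Override
    public boolean tryAcquire(String service, int maxConcurrent) {
        AtomicInteger counter = counters.computeIfAbsent(service, k -> new AtomicInteger());
        while (true) {
            int current = counter.get();
            if (current >= maxConcurrent) {
                return false; // limit reached
            }
            if (counter.compareAndSet(current, current + 1)) {
                return true;  // permit taken atomically
            }
        }
    }

    @Override
    public void release(String service) {
        AtomicInteger counter = counters.get(service);
        if (counter != null) {
            counter.updateAndGet(c -> Math.max(0, c - 1)); // never drop below zero
        }
    }

    int inFlight(String service) {
        AtomicInteger counter = counters.get(service);
        return counter == null ? 0 : counter.get();
    }
}

class GuardedCall {
    /** Runs the task under the bulkhead, or returns the fallback when the limit is reached. */
    static <T> T run(PermitStore store, String service, int maxConcurrent,
                     Supplier<T> task, Supplier<T> fallback) {
        if (!store.tryAcquire(service, maxConcurrent)) {
            return fallback.get();
        }
        try {
            return task.get();
        } finally {
            store.release(service); // released even when the task throws
        }
    }
}
```

A call site might then read `GuardedCall.run(store, "payment-service", 5, () -> charge(order), () -> queueForRetry(order))`, where `charge` and `queueForRetry` are placeholders for application code.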

Thread Pool Configuration Guidelines

Recommended Settings by Service Type

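| Service type | Core threads | Max threads | Queue capacity | Rejection policy | Notes |
| --- | --- | --- | --- | --- | --- |
| Core business | CPU cores | CPU cores × 2 | 100-200 | CallerRunsPolicy | Queue buffers requests |
| Payment / finance | CPU cores | CPU cores | 20-50 | AbortPolicy + alert | Strict limits, no backlog |
| Async tasks | 1-2 | CPU cores | 1000+ | CallerRunsPolicy | Can queue a large number of tasks |
| Non-critical recommendations | 1-2 | 2-5 | 10-20 | DiscardPolicy | Dropping tasks is acceptable |
| Batch processing | Batch size | Batch size | 0 | CallerRunsPolicy | No queue, runs synchronously |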

Calculating Thread Pool Parameters

import os


def calculate_thread_pool(io_intensive: bool = True) -> dict:
    """
    Rule-of-thumb thread pool sizing.

    Core formulas:
    - core threads = CPU cores × target CPU utilization × (1 + wait time / compute time)
    - max threads = CPU cores × peak CPU utilization × (1 + wait time / compute time)
    - queue capacity = core threads × desired queue buffering time / average task time
    """
    cpu_cores = os.cpu_count() or 1

    if io_intensive:
        # I/O-bound: threads spend most of their time waiting, so use more threads than cores
        core_size = cpu_cores * 2
        max_size = cpu_cores * 4
    else:
        # CPU-bound: compute time dominates, so stay close to the number of cores
        core_size = cpu_cores
        max_size = cpu_cores + 1

    return {
        "core_size": core_size,
        "max_size": max_size,
        # Corresponds to a buffering-time to task-time ratio of 100
        "queue_capacity": core_size * 100,
    }
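
As a worked example of the core-thread and queue-capacity formulas in the docstring above: with 8 cores, a target utilization of 0.5, and tasks that wait 90 ms for every 30 ms of computation (ratio 3), the core size comes to 8 × 0.5 × (1 + 3) = 16 threads. Buffering 2 seconds of work at 100 ms per task then gives 16 × 2000 / 100 = 320 queue slots. The Java sketch below applies the same arithmetic; `PoolSizing` and its method names are hypothetical, and the results are starting points to validate under load rather than exact answers.

```java
/** Hypothetical helper that applies the sizing formulas directly. */
class PoolSizing {

    /** threads = cores × target utilization × (1 + wait time / compute time), at least 1. */
    static int coreThreads(int cpuCores, double targetUtilization, double waitToComputeRatio) {
        double threads = cpuCores * targetUtilization * (1 + waitToComputeRatio);
        return Math.max(1, (int) Math.round(threads));
    }

    /** capacity = core threads × desired buffering time / average task time. */
    static int queueCapacity(int coreThreads, long bufferMillis, long avgTaskMillis) {
        if (avgTaskMillis <= 0) {
            throw new IllegalArgumentException("avgTaskMillis must be positive");
        }
        return (int) (coreThreads * bufferMillis / avgTaskMillis);
    }
}
```

In application code the core count would typically come from `Runtime.getRuntime().availableProcessors()`, while the wait and compute times have to be measured for the specific downstream call.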

Monitoring and Alerting

bulkhead-monitoring.yaml
# Prometheus alerting rules
groups:
- name: bulkhead-alerts
  rules:
  # Thread pool close to capacity
  - alert: BulkheadNearCapacity
    expr: |
      (bulkhead_active_count / bulkhead_max_concurrent) > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Bulkhead {{ $labels.name }} utilization is above 80%"

  # Thread pool rejecting requests
  - alert: BulkheadRejected
    expr: rate(bulkhead_rejected_total[5m]) > 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Bulkhead {{ $labels.name }} is rejecting requests"

  # Queue backlog
  - alert: BulkheadQueueBacklog
    expr: bulkhead_queue_size > 100
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Bulkhead {{ $labels.name }} has a queue backlog"

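Quality Criteria

Whether an article on implementing the bulkhead pattern is up to standard depends on whether it covers the following:

  1. ✅ A comparison of the three approaches (thread pool / semaphore / distributed)?
  2. ✅ Complete implementation code for thread pool isolation?
  3. ✅ Implementation code for semaphore isolation?
  4. ✅ An implementation of distributed isolation (Redis Lua)?
  5. ✅ Thread pool configuration recommendations for different services?
  6. ❌ Only simple code snippets with no complete implementation: not up to standard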

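Chapter Summary

Key Points

  1. Thread pool isolation suits asynchronous calls: isolation is complete, but resource overhead is high
  2. Semaphore isolation suits synchronous calls: resource overhead is low, but threads are not isolated
  3. Distributed isolation suits cross-service scenarios: it needs a centralized store such as Redis
  4. Thread pool settings should differ by service: critical and ordinary services get different configurations
  5. Monitoring and alerting are essential: pool saturation, rejected requests, and similar conditions should trigger alerts promptly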