优雅关闭实现方案

上一节讲了优雅关闭的概念和配置,本节深入讲解各种场景下的完整实现方案。

Spring Boot 优雅关闭

完整配置

application.yml]
server:
  shutdown: graceful

spring:
  lifecycle:
    timeout-per-shutdown-phase: 30s

  task:
    execution:
      pool:
        core-size: 10
        max-size: 50
        queue-capacity: 100
        keep-alive: 60s
      thread-name-prefix: async-task-

异步任务优雅关闭

TaskExecutorShutdown.java]
@Configuration
public class AsyncConfig implements AsyncConfigurer {

    private ThreadPoolTaskExecutor executor;

    @Override
    public Executor getAsyncExecutor() {
        executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-task-");
        executor.setWaitForTasksToCompleteOnShutdown(true);  // 关键:等待任务完成
        executor.setAwaitTerminationSeconds(30);             // 等待超时
        executor.initialize();
        return executor;
    }

    @PreDestroy
    public void destroy() {
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}

完整生命周期管理

GracefulShutdownManager.java]
@Component
@Slf4j
public class GracefulShutdownManager
        implements ApplicationListener<ContextClosedEvent> {

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerRegistry;

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        log.info("=== 开始优雅关闭 ===");
        long startTime = System.currentTimeMillis();

        try {
            // 第一步:停止接收新请求(Spring Boot 自动处理)

            // 第二步:停止 Kafka 消费者
            stopKafkaListeners();

            // 第三步:等待异步任务完成
            awaitAsyncTasks();

            // 第四步:关闭 Redis 连接池
            closeRedisConnections();

            // 第五步:关闭数据库连接池
            closeDatabaseConnections();

            // 第六步:刷出日志
            flushLogs();

        } catch (Exception e) {
            log.error("优雅关闭异常", e);
        }

        long duration = System.currentTimeMillis() - startTime;
        log.info("=== 优雅关闭完成,耗时 {} ms ===", duration);
    }

    private void stopKafkaListeners() {
        log.info("停止 Kafka 消费者...");
        kafkaListenerRegistry.getListenerContainers().forEach(container -> {
            container.stop();
        });
        log.info("Kafka 消费者已停止");
    }

    private void awaitAsyncTasks() {
        log.info("等待异步任务完成...");
        if (taskExecutor instanceof ThreadPoolTaskExecutor tp) {
            tp.shutdown();
            try {
                if (!tp.awaitTermination(30, TimeUnit.SECONDS)) {
                    log.warn("异步任务未能在 30 秒内完成");
                    tp.shutdownNow();
                }
            } catch (InterruptedException e) {
                tp.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        log.info("异步任务已处理完成");
    }

    private void closeRedisConnections() {
        log.info("关闭 Redis 连接...");
        if (redisConnectionFactory instanceof LettuceConnectionFactory lcf) {
            lcf.getConnectionPool().evictAll();
        }
        log.info("Redis 连接已关闭");
    }

    private void closeDatabaseConnections() {
        log.info("关闭数据库连接池...");
        if (dataSource instanceof HikariDataSource hikari) {
            hikari.close();
        }
        log.info("数据库连接池已关闭");
    }

    private void flushLogs() {
        log.info("刷出日志缓冲区...");
        // Logback 会自动刷出,但可以强制调用
        ((LogbackLoggingSystem) LoggingSystem.get(ClassLoader.getSystemClassLoader()))
            .setLogLevel("INFO");
    }
}

消息队列优雅关闭

Kafka 消费者优雅关闭

KafkaGracefulShutdown.java]
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaListenerContainerFactory<?>
            kafkaListenerContainerFactory(
                    ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // 设置并发为 1,方便优雅关闭
        factory.setConcurrency(1);

        // 手动确认模式
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);

        // 监听器类型
        factory.getContainerProperties().setMessageListener(
            (AcknowledgingMessageListener<String, String>) (data, ack) -> {
                try {
                    processMessage(data);
                    ack.acknowledge();
                } catch (Exception e) {
                    log.error("处理消息失败", e);
                    // 可以选择重试或死信
                }
            }
        );

        return factory;
    }
}

@Component
@Slf4j
public class KafkaGracefulShutdown {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @PreDestroy
    public void shutdown() {
        log.info("开始停止 Kafka 消费者...");

        // 暂停所有监听器
        registry.getListenerContainers().forEach(container -> {
            log.info("停止监听器: {}", container);
            container.stop();
        });

        log.info("Kafka 消费者已全部停止");
    }
}

RabbitMQ 优雅关闭

RabbitMQGracefulShutdown.java]
@Component
@Slf4j
public class RabbitMQGracefulShutdown {

    @Autowired
    private SimpleMessageListenerContainer container;

    @PreDestroy
    public void shutdown() {
        log.info("开始停止 RabbitMQ 消费者...");

        // 停止接收新消息
        container.stop();

        // 等待现有消息处理完成(最多 30 秒)
        boolean stopped = container.waitForConsumersToStop(30000);
        if (stopped) {
            log.info("RabbitMQ 消费者已停止,所有消息处理完成");
        } else {
            log.warn("RabbitMQ 消费者停止超时,可能有消息未处理");
        }
    }
}

数据库连接池优雅关闭

HikariCP 配置

hikari-config.yml]
spring:
  datasource:
    hikari:
      # 连接池名称
      pool-name: OrderServiceHikariCP

      # 最大连接数
      maximum-pool-size: 20

      # 最小空闲连接
      minimum-idle: 5

      # 连接超时
      connection-timeout: 30000

      # 空闲超时
      idle-timeout: 600000

      # 最大生命周期
      max-lifetime: 1800000

      # 连接泄漏检测
      leak-detection-threshold: 60000

连接池关闭

DatabaseConnectionShutdown.java]
@Component
@Slf4j
public class DatabaseConnectionShutdown {

    @Autowired
    private DataSource dataSource;

    @PreDestroy
    public void closeDataSource() {
        log.info("关闭数据库连接池...");

        if (dataSource instanceof HikariDataSource hikari) {
            // 先拒绝新请求
            hikari.setPoolName("CLOSING-POOL");

            // 等待活跃连接完成
            log.info("活跃连接数: {}", hikari.getHikariPoolMXBean().getActiveConnections());
            log.info("等待连接数: {}", hikari.getHikariPoolMXBean().getThreadsAwaitingConnection());

            // 关闭连接池
            hikari.close();

            log.info("数据库连接池已关闭");
        }
    }
}

线程池优雅关闭

多种关闭策略

ExecutorServiceShutdown.java]
public class ExecutorServiceShutdown {

    /**
     * 策略一:温柔关闭(推荐)
     * 不再接受新任务,等待现有任务完成
     */
    public void gracefulShutdown(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    log.error("ExecutorService 未能在 {} 秒内终止", timeoutSeconds);
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 策略二:强制关闭
     * 立即中断所有任务
     */
    public void forceShutdown(ExecutorService executor) {
        executor.shutdownNow();
        // 设置中断标志
        // 注意:任务需要正确处理 InterruptedException
    }

    /**
     * 策略三:有序关闭
     * 按优先级顺序关闭不同的线程池
     */
    public void orderlyShutdown(List<ExecutorService> executors, long timeoutSeconds) {
        // 第一批:停止接收新任务
        executors.forEach(ExecutorService::shutdown);

        // 第二批:等待一段时间
        long deadline = System.currentTimeMillis() + timeoutSeconds * 1000;

        // 第三批:逐个强制关闭
        for (ExecutorService executor : executors) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                executor.shutdownNow();
            } else {
                try {
                    if (!executor.awaitTermination(remaining, TimeUnit.MILLISECONDS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

健康检查配合

关闭期间返回 unhealthy

ReadinessGateShutdown.java]
@Component
public class ReadinessGateShutdown
        implements ApplicationListener<ContextClosedEvent>,
                   ReactiveHealthIndicator {

    private volatile boolean shuttingDown = false;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        shuttingDown = true;
        log.info("开始关闭,Ready 状态将返回 unhealthy");
    }

    @Override
    public Mono<Health> health() {
        if (shuttingDown) {
            return Mono.just(
                Health.down()
                    .withDetail("reason", "Application is shutting down")
                    .build()
            );
        }

        return Mono.just(Health.up().build());
    }
}

集成测试

GracefulShutdownTest.java]
@SpringBootTest
class GracefulShutdownTest {

    @Autowired
    private TestRestTemplate restTemplate;

    @Test
    void testInFlightRequestsComplete() throws Exception {
        // 启动一个长时间请求
        CompletableFuture<String> responseFuture = CompletableFuture.supplyAsync(() -> {
            restTemplate.getForObject("/api/long-running", String.class);
            return "done";
        });

        // 触发关闭
        context.close();

        // 验证长时间请求完成
        String result = responseFuture.get(60, TimeUnit.SECONDS);
        assertThat(result).isEqualTo("done");
    }
}

质量判断标准

一篇「优雅关闭实现方案」的文章是否达标,要看它是否回答了:

  1. ✅ Spring Boot 异步任务的优雅关闭?
  2. ✅ 消息队列(Kafka/RabbitMQ)的优雅关闭?
  3. ✅ 数据库连接池的优雅关闭?
  4. ✅ 线程池的多种关闭策略?
  5. ✅ 如何配合健康检查?
  6. ❌ 只有简单示例,没有完整场景——不达标

本章总结

核心要点

  1. Spring Boot 需要配置 shutdown=graceful 和超时时间
  2. 线程池的 waitForTasksToCompleteOnShutdown=true 是关键
  3. 消息队列要等待消息处理完成再关闭
  4. 数据库连接池要在最后关闭
  5. 关闭顺序:入口 → 消息 → 任务 → 资源 → 数据库
  6. 健康检查在关闭期间应返回 unhealthy