#优雅关闭实现方案
上一节讲了优雅关闭的概念和配置,本节深入讲解各种场景下的完整实现方案。
#Spring Boot 优雅关闭
#完整配置
application.yml]
server:
shutdown: graceful
spring:
lifecycle:
timeout-per-shutdown-phase: 30s
task:
execution:
pool:
core-size: 10
max-size: 50
queue-capacity: 100
keep-alive: 60s
thread-name-prefix: async-task-#异步任务优雅关闭
TaskExecutorShutdown.java]
@Configuration
public class AsyncConfig implements AsyncConfigurer {
private ThreadPoolTaskExecutor executor;
@Override
public Executor getAsyncExecutor() {
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-task-");
executor.setWaitForTasksToCompleteOnShutdown(true); // 关键:等待任务完成
executor.setAwaitTerminationSeconds(30); // 等待超时
executor.initialize();
return executor;
}
@PreDestroy
public void destroy() {
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}#完整生命周期管理
GracefulShutdownManager.java]
@Component
@Slf4j
public class GracefulShutdownManager
implements ApplicationListener<ContextClosedEvent> {
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private DataSource dataSource;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerRegistry;
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("=== 开始优雅关闭 ===");
long startTime = System.currentTimeMillis();
try {
// 第一步:停止接收新请求(Spring Boot 自动处理)
// 第二步:停止 Kafka 消费者
stopKafkaListeners();
// 第三步:等待异步任务完成
awaitAsyncTasks();
// 第四步:关闭 Redis 连接池
closeRedisConnections();
// 第五步:关闭数据库连接池
closeDatabaseConnections();
// 第六步:刷出日志
flushLogs();
} catch (Exception e) {
log.error("优雅关闭异常", e);
}
long duration = System.currentTimeMillis() - startTime;
log.info("=== 优雅关闭完成,耗时 {} ms ===", duration);
}
private void stopKafkaListeners() {
log.info("停止 Kafka 消费者...");
kafkaListenerRegistry.getListenerContainers().forEach(container -> {
container.stop();
});
log.info("Kafka 消费者已停止");
}
private void awaitAsyncTasks() {
log.info("等待异步任务完成...");
if (taskExecutor instanceof ThreadPoolTaskExecutor tp) {
tp.shutdown();
try {
if (!tp.awaitTermination(30, TimeUnit.SECONDS)) {
log.warn("异步任务未能在 30 秒内完成");
tp.shutdownNow();
}
} catch (InterruptedException e) {
tp.shutdownNow();
Thread.currentThread().interrupt();
}
}
log.info("异步任务已处理完成");
}
private void closeRedisConnections() {
log.info("关闭 Redis 连接...");
if (redisConnectionFactory instanceof LettuceConnectionFactory lcf) {
lcf.getConnectionPool().evictAll();
}
log.info("Redis 连接已关闭");
}
private void closeDatabaseConnections() {
log.info("关闭数据库连接池...");
if (dataSource instanceof HikariDataSource hikari) {
hikari.close();
}
log.info("数据库连接池已关闭");
}
private void flushLogs() {
log.info("刷出日志缓冲区...");
// Logback 会自动刷出,但可以强制调用
((LogbackLoggingSystem) LoggingSystem.get(ClassLoader.getSystemClassLoader()))
.setLogLevel("INFO");
}
}#消息队列优雅关闭
#Kafka 消费者优雅关闭
KafkaGracefulShutdown.java]
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<?>
kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
// 设置并发为 1,方便优雅关闭
factory.setConcurrency(1);
// 手动确认模式
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// 监听器类型
factory.getContainerProperties().setMessageListener(
(AcknowledgingMessageListener<String, String>) (data, ack) -> {
try {
processMessage(data);
ack.acknowledge();
} catch (Exception e) {
log.error("处理消息失败", e);
// 可以选择重试或死信
}
}
);
return factory;
}
}
@Component
@Slf4j
public class KafkaGracefulShutdown {
@Autowired
private KafkaListenerEndpointRegistry registry;
@PreDestroy
public void shutdown() {
log.info("开始停止 Kafka 消费者...");
// 暂停所有监听器
registry.getListenerContainers().forEach(container -> {
log.info("停止监听器: {}", container);
container.stop();
});
log.info("Kafka 消费者已全部停止");
}
}#RabbitMQ 优雅关闭
RabbitMQGracefulShutdown.java]
@Component
@Slf4j
public class RabbitMQGracefulShutdown {
@Autowired
private SimpleMessageListenerContainer container;
@PreDestroy
public void shutdown() {
log.info("开始停止 RabbitMQ 消费者...");
// 停止接收新消息
container.stop();
// 等待现有消息处理完成(最多 30 秒)
boolean stopped = container.waitForConsumersToStop(30000);
if (stopped) {
log.info("RabbitMQ 消费者已停止,所有消息处理完成");
} else {
log.warn("RabbitMQ 消费者停止超时,可能有消息未处理");
}
}
}#数据库连接池优雅关闭
#HikariCP 配置
hikari-config.yml]
spring:
datasource:
hikari:
# 连接池名称
pool-name: OrderServiceHikariCP
# 最大连接数
maximum-pool-size: 20
# 最小空闲连接
minimum-idle: 5
# 连接超时
connection-timeout: 30000
# 空闲超时
idle-timeout: 600000
# 最大生命周期
max-lifetime: 1800000
# 连接泄漏检测
leak-detection-threshold: 60000#连接池关闭
DatabaseConnectionShutdown.java]
@Component
@Slf4j
public class DatabaseConnectionShutdown {
@Autowired
private DataSource dataSource;
@PreDestroy
public void closeDataSource() {
log.info("关闭数据库连接池...");
if (dataSource instanceof HikariDataSource hikari) {
// 先拒绝新请求
hikari.setPoolName("CLOSING-POOL");
// 等待活跃连接完成
log.info("活跃连接数: {}", hikari.getHikariPoolMXBean().getActiveConnections());
log.info("等待连接数: {}", hikari.getHikariPoolMXBean().getThreadsAwaitingConnection());
// 关闭连接池
hikari.close();
log.info("数据库连接池已关闭");
}
}
}#线程池优雅关闭
#多种关闭策略
ExecutorServiceShutdown.java]
public class ExecutorServiceShutdown {
/**
* 策略一:温柔关闭(推荐)
* 不再接受新任务,等待现有任务完成
*/
public void gracefulShutdown(ExecutorService executor, long timeoutSeconds) {
executor.shutdown();
try {
if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
log.error("ExecutorService 未能在 {} 秒内终止", timeoutSeconds);
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 策略二:强制关闭
* 立即中断所有任务
*/
public void forceShutdown(ExecutorService executor) {
executor.shutdownNow();
// 设置中断标志
// 注意:任务需要正确处理 InterruptedException
}
/**
* 策略三:有序关闭
* 按优先级顺序关闭不同的线程池
*/
public void orderlyShutdown(List<ExecutorService> executors, long timeoutSeconds) {
// 第一批:停止接收新任务
executors.forEach(ExecutorService::shutdown);
// 第二批:等待一段时间
long deadline = System.currentTimeMillis() + timeoutSeconds * 1000;
// 第三批:逐个强制关闭
for (ExecutorService executor : executors) {
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
executor.shutdownNow();
} else {
try {
if (!executor.awaitTermination(remaining, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
}#健康检查配合
#关闭期间返回 unhealthy
ReadinessGateShutdown.java]
@Component
public class ReadinessGateShutdown
implements ApplicationListener<ContextClosedEvent>,
ReactiveHealthIndicator {
private volatile boolean shuttingDown = false;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
shuttingDown = true;
log.info("开始关闭,Ready 状态将返回 unhealthy");
}
@Override
public Mono<Health> health() {
if (shuttingDown) {
return Mono.just(
Health.down()
.withDetail("reason", "Application is shutting down")
.build()
);
}
return Mono.just(Health.up().build());
}
}#集成测试
GracefulShutdownTest.java]
@SpringBootTest
class GracefulShutdownTest {
@Autowired
private TestRestTemplate restTemplate;
@Test
void testInFlightRequestsComplete() throws Exception {
// 启动一个长时间请求
CompletableFuture<String> responseFuture = CompletableFuture.supplyAsync(() -> {
restTemplate.getForObject("/api/long-running", String.class);
return "done";
});
// 触发关闭
context.close();
// 验证长时间请求完成
String result = responseFuture.get(60, TimeUnit.SECONDS);
assertThat(result).isEqualTo("done");
}
}#质量判断标准
一篇「优雅关闭实现方案」的文章是否达标,要看它是否回答了:
- ✅ Spring Boot 异步任务的优雅关闭?
- ✅ 消息队列(Kafka/RabbitMQ)的优雅关闭?
- ✅ 数据库连接池的优雅关闭?
- ✅ 线程池的多种关闭策略?
- ✅ 如何配合健康检查?
- ❌ 只有简单示例,没有完整场景——不达标
#本章总结
核心要点:
- Spring Boot 需要配置 shutdown=graceful 和超时时间
- 线程池的 waitForTasksToCompleteOnShutdown=true 是关键
- 消息队列要等待消息处理完成再关闭
- 数据库连接池要在最后关闭
- 关闭顺序:入口 → 消息 → 任务 → 资源 → 数据库
- 健康检查在关闭期间应返回 unhealthy