Future/Promise 异步模式
想象你去一家网红餐厅吃饭,领位员说「前面还有 15 桌,您先逛逛商场,轮到您时会收到短信」。你不需要站在餐厅门口傻等,可以在商场里做任何事——这就是 Future 的隐喻:结果还没出来,但你知道它会来。
传统的同步调用像是在餐厅门口站着等,翻台了才轮到你。Future 则把「等待」变成了「异步通知」。
Future 接口:异步结果的占位符
Future 是 Java 5 引入的接口,代表一个异步计算的结果:
public interface Future<V> {
// 获取结果,会阻塞直到结果可用
V get() throws InterruptedException, ExecutionException;
// 带超时获取结果
V get(long timeout, TimeUnit unit) throws ...;
// 判断任务是否完成
boolean isDone();
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
}
基本使用:
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交一个 Callable,返回 Future
Future<Integer> future = executor.submit(() -> {
// 模拟耗时操作
Thread.sleep(2000);
return 42;
});
// 主线程可以干其他事
System.out.println("做点别的事...");
// 需要结果时调用 get(),会阻塞
Integer result = future.get();
System.out.println("结果是:" + result);
Future 的局限性
Future 解决了「获取异步结果」的问题,但它的能力很有限:
- 无法链式调用。拿到结果后想继续异步处理,没法直接 chain。
- 无法组合多个 Future。等 A、B 都完成了再执行 C,做不到。
- 无法手动完成。无法让别人帮你 set 值,只有 submit 的任务才能。
- 缺少异常处理链。异步任务出错了,get() 时才感知到。
// Future 的局限:需要嵌套等待
Future<List<User>> usersFuture = executor.submit(this::fetchUsers);
List<User> users = usersFuture.get(); // 阻塞等
for (User user : users) {
Future<Order> orderFuture = executor.submit(() -> fetchOrder(user.getId()));
Order order = orderFuture.get(); // 再次阻塞
// ...
}
// 这段代码是串行的!
CompletableFuture:链式异步编程
Java 8 引入的 CompletableFuture 解决了这些问题。它不仅实现了 Future,还实现了 CompletionStage 接口,支持丰富的链式操作。
创建 CompletableFuture
// 方式一:手动完成
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("结果"); // 手动设置结果
// 方式二:使用 supplyAsync(使用默认线程池 ForkJoinPool.commonPool)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
// 方式三:使用 supplyAsync(指定线程池)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "Hello", executor);
// 方式四:使用 runAsync(无返回值)
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
System.out.println("异步执行");
});
thenApply:转换结果
thenApply 用函数转换 CompletableFuture 的结果:
CompletableFuture<Integer> cf = CompletableFuture
.supplyAsync(() -> "Hello") // 返回 String
.thenApply(s -> s.length()); // 转为 Integer
System.out.println(cf.get()); // 5
thenCompose:扁平化链式调用
当转换函数返回另一个 CompletableFuture 时,用 thenCompose 避免嵌套:
// 错误写法:嵌套 Future
cf.thenApply(userId -> fetchUser(userId)); // 返回 CompletableFuture<CompletableFuture<User>>
// 正确写法:扁平化
cf.thenCompose(userId -> fetchUserAsync(userId)); // 返回 CompletableFuture<User>
// 实际例子:先查用户 ID,再异步查询用户详情
CompletableFuture<Integer> userIdFuture = CompletableFuture
.supplyAsync(() -> getUserIdFromSession());
CompletableFuture<User> userFuture = userIdFuture
.thenCompose(userId -> userService.findByIdAsync(userId));
thenCombine:组合两个 Future
thenCombine 用于等待两个独立的 CompletableFuture 完成后进行组合:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1
.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.get()); // Hello World
allOf / anyOf:批量等待
等待多个 CompletableFuture:
// 等待所有 Future 完成
CompletableFuture.allOf(f1, f2, f3).join();
// 任意一个完成就继续
CompletableFuture.anyOf(f1, f2, f3).join();
异常处理
CompletableFuture 提供了优雅的异常处理机制:
exceptionally:异常时提供默认值
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() < 0.5) throw new RuntimeException("出错了");
return "成功";
})
.exceptionally(ex -> "默认值"); // 出错时返回默认值
System.out.println(future.get()); // 50%概率"成功",50%概率"默认值"
handle:无论成功失败都处理
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.handle((result, ex) -> {
if (ex != null) {
return "出错了:" + ex.getMessage();
}
return result.toUpperCase();
});
whenComplete:副作用处理
future.whenComplete((result, ex) -> {
if (ex != null) {
logger.error("异步任务失败", ex);
} else {
logger.info("异步任务成功,结果={}", result);
}
});
真实案例:电商详情页聚合调用
电商详情页需要展示商品信息、库存、价格、评价等数据。如果用同步方式串行调用:
// 同步方式:串行调用,总耗时 = t1 + t2 + t3 + t4
Product product = productService.getProduct(id);
Price price = priceService.getPrice(id);
Stock stock = stockService.getStock(id);
List<Review> reviews = reviewService.getReviews(id);
用 CompletableFuture 并行调用:
public ProductVO getProductDetail(Long productId) {
// 1. 查询商品基本信息
CompletableFuture<Product> productFuture = CompletableFuture
.supplyAsync(() -> productService.getProduct(productId));
// 2. 查询价格(依赖商品信息)
CompletableFuture<Price> priceFuture = productFuture
.thenCompose(p -> CompletableFuture
.supplyAsync(() -> priceService.getPrice(p.getPriceId())));
// 3. 查询库存(依赖商品信息)
CompletableFuture<Stock> stockFuture = productFuture
.thenCompose(p -> CompletableFuture
.supplyAsync(() -> stockService.getStock(p.getId())));
// 4. 查询评价(不依赖其他结果,可并行)
CompletableFuture<List<Review>> reviewFuture = CompletableFuture
.supplyAsync(() -> reviewService.getReviews(productId));
// 等待所有结果
CompletableFuture.allOf(productFuture, priceFuture,
stockFuture, reviewFuture).join();
// 组装返回
return new ProductVO(
productFuture.join(),
priceFuture.join(),
stockFuture.join(),
reviewFuture.join()
);
}
假设各服务调用耗时:
- 串行调用:440ms
- 并行调用:max(100, 80+100, 60+100, 200) = 300ms(考虑到价格和库存依赖商品查询)
优化效果显著。
线程池配置
CompletableFuture 默认使用 ForkJoinPool.commonPool,这是一个共享的、默认大小为 CPU核心数 - 1 的线程池。
问题:所有异步任务共享同一个池,如果某个任务阻塞,会影响其他任务。
最佳实践:为不同类型的任务配置不同的线程池。
// IO 密集型任务:线程数 = CPU核心数 * (1 + IO时间/CPU时间)
private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
50, 100, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder()
.setNameFormat("io-pool-%d")
.build()
);
// CPU 密集型任务:线程数 = CPU核心数 + 1
private static final ExecutorService CPU_POOL = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder()
.setNameFormat("cpu-pool-%d")
.build()
);
// 使用指定的线程池
CompletableFuture.supplyAsync(() -> doIO(), IO_POOL)
.thenApplyAsync(this::processCPU, CPU_POOL);
总结与延伸
CompletableFuture 是 Java 异步编程的利器:
核心优势:
- 链式调用,代码清晰
- 丰富的组合操作
- 灵活的异常处理
使用场景:
- 并行调用多个无依赖的服务
- 异步流程编排
- 链式依赖的服务调用
注意事项:
- 不要忘记
join() 或 get(),否则任务可能不执行
- 合理配置线程池,避免共享池竞争
- 异步任务要有超时机制,防止无限等待
- 异常处理要兜底,避免静默失败
CompletableFuture 解决了异步编程的很多问题,但它仍然是命令式的——你需要告诉它每一步怎么走。对于更复杂的异步流程,比如循环、退出条件等,可以考虑响应式编程(Reactive Streams),比如 RxJava 或 Project Reactor。
那么问题来了:CompletableFuture 的 thenCompose 和 thenApply 的区别是什么?如果转换函数返回的是 User 而不是 CompletableFuture<User>,应该用哪个?这涉及到函数式编程中「扁平化」的概念。