#CompletableFuture 异步编程
Future 是 Java 5 引入的异步编程模型,但它的功能有限:只能通过 get() 阻塞获取结果,无法组合多个 Future。CompletableFuture 在 JDK 8 引入,提供了强大的链式异步编程能力。
#Future 的局限性
#基本 Future
// Future 的问题:无法链式调用
Future<String> future = executor.submit(() -> {
// 异步执行
return fetchData();
});
String result = future.get(); // 阻塞等待
// 如果需要组合操作...
String result2 = future.get() + process(result); // 仍然阻塞#Future 的问题
flowchart LR
A["Future 问题"] --> B["无法链式调用"]
A --> C["无法组合多个 Future"]
A --> D["不支持回调"]
A --> E["无法手动完成"]#CompletableFuture 入门
#创建 CompletableFuture
// 1. 直接创建
CompletableFuture<String> future = new CompletableFuture<>();
// 2. 静态方法创建
CompletableFuture<String> completed = CompletableFuture.completedFuture("result");
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
// 异步执行,无返回值
});
// 3. supplyAsync
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});#获取结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");
// 1. 阻塞获取
String result = future.get(); // 抛出受检异常
String result2 = future.get(5, TimeUnit.SECONDS); // 带超时
// 2. 非阻塞获取
future.thenAccept(result -> {
System.out.println("Result: " + result);
});#链式调用
#thenApply:转换
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> "123") // String
.thenApply(Integer::parseInt); // Integer#thenCompose:扁平化
// thenCompose 用于返回另一个 CompletableFuture 的场景
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> userId)
.thenCompose(this::fetchUser); // 返回 CompletableFuture<User>
// thenApply 用于返回普通值的场景
CompletableFuture<String> nameFuture = userFuture
.thenApply(User::getName); // 返回 String#thenCombine:组合
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1
.thenCombine(future2, (s1, s2) -> s1 + " " + s2); // "Hello World"#异常处理
#异常捕获
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> {
System.out.println("Error: " + ex.getMessage());
return 0; // 默认值
});#handle:无论成功失败都处理
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> riskyOperation())
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Error: " + ex.getMessage());
return 0;
}
return result;
});#测试异常
CompletableFuture<Integer> future = CompletableFuture
.failedFuture(new RuntimeException("Error"));
CompletableFuture<Integer> defaultFuture = future.exceptionally(ex -> {
return 0;
});#实战示例
#异步调用链
// 场景:用户下单 -> 查询用户 -> 查询商品 -> 计算价格
public CompletableFuture<OrderResult> createOrder(Long userId, Long productId) {
return CompletableFuture
.supplyAsync(() -> fetchUser(userId)) // 查询用户
.thenCompose(user -> // 组合商品查询
CompletableFuture.supplyAsync(() -> fetchProduct(productId))
.thenApply(product -> new OrderContext(user, product)))
.thenApply(context -> calculatePrice(context)) // 计算价格
.thenApply(context -> saveOrder(context)); // 保存订单
}#并行执行
// 场景:同时查询多个接口
public UserProfile getUserProfile(Long userId) {
// 1. 查询用户基本信息
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() -> fetchUser(userId));
// 2. 查询用户订单
CompletableFuture<List<Order>> ordersFuture =
CompletableFuture.supplyAsync(() -> fetchOrders(userId));
// 3. 查询用户偏好
CompletableFuture<Preferences> prefsFuture =
CompletableFuture.supplyAsync(() -> fetchPreferences(userId));
// 等待所有完成
User user = userFuture.join();
List<Order> orders = ordersFuture.join();
Preferences prefs = prefsFuture.join();
return new UserProfile(user, orders, prefs);
}#allOf 和 anyOf
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> fetchFrom("API1")),
CompletableFuture.supplyAsync(() -> fetchFrom("API2")),
CompletableFuture.supplyAsync(() -> fetchFrom("API3"))
);
// 等待所有完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allDone.join(); // 阻塞直到所有完成
// 获取所有结果
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 任意一个完成(第一个返回)
CompletableFuture<Object> anyDone = CompletableFuture.anyOf(
futures.toArray(new CompletableFuture[0])
);
Object first = anyDone.join();#线程池配置
#默认线程池
// 默认使用 ForkJoinPool.commonPool()
// 线程数 = Runtime.getRuntime().availableProcessors() - 1
CompletableFuture.supplyAsync(() -> doSomething());
// 使用公共线程池#自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> doSomething(), executor);
// 链式调用默认使用同一个线程池
future.thenApply(this::process); // 仍在 executor 中执行#推荐实践
// IO 密集型任务:使用更大的线程池
ExecutorService ioExecutor = new ThreadPoolExecutor(
50, 100, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
// 异步方法封装
public <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, ioExecutor);
}#取消操作
#取消 CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "result";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Cancelled");
}
});
// 取消
future.cancel(true);
// 检查状态
if (future.isCompletedExceptionally()) {
// 处理取消
}#性能考虑
#避免阻塞
// 错误:在异步链中使用 get
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> fetchData())
.thenApply(data -> {
String result = anotherFuture.get(); // 阻塞!
return process(data, result);
});
// 正确:继续使用异步链
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> fetchData())
.thenCompose(data ->
anotherFuture.thenApply(result -> process(data, result))
);#避免过度嵌套
// 错误:回调地狱
future1.thenAccept(result1 -> {
future2.thenAccept(result2 -> {
future3.thenAccept(result3 -> {
// 嵌套太深
});
});
});
// 正确:扁平化链式调用
CompletableFuture
.supplyAsync(() -> result1)
.thenCompose(r1 -> future2.thenApply(r2 -> combine(r1, r2)))
.thenCompose(result -> future3.thenApply(r3 -> combineAll(result, r3)))
.thenAccept(finalResult -> {});#本章总结
核心要点:
- Future 的局限:无法链式调用、组合、回调
- CompletableFuture:支持链式调用的异步编程模型
- thenApply:同步转换
- thenCompose:异步扁平化
- thenCombine:组合两个 Future
- exceptionally/handle:异常处理
- allOf/anyOf:并行执行和竞态处理
CompletableFuture 是现代 Java 异步编程的核心工具。下一节我们将讲解 Fork/Join 框架。