Future/Promise 异步模式

想象你去一家网红餐厅吃饭,领位员说「前面还有 15 桌,您先逛逛商场,轮到您时会收到短信」。你不需要站在餐厅门口傻等,可以在商场里做任何事——这就是 Future 的隐喻:结果还没出来,但你知道它会来。

传统的同步调用像是在餐厅门口站着等,翻台了才轮到你。Future 则把「等待」变成了「异步通知」。

Future 接口:异步结果的占位符

Future 是 Java 5 引入的接口,代表一个异步计算的结果:

public interface Future<V> {
    // 获取结果,会阻塞直到结果可用
    V get() throws InterruptedException, ExecutionException;

    // 带超时获取结果
    V get(long timeout, TimeUnit unit) throws ...;

    // 判断任务是否完成
    boolean isDone();

    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
}

基本使用

ExecutorService executor = Executors.newFixedThreadPool(10);

// 提交一个 Callable,返回 Future
Future<Integer> future = executor.submit(() -> {
    // 模拟耗时操作
    Thread.sleep(2000);
    return 42;
});

// 主线程可以干其他事
System.out.println("做点别的事...");

// 需要结果时调用 get(),会阻塞
Integer result = future.get();
System.out.println("结果是:" + result);

Future 的局限性

Future 解决了「获取异步结果」的问题,但它的能力很有限:

  1. 无法链式调用。拿到结果后想继续异步处理,没法直接 chain。
  2. 无法组合多个 Future。等 A、B 都完成了再执行 C,做不到。
  3. 无法手动完成。无法让别人帮你 set 值,只有 submit 的任务才能。
  4. 缺少异常处理链。异步任务出错了,get() 时才感知到。
// Future 的局限:需要嵌套等待
Future<List<User>> usersFuture = executor.submit(this::fetchUsers);
List<User> users = usersFuture.get(); // 阻塞等

for (User user : users) {
    Future<Order> orderFuture = executor.submit(() -> fetchOrder(user.getId()));
    Order order = orderFuture.get(); // 再次阻塞
    // ...
}
// 这段代码是串行的!

CompletableFuture:链式异步编程

Java 8 引入的 CompletableFuture 解决了这些问题。它不仅实现了 Future,还实现了 CompletionStage 接口,支持丰富的链式操作。

创建 CompletableFuture

// 方式一:手动完成
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("结果"); // 手动设置结果

// 方式二:使用 supplyAsync(使用默认线程池 ForkJoinPool.commonPool)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");

// 方式三:使用 supplyAsync(指定线程池)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "Hello", executor);

// 方式四:使用 runAsync(无返回值)
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
    System.out.println("异步执行");
});

thenApply:转换结果

thenApply 用函数转换 CompletableFuture 的结果:

CompletableFuture<Integer> cf = CompletableFuture
    .supplyAsync(() -> "Hello")           // 返回 String
    .thenApply(s -> s.length());          // 转为 Integer

System.out.println(cf.get()); // 5

thenCompose:扁平化链式调用

当转换函数返回另一个 CompletableFuture 时,用 thenCompose 避免嵌套:

// 错误写法:嵌套 Future
cf.thenApply(userId -> fetchUser(userId)); // 返回 CompletableFuture<CompletableFuture<User>>

// 正确写法:扁平化
cf.thenCompose(userId -> fetchUserAsync(userId)); // 返回 CompletableFuture<User>
// 实际例子:先查用户 ID,再异步查询用户详情
CompletableFuture<Integer> userIdFuture = CompletableFuture
    .supplyAsync(() -> getUserIdFromSession());

CompletableFuture<User> userFuture = userIdFuture
    .thenCompose(userId -> userService.findByIdAsync(userId));

thenCombine:组合两个 Future

thenCombine 用于等待两个独立的 CompletableFuture 完成后进行组合:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = future1
    .thenCombine(future2, (s1, s2) -> s1 + " " + s2);

System.out.println(combined.get()); // Hello World

allOf / anyOf:批量等待

等待多个 CompletableFuture:

// 等待所有 Future 完成
CompletableFuture.allOf(f1, f2, f3).join();

// 任意一个完成就继续
CompletableFuture.anyOf(f1, f2, f3).join();

异常处理

CompletableFuture 提供了优雅的异常处理机制:

exceptionally:异常时提供默认值

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() < 0.5) throw new RuntimeException("出错了");
        return "成功";
    })
    .exceptionally(ex -> "默认值"); // 出错时返回默认值

System.out.println(future.get()); // 50%概率"成功",50%概率"默认值"

handle:无论成功失败都处理

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .handle((result, ex) -> {
        if (ex != null) {
            return "出错了:" + ex.getMessage();
        }
        return result.toUpperCase();
    });

whenComplete:副作用处理

future.whenComplete((result, ex) -> {
    if (ex != null) {
        logger.error("异步任务失败", ex);
    } else {
        logger.info("异步任务成功,结果={}", result);
    }
});

真实案例:电商详情页聚合调用

电商详情页需要展示商品信息、库存、价格、评价等数据。如果用同步方式串行调用:

// 同步方式:串行调用,总耗时 = t1 + t2 + t3 + t4
Product product = productService.getProduct(id);
Price price = priceService.getPrice(id);
Stock stock = stockService.getStock(id);
List<Review> reviews = reviewService.getReviews(id);

用 CompletableFuture 并行调用:

public ProductVO getProductDetail(Long productId) {
    // 1. 查询商品基本信息
    CompletableFuture<Product> productFuture = CompletableFuture
        .supplyAsync(() -> productService.getProduct(productId));

    // 2. 查询价格(依赖商品信息)
    CompletableFuture<Price> priceFuture = productFuture
        .thenCompose(p -> CompletableFuture
            .supplyAsync(() -> priceService.getPrice(p.getPriceId())));

    // 3. 查询库存(依赖商品信息)
    CompletableFuture<Stock> stockFuture = productFuture
        .thenCompose(p -> CompletableFuture
            .supplyAsync(() -> stockService.getStock(p.getId())));

    // 4. 查询评价(不依赖其他结果,可并行)
    CompletableFuture<List<Review>> reviewFuture = CompletableFuture
        .supplyAsync(() -> reviewService.getReviews(productId));

    // 等待所有结果
    CompletableFuture.allOf(productFuture, priceFuture,
                            stockFuture, reviewFuture).join();

    // 组装返回
    return new ProductVO(
        productFuture.join(),
        priceFuture.join(),
        stockFuture.join(),
        reviewFuture.join()
    );
}

假设各服务调用耗时:

服务耗时
商品服务100ms
价格服务80ms
库存服务60ms
评价服务200ms
  • 串行调用:440ms
  • 并行调用:max(100, 80+100, 60+100, 200) = 300ms(考虑到价格和库存依赖商品查询)

优化效果显著。

线程池配置

CompletableFuture 默认使用 ForkJoinPool.commonPool,这是一个共享的、默认大小为 CPU核心数 - 1 的线程池。

问题:所有异步任务共享同一个池,如果某个任务阻塞,会影响其他任务。

最佳实践:为不同类型的任务配置不同的线程池。

// IO 密集型任务:线程数 = CPU核心数 * (1 + IO时间/CPU时间)
private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
    50, 100, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadFactoryBuilder()
        .setNameFormat("io-pool-%d")
        .build()
);

// CPU 密集型任务:线程数 = CPU核心数 + 1
private static final ExecutorService CPU_POOL = new ThreadPoolExecutor(
    10, 20, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadFactoryBuilder()
        .setNameFormat("cpu-pool-%d")
        .build()
);

// 使用指定的线程池
CompletableFuture.supplyAsync(() -> doIO(), IO_POOL)
                 .thenApplyAsync(this::processCPU, CPU_POOL);

总结与延伸

CompletableFuture 是 Java 异步编程的利器:

核心优势

  • 链式调用,代码清晰
  • 丰富的组合操作
  • 灵活的异常处理

使用场景

  • 并行调用多个无依赖的服务
  • 异步流程编排
  • 链式依赖的服务调用

注意事项

  1. 不要忘记 join()get(),否则任务可能不执行
  2. 合理配置线程池,避免共享池竞争
  3. 异步任务要有超时机制,防止无限等待
  4. 异常处理要兜底,避免静默失败

CompletableFuture 解决了异步编程的很多问题,但它仍然是命令式的——你需要告诉它每一步怎么走。对于更复杂的异步流程,比如循环、退出条件等,可以考虑响应式编程(Reactive Streams),比如 RxJava 或 Project Reactor。

那么问题来了:CompletableFuture 的 thenComposethenApply 的区别是什么?如果转换函数返回的是 User 而不是 CompletableFuture<User>,应该用哪个?这涉及到函数式编程中「扁平化」的概念。