跳转至

CompletableFuture 详解

它实现了 Future 和 CompletionStage 接口,解决了传统 Future 的阻塞 get() 和无法组合问题,通过 supplyAsync 等方法异步执行任务,默认使用 ForkJoinPool.commonPool() 线程池。

核心优势

CompletableFuture 允许非阻塞回调和任务组合,避免串行等待,提高接口响应速度。为什么有效?因为它支持将无依赖任务并行运行(如 allOf),或链式依赖(如 thenCompose),利用多核 CPU 资源。

创建方式

使用静态工厂方法创建异步任务:

  • runAsync(Runnable):无返回值,适合纯操作。
  • supplyAsync(Supplier):有返回值,核心方法。

示例代码(推荐用自定义线程池,避免默认池竞争):

import java.util.concurrent.*;

ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ }
    return "Hello, CompletableFuture!";
}, executor);  // 关键:传入自定义 executor,避免 ForkJoinPool 饥饿

String result = future.get(5, TimeUnit.SECONDS);  // 带超时,避免无限阻塞
System.out.println(result);  // 输出: Hello, CompletableFuture!

为什么用自定义池?默认 ForkJoinPool 是全局共享,多库竞争导致线程饥饿,自定义池隔离资源、控制大小。

结果处理

链式处理结果:

  • thenApply(Function):转换结果,支持流式调用。
  • thenAccept(Consumer):消费结果,无返回。
  • thenRun(Runnable):后续无参操作。

示例:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello")
    .thenApply(s -> s + " world")  // 转换: hello world
    .thenAccept(System.out::println);  // 消费并打印

future.join();  // 非阻塞 join() 推荐代替 get()

机制:这些方法返回新 CompletableFuture,形成链;不指定 executor 时复用上游线程池,提高效率。

异常处理

用 handle(BiFunction) 或 exceptionally(Function) 处理 Throwable,确保链不中断。 ​

示例(关键:handle 同时处理成功/失败):

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("Error!");
    return "success";
}).handle((res, ex) -> res != null ? res : "fallback");  // ex 非空时返回 fallback

System.out.println(future.join());  // 输出: fallback

为什么这样?传统 Future 无内置异常回调,handle 允许恢复结果,继续下游任务。

任务组合

  • thenCompose:串行依赖,前结果传后任务。
  • thenCombine:并行合并,两任务独立,结果 BiFunction 合并。
  • acceptEither:任一完成触发(无序赛跑)。 ​

示例(thenCombine 并行合并):

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> combined = f1.thenCombine(f2, (s1, s2) -> s1 + " " + s2);

System.out.println(combined.join());  // hello world

区别:thenCompose 顺序执行(依赖),thenCombine 并行(无依赖),适合如用户详情后查订单的场景。 ​ 多任务并行

  • allOf:所有完成才结束(汇总场景)。
  • anyOf:任一完成(快速失败)。 ​ 示例(allOf 并行 2 任务):
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { /* task1, sleep 2s */ return "A"; });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { /* task2, sleep 1s */ return "B"; });

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join();  // 等待总计 ~2s(非串行 3s)
System.out.println("All done: " + f1.join() + f2.join());  // All done: AB

为什么优于串行?任务独立,利用并行缩短总时长,join() 抛 CompletionException 若任一失败。

最佳实践

优先自定义 Executor、用 join() 代替 get()(非检查异常)、链式异常处理。项目经验:Spring 中结合 @Async 返回 CompletableFuture,实现服务间异步调用,提升 TPS。

示例代码

以下完整可运行代码模拟数据(3班级、随机成绩),并行计算每个班级的科目平均分,最终输出全校汇总 Map<班级, Map<科目, 平均分>>。

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SchoolAverageScore {
    // 科目定义
    private static final List<String> SUBJECTS = Arrays.asList("语文", "数学", "英语");

    // 模拟学生成绩:Map<班级, List<Map<科目, 分数>>>
    private static Map<String, List<Map<String, Double>>> generateData() {
        Map<String, List<Map<String, Double>>> data = new HashMap<>();
        Random rand = new Random();
        for (int i = 1; i <= 3; i++) {
            String className = "班级" + i;
            List<Map<String, Double>> students = IntStream.range(0, 30)  // 30 名学生/班
                .mapToObj(j -> {
                    Map<String, Double> score = new HashMap<>();
                    SUBJECTS.forEach(sub -> score.put(sub, 60.0 + rand.nextDouble() * 40));  // 60-100分
                    return score;
                })
                .collect(Collectors.toList());
            data.put(className, students);
        }
        return data;
    }

    // 计算单个班级各科目平均分:Map<科目, 平均分>
    private static Map<String, Double> calculateClassAverages(String className, List<Map<String, Double>> students) {
        System.out.println(Thread.currentThread().getName() + " 开始计算 " + className);
        try { Thread.sleep(2000 + new Random().nextInt(1000)); } catch (InterruptedException e) { /* ignore */ }  // 模拟耗时
        Map<String, Double> averages = new HashMap<>();
        SUBJECTS.forEach(subject -> {
            double avg = students.stream()
                .mapToDouble(s -> s.getOrDefault(subject, 0.0))
                .average().orElse(0.0);
            averages.put(subject, avg);
        });
        System.out.println(Thread.currentThread().getName() + " " + className + " 计算完成");
        return averages;
    }

    public static void main(String[] args) throws Exception {
        // 数据
        Map<String, List<Map<String, Double>>> schoolData = generateData();

        // 自定义线程池:核心10,最大20,避免默认池问题
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), r -> new Thread(r, "ClassCalc-Thread"));

        long start = System.currentTimeMillis();

        // 并行计算每个班级:supplyAsync 返回 CompletableFuture<Map<String, Double>>
        List<CompletableFuture<Map<String, Double>>> futures = schoolData.entrySet().stream()
            .map(entry -> CompletableFuture.supplyAsync(() -> calculateClassAverages(entry.getKey(), entry.getValue()), executor))
            .collect(Collectors.toList());

        // allOf 等待所有班级完成(总时间 ~ 最慢班级 3s,非串行 9s)
        CompletableFuture<Void> allDone = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allDone.join();  // 非阻塞 join,推荐代替 get()

        // 汇总结果:班级 -> 科目平均分
        Map<String, Map<String, Double>> results = new HashMap<>();
        for (int i = 0; i < futures.size(); i++) {
            String className = schoolData.keySet().toArray(new String[0])[i];  // 匹配班级
            results.put(className, futures.get(i).get());  // 已完成,可安全 get
        }

        long end = System.currentTimeMillis();
        System.out.println("全校平均分汇总(总耗时: " + (end - start) + " ms): " + results);
        // 示例输出: {班级1={语文=85.2, 数学=78.9, 英语=82.1}, ...}

        executor.shutdown();
    }
}

关键语句解释:

  • supplyAsync(() -> calculateClassAverages(...), executor):异步计算单个班级,返回 Future,传入自定义 executor 隔离线程。
  • CompletableFuture.allOf(futures):并行启动所有班级任务,Void Future 完成时所有 ready;为什么?无序并行,时间复杂度 O(max(班级耗时))。 ​+ allDone.join():主线程等待总完成,若任一异常抛 CompletionException,可用 handle 处理。
  • 汇总用 get():allOf 后安全获取,避免超时阻塞。