CompletableFuture 详解¶
它实现了 Future 和 CompletionStage 接口,解决了传统 Future 的阻塞 get() 和无法组合问题,通过 supplyAsync 等方法异步执行任务,默认使用 ForkJoinPool.commonPool() 线程池。
核心优势
CompletableFuture 允许非阻塞回调和任务组合,避免串行等待,提高接口响应速度。为什么有效?因为它支持将无依赖任务并行运行(如 allOf),或链式依赖(如 thenCompose),利用多核 CPU 资源。
创建方式
使用静态工厂方法创建异步任务:
- runAsync(Runnable):无返回值,适合纯操作。
- supplyAsync(Supplier):有返回值,核心方法。
示例代码(推荐用自定义线程池,避免默认池竞争):
import java.util.concurrent.*;
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ }
return "Hello, CompletableFuture!";
}, executor); // 关键:传入自定义 executor,避免 ForkJoinPool 饥饿
String result = future.get(5, TimeUnit.SECONDS); // 带超时,避免无限阻塞
System.out.println(result); // 输出: Hello, CompletableFuture!
为什么用自定义池?默认 ForkJoinPool 是全局共享,多库竞争导致线程饥饿,自定义池隔离资源、控制大小。
结果处理
链式处理结果:
- thenApply(Function):转换结果,支持流式调用。
- thenAccept(Consumer):消费结果,无返回。
- thenRun(Runnable):后续无参操作。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello")
.thenApply(s -> s + " world") // 转换: hello world
.thenAccept(System.out::println); // 消费并打印
future.join(); // 非阻塞 join() 推荐代替 get()
机制:这些方法返回新 CompletableFuture,形成链;不指定 executor 时复用上游线程池,提高效率。
异常处理
用 handle(BiFunction) 或 exceptionally(Function) 处理 Throwable,确保链不中断。
示例(关键:handle 同时处理成功/失败):
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Error!");
return "success";
}).handle((res, ex) -> res != null ? res : "fallback"); // ex 非空时返回 fallback
System.out.println(future.join()); // 输出: fallback
为什么这样?传统 Future 无内置异常回调,handle 允许恢复结果,继续下游任务。
任务组合
- thenCompose:串行依赖,前结果传后任务。
- thenCombine:并行合并,两任务独立,结果 BiFunction 合并。
- acceptEither:任一完成触发(无序赛跑)。
示例(thenCombine 并行合并):
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> combined = f1.thenCombine(f2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.join()); // hello world
区别:thenCompose 顺序执行(依赖),thenCombine 并行(无依赖),适合如用户详情后查订单的场景。 多任务并行
- allOf:所有完成才结束(汇总场景)。
- anyOf:任一完成(快速失败)。 示例(allOf 并行 2 任务):
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { /* task1, sleep 2s */ return "A"; });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { /* task2, sleep 1s */ return "B"; });
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join(); // 等待总计 ~2s(非串行 3s)
System.out.println("All done: " + f1.join() + f2.join()); // All done: AB
为什么优于串行?任务独立,利用并行缩短总时长,join() 抛 CompletionException 若任一失败。
最佳实践
优先自定义 Executor、用 join() 代替 get()(非检查异常)、链式异常处理。项目经验:Spring 中结合 @Async 返回 CompletableFuture,实现服务间异步调用,提升 TPS。
示例代码
以下完整可运行代码模拟数据(3班级、随机成绩),并行计算每个班级的科目平均分,最终输出全校汇总 Map<班级, Map<科目, 平均分>>。
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SchoolAverageScore {
// 科目定义
private static final List<String> SUBJECTS = Arrays.asList("语文", "数学", "英语");
// 模拟学生成绩:Map<班级, List<Map<科目, 分数>>>
private static Map<String, List<Map<String, Double>>> generateData() {
Map<String, List<Map<String, Double>>> data = new HashMap<>();
Random rand = new Random();
for (int i = 1; i <= 3; i++) {
String className = "班级" + i;
List<Map<String, Double>> students = IntStream.range(0, 30) // 30 名学生/班
.mapToObj(j -> {
Map<String, Double> score = new HashMap<>();
SUBJECTS.forEach(sub -> score.put(sub, 60.0 + rand.nextDouble() * 40)); // 60-100分
return score;
})
.collect(Collectors.toList());
data.put(className, students);
}
return data;
}
// 计算单个班级各科目平均分:Map<科目, 平均分>
private static Map<String, Double> calculateClassAverages(String className, List<Map<String, Double>> students) {
System.out.println(Thread.currentThread().getName() + " 开始计算 " + className);
try { Thread.sleep(2000 + new Random().nextInt(1000)); } catch (InterruptedException e) { /* ignore */ } // 模拟耗时
Map<String, Double> averages = new HashMap<>();
SUBJECTS.forEach(subject -> {
double avg = students.stream()
.mapToDouble(s -> s.getOrDefault(subject, 0.0))
.average().orElse(0.0);
averages.put(subject, avg);
});
System.out.println(Thread.currentThread().getName() + " " + className + " 计算完成");
return averages;
}
public static void main(String[] args) throws Exception {
// 数据
Map<String, List<Map<String, Double>>> schoolData = generateData();
// 自定义线程池:核心10,最大20,避免默认池问题
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), r -> new Thread(r, "ClassCalc-Thread"));
long start = System.currentTimeMillis();
// 并行计算每个班级:supplyAsync 返回 CompletableFuture<Map<String, Double>>
List<CompletableFuture<Map<String, Double>>> futures = schoolData.entrySet().stream()
.map(entry -> CompletableFuture.supplyAsync(() -> calculateClassAverages(entry.getKey(), entry.getValue()), executor))
.collect(Collectors.toList());
// allOf 等待所有班级完成(总时间 ~ 最慢班级 3s,非串行 9s)
CompletableFuture<Void> allDone = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allDone.join(); // 非阻塞 join,推荐代替 get()
// 汇总结果:班级 -> 科目平均分
Map<String, Map<String, Double>> results = new HashMap<>();
for (int i = 0; i < futures.size(); i++) {
String className = schoolData.keySet().toArray(new String[0])[i]; // 匹配班级
results.put(className, futures.get(i).get()); // 已完成,可安全 get
}
long end = System.currentTimeMillis();
System.out.println("全校平均分汇总(总耗时: " + (end - start) + " ms): " + results);
// 示例输出: {班级1={语文=85.2, 数学=78.9, 英语=82.1}, ...}
executor.shutdown();
}
}
关键语句解释:
- supplyAsync(() -> calculateClassAverages(...), executor):异步计算单个班级,返回 Future
- CompletableFuture.allOf(futures):并行启动所有班级任务,Void Future 完成时所有 ready;为什么?无序并行,时间复杂度 O(max(班级耗时))。 + allDone.join():主线程等待总完成,若任一异常抛 CompletionException,可用 handle 处理。
- 汇总用 get():allOf 后安全获取,避免超时阻塞。