CompletableFuture并行处理任务
CompletableFuture并行处理任务
- CompletableFuture
- 基本概念与特性
- 创建CompletableFuture实例
- 任务编排方法
- 线程池选择
- 默认线程池
- 自定义线程池
- 线程池配置建议
- 代码示例
- 同步代码
CompletableFuture
基本概念与特性
异步执行: CompletableFuture允许任务在后台线程中异步执行,不会阻塞主线程,从而提高了应用程序的响应性和性能。
可组合性: CompletableFuture的操作可以组合成一个或多个CompletableFuture对象,构成复杂的异步计算链。这包括结果的转换、组合以及异常处理等。
异常处理: 通过exceptionally()等方法,CompletableFuture可以捕获计算中的异常并返回默认值,或者通过handle()等方法同时处理正常结果和异常。
取消与超时: 支持取消异步任务和设置超时时间,避免任务的无限等待。
非阻塞式等待: 提供了非阻塞式的等待方法,如join()和getNow(),可以在不阻塞当前线程的情况下获取任务的结果。
并行处理: 在处理多个耗时操作时,如I/O操作、数据库访问或网络请求,CompletableFuture可以并行执行这些任务,提高系统吞吐量和响应能力。
创建CompletableFuture实例
supplyAsync():用于创建返回结果的异步任务。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 执行异步任务并返回结果return "Hello, CompletableFuture!";
});
runAsync():用于创建不返回结果的异步任务。例如:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 执行异步任务System.out.println("Task running asynchronously");
});
任务编排方法
转换类方法:
- thenApply() / thenApplyAsync():将上一个任务的结果转换为新的结果。thenApply()在同一个线程中执行,而 thenApplyAsync()可能在新的线程中执行。
- thenAccept() / thenAcceptAsync():处理上一个任务的结果,但不返回新的值。thenAccept()在同一个线程中执行,而thenAcceptAsync()可能在新的线程中执行。
- thenRun() / thenRunAsync():在上一个任务完成后执行一个操作,不使用上一个任务的结果。
组合类方法:
- thenCompose() / thenComposeAsync():将两个CompletableFuture组合成一个。当一个任务依赖另一个任务的结果时,可以使用此方法。
- thenCombine() / thenCombineAsync():组合两个独立任务的结果。需要两个独立任务的结果进行计算时,可以使用此方法。
多任务协调方法:
- thenCompose() / thenComposeAsync():将两个CompletableFuture组合成一个。当一个任务依赖另一个任务的结果时,可以使用此方法。
- thenCombine() / thenCombineAsync():组合两个独立任务的结果。需要两个独立任务的结果进行计算时,可以使用此方法。
异常处理机制:
- exceptionally():处理异常并提供默认值。当CompletableFuture中的任务抛出异常时,可以捕获该异常并返回一个默认值。
- handle() / handleAsync():处理正常结果和异常。无论任务是否成功完成,都可以使用此方法处理结果或异常。
- whenComplete() / whenCompleteAsync():任务完成时的回调(正常或异常)。可以在任务完成后执行一些清理工作或记录日志等。
线程池选择
默认线程池
如果未显式指定线程池,CompletableFuture 默认使用 ForkJoinPool.commonPool()。
适用场景:
- 计算密集型任务(如复杂数学计算、数据处理)。
- 短期任务且任务量较小。
特点:
- 线程池大小为 Runtime.getRunTime().availableProcessors()-1(如 4 核 CPU 默认 3 线程)。
- 所有 CompletableFuture 共享同一个公共池,可能导致资源竞争。
自定义线程池
通过显式传递 Executor,可以更精细地控制线程池行为。
适用场景:
- I/O 密集型任务(如网络请求、数据库查询)。
- 需要隔离任务类型(避免公共池资源耗尽)。
- 长期运行或阻塞任务。
// 创建自定义线程池
ExecutorService customExecutor = new ThreadPoolExecutor(10, // 核心线程数50, // 最大线程数60L, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000), // 有界队列(容量 1000)new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);// 使用自定义线程池
CompletableFuture.supplyAsync(() -> {return queryDataBase();
}, customExecutor);
线程池配置建议
参数 | 计算密集型任务 | IO密集型任务 |
---|---|---|
核心线程数 | N+1 | 2N+1 |
最大线程数 | 等于核心线程数 | 较高(如 100~200) |
队列类型 | 同步队列(SynchronousQueue) | 有界队列(如 LinkedBlockingQueue) |
拒绝策略 | 抛异常(AbortPolicy) | 级或重试(如 CallerRunsPolicy) |
队列与拒绝策略
- 有界队列:防止任务无限堆积导致内存溢出(OOM)。
- 拒绝策略:
- CallerRunsPolicy:主线程执行被拒绝的任务(避免任务丢失)。
- DiscardOldestPolicy:丢弃队列中最旧的任务(适合实时性要求高的场景)。
代码示例
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;public class ApplicationTest {public static void main(String[] args) {long start = System.currentTimeMillis();System.out.println("================ begin ===================");// 1. 创建异步任务列表List<CompletableFuture<List<String>>> futures = new ArrayList<>();// 2. 添加异步任务(并行查询不同数据库)futures.add(CompletableFuture.supplyAsync(() -> queryDatabase("DB1")));futures.add(CompletableFuture.supplyAsync(() -> queryDatabase("DB2")));futures.add(CompletableFuture.supplyAsync(() -> queryDatabase("DB3")));// 3. 等待所有任务完成,并合并结果CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 4. 处理最终结果List<String> resultList = new ArrayList<>();for (CompletableFuture<List<String>> future : futures) {try {List<String> oneResult = future.get();resultList.addAll(oneResult);} catch (Exception e) {System.out.println("批量查询异常 : " + e);}}//5. 阻塞获取最终结果(实际中应避免在主线程阻塞)System.out.println(System.currentTimeMillis() - start);System.out.println("合并后的结果数量: " + resultList.size());System.out.println(resultList);System.out.println("================ end ===================");// 或者// 等待所有任务完成,并合并结果CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));// 最终处理结果List<String> mergedResults = allFutures.thenApply(v ->futures.stream().flatMap(future -> future.join().stream()).collect(Collectors.toList())).join();System.out.println(mergedResults);}// 模拟数据库查询方法private static List<String> queryDatabase(String source) {// 模拟耗时操作try { Thread.sleep(2000); } catch (InterruptedException ignored) {}return Arrays.asList(source + "-Material1", source + "-Material2");}}
import java.util.*;
import java.util.concurrent.*;public class ApplicationTest {public static void main(String[] args) {System.out.println("==================== begin ====================");long start = System.currentTimeMillis();CompletableFuture<List<String>> future1 = CompletableFuture.supplyAsync(() -> {return queryStrDatabase("Str");});System.out.println("==================== running ====================");CompletableFuture<List<Integer>> future2 = CompletableFuture.supplyAsync(() -> {return queryIntDatabase();});List<String> strList = null;try {strList = future1.get();} catch (InterruptedException | ExecutionException e) {System.out.println("Str is fail");}List<Integer> intList = null;try {intList = future2.get();} catch (InterruptedException | ExecutionException e) {System.out.println("Int is fail");}System.out.println(strList);System.out.println(intList);System.out.println(System.currentTimeMillis() - start);System.out.println("==================== end ====================");}// 模拟数据库查询方法private static List<String> queryStrDatabase(String source) {// 模拟耗时操作try { Thread.sleep(2000); } catch (InterruptedException ignored) {}return Arrays.asList(source + "-Material1", source + "-Material2", source + "-Material3");}// 模拟数据库查询方法private static List<Integer> queryIntDatabase() {// 模拟耗时操作try { Thread.sleep(2000); } catch (InterruptedException ignored) {}return Arrays.asList(11, 21, 31);}}
同步代码
所谓的同步代码,也就是从我们接受到请求直到请求返回都是由一个线程处理的,如果处理代码中有阻塞那么这个时候此线程就会阻塞,在请求量比较大的情况下,也就是并发场景,这个时候会有很多的请求发过来,那么tomcat只有两百的线程,如果线程阻塞时间较长,那么tomcat的线程会被全部阻塞,导致无法处理外部请求,进而系统的吞吐量就会很低。
在这里我举个实际生活中的场景,如果你要下单,那么需要调用 用户服务(查询用户信息)—>商品服务(查询商品信息)—>积分服务(修改积分)—>订单服务(生成订单)—>库存服务(减库存)。要完成这一个任务时,需要先完成任务1,再完成任务2,直到完成任务n。那么所消耗的时间就是 :time > 任务1 + 任务2 + … + 任务n。
伪代码模拟:
JSONPObject createOrder(Integer userId,Integer goodsId){// 1、调用用户服务,获取用户信息User user = getUserById(userId); // 2s// 2、调用商品服务,获取商品详情Goods goods = getGoodsById(goodsId); // 2s// 3、调用积分服务,修改积分updatePoints(userId); // 2s// 4、调用订单服务,生成订单createOrderByUserAndGoods(user,goods); // 2s// 5、调用库存服务,修改库存updateInventoryByGoodsId(goodsId); //2sreturn null;
}
对于这个场景,我们可以开启了四个线程处理,在这里我将订单服务放到了用户服务和商品服务完成之后处理,这里和你的系统设计有关系,也可以和其他服务同时并发处理,那么经过这次优化后,处理时间 time > 前四个服务中最长的 + 订单服务,这样既完成了代码串行问题的优化。
伪代码模拟:
JSONPObject createOrder2(Integer userId, Integer goodsId) {// 1、调用用户服务,获取用户信息CompletableFuture<User> future1 = CompletableFuture.supplyAsync(() -> {// 2sreturn getUserById(userId);});// 2、调用商品服务,获取商品详情CompletableFuture<Goods> future2 = CompletableFuture.supplyAsync(() -> {return getGoodsById(goodsId); // 2s});// 3、调用积分服务,修改积分CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {updatePoints(userId); // 2s});// 4、调用订单服务,生成订单(在用户服务和商品服务调用结束后执行)CompletableFuture<Void> completableFuture = future1.thenCombineAsync(future2, (user, goods) -> {createOrderByUserAndGoods(user, goods); // 2sreturn null;});// 5、调用库存服务,修改库存CompletableFuture.runAsync(() -> {updateInventoryByGoodsId(goodsId); //2s});return null;
}