当前位置: 首页 > news >正文

CompletableFuture并行处理任务

CompletableFuture并行处理任务

  • CompletableFuture
    • 基本概念与特性
    • 创建CompletableFuture实例
  • 任务编排方法
  • 线程池选择
    • 默认线程池
    • 自定义线程池
    • 线程池配置建议
  • 代码示例
  • 同步代码

CompletableFuture

基本概念与特性

异步执行: CompletableFuture允许任务在后台线程中异步执行,不会阻塞主线程,从而提高了应用程序的响应性和性能。
可组合性: CompletableFuture的操作可以组合成一个或多个CompletableFuture对象,构成复杂的异步计算链。这包括结果的转换、组合以及异常处理等。
异常处理: 通过exceptionally()等方法,CompletableFuture可以捕获计算中的异常并返回默认值,或者通过handle()等方法同时处理正常结果和异常。
取消与超时: 支持取消异步任务和设置超时时间,避免任务的无限等待。
非阻塞式等待: 提供了非阻塞式的等待方法,如join()和getNow(),可以在不阻塞当前线程的情况下获取任务的结果。
并行处理: 在处理多个耗时操作时,如I/O操作、数据库访问或网络请求,CompletableFuture可以并行执行这些任务,提高系统吞吐量和响应能力。

创建CompletableFuture实例

supplyAsync():用于创建返回结果的异步任务。例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 执行异步任务并返回结果return "Hello, CompletableFuture!";
});

runAsync():用于创建不返回结果的异步任务。例如:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 执行异步任务System.out.println("Task running asynchronously");
});

任务编排方法

转换类方法:

  • thenApply() / thenApplyAsync():将上一个任务的结果转换为新的结果。thenApply()在同一个线程中执行,而 thenApplyAsync()可能在新的线程中执行。
  • thenAccept() / thenAcceptAsync():处理上一个任务的结果,但不返回新的值。thenAccept()在同一个线程中执行,而thenAcceptAsync()可能在新的线程中执行。
  • thenRun() / thenRunAsync():在上一个任务完成后执行一个操作,不使用上一个任务的结果。

组合类方法:

  • thenCompose() / thenComposeAsync():将两个CompletableFuture组合成一个。当一个任务依赖另一个任务的结果时,可以使用此方法。
  • thenCombine() / thenCombineAsync():组合两个独立任务的结果。需要两个独立任务的结果进行计算时,可以使用此方法。

多任务协调方法:

  • thenCompose() / thenComposeAsync():将两个CompletableFuture组合成一个。当一个任务依赖另一个任务的结果时,可以使用此方法。
  • thenCombine() / thenCombineAsync():组合两个独立任务的结果。需要两个独立任务的结果进行计算时,可以使用此方法。

异常处理机制:

  • exceptionally():处理异常并提供默认值。当CompletableFuture中的任务抛出异常时,可以捕获该异常并返回一个默认值。
  • handle() / handleAsync():处理正常结果和异常。无论任务是否成功完成,都可以使用此方法处理结果或异常。
  • whenComplete() / whenCompleteAsync():任务完成时的回调(正常或异常)。可以在任务完成后执行一些清理工作或记录日志等。

线程池选择

默认线程池

如果未显式指定线程池,CompletableFuture 默认使用 ForkJoinPool.commonPool()。
适用场景:

  • 计算密集型任务(如复杂数学计算、数据处理)。
  • 短期任务且任务量较小。

特点:

  • 线程池大小为 Runtime.getRunTime().availableProcessors()-1(如 4 核 CPU 默认 3 线程)。
  • 所有 CompletableFuture 共享同一个公共池,可能导致资源竞争。

自定义线程池

通过显式传递 Executor,可以更精细地控制线程池行为。
适用场景:

  • I/O 密集型任务(如网络请求、数据库查询)。
  • 需要隔离任务类型(避免公共池资源耗尽)。
  • 长期运行或阻塞任务。
// 创建自定义线程池
ExecutorService customExecutor = new ThreadPoolExecutor(10,                        // 核心线程数50,                        // 最大线程数60L,                       // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000), // 有界队列(容量 1000)new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);// 使用自定义线程池
CompletableFuture.supplyAsync(() -> {return queryDataBase();
}, customExecutor);

线程池配置建议

参数计算密集型任务IO密集型任务
核心线程数N+12N+1
最大线程数等于核心线程数较高(如 100~200)
队列类型同步队列(SynchronousQueue)有界队列(如 LinkedBlockingQueue)
拒绝策略抛异常(AbortPolicy)级或重试(如 CallerRunsPolicy)

队列与拒绝策略

  • 有界队列:防止任务无限堆积导致内存溢出(OOM)。
  • 拒绝策略:
    • CallerRunsPolicy:主线程执行被拒绝的任务(避免任务丢失)。
    • DiscardOldestPolicy:丢弃队列中最旧的任务(适合实时性要求高的场景)。

代码示例

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;public class ApplicationTest {public static void main(String[] args) {long start = System.currentTimeMillis();System.out.println("================ begin ===================");// 1. 创建异步任务列表List<CompletableFuture<List<String>>> futures = new ArrayList<>();// 2. 添加异步任务(并行查询不同数据库)futures.add(CompletableFuture.supplyAsync(() -> queryDatabase("DB1")));futures.add(CompletableFuture.supplyAsync(() -> queryDatabase("DB2")));futures.add(CompletableFuture.supplyAsync(() -> queryDatabase("DB3")));// 3. 等待所有任务完成,并合并结果CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 4. 处理最终结果List<String> resultList = new ArrayList<>();for (CompletableFuture<List<String>> future : futures) {try {List<String> oneResult = future.get();resultList.addAll(oneResult);} catch (Exception e) {System.out.println("批量查询异常 : " + e);}}//5. 阻塞获取最终结果(实际中应避免在主线程阻塞)System.out.println(System.currentTimeMillis() - start);System.out.println("合并后的结果数量: " + resultList.size());System.out.println(resultList);System.out.println("================ end ===================");// 或者// 等待所有任务完成,并合并结果CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));// 最终处理结果List<String> mergedResults = allFutures.thenApply(v ->futures.stream().flatMap(future -> future.join().stream()).collect(Collectors.toList())).join();System.out.println(mergedResults);}// 模拟数据库查询方法private static List<String> queryDatabase(String source) {// 模拟耗时操作try { Thread.sleep(2000); } catch (InterruptedException ignored) {}return Arrays.asList(source + "-Material1", source + "-Material2");}}
import java.util.*;
import java.util.concurrent.*;public class ApplicationTest {public static void main(String[] args) {System.out.println("==================== begin ====================");long start = System.currentTimeMillis();CompletableFuture<List<String>> future1 = CompletableFuture.supplyAsync(() -> {return queryStrDatabase("Str");});System.out.println("==================== running ====================");CompletableFuture<List<Integer>> future2 = CompletableFuture.supplyAsync(() -> {return queryIntDatabase();});List<String> strList = null;try {strList = future1.get();} catch (InterruptedException | ExecutionException e) {System.out.println("Str is fail");}List<Integer> intList = null;try {intList = future2.get();} catch (InterruptedException | ExecutionException e) {System.out.println("Int is fail");}System.out.println(strList);System.out.println(intList);System.out.println(System.currentTimeMillis() - start);System.out.println("==================== end ====================");}// 模拟数据库查询方法private static List<String> queryStrDatabase(String source) {// 模拟耗时操作try { Thread.sleep(2000); } catch (InterruptedException ignored) {}return Arrays.asList(source + "-Material1", source + "-Material2", source + "-Material3");}// 模拟数据库查询方法private static List<Integer> queryIntDatabase() {// 模拟耗时操作try { Thread.sleep(2000); } catch (InterruptedException ignored) {}return Arrays.asList(11, 21, 31);}}

同步代码

所谓的同步代码,也就是从我们接受到请求直到请求返回都是由一个线程处理的,如果处理代码中有阻塞那么这个时候此线程就会阻塞,在请求量比较大的情况下,也就是并发场景,这个时候会有很多的请求发过来,那么tomcat只有两百的线程,如果线程阻塞时间较长,那么tomcat的线程会被全部阻塞,导致无法处理外部请求,进而系统的吞吐量就会很低。
在这里我举个实际生活中的场景,如果你要下单,那么需要调用 用户服务(查询用户信息)—>商品服务(查询商品信息)—>积分服务(修改积分)—>订单服务(生成订单)—>库存服务(减库存)。要完成这一个任务时,需要先完成任务1,再完成任务2,直到完成任务n。那么所消耗的时间就是 :time > 任务1 + 任务2 + … + 任务n。

伪代码模拟:

JSONPObject createOrder(Integer userId,Integer goodsId){// 1、调用用户服务,获取用户信息User user = getUserById(userId); // 2s// 2、调用商品服务,获取商品详情Goods goods = getGoodsById(goodsId); // 2s// 3、调用积分服务,修改积分updatePoints(userId);  // 2s// 4、调用订单服务,生成订单createOrderByUserAndGoods(user,goods); // 2s// 5、调用库存服务,修改库存updateInventoryByGoodsId(goodsId);  //2sreturn null;
}

对于这个场景,我们可以开启了四个线程处理,在这里我将订单服务放到了用户服务和商品服务完成之后处理,这里和你的系统设计有关系,也可以和其他服务同时并发处理,那么经过这次优化后,处理时间 time > 前四个服务中最长的 + 订单服务,这样既完成了代码串行问题的优化。
伪代码模拟:

JSONPObject createOrder2(Integer userId, Integer goodsId) {// 1、调用用户服务,获取用户信息CompletableFuture<User> future1 = CompletableFuture.supplyAsync(() -> {// 2sreturn getUserById(userId);});// 2、调用商品服务,获取商品详情CompletableFuture<Goods> future2 = CompletableFuture.supplyAsync(() -> {return getGoodsById(goodsId); // 2s});// 3、调用积分服务,修改积分CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {updatePoints(userId);  // 2s});// 4、调用订单服务,生成订单(在用户服务和商品服务调用结束后执行)CompletableFuture<Void> completableFuture = future1.thenCombineAsync(future2, (user, goods) -> {createOrderByUserAndGoods(user, goods); // 2sreturn null;});// 5、调用库存服务,修改库存CompletableFuture.runAsync(() -> {updateInventoryByGoodsId(goodsId);  //2s});return null;
}

相关文章:

  • 《MySQL:MySQL表的基本查询操作CRUD》
  • ros2 humble moveit调试笔记
  • docker基本命令1
  • Day-1 漏洞攻击实战
  • QT:Qt5 串口模块 (QSerialPort) 在 VS2015 中正确关闭串口避免被占用
  • 推荐系统/业务,相关知识/概念1
  • Sentinel源码—7.参数限流和注解的实现一
  • 如何在白平衡标定种构建不同类型的白平衡色温坐标系
  • 基于语义网络表示的不确定性推理
  • 从 0 到 1 转型 AI:突破技术壁垒的 5 大核心策略与实战路径
  • RK3588上编译opencv 及基于c++实现图像的读入
  • Java写数据结构:栈
  • Nebula图数据库
  • 富诺健康旗下运动营养品牌力爆(LIPOW):以冠军精神定义运动营养新时代
  • 论文分享:【2024 CVPR】Vision-and-Language Navigation via Causal Learning
  • NLTK 基础入门:用 Python 解锁自然语言处理
  • Redis 的单线程模型对微服务意味着什么?需要注意哪些潜在瓶颈?
  • Ansys-FLUENT-笔记1
  • yum如果备份已经安装的软件?
  • OpenCV day7
  • 世界史圆桌|16-18世纪的跨太平洋贸易
  • 30小时已过,俄罗斯复活节停火不再延长
  • 马拉松夺冠机器人将小批量量产:价格与一台入门级小轿车差不多
  • 梅德韦杰夫:如果欧盟和美国 “撒手不管”,俄罗斯会更快解决俄乌冲突
  • 如何应对国际贸易形势变化?长三角四省市主要领导密集部署
  • 景临已任四川省工商联党组书记