当前位置: 首页 > news >正文

java线程池原理及使用和处理流程

实际测试使用如下:

package com.study;import java.util.concurrent.*;/*** 线程池作用:* 1、线程的复用* 2、资源管理* 3、任务调度* --------------执行过程--------------* 第1-3个任务进来时,直接创建任务并执行* 第4-8个任务进来时,会把新任务放到队列,然后按照顺序执行队列中的任务,新的任务在队列最后* 第9-15个任务进来时,会先执行队列中已有的任务,再执行新的任务* 第16个任务进来时,会执行拒绝策略** @author admin* @since 2025-04-18 15:48*/
public class ThreadPoolDemo {// 核心线程数private static final int CORE_POOL_SIZE = 3;// 最大线程数private static final int MAX_POOL_SIZE = 10;// 空闲线程存活时间private static final long KEEP_ALIVE_TIME = 5;// 时间单位private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;// 队列容量private static final int QUEUE_SIZE = 10;/*** 任务队列* 1、ArrayBlockingQueue 有界队列,通用队列,线程池的默认队列* 2、LinkedBlockingQueue 无界队列,默认大小为Integer.MAX_VALUE* 3、SynchronousQueue 无容量队列,不存储任务,直接提交给线程池处理* 4、PriorityBlockingQueue 优先级队列,线程池的默认队列* 5、DelayQueue 延迟队列*/static ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);/*** 拒绝策略* 1、abortPolicy 直接抛出异常(默认)* 2、discardPolicy 直接丢弃任务* 3、discardOldestPolicy 踢出队列中最老的任务,再次提交当前任务* 4、callerRunsPolicy 由提交任务的线程来执行任务*/static final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();/*** 线程工厂*/static final ThreadFactory threadFactory = new CustomThreadFactory(Thread.NORM_PRIORITY, false);/*** 传统线程池创建* 场景:不同业务线需隔离资源(如支付交易与普通查询互不影响)*/private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TIME_UNIT, queue, threadFactory, handler);/*** 1、固定-线程池创建* 默认核心线程数和最大线程数相同* 默认空闲线程存活时间为0s* 默认LinkedBlockingQueue无界队列* 默认拒绝策略和默认线程工厂* 场景:短时涌入大量HTTP请求(如电商秒杀、票务系统),需快速响应且避免服务器崩溃*/private static final ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);/*** 2、缓存-线程池创建* 默认核心线程数0,所有线程均为非核心线程* 最大线程数为Integer最大值* 默认空闲线程存活时间60s* 使用SynchronousQueue无容量队列,不存储任务,直接提交给线程池处理* 默认拒绝策略和默认线程工厂* 场景:主线程需快速返回,耗时操作异步执行(如发送邮件、生成报表)*/private static final ExecutorService cachedExecutor = Executors.newCachedThreadPool();/*** 3、任务-线程池创建* 最大线程数为Integer最大值* 默认空闲线程存活时间0s* 默认DelayedWorkQueue高性能队列* 默认拒绝策略和默认线程工厂* 场景:定时执行任务(如每日数据备份、定期推送消息)*/private static final ExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);/*** 4、单线程-线程池创建* 核心线程数和最大线程数都为1* 默认空闲线程存活时间0s* 默认LinkedBlockingQueue无界队列* 默认拒绝策略和默认线程工厂* 场景:跨服务任务调度(如分布式锁续期、集群状态同步)*/private static final ExecutorService singleExecutor = Executors.newSingleThreadExecutor();/*** 5、并行-线程池创建* 并行级别(默认 CPU 核心数)* 场景:多核CPU任务并行执行(如并行计算、并行处理),海量数据分片并行处理(如日志分析、图像渲染)、递归任务分解*/private static final ExecutorService workExecutor = Executors.newWorkStealingPool();/*** 测试*/public static void main(String[] args) throws InterruptedException {String type = "fixed";String returnType = "test";ExecutorService executorService = getExecutorService(type);for (int i = 0; i < 10; i++) {executeTask(executorService, i, returnType);}// 等待任务执行完成关闭线程池executorService.shutdown();// 添加等待终止逻辑(确保任务完成)if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {System.err.println("线程池未在限定时间内关闭");}}/*** 线程池执行任务*/private static void executeTask(ExecutorService executorService, int taskId, String returnType) {// 根据返回值判断是否执行成功if (("wait").equals(returnType)) {Future<String> future = getFuture(executorService, taskId);// 方法1:最大等待30sgetFutureResultWait(future, taskId);} else if (("while").equals(returnType)) {FutureTask<String> future = getFutureTask(executorService, taskId);// 方法2:轮询判断是否执行完成getFutureResultWhile(future, taskId);} else {// 方法3:异步获取返回值CompletableFuture<Integer> futureWithExecutor = CompletableFuture.supplyAsync(() -> {System.out.println(taskId + "\t" + Thread.currentThread().getName());return taskId;}, executorService);// 添加上述异步回调处理futureWithExecutor.whenComplete((result, ex) -> {if (ex != null) {System.err.println("Task " + taskId + " failed: " + ex.getMessage());} else {System.out.println("Task " + taskId + " completed: " + result);}});}}/*** 提交异步任务并返回Future对象** @param executorService 线程池执行器,用于提交异步任务* @param taskId          任务标识符,用于日志跟踪* @return Future<String> 表示异步计算的结果对象*/private static Future<String> getFuture(ExecutorService executorService, int taskId) {return executorService.submit(() -> {// 任务执行逻辑:打印线程信息并模拟耗时操作System.out.println(taskId + "\t" + Thread.currentThread().getName());try {Thread.sleep(1000);return "OK";} catch (InterruptedException e) {// 正确的中断处理:恢复中断状态并记录日志Thread.currentThread().interrupt();System.out.println("Task interrupted: " + taskId);return "ERROR";}});}/*** 提交异步任务并返回FutureTask对象** @param executorService 线程池执行器,用于提交异步任务* @param taskId          任务标识符,用于日志跟踪* @return FutureTask<String> 表示异步计算的结果对象*/private static FutureTask<String> getFutureTask(ExecutorService executorService, int taskId) {// 创建Callable任务Callable<String> task = () -> {// 任务执行逻辑:打印线程信息并模拟耗时操作System.out.println(taskId + "\t" + Thread.currentThread().getName());try {Thread.sleep(1000);return "OK";} catch (InterruptedException e) {// 正确的中断处理:恢复中断状态并记录日志Thread.currentThread().interrupt();System.out.println("Task interrupted: " + taskId);return "ERROR";}};/** 创建FutureTask实例以包装异步任务* @param task 需要执行的Callable/Runnable任务对象* FutureTask兼具Runnable和Future的特性:* 1. 可作为Runnable被线程池执行* 2. 通过Future接口方法获取计算结果*/FutureTask<String> futureTask = new FutureTask<>(task);/** 将FutureTask提交到线程池执行* @param futureTask 包装了任务的可执行对象* 提交后会立即返回,实际执行由线程池调度* 后续可通过futureTask.get()阻塞获取计算结果* 或通过futureTask.cancel()取消任务*/executorService.submit(futureTask);return futureTask;}/*** 阻塞等待Future结果并处理超时** @param future 异步任务Future对象* @param taskId 任务标识符,用于异常日志*/private static void getFutureResultWait(Future<String> future, int taskId) {try {// 设置最长等待时间为10秒String result = future.get(5, TimeUnit.SECONDS);System.out.println("Task result: " + result);} catch (InterruptedException | ExecutionException e) {// 中断处理:恢复中断状态并记录日志Thread.currentThread().interrupt();System.out.println("Task interrupted: " + taskId);} catch (TimeoutException e) {// 超时处理:主动取消任务并记录日志future.cancel(true);System.out.println("Task timeout: " + taskId);}}/*** 轮询检查任务完成状态** @param future 异步任务Future对象* @param taskId 任务标识符,用于中断日志*/private static void getFutureResultWhile(Future<String> future, int taskId) {String result = "ERROR";// 轮询机制检查任务状态while (!future.isDone()) {System.out.println("任务仍在执行中...");try {// 降低轮询频率以避免CPU过载Thread.sleep(500);result = "OK";} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Task interrupted: " + taskId);// 立即退出循环break;}}System.out.println("Task result: " + result);}/*** 获取不同类型的线程池*/private static ExecutorService getExecutorService(String type) {switch (type) {case "fixed":return fixedExecutor;case "cached":return cachedExecutor;case "scheduled":return scheduledExecutor;case "single":return singleExecutor;case "work":return workExecutor;default:return poolExecutor;}}
}

自定义线程工厂

package com.study;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程工厂** @author admin* @since 2025-04-18 17:29*/
public class CustomThreadFactory implements ThreadFactory {// 编号从1开始private final AtomicInteger number = new AtomicInteger(1);private static final String NAME_PREFIX = "pool-thread-";// 线程优先级private final int priority;// 是否守护线程private final boolean daemon;/*** 默认构造方法,创建普通优先级、非守护线程*/public CustomThreadFactory() {this(Thread.NORM_PRIORITY, false);}/*** 构造方法,自定义优先级和是否守护线程** @param priority 优先级* @param daemon   是否守护线程*/public CustomThreadFactory(int priority, boolean daemon) {if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {throw new IllegalArgumentException("优先级超出范围: " + priority);}this.priority = priority;this.daemon = daemon;}/*** 创建线程** @param r 线程任务* @return 线程对象*/@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName(NAME_PREFIX + number.getAndIncrement());thread.setPriority(priority);thread.setDaemon(daemon);thread.setUncaughtExceptionHandler((t, e) -> {System.err.println("线程 " + t.getName() + " 发生异常: " + e);e.printStackTrace();});return thread;}
}

相关文章:

  • 【AI部署】腾讯云GPU-常见故障—SadTalker的AI数字人视频—未来之窗超算中心 tb-lightly
  • ESP8266简单介绍
  • cpolar 内网穿透 实现公网可以访问本机
  • 智能体时代的产业范式确立,中国企业以探索者姿态走出自己的路
  • 【卡洛诗】成为平价市场中的标杆西餐厅
  • 守护进程编程、GDB调试以及外网连接树莓派
  • 关于@Scheduled注解的参数
  • NOIP2015提高组.信息传递
  • 工厂方法模式详解及在自动驾驶场景代码示例(c++代码实现)
  • AI象棋 3.0 |AI自动下象棋工具,破译残局,自动帮助下棋,内置视频教程
  • `peft`(Parameter-Efficient Fine-Tuning:高效微调)是什么
  • 跟我学C++中级篇——内存异常的分析
  • 自注意力机制self-attention
  • C++智能指针的知识!
  • 【HDFS入门】联邦机制(Federation)与扩展性:HDFS NameNode水平扩展深度解析
  • Ubuntu命令速查
  • Linux 文件查找终极指南:find, locate, grep 等命令详解
  • 原型模式详解及在自动驾驶场景代码示例(c++代码实现)
  • Flash存储器(一):接口标准全解析
  • z-library电子图书馆最新地址的查询方法
  • 西安旅游:2024年营业收入约5.82亿元,同比增长5.88%
  • 新闻1+1丨全球首场人机共跑马拉松,有何看点?
  • 季度市场叙事|时间已不在美国那边
  • 纪念沈渭滨︱初五沈大大  浓浓师生情
  • 上海浦东打造全新开放平台,年内实现基本功能落地运行
  • 碳访|储能行业将迎市场化考验,宁德时代:我们希望“卷价值”