Java并发:线程池
目录
一、核心概念与设计原理
1、线程池的核心价值
2、核心接口和类
3、线程池的核心构造参数
4、线程池工作流程
二、参数选择
1、任务队列选择
2、拒绝策略选择
3、常见线程池选择
4、参数调优
三、 应用
1、创建建议
2、生命周期管理:优雅关闭
3、监控
4、常见问题
5、使用案例
5.1 Web服务器请求处理(高并发场景)
5.2 批量数据并行处理(分片任务)
5.3 异步任务执行(带回调)
5.4 定时任务调度
5.5 资源隔离(多级线程池)
5.6 线程池监控(自定义扩展)
一、核心概念与设计原理
1、线程池的核心价值
资源服用:避免频繁创建/销毁线程的开销,通过池化技术管理线程生命周期
流量削峰:任务队列(如LinkedBlockingQueue
)缓冲突发请求,平滑处理高并发场景
精细控制:通过核心线程数、最大线程数、队列容量等参数实现资源精细化管理
2、核心接口和类
-
Executor:执行任务的根接口(
void execute(Runnable command)
) -
ExecutorService:扩展Executor接口,增加生命周期管理
-
ThreadPoolExecutor:标准线程池实现类
-
ScheduledExecutorService:支持定时/周期性任务
3、线程池的核心构造参数
参数 | 说明 | 推荐设置原则 |
---|---|---|
corePoolSize | 核心线程数(常驻线程) | CPU密集型:N+1;IO密集型:2N+1 |
maximumPoolSize | 最大线程数 | 根据业务峰值设置(建议不超过100) |
keepAliveTime | 非核心线程空闲存活时间 | 30-60秒 |
unit | 存活时间单位 | TimeUnit.SECONDS |
workQueue | 任务队列(缓冲机制) | 根据场景选择队列类型 |
threadFactory | 线程工厂(自定义线程创建) | 建议自定义命名线程 |
handler | 拒绝策略(饱和处理机制) | 根据业务容忍度选择 |
4、线程池工作流程
二、参数选择
1、任务队列选择
队列类型 | 特性 | 适用场景 |
---|---|---|
SynchronousQueue | 不存储元素,直接传递任务 | CachedThreadPool(零容量缓冲) |
LinkedBlockingQueue | 无界队列(默认容量Integer.MAX_VALUE) | FixedThreadPool(可能OOM) |
ArrayBlockingQueue | 有界队列(固定容量) | 需要控制队列大小的场景 |
PriorityBlockingQueue | 优先级队列(任务需实现Comparable) | 任务优先级调度 |
DelayedWorkQueue | 延迟队列(用于ScheduledThreadPool) | 定时任务调度 |
队列选择:根据业务特点选择队列类型
-
快速响应:SynchronousQueue(配合合理的最大线程数)
-
突发流量:LinkedBlockingQueue(需设置合理容量)
-
优先级处理:PriorityBlockingQueue
-
防止内存溢出:
ArrayBlockingQueue(
有界队列,需合理设置容量)
2、拒绝策略选择
策略类 | 行为描述 | 适用场景 |
---|---|---|
AbortPolicy | 默认策略,抛出RejectedExecutionException | 严格要求任务不丢失的场景 |
CallerRunsPolicy | 由调用者线程执行被拒绝的任务 | 允许任务降级的场景 |
DiscardPolicy | 静默丢弃被拒绝的任务 | 可容忍任务丢失的非关键业务 |
DiscardOldestPolicy | 丢弃队列最旧的任务,尝试重新提交当前任务 | 需要处理最新任务的场景 |
拒绝策略选择:
-
关键系统:自定义策略(记录日志+持久化任务)
-
可降级服务(关键任务):CallerRunsPolicy
-
非核心业务(允许部分丢失):DiscardOldestPolicy
3、常见线程池选择
类型 | 实现方式 | 特点 | 潜在风险 |
---|---|---|---|
FixedThreadPool | new ThreadPoolExecutor(n, n, 0L, LinkedBlockingQueue) | 固定线程数,无界队列,适合CPU密集型任务(如科学计算) | 队列堆积导致OOM |
CachedThreadPool | new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, SynchronousQueue) | 自动扩容,适合短生命周期IO密集型任务(如HTTP请求) | 线程数爆炸导致资源耗尽 |
SingleThreadExecutor | new ThreadPoolExecutor(1, 1, 0L, LinkedBlockingQueue) | 单线程顺序执行,适合需严格顺序的任务(如日志处理) | 无界队列OOM风险 |
ScheduledThreadPool | new ScheduledThreadPoolExecutor(corePoolSize) | 支持定时/周期性任务(如数据同步),使用DelayedWorkQueue | 复杂调度逻辑可能影响性能 |
4、参数调优
核心线程数选择:
-
CPU密集型:coreSize = CPU核心数 + 1
-
IO密集型:coreSize = CPU核心数 * 2
-
混合型:coreSize = (线程等待时间/线程CPU时间 + 1) * CPU核心数 或 拆分为不同线程池处理(如CPU计算与IO操作分离)
合理设置超时时间:防止死锁/长时间阻塞
预热机制:启动时调用prestartAllCoreThreads()
提前创建核心线程
动态调整:运行时通过setCorePoolSize()
和setMaximumPoolSize()
应对负载波动
三、 应用
1、创建建议
// 推荐手动创建线程池(避免使用Executors)
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize20, // maximumPoolSize60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(200), // 有界队列new NamedThreadFactory("app-thread"), // 自定义线程命名new CustomRejectionPolicy() // 自定义拒绝策略
);
线程命名:方便问题排查
public class NamedThreadFactory implements ThreadFactory {private final String namePrefix;private final AtomicInteger counter = new AtomicInteger(1);public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix + "-";}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, namePrefix + counter.getAndIncrement());}
}
2、生命周期管理:优雅关闭
方法 | 说明 |
---|---|
shutdown() | 平缓关闭:停止接受新任务,等待已提交任务完成 |
shutdownNow() | 立即关闭:尝试停止所有正在执行的任务,返回等待任务列表 |
awaitTermination() | 阻塞直到所有任务完成或超时 |
// 优雅关闭示例
void shutdownGracefully(ExecutorService executor, long timeout) {executor.shutdown();try {if (!executor.awaitTermination(timeout, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}
}
3、监控
监控指标:
方法 | 说明 |
---|---|
getActiveCount() | 获取活动线程数 |
getCompletedTaskCount() | 已完成任务数 |
getQueue().size() | 队列中等待任务数 |
getPoolSize() | 当前线程池大小 |
扩展监控:
public class MonitorThreadPool extends ThreadPoolExecutor {@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);// 记录任务开始时间}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);// 计算任务耗时,记录异常}
}
4、常见问题
1> 线程泄漏
-
症状:线程数持续增长不释放
-
检查点:未正确关闭线程池/任务中存在死循环
-
解决:使用
shutdown()
/shutdownNow()
关闭
2> 任务堆积
-
症状:队列大小持续增长
-
解决方案:优化任务处理速度/扩容线程池/增加队列容量
3> CPU利用率低
-
可能原因:IO阻塞严重/线程数设置不足/任务划分不合理
5、使用案例
5.1 Web服务器请求处理(高并发场景)
// 创建HTTP请求处理线程池
ThreadPoolExecutor serverExecutor = new ThreadPoolExecutor(8, // 核心线程数(根据CPU核心数调整)32, // 最大线程数(根据压测结果调整)60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000), // 有界队列防止OOMnew NamedThreadFactory("http-handler"),new ThreadPoolExecutor.AbortPolicy() // 拒绝时抛出异常
);// 模拟请求处理
void handleHttpRequest(Request request) {serverExecutor.execute(() -> {try {// 处理请求业务逻辑processRequest(request);} catch (Exception e) {log.error("Request processing failed", e);} finally {// 资源清理cleanUpResources();}});
}
5.2 批量数据并行处理(分片任务)
// 创建数据处理线程池
ExecutorService batchExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2
);// 分片处理10000条数据
List<Data> dataList = fetchDataFromDB(10000);
int batchSize = 1000;List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < dataList.size(); i += batchSize) {int end = Math.min(i + batchSize, dataList.size());List<Data> batch = dataList.subList(i, end);futures.add(batchExecutor.submit(() -> {int count = 0;for (Data data : batch) {if (processData(data)) count++;}return count;}));
}// 统计处理结果
int totalSuccess = 0;
for (Future<Integer> future : futures) {totalSuccess += future.get();
}batchExecutor.shutdown();
5.3 异步任务执行(带回调)
// 创建异步任务线程池
ThreadPoolExecutor asyncExecutor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS,new SynchronousQueue<>(),new NamedThreadFactory("async-task")
);// 提交任务并处理结果
CompletableFuture.supplyAsync(() -> {// 执行耗时操作return generateReport();
}, asyncExecutor).whenComplete((result, ex) -> {if (ex != null) {handleError(ex);} else {sendReport(result);}
});
5.4 定时任务调度
// 创建定时任务线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 每天凌晨执行数据清理
scheduler.scheduleAtFixedRate(() -> {cleanUpExpiredData();
}, initialDelay, 24 * 60 * 60, TimeUnit.SECONDS);// 每5分钟执行状态检查
scheduler.scheduleWithFixedDelay(() -> {checkSystemHealth();
}, 0, 5, TimeUnit.MINUTES);
5.5 资源隔离(多级线程池)
// 关键业务独立线程池
ThreadPoolExecutor orderExecutor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),new NamedThreadFactory("order-service")
);// 普通业务线程池
ThreadPoolExecutor commonExecutor = new ThreadPoolExecutor(8, 32, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),new NamedThreadFactory("common-service")
);// 订单处理(高优先级)
void processOrder(Order order) {orderExecutor.execute(() -> {validateOrder(order);deductInventory(order);createPayment(order);});
}// 日志记录(低优先级)
void logActivity(Activity activity) {commonExecutor.execute(() -> {saveToLogDB(activity);});
}
5.6 线程池监控(自定义扩展)
public class MonitoredThreadPool extends ThreadPoolExecutor {private final Counter taskCounter = new Counter();public MonitoredThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);taskCounter.increment();MetricRegistry.recordThreadActive(getActiveCount());}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);taskCounter.decrement();if (t != null) {MetricRegistry.recordError();}}public int getCurrentTaskCount() {return taskCounter.get();}
}