概述:在java中,线程池ThreadPoolExecutor是一个绕不过去的类,它是享元模式思想的体现,通过在容器中创建一定数量的线程加以重复利用,从而避免频繁创建线程带来额外的开销。一个设置合理的线程可以提高任务响应速度,并且避免线程数超过硬件能力带来的意外情况。


一、初识线程池
https://mp.weixin.qq.com/s/HZcry6_5IHbD_udd6CzNig
1、构造方法
java线程池提供了四个关于ThreadPoolExecutor类的构造方法,下面的例子以最全的构造方法进行描述各个参数的作用:
Public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //非核心线程闲置存活时间 TimeUint uint,//时间单位 BlockingQueue<Runable> workQueue, //工作队列 ThreadFactory threadFactory, //创建线程使用的线程工厂 RejectedExecutionHandler handler //拒绝策略){}
- 核心线程数:即长期存在的线程数,但线程池中运行线程未达到核心线程数时会优先创建新线程;
- 最大线程数:当核心线程已满,工作队列已满,同时线程池中线程总数未超过最大线程数,会创建非核心线程数;
- 非核心线程闲置存活时间:非核心线程闲置时的最大存活时间;
- 时间单位:非核心线程闲置存活时间的时间单位;
- 任务队列:当核心线程满后,任务会优先加入工作队列,等待核心线程消费;
- 线程工厂:线程池创建新线程时使用的工厂;
- 拒绝策略:当工作队列和线程池都满时,用以执行的策略;
2、线程池状态
线程池拥有一个AtomicInteger类型成员变量ctl,通过位运算分别使用ctl的高低位以便在一个值中存储线程数量以及线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0)); //29(32-3) private static final int COUNT_BITS = Integer.SIZE - 3; //允许的最大工作线程(2^29-1 约5亿) private static final int CAPACITY = (1 << COUNT_BITS)-1; // 运行状态。线程池接受并处理新任务 private static final int RUNNING = -1 << COUNT_BITS; // 关闭状态。线程池不能接受新任务,处理完剩余任务后关闭。调用shutdown()方法会进入该状态。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 停止状态。线程池不能接受新任务,并且尝试中断旧任务。调用shutdownNow()方法会进入该状态。 private static final int STOP = 1 << COUNT_BITS; // 整理状态。由关闭状态转变,线程池任务队列为空时进入该状态,会调用terminated()方法。 private static final int TIDYING = 2 << COUNT_BITS; // 终止状态。terminated()方法执行完毕后进入该状态,线程池彻底停止。 private static final int TERMINATED = 3 << COUNT_BITS;
3、总结
- 线程池提供了四个构造方法,参数最全的构造方法参数按顺序有:核心线程数,最大线程数,非核心线程闲置存活时间,存活时间单位,任务队列,线程工厂,拒绝策略。
- 线程池共有五种状态,分别是:RUNNING,SHUTDOWN,STOP,TYDYING,TERMINATED,它们与工作线程数量一同记录在成员变量 ctl 中,其中高 3 位用于记录状态,低 29 位用于记录工作线程数,实际使用中通过位运算去获取。
- 线程池中任务线程以继承了 AQS 的 Worker 类的实例形式存在。当添加任务时,会有四种情况:核心线程不满,优先创建核心线程;核心线程满,优先添加任务队列;核心线程与队列都满,创建非核心线程;线程和队列都满,则执行拒绝策略。
- 其中,拒绝策略分为四类,默认的拒绝策略 AbortPolicy;调用者运行策略 CallerRunsPolicy;弃老策略 DiscardOldestPolicy;丢弃策略 DiscardPolicy。
- 线程池的中断有两个方法:
shutdown()与shutdownNow(),两者都会让线程池不再接受新任务,但是shutdown()会等待当前与任务队列中的任务执行完毕,而shutdownNow()会直接中断当前任务,忽略并删除任务队列中的任务。- 线程池提供了
beforeExecute(),afterExecute(),terminated()三个钩子函数,其中,afterExecute()的入参含有抛出的异常,因此可以借由该方法处理线程池中线程抛出的异常。
二、原理分析
https://mp.weixin.qq.com/s/pwxDLs8TXMEMSVYWq73Ttw
https://www.cnblogs.com/throwable/p/13574306.html
https://xie.infoq.cn/article/d0120c6e1518cec1da65cd31f
三、Executor使用
package com.ljh.listen.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* uncaughtExceptionHandler 处理非正常的线程中止
*/
public class MyExecutors {
/**
* 线程捕捉异常
*/
public static void thread(Runnable r){
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.printf("线程名:%s,异常:%s",t,e);
}
});
thread.start();
}
/**
* 线程池捕捉异常
* 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
* (t,e)->表示重写了:new Thread.UncaughtExceptionHandler() {}的方法uncaughtException
*/
public static ExecutorService fixedThreadPool(){
return Executors.newFixedThreadPool(1,r->{
Thread th = new Thread(r);
th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
return th;
});
}
/**
* 线程池捕捉异常
* l创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
*/
public static ExecutorService cachedThreadPool(){
return Executors.newCachedThreadPool(r->{
Thread th = new Thread(r);
th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
return th;
});
}
/**
* 线程池捕捉异常
* 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
*/
public static ExecutorService singleThreadExecutor(){
return Executors.newSingleThreadExecutor(r->{
Thread th = new Thread(r);
th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
return th;
});
}
/**
* 线程池捕捉异常
* 创建一个定长线程池,支持定时及周期性任务执行
*/
public static ScheduledExecutorService scheduledThreadPool(){
return Executors.newScheduledThreadPool(1,r->{
Thread th = new Thread(r);
th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
return th;
});
}
}
四、创建一个线程池
自定义线程工厂
import java.util.concurrent.ThreadFactory;
/**
* 自定义线程工厂
* uncaughtExceptionHandler 捕获运行中出现的异常
*/
public class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler((t, e) -> {
throw new RuntimeException(String.format("线程:%s 发生异常",t.getName()),e);
});
return thread;
}
}
实现RejectedExecutionHandler 接口重写线程池饱和的4种拒绝策略
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 自定义线程池饱和拒绝策略(4种)
*/
public class ThreadRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
/**
* 1、调用者线程执行策略,即调用者执行被拒绝的run方法
*/
public static class CallerRunsPolocy extends ThreadRejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//如果线程池关闭,则直接丢弃任务
if(!executor.isShutdown())r.run();
}
}
/**
* 2、终止策略,直接拒绝,抛出拒绝执行异常
*/
public static class AbortPolocy extends ThreadRejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RejectedExecutionException("请求任务:"+r.toString()+" ,线程池负载过高执行饱和终止策略!");
}
}
/**
* 3、丢弃策略,直接拒绝,什么都不做
*/
public static class DiscardPolicy extends ThreadRejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
/**
* 4、弃老策略,丢弃最早放入阻塞队列中的线程,并尝试将决绝任务加入阻塞队列
*/
public static class DiscardOldestPolicy extends ThreadRejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//如果线程池关闭,则直接丢弃任务
if(!executor.isShutdown()){
executor.getQueue().poll();
executor.execute(r);
}
}
}
}
一个单列的线程池工具类
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
/**
* @Desc 单例的线程池工具类
*/
public class ThreadPoolUtils {
/**
* 系统可用计算资源
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/**
* 核心线程数
*/
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
/**
* 最大线程数
*/
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
/**
* 空闲线程存活时间
*/
private static final int KEEP_ALIVE_SECONDS = 30;
/**
* 工作队列
*/
private static final BlockingQueue<Runnable> POOL_WORK_QUEUE = new LinkedBlockingQueue<>(128);
/**
* 工厂模式
*/
private static final MyThreadFactory MY_THREAD_FACTORY = new MyThreadFactory();
/**
* 饱和策略
*/
private static final ThreadRejectedExecutionHandler THREAD_REJECTED_EXECUTION_HANDLER = new ThreadRejectedExecutionHandler.CallerRunsPolocy();
/**
* 线程池对象
*/
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;
/**
* 声明式定义线程池工具类对象静态变量,在所有线程中同步
*/
private static volatile ThreadPoolUtils threadPoolUtils = null;
/**
* 初始化线程池静态代码块
*/
static {
THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
//核心线程数
CORE_POOL_SIZE,
//最大线程数
MAXIMUM_POOL_SIZE,
//空闲线程执行时间
KEEP_ALIVE_SECONDS,
//空闲线程执行时间单位
TimeUnit.SECONDS,
//工作队列(或阻塞队列)
POOL_WORK_QUEUE,
//工厂模式
MY_THREAD_FACTORY,
//饱和策略
THREAD_REJECTED_EXECUTION_HANDLER
);
// Arrays.stream(new int[]{THREAD_POOL_EXECUTOR.getCorePoolSize(),THREAD_POOL_EXECUTOR.getMaximumPoolSize(),CPU_COUNT}).forEach(System.err::println);
}
/**
* 线程池工具类空参构造方法
*/
private ThreadPoolUtils() {}
/**
* 获取线程池工具类实例
* @return
*/
public static ThreadPoolUtils getNewInstance(){
if (threadPoolUtils == null) {
synchronized (ThreadPoolUtils.class) {
if (threadPoolUtils == null) {
threadPoolUtils = new ThreadPoolUtils();
}
}
}
return threadPoolUtils;
}
/**
* 执行线程任务
* @param runnable 任务线程
*/
public void executor(Runnable runnable) {
THREAD_POOL_EXECUTOR.execute(runnable);
}
/**
* 获取线程池状态
* @return 返回线程池状态
*/
public boolean isShutDown(){
return THREAD_POOL_EXECUTOR.isShutdown();
}
/**
* 停止正在执行的线程任务
* @return 返回等待执行的任务列表
*/
public List<Runnable> shutDownNow(){
return THREAD_POOL_EXECUTOR.shutdownNow();
}
/**
* 关闭线程池
*/
public void showDown(){
THREAD_POOL_EXECUTOR.shutdown();
}
/**
* 关闭线程池后判断所有任务是否都已完成
* @return
*/
public boolean isTerminated(){
return THREAD_POOL_EXECUTOR.isTerminated();
}
}