线程池


概述:在java中,线程池ThreadPoolExecutor是一个绕不过去的类,它是享元模式思想的体现,通过在容器中创建一定数量的线程加以重复利用,从而避免频繁创建线程带来额外的开销。一个设置合理的线程可以提高任务响应速度,并且避免线程数超过硬件能力带来的意外情况。

image-20210913210535259

image-20210914204758847

一、初识线程池

原文转载

https://mp.weixin.qq.com/s/HZcry6_5IHbD_udd6CzNig

1、构造方法

java线程池提供了四个关于ThreadPoolExecutor类的构造方法,下面的例子以最全的构造方法进行描述各个参数的作用:

Public ThreadPoolExecutor(int corePoolSize, //核心线程数
						int maximumPoolSize, //最大线程数
						long keepAliveTime, //非核心线程闲置存活时间
						TimeUint uint,//时间单位
						BlockingQueue<Runable> workQueue, //工作队列
						ThreadFactory threadFactory, //创建线程使用的线程工厂
						RejectedExecutionHandler handler //拒绝策略){}
  • 核心线程数:即长期存在的线程数,但线程池中运行线程未达到核心线程数时会优先创建新线程;
  • 最大线程数:当核心线程已满,工作队列已满,同时线程池中线程总数未超过最大线程数,会创建非核心线程数;
  • 非核心线程闲置存活时间:非核心线程闲置时的最大存活时间;
  • 时间单位:非核心线程闲置存活时间的时间单位;
  • 任务队列:当核心线程满后,任务会优先加入工作队列,等待核心线程消费;
  • 线程工厂:线程池创建新线程时使用的工厂;
  • 拒绝策略:当工作队列和线程池都满时,用以执行的策略;

2、线程池状态

线程池拥有一个AtomicInteger类型成员变量ctl,通过位运算分别使用ctl的高低位以便在一个值中存储线程数量以及线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
//29(32-3)
private static final int COUNT_BITS = Integer.SIZE - 3;
//允许的最大工作线程(2^29-1 约5亿)
private static final int CAPACITY = (1 << COUNT_BITS)-1;

// 运行状态。线程池接受并处理新任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 关闭状态。线程池不能接受新任务,处理完剩余任务后关闭。调用shutdown()方法会进入该状态。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 停止状态。线程池不能接受新任务,并且尝试中断旧任务。调用shutdownNow()方法会进入该状态。
private static final int STOP       =  1 << COUNT_BITS;
// 整理状态。由关闭状态转变,线程池任务队列为空时进入该状态,会调用terminated()方法。
private static final int TIDYING    =  2 << COUNT_BITS;
// 终止状态。terminated()方法执行完毕后进入该状态,线程池彻底停止。
private static final int TERMINATED =  3 << COUNT_BITS;

3、总结

  1. 线程池提供了四个构造方法,参数最全的构造方法参数按顺序有:核心线程数,最大线程数,非核心线程闲置存活时间,存活时间单位,任务队列,线程工厂,拒绝策略。
  2. 线程池共有五种状态,分别是:RUNNING,SHUTDOWN,STOP,TYDYING,TERMINATED,它们与工作线程数量一同记录在成员变量 ctl 中,其中高 3 位用于记录状态,低 29 位用于记录工作线程数,实际使用中通过位运算去获取。
  3. 线程池中任务线程以继承了 AQS 的 Worker 类的实例形式存在。当添加任务时,会有四种情况:核心线程不满,优先创建核心线程;核心线程满,优先添加任务队列;核心线程与队列都满,创建非核心线程;线程和队列都满,则执行拒绝策略。
  4. 其中,拒绝策略分为四类,默认的拒绝策略 AbortPolicy;调用者运行策略 CallerRunsPolicy;弃老策略 DiscardOldestPolicy;丢弃策略 DiscardPolicy。
  5. 线程池的中断有两个方法:shutdown()shutdownNow(),两者都会让线程池不再接受新任务,但是 shutdown()会等待当前与任务队列中的任务执行完毕,而 shutdownNow()会直接中断当前任务,忽略并删除任务队列中的任务。
  6. 线程池提供了beforeExecute()afterExecute()terminated()三个钩子函数,其中,afterExecute()的入参含有抛出的异常,因此可以借由该方法处理线程池中线程抛出的异常。

二、原理分析

好文章

https://mp.weixin.qq.com/s/pwxDLs8TXMEMSVYWq73Ttw
https://www.cnblogs.com/throwable/p/13574306.html
https://xie.infoq.cn/article/d0120c6e1518cec1da65cd31f

三、Executor使用

package com.ljh.listen.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * uncaughtExceptionHandler 处理非正常的线程中止
 */
public class MyExecutors {

    /**
     * 线程捕捉异常
     */
    public static void thread(Runnable r){
        Thread thread = new Thread(r);
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.err.printf("线程名:%s,异常:%s",t,e);
            }
        });
        thread.start();
    }

    /**
     * 线程池捕捉异常
     * 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
     * (t,e)->表示重写了:new Thread.UncaughtExceptionHandler() {}的方法uncaughtException
     */
    public static ExecutorService fixedThreadPool(){
        return Executors.newFixedThreadPool(1,r->{
            Thread th = new Thread(r);
            th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
            return th;
        });
    }

    /**
     * 线程池捕捉异常
     * l创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
     */
    public static ExecutorService cachedThreadPool(){
        return Executors.newCachedThreadPool(r->{
            Thread th = new Thread(r);
            th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
            return th;
        });
    }

    /**
     * 线程池捕捉异常
     * 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
     */
    public static ExecutorService singleThreadExecutor(){
        return Executors.newSingleThreadExecutor(r->{
            Thread th = new Thread(r);
            th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
            return th;
        });
    }

    /**
     * 线程池捕捉异常
     * 创建一个定长线程池,支持定时及周期性任务执行
     */
    public static ScheduledExecutorService scheduledThreadPool(){
        return Executors.newScheduledThreadPool(1,r->{
            Thread th = new Thread(r);
            th.setUncaughtExceptionHandler((t,e)->System.err.printf("线程名:%s,异常:%s",t,e));
            return th;
        });
    }
}

四、创建一个线程池

自定义线程工厂

import java.util.concurrent.ThreadFactory;

/**
 * 自定义线程工厂
 * uncaughtExceptionHandler 捕获运行中出现的异常
 */
public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setUncaughtExceptionHandler((t, e) -> {
            throw new RuntimeException(String.format("线程:%s 发生异常",t.getName()),e);
        });
        return thread;
    }
}

实现RejectedExecutionHandler 接口重写线程池饱和的4种拒绝策略

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 自定义线程池饱和拒绝策略(4种)
 */
public class ThreadRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

    }

    /**
     * 1、调用者线程执行策略,即调用者执行被拒绝的run方法
     */
    public static class CallerRunsPolocy extends ThreadRejectedExecutionHandler{
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //如果线程池关闭,则直接丢弃任务
            if(!executor.isShutdown())r.run();
        }
    }

    /**
     * 2、终止策略,直接拒绝,抛出拒绝执行异常
     */
    public static class AbortPolocy extends ThreadRejectedExecutionHandler{
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            throw new RejectedExecutionException("请求任务:"+r.toString()+" ,线程池负载过高执行饱和终止策略!");
        }
    }

    /**
     * 3、丢弃策略,直接拒绝,什么都不做
     */
    public static class DiscardPolicy extends ThreadRejectedExecutionHandler{
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        }
    }

    /**
     * 4、弃老策略,丢弃最早放入阻塞队列中的线程,并尝试将决绝任务加入阻塞队列
     */
    public static class DiscardOldestPolicy extends ThreadRejectedExecutionHandler{
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //如果线程池关闭,则直接丢弃任务
            if(!executor.isShutdown()){
                executor.getQueue().poll();
                executor.execute(r);
            }
        }
    }
}

一个单列的线程池工具类

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * @Desc 单例的线程池工具类
 */
public class ThreadPoolUtils {

    /**
     * 系统可用计算资源
     */
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    /**
     * 核心线程数
     */
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));

    /**
     * 最大线程数
     */
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

    /**
     * 空闲线程存活时间
     */
    private static final int KEEP_ALIVE_SECONDS = 30;

    /**
     * 工作队列
     */
    private static final BlockingQueue<Runnable> POOL_WORK_QUEUE = new LinkedBlockingQueue<>(128);

    /**
     * 工厂模式
     */
    private static final MyThreadFactory MY_THREAD_FACTORY = new MyThreadFactory();

    /**
     * 饱和策略
     */
    private static final ThreadRejectedExecutionHandler THREAD_REJECTED_EXECUTION_HANDLER = new ThreadRejectedExecutionHandler.CallerRunsPolocy();

    /**
     * 线程池对象
     */
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;

    /**
     * 声明式定义线程池工具类对象静态变量,在所有线程中同步
     */
    private static volatile ThreadPoolUtils threadPoolUtils = null;


    /**
     * 初始化线程池静态代码块
     */
    static {
        THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
                //核心线程数
                CORE_POOL_SIZE,
                //最大线程数
                MAXIMUM_POOL_SIZE,
                //空闲线程执行时间
                KEEP_ALIVE_SECONDS,
                //空闲线程执行时间单位
                TimeUnit.SECONDS,
                //工作队列(或阻塞队列)
                POOL_WORK_QUEUE,
                //工厂模式
                MY_THREAD_FACTORY,
                //饱和策略
                THREAD_REJECTED_EXECUTION_HANDLER
                );
//        Arrays.stream(new int[]{THREAD_POOL_EXECUTOR.getCorePoolSize(),THREAD_POOL_EXECUTOR.getMaximumPoolSize(),CPU_COUNT}).forEach(System.err::println);
    }

    /**
     * 线程池工具类空参构造方法
     */
    private ThreadPoolUtils() {}

    /**
     * 获取线程池工具类实例
     * @return
     */
    public static ThreadPoolUtils getNewInstance(){
        if (threadPoolUtils == null) {
            synchronized (ThreadPoolUtils.class) {
                if (threadPoolUtils == null) {
                    threadPoolUtils = new ThreadPoolUtils();
                }
            }
        }
        return threadPoolUtils;
    }

    /**
     * 执行线程任务
     * @param runnable 任务线程
     */
    public void executor(Runnable runnable) {
        THREAD_POOL_EXECUTOR.execute(runnable);
    }

    /**
     * 获取线程池状态
     * @return 返回线程池状态
     */
    public boolean isShutDown(){
        return THREAD_POOL_EXECUTOR.isShutdown();
    }

    /**
     * 停止正在执行的线程任务
     * @return 返回等待执行的任务列表
     */
    public List<Runnable> shutDownNow(){
        return THREAD_POOL_EXECUTOR.shutdownNow();
    }

    /**
     * 关闭线程池
     */
    public void showDown(){
        THREAD_POOL_EXECUTOR.shutdown();
    }

    /**
     * 关闭线程池后判断所有任务是否都已完成
     * @return
     */
    public boolean isTerminated(){
        return THREAD_POOL_EXECUTOR.isTerminated();
    }
}

文章作者: LJH
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LJH !
  目录