当前位置: 首页 > news >正文

五、线程池

文章目录

  • 什么是线程池
  • JDK自带的构建线程池的方式
    • newFixedThreadPool
    • newSingleThreadExecutor
    • newCachedThreadPool
    • newScheduleThreadPool
    • newWorkStealingPool
  • ThreadPoolExecutor应用&源码剖析
    • 为什么要自定义线程池
    • ThreadPoolExecutor应用
    • ThreadPoolExecutor源码剖析
      • ThreadPoolExecutor的核心属性
      • ThreadPoolExecutor的有参构造
      • ThreadPoolExecutor的execute方法
      • ThreadPoolExecutor的addWorker方法
      • ThreadPoolExecutor的Worker工作线程
      • ThreadPoolExecutor的runWorker方法
      • ThreadPoolExecutor的getTask方法
      • ThreadPoolExecutor的关闭方法
    • 线程池的核心参数设计规则
    • 线程池处理任务的核心流程
  • ScheduleThreadPoolExecutor应用&源码
    • ScheduleThreadPoolExecutor介绍
    • ScheduleThreadPoolExecutor应用
    • ScheduleThreadPoolExecutor源码剖析
      • 核心属性
      • schedule方法
      • At和With方法&任务的run方法

什么是线程池

为什么要使用线程池?
在开发中,为了提升效率的操作,我们需要将一些业务采用多线程的方式去执行。
比如有一个比较大的任务,可以将任务分成几块,分别交给几个线程去执行,最终做一个汇总就可以了。
比如做业务操作时,需要发送短信或者是发送邮件,这种操作也可以基于异步的方式完成,这种异步的方式,其实就是再构建一个线程去执行。
但是,如果每次异步操作或者多线程操作都需要新创建一个线程,使用完毕后,线程再被销毁,这样的话,对系统造成一些额外的开销。在处理过程中到底由多线程处理了多少个任务,以及每个线程的开销无法统计和管理。
所以咱们需要一个线程池机制来管理这些内容。线程池的概念和连接池类似,都是在一个Java的集合中存储大量的线程对象,每次需要执行异步操作或者多线程操作时,不需要重新创建线程,直接从集合中拿到线程对象直接执行方法就可以了。
JDK中就提供了线程池的类。
在线程池构建初期,可以将任务提交到线程池中。会根据一定的机制来异步执行这个任务。

  • 可能任务直接被执行。
  • 任务可以暂时被存储起来了。等到有空闲线程再来处理。
  • 任务也可能被拒绝,无法被执行。

JDK提供的线程池中记录了每个线程处理了多少个任务,以及整个线程池处理了多少个任务。同时还可以针对任务执行前后做一些勾子函数的实现。可以在任务执行前后做一些日志信息,这样可以多记录信息方便后面统计线程池执行任务时的一些内容参数等等… …

JDK自带的构建线程池的方式

JDK中基于Executors提供了很多种线程池

newFixedThreadPool

在Executors中第一个方法就是构建newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);
}

构建时,需要给newFixedThreadPool方法提供一个nThreads的属性,而这个属性其实就是当前线程池中线程的个数。当前线程池的本质其实就是使用ThreadPoolExecutor。
构建好当前线程池后,线程个数已经固定好**(线程是懒加载,在构建之初,线程并没有构建出来, 而是随着人任务的提交才会将线程在线程池中构建出来)**。如果线程没构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到LinkedBlockingQueue无界队列中存放,等待线程从LinkedBlockingQueue中去take出任务,然后执行。

测试功能效果

public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(3);threadPool.execute(() -> {System.out.println("1号任务:" + Thread.currentThread().getName() + System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {System.out.println("2号任务:" + Thread.currentThread().getName() + System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {System.out.println("3号任务:" + Thread.currentThread().getName() + System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});
}

newSingleThreadExecutor

这个线程池的特别是线程数是固定的。
如果业务涉及到顺序消费,可以采用newSingleThreadExecutor

// 当前这里就是构建单例线程池的方式
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService// 在内部依然是构建了ThreadPoolExecutor,设置的线程个数为1// 当任务投递过来后,第一个任务会被工作线程处理,后续的任务会被扔到阻塞队列中 // 投递到阻塞队列中任务的顺序,就是工作线程处理的顺序// 当前这种线程池可以用作顺序处理的一些业务中(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}static class FinalizableDelegatedExecutorServiceextends DelegatedExecutorService {// 线程池的使用没有区别,跟正常的ThreadPoolExecutor没区别FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}// finalize是当前对象被GC干掉之前要执行的方法// 当前FinalizableDelegatedExecutorService的目的是为了在当前线程池被GC回收之前// 可以执行shutdown,shutdown方法是将当前线程池停止,并且干掉工作线程// 但是不能基于这种方式保证线程池一定会执行shutdown// finalize在执行时,是守护线程,这种线程无法保证一定可以执行完毕。// 在使用线程池时,如果线程池是基于一个业务构建的,在使用完毕之后,一定要手动执行shutdown,// 否则会造成JVM中一堆线程protected void finalize() {super.shutdown();}
}

测试单例线程池效果:

public static void main(String[] args) {ExecutorService threadPool = Executors.newSingleThreadExecutor();threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "111");});threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "222");});threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "333");});threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "444");});
}

测试线程池使用完毕后,不执行shutdown的后果:
如果是局部变量仅限当前线程池使用的线程池,在使用完毕之后要记得执行shutdown,避免线程无法结束。

public static void main(String[] args) throws IOException, InterruptedException {newThreadPool();System.gc();Thread.sleep(5000);System.out.println("线程池被回收了!");System.in.read();
}private static void newThreadPool() {ExecutorService threadPool = Executors.newFixedThreadPool(200);for (int i = 0; i < 200; i++) {final int a = i;threadPool.execute(() -> {System.out.println(a);});}threadPool.shutdown();
}

在这里插入图片描述
在这里插入图片描述
如果是全局的线程池,很多业务都会到,使用完毕后不要shutdown,因为其他业务也要执行当前线程池。
在这里插入图片描述

static ExecutorService threadPool = Executors.newFixedThreadPool(200);public static void main(String[] args) throws InterruptedException, IOException {newThreadPool();System.gc();Thread.sleep(5000);System.out.println("线程池被回收了!!");System.in.read();
}private static void newThreadPool() {for (int i = 0; i < 200; i++) {final int a = i;threadPool.execute(() -> {System.out.println(a);});}threadPool.shutdown();for (int i = 0; i < 200; i++) {final int a = i;threadPool.execute(() -> {System.out.println(a);});}
}

newCachedThreadPool

看名字好像是一个缓存的线程池,查看一下构建的方式。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
}

当第一次提交任务到线程池时,会直接构建一个工作线程。
这个工作线程带执行完人后,60秒没有任务可以执行后,会结束。
如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行。
如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。
最大的一个特点,任务只要提交到当前的newCachedThreadPool中,就必然有工作线程可以处理
代码测试效果:

ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 1; i <= 200; i++) {final int j = i;executorService.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + ":" + j);});
}

newScheduleThreadPool

首先看到名字就可以猜到当前线程池是一个定时任务的线程池,而这个线程池就是可以以一定周期去执行一个任务,或者是延迟多久执行一个任务一次。
查看一下如何构建的。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

基于这个方法可以看到,构建的是ScheduledThreadPoolExecutor线程池。

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {//....
}

所以本质上还是正常线程池,只不过在原来的线程池基础上实现了定时任务的功能。
原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后,再次扔回到阻塞队列。
代码查看使用的方式和效果:

public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);// 正常执行pool.execute(() -> {System.out.println(Thread.currentThread().getName() + ":1");});// 延迟执行,执行当前任务延迟5s后再执行pool.schedule(() -> {System.out.println(Thread.currentThread().getName() + ":2");}, 5, TimeUnit.SECONDS);// 周期执行,当前任务第一次延迟5s执行,然后每3s执行一次// 这个方法在计算下次执行时间时,是从任务刚刚开始时就计算。pool.scheduleAtFixedRate(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(System.currentTimeMillis() + ":3");}, 2, 1, TimeUnit.SECONDS);// 周期执行,当前任务第一次延迟5s执行,然后每3s执行一次// 这个方法在计算下次执行时间时,会等待任务结束后,再计算时间pool.scheduleWithFixedDelay(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(System.currentTimeMillis() + ":4");}, 2, 1, TimeUnit.SECONDS);
}

至于Executors提供的newSingleThreadScheduledExecutor单例的定时任务线程池就不说了。
一个线程的线程池可以延迟或者以一定的周期执行一个任务。

newWorkStealingPool

当前JDK提供构建线程池的方式newWorkStealingPool和之前的线程池很非常大的区别。
之前定长、单例、缓存、定时任务都基于ThreadPoolExecutor去实现的。
newWorkStealingPool是基于ForkJoinPool构建出来的。
ThreadPoolExecutor的核心点
在ThreadPoolExecutor中只有一个阻塞队列存放当前任务。
在这里插入图片描述
ForkJoinPool的核心特点:
ForkJoinPool从名字上就能看出一些东西。当有一个特别大的任务时,如果采用上述方式,这个大任务只能会某一个线程去执行。ForkJoin第一个特点是可以将一个大任务拆分成多个小任务,放到当前线程的阻塞队列中。其他的空闲线程就可以去处理有任务的线程的阻塞队列中的任务。
在这里插入图片描述
来一个比较大的数组,里面存满值,计算总和。
单线程处理一个任务:

public static void main(String[] args) {// ===================单线程累加10亿数据================================System.out.println("单线程计算数组总和!");long start = System.nanoTime();int sum = 0;for (int num : nums) {sum += num;}long end = System.nanoTime();System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));// 单线程运算结果为:1292852629,计算时间为:448611329
}

多线程分而治之的方式处理:

public static void main(String[] args) {// ===================单线程累加10亿数据================================System.out.println("单线程计算数组总和!");long start = System.nanoTime();int sum = 0;for (int num : nums) {sum += num;}long end = System.nanoTime();System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));// 单线程运算结果为:1290178136,计算时间为:445103473// ===================多线程分而治之累加10亿数据================================// 在使用forkJoinPool时,不推荐使用Runnable和Callable// 可以使用提供的另外两种任务的描述方式// Runnable(没有返回结果) -> RecursiveAction// Callable(有返回结果) -> RecursiveTaskForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();System.out.println("分而治之计算数组总和!");long forkJoinStart = System.nanoTime();ForkJoinTask<Integer> task = forkJoinPool.submit(new SumRecursiveTask(0, nums.length - 1));Integer result = task.join();long forkJoinEnd = System.nanoTime();System.out.println("分而治之运算结果为:" + result + ",计算时间为:" + (forkJoinEnd - forkJoinStart));// 分而治之运算结果为:1290178136,计算时间为:174586594
}private static class SumRecursiveTask extends RecursiveTask<Integer> {/*** 指定一个线程处理哪个位置的数据*/private int start, end;private final int MAX_STRIDE = 100_000_000;// 200_000_000: 147964900// 100_000_000: 145942100public SumRecursiveTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {// 在这个方法中,需要设置好任务拆分的逻辑以及聚合的逻辑int sum = 0;int stride = end - start;if (stride <= MAX_STRIDE) {// 可以处理任务for (int i = start; i <= end; i++) {sum += nums[i];}} else {// 将任务拆分,分而治之。int middle = (start + end) / 2;// 声明为2个任务SumRecursiveTask left = new SumRecursiveTask(start, middle);SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);// 分别执行两个任务left.fork();right.fork();// 等待结果,并且获取sumsum = left.join() + right.join();}return sum;}
}

最终可以发现,这种累加的操作中,采用分而治之的方式效率提升了2倍多。
但是也不是所有任务都能拆分提升效率,首先任务得大,耗时要长。

ThreadPoolExecutor应用&源码剖析

前面讲到的Executors中的构建线程池的方式,大多数还是基于ThreadPoolExecutor去new出来的。

为什么要自定义线程池

首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用JDK提供的方式去构建,可以设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。
手动的去new ThreadPoolExecutor设置他的一些核心属性。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。
查看一下ThreadPoolExecutor提供的七个核心参数:

public ThreadPoolExecutor(int corePoolSize,	//核心线程数(当前任务执行结束,不会被销毁)int maximumPoolSize,	//最大线程数(代表当前线程池中,一共可以有多少个工作线程)long keepAliveTime,	//非核心工作线程在阻塞队列位置等待的时间TimeUnit unit,	//非核心工作线程在阻塞队列位置等待时间的单位BlockingQueue<Runnable> workQueue,	//任务在没有核心工作线程处理时,任务先扔到阻塞队列中ThreadFactory threadFactory,	//构建线程的线程工作,可以设置thread的一些信息RejectedExecutionHandler handler) {	//当线程池无法处理投递过来的任务时,执行当前的拒绝策略
}

ThreadPoolExecutor应用

手动new一下,处理的方式还是执行execute或者submit方法。
JDK提供的几种拒绝策略:

  • CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}
    }
    
  • AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异常。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
    }
    
  • DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃掉。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    
  • DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}
    }
    

代码构建线程池,并处理有无返回结果的任务

public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(5), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("test-ThreadPoolExecutor");return thread;}},new MyRejectedExecution());//2. 让线程池处理任务,没返回结果threadPool.execute(() -> {System.out.println("没有返回结果的任务");});//3. 让线程池处理有返回结果的任务Future<Object> future = threadPool.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {System.out.println("我有返回结果!");return "返回结果";}});Object result = future.get();System.out.println(result);//4. 如果是局部变量的线程池,记得用完要shutdownthreadPool.shutdown();
}private static class MyRejectedExecution implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("根据自己的业务情况,决定编写的代码!");}
}

ThreadPoolExecutor源码剖析

线程池的源码内容会比较多一点,需要一点一点的去查看,内部比较多。

ThreadPoolExecutor的核心属性

核心属性主要就是ctl,基于ctl拿到线程池的状态以及工作线程个数。
在整个线程池的执行流程中,会基于ctl判断上述两个内容。

// 当前是线程池的核心属性
// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。 
// ctl表示着线程池中的2个核心状态:
// 线程池的状态:ctl的高3位,表示线程池状态
// 工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE:在获取Integer的bit位个数
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
/*
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111*/
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
// 线程池状态的表示
// 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctl
// 在使用下面这几个方法时,需要传递ctl进来
// 基于&运算的特点,保证只会拿到ctl高三位的值。
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池状态的特点以及转换的方式
在这里插入图片描述

ThreadPoolExecutor的有参构造

有参构造没啥说的,记住核心线程个数是允许为0的。

// 有参构造。无论调用哪个有参构造,最终都会执行当前的有参构造
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 健壮性校验// 核心线程个数是允许为0个的。// 最大线程数必须大于0,最大线程数要大于等于核心线程数 // 非核心线程的最大空闲时间,可以等于0if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)// 不满足要求就抛出参数异常throw new IllegalArgumentException();// 阻塞队列,线程工厂,拒绝策略都不允许为null,为null就扔空指针异常if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 不要关注当前内容,系统资源访问决策,和线程池核心业务关系不大。this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();// 各种赋值,JUC包下,几乎所有涉及到线程挂起的操作,单位都用纳秒。// 有参构造的值,都赋值给成员变量。// Doug Lea的习惯就是将成员变量作为局部变量单独操作。this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

ThreadPoolExecutor的execute方法

execute方法是提交任务到线程池的核心方法,很重要。
线程池的执行流程其实就是在说execute方法内部做了哪些判断。
execute源码的分析

// 提交任务到线程池的核心方法
// command就是提交过来的任务
public void execute(Runnable command) {// 提交的任务不能为nullif (command == null)throw new NullPointerException();// 获取核心属性ctl,用于后面的判断int c = ctl.get();// 如果工作线程个数,小于核心线程数。// 满足要求,添加核心工作线程if (workerCountOf(c) < corePoolSize) {// addWorker(任务,是核心线程吗)// addWorker返回true:代表添加工作线程成功// addWorker返回false:代表添加工作线程失败// addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程if (addWorker(command, true))// 工作线程构建出来了,任务也交给command去处理了。return;// 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctlc = ctl.get();}// 添加核心工作线程失败,往这走// 判断线程池状态是否是RUNNING,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列if (isRunning(c) && workQueue.offer(command)) {// 如果任务添加到阻塞队列成功,走if内部// 如果任务在扔到阻塞队列之前,线程池状态突然改变了。// 重新获取ctlint recheck = ctl.get();// 如果线程池的状态不是RUNNING,将任务从阻塞队列移除,if (! isRunning(recheck) && remove(command))// 并且直接拒绝策略reject(command);// 在这,说明阻塞队列有我刚刚放进去的任务// 查看一下工作线程数是不是0个// 如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务// 发生这种情况有两种:// 1. 构建线程池时,核心线程数是0个。// 2. 即便有核心线程,可以设置核心线程也允许超时,设置allowCoreThreadTimeOut为true,代表核心线程也可以超else if (workerCountOf(recheck) == 0)// 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理addWorker(null, false);}// 任务添加到阻塞队列失败// 构建一个非核心工作线程// 如果添加非核心工作线程成功,直接完事,告辞else if (!addWorker(command, false))// 添加失败,执行决绝策略reject(command);
}

execute方法的完整执行流程图
在这里插入图片描述

ThreadPoolExecutor的addWorker方法

addWorker中主要分成两大块去看

  • 第一块:校验线程池的状态以及工作线程个数
  • 第二块:添加工作线程并且启动工作线程

校验线程池的状态以及工作线程个数

// 添加工作线程之校验源码
private boolean addWorker(Runnable firstTask, boolean core) {// 外层for循环在校验线程池的状态// 内层for循环是在校验工作线程的个数// retry是给外层for循环添加一个标记,是为了方便在内层for循坏跳出外层for循环retry:for (;;) {// 获取ctlint c = ctl.get();// 拿到ctl的高3位的值int rs = runStateOf(c);// Check if queue empty only if necessary.//==========================线程池状态判断=============================// 如果线程池状态是SHUTDOWN,并且此时阻塞队列有任务,工作线程个数为0,添加一个工作线程去处理阻塞队列的任// 判断线程池的状态是否大于等于SHUTDOWN,如果满足,说明线程池不是RUNNINGif (rs >= SHUTDOWN &&// 如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列任务// 如果三个条件有一个没满足,返回false,配合!就代表不需要添加! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))// 不需要添加工作线程return false;for (;;) {//==========================工作线程个数判断========================// 基于ctl拿到低29位的值,代表当前工作线程个数int wc = workerCountOf(c);// 如果工作线程个数大于最大值了,不可以添加了,返回falseif (wc >= CAPACITY ||// 基于core来判断添加的是否是核心工作线程// 如果是核心:基于corePoolSize去判断// 如果是非核心:基于maximumPoolSize去判断wc >= (core ? corePoolSize : maximumPoolSize))// 代表不能添加,工作线程个数不满足要求return false;// 针对ctl进行 + 1,采用CAS的方式if (compareAndIncrementWorkerCount(c))// CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。break retry;// 重新获取一次ctl的值c = ctl.get();  // Re-read ctl// 判断重新获取到的ctl中,表示的线程池状态跟之前的是否有区别// 如果状态不一样,说明有变化,重新的去判断线程池状态if (runStateOf(c) != rs)// 跳出一次外层for循环continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 省略添加工作线程以及启动的过程
}

添加工作线程并且启动工作线程

private boolean addWorker(Runnable firstTask, boolean core) {// 省略校验部分的代码// 添加工作线程以及启动工作线程// 声明了三个变量// 工作线程启动了没,默认falseboolean workerStarted = false;// 工作线程添加了没,默认falseboolean workerAdded = false;// 工作线程,默认为nullWorker w = null;try {// 构建工作线程,并且将任务传递进去w = new Worker(firstTask);// 获取了Worker中的Thread对象final Thread t = w.thread;// 判断Thread是否不为null,在new Worker时,内部会通过给予的ThreadFactory去构建Thread交给Worker// 一般如果为null,代表ThreadFactory有问题。if (t != null) {// 加锁,保证使用workers成员变量以及对largestPoolSize赋值时,保证线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 再次获取线程池状态。int rs = runStateOf(ctl.get());// 再次判断// 如果满足 rs < SHUTDOWN 说明线程池是RUNNING,状态正常,执行if代码块// 如果线程池状态为SHUTDOWN,并且firstTask为null,添加非核心工作处理阻塞队列任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 到这,可以添加工作线程。// 校验ThreadFactory构建线程后,不能自己启动线程,如果启动了,抛出异常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// private final HashSet<Worker> workers = new HashSet<Worker>();// 将new好的Worker添加到HashSet中。workers.add(w);// 获取了HashSet的size,拿到工作线程个数int s = workers.size();// largestPoolSize在记录最大线程个数的记录// 如果当前工作线程个数,大于最大线程个数的记录,就赋值if (s > largestPoolSize)largestPoolSize = s;// 添加工作线程成功workerAdded = true;}} finally {mainLock.unlock();}// 如果工作线程添加成功,if (workerAdded) {// 直接启动Worker中的线程t.start();// 启动工作线程成功workerStarted = true;}}} finally {// 做补偿的操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉if (! workerStarted)addWorkerFailed(w);}// 返回工作线程是否启动成功return workerStarted;
}// 工作线程启动失败,需要不的步长操作
private void addWorkerFailed(Worker w) {// 因为操作了workers,需要加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 如果w不为null,之前Worker已经new出来了。if (w != null)// 从HashSet中移除workers.remove(w);// 同时对ctl进行 - 1,代表去掉了一个工作线程个数decrementWorkerCount();// 因为工作线程启动失败,判断一下状态的问题,是不是可以走TIDYING状态最终到TERMINATED状态了。tryTerminate();} finally {// 释放锁mainLock.unlock();}
}

ThreadPoolExecutor的Worker工作线程

Worker对象主要包含了两个内容

  • 工作线程要执行任务
  • 工作线程可能会被中断,控制中断
// Worker继承了AQS,目的就是为了控制工作线程的中断。
// Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{// =======================Worker管理任务================================// 线程工厂构建的线程final Thread thread;// 当前Worker要执行的任务Runnable firstTask;// 记录当前工作线程处理了多少个任务。volatile long completedTasks;// 有参构造Worker(Runnable firstTask) {// 将State设置为-1,代表当前不允许中断线程setState(-1); // inhibit interrupts until runWorker// 任务赋值this.firstTask = firstTask;// 基于线程工作构建Thread,并且传入的Runnable是Workerthis.thread = getThreadFactory().newThread(this);}// 当thread执行start方法时,调用的是Worker的run方法,public void run() {// 任务执行时,执行的是runWorker方法runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }// =======================Worker管理中断================================// 当前方法是中断工作线程时,执行的方法void interruptIfStarted() {Thread t;// 只有Worker中的state >= 0的时候,可以中断工作线程if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 如果状态正常,并且线程未中断,这边就中断线程t.interrupt();} catch (SecurityException ignore) {}}}
}

ThreadPoolExecutor的runWorker方法

runWorker就是让工作线程拿到任务去执行即可。
并且在内部也处理了在工作线程正常结束和异常结束时的处理方案。

// 工作线程启动后执行的任务。
final void runWorker(Worker w) {// 拿到当前线程Thread wt = Thread.currentThread();// 从worker对象中拿到任务Runnable task = w.firstTask;// 将Worker中的firstTask置位空w.firstTask = null;// 将Worker中的state置位0,代表当前线程可以中断的w.unlock(); // allow interrupts// 判断工作线程是否是异常结束,默认就是异常结束boolean completedAbruptly = true;try {// 获取任务// 直接拿到第一个任务去执行// 如果第一个任务为null,去阻塞队列中获取任务while (task != null || (task = getTask()) != null) {// 执行了Worker的lock方法,当前在lock时,shutdown操作不能中断当前线程,因为当前线程正在处理任务w.lock();// 比较ctl >= STOP,如果满足找个状态,说明线程池已经到了STOP状态甚至已经要凉凉了// 线程池到STOP状态,并且当前线程还没有中断,确保线程是中断的,进到if内部执行中断方法// if(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()) {中断线程}// 如果线程池状态不是STOP,确保线程不是中断的。// 如果发现线程中断标记位是true了,再次查看线程池状态是大于STOP了,再次中断线程 // 这里其实就是做了一个事情,如果线程池状态 >= STOP,确保线程中断了。if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 勾子函数在线程池中没有做任何的实现,如果需要在线程池执行任务前后做一些额外的处理,可以重写勾子函数// 前置勾子函数beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 前后置勾子函数afterExecute(task, thrown);}} finally {// 任务执行完,丢掉任务task = null;// 当前工作线程处理的任务数+1w.completedTasks++;// 执行unlock方法,此时shutdown方法才可以中断当前线程w.unlock();}}// 如果while循环结束,正常走到这,说明是正常结束// 正常结束的话,在getTask中就会做一个额外的处理,将ctl - 1,代表工作线程没一个。completedAbruptly = false;} finally {// 考虑干掉工作线程processWorkerExit(w, completedAbruptly);}
}// 工作线程结束前,要执行当前方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果是异常结束if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted// 将ctl - 1,扣掉一个工作线程decrementWorkerCount();// 操作Worker,为了线程安全,加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中completedTaskCount += w.completedTasks;// 将工作线程从hashSet中移除workers.remove(w);} finally {// 释放锁mainLock.unlock();}// 只要工作线程凉了,查看是不是线程池状态改变了。tryTerminate();// 获取ctlint c = ctl.get();// 判断线程池状态,当前线程池要么是RUNNING,要么是SHUTDOWNif (runStateLessThan(c, STOP)) {// 如果正常结束工作线程if (!completedAbruptly) {// 如果核心线程允许超时,min = 0,否则就是核心线程个数int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果min == 0,可能会出现没有工作线程,并且阻塞队列有任务没有线程处理if (min == 0 && ! workQueue.isEmpty())// 至少要有一个工作线程处理阻塞队列任务min = 1;// 如果工作线程个数 大于等于1,不怕没线程处理,正常returnif (workerCountOf(c) >= min)return; // replacement not needed}// 异常结束,为了避免出现问题,添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程addWorker(null, false);}
}

ThreadPoolExecutor的getTask方法

工作线程在去阻塞队列获取任务前,要先查看线程池状态。
如果状态没问题,去阻塞队列take或者是poll任务。
第二个循环时,不但要判断线程池状态,还要判断当前工作线程是否可以被干掉。

// 当前方法就在阻塞队列中获取任务
// 前面半部分是判断当前工作线程是否可以返回null,结束。 
// 后半部分就是从阻塞队列中拿任务
private Runnable getTask() {// timeOut默认值是false。boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 拿到线程池的状态int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

ThreadPoolExecutor的关闭方法

首先查看shutdownNow方法,可以从RUNNING状态转变为STOP

// shutDownNow方法,shutdownNow不会处理阻塞队列的任务,将任务全部给你返回了。
public List<Runnable> shutdownNow() {// 声明返回结果List<Runnable> tasks;// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 不关注这个方法......checkShutdownAccess();// 将线程池状态修改为STOPadvanceRunState(STOP);// 无论怎么,直接中断工作线程。interruptWorkers();// 将阻塞队列的任务全部扔到List集合中。tasks = drainQueue();} finally {// 释放锁mainLock.unlock();}tryTerminate();return tasks;
}// 将线程池状态修改为STOP
private void advanceRunState(int targetState) {for (;;) {// 获取ctl属性的值int c = ctl.get();// 第一个判断:如果当前线程池状态已经大于等于STOP了,不管了,告辞。if (runStateAtLeast(c, targetState) ||// 基于CAS,将ctl从c修改为STOP状态,不修改工作线程个数,但是状态变为了STOP// 如果修改成功结束ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}// 无论怎么,直接中断工作线程。
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历HashSet,拿到所有的工作线程,直接中断。for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}// 移除阻塞队列,内容全部扔到List集合中
private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();// 阻塞队列自带的,直接清空阻塞队列,内容扔到List集合q.drainTo(taskList);// 为了避免任务丢失,重新判断,是否需要编辑阻塞队列,重新扔到Listif (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}// 查看当前线程池是否可以变为TERMINATED状态
final void tryTerminate() {for (;;) {// 拿到ctlint c = ctl.get();// 如果是RUNNING,直接告辞。// 如果状态已经大于等于TIDYING,马上就要凉凉,直接告辞。// 如果状态是SHUTDOWN,但是阻塞队列还有任务,直接告辞。if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果还有工作线程if (workerCountOf(c) != 0) { // Eligible to terminate// 再次中断工作线程interruptIdleWorkers(ONLY_ONE);// 告辞,等你工作线程全完事,我这再尝试进入到TERMINATED状态return;}// 加锁,为了可以执行Condition的释放操作final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将线程池状态修改为TIDYING状态,如果成功,继续往下走if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 这个方法是空的,如果你需要在线程池关闭后做一些额外操作,这里你可以自行实现terminated();} finally {// 最终修改为TERMINATED状态ctl.set(ctlOf(TERMINATED, 0));// 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。 // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作// 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

再次shutdown方法,可以从RUNNING状态转变为SHUTDOWN。
shutdown状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务。

public void shutdown() {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 不看checkShutdownAccess();// 里面是一个死循环,将线程池状态修改为SHUTDOWNadvanceRunState(SHUTDOWN);// 中断空闲线程interruptIdleWorkers();// 说了,这个是为了ScheduleThreadPoolExecutor准备的,不管onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试结束线程tryTerminate();
}// 中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 如果线程没有中断,那么就去获取Worker的锁,基于tryLock可知,不会中断正在干活的线程if (!t.isInterrupted() && w.tryLock()) {try {// 会中断空闲线程t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}

线程池的核心参数设计规则

线程池的使用难度不大,难度在于线程池的参数并不好配置。
主要难点在于任务类型无法控制,比如任务有CPU密集型,还有IO密集型,甚至还有混合型的。
因为IO咱们无法直接控制,所以很多时间按照一些书上提供的一些方法,是无法解决问题的。
《Java并发编程实践》

想调试出一个符合当前任务情况的核心参数,最好的方式就是测试。
需要将项目部署到测试环境或者是沙箱环境中,结果各种压测得到一个相对符合的参数。
如果每次修改项目都需要重新部署,成本太高了。
此时咱们可以实现一个动态监控以及修改线程池的方案。
因为线程池的核心参数无非就是:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • workQueue:工作队列

线程池中提供了获取核心信息的get方法,同时也提供了动态修改核心属性的set方法。
在这里插入图片描述
在这里插入图片描述
也可以采用一些开源项目提供的方式去做监控和修改。
比如hippo4j就可以对线程池进行监控,而且可以和SpringBoot整合。
Github地址:https://github.com/opengoofy/hippo4j
官方文档:https://hippo4j.cn/docs/user_docs/intro

线程池处理任务的核心流程

基于addWorker添加工作线程的流程切入到整体处理任务的位置在这里插入图片描述

ScheduleThreadPoolExecutor应用&源码

ScheduleThreadPoolExecutor介绍

从名字上就可以看出,当前线程池是用于执行定时任务的线程池。
Java比较早的定时任务工具是Timer类。但是Timer问题很多,串行的,不靠谱,会影响到其他的任 务执行。
其实除了Timer以及ScheduleThreadPoolExecutor之外,正常在企业中一般会采用Quartz或者是SpringBoot提供的Schedule的方式去实现定时任务的功能。
ScheduleThreadPoolExecutor支持延迟执行以及周期性执行的功能。

ScheduleThreadPoolExecutor应用

定时任务线程池的有参构造

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

发现ScheduleThreadPoolExecutor在构建时,直接调用了父类的构造方法
ScheduleThreadPoolExecutor的父类就是ThreadPoolExecutor
首先ScheduleThreadPoolExecutor最多允许设置3个参数A:

  • 核心线程数
  • 线程工厂
  • 拒绝策略

首先没有设置阻塞队列,以及最大线程数和空闲时间以及单位。
阻塞队列设置的是DelayedWorkQueue,其实本质就是DelayQueue,一个延迟队列。 DelayQueue是一个无界队列。所以最大线程数以及非核心线程的空闲时间是不需要设置的。

代码落地使用

public static void main(String[] args) {// 1. 构建定时任务线程池ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(5,new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);return t;}},new ThreadPoolExecutor.AbortPolicy());// 2. 应用ScheduledThreadPoolExecutor// 跟直接执行线程池的execute没啥区别pool.execute(() -> {System.out.println("execute");});// 指定延迟时间执行System.out.println(System.currentTimeMillis());pool.schedule(() -> {System.out.println("schedule");System.out.println(System.currentTimeMillis());}, 2, TimeUnit.SECONDS);// 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务开始时就计算// 周期性执行就是将执行完毕的任务再次社会好延迟时间,并且重新扔到阻塞队列// 计算的周期执行,也是在原有的时间上做累加,不关注任务的执行时长。System.out.println(System.currentTimeMillis());pool.scheduleAtFixedRate(() -> {System.out.println("scheduleAtFixedRate");System.out.println(System.currentTimeMillis());}, 2, 3, TimeUnit.SECONDS);// 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务结束后再计算下次的延迟时间System.out.println(System.currentTimeMillis());pool.scheduleWithFixedDelay(() -> {System.out.println("scheduleWithFixedDelay");System.out.println(System.currentTimeMillis());try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}}, 2, 3, TimeUnit.SECONDS);
}

ScheduleThreadPoolExecutor源码剖析

核心属性

后面的方法业务流程会涉及到这些属性。

// 这里是针对任务取消时的一些业务判断会用到的标记
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;
// 计数器,如果两个任务的执行时间节点一模一样,根据这个序列来判断谁先执行
private static final AtomicLong sequencer = new AtomicLong();
// 这个方法是获取当前系统时间的毫秒值
final long now() {return System.nanoTime();
}
// 内部类。核心类之一。
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 全局唯一的序列,如果两个任务时间一直,基于当前属性判断private final long sequenceNumber;// 任务执行的时间,单位纳秒private long time;/*** period == 0:执行一次的延迟任务 * period > 0:代表是At* period < 0:代表是With*/private final long period;// 周期性执行时,需要将任务重新扔回阻塞队列,基础当前属性拿到任务,方便扔回阻塞队列RunnableScheduledFuture<V> outerTask = this;int heapIndex;/*** 构建At和With任务的有参构造 */ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}
}// 内部类。核心类之一。
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {// 这个类就是DelayQueue,不用过分关注,如果没看过,看阻塞队列中的优先级队列和延迟队列
}

schedule方法

execute方法也是调用的schedule方法,只不过传入的延迟时间是0纳秒。
schedule方法就是将任务和延迟时间封装到一起,并且将任务扔到阻塞队列中,再去创建工作线程去take阻塞队列。

// 延迟任务执行的方法。
// command:任务
// delay:延迟时间
// unit:延迟时间的单位
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {// 健壮性校验。if (command == null || unit == null)throw new NullPointerException();// 将任务和延迟时间封装到一起,最终组成ScheduledFutureTask// 要分成三个方法去看// triggerTime:计算延迟时间。最终返回的是当前系统时间 + 延迟时间// triggerTime就是将延迟时间转换为纳秒,并且+当前系统时间,再做一些健壮性校验// ScheduledFutureTask有参构造:将任务以及延迟时间封装到一起,并且设置任务执行的方式// decorateTask:当前方式是让用户基于自身情况可以动态修改任务的一个扩展口RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 任务封装好,执行delayedExecute方法,去执行任务delayedExecute(t);// 返回FutureTaskreturn t;
}// triggerTime做的事情
// 外部方法,对延迟时间做校验,如果小于0,就直接设置为0
// 并且转换为纳秒单位
private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
// 将延迟时间+当前系统时间
// 后面的校验是为了避免延迟时间超过Long的取值范围
long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// ScheduledFutureTask有参构造
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);// time就是任务要执行的时间this.time = ns;// period,为0,代表任务是延迟执行,不是周期执行this.period = 0;// 基于AtmoicLong生成的序列this.sequenceNumber = sequencer.getAndIncrement();
}// delayedExecute 执行延迟任务的操作
private void delayedExecute(RunnableScheduledFuture<?> task) {// 查看当前线程池是否还是RUNNING状态,如果不是RUNNING,进到ifif (isShutdown())// 不是RUNNING。 // 执行拒绝策略。reject(task);else {// 线程池状态是RUNNING// 直接让任务扔到延迟的阻塞队列中super.getQueue().add(task);// DCL的操作,再次查看线程池状态// 如果线程池在添加任务到阻塞队列后,状态不是RUNNINGif (isShutdown() &&// task.isPeriodic():现在反回的是false,因为任务是延迟执行,不是周期执行 // 默认情况,延迟队列中的延迟任务,可以执行!canRunInCurrentRunState(task.isPeriodic()) &&// 从阻塞队列中移除任务。remove(task))task.cancel(false);else// 线程池状态正常,任务可以执行ensurePrestart();}
}// 线程池状态不为RUNNING,查看任务是否可以执行
// 延迟执行:periodic==false
// 周期执行:periodic==true
// continueExistingPeriodicTasksAfterShutdown:周期执行任务,默认为false
// executeExistingDelayedTasksAfterShutdown:延迟执行任务,默认为true
boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);
}// 当前情况,shutdownOK为true
final boolean isRunningOrShutdown(boolean shutdownOK) {int rs = runStateOf(ctl.get());// 如果状态是RUNNING,正常可以执行,返回true// 如果状态是SHUTDOWN,根据shutdownOK来决定return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}// 任务可以正常执行后,做的操作
void ensurePrestart() {// 拿到工作线程个数int wc = workerCountOf(ctl.get());// 如果工作线程个数小于核心线程数if (wc < corePoolSize)// 添加核心线程去处理阻塞队列中的任务addWorker(null, true);else if (wc == 0)// 如果工作线程数为0,核心线程数也为0,这是添加一个非核心线程去处理阻塞队列任务addWorker(null, false);
}

At和With方法&任务的run方法

这两个方法在源码层面上的第一个区别,就是在计算周期时间时,需要将这个值传递给period,基于正负数在区别At和With
所以查看一个方法就ok,查看At方法

// At方法,
// command:任务
// initialDelay:第一次执行的延迟时间
// period:任务的周期执行时间
// unit:上面两个时间的单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {// 健壮性校验if (command == null || unit == null)throw new NullPointerException();// 周期时间不能小于等于0.if (period <= 0)throw new IllegalArgumentException();// 将任务以及第一次的延迟时间,和后续的周期时间封装好。ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 扩展口,可以对任务做修改。RunnableScheduledFuture<Void> t = decorateTask(command, sft);// 周期性任务,需要在任务执行完毕后,重新扔会到阻塞队列,为了方便拿任务,将任务设置到outerTask成员变量中sft.outerTask = t;// 和schedule方法一样的方式// 如果任务刚刚扔到阻塞队列,线程池状态变为SHUTDOWN,默认情况,当前任务不执行delayedExecute(t);return t;
}// 延迟任务以及周期任务在执行时,都会调用当前任务的run方法。
public void run() {// periodic == false:一次性延迟任务// periodic == true:周期任务boolean periodic = isPeriodic();// 任务执行前,会再次判断状态,能否执行任务if (!canRunInCurrentRunState(periodic))cancel(false);// 判断是周期执行还是一次性任务else if (!periodic)// 一次性任务,让工作线程直接执行command的逻辑ScheduledFutureTask.super.run();// 到这个else if,说明任务是周期执行else if (ScheduledFutureTask.super.runAndReset()) {// 设置下次任务执行的时间setNextRunTime();// 将任务重新扔回线程池做处理reExecutePeriodic(outerTask);}
}// 设置下次任务执行的时间
private void setNextRunTime() {long p = period;if (p > 0)// 拿着之前的执行时间,直接追加上周期时间time += p;else// 如果走到else,代表任务是With方式,这种方式要重新计算延迟时间// 拿到当前系统时间,追加上延迟时间,time = triggerTime(-p);
}// 将任务重新扔回线程池做处理
void reExecutePeriodic(RunnableScheduledFuture<?> task) {// 如果状态ok,可以执行if (canRunInCurrentRunState(true)) {// 将任务扔到延迟队列super.getQueue().add(task);// DCL,判断线程池状态if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);else// 添加工作线程ensurePrestart();}
}

相关文章:

五、线程池

文章目录什么是线程池JDK自带的构建线程池的方式newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPoolnewScheduleThreadPoolnewWorkStealingPoolThreadPoolExecutor应用&源码剖析为什么要自定义线程池ThreadPoolExecutor应用ThreadPoolExecutor源码剖析ThreadPo…...

ROS从入门到精通2-6:Rviz可视化进阶(画坐标轴、直线、平面、圆柱等)

目录0 专栏介绍1 Rviz可视化2 环境配置3 使用方法4 测试用例0 专栏介绍 本专栏旨在通过对ROS的系统学习&#xff0c;掌握ROS底层基本分布式原理&#xff0c;并具有机器人建模和应用ROS进行实际项目的开发和调试的工程能力。 &#x1f680;详情&#xff1a;《ROS从入门到精通》…...

Linux命令之lz4命令

一、lz4命令简介 LZ4是一种压缩格式&#xff0c;特点是压缩/解压缩速度超快(压缩率不如gzip)&#xff0c;如果你特别在意压缩速度&#xff0c;或者当前环境的CPU资源紧缺&#xff0c;可以考虑这种格式。lz4是一种非常快速的无损压缩算法&#xff0c;基于字节对齐LZ77系列压缩方…...

强强角逐,筑梦开源| 2022年度启智社区优秀项目及开发者评选结果正式揭晓

2月24日&#xff0c;第四届OpenI/O启智开发者大会在深圳隆重开幕。本届大会以“算网筑基、开源启智、AI赋能”为主题&#xff0c;邀请国内人工智能开源领域领军院士亲自参加&#xff0c;汇聚学术界、产业界的技术专家&#xff0c;围绕中国算力网资源基座、开源社区服务支撑环境…...

【使用两个队列实现栈】

文章目录前言使用两个队列实现栈1.队列接口函数引入2.栈的初始化3.向栈中插入元素4.出栈操作5.取出栈顶元素6.判断栈是否为空7.释放内存空间总结前言 本文章主要介绍栈和队列的相互转换。 使用两个队列实现栈 我们知道&#xff0c;栈的特点是后进先出&#xff0c;而队列的特点…...

毕业设计 基于51单片机环境监测设计 光照 PM2.5粉尘 温湿度 2.4G无线通信

基于51单片机环境监测设计 光照 PM2.5粉尘 温湿度 2.4G无线通信1、项目简介1.1 系统构成1.2 系统功能2、部分电路设计2.1 STC89C52单片机核心系统电路设计2.2 dht11温湿度检测电路设计2.3 NRF24L01无线通信电路设计3、部分代码展示3.1 NRF24L01初始化3.2 NRF24L01的SPI写时序3.…...

PowerShell Install Rabbitmq

Rabbitmq 前言 RabbitMQ是实现了高级消息队列协议&#xff08;AMQP&#xff09;的开源消息代理软件&#xff08;亦称面向消息的中间件&#xff09;。RabbitMQ服务器是用Erlang语言编写的&#xff0c;而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代…...

ASM 字节码插桩:隐私合规方法检测!

1.前言近两年来工信部对于应用的隐私合规安全问题愈加重视&#xff0c;对 Android 平台的管控程度也要比 IOS 平台严格很多&#xff0c;很多不合规的应用也先后被下架要求整改。笔者就曾遇到过加班整改隐私合规的问题&#xff0c;隐私合规问题主要针对两个方面。在用户同意隐私…...

spring data jpa使用流式查询

思路 调用org.hibernate.query.Query.stream方法查询数据 代码样例 import static org.hibernate.annotations.QueryHints.READ_ONLY; import static org.hibernate.jpa.QueryHints.HINT_FETCH_SIZE; import org.hibernate.query.Query;使用HQL查询 Query<MyEntity> …...

Golang实现RabbitMQ中死信队列各个情况

下面这段教程针对是你已经有一些基本的MQ的知识&#xff0c;比如说能够很清楚的理解queue、exchange等概念&#xff0c;如果你还不是很理解&#xff0c;我建议你先访问官网查看基本的教程。 文章目录1、造成死信队列的主要原因2、操作逻辑图3、代码实战3.1 针对原因1&#xff1…...

react源码分析:组件的创建和更新

这一章节就来讲讲ReactDOM.render()方法的内部实现与流程吧。 因为初始化的源码文件部分所涵盖的内容很多&#xff0c;包括创建渲染、更新渲染、Fiber树的创建与diff&#xff0c;element的创建与插入&#xff0c;还包括一些优化算法&#xff0c;所以我就整个的React执行流程画了…...

Android Lmkd 低内存终止守护程序

一、低内存终止守护程序 Android 低内存终止守护程序 (lmkd) 进程可监控运行中的 Android 系统的内存状态&#xff0c;并通过终止最不必要的进程来应对内存压力大的问题&#xff0c;使系统以可接受的性能水平运行。 所有应用进程都是从zygote孵化出来的&#xff0c;记录在AMS…...

快速掌握 Flutter 图片开发核心技能

大家好&#xff0c;我是 17。 在 Flutter 中使用图片是最基础能力之一。17 做了精心准备&#xff0c;满满的都是干货&#xff01;本文介绍如何在 Flutter 中使用图片&#xff0c;尽量详细&#xff0c;示例完整&#xff0c;包会&#xff01; 使用网络图片 使用网络图片超级简…...

复习使用git(二)

删除远程分支 git push origin --delete 分支名 撤销修改 撤销工作区的修改 已修改&#xff0c;但尚未添加&#xff08;add&#xff09;&#xff0c;使用 git restore 文件名 撤销工作区的修改。 Note: “git checkout – 文件名”&#xff0c;checkout 检出的意思&#x…...

魔兽世界335服务端架设对外网开放的步骤

警告&#xff1a;在没有网络安全防护措施或基础知识的情况下&#xff0c;开放端口可能造成被黑客入侵、流量攻击、破坏数据、资料泄露等情况的发生。在你选择开放端口时&#xff0c;视为已经充分了解可能发生的后果、危害&#xff0c;清楚自己在做什么&#xff0c;并且自己将对…...

华为OD机试模拟题 用 C++ 实现 - 通信误码(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明通信误码题目输入输出示例一输入输出说明示例二输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...

Vue 核心

文章目录Vue 核心一&#xff0c;Vue 简介&#xff08;一&#xff09;官网&#xff08;二&#xff09;介绍与描述&#xff08;三&#xff09;Vue 的特点&#xff08;四&#xff09;与其它 JS 框架的关联&#xff08;五&#xff09;Vue 周边库二&#xff0c;初识 Vue三&#xff0…...

Kylin V10桌面版arm3568 源码安装redis

上传redis-5.0.14.tar.gz到/home/kylin/下载&#xff1b;解压kylinkylin:~/下载$ tar -zxvf redis-5.0.14.tar.gz/opt下新建redis目录&#xff0c;并将上面解压的文件夹移到此处kylinkylin:~/下载$ sudo mv redis-5.0.14 /opt/redis/编译&#xff1a;kylinkylin:/opt/redis/red…...

【ICCV2022】 CAPAO:一种高效的单阶段人体姿态估计模型

CAPAO&#xff1a;一种高效的单阶段人体姿态估计模型 重新思考关键点表示&#xff1a;将关键点和姿态建模作为多人姿态估计的对象&#xff08;Rethinking Keypoint Representations: Modeling Keypoints and Poses as Objects for Multi-Person Human Pose Estimation&#xf…...

ROS1学习笔记:ROS中的坐标管理系统(ubuntu20.04)

参考B站古月居ROS入门21讲&#xff1a;ROS中的坐标系管理系统 基于VMware Ubuntu 20.04 Noetic版本的环境 文章目录一、机器人中的坐标变换二、TF功能包三、小海龟跟随实验3.1 启动实验3.2 查看当前的TF树3.3 坐标相对位置可视化3.3.1 tf_echo3.3.2 rviz一、机器人中的坐标变换…...

requests---(2)session简介与自动写博客

目录&#xff1a;导读 session简介 session登录 自动写博客 获取登录cookies 抓取写博客接口 requests自动写博客 写在最后 http协议是无状态的&#xff0c;也就是每个请求都是独立的。那么登录后的一系列动作&#xff0c;都需要用cookie来验证身份是否是登录状态&#…...

基于 HAProxy + Keepalived 搭建 RabbitMQ 高可用集群

RabbitMQ 集群 通常情况下&#xff0c;在集群中我们把每一个服务称之为一个节点&#xff0c;在 RabbitMQ 集群中&#xff0c;节点类型可以分为两种&#xff1a; 内存节点&#xff1a;元数据存放于内存中。为了重启后能同步数据&#xff0c;内存节点会将磁盘节点的地址存放于磁…...

基于51单片机和proteus的智能调速风扇设计

此智能风扇是基于51单片机和proteus的仿真设计&#xff0c;功能如下&#xff1a; 1. Timer0 PWM控制电机转速 2. DHT11采集温湿度 3. LCD1602显示温湿度及电机状态 4. 按键控制电机加减速启停等 5. 串口控制电机加减速启停等 功能框图如下&#xff1a; Proteus仿真界面如下…...

SQL Server开启CDC的完整操作过程

这里写自定义目录标题写在前面SQL Server开启CDC1. 将指定库的实例先开启CDC2. 开启需要开启CDC的表3. 关闭CDC功能更详细信息参照官网写在前面 鉴于老旧数据的结构和项目都在sqlserver上存储&#xff0c;且迁移成本巨大&#xff0c;当下要为sqlserver的存储过程减负。要将一部…...

【Spring Cloud Alibaba】008-Sentinel

【Spring Cloud Alibaba】008-Sentinel 文章目录【Spring Cloud Alibaba】008-Sentinel一、服务雪崩1、概述2、解决方案常见的容错机制二、Sentinel&#xff1a;分布式系统的流量防卫兵1、**Sentinel** 概述简介特性Sentinel 的开源生态Sentinel 的历史2、Sentinel 基本概念资源…...

解读CRC校验计算

个人随笔 (Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu) 参考&#xff1a;http://www.sunshine2k.de/articles/coding/crc/understanding_crc.html 参考&#xff1a;https://en.wikipedia.org/wiki/Cyclic_redundancy_check 参考&#xff1a;https://www.cnblogs.com/…...

深入理解Spring MVC下

上一篇博客从理论概念上来梳理Spring MVC相关知识&#xff0c;此篇博客将通过spring官网提供showcase代码为例子&#xff0c;详细介绍showcase代码中包含的各个例子是如何实现的。官网的showcase代码包含的主要例子包括&#xff0c;Demo地址&#xff1a;Mapping Requests&#…...

【Linux】ssh-keygen不需要回车,自动生成密钥,批量免密操作!

使用命令ssh-keygen 需要手动敲击回车&#xff0c;才会生成密钥&#xff0c;如下代码所示 [rootlocalhost ~]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase):…...

C/C++开发,无可避免的内存管理(篇四)-智能指针备选

一、智能指针 采用C/C开发堆内存管理无论是底层开发还是上层应用&#xff0c;无论是开发新手&#xff0c;还是多年的老手&#xff0c;都会不自觉中招&#xff0c;尤其是那些不是自己一手经历的代码&#xff0c;要追溯问题出在哪里更是个麻烦事。C/C程序常常会遇到程序突然退出&…...

VMware ESXi给虚拟机扩容

用ESXi管理的虚拟机硬盘空间不够了&#xff0c;讲一下如何进行扩容。 一、查看现状 通过如下三个命令&#xff0c;可以查看硬盘情况&#xff0c;可以看到只有500G&#xff0c;已经用了45%。这次我们再扩容500G。 df -Th lsblk fdisk -lIDE磁盘的文件名为        /de…...

重庆网站制作特点优势/网站营销策略

我想在Java中使用xz压缩。使用xz 1.5压缩库&#xff0c;commons io 2.4库和commons压缩1.8.1库。我试图运行下面的代码给我非常不一致的结果。文本超过70&#xff05;&#xff0c;音频和视频文件低于0.1&#xff05;(1-compressed/original * 100)。我在每次压缩之前使用tarbal…...

openwrt wordpress/如何提高网站排名seo

Linux提供了大量的命令&#xff0c;利用它可以有效地完成大量的工 作&#xff0c;如磁盘操作、文件存取、目录操作、进程管理、文件权限设定等。所以&#xff0c;在Linux系统上工作离不开使用系统提供的命令。要想真正理解Linux系统&#xff0c; 就必须从Linux命令学起&#xf…...

如何创建网站的详细步骤/公司网站推广怎么做

现在有n个人要排成一列&#xff0c;编号为1->n 。但由于一些不明原因的关系&#xff0c;人与人之间可能存在一些矛盾关系&#xff0c;具体有m条矛盾关系(u,v),表示编号为u的人想要排在编号为v的人前面。要使得队伍和谐&#xff0c;最多不能违背k条矛盾关系&#xff08;即不能…...

柳北网站制作/网站功能优化

Microsoft Visual Studio 2010 的项目为件改为Microsoft Visual Studio 2015默认打开 2010 的Solution (.Sln) file Microsoft Visual Studio Solution File, Format Version 11.00 # Visual Studio 2010 --默认打开的版本IDE Geovin Du 涂聚文 注释 Project("{FAE04EC…...

wordpress 另类主题/防晒霜营销软文

简介不知从什么时间起&#xff0c;“共享单车”这一概念在忽然间火遍了全国&#xff0c;ofo小黄车&#xff0c;摩拜单车……逐渐走入到我们的生活中。特别是在一线城市&#xff0c;共享单车成为广大白领们不可或缺的交通工具。今天我们就kaggle上的共享单车数据集进行分析。分析…...

百度怎么搜索到自己的网站/文案代写

1、给出导致进程状态转换的事件: &#xff08;1&#xff09;运行→就绪,1种; &#xff08;2&#xff09;创建→就绪,1种; &#xff08;3&#xff09;运行→阻塞,3种; &#xff08;4&#xff09;阻塞→就绪,3种; &#xff08;5&#xff09;运行→终止,4种 答&#xff1a; …...