当前位置: 首页 > news >正文

做外账要登什么网站/自媒体平台注册下载

做外账要登什么网站,自媒体平台注册下载,wordpress首页登陆,四海网络网站建设建站王有志,一个分享硬核 Java 技术的互金摸鱼侠 加入 Java 人的提桶跑路群:共同富裕的Java人 今天是《面霸的自我修养》第 6 篇文章,我们一起来看看面试中会问到哪些关于线程池的问题吧。数据来源: 大部分来自于各机构(J…

封面:线程池专题.png

王有志,一个分享硬核 Java 技术的互金摸鱼侠
加入 Java 人的提桶跑路群:共同富裕的Java人

今天是《面霸的自我修养》第 6 篇文章,我们一起来看看面试中会问到哪些关于线程池的问题吧。
数据来源

  • 大部分来自于各机构(Java 之父,Java 继父,某灵,某泡,某客)以及各博主整理文档;
  • 小部分来自于我以及身边朋友的实际经历,题目上会做出标识,并注明面试公司。

叠“BUFF”

  • 八股文通常出现在面试的第一二轮,是“敲门砖”,但仅仅掌握八股文并不能帮助你拿下 Offer;
  • 由于本人水平有限,文中难免出现错误,还请大家以批评指正为主,尽量不要喷~~

线程池是什么?为什么要使用线程池?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
计算机中,线程的创建和销毁开销较大,频繁的创建和销毁线程会影响程序性能。利用基于池化思想的线程池来统一管理和分配线程,复用已创建的线程,避免频繁创建和销毁线程带来的资源消耗,提高系统资源的利用率
线程池具有以下 3 点优势:

  • 降低资源消耗,重复利用已经创建的线程,避免线程创建与销毁带来的资源消耗;
  • 提高响应速度,接收任务时,可以通过线程池直接获取线程,避免了创建线程带来的时间消耗;
  • 便于管理线程,统一管理和分配线程,避免无限制创建线程,另外可以引入线程监控机制。

Java 中如何创建线程池?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中可以通过 ThreadPoolExecutor 和 Executors 创建线程池。

使用 ThreadPoolExecutor 创建线程池

使用 ThreadPoolExecutor 可以创建自定义线程池,例如:

ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.AbortPolicy()
);

了解以上代码的含义前,我们先来看 ThreadPoolExecutor 提供的的构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

ThreadPoolExecutor 提供了 4 个构造方法,但最后都会指向含有 7 个参数的构造方法上。我们一一说明这些参数的含义:

  • int corePoolSize,线程池的核心线程数量,核心线程在线程池的生命周期中不会被销毁
  • int maximumPoolSize,线程池的最大线程数量,超出核心线程数量的非核心线程
  • long keepAliveTime,线程存活时间,非核心线程空闲时存活的最大时间;
  • TimeUnit unit,keepAliveTime 的时间单位;
  • BlockingQueue<Runnable> workQueue,线程池的任务队列;
  • ThreadFactory threadFactory,线程工厂,用于创建线程,可以自定义线程;
  • RejectedExecutionHandler handler,拒绝策略,当任务数量超出线程池的容量(超过 maximumPoolSize 并且 workQueue 已满)时的处理策略。

使用 Executors 创建线程池

除了使用 ThreadPoolExecutor 创建线程池外,还可以通过 Executors 创建 Java 内置的线程池,Java 中提供了 6 种内置线程池:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();ExecutorService cachedThreadPool = Executors.newCachedThreadPool();ExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);ExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();ExecutorService workStealingPool = Executors.newWorkStealingPool();

Tips:关于这 6 种线程池的详细解释,参考下一题。


Java 中提供了哪些线程池?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中提供了 6 种线程池,可以通过 Executors 获取。

FixedThreadPool

通过 Executors 创建 FixedThreadPool 的代码如下:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

FixedThreadPool 是固定线程数量的线程池,通过Executors#newFixedThreadPool方法获得,源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

本质上是通过 ThreadPoolExecutor 创建的线程池,核心线程数和最大线程数相同,工作队列使用 LinkedBlockingQueue,该队列最大容量是 Integer.MAX_VALUE

SingleThreadExecutor

通过 Executors 创建 SingleThreadExecutor 的代码如下:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

SingleThreadExecutor 是只有一个线程的线程池,通过Executors#newSingleThreadExecutor方法获得,其源码如下:

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

依旧是通过 ThreadPoolExecutor 创建的线程池,最大线程数和核心线程数设置为 1,工作队列使用 LinkedBlockingQueue。SingleThreadExecutor 适合按顺序执行的场景。

ScheduledThreadPool

通过 Executors 创建 ScheduledThreadPool 的代码如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);

ScheduledThreadPool 是具有定时调度和延迟调度能力的线程池,通过Executors#newScheduledThreadPool方法获得,其源码如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

与前两个不同的是 ScheduledThreadPool 是用过 ScheduledThreadPoolExecutor 创建的,源码如下:

public class Executors {public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}
}public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());}
}

追根溯源的话 ScheduledThreadPoolExecutor 依旧是通过 ThreadPoolExecutor 的构造方法创建线程池的,能够实现定时调度的特性是因为ScheduledThreadPoolExecutor#execute方法和ScheduledThreadPoolExecutor#schedule方法实现的:

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {public void execute(Runnable command) {schedule(command, 0, NANOSECONDS);}public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<Void> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));delayedExecute(t);return t;}
}

因为 ScheduledThreadPoolExecutor 并不是线程池中的重点内容,这里我们不过多讨论源码的实现,我们接下来看 ScheduledThreadPoolExecutor 该如何使用:

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("当前时间:" + simpleDateFormat.format(new Date()));scheduledThreadPool.schedule(new Runnable() {@Overridepublic void run() {System.out.println("执行时间:" + simpleDateFormat.format(new Date()) + ",延迟3秒执行");}
}, 3, TimeUnit.SECONDS);scheduledThreadPool.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println("执行时间:" + simpleDateFormat.format(new Date()) + ",每3秒执行一次");}
}, 0, 3, TimeUnit.SECONDS);

SingleThreadScheduledExecutor

通过 Executors 创建 ScheduledExecutorService 的代码如下:

ExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

与 ScheduledThreadPool 相同 SingleThreadScheduledExecutor 也是具有定时调度和延迟调度能力的线程池,同样的 SingleThreadScheduledExecutor 也是通过 ScheduledThreadPoolExecutor 创建的,不同之处在于 ScheduledThreadPool 并不限制核心线程的数量,而 SingleThreadScheduledExecutor 只会创建一个核心线程

CachedThreadPool

通过 Executors 创建 CachedThreadPool 的代码如下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

CachedThreadPool 是可缓存线程的线程池,通过Executors#newSingleThreadExecutor方法获得,其源码如下:

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

CachedThreadPool 的特点是没有核心线程,任务提交后会创建新线程执行,并且没有最大数量限制,每个线程空闲后的存活时间是 60 秒,如果 60 秒内有新的任务提交,会复用这些线程,也就实现了线程缓存的能力,工作队列使用 SynchronousQueue,它不存储任何元素,使用 SynchronousQueue 的目的是为了能够立即创建(使用)非核心线程(涉及到 ThreadPoolExecutor 的实现原理,后文详解)执行任务

WorkStealingPool

通过 Executors 创建 ScheduledExecutorService 的代码如下:

ExecutorService workStealingPool = Executors.newWorkStealingPool();

WorkStealingPool 是 Java 8 中加入的线程池,与之前的 5 种线程池直接或间接的回归到 ThreadPoolExecutor 不同,WorkStealingPool 是通过 ForkJoinPool 实现的(ForkJoinPool 与 ThreadPoolExecutor 都是 AbstractExecutorService 的实现),内部通过 Work-Stealing 算法并行的处理任务,但无法保证任务的执行顺序


Executor 框架是什么?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Executor 用于执行 Runnable(Callable)任务,提供了将 Runnable(Callable) 与运行机制解耦的能力,屏蔽了线程创建,调度方式等。Java 中的注释是这样描述 Executor 的:

An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.

Executor 的体系:
图1:Executor框架.png
Executor 体系中 ,ExecutorService 接口对 Executor 进行了扩展,提供了管理 Executor 和查看 Executor 状态的能力。

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

另外,我把 Executors 也加入到了 Executor 的体系结构中,虽然没有继承或者实现的关系,但是根据 Java 中的命名规范,Executors 是作为 Executor 的工具类出现的,从类图中可以看到 Executors 提供了
图2:Executors的类图.png


线程池都有哪些状态?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中定义了线程池的 5 种状态:

private static final int RUNNING    = -1 << COUNT_BITS; // 111 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS; // 001 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS; // 010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS; // 011 0 0000 0000 0000 0000 0000 0000 0000

注释中也非常详细的解释了每个状态:

RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don’t accept new tasks, but process queued tasks
STOP: Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed

  • RUNNING:接收新任务,并处理队列中的任务;
  • SHUTDOWN:不接收新任务,仅处理队列中的任务;
  • STOP:不接收新任务,不处理队列中的任务,中断正在执行的任务;
  • TIDYING:所有任务已经执行完毕,并且工作线程为 0,转换到 TIDYING 状态后将执行 Hook 方法ThreadPoolExecutor#terminated
  • TERMINATEDThreadPoolExecutor#terminated方法执行完毕,该状态表示线程池彻底终止。

另外注释中还详细的描述了线程池状态间转换的规则:

RUNNING -> SHUTDOWN On invocation of shutdown()
(RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
SHUTDOWN -> TIDYING When both queue and pool are empty
STOP -> TIDYING When pool is empty
TIDYING -> TERMINATED When the terminated() hook method has completed

  • RUNNING -> SHUTDOWN:通过调用ThreadPoolExecutor#shutdown方法;
  • RUNNING 或 SHUTDOWN -> STOP:通过通过调用ThreadPoolExecutor#shutdownNow方法;
  • SHUTDOWN -> TIDYING:工作线程为 0(没有处理中的任务),并且工作队列中待处理的任务为 0 时;
  • STOP -> TIDYING:工作线程为 0 时(没有处理中的任务);
  • TIDYING -> TERMINATED:TIDYING 状态下,调用ThreadPoolExecutor#terminated方法。

最后我们通过一张图来看下线程池状态之间的转换:
图3:线程池的状态转换.png


🔥线程池是如何实现的?

:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:阿里巴巴,美团,蚂蚁金服
:::
我们通过一段演示代码尝试着推测线程池的执行过程,首先我们实现一个自定义的阻塞队列:

public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> {public CustomBlockingQueue(int capacity) {super(capacity);}@Overridepublic boolean offer(E e) {boolean result = super.offer(e);if (result) {System.out.println("添加到队列中");}return result;}
}

CustomBlockingQueue 继承自 LinkedBlockingQueue,并重写了LinkedBlockingQueue#offer方法,在元素成功添加到队列时输出一行日志,为的是能够清晰的展示线程池中任务添加到工作队列中的时机。
接着我们创建线程池:

ExecutorService threadPoolExecutor = new ThreadPoolExecutor(3,3,10,TimeUnit.SECONDS,new CustomBlockingQueue<>(3),new ThreadPoolExecutor.AbortPolicy());

线程池的容量是 9,即最大线程 6 个(核心线程 3 个,非核心线程 3 个,工作队列容量为 3),拒绝策略是 AbortPolicy,超出线程池的容量后,直接丢弃任务。
使用这个线程池同时执行 10 个任务:

for(int i = 1; i <= 10; i++) {int finalI = i;threadPoolExecutor.execute(() -> {String threadName = Thread.currentThread().getName();System.out.println(threadName + ",开始执行任务:" + finalI);try {// 为了能够长时间占用线程TimeUnit.SECONDS.sleep(15);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(threadName + ",结束执行任务:" + finalI);});// 为了能够实现按顺序提交任务TimeUnit.SECONDS.sleep(1);
}

执行上述代码,可以看到打印的日志为:
图4:线程池执行日志.png
根据执行结果,并且结合之前对于线程池参数的解释,我们就能推测出线程池的大致流程:

  1. 编号 1~3 的任务提交到线程池后,创建核心线程执行任务;
  2. 编号 4~6 的任务提交到线程池后,因为超出核心线程的数量,将任务添加到工作队列中;
  3. 编号 7~9 的任务提交到线程池后,因为工作队列已满,创建非核心线程执行任务;
  4. 编号 10 的任务提交到线程池后,因为超出线程池的容量,执行拒绝策略;
  5. 编号 1~3 的任务执行完毕后,核心线程空闲,取出工作队列中编号 4~6 的任务开始执行。

我们用一张图来展示上述代码中任务执行的顺序:
图5:线程池的执行顺序.png
以上是我们通过案例来推测出的线程池工作流程,接下来我们通过源码分析来印证我们推测的结果,并梳理线程池执行过程中的具体细节。

线程池的源码分析

CTL 与线程池状态

分析线程池执行流程前,我们先来看 ThreadPoolExecutor 中定义的主控状态 CTL 和线程池状态:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 111 0 0000 0000 0000 0000 0000 0000 0000private static final int COUNT_BITS = Integer.SIZE - 3;                 // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;            // 0001 1111 1111 1111 1111 1111 1111 1111// 线程池状态
private static final int RUNNING    = -1 << COUNT_BITS;                 // 111 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;                 // 000 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;                 // 001 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;                 // 010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;                 // 011 0 0000 0000 0000 0000 0000 0000 0000// 计算CTL
private static int ctlOf(int rs, int wc) {return rs | wc;
}// 获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK;
}// 获取线程池工作线程数
private static int workerCountOf(int c) {return c & COUNT_MASK;
}

Doug Lea 采用了位掩码技术来设计 CTL,将 CTL 拆分成了 2 个部分,高 3 位表示线程池状态,低 29 位表示工作线程数量,因此线程池最多允许 536870911 个线程处于工作状态。
图6:线程池的主控状态CTL.png
Java 中采用补码的形式表示数字,最高位是符号位,0 表示正数,1 表示负数,RUNNING 状态在二进制表示中最高位是 1,是负数,其余状态的数值均为正数。结合上一题的分析,我们可以得到,线程池从运行状态到终止状态,随着状态的转换,每种状态的数值表示是逐渐增大的。
Tips:如果不熟悉位运算,可以参考我的另一篇文章《编程技巧:“高端”的位运算》。

线程池的任务执行

我们从任务执行的入口ThreadPoolExecutor#execute的源码开始:

public void execute(Runnable command) {// 校验提交的任务if (command == null) {throw new NullPointerException();}// 获取CTLint c = ctl.get();// STEP 1:工作线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) {return;}// addWorker方法执行失败时,重新获取CTLc = ctl.get();}// STEP 2:工作线程数大于核心线程数,但可以添加到工作队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command)) {reject(command);} else if (workerCountOf(recheck) == 0) {addWorker(null, false);}} // STEP 3:工作队列中无法继续添加任务else if (!addWorker(command, false)) {reject(command);}
}

ThreadPoolExecutor#execute源码中,可以将整体的执行逻辑分为 3 步(见注释)。详细分析这 3 步前,我们先来看频繁出现的ThreadPoolExecutor#addWorker方法,由于该方法的源码较长,我们拆开来分析。
ThreadPoolExecutor#addWorker方法的声明:

private boolean addWorker(Runnable firstTask, boolean core)

ThreadPoolExecutor#addWorker方法提供了两个参数:

  • Runnable firstTask,要执行的任务
  • boolean core,表示是否为核心线程

接着是ThreadPoolExecutor#addWorker方法源码的第一部分,检查线程池的状态及线程的数量:

retry:
for (int c = ctl.get();;) {// 检查线程池状态if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))return false;for (;;) {// 检查线程池的工作线程数量if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// 增加线程池工作线程数量if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();if (runStateAtLeast(c, SHUTDOWN))continue retry;}
}

以上这部分代码是对线程池状态的检查,以及处理主控状态 CTL,我们来分析关建代码:

  • 第 4 行,检查线程池“最多”处于 SHUTDOWN 状态,因为只有在 SHUTDOWN 状态和 RUNNING 状态中,才会创建线程执行任务;
  • 第 8 行,根据入参boolean core决定创建线程数量的上限是小于 corePoolSize 还是小于 maxmunPoolSize;
  • 第 11 行,调用ThreadPoolExecutor#compareAndIncrementWorkerCount方法来增加 CTL 中保存的工作线程数量,成功后则跳出 retry 标签;
  • 第 13 行,如果程序运行到这里,修改 CTL 中工作线程数量失败,需要重新获取 CTL 并检查线程池状态。

注意ThreadPoolExecutor#compareAndIncrementWorkerCount方法采用了 CAS 技术,方法返回失败说明此时 CTL 已经被其它线程修改,需要重新获取 CTL 并检查线程池状态。
最后来看ThreadPoolExecutor#addWorker方法的第二部分源码,执行工作任务:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {// 创建Worker对象w = new Worker(firstTask);// 获取到Worker中创建的线程final Thread t = w.thread;if (t != null) {// 使用ReentrantLock加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();// 检查线程池状态,只有RUNNING状态和STOP之下的状态允许创建线程if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {// 检查线程状态if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded = true;// 记录线程池中出现的最大线程数量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程,开始执行任务t.start();workerStarted = true;}}
} finally {if (! workerStarted)addWorkerFailed(w);
}
return workerStarted;

以上这部分代码是创建线程并执行任务的方法,我们来分析关键部分:

  • 第 6 行,创建 Worker 对象,我们可以直接将 Worker 对象与 Thread 对象画上等号,认为创建了 Worker 对象就是创建了线程。
  • 第 16 行,依旧是对线程池状态的检查,这里验证了线程池状态“最多”处于 SHUTDOWN 状态,这个限制也不难理解,RUNNGING 状态是线程池的正常状态,允许创建线程并处理任务,SHUTDOWN 状态下允许处理工作队列中的线程,如果线程池中没有活跃的线程,需要创建线程来处理。

需要注意,这部分代码中出现的ThreadPoolExecutor#runStateLessThan方法与之前出现的ThreadPoolExecutor#runStateAtLeast方法在比较 CTL 与状态时在开闭区间上存在差异。
到这里整个ThreadPoolExecutor#addWorker的方法也就差不多分析完了,它实际上就做了两件事:

  • 检查线程池,包括状态检查,线程池中线程数量的检查
  • 创建 Worker 对象,并启动(相当于创建 Thread 对象并启动)

回过头来,我们结合对ThreadPoolExecutor#addWorker方法的分析,不难看出ThreadPoolExecutor#execute方法的 3 步都做了什么:

步骤线程池状态工作线程数队列状态处理方式
STEP 1RUNNING< corePoolSize核心线程数未满,创建核心线程处理任务
STEP2RUNNING= corePoolSize未饱和核心线程数已满,将任务添加到队列中
STEP3RUNNING>= corePoolSize已饱和核心线程和工作队列已满,创建非核心线程处理任务,创建失败则执行拒绝策略

至此,我们已经能够清晰的看到线程池的执行流程了,并且也可以印证我们通过演示代码来推测的线程池执行流程的正确性了。


线程池中的线程是什么时间创建的?

:::info
难易程度:🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
在上一题的分析中,我们已经知道线程池的线程是在提交任务后通过ThreadPoolExecutor#addWorker方法中创建的,具体是在 Worker 的构造方法中:

Worker(Runnable firstTask) {setState(-1);this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}

每个 Worker 对象中都持有一个通过 ThreadFactory 创建的线程,这也是为什么我将线程池中的线程与 Worker 对象画上等号。
除此之外,还可以调用ThreadPoolExecutor#prestartCoreThread来预创建线程,该方法源码如下:

public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
}

从源码中可以看到,每次调用只会预创建 1 个 Worker 对象(即 1 个核心线程),如果需要创建全部的核心线程,需要多次调用。


🔥线程池中的核心线程是如何复用的?

:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:阿里巴巴,美团,蚂蚁金服
:::
线程池是通过 Worker 对象来完成线程的复用的。我们来看内部类 Worker 的类型声明:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable;

Worker 自身继承了 AQS,并实现了 Runnable 接口,说明 Worker 自身就是可被线程执行的。
接下来是 Worker 的构造方法:

Worker(Runnable firstTask) {setState(-1);this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}

可以看到,构造在创建线程时使用的 Runnable 对象是 Worker 对象自身,同时 Worker 对象通过成员变量 firstTask 保存了我们提交的 Runnable 对象。
回到ThreadPoolExecutor#addWorker方法中,我们来看线程启动的部分:

w = new Worker(firstTask);
final Thread t = w.thread;
t.start();

我们已经知道,Java 中调用Thread#start方法中,会执行Runnable#run方法,那么在这段代码中执行的是谁的 run 方法呢?答案是 Worker 对象重写的 run 方法。
我们来看Worker#run的源码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {public void run() {runWorker(this);}
}

Worker#run方法调用了 runWorker 方法,该方法并不是内部类 Worker 的,而是 ThreadPoolExecutor 的方法,接着来看源码:

final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 获取Worker对象中封装的RunnableRunnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// 线程池状态检查if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {// 执行任务task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

重点关注第 9 行的 while 循环,它有两个条件:

  • task != null,即 Worker 对象自身保存的 Runnable 不为 null;
  • (task = getTask()) != null,即通过ThreadPoolExecutor#getTask方法获取到的 Runnable 不为 null。

第一个条件不难想到,即首次提交时 Worker 会“携带” Runnable,那么第二个条件是从哪里获取到的待执行任务呢?答案是工作队列
我们来看ThreadPoolExecutor#getTask方法的源码:

private Runnable getTask() {boolean timedOut = false;for (;;) {// 状态检查int c = ctl.get();if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}// 获取工作线程数量,并判断是否为非核心线程int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 通过工作队列获取待执行任务Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

先来关注第 12 行中变量 timed 的赋值逻辑,有两个条件:

  • boolean allowCoreThreadTimeOut,ThreadPoolExecutor 的成员变量,可以通过ThreadPoolExecutor#allowCoreThreadTimeOut方法赋值,表示是否允许核心线程超时销毁,默认值是 false;
  • wc > corePoolSize,即工作线程数量是否超出核心线程的数量限制。

timed 变量关系到第 20 行代码中获取工作队列中待执行任务的方式:

  • 如果允许销毁核心线程,或工作线程数量已经超出核心线程数量限制,则通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)的方式获取待执行任务,该方法获取队首元素,如果当前队列为空,则在等待指定时间后返回 null
  • 如果不允许销毁核心线程,或工作线程数量小于核心线程数量,则通过workQueue.take()的方式获取待执行任务,该方法获取队首元素,如果队列为空,则进入等待,直到能够获取元素为止

知道了以上内容后,我们再来看ThreadPoolExecutor#runWorker方法的 while 循环,在不允许销毁核心线程的线程池中,核心线程第一次执行 Worker 自身“携带”的任务,执行完毕后再次进入 while 循环,尝试获取工作队列中的待执行任务,如果此时工作队列中没有待执行任务,则工作队列进入阻塞,此时 runWorker 方法也进入阻塞,直到工作队列能够成功返回待执行任务
简单来说,线程池的核心线程复用,在于核心线程启动后就进入“不眠不休”的工作中,除了执行首次提交的任务外,还会不断尝试从工作队列中获取待执行任务,如果无法获取就阻塞到能够获取任务为止。


线程池的非核心线程是什么时候销毁的?

:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
非核心线程的销毁依旧是在ThreadPoolExecutor#runWorker方法中进行的。
我们回到ThreadPoolExecutor#getTask方法中:

private Runnable getTask() {// 标记是否超时boolean timedOut = false;for (;;) {int c = ctl.get();if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

假设现在的情况是非核心线程通过ThreadPoolExecutor#getTask方法获取工作队列中的待执行任务,而此时工作队列中已经没有待执行的任务了。
第一次进入循环时,第 11 行代码会因为wc> corePoolSize而将 timed 赋值为 true,此时wc == maximumPoolSize且 timedOut 为 false,并不满足第 12 行的判断条件,会直接来到第 18 行通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)的方式从工作队列中获取待执行任务,在等待一定的时间后,工作队列返回了 null,此时进入第 21 行,将 timedOut 赋值为 true 后再次进入循环。
第二次进入循环时,变量 timed 依旧被赋值为 true,与第一次进入循环不同的是,此时的 timedOut 为 ture,已经满足第 12 行的判断条件,执行 if 语句的内容,调用ThreadPoolExecutor#compareAndDecrementWorkerCount方法减少 CTL 中工作线程的数量,并且返回 null。
重点在这个返回的 null 上,当ThreadPoolExecutor#getTask返回 null 后,ThreadPoolExecutor#runWorker会跳出循环,执行 finally 中的ThreadPoolExecutor#processWorkerExit方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;// 从workers中删除workerworkers.remove(w);} finally {mainLock.unlock();}// 尝试修改线程池状态tryTerminate();// 处理STOP之下的状态int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; }addWorker(null, false);}
}

该方法中,从 ThreadPoolExecutor 的成员变量 workers 中删除 Worker 对象后,其内部的线程也将执行完毕,随后会调用Thread#exit方法来销毁线程,即表示线程池中该线程已经销毁。


ThreadPoolExecutor#submit 方法和 ThreadPoolExecutor#execute 方法有什么区别?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
ThreadPoolExecutor 的实现中,ThreadPoolExecutor#submitThreadPoolExecutor#execute有 4 处区别:

ThreadPoolExecutor#submitThreadPoolExecutor#execute
方法定义ExecutorService 接口中定义Executor 接口中定义
返回值有返回值无返回值
任务类型Runnable 任务和 Callable 任务Runnable 任务
实现AbstractExecutorService 中实现ThreadPoolExecutor 中实现

Java 中提供了哪些拒绝策略?

:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中提供了 4 种拒绝策略:

  • CallerRunsPolicy:由调用ThreadPoolExecutor#execute方法的线程执行该任务;
  • AbortPolicy:直接抛出异常;
  • DiscardPolicy:丢弃当前任务;
  • DiscardOldestPolicy:丢弃工作队列中队首的任务,即最早加入队列中的任务,并将当前任务添加到队列中。

这 4 种拒绝策略被定义为 ThreadPoolExecutor 的内部类,源码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());}}public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
}

如果 Java 内置的拒绝策略无法满足业务需求,可以自定义拒绝策略,只需要实现 RejectedExecutionHandler 接口即可。


🔥什么是阻塞队列?

:::info
难易程度:🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:蚂蚁金服
:::
阻塞队列是一种特殊的队列,相较于普通的队列,阻塞队列提供了一个额外的功能,当队列为空时,获取队列中元素的线程会被阻塞
Java 中提供了 7 种阻塞队列,全部是继承自 BlockingQueue 接口:
图7:BlockingQueue的体系.png
BlockingQueue 接口继承了 Queue 接口,而 Queue 接口继承了 Collection 接口。使用时,如果需要阻塞队列的能力,要选择单独定义在 BlockingQueue 接口中的方法。
以下是阻塞队列在实现不同接口方法时的差异:

操作方法特点
Collection 接口添加元素boolean add(E e)失败时抛出异常
删除元素boolean remove(Object o)失败时抛出异常
Queue 接口入队操作boolean offer(E e)成功返回 true,失败返回 false
出队操作E poll()成功返回出队元素,失败返回 null
BlockingQueue 接口入队操作void put(E e)无法入队时阻塞当前线程,直到入队成功
boolean offer(E e, long timeout, TimeUnit unit)无法入队时阻塞当前线程,直到入队成功或超时
出队操作E take()队列中无元素时阻塞当前线程,直到有元素可出队
E poll(long timeout, TimeUnit unit)队列中无元素时阻塞当前线程,直到有元素可出队或超时

接下来我们来看这 7 种阻塞队列的特点。

ArrayBlockingQueue

ArrayBlockingQueue 底层使用数组作为存储结构,需要指定阻塞队列的容量(数组的特点),且不具备扩容能力。除此之外,ArrayBlockingQueue 还提供了公平访问和非公平访问两种模式,默认使用非公平访问模式

// 非公平访问的ArrayBlockingQueue
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(1000);// 公平访问模式的ArrayBlockingQueue
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(1000, true);

公平访问模式指的是,在队列中无元素时阻塞的线程,会在队列可用时按照阻塞的先后顺序访问队列;而非公平访问模式下,则是随机一个线程获取访问权限。公平访问模式保证了先后顺序,但是为了维护这个先后顺序,需要付出额外的性能作为代价。
需要额外注意的是,ArrayBlockingQueue 在入队和出队操作上使用同一把锁,也就是说一个线程的入队操作,不仅会阻塞其它线程的入队操作,还会阻塞其它线程的出队操作

LinkedBlockingQueue

LinkedBlockingQueue 底层使用单向链表作为存储结构,并且提供了默认的容量**Integer.MAX_VALUE**,即在不指定容量时 LinkedBlockingQueue 是“无限大”的,也即是常说的无界队列

LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>();LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);

LinkedBlockingQueue 内部使用两把独立的锁来控制入队和出队,这意味着入队和出队操作是相互独立的,并不会互相阻塞

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {private final ReentrantLock takeLock = new ReentrantLock();private final ReentrantLock putLock = new ReentrantLock();
}

PriorityBlockingQueue

PriorityBlockingQueue 底层使用数组作为存储结构,默认容量为 11,支持自动扩容,支持元素按照优先级排序,可以通过自定义实现 Comparator 接口来实现排序功能
例如,创建 Student 类,并允许根据 studenId 进行排序:

@Getter
@Setter
public class Student implements Comparable<Student> {private Integer studentId;private String name;@Overridepublic int compareTo(Student student) {return this.studentId.compareTo(student.studentId);}public static void main(String[] args) {PriorityBlockingQueue<Student> priorityBlockingQueue = new PriorityBlockingQueue<>();for(int i = 0; i < 10; i++) {Random random = new Random();Student student = new Student();Integer studentId = random.nextInt(1000);student.setStudentId(studentId);student.setName("name-"+ studentId);priorityBlockingQueue.put(student);}}
}

注意,PriorityBlockingQueue 在入队后并不能完全保证队列中元素的优先级顺序,即队列中存储的元素可能是乱序的,但在出队时是按照优先级顺序出队

DelayQueue

DelayQueue 底层使用 PriorityQueue 作为存储结构,因此 DelayQueue 具有 PriorityQueue 的特点,比如默认容量为 11,支持自动扩容,支持元素按照优先级排序。DelayQueue 自身的特点是延时获取元素,即在元素创建指定时间后才能够从队列中获取元素,为了实现这个功能,入队的元素必须实现 Delayed 接口
我们来创建一个实现 Delayed 接口的元素:

public class DelayedElement implements Delayed {private final Long createTime;private final Long delayTIme;private final TimeUnit delayTimeUnit;public DelayedElement(long delayTime, TimeUnit delayTimeUnit) {this.createTime = System.currentTimeMillis();this.delayTIme = delayTime;this.delayTimeUnit = delayTimeUnit;}@Overridepublic long getDelay(TimeUnit unit) {long duration = createTime + TimeUnit.MILLISECONDS.convert(this.delayTIme, this.delayTimeUnit) - System.currentTimeMillis();return unit.convert(duration, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {DelayedElement delayedElement = (DelayedElement) o;return this.createTime.compareTo(delayedElement.createTime);}

注意Delayed#getDelay方法的实现,这里的返回值并不是固定的延时时间,而是期望的延时时间与当前时间的差值,只有差值小于等于 0 时该元素才能出队。这里我选择在构造方法中传入延时时间与时间单位,并记录对象的创建时间,Delayed#getDelay方法通过 (创建时间+延时时间-当前时间)的方式计算差值。至于Comparable#compareTo方法的实现,与 DelayQueue 的使用与其它阻塞队列并无差别。

SynchronousQueue

SynchronousQueue 不存储元素,它的每次入队操作都要对应一次出队操作,否则不能继续添加元素
注意,SynchronousQueue 在入队成功后会阻塞线程,直到其他线程执行出队操作,同样的,出队操作时如果没有提前执行入队操作也会被阻塞,也就是说无法在一个线程中完成 SynchronousQueue 的入队操作和出队操作。

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();new Thread(() -> {for (int i = 0; i < 10; i++) {try {synchronousQueue.put(i);System.out.println("put-" + i);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}).start();TimeUnit.MILLISECONDS.sleep(100);new Thread(() -> {for (int i = 0; i < 10; i++) {try {System.out.println("take-" + synchronousQueue.take());} catch (InterruptedException e) {throw new RuntimeException(e);}}
}).start();

LinkedTransferQueue

LinkedTransferQueue 的底层使用单向链表作为存储结构,相较于其他阻塞队列 LinkedTransferQueue 还实现了 TransferQueue 接口,实现了类似于 SynchronousQueue 传递元素的功能,但与 SynchronousQueue 不同的是,LinkedTransferQueue 是能够存储元素的。
LinkedTransferQueue 实现了 TransferQueue 接口的 3 个方法:

public interface TransferQueue<E> extends BlockingQueue<E> {void transfer(E e) throws InterruptedException;boolean tryTransfer(E e);boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedExceptio)throws InterruptedException;
}

以上 3 个方法,在消费者等待获取元素(调用 take 方法或 poll 方法)时,会立刻将元素传递给消费者,它们 3 者的差异在于没有消费者等待获取元素时的处理方式:

  • transfer 方法,该方法会阻塞线程,直到传入的元素被消费;
  • tryTransfer 方法,该方法会直接返回 false,并且放弃该元素(即不会存储到队列中);
  • 带有超时时间的 tryTransfer 方法,等待指定的时间,如果依旧没有消费者消费该元素,返回 false。

我们写个例子来测试LinkedTransferQueue#tryTransfer方法:

LinkedTransferQueue<Integer> linkedTransferQueue = new LinkedTransferQueue<>();new Thread(() -> {for (int i = 0; i < 10; i++) {if (i == 3) {linkedTransferQueue.tryTransfer(i);} else {linkedTransferQueue.put(i);}}
}).start();TimeUnit.SECONDS.sleep(2);for (int i = 0; i < 10; i++) {System.out.println("take-" + linkedTransferQueue.take());
}

启动线程向 linkedTransferQueue 中添加元素,第 4 个元素调用LinkedTransferQueue#tryTransfer来传递元素,主线程等待两秒后从 linkedTransferQueue 中取出元素,可以看到队列中并未存储第 4 个元素。
另外,如果 LinkedTransferQueue 中已经存在元素,take 方法或者 poll 方法优先获取的是队首元素,而不是通过LinkedTransferQueue#transfeLinkedTransferQueue#tryTransfe传入的元素。

LinkedBlockingDeque

LinkedBlockingDeque 底层使用双向链表作为存储结构,与 LinkedBlockingQueue 相同,LinkedBlockingDeque 的默认容量为**Integer.MAX_VALUE**,不同的是 LinkedBlockingDeque 实现了 BlockingDeque 接口,允许操作队首和队尾的元素
LinkedBlockingDeque 的方法中,名称中含有 first 或 last 的公有方法均实现自 BlockingDeque 接口或 Deque 接口,例如:

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {public void addFirst(E e) {}public void addLast(E e) {}public boolean offerFirst(E e) {}public boolean offerLast(E e) {}public void putFirst(E e) throws InterruptedException{}public void putLast(E e) throws InterruptedException {}public E pollFirst() {}public E pollLast() {}public E takeFirst() throws InterruptedException {}public E takeLast() throws InterruptedException {}
}

什么是无界队列?使用无界队列会出现什么问题?

:::info
难易程度:🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
无界队列指的是队列的容量是否为“无限大”,当然这并不是真正意义上无限大,而是指一个非常大的值。例如,创建 LinkedBlockingQueue 时,如果使用无参构造器,LinkedBlockingQueue 的容量会被设置为Integer.MAX_VALUE,那么它就是无界队列。
LinkedBlockingQueue 构造方法源码如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException();}this.capacity = capacity;last = head = new Node<E>(null);}
}

Integer.MAX_VALUE的值约为 21 亿,当程序异常时或并发较大的情况下,可能会无限制的向阻塞队列中添加任务,导致内存溢出。通常在设置线程池的工作队列时,需要根据具体的需求来计算出较为合适的队列容量。
另外,在 Java 的内置线程池中,部分线程池使用了 LinkedBlockingQueue 且未指定容量,如:Executors#newFixedThreadPoolExecutors#newSingleThreadExecutor中创建的线程池使用了未指定容量的 LinkedBlockingQueue,这两个线程池中的队列即为无界队列。这也是为什么阿里巴巴在《Java 开发手册》中提示到,不要使用 Java 内置线程池的一个原因。


🔥如何合理的设置线程池的参数?

:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:阿里巴巴,美团,蚂蚁金服
:::
Java 提供了 7 个参数来实现自定义线程池:

  • int corePoolSize:核心线程数量
  • int maximumPoolSize:最大线程数量
  • long keepAliveTime:非核心线程存活时间
  • TimeUnit unit:非核心线程存活时间的单位
  • BlockingQueue<Runnable> workQueue:阻塞队列
  • ThreadFactory threadFactory:线程工厂
  • RejectedExecutionHandler handler:拒绝策略

虽然参数众多,但我们关注的重点在 corePoolSize 和 maximumPoolSize 这两个参数上。
通常在 corePoolSize 的设置上,会将程序区分为 IO 密集型和 CPU 密集型进行 corePoolSize 的设置:

  • CPU 密集型: C P U 核心数 + 1 CPU核心数+1 CPU核心数+1
  • IO 密集型: C P U 核心数 × 2 CPU核心数\times 2 CPU核心数×2

在 IO 密集型程序的 corePoolSize 设置上,还有另一种方案: ( 线程等待时间 线程执行时间 + 1 ) × C P U 核心数 (\frac{线程等待时间}{线程执行时间} + 1)\times CPU核心数 (线程执行时间线程等待时间+1)×CPU核心数
例如,假设每个线程的执行时间为 1 秒,而线程等待获取 CPU 的时间为 2 秒,CPU 核心数为 16,那么根据以上公式可以得到: ( 2 1 + 1 ) × 16 = 48 (\frac{2}{1}+1)\times16=48 (12+1)×16=48,即将 corePoolSize 设置为 48.
IO 密集型程序允许设置较大的 corePoolSize 的原因是,CPU 可能有大量时间都在等待 IO 操作而处于空闲状态,设置较大的 corePoolSize 可以提升 CPU 的利用率,但这么做的另一个问题是,IO 操作的速度是远低于 CPU 的计算速度的, 程序的性能瓶颈会暴露在“低效”的 IO 操作上
除了以上两种方案外,美团在Java线程池实现原理及其在美团业务中的实践中也提到了一些 corePoolSize 和 maximumPoolSize 的设置方案:
图8:合理设置线程池.png
那么以上的计算方案哪个才是线程池设置的“银剑”呢?实际上,以上的方案都只能应对一些特定的场景,而程序往往是复杂多变,且不可预估的,没有哪一种理论方案可以应对所有的场景。
在设置线程池时,会根据以上理论公式预估出较为合理的初始设置,随后通过压测来调整线程池在极端场景的合理设置。当然,程序不会一直处于这种极端场景,如果有能力实现线程池的监控,可以根据实时情况调整线程池,保证程序运行的稳定性。
ThreadPoolExecutor 提供了一些方法,可以用于监控并动态调整线程池:

public class ThreadPoolExecutor extends AbstractExecutorService {// 设置线程工厂public void setThreadFactory(ThreadFactory threadFactory) {}// 设置聚聚策略public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {}// 设置核心线程数public void setCorePoolSize(int corePoolSize) {}// 设置最大线程数public void setMaximumPoolSize(int maximumPoolSize) {}// 设置非核心线程存活时间public void setKeepAliveTime(long time, TimeUnit unit) {}// 获取正在执行任务的大致线程数量public int getActiveCount() {}// 获取线程池中曾经出现过的最大的活跃线程数public int getLargestPoolSize() {}// 获取已经完成执行和待执行的大致任务数量public long getTaskCount() {}// 获取已经完成执行的大致任务数量public long getCompletedTaskCount() {}}

借助以上方法,并添加对执行线程的监控,可以完成线程池的动态调整,以达到线程池在任何场景下都处于最佳设置中。

参考资料

  • 一文彻底了解线程池
  • 编程技巧:“高端”的位运算
  • Java线程池实现原理及其在美团业务中的实践

如果本文对你有帮助的话,还请多多点赞支持。如果文章中出现任何错误,还请批评指正。最后欢迎大家关注分享硬核Java技术的金融摸鱼侠王有志,我们下次再见!

相关文章:

万字长文详解Java线程池面试题

王有志&#xff0c;一个分享硬核 Java 技术的互金摸鱼侠 加入 Java 人的提桶跑路群&#xff1a;共同富裕的Java人 今天是《面霸的自我修养》第 6 篇文章&#xff0c;我们一起来看看面试中会问到哪些关于线程池的问题吧。数据来源&#xff1a; 大部分来自于各机构&#xff08;J…...

【jQuery入门】链式编程、修改css、类操作和className的区别

文章目录 前言一、链式编程二、修改css2.1 获取css的值2.2 设置单个css属性2.3 设置类样式添加类移除类切换类 三、类操作与className的区别总结 前言 jQuery是一个流行的JavaScript库&#xff0c;广泛用于简化DOM操作和处理事件。在jQuery中&#xff0c;链式编程是一种强大的…...

使用的uview 微信高版本 头像昵称填写能力

<template><view><button class"cu-btn block bg-blue margin-tb-sm lg" tap"wxGetUserInfo">一键登录</button><view><!-- 提示窗示例 --><u-popup :show"show" background-color"#fff">&…...

Hadoop3完全分布式搭建

一、第一台的操作搭建 修改主机名 使用hostnamectl set-hostname 修改当前主机名 关闭防火墙和SELlinux 1&#xff0c;使用 systemctl stop firewalld systemctl disable firewalld 关闭防火墙 2&#xff0c;使用 vim /etc/selinux/config 修改为 SELINUXdisabled 使用N…...

中断——外部中断EXIT

前期疑问&#xff1a;中断可以分成外部中断和内部中断吗 文章目录 前言一、中断知识二、中断编程三、EXIT外部中断/事件控制器 3.1 中断事件线3.2 EXTI初始化结构体详解 四、软件设计 4.1 编程要点 五、代码回顾实现六、补充中断知识总结 前言 野火中断章节有这样一句话 【F…...

Kafka-服务端-副本机制

Kafka从0.8版本开始引入副本(Replica)的机制&#xff0c;其目的是为了增加Kafka集群的高可用性。 Kafka实现副本机制之后&#xff0c;每个分区可以有多个副本&#xff0c;并且会从其副本集合(Assigned Replica,AR)中选出一个副本作为Leader副本&#xff0c;所有的读写请求都由…...

银行数据仓库体系实践(4)--数据抽取和加载

1、ETL和ELT ETL是Extract、Transfrom、Load即抽取、转换、加载三个英文单词首字母的集合&#xff1a; E&#xff1a;抽取&#xff0c;从源系统(Souce)获取数据&#xff1b; T&#xff1a;转换&#xff0c;将源系统获取的数据进行处理加工&#xff0c;比如数据格式转化、数据精…...

云计算入门——Linux 命令行入门

云计算入门——Linux 命令行入门 前些天发现了一个人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;最重要的屌图甚多&#xff0c;忍不住分享一下给大家。点击跳转到网站。 介绍 如今&#xff0c;我们许多人都熟悉计算机&#xff08;台式机和笔记本电…...

自然语言处理(NLP)的发展

自然语言处理的发展 随着深度学习和大数据技术的进步&#xff0c;自然语言处理取得了显著的进步。人们正在研究如何使计算机更好地理解和生成人类语言&#xff0c;以及如何应用NLP技术改善搜索引擎、语音助手、机器翻译等领域。 方向一&#xff1a;技术进步 自然语言处理&…...

让uniapp小程序支持多色图标icon:iconfont-tools-cli

前景&#xff1a; uniapp开发小程序项目时&#xff0c;对于iconfont多色图标无法直接支持&#xff1b;若将多色icon下载引入项目则必须关注包体&#xff0c;若将图标放在oss或者哪里管理&#xff0c;加载又是一个问题&#xff0c;因此大多采用iconfont-tools工具&#xff0c;但…...

丹麦公司注册优势 丹麦公司注册条件 丹麦公司注册注意事项

丹麦公司注册优势 1、开-放的商业环境&#xff0c;拥有公平透明的商业法律和制度。 2、简化的注册流程&#xff0c;无需繁琐的审批程序和复杂的材料准备。 3、全球认可的声誉&#xff0c;有助于提升贵公司的国际形象。 4、该国的政-府在坚持适度紧缩的财政政策&#xff0c;…...

C++PythonC# 三语言OpenCV从零开发(4):视频流读取

文章目录 相关链接视频流读取CCSharpPython 总结 相关链接 C&Python&Csharp in OpenCV 专栏 【2022B站最好的OpenCV课程推荐】OpenCV从入门到实战 全套课程&#xff08;附带课程课件资料课件笔记&#xff09; OpenCV 教程中文文档|OpenCV中文 OpenCV教程中文文档|W3Csc…...

vue element MessageBox.prompt this.$prompt组件禁止显示右上角关闭按钮,取消按钮,及点击遮罩层关闭

vue element MessageBox.prompt this.$prompt组件禁止或取消显示右上角关闭按钮&#xff0c;取消按钮&#xff0c;及点击遮罩层关闭 实现效果&#xff1a; 实现代码 MessageBox.prompt(请先完成手机号绑定, 系统提示, {confirmButtonText: 提 交,showClose: false,closeOnClic…...

Oracle 日常健康脚本

文章目录 摘要常用脚本 摘要 保持 Oracle 数据库的良好健康状况对于系统的可靠性和性能至关重要。本文将介绍一些常用的 Oracle 日常健康脚本&#xff0c;帮助您监控数据库并及时识别潜在的问题&#xff0c;以保证数据库的稳定运行。 常用脚本 1.查询数据库实例和实例级别的…...

leetcode670最大交换

给定一个非负整数&#xff0c;你至多可以交换一次数字中的任意两位。返回你能得到的最大值。 示例 1 : 输入: 2736 输出: 7236 解释: 交换数字2和数字7。 示例 2 : 输入: 9973 输出: 9973 解释: 不需要交换。 注意: 给定数字的范围是 [0, 108] int maximumSwap(int num) {…...

XML 注入漏洞原理以及修复方法

漏洞名称&#xff1a;XML注入 漏洞描述&#xff1a;可扩展标记语言 (Extensible Markup Language, XML) &#xff0c;用于标记电子文件使其具 有结构性的标记语言&#xff0c;可以用来标记数据、定义数据类型&#xff0c;是一种允许用户对自己的标记语言进行定义的源语言。 XM…...

x-cmd pkg | dasel - JSON、YAML、TOML、XML、CSV 数据的查询和修改工具

目录 简介首次用户快速实验指南基本功能性能特点竞品进一步探索 简介 dasel&#xff0c;是数据&#xff08;data&#xff09;和 选择器&#xff08;selector&#xff09;的简写&#xff0c;该工具使用选择器查询和修改数据结构。 支持 JSON&#xff0c;YAML&#xff0c;TOML&…...

Oracle 19c RAC集群管理 ---------关键参数以及常用命令

Oracle 19c RAC集群管理 ---------关键参数 Oracle 19C RAC 参数最佳实践 --开启强制归档 ALTER DATABASE FORCE LOGGING; --设置 30分钟 强制归档 ALTER SYSTEM SET ARCHIVE_LAG_TARGET1800 SCOPEBOTH SID*; --设置期望undo保持时间3h ALTER SYSTEM SET UNDO_RETENTION21600…...

时限挑战——深度解析Pytest插件 pytest-timeout

在软件开发中&#xff0c;测试用例的执行时间通常是一个关键考虑因素。Pytest插件 pytest-timeout 提供了一个强大的插件&#xff0c;允许你设置测试用例的超时时间。本文将深入介绍 pytest-timeout 插件的基本用法和实际案例&#xff0c;助你精确掌控测试用例的执行时限。 什么…...

Java入门篇:打造你的Java开发环境——从零开始配置IDEA与Eclipse

引言 “工欲善其事,必先利其器” 作为每一位Java初学者的必经之路,搭建合适的开发环境是至关重要的第一步。本篇将详细指导你如何安装并配置两大主流Java开发工具——IntelliJ IDEA和Eclipse,助你在编程之旅上迈出坚实的第一步。 一、Java开发环境准备 1. 下载并安装Java D…...

文本批量处理大师:简化文本处理,释放无限生产力!

在数字化时代&#xff0c;我们每天都要处理大量的文本数据&#xff0c;无论是办公文档、网页内容还是社交媒体帖子。然而&#xff0c;面对海量的信息&#xff0c;传统的一键式操作已经无法满足我们的需求。我们需要一个更高效、更智能的工具来提升我们的工作效率。今天&#xf…...

Go 方法

第 1 章 方法 Go 语言也支持面向对象的思想&#xff1b;所谓面向对象编程&#xff1a;1对象就是简单的一个值或者变量&#xff0c;并且拥有其方法2方法是某种特定类型的函数3 面向对象编程就是使用方法来描述每个数据结构的属性和操作&#xff1b; 使用者不需要了解对象本身的…...

深度学习与大数据在自然语言处理中的应用与进展

引言 在当今社会&#xff0c;深度学习和大数据技术的快速发展为自然语言处理&#xff08;NLP&#xff09;领域带来了显著的进步。这种技术能够使计算机更好地理解和生成人类语言&#xff0c;从而推动了搜索引擎、语音助手、机器翻译等领域的创新和改进。 NLP的发展与技术进步…...

GPT4+Python近红外光谱数据分析及机器学习与深度学习建模

详情点击链接&#xff1a;GPT4Python近红外光谱数据分析及机器学习与深度学习建模 第一&#xff1a;GPT4 1、ChatGPT&#xff08;GPT-1、GPT-2、GPT-3、GPT-3.5、GPT-4模型的演变&#xff09; 2、ChatGPT对话初体验 3、GPT-4与GPT-3.5的区别&#xff0c;以及与国内大语言模…...

Java项目:12 Springboot的垃圾回收管理系统

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 1.介绍 垃圾分类查询管理系统&#xff0c;对不懂的垃圾进行查询进行分类并可以预约上门回收垃圾。 让用户自己分类垃圾&#xff0c; 按国家标准自己分类…...

HarmonyOS自定义弹出对话框CustomDialog并传递变量

HarmonyOS定义了一系列弹窗反馈类的组件​ 和前端开发框架VUE3配套生态库element plus中的提供各种组件相比,还是要少一些。可能是手机端操作和PC端操作的差异导致的​ 如果内置的弹窗不满足要求,可以基于CustomDialog自定义出各种个性化的反馈组件。 首先新建一个ets文件,…...

React16源码: React中的renderRoot的错误处理的源码实现

renderRoot的错误处理 1 &#xff09;概述 在 completeWork这个方法之后, 再次回到 renderRoot 里面在 renderRoot 里面执行了 workLoop, 之后&#xff0c;对 workLoop 使用了try catch如果在里面有任何一个节点在更新的过程当中 throw Error 都会被catch到catch到之后就是错误…...

强化学习:MuJoCo机器人强化学习仿真入门(1)

声明&#xff1a;我们跳过mujoco环境的搭建&#xff0c;搭建环境不难&#xff0c;可自行百度 下面开始进入正题&#xff08;需要有一定的python基础与xml基础&#xff09;&#xff1a; 下面进入到建立机器人模型的部分&#xff1a; 需要先介绍URDF模型文件和导出MJCF格式 介绍完…...

8.Gateway服务网关

3.Gateway服务网关 Spring Cloud Gateway 是 Spring Cloud 的一个全新项目&#xff0c;该项目是基于 Spring 5.0&#xff0c;Spring Boot 2.0 和 Project Reactor 等响应式编程和事件流技术开发的网关&#xff0c;它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式…...

JVM篇----第四篇

系列文章目录 文章目录 系列文章目录前言一、虚拟机栈(线程私有)二、本地方法区(线程私有)三、你能保证 GC 执行吗?四、怎么获取 Java 程序使用的内存?堆使用的百分比?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到…...