Thread Pools
Three factory methods, seven parameters, four rejection policies
Advantages of using a thread pool
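- Lower system resource overhead and faster response times
  - Threads are created and destroyed less often; under a 1:1 threading model this reduces the latency of having the operating system manage threads
  - Developers cannot unknowingly exhaust system resources by creating threads too frequently
- Easier thread management
  - Thread state (ID and so on) is maintained for you
  - Task execution state is maintained for you
In short: thread reuse, a cap on maximum concurrency, and managed threads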
Once a thread pool is in use, threads are no longer created with new Thread(); the pool creates them instead
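Alibaba Java Coding Guidelines
Thread pools must not be created with Executors; create them through ThreadPoolExecutor instead. Doing so makes the pool's operating rules explicit to whoever writes the code and avoids the risk of exhausting resources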
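- FixedThreadPool and SingleThreadPool: the request queue may grow to Integer.MAX_VALUE entries, so a large backlog of requests can pile up and cause an OOM
- CachedThreadPool and ScheduledThreadPool: up to Integer.MAX_VALUE threads may be created, so a large number of threads can be created and cause an OOM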
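The two limits named in the guideline can be checked directly. The sketch below is only an illustration: the class name ExecutorsDefaults is made up, and the cast to ThreadPoolExecutor relies on how OpenJDK currently implements these two factory methods rather than on anything the API promises.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, not part of the original notes.
class ExecutorsDefaults {
    /** Free capacity of the work queue behind Executors.newFixedThreadPool. */
    static int fixedPoolQueueCapacity() {
        // Assumption: OpenJDK returns a plain ThreadPoolExecutor here.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Maximum pool size behind Executors.newCachedThreadPool. */
    static int cachedPoolMaxThreads() {
        // Assumption: OpenJDK returns a plain ThreadPoolExecutor here.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);
    }
}
```

Both values are effectively unbounded, which is why the guideline asks for an explicit ThreadPoolExecutor with a bounded queue and a bounded maximum.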
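The three factory methods
The three factory methods of Executors
newSingleThreadExecutor(): a pool with a single thread
newFixedThreadPool(int): a pool with a fixed number of threads
newCachedThreadPool(): an elastic pool that grows and shrinks
Submitting tasks (the pool creates threads as needed):
- Without a result: executorService.execute(Runnable runnable), which returns void
- With a result handle: executorService.submit(Runnable runnable), which returns a Future
Both calls return immediately and the task runs asynchronously on a pool thread
Shutting down the pool: executorService.shutdown()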
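- SingleThreadExecutor: the output shows that there really is only one thread
- FixedThreadPool: with a pool of 9 threads and 10 submitted tasks, thread 2 is reused
- CachedThreadPool: the elastic CachedThreadPool created 5 threads on its own; the program submitted 10 tasks, so each of the 5 pool threads was reused once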
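The three experiments above can be reproduced with a small harness that counts how many distinct threads actually ran the tasks. This is a minimal sketch; the class and method names are invented for illustration, and the count for the cached pool varies from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
class FactoryMethodsDemo {
    /** Submits `tasks` short tasks and returns how many distinct threads ran them. */
    static int distinctThreads(ExecutorService pool, int tasks) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; already submitted tasks still run
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single: " + distinctThreads(Executors.newSingleThreadExecutor(), 10));
        System.out.println("fixed:  " + distinctThreads(Executors.newFixedThreadPool(9), 10));
        System.out.println("cached: " + distinctThreads(Executors.newCachedThreadPool(), 10));
    }
}
```

With 10 tasks the single-thread pool reports one thread and the fixed pool never reports more than 9, so at least one thread must have been reused.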
ThreadPoolExecutor
What states does ThreadPoolExecutor have
Inheritance hierarchy
Executor ← ExecutorService ← AbstractExecutorService ← ThreadPoolExecutor
/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 *
 * In order to pack them into one int, we limit workerCount to
 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
 * billion) otherwise representable. If this is ever an issue in
 * the future, the variable can be changed to be an AtomicLong,
 * and the shift/mask constants below adjusted. But until the need
 * arises, this code is a bit faster and simpler using an int.
 *
 * The workerCount is the number of workers that have been
 * permitted to start and not permitted to stop. The value may be
 * transiently different from the actual number of live threads,
 * for example when a ThreadFactory fails to create a thread when
 * asked, and when exiting threads are still performing
 * bookkeeping before terminating. The user-visible pool size is
 * reported as the current size of the workers set.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks,
 *             and interrupt in-progress tasks
 *   TIDYING:  All tasks have terminated, workerCount is zero,
 *             the thread transitioning to state TIDYING
 *             will run the terminated() hook method
 *   TERMINATED: terminated() has completed
 *
 * The numerical order among these values matters, to allow
 * ordered comparisons. The runState monotonically increases over
 * time, but need not hit each state. The transitions are:
 *
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the
 * state reaches TERMINATED.
 *
 * Detecting the transition from SHUTDOWN to TIDYING is less
 * straightforward than you'd like because the queue may become
 * empty after non-empty and vice versa during SHUTDOWN state, but
 * we can only terminate if, after seeing that it is empty, we see
 * that workerCount is 0 (which sometimes entails a recheck -- see
 * below).
 */
// Initially the pool state is RUNNING and the worker count is 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // aka 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// -1 in two's complement: 11111111 11111111 11111111 11111111
// -1 << 29 => 11100000 00000000 00000000 00000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 0 << 29 => 00000000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 1 << 29 => 00100000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS;
// 2 << 29 => 01000000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 3 << 29 => 01100000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
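- The states occupy only the top 3 bits, because ThreadPoolExecutor uses a single int to hold both the pool state and the worker count
  - Top 3 bits: pool state
  - Remaining 29 bits: worker count
- The atomic integer ctl stores both values; ctlOf() packs them into one int, and initially the pool state is RUNNING with a worker count of 0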
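- States:
  - RUNNING: accepts new tasks and also processes tasks in the blocking queue
  - SHUTDOWN: accepts no new tasks, but still processes tasks in the blocking queue
  - STOP: accepts no new tasks, does not process tasks in the blocking queue, and interrupts tasks that are in progress
  - TIDYING: once all tasks have finished and the pool has no worker threads left, the pool moves to TIDYING and calls the terminated method
  - TERMINATED: the terminated method has completed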
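To see the packing in isolation, the same shift and mask arithmetic can be copied into a standalone class. This is a sketch for experimentation only: CtlPacking is a made-up name, and the constants simply mirror the private ones quoted from the JDK source above.

```java
// Hypothetical standalone copy of the ctl bit layout, for experimentation only.
class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;        // top bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;        // top bits 000
    static final int STOP = 1 << COUNT_BITS;            // top bits 001

    static int runStateOf(int c) { return c & ~CAPACITY; }     // keep the top 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; }   // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both fields

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));
        System.out.println(workerCountOf(c));           // prints 5
        System.out.println(runStateOf(c) == RUNNING);   // prints true
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, a plain integer comparison is enough to ask whether the pool has reached at least a given state.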
How ThreadPoolExecutor maintains its internal threads
Member fields
/**
 * The queue used for holding tasks and handing off to worker
 * threads. We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING). This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
// Blocking queue that holds the backlog of tasks.
// The element type is Runnable because the parent class AbstractExecutorService
// wraps every Callable in a FutureTask, which implements Runnable.
private final BlockingQueue<Runnable> workQueue;

/**
 * Lock held on access to workers set and related bookkeeping.
 * While we could use a concurrent set of some sort, it turns out
 * to be generally preferable to use a lock. Among the reasons is
 * that this serializes interruptIdleWorkers, which avoids
 * unnecessary interrupt storms, especially during shutdown.
 * Otherwise exiting threads would concurrently interrupt those
 * that have not yet interrupted. It also simplifies some of the
 * associated statistics bookkeeping of largestPoolSize etc. We
 * also hold mainLock on shutdown and shutdownNow, for the sake of
 * ensuring workers set is stable while separately checking
 * permission to interrupt and actually interrupting.
 */
private final ReentrantLock mainLock = new ReentrantLock();

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

/**
 * Wait condition to support awaitTermination
 */
private final Condition termination = mainLock.newCondition();

/**
 * Tracks largest attained pool size. Accessed only under
 * mainLock.
 */
private int largestPoolSize;

/**
 * Counter for completed tasks. Updated only on termination of
 * worker threads. Accessed only under mainLock.
 */
private long completedTaskCount;

/*
 * All user control parameters are declared as volatiles so that
 * ongoing actions are based on freshest values, but without need
 * for locking, since no internal invariants depend on them
 * changing synchronously with respect to other actions.
 */

/**
 * Factory for new threads. All threads are created using this
 * factory (via method addWorker). All callers must be prepared
 * for addWorker to fail, which may reflect a system or user's
 * policy limiting the number of threads. Even though it is not
 * treated as an error, failure to create threads may result in
 * new tasks being rejected or existing ones remaining stuck in
 * the queue.
 *
 * We go further and preserve pool invariants even in the face of
 * errors such as OutOfMemoryError, that might be thrown while
 * trying to create threads. Such errors are rather common due to
 * the need to allocate a native stack in Thread.start, and users
 * will want to perform clean pool shutdown to clean up. There
 * will likely be enough memory available for the cleanup code to
 * complete without encountering yet another OutOfMemoryError.
 */
private volatile ThreadFactory threadFactory;

/**
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

/**
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
private volatile long keepAliveTime;

/**
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 */
private volatile boolean allowCoreThreadTimeOut;

/**
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 */
private volatile int corePoolSize;

/**
 * Maximum pool size. Note that the actual maximum is internally
 * bounded by CAPACITY.
 */
private volatile int maximumPoolSize;
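- workQueue: the blocking queue. Its element type is Runnable; a Callable is wrapped by the parent class AbstractExecutorService in a FutureTask, which implements Runnable
- mainLock: a ReentrantLock used for synchronization inside the pool
- workers: a HashSet of Worker. The inner class Worker wraps a Thread together with some extra state; workers is the set that holds every worker thread
- int largestPoolSize: the largest number of threads the pool has ever held at once
- long completedTaskCount: how many tasks the pool has completed in total
- volatile ThreadFactory threadFactory: the thread factory, which can be user-defined
- volatile RejectedExecutionHandler handler: the configurable rejection policy
- volatile long keepAliveTime: how long an idle thread is kept alive
- volatile boolean allowCoreThreadTimeOut: whether idle core threads may also time out; when false (the default) core threads always stay alive
- volatile int corePoolSize: the core thread count, that is, the steady number of threads
- volatile int maximumPoolSize: the maximum thread count. When the core threads are all busy and the work queue is full, the pool creates additional threads beyond the core count, up to this maximum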
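Several of these private fields are exposed through public getters on ThreadPoolExecutor. The sketch below reads two of them back after a run; the class name PoolStats and its helper method are invented for illustration, and the pool sizes are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
class PoolStats {
    /** Runs `tasks` empty tasks on a 2-thread pool and returns {largestPoolSize, completedTaskCount}. */
    static long[] runAndReport(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(Math.max(1, tasks))); // bounded, large enough for this run
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Both getters read the fields of the same name under mainLock.
        return new long[] { pool.getLargestPoolSize(), pool.getCompletedTaskCount() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] stats = runAndReport(5);
        System.out.println("largestPoolSize=" + stats[0] + ", completedTaskCount=" + stats[1]);
    }
}
```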
Inner class Worker
Extends AQS (AbstractQueuedSynchronizer) and implements Runnable
- Implements Runnable: a worker is much like a Thread and represents an asynchronous task
- Extends AQS: when a worker receives an interrupt while it is processing, should it exit immediately or finish the current task first? This is decided by maintaining a state value, and extending AQS gives the worker the abilities of a lock
/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution. This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run. We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize. Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // Use -1 rather than 0 so that no interrupt is accepted before the
        // worker has started running; works together with interruptIfStarted() below
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this); // the core logic, covered below
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0; // AQS
    }

    protected boolean tryAcquire(int unused) { // AQS
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) { // AQS
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }                 // AQS
    public boolean tryLock()  { return tryAcquire(1); }       // AQS
    public void unlock()      { release(1); }                 // AQS
    public boolean isLocked() { return isHeldExclusively(); } // AQS

    void interruptIfStarted() { // interrupts this worker's thread when called
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
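The locking half of Worker can be studied on its own. The following sketch lifts the three AQS overrides into a standalone class to show what "non-reentrant" means in practice; NonReentrantLock is an illustrative name, not a JDK class, and the interrupt-suppressing initial state of -1 is left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical standalone lock modelled on Worker's AQS overrides.
class NonReentrantLock extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        // 0 = unlocked, 1 = locked; there is no owner check, so the holder cannot re-enter.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    boolean tryLock() { return tryAcquire(1); }
    void unlock() { release(1); }
    boolean isLocked() { return isHeldExclusively(); }

    public static void main(String[] args) {
        NonReentrantLock lock = new NonReentrantLock();
        System.out.println(lock.tryLock());  // true: the lock was free
        System.out.println(lock.tryLock());  // false: the same thread cannot acquire it again
        lock.unlock();
        System.out.println(lock.isLocked()); // false
    }
}
```

A ReentrantLock would have returned true for the second tryLock(); the JDK comment above explains that this is exactly what the pool wants to avoid when a task calls pool control methods such as setCorePoolSize.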
runWorker() and getTask() methods
runWorker(this) method
- After running the first task passed to the constructor, the worker keeps fetching tasks from the blocking queue and running them
- Each time a task is obtained, the worker takes its lock before running the task
- beforeExecute() and afterExecute() are protected empty methods in ThreadPoolExecutor; in effect the pool wraps every task in an aspect
- As soon as task is null, processWorkerExit is triggered to reclaim the current worker
/**
 * Main worker run loop. Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters. Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler). Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // keep processing tasks from the blocking queue
            w.lock(); // lock before running each task
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // hook (aspect)
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // hook (aspect)
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally { // release the worker
        processWorkerExit(w, completedAbruptly);
    }
}
getTask() method
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    // Records whether the previous poll from the blocking queue timed out
    boolean timedOut = false; // Did the last poll() time out?

    // Expected behavior:
    // 1. If the blocking queue has a task, return it
    // 2. If the blocking queue is empty, block and wait for a task
    // 3. If the current worker should be reclaimed, return null
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // Case 3: based on the pool state the worker can be reclaimed, so return null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // Current number of worker threads
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // allowCoreThreadTimeOut: whether core threads are also subject to the idle timeout
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c)) // try to reclaim the current worker
                return null;
            continue;
        }

        // None of the above applies, so try to get a task from the blocking queue (cases 1 and 2)
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // take() blocks while the queue is empty
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
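Since runWorker() calls beforeExecute() and afterExecute() around every task, a subclass can use them as hooks. The sketch below counts the calls; CountingPool and its fields are hypothetical names, and the pool and queue sizes are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts hook invocations.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool() {
        super(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs on the worker thread just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        finished.incrementAndGet();  // runs on the worker thread after task.run(), even if it threw
        super.afterExecute(r, thrown);
    }

    /** Runs `tasks` empty tasks (at most 18 fit this pool) and returns {started, finished}. */
    static int[] runTasks(int tasks) throws InterruptedException {
        CountingPool pool = new CountingPool();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = runTasks(5);
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```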
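How ThreadPoolExecutor handles a submitted task
execute method
- When a task is submitted, the pool may create a thread to run it or may hand it to an existing thread
- New threads are started through the addWorker method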
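addWorker method
- A CAS operation on the worker count comes first; only if it succeeds is there really a chance to create a new worker, and the worker creation flow begins
- To keep several threads from creating workers at the same time and breaking the core and maximum thread limits, the ReentrantLock member is used (in fact the CAS already guarantees the limits); the lock mainly protects the thread safety of the workers HashSet
- Because the Lock is taken after the CAS, the code that releases a worker does not need to check the number of workers
- Result: creating a worker is thread-safe, and so is adding it to the workers HashSet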
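Taken together, execute() and addWorker() make a three-step decision for each submitted task: start a core worker, else queue the task, else start a non-core worker, else reject. The sketch below is a deliberately simplified model of that decision, not the JDK code: ExecuteDecision and its Outcome enum are invented names, and the model ignores the pool-state rechecks and the races that the real implementation handles with CAS and mainLock.

```java
// Hypothetical, simplified model of the decision made by execute(); not the JDK implementation.
class ExecuteDecision {
    enum Outcome { NEW_CORE_WORKER, QUEUED, NEW_NON_CORE_WORKER, REJECTED }

    static Outcome decide(int workerCount, int queuedTasks,
                          int corePoolSize, int maximumPoolSize, int queueCapacity) {
        if (workerCount < corePoolSize) {
            return Outcome.NEW_CORE_WORKER;      // like addWorker(task, true)
        }
        if (queuedTasks < queueCapacity) {
            return Outcome.QUEUED;               // like workQueue.offer(task) succeeding
        }
        if (workerCount < maximumPoolSize) {
            return Outcome.NEW_NON_CORE_WORKER;  // like addWorker(task, false)
        }
        return Outcome.REJECTED;                 // handed to the RejectedExecutionHandler
    }

    public static void main(String[] args) {
        // Example limits: core 2, max 5, queue capacity 3.
        System.out.println(decide(0, 0, 2, 5, 3)); // NEW_CORE_WORKER
        System.out.println(decide(2, 1, 2, 5, 3)); // QUEUED
        System.out.println(decide(2, 3, 2, 5, 3)); // NEW_NON_CORE_WORKER
        System.out.println(decide(5, 3, 2, 5, 3)); // REJECTED
    }
}
```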
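The seven parameters
Executors calls ThreadPoolExecutor under the hood
The seven parameters of the ThreadPoolExecutor constructor
int corePoolSize: core pool size
int maximumPoolSize: maximum pool size
long keepAliveTime: keep-alive time; an idle thread is released once nothing has used it for this long
TimeUnit unit: time unit
BlockingQueue<Runnable> workQueue: blocking queue
ThreadFactory threadFactory: thread factory, used to create threads
RejectedExecutionHandler handler: rejection policy
- ThreadPoolExecutor constructor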
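Core pool size
Think of going to a bank to withdraw money. The bank has 5 counters: 2 are staffed and serving customers, the other 3 are unstaffed and closed
- The two staffed counters are the threads in the core pool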
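Maximum pool size
The total number of counters in the bank. When every staffed counter is busy and the waiting area has no free seats, the non-core threads are opened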
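Blocking queue
The bank's waiting area (the row of chairs); suppose there are 3 chairs
- Expressed as a blocking queue, that is a blocking queue with capacity 3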
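Keep-alive time
The bank opened every counter, the customers finished their business and left, and several counters became idle. After staying idle for a while with few customers arriving, the staff at those counters leave and close them, so only the original 2 counters remain
- A non-core thread that goes unused for longer than this time is released to save system resources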
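The closing of idle counters can be observed through getPoolSize(). The sketch below uses a smaller bank than the example (1 core thread, 2 at most, 1 chair) and a 100 ms keep-alive so that it finishes quickly; KeepAliveDemo is a hypothetical name, and the one-second sleep is only a generous margin chosen for the demo, not a guarantee on a heavily loaded machine.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
class KeepAliveDemo {
    /** Returns {pool size while saturated, pool size after idling past keepAliveTime}. */
    static int[] poolSizes() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();                  // hold the worker until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked);                // runs on the core thread
            pool.execute(blocked);                // waits in the queue
            pool.execute(blocked);                // queue is full, so a non-core thread starts
            int busy = pool.getPoolSize();
            release.countDown();                  // let every task finish
            Thread.sleep(1000);                   // idle for much longer than keepAliveTime
            int idle = pool.getPoolSize();
            return new int[] { busy, idle };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = poolSizes();
        System.out.println("busy=" + sizes[0] + ", idle=" + sizes[1]);
    }
}
```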
Time unit
The unit of the keep-alive time; it can be hours, minutes, seconds and so on
Thread factory
The factory used to create threads; it is usually left at the default
Executors.defaultThreadFactory()
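When the default is not enough, a common reason to supply a factory is to give pool threads recognizable names. The sketch below wraps the default factory and only renames the thread; NamedThreadFactory and the prefix are made-up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that delegates to the default one and renames each thread.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger sequence = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = Executors.defaultThreadFactory().newThread(r);
        t.setName(prefix + "-" + sequence.getAndIncrement());
        return t;
    }

    /** Returns the name of the thread that runs the first task of a one-thread pool. */
    static String nameOfFirstWorker(String prefix) throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new NamedThreadFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(nameOfFirstWorker("bank-teller")); // prints bank-teller-1
    }
}
```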
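Rejection policy
At the bank, every counter and the whole waiting area are full; how does the bank turn away a newly arrived customer?
- The interface RejectedExecutionHandler has four implementations; see the four rejection policies below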
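The four rejection policies
Interface RejectedExecutionHandler
Implementations:
- CallerRunsPolicy: the task goes back where it came from, to the thread that submitted it. For example, if a task submitted from the main method is rejected, the main thread runs it
- AbortPolicy: the default policy of ThreadPoolExecutor. The new request is not processed and an exception (RejectedExecutionException) is thrown
- DiscardPolicy: the new request is not processed and no exception is thrown
- DiscardOldestPolicy: drops the oldest request waiting in the queue and then tries to submit the new one again; if the retry succeeds the new request is accepted. No exception is thrown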
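The four behaviors can be told apart by saturating a tiny pool and submitting one task too many. This is a sketch under assumptions: RejectionPolicies and the returned description strings are invented for the demo, and the pool (1 thread, 1 queue slot) is chosen only to make saturation easy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
class RejectionPolicies {
    /** Saturates a 1-thread, 1-slot pool, submits one more task, and describes what the handler did. */
    static String overflow(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable queuedTask = () -> { };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocked);     // occupies the only worker
            pool.execute(queuedTask);  // fills the only queue slot
            try {
                pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // one too many
            } catch (RejectedExecutionException e) {
                return "rejected with exception";
            }
            if (Thread.currentThread().getName().equals(ranOn[0])) {
                return "ran on caller thread";
            }
            return pool.getQueue().contains(queuedTask)
                    ? "new task dropped silently"
                    : "oldest queued task dropped";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflow(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(overflow(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(overflow(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println(overflow(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```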
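Creating a thread pool directly with ThreadPoolExecutor
Set the parameters according to the bank example above
- Core pool size: 2
- Maximum pool size: 5
- Blocking queue capacity: 3
- Rejection policy: Abort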
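- Test: submit 5 tasks. By the analysis, 2 run on the core threads and 3 wait in the blocking queue
The output shows that only two threads are used, and they are then reused
- Test: submit 6 tasks. The blocking queue is full and one request is left over, so the pool opens one extra non-core thread for it
The output shows 3 threads in use: 2 core threads plus 1 newly created non-core thread
- Test with more tasks: by the analysis, with at most 5 threads and room for 3 requests in the blocking queue, the pool can hold 5 + 3 = 8 requests at once; anything beyond 8 triggers the rejection policy
- With the Abort policy configured, the pool throws an exception
- Test: submit 9 tasks
- To keep the CPU from finishing so quickly that a thread is returned to the pool before the next task arrives, each task sleeps for a while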
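The screenshots of these test runs did not survive, so here is a minimal sketch that reproduces the experiment with the parameters listed above. BankPoolDemo, the 3-second keep-alive and the 200 ms sleep are assumptions made for the demo; only the sizes 2, 5 and 3 and the Abort policy come from the notes.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class reproducing the bank example.
class BankPoolDemo {
    /** Submits `tasks` sleeping tasks; returns the number of distinct threads used, or -1 if one was rejected. */
    static int threadsUsed(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                     // corePoolSize: staffed counters
                5,                                     // maximumPoolSize: all counters
                3, TimeUnit.SECONDS,                   // keepAliveTime (assumed value)
                new ArrayBlockingQueue<>(3),           // waiting area with 3 chairs
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()); // throw when saturated
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(200);             // keep the worker busy while the rest are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            return -1;
        } finally {
            pool.shutdown();
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("5 tasks -> " + threadsUsed(5));
        System.out.println("6 tasks -> " + threadsUsed(6));
        System.out.println("8 tasks -> " + threadsUsed(8));
        System.out.println("9 tasks -> " + threadsUsed(9));
    }
}
```

A result of -1 means the pool threw RejectedExecutionException, which is what the Abort policy does once both the threads and the queue are exhausted.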
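CPU-bound | IO-bound
How to choose the maximum number of threads
CPU-bound
- Use as many threads as the CPU has hardware threads, which keeps every hardware thread fully used; the count can be checked in Task Manager
- The number of CPU threads can also be read in code: Runtime.getRuntime().availableProcessors()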
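IO-bound
- The program has several large tasks that consume a lot of resources
- Count how many threads in the program are IO-heavy, then set the maximum pool size somewhat above that number, typically 2 times it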
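Both rules of thumb fit in a few lines. The sketch below only restates the two heuristics from these notes; PoolSizing and its method names are hypothetical, and real workloads usually need measurement rather than a fixed formula.

```java
// Hypothetical helper restating the two sizing heuristics from the notes.
class PoolSizing {
    /** CPU-bound: one thread per hardware thread reported by the JVM. */
    static int cpuBoundThreads() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** IO-bound: about twice the number of IO-heavy tasks, per the rule of thumb above. */
    static int ioBoundThreads(int ioHeavyTasks) {
        return 2 * ioHeavyTasks;
    }

    public static void main(String[] args) {
        System.out.println("CPU-bound maximumPoolSize: " + cpuBoundThreads());
        System.out.println("IO-bound maximumPoolSize for 4 IO-heavy tasks: " + ioBoundThreads(4));
    }
}
```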