
Thread Pools

Three creation methods, seven constructor parameters, four rejection policies

Benefits of using a thread pool

  1. Lower resource overhead and better responsiveness
    • Reduces how often threads are created and torn down; under the 1:1 threading model this cuts the latency of operating-system thread operations
    • Prevents developers from unknowingly creating threads so frequently that system resources are exhausted
  2. Easier thread management
    • Tracks thread state (IDs, etc.)
    • Tracks task execution state

Thread reuse, capped maximum concurrency, centralized thread management

Once a thread pool is in use, threads are no longer created with new Thread(); the pool creates and hands out threads instead.

Alibaba Java Coding Guidelines

Thread pools must not be created via Executors; use ThreadPoolExecutor directly. Spelling out the constructor makes the pool's behaviour explicit to whoever writes the code and avoids the risk of resource exhaustion.

  1. FixedThreadPool and SingleThreadPool

    The allowed request queue length is Integer.MAX_VALUE, so requests can pile up and eventually cause an OOM.

  2. CachedThreadPool and ScheduledThreadPool

    The allowed number of threads is Integer.MAX_VALUE, so a huge number of threads can be created and eventually cause an OOM.

The three factory methods

The three main factory methods of Executors

  • newSingleThreadExecutor(): a single-thread pool

  • newFixedThreadPool(int): a pool with a fixed number of threads

  • newCachedThreadPool(): an elastic (cached) pool

  • Submitting tasks (a short sketch follows this list):

    • executorService.execute(Runnable runnable): no return value; the task's outcome cannot be retrieved
    • executorService.submit(Runnable runnable): returns a Future for tracking the task's completion and result
  • Shutting down the pool: executorService.shutdown()
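A minimal sketch of the difference in practice (the test name, pool size, and task bodies are only illustrative): execute() takes a Runnable and returns nothing, while submit() also accepts a Callable and returns a Future.

@Test
public void testExecuteVsSubmit() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
        // execute(): fire-and-forget, the task's outcome cannot be retrieved
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " via execute"));
        // submit(): returns a Future; get() blocks until the task completes
        Future<String> future = pool.submit(() -> Thread.currentThread().getName() + " via submit");
        System.out.println(future.get());
    } finally {
        pool.shutdown();
    }
}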

  • SingleThreadExecutor
@Test
public void test1() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// shut down the pool
threadPool.shutdown();
}
}
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running
pool-1-thread-1 is running

As expected, only one thread is ever used.

  • FixedThreadPool
@Test
public void test2() {
ExecutorService threadPool = Executors.newFixedThreadPool(9);
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// shut down the pool
threadPool.shutdown();
}
}
pool-1-thread-1 is running
pool-1-thread-2 is running
pool-1-thread-3 is running
pool-1-thread-4 is running
pool-1-thread-5 is running
pool-1-thread-6 is running
pool-1-thread-7 is running
pool-1-thread-8 is running
pool-1-thread-2 is running
pool-1-thread-9 is running

With a pool of 9 threads and 10 tasks submitted, thread 2 gets reused for the extra task.

  • CachedThreadPool
@Test
public void test3() {
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// shut down the pool
threadPool.shutdown();
}
}
pool-1-thread-1 is running
pool-1-thread-2 is running
pool-1-thread-3 is running
pool-1-thread-4 is running
pool-1-thread-5 is running
pool-1-thread-5 is running
pool-1-thread-1 is running
pool-1-thread-3 is running
pool-1-thread-2 is running
pool-1-thread-4 is running

The elastic CachedThreadPool created 5 threads on its own; with 10 tasks submitted, each of those 5 threads was reused once.


ThreadPoolExecutor

What state does ThreadPoolExecutor itself keep?

Inheritance hierarchy

Executor ← ExecutorService ← AbstractExecutorService ← ThreadPoolExecutor

/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
// Initially the pool state is RUNNING and the worker count is 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // aka 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// two's complement of -1: 11111111 11111111 11111111 11111111
// -1 << 29 => 11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// 0 << 29 => 00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 1 << 29 => 00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 2 << 29 => 01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 3 << 29 => 01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • Note that the states take up only the top 3 bits, because ThreadPoolExecutor uses a single int to hold both the pool state and the worker count (a standalone sketch of the bit arithmetic follows this list)
    • top 3 bits: pool state
    • remaining 29 bits: worker count
  • The atomic integer ctl stores both pieces of information; ctlOf() packs them, and at construction time the state is RUNNING with a worker count of 0
  • States:
    • RUNNING: accepts new tasks and processes queued tasks
    • SHUTDOWN: does not accept new tasks, but still processes queued tasks
    • STOP: does not accept new tasks, does not process queued tasks, and interrupts in-progress tasks
    • TIDYING: all tasks have finished and the pool has no worker threads left; the pool moves to TIDYING and runs the terminated() hook
    • TERMINATED: terminated() has completed
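A small standalone sketch of the same bit arithmetic (the constant and method names mirror the JDK fields above; this is an independent demo class, not pool code):

public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;    // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1; // low 29 bits set to 1
    static final int RUNNING = -1 << COUNT_BITS;       // top 3 bits = 111

    static int ctlOf(int rs, int wc) { return rs | wc; }     // pack state and worker count
    static int runStateOf(int c) { return c & ~CAPACITY; }   // keep only the top 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; } // keep only the low 29 bits

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                    // a RUNNING pool with 5 workers
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 5
    }
}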

(Figure: thread pool state transitions)

How ThreadPoolExecutor maintains its internal threads

Member fields

/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
// blocking queue that holds the backlog of tasks
// why the type parameter is Runnable: the parent class AbstractExecutorService wraps Callable objects in FutureTask, which implements Runnable
private final BlockingQueue<Runnable> workQueue;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
  • Blocking queue workQueue: its type parameter is Runnable; for a Callable, the parent class AbstractExecutorService wraps it in a FutureTask, which implements Runnable
  • mainLock: a ReentrantLock used for synchronization inside the pool
  • workers: a HashSet<Worker>; the inner Worker class wraps a Thread plus some bookkeeping state, and workers holds all worker threads
  • int largestPoolSize: the largest number of active threads the pool has ever had
  • long completedTaskCount: the total number of tasks the pool has completed
  • volatile ThreadFactory threadFactory: the thread factory, which can be user-defined
  • volatile RejectedExecutionHandler handler: the configurable rejection policy
  • volatile long keepAliveTime: how long an idle thread is kept alive
  • volatile boolean allowCoreThreadTimeOut: whether core threads may also time out instead of staying alive forever
  • volatile int corePoolSize: the core pool size, i.e. the stable number of threads
  • volatile int maximumPoolSize: the maximum pool size; once the core threads are busy and the queue is full, the pool creates extra threads on top of the core size, up to this limit (a tuning sketch follows this list)
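The user-control parameters (corePoolSize, maximumPoolSize, keepAliveTime, allowCoreThreadTimeOut, ...) are volatile precisely so they can be changed on a live pool; a minimal sketch using the public setters (the concrete numbers are arbitrary):

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

// resize the pool at runtime; these setters write the volatile fields above
pool.setCorePoolSize(4);
pool.setMaximumPoolSize(8);
pool.setKeepAliveTime(60, TimeUnit.SECONDS);

// allow even core threads to be reclaimed after keepAliveTime of idleness
pool.allowCoreThreadTimeOut(true);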

The inner class Worker

Extends AQS and implements Runnable

  • Implements Runnable: a Worker is essentially a Thread-like unit, representing one asynchronous task runner
  • Extends AQS: when a worker receives an interrupt while processing, whether it exits immediately or finishes the current task first is controlled through this state; extending AQS gives the Worker a simple lock of its own
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// use -1 rather than 0 so that interrupts are not accepted before construction completes
// works together with interruptIfStarted() below
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this); // the core worker loop, covered below
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0; // AQS
}
protected boolean tryAcquire(int unused) { // AQS
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) { // AQS
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); } // AQS
public boolean tryLock() { return tryAcquire(1); } // AQS
public void unlock() { release(1); } // AQS
public boolean isLocked() { return isHeldExclusively(); } // AQS
void interruptIfStarted() { // interrupts this worker's thread once it has started
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

The runWorker() and getTask() methods

The runWorker(this) method

  • After running the firstTask passed to the constructor, it keeps fetching tasks from the blocking queue and running them
  • Each time a task is taken, the worker locks first and then runs it
  • The beforeExecute() and afterExecute() hooks are empty protected methods in ThreadPoolExecutor, so in effect the pool wraps every task in an aspect (a sketch of overriding them follows this list)
  • Once a fetched task is null, processWorkerExit is triggered and the current worker is reclaimed
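A minimal sketch of using these two hooks by subclassing ThreadPoolExecutor (the class name and the timing logic are illustrative, not part of the JDK):

class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    TimingThreadPool(int core, int max) {
        super(core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) { // runs in the worker thread, just before the task
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) { // runs after the task; t is non-null if it threw
        long elapsedNanos = System.nanoTime() - startNanos.get();
        System.out.println(r + " took " + elapsedNanos + " ns, exception = " + t);
        super.afterExecute(r, t);
    }
}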
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // keep pulling tasks from the blocking queue
w.lock(); // lock before running each task
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // before-execution hook
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // after-execution hook
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally { // reclaim this worker
processWorkerExit(w, completedAbruptly);
}
}

The getTask() method

/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
// whether the last poll from the blocking queue timed out
boolean timedOut = false; // Did the last poll() time out?
// Expected behaviour:
// 1. if the blocking queue has a task, return it
// 2. if the queue is empty, block waiting for one
// 3. if this worker should be reclaimed, return null
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// case 3: pool state says this worker can be reclaimed; return null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// current number of worker threads
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut: whether core threads may time out instead of staying alive forever
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // try to reclaim this worker
return null;
continue;
}
// otherwise try to fetch a task from the blocking queue (cases 1 and 2)
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // take() blocks until a task becomes available
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

How ThreadPoolExecutor handles submitted tasks

The execute method

/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
  • When a task is submitted, the pool may create a new thread to handle it or hand it to an existing pooled thread
  • New worker threads are started through the addWorker method

(Figure: execute() task-submission flow)

The addWorker method

  • It first claims a worker slot with a CAS on the worker count; only if that succeeds does it go on to actually create a Worker
  • The CAS is what keeps concurrent callers from pushing past the core and maximum pool sizes; the member ReentrantLock (mainLock) is mainly there to keep the workers HashSet consistent
  • Because the count was already claimed by CAS before the lock is taken, the code that rolls back a failed worker does not need to re-check the worker count
  • Net result: creating a new worker is thread-safe, and adding to the workers HashSet is thread-safe
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

The seven parameters

Under the hood, Executors delegates to ThreadPoolExecutor

  • The seven parameters of the ThreadPoolExecutor constructor
    1. int corePoolSize: core pool size
    2. int maximumPoolSize: maximum pool size
    3. long keepAliveTime: how long an idle non-core thread survives before it is released
    4. TimeUnit unit: the time unit for keepAliveTime
    5. BlockingQueue<Runnable> workQueue: the blocking queue
    6. ThreadFactory threadFactory: the factory used to create threads
    7. RejectedExecutionHandler handler: the rejection policy
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • The ThreadPoolExecutor constructor
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

Core pool size

Picture a bank with 5 counters: 2 are staffed and serving customers, while the other 3 are unstaffed and closed for now.

  • The 2 staffed counters are the threads in the core pool.

Maximum pool size

The total number of counters. When every staffed counter is busy and the waiting area is full, the non-core counters are opened.

Blocking queue

The bank's waiting area (the row of chairs); assume it has 3 chairs.

  • Modelled as a blocking queue, i.e. a blocking queue of capacity 3.

Keep-alive time

The bank has opened all its counters; eventually the customers are served and leave, and several counters sit empty. After they have been idle for a while with still few customers coming in, the extra staff leave and close their counters, leaving only the original 2.

  • A non-core thread that stays idle longer than this is released to save system resources.

Time unit

The unit of the keep-alive time: hours, minutes, seconds, and so on.

Thread factory

The factory used to create threads; usually left as the default.

  • Executors.defaultThreadFactory()

Rejection policy

All counters and the waiting area are full; how does the bank turn away newly arriving customers?

  • The RejectedExecutionHandler interface has four implementations; see the four rejection policies below.

The four rejection policies

Interface RejectedExecutionHandler

Implementations

  1. CallerRunsPolicy: the task goes back to whoever submitted it; for example, if the task was submitted from main and gets rejected, the main thread runs it itself (a sketch follows this list)
  2. AbortPolicy: the default policy of ThreadPoolExecutor; the new request is not handled and a RejectedExecutionException is thrown
  3. DiscardPolicy: the new request is silently dropped and no exception is thrown
  4. DiscardOldestPolicy: drops the oldest queued request and then tries to submit the new one; if that attempt wins, the new task runs, otherwise it is dropped; no exception is thrown
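A minimal sketch of CallerRunsPolicy: with one worker, a queue of one, and tasks that sleep briefly, the third submission is rejected and therefore runs on the submitting (main) thread. The sizes and the sleep are only there to force the rejection:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 3; i++) {
    pool.execute(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(200); // keep the single worker busy
        } catch (InterruptedException ignored) {
        }
        System.out.println(Thread.currentThread().getName() + " is running");
    });
}
pool.shutdown();
// tasks 1 and 2 print pool-1-thread-1, the rejected task 3 prints "main is running"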

Creating a pool with ThreadPoolExecutor directly

Configure the parameters following the bank analogy above

  • core pool size 2
  • maximum pool size 5
  • blocking queue capacity 3
  • rejection policy: Abort
  • Test: submit 5 tasks; by the analysis above, 2 run on core threads and 3 wait in the blocking queue
@Test
public void test4() {
// create the pool with ThreadPoolExecutor directly
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <= 5; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " -> running");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
pool-1-thread-2 -> running
pool-1-thread-1 -> running
pool-1-thread-2 -> running
pool-1-thread-2 -> running
pool-1-thread-1 -> running

As expected, only two threads are used, and they are reused.

  • Test: submit 6 tasks; the blocking queue is full and there is one extra request, so the pool opens one additional non-core thread to handle it
@Test
public void test4() {
// create the pool with ThreadPoolExecutor directly
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <= 6; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " -> running");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
pool-1-thread-1 -> running
pool-1-thread-2 -> running
pool-1-thread-1 -> running
pool-1-thread-3 -> running
pool-1-thread-3 -> running
pool-1-thread-2 -> running

Three threads are actually used: the 2 core threads plus 1 newly opened non-core thread.

  • Submit more tasks than the pool can absorb: with at most 5 threads and a queue of 3, the pool can take 5 + 3 = 8 requests, and anything beyond 8 triggers the rejection policy
  • With the Abort policy configured, the pool throws an exception
  • So submit 9 tasks
  • To keep the CPU from finishing tasks so quickly that threads are handed back to the pool early, each task sleeps for a moment
@Test
public void test4() {
// create the pool with ThreadPoolExecutor directly
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " -> running");
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
pool-1-thread-1 -> running
pool-1-thread-2 -> running
pool-1-thread-3 -> running
pool-1-thread-4 -> running
pool-1-thread-5 -> running
java.util.concurrent.RejectedExecutionException: Task threadPool.Demo01$$Lambda$1/992136656@7960847b rejected from java.util.concurrent.ThreadPoolExecutor@6a6824be[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
...

CPU-bound | IO-bound

How to choose the maximum thread count

CPU-bound

  • Set it to the number of hardware threads the CPU provides, which keeps the hardware fully utilised; you can check the count in Task Manager
  • Or obtain it in code:
Runtime.getRuntime().availableProcessors()

IO-bound

  • The program has several large tasks that are heavily IO-bound and hog resources
  • Count how many threads in the program do heavy IO, then set the maximum pool size somewhat above that number, commonly about twice it (a sizing sketch follows)
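A small sketch of both rules of thumb (the queue sizes are arbitrary, and the "twice the IO-heavy task count" multiplier is just the heuristic mentioned above, not a universal law):

int cpuThreads = Runtime.getRuntime().availableProcessors();

// CPU-bound: roughly one worker per hardware thread keeps every core busy
ExecutorService cpuBoundPool = new ThreadPoolExecutor(
        cpuThreads, cpuThreads, 0, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000));

// IO-bound: workers spend most of their time waiting, so allow more of them,
// e.g. about twice the number of IO-heavy tasks (assumed here to equal cpuThreads)
int ioHeavyTasks = cpuThreads; // illustrative assumption
ExecutorService ioBoundPool = new ThreadPoolExecutor(
        ioHeavyTasks, ioHeavyTasks * 2, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000));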