
AQS

AQS (AbstractQueuedSynchronizer) is a class in the java.util.concurrent.locks package.

AQS is a framework for building locks and synchronizers. With AQS, many widely used synchronizers can be built simply and efficiently.

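Examples include ReentrantLock, Semaphore, CountDownLatch and ReentrantReadWriteLock, all of which are built on AQS. SynchronousQueue and FutureTask were also built on AQS in early JDK releases, but later releases reimplemented them without it.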

How AQS works

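The core idea of AQS:

  • If the requested shared resource is free, the requesting thread is made the active owner thread and the resource is marked as locked.

  • If the requested shared resource is occupied, a mechanism is needed to block waiting threads and to hand out the lock when they are woken up. AQS implements this mechanism with a CLH queue lock: threads that cannot get the lock for the moment are added to a queue.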

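The CLH (Craig, Landin, and Hagersten) queue is a virtual doubly linked queue. "Virtual" means there is no separate queue object: only the links between nodes exist, and AQS itself keeps just the head and tail references.

  • AQS wraps each thread that requests the shared resource in a node of the CLH lock queue and allocates the lock through this queue.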
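To make the CLH idea concrete, here is a minimal sketch of the textbook CLH spin lock that the AQS queue is derived from. It is not AQS code: the names ClhSpinLock and ClhDemo are hypothetical, and unlike AQS (which parks waiting threads) each waiter here busy-waits on its predecessor's flag. Note that enqueuing is a single atomic getAndSet on tail and that no queue object exists, only each thread's reference to its predecessor.

```java
import java.util.concurrent.atomic.AtomicReference;

// Textbook CLH spin lock (hypothetical sketch, not AQS): each thread spins on its predecessor's flag.
class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while the owning thread holds or is waiting for the lock
    }

    // Only the tail is stored; the "queue" exists solely as each thread's link to its predecessor.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // one atomic operation enqueues the node
        myPred.set(pred);
        while (pred.locked) {
            Thread.yield(); // spin on the predecessor; yield to stay polite on few-core machines
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;      // the successor, spinning on this node, may now proceed
        myNode.set(myPred.get()); // reuse the predecessor's node for the next lock()
    }
}

class ClhDemo {
    static int counter; // guarded by the lock

    // Runs `threads` threads that each increment the counter `increments` times under the lock.
    static int run(int threads, int increments) throws InterruptedException {
        ClhSpinLock lock = new ClhSpinLock();
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < increments; j++) {
                    lock.lock();
                    try {
                        counter++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 1_000)); // 4000: no increments are lost
    }
}
```

Each thread only watches its own predecessor's node. AQS keeps the same tactic of storing a node's wake-up information in its predecessor, but blocks threads instead of spinning.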


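State: the synchronization state

AQS uses an int member variable to represent the synchronization state and a built-in FIFO queue to line up the threads that are acquiring the resource. AQS modifies the state atomically using CAS.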

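private volatile int state; // the shared state; volatile guarantees visibility across threads
// int rather than boolean: in shared mode (e.g. read locks) several readers may hold the resource at once, and reentrant locks count repeated acquisitions, so a count is needed
private transient volatile Node tail;
private transient volatile Node head;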
The Node class in the AQS source (JDK 8)
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
*      +------+  prev +-----+       +-----+
* head |      | <---- |     | <---- |     |  tail
*      +------+       +-----+       +-----+
* </pre>
*
* <p>Insertion into a CLH queue requires only a single atomic
* operation on "tail", so there is a simple atomic point of
* demarcation from unqueued to queued. Similarly, dequeuing
* involves only updating the "head". However, it takes a bit
* more work for nodes to determine who their successors are,
* in part to deal with possible cancellation due to timeouts
* and interrupts.
*
* <p>The "prev" links (not used in original CLH locks), are mainly
* needed to handle cancellation. If a node is cancelled, its
* successor is (normally) relinked to a non-cancelled
* predecessor. For explanation of similar mechanics in the case
* of spin locks, see the papers by Scott and Scherer at
* http://www.cs.rochester.edu/u/scott/synchronization/
*
* <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
* predecessor signals the next node to wake up by traversing
* next link to determine which thread it is. Determination of
* successor must avoid races with newly queued nodes to set
* the "next" fields of their predecessors. This is solved
* when necessary by checking backwards from the atomically
* updated "tail" when a node's successor appears to be null.
* (Or, said differently, the next-links are an optimization
* so that we don't usually need a backward scan.)
*
* <p>Cancellation introduces some conservatism to the basic
* algorithms. Since we must poll for cancellation of other
* nodes, we can miss noticing whether a cancelled node is
* ahead or behind us. This is dealt with by always unparking
* successors upon cancellation, allowing them to stabilize on
* a new predecessor, unless we can identify an uncancelled
* predecessor who will carry this responsibility.
*
* <p>CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
* effort if there is never contention. Instead, the node
* is constructed and head and tail pointers are set upon first
* contention.
*
* <p>Threads waiting on Conditions use the same nodes, but
* use an additional link. Conditions only need to link nodes
* in simple (non-concurrent) linked queues because they are
* only accessed when exclusively held. Upon await, a node is
* inserted into a condition queue. Upon signal, the node is
* transferred to the main queue. A special value of status
* field is used to mark which queue a node is on.
*
* <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer and Michael Scott, along with members of JSR-166
* expert group, for helpful ideas, discussions, and critiques
* on the design of this class.
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
// The four non-zero waitStatus values
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus; // wait status: one of the constants above, or 0
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
// ...
}

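There are three ways to access state: getState(), setState() and compareAndSetState(). Each is atomic: the first two are single volatile reads and writes, and the third is an atomic compare-and-set.

In JDK 8, compareAndSetState() delegates to Unsafe.compareAndSwapInt().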

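// Returns the current value of the synchronization state (volatile read)
protected final int getState() {
    return state;
}
// Sets the value of the synchronization state (volatile write)
protected final void setState(int newState) {
    state = newState;
}
// Atomically sets the synchronization state to the given update value
// if the current state value equals the expected value.
// Atomic read-modify-write based on CAS
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}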
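Because sun.misc.Unsafe is an internal API, the same contract (a volatile int plus an atomic compare-and-set) can be sketched with the public VarHandle API available since Java 9. The StateHolder class below is hypothetical and only mirrors the three accessors above; it is not the JDK's AQS source.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical holder mirroring AQS's state accessors with the public VarHandle API (Java 9+).
class StateHolder {
    private volatile int state;

    private static final VarHandle STATE;
    static {
        try {
            STATE = MethodHandles.lookup().findVarHandle(StateHolder.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    int getState() { return state; }                  // volatile read
    void setState(int newState) { state = newState; } // volatile write

    // Atomically sets state to update only if it currently equals expect.
    boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

    public static void main(String[] args) {
        StateHolder h = new StateHolder();
        System.out.println(h.compareAndSetState(0, 1)); // true: 0 -> 1
        System.out.println(h.compareAndSetState(0, 1)); // false: state is already 1
        System.out.println(h.getState());               // 1
    }
}
```

A failed compareAndSetState leaves state unchanged and simply returns false; callers such as tryAcquire decide whether to retry or give up.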

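Resource-sharing modes in AQS: exclusive and shared

AQS defines two ways of sharing a resource: Exclusive and Share (shared).

  • Exclusive: only one thread can execute at a time, e.g. ReentrantLock. Exclusive locks are further divided into fair and nonfair locks:
    • Fair lock: threads get the lock in the order in which they queued, first come, first served
    • Nonfair lock: a thread that wants the lock first tries to grab it directly, ignoring the queue order; whoever wins the race gets it, and only a thread that loses joins the queue
  • Share (shared): several threads can execute at the same time, e.g. Semaphore and CountDownLatch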
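Both modes, and the fairness choice, are visible in the public java.util.concurrent API. The snippet below (ModesDemo is only an illustrative name) checks the fairness flag of ReentrantLock and shows a Semaphore being held by several acquirers at once.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class ModesDemo {
    public static void main(String[] args) {
        // Exclusive mode: ReentrantLock is nonfair by default; passing true requests a fair lock.
        ReentrantLock nonfair = new ReentrantLock();
        ReentrantLock fair = new ReentrantLock(true);
        System.out.println(nonfair.isFair()); // false
        System.out.println(fair.isFair());    // true

        // Shared mode: a Semaphore with 3 permits can be held by up to 3 acquirers at once.
        Semaphore permits = new Semaphore(3);
        permits.acquireUninterruptibly(2);
        System.out.println(permits.availablePermits()); // 1
        System.out.println(permits.tryAcquire());       // true: takes the last permit
        System.out.println(permits.tryAcquire());       // false: none left, a blocking acquire would wait
        permits.release(3);
        System.out.println(permits.availablePermits()); // 3
    }
}
```

Nonfair is the default because letting an arriving thread barge in usually gives higher throughput than always handing the lock to the queue head.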

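ReentrantReadWriteLock can be seen as a combination of the two: it is a read-write lock whose write lock is exclusive and whose read lock is shared, so multiple threads can read a resource at the same time.

AQS is only a framework: it defines the hook methods, and the actual acquisition and release of the resource are left to the custom synchronizer.

Different custom synchronizers contend for the shared resource in different ways. A custom synchronizer only has to define how the shared resource state is acquired and released; maintaining the queue of waiting threads (enqueuing a thread whose acquire failed, waking it up and dequeuing it, and so on) is already implemented at the top level by AQS.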

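AQS is built on the template method pattern

The synchronizer is designed around the template method pattern. A custom synchronizer is usually built like this:

  1. Extend AbstractQueuedSynchronizer and override the designated methods. These methods are simple: they only acquire and release the shared resource state.
  2. Compose this AQS subclass into the implementation of your own synchronization component and call its template methods, which in turn call the methods you overrode.

This is quite different from the usual approach of implementing an interface, and it is a classic application of the template method pattern.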

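When defining a custom synchronizer, you override the following methods provided by AQS (the hooks that its template methods call):

| Method | Sharing mode | Description |
|---|---|---|
| isHeldExclusively() | Exclusive | Whether the current thread holds the resource exclusively. Only needs to be implemented if Conditions are used |
| tryAcquire(int) | Exclusive | Try to acquire the resource: return true on success, false on failure |
| tryRelease(int) | Exclusive | Try to release the resource: return true if it is now fully released (so waiting threads may try to acquire), false otherwise |
| tryAcquireShared(int) | Shared | Try to acquire the resource: a negative value means failure; 0 means success with no resources left; a positive value means success with resources left |
| tryReleaseShared(int) | Shared | Try to release the resource: return true if the release may allow waiting threads to proceed, false otherwise |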

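By default, each of these methods throws UnsupportedOperationException. Implementations must be internally thread-safe and should generally be short and non-blocking.

Most other methods in AQS are final, so subclasses cannot override them; these few methods are the only ones meant to be overridden.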
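As a sketch of the template method pattern in practice, here is a minimal non-reentrant lock modeled on the Mutex example in the AQS javadoc (the names Mutex and MutexDemo are illustrative). It overrides only the hooks tryAcquire, tryRelease and isHeldExclusively; the template methods acquire() and release() supply all the queuing, parking and waking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Minimal non-reentrant mutex: state 0 = unlocked, 1 = locked.
class Mutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) { // only one thread can win this CAS
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;                   // acquire() will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);                    // volatile write publishes the release
            return true;                    // fully released: release() may unpark a successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // template method from AQS
    boolean tryLock()  { return sync.tryAcquire(1); } // hook called directly, never blocks
    void unlock()      { sync.release(1); }           // template method from AQS
    boolean isLocked() { return sync.isLocked(); }
}

class MutexDemo {
    static int counter; // guarded by the mutex

    static int run(int threads, int increments) throws InterruptedException {
        Mutex mutex = new Mutex();
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < increments; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 10_000)); // 40000
    }
}
```

The Sync subclass is kept private and wrapped by Mutex, which is the composition step described above: users of Mutex never see the AQS methods.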

ReentrantLock

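Take ReentrantLock as an example: state is initialized to 0, meaning unlocked.

When thread A calls lock(), tryAcquire() is called to take the lock exclusively and state is incremented by 1.

After that, tryAcquire() fails for every other thread until thread A calls unlock() enough times to bring state back to 0 (i.e. releases the lock); only then do other threads get a chance to acquire it.

Of course, before releasing the lock, thread A itself can acquire it again any number of times (state keeps accumulating); this is what reentrancy means.

Note, however, that the lock must be released as many times as it was acquired, so that state can return to 0.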
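The hold count that ReentrantLock keeps in state can be observed through its public getHoldCount() method. The sketch below (ReentrancyDemo is an illustrative name) locks recursively on one thread and checks that matching unlock() calls bring the count back to 0.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    static final ReentrantLock LOCK = new ReentrantLock();

    // Locks n times recursively on the same thread and returns the deepest hold count (= state).
    static int depth(int n) {
        LOCK.lock();                 // each nested lock() adds 1 to the hold count
        try {
            return n <= 1 ? LOCK.getHoldCount() : depth(n - 1);
        } finally {
            LOCK.unlock();           // one unlock() per lock(), so the count unwinds to 0
        }
    }

    public static void main(String[] args) {
        System.out.println(depth(3));            // 3
        System.out.println(LOCK.getHoldCount()); // 0
        System.out.println(LOCK.isLocked());     // false
    }
}
```

Putting unlock() in a finally block is what guarantees the "release as many times as acquired" rule even when the guarded code throws.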

CountDownLatch

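The task is split among N child threads, and state is also initialized to N (N must match the number of countDown() calls, here one per child thread).

The N child threads run in parallel. Each calls countDown() once when it finishes, which decrements state by 1 via CAS.

Once all child threads have finished (state = 0), the calling thread is unparked (unpark()) and returns from await() to continue with its remaining work.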
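A small sketch of the pattern just described (LatchDemo is an illustrative name, and the atomic counter is a stand-in for real sub-tasks): N child threads each count down once, and the calling thread blocks in await() until the count reaches 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Splits work across n child threads; the caller waits in await() until the count reaches 0.
    static int run(int n) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(n); // AQS state starts at n
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for the child thread's real work
                done.countDown();           // state - 1 via CAS; the last call wakes the waiter
            }).start();
        }
        done.await();                       // returns only once state == 0
        return finished.get();              // every child's work happens-before await() returning
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // 5
    }
}
```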


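In general, a custom synchronizer is either exclusive or shared, so it only needs to implement one of the following two groups of methods:

  1. tryAcquire() and tryRelease()
  2. tryAcquireShared() and tryReleaseShared()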

AQS does, however, also support custom synchronizers that implement both exclusive and shared modes, such as ReentrantReadWriteLock.
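For the shared group, the AQS javadoc includes a BooleanLatch example, a one-shot latch built only on tryAcquireShared and tryReleaseShared. The sketch below follows that idea (BooleanLatch and BooleanLatchDemo are illustrative names): state 0 means not yet signalled, and a single releaseShared lets every waiting thread through.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot latch in shared mode: await() blocks until signal() has been called once.
class BooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return isSignalled() ? 1 : -1; // positive: success, later shared acquires may also succeed
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;                   // waiting threads may now proceed
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() { return sync.isSignalled(); }
    void signal()         { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
}

class BooleanLatchDemo {
    // Starts `waiters` threads that block on the latch, then opens it once.
    static int run(int waiters) throws InterruptedException {
        BooleanLatch latch = new BooleanLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.signal(); // one shared release; the wake-up propagates to every queued waiter
        for (Thread t : threads) {
            t.join();
        }
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3)); // 3
    }
}
```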

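The acquire(int) method

acquire() takes the lock in exclusive mode. If it cannot get the lock immediately, it does not return: the thread joins the queue and waits until it succeeds. Interrupts do not abort the wait.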

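// public: the entry point that lock implementations call (e.g. ReentrantLock's Sync calls acquire(1))
// final: subclasses cannot override it; they customize its behavior through tryAcquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&                            // if tryAcquire gets the lock, return immediately
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // otherwise enqueue and wait in the queue
        selfInterrupt();                               // re-assert the interrupt flag if interrupted while waiting
}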
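The selfInterrupt() call means acquire() does not abort when the waiting thread is interrupted; the interrupt status is simply still set once the lock has been obtained. The sketch below observes this through ReentrantLock.lock(), which goes through acquire(1). AcquireInterruptDemo is an illustrative name, and hasQueuedThread() is used only to wait until the second thread has actually joined the queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class AcquireInterruptDemo {
    // Returns the waiter's interrupt flag as seen right after lock() finally succeeded.
    static boolean run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean();
        lock.lock();                    // the main thread holds the lock, so the waiter must queue
        Thread waiter = new Thread(() -> {
            lock.lock();                // acquire(1): an interrupt does not abort this wait
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();             // wait until the waiter is actually in the queue
        }
        waiter.interrupt();             // lock() keeps waiting anyway
        lock.unlock();                  // release() -> unparkSuccessor() wakes the waiter
        waiter.join();
        return flagAfterLock.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // true: the interrupt status is set once the lock is held
    }
}
```

Code that needs to stop waiting on interrupt should use lockInterruptibly() instead, which is built on the interruptible acquire variant.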
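  • addWaiter(Node mode): wraps the current thread in a Node, adds it to the wait queue and returns that node
    • If the tail is not null, it tries to append the new node to the AQS queue tail with a single CAS (the fast path)
    • If the tail is null or that CAS fails, it calls enq(node), the full enqueue method
addWaiter(Node mode): adding the node to the wait queue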
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq(Node): the full enqueue method
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { // spin until the node has been enqueued
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
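The enqueue logic above can be replayed outside AQS. This is a simplified model, not the JDK's code: SketchQueue and its members are hypothetical, and AtomicReference stands in for the Unsafe-based compareAndSetHead/compareAndSetTail. It keeps the three important details: the dummy head is created lazily on first contention, prev is set before the tail CAS, and pred.next is linked only after it, so a concurrent reader may briefly see a null next while the prev chain is already complete.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of AQS's enq(): lazy dummy head, CAS on tail, then link pred.next.
class SketchQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final Thread thread;

        Node(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like AQS's enq().
    Node enq(Node node) {
        for (;;) {                                 // retry until our CAS on tail wins
            Node t = tail.get();
            if (t == null) {                       // first contention: install the dummy head
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                     // prev is set before the node is published
                if (tail.compareAndSet(t, node)) { // the single atomic enqueue step
                    t.next = node;                 // next is linked afterwards, so it can lag
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking prev links back from tail (complete once a node is published).
    int countFromTail() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }
}

class SketchQueueDemo {
    static int run(int threads) throws InterruptedException {
        SketchQueue queue = new SketchQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> queue.enq(new SketchQueue.Node(Thread.currentThread())));
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return queue.countFromTail();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(8)); // 8
    }
}
```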
acquireQueued(final Node node, int arg)
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // head is a dummy node, so only the node right after head may try to acquire
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // decide whether the current thread should park
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
// Decides whether the current thread should park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// look at the predecessor's waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // SIGNAL: the predecessor will unpark this node when it releases
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; // so the current thread can safely park
if (ws > 0) { // ws > 0 means the predecessor is CANCELLED
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do { // skip over cancelled predecessors, unlinking them from the queue
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // set the predecessor's status to SIGNAL so that it will wake this node up
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
If shouldParkAfterFailedAcquire above returns true, the node needs to be parked:
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // blocks the thread; ultimately a native primitive (Unsafe.park)
return Thread.interrupted();
}
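parkAndCheckInterrupt() relies on LockSupport's permit semantics: unpark() gives a thread at most one permit, and park() consumes it or blocks until one arrives (it can also return on interrupt or spuriously). The sketch below (ParkDemo is an illustrative name) shows that an unpark() issued before park() is not lost, which is why a releasing thread can safely wake a successor that has not quite parked yet.

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    private static volatile boolean ready;

    // unpark() before park(): the permit is stored, so park() returns immediately.
    static boolean permitFirst() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park(); // consumes the permit instead of blocking
        return true;
    }

    // A waiter parks until a condition is published, then is woken with unpark().
    static boolean handOff() throws InterruptedException {
        ready = false;
        Thread waiter = new Thread(() -> {
            while (!ready) {        // park() may return spuriously, so re-check in a loop
                LockSupport.park();
            }
        });
        waiter.start();
        ready = true;               // publish the condition first...
        LockSupport.unpark(waiter); // ...then give the waiter its permit
        waiter.join(5_000);
        return !waiter.isAlive();   // true if the waiter was released
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(permitFirst()); // true
        System.out.println(handOff());     // true
    }
}
```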

Summary

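  • In acquireQueued, a node whose predecessor is headNode tries to acquire the lock and, on success, becomes the new head; otherwise (it is not first in line, or the attempt failed) it checks whether it should park
  • Deciding whether to park (shouldParkAfterFailedAcquire):
    • If the predecessor's waitStatus is SIGNAL, the current node parks. If the predecessor is cancelled, the cancelled predecessors are skipped and the loop retries. Otherwise the predecessor's status is set to SIGNAL and the loop tries once more before parking. As a result, at most the node right after headNode competes for the lock via CAS, while the nodes behind it are parked or about to park, which avoids most of the CPU cost of spinning
  • Waking parked threads at the right time: this happens when the thread holding the lock releases it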
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// Wakes a node parked in the AQS queue via unparkSuccessor
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// node is the head node; this wakes the node after head so that it can retry acquiring the lock
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// normally the next node; if it is cancelled or next appears null, scan backwards from tail for the frontmost non-cancelled node
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // wake it up
}
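The successor search at the end of unparkSuccessor() can be checked in isolation. The following single-threaded model is a hypothetical simplification (SuccessorSearch, SimNode, build and findSuccessor are not JDK names). It scans from tail because enq() sets prev before the tail CAS and links next only afterwards, so the prev chain is always complete while next may still be null. Because the loop overwrites s on every live node it meets, it ends with the non-cancelled node closest to head.

```java
// Single-threaded model of the successor search in unparkSuccessor (not JDK code).
class SuccessorSearch {
    static final int CANCELLED = 1, SIGNAL = -1;

    static final class SimNode {
        final String name;
        int waitStatus;
        SimNode prev, next;

        SimNode(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    // Prefer head.next; if it is null or cancelled, scan back from tail and keep the frontmost live node.
    static SimNode findSuccessor(SimNode head, SimNode tail) {
        SimNode s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (SimNode t = tail; t != null && t != head; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t; // keep overwriting: the last hit is the live node closest to head
                }
            }
        }
        return s;
    }

    // Builds head <-> n1 <-> n2 ... with the given statuses; index 0 is the head.
    static SimNode[] build(int... statuses) {
        SimNode[] nodes = new SimNode[statuses.length + 1];
        nodes[0] = new SimNode("head", SIGNAL);
        for (int i = 0; i < statuses.length; i++) {
            nodes[i + 1] = new SimNode("n" + (i + 1), statuses[i]);
            nodes[i + 1].prev = nodes[i];
            nodes[i].next = nodes[i + 1];
        }
        return nodes;
    }

    public static void main(String[] args) {
        SimNode[] q = build(CANCELLED, CANCELLED, 0, SIGNAL);
        System.out.println(findSuccessor(q[0], q[q.length - 1]).name); // n3: n1 and n2 are skipped
        q = build(0, SIGNAL);
        System.out.println(findSuccessor(q[0], q[q.length - 1]).name); // n1: the fast path via next
        q = build(0);
        q[0].next = null; // next not linked yet, as right after enq()'s tail CAS
        System.out.println(findSuccessor(q[0], q[1]).name);            // n1: found via prev links
    }
}
```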