Aqs源码分析
AQS
AQS是Java中互斥锁实现的一个框架,很多Java中耳熟能详的锁类都继承了它。
先来看看acquire
方法。这个方法功能是拿到互斥锁,如果现在不可行就会等待到拿到锁后才返回。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果中间收到过中断信号,这里手动恢复这个信号
selfInterrupt();
}
上面的方法会尝试通过tryAcquire
来拿锁,这个是个模板方法,需要子类覆盖。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
来看一下addWaiter
方法做了什么,这个方法会将当前线程包装成一个链表结点,并将结点加入到等待链表的尾部。并且链表的头结点对应的是取到锁的那个线程。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
//优先用CAS指令来完成追加操作,如果因为并发原因失败,就使用较慢的方式
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其默认用一次CAS指令来完成,如果没有成功,就一直循环尝试。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//head初始化为一个空结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
剩下的acquireQueued
则是循环拿锁的逻辑。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//这里只有链表头后一个顶点才能竞争锁,就能减少实际对锁的竞争
final Node p = node.predecessor();
//假如自己有竞争的资格就去拿锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//拿锁失败后阻塞一段时间
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果取锁失败(只有可能是抛出异常退出的情况),将顶点设置为cancel状态
cancelAcquire(node);
}
}
具体会不会阻塞,取决于shouldParkAfterFailedAcquire
的返回值。其逻辑是只有前驱顶点是在等待信号的时候,才会选择被park
,否则自旋。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//若前置结点因为超时或中断等原因被取消了,需要修正自己的prev指针。
//这相当于等待状态为cancel的结点会从链表中被删除
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//否则将前驱的等待状态设置为SIGNAL,它会在结束后恢复当前线程
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//如果不是signal状态,就先不阻塞
return false;
}
根据上面的逻辑,我们可以将等待状态为SIGNAL的连续结点合并为一个段,因此链表中存在若干个段。每个段中第一个结点都处于循环取锁的状态,其余结点则处于被挂起的状态。
下面的方法则是阻塞的实现,阻塞直到收到打断信息。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//返回值是是否收到中断信号
return Thread.interrupted();
}
下面是LockSupport#park方法的实现:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
//在这个场景中blocker就是锁对象
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
其中UNSAFE的park方法是native方法,我们需要追踪JDK源码。
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event;
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
post_thread_park_event(&event, obj, min_jlong, time);
} else {
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END
好的,抱歉,看不懂,我们继续读下面的代码好了。
下面来了解一下允许中断的情况的版本。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
在实际阻塞过程中一旦检测到中断后,就会通过异常退出。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下来来了解一下带超时时间的版本。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
//超时了
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//睡一会
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
之前拿的都是互斥锁,下面展示的是拿共享锁的版本。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
感觉都没啥区别。除了不可中断的版本外,其余的版本在失败的情况下都会调用cancelAcquire
这个方法。看不太懂就不扯了。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
//这里老样子删除掉了前面所有被取消的结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//这里将这个顶点的状态也切换成CANCELLED
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
//如果是末尾的话,需要删除掉这个顶点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
//如果pred是处于SIGNAL状态,那么将它的新的
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//由于可能中断了段,所以唤醒后继的SIGNAL等待状态的结点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
看了一下,好像忘了讲解锁的代码了。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//这里做些额外的校验,之后还需需要再次校验
if (h != null && h.waitStatus != 0)
//唤醒后继结点
unparkSuccessor(h);
return true;
}
return false;
}
解锁会先调用tryRelease
,这个类做一些清理以及检查操作,默认实现是抛出异常,需要子类覆盖这个方法。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
比如在ReentrantLock
的Sync
类有如下实现:
protected final boolean tryRelease(int releases) {
//c表示剩余加锁次数
int c = getState() - releases;
//要求解锁线程和加锁线程相同,否则会抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//加锁次数减少到0,就相当于锁被释放了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
在通过tryRelease
的校验之后,之后是正式的解锁通知流程。唤醒的代码如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//不存在下一个结点,或者下一个结点已经被唤醒
if (s == null || s.waitStatus > 0) {
s = null;
//从后向前找到最前面处于等待中的顶点(不考虑头顶点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果存在这样的顶点,就unpark它对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
其中waitingStatus
表示一个顶点的阻塞状态,有如下值:
- 1,CANCELLED:等待线程因为超时或中断等原因被取消等待
- -1,SIGNAL:线程在等待信号中断,这个状态比较特殊的是
- -2,CONDITION:线程在等待条件的满足
- -3,PROPAGATE:下一个acquireShared将被无条件传播
这里比较神奇的是,可以发现解锁的时候,头部顶点是不会被删除的。
Condition
AQS支持Condition的方式,所谓的Condition其实对应的就是Object的Monitor。一般我们等待某个条件的时候会写下面代码:
syncronized(this){
while(!ok()){
wait();
}
}
对应的一方需要通知条件的成立。
syncronized(this){
setOk();
notifyAll();
}
这里使用wait
,notify
和syncronized
分别实现对条件的等待,通知它的达成(可能)以及为了访问条件需要抢占锁。我们可以
abstract static class Sync extends AbstractQueuedSynchronizer {
//创建一个条件对象
final ConditionObject newCondition() {
return new ConditionObject();
}
}
下面是await
和signal
的源码,分别对应wait
和notify
。
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//向条件队列中加入等待人
Node node = addConditionWaiter();
//释放锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果还未收到通知,则继续等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//尝试重新获得锁资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//只唤醒第一个
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//如果只有一个等待结点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
//这里会遍历全表
first = next;
} while (first != null);
}
}
下面是将结点加回到同步队列的代码。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//将node移动到同步队列中
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
//如果前驱被取消或者不能转换成SIGNAL状态,那么就直接唤醒,否则让前驱唤醒自己
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
}
ReentrantLock
ReentrantLock
是一种可重入锁,其内部持有Sync
对象负责加锁解锁逻辑,以复合优于继承的方式,避免暴露额外的接口。
你可以通过构造器决定具体用的是公平锁还是非公平锁。注意这里的公平是指所有线程都按照先来后到的方式取锁,而非公平是指已经加入到同步队列中的线程按照先来后到的方式取锁,但是如果有线程来抢锁(抢不到才会被加入同步队列中),是有可能比同步队列中的线程先抢到锁的。在锁释放后且同步队列中有等待线程的情况下,非公平锁会比公平锁更加高效,因为后者线程一定会被挂起并加入到同步队列尾部(导致线程上下文切换),而前者由于占有CPU时间片,因此有很大可能性可以抢到锁。而公平锁则能保证不会有线程被饿死。
public class ReentrantLock implements Lock, java.io.Serializable {
public ReentrantLock() {
//默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
下面是Sync
的源码,它是公平锁和非公平锁的基类。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//非公平锁在无锁的情况下,会优先去抢锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果是重入的情况
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//这里确保加锁和解锁的是同一个线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果锁全部释放了,则清空占锁线程的信息
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//更新状态
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
//判断是否持有独占锁
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
//获取占用锁的线程
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
}
下面是公平锁:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平锁在无锁的情况下,会检查是否有更有资格的候选人,如果没有,才执行抢锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
下面是非公平锁:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//先直接抢锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//之后再走默认流程
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
//非公平的方式抢锁
return nonfairTryAcquire(acquires);
}
}