多线程并发AQS源码分析
一、业务场景描述
业务场景,银行中存在一个办理业务的窗口,A、B、C三个用户(线程)去办理业务,先是A用户去窗口办理业务,然后B、C在等候区排队等候,过一会儿,A用户(线程)办理完业务,B用户(线程)去业务窗口办理业务,以此类推C用户一样。
二、分析之前上分析图
三、源码分析
#(1)A线程开始工作,第一步调用lock方法
public void lock() {
sync.lock();
}
#其中:sync继承 AbstractQueuedSynchronizer——(AQS)
abstract static class Sync extends AbstractQueuedSynchronizer
#(2)进入lock方法体
final void lock() {
#比较并设置state的状态为1;1-表示占用,0-表示未占用
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
#(3)进入setExclusiveOwnerThread()设置当前拥有独占访问权限的线程。
到此A线程开始在窗口执行工作,此代码块到此执行完成。
#(4)用户(线程)B开启
进入方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
#进入这个代码块
acquire(1);
}
#(5)acquire的方法结构
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#(6)先进入tryAcquire()方法解析
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#(7)进入nonfairTryAcquire方法 传入参数为1
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
#c=2不进入
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
#current为B线程,getExclusiveOwnerThread()为A线程
#所以也不进入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
#返回false
return false;
}
#(8)向下执行addWaiter()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
#(9)进入方法体addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
#第一次进入pred 为空所以不进入
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
#进入下面enq
enq(node);
return node;
}
#(10)进入enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
#满足条件进入
if (t == null) { // Must initialize
#创建一个哨兵节点,头节点的地址指向一个空node(哨兵节点)
if (compareAndSetHead(new Node()))
#刚刚创建的头节点指向尾节点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
#进入二次循环
private Node enq(final Node node) {
for (;;) {
Node t = tail;
#条件不满足,因为上次循环已经初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
#刚刚创建的头节点指向尾节点
tail = head;
#满足else条件进入下面代码块
} else {
#哨兵节点的前指针指向初始节点
node.prev = t;
#进入下面代码compareAndSetTail,将尾指针指向新的B节点
#
if (compareAndSetTail(t, node)) {
#将哨兵节点的后指针指向node(B节点)
t.next = node;
#返回哨兵节点,然后此方法结束
return t;
}
}
}
}
#(11)addWaiter方法结束,放回B的node节点
#(12)进入acquireQueued方法,注意当前还处于B线程中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
#arg=1,addWaiter(Node.EXCLUSIVE)为B节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
#(13)进入acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
#返回前置节点,也就是哨兵节点
final Node p = node.predecessor();
#p == head 结果为true
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#(14)分析一下tryAcquire(arg)方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#进入nonfairTryAcquire(acquires);方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
#state = 1;
int c = getState();
#故不满足条件,不进入
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
#current为B线程,getExclusiveOwnerThread()为A线程,故不进入这个代码块
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
#返回false
return false;
}
#(15)拿到tryAcquire的结果后继续分析acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
#条件不满足,p == head结果为true,tryAcquire(arg)结果为false,所以不进入if
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
#进入shouldParkAfterFailedAcquire(p, node) 方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#(16)分析shouldParkAfterFailedAcquire(p, node)方法,p为哨兵节点,node为B节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
#ws为0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
#进入else
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
#将哨兵节点的waitStatus设置成-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
#最后返回false
return false;
}
#(17)拿到shouldParkAfterFailedAcquire(p, node)的返回结果为false,继续分析acquireQueued(final Node node, int arg)方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
#shouldParkAfterFailedAcquire(p, node) 结果为false不进入
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#(18)再次执行acquireQueued的第二次循环
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
#p == head满足,同样tryAcquire(arg)返回false不满足,不进入
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
#进入这个节点继续分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#(19)再次分析shouldParkAfterFailedAcquire(p, node)方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
#等于-1
int ws = pred.waitStatus;
#符合条件进入
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
#返回true
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
#(20)根据shouldParkAfterFailedAcquire返回的结果(true),继续执行下一步parkAndCheckInterrupt()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#当前的B线程开始阻塞,因为执行了LockSupport.park(this);待A线程结束的时候,唤醒B
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
#(21)C线程也是像B一样,最后阻塞等待被唤醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
#如果当前线程已被中断,则为true ; 否则为false
return Thread.interrupted();
}
#(22)此时A线业务办理完成,调用lock.unlock();方法解锁
public void unlock() {
sync.release(1);
}
#进入release方法,arg = 1
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
#(23)进入tryRelease(int releases)方法,releases = 1;
protected final boolean tryRelease(int releases) {
# c = 0;
int c = getState() - releases;
#相等不进入
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
#进入
if (c == 0) {
free = true;
#将原来的A线程变为null
setExclusiveOwnerThread(null);
}
#设置当前状态为0;state = 1
setState(c);
#返回free = true;
return free;
}
#(24)返回release(int arg)方法,继续向下执行
public final boolean release(int arg) {
#返回true进入
if (tryRelease(arg)) {
Node h = head;
#此时h为哨兵节点,h.waitStatus 为 -1;所以进入
if (h != null && h.waitStatus != 0)
#接下来看该方法
unparkSuccessor(h);
return true;
}
return false;
}
#(25)解析unparkSuccessor(Node node)方法,其中node为哨兵节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
#进入,将哨兵node节点的waitStatus,由-1改为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
#s为B结点
Node s = node.next;
#s!=null,所以不进入
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
#进入
if (s != null)
#唤醒B进程
LockSupport.unpark(s.thread);
}
#(26)到此lock.unlock()方法执行完毕,执行结果:A线程执行完成,B线程开始执行;
#(27)B线程被唤醒后,继续执行后面的代码
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
#(28)继续执行for循环,node为B节点,arg为1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
#再次进入tryAcquire(arg)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#的内部方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#(29)再次分析nonfairTryAcquire(acquires)方法;acquires = 1;
final boolean nonfairTryAcquire(int acquires) {
#B线程
final Thread current = Thread.currentThread();
# c = 0;
int c = getState();
#进入条件
if (c == 0) {
#修改状态为1
if (compareAndSetState(0, acquires)) {
#设置当前线程B为有独占访问权限的线程
setExclusiveOwnerThread(current);
#设置返回值为true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
#tryAcquire(int acquires) 返回true
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#(30)返回上层方法acquireQueued(final Node node, int arg),node为B节点,arg为1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
#p==head = true; tryAcquire(arg))=true
if (p == head && tryAcquire(arg)) {
#设置B节点为头节点
setHead(node);
#哨兵节点的后指针为null,此时的哨兵node,为不可达状态,gc时会被回收掉
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
#node为B节点
private void setHead(Node node) {
#B节点为head节点
head = node;
node.thread = null;
#B节点取消前置节点
node.prev = null;
}
#(31)到此lock.lock()方法执行结束。
还没有评论,来说两句吧...