AQS原理与源码剖析(结合ReentrantLock源码)

AQS原理与源码剖析(结合ReentrantLock源码)

原文地址:http://jachindo.top:8090/archives/aqs%E5%8E%9F%E7%90%86%E4%B8%8E%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90%E7%BB%93%E5%90%88reentrantlock%E6%BA%90%E7%A0%81

介绍:AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架

format_png


2. AQS数据结构

§ 队列概述

AQS包含两种队列:同步队列和条件队列,实现都如下:

image-20191217114957123

维护一个由双向链表实现的等待队列,链表元素被封装为一个Node对象(其中包含线程信息),这些节点都尝试通过CAS获取/修改state。

节点的详细信息如下:

注意:head节点(队头)是一个虚节点(thread字段为null),仅保留waitStatus属性供后继节点做相应的判断。它代表其中的线程正在工作。

image-20191217115057708


§ 同步队列属性

当多个线程都来请求锁时,某一时刻有且只有一个线程能够获得锁(排它锁),那么剩余获取不到锁的线程,都会到同步队列中去排队并阻塞自己,当有线程主动释放锁时,就会从同步队列头开始释放一个排队的线程,让线程重新去竞争锁。

所以同步队列的主要作用是***阻塞获取不到锁的线程,并在适当时机释放这些线程。***

同步队列底层数据结构是个双向链表,我们从源码中可以看到链表的头尾,如下:

  1. // 同步队列的头。
  2. private transient volatile Node head;
  3. // 同步队列的尾
  4. private transient volatile Node tail;

§ 条件队列属性

条件队列和同步队列的功能一样,管理获取不到锁的线程,底层数据结构也是链表队列,但条件队列不直接和锁打交道,但常常和锁配合使用,是一定的场景下,对锁功能的一种补充。

条件队列的属性如下:

  1. // 条件队列,从属性上可以看出是链表结构
  2. public class ConditionObject implements Condition, java.io.Serializable {
  3. private static final long serialVersionUID = 1173984872572414699L;
  4. // 条件队列中第一个 node
  5. private transient Node firstWaiter;
  6. // 条件队列中最后一个 node
  7. private transient Node lastWaiter;
  8. }

ConditionObject 我们就称为条件队列,我们需要使用时,直接 new ConditionObject () 即可。


§ Node

  1. static final class Node {
  2. /** * 同步队列单独的属性 */
  3. static final Node SHARED = new Node(); // node 是共享模式
  4. static final Node EXCLUSIVE = null; // node 是排它模式
  5. // 当前节点的前节点
  6. // 节点 acquire 成功后就会变成head
  7. // head 节点不能被 cancelled
  8. volatile Node prev;
  9. // 当前节点的下一个节点
  10. volatile Node next;
  11. /** * 两个队列共享的属性 */
  12. // 表示当前节点的状态,通过节点的状态来控制节点的行为
  13. // 普通同步节点,就是 0 ,条件节点是 CONDITION -2
  14. volatile int waitStatus;
  15. // waitStatus 的状态有以下几种
  16. // 被取消
  17. static final int CANCELLED = 1;
  18. // SIGNAL 状态的意义:同步队列中的节点在自旋获取锁的时候,如果前一个节点的状态是 SIGNAL,那么自己就可以阻塞休息了,否则自己一直自旋尝试获得锁
  19. static final int SIGNAL = -1;
  20. // 表示当前 node 正在条件队列中,当有节点从同步队列转移到条件队列时,状态就会被更改成 CONDITION
  21. static final int CONDITION = -2;
  22. // 无条件传播,共享模式下,该状态的进程处于可运行状态
  23. static final int PROPAGATE = -3;
  24. // 当前节点的线程
  25. volatile Thread thread;
  26. // 在同步队列中,nextWaiter 并不真的是指向其下一个节点,我们用 next 表示同步队列的下一个节点,nextWaiter 只是表示当前 Node 是排它模式还是共享模式
  27. // 但在条件队列中,nextWaiter 就是表示下一个节点元素
  28. Node nextWaiter;
  29. }
  • 从 Node 的结构中,我们需要重点关注 waitStatus 字段,Node 的很多操作都是围绕着 waitStatus 字段进行的。
  • Node 的 pre、next 属性是同步队列中的链表前后指向字段,nextWaiter 是条件队列中下一个节点的指向字段,但在同步队列中,nextWaiter 只是一个标识符,表示当前节点是共享还是排它模式。

3. AQS维护核心变量–state和waitStatus

  1. state 是锁的状态,是 int 类型,子类继承 AQS 时,都是要根据 state 字段来判断有无得到锁,比如当前同步器状态是 0,表示可以获得锁,当前同步器状态是 1,表示锁已经被其他线程持有,当前线程无法获得锁;
  2. waitStatus 是节点(Node,可以理解为线程)的状态,种类很多,一共有初始化 (0)、CANCELLED (1)、SIGNAL (-1)、CONDITION (-2)、PROPAGATE (-3),各个状态的含义可以见上文。

我们可以通过修改state字段实现多线程的独占/共享加锁模式:

image-20191217115324973


image-20191217115333553


4. AQS核心代码

  1. 首先调用可以由子类重写的tryAcquire方法获取资源(修改state)。
  2. 成功则无事,失败则入队addWaiter(Node.EXCLUSIVE)
  3. acquireQueued方法对入队节点进行相应操作使其获取资源/停止获取,中断。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease


1)acquire()以下整个过程以排他锁为例

acquire指定了获取锁的框架分为排他锁和共享锁。

  1. // 排它模式下,尝试获得锁
  2. public final void acquire(int arg) {
  3. // tryAcquire 方法是需要实现类去实现的,实现思路一般都是 cas 给 state 赋值来决定是否能获得锁
  4. if (!tryAcquire(arg) &&
  5. // addWaiter 入参代表是排他模式
  6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  7. selfInterrupt();
  8. }

img


a) tryAcquire()

​ 需要子类重写。

b)addWaiter()

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. Node pred = tail;
  4. // 1. 先尝试将node插入尾部
  5. if (pred != null) { // a
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) { // b
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. // 2. 尝试插入失败,则调用enq()自旋插入
  13. enq(node);
  14. return node; // 返回新入队的节点,用于acquireQueued()对其进行处理
  15. }

有两种情况会导致过程1插入失败:

  1. a处判断失败,pred是null,说明等待队列中没有元素。
  2. b处判断失败,cas校验失败,即pred被其他线程更改过。

c)enq()

CAS自旋

  1. private Node enq(final Node node) {
  2. for (;;) { // CAS自旋
  3. Node t = tail;
  4. if (t == null) { // 等待队列为空
  5. if (compareAndSetHead(new Node())) // 头节点初始化--虚节点
  6. tail = head;
  7. } else {
  8. // 同addWaiter中一样,只是这里是CAS自旋,直到入队成功
  9. node.prev = t;
  10. if (compareAndSetTail(t, node)) {
  11. t.next = node;
  12. return t;
  13. }
  14. }
  15. }
  16. }

d)acquireQueued()

一个Node被放入等待队列后会做些什么?

  1. 通过不断的***自旋尝试使自己前一个节点的状态变成 signal,然后阻塞自己。***
  2. 获得锁的线程执行完成之后,释放锁时,会把阻塞的 node 唤醒,node 唤醒之后再次自旋,尝试获得锁
  3. 注意:这里获取锁成功后会将其设置成head节点,而在共享锁的获取时,这里还会唤醒其他后续的阻塞节点
  1. // 返回 false 表示获得锁成功,返回 true 表示失败
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. // 自旋
  7. for (;;) {
  8. // 选上一个节点
  9. final Node p = node.predecessor();
  10. // 有两种情况会走到 p == head:
  11. // 1:node 之前没有获得锁,进入 acquireQueued 方法时,才发现他的前置节点就是头节点,于是尝试获得一次锁;
  12. // 2:node 之前一直在阻塞沉睡,然后被唤醒,此时唤醒 node 的节点正是其前一个节点,也能走到 if
  13. // 如果自己 tryAcquire 成功,就立马把自己设置成 head,把上一个节点移除
  14. // 如果 tryAcquire 失败,尝试进入同步队列
  15. if (p == head && tryAcquire(arg)) {
  16. // 获得锁,设置成 head 节点
  17. setHead(node);
  18. //p被回收
  19. p.next = null; // help GC
  20. failed = false;
  21. return interrupted;
  22. }
  23. // shouldParkAfterFailedAcquire 把 node 的前一个节点状态置为 SIGNAL
  24. // 只要前一个节点状态是 SIGNAL了,那么自己就可以阻塞(park)了
  25. // parkAndCheckInterrupt 阻塞当前线程
  26. if (shouldParkAfterFailedAcquire(p, node) &&
  27. // 线程是在这个方法里面阻塞的,醒来的时候仍然在无限 for 循环里面,就能再次自旋尝试获得锁
  28. parkAndCheckInterrupt())
  29. interrupted = true;
  30. }
  31. } finally {
  32. // 如果获得node的锁失败,将 node 从队列中移除
  33. if (failed)
  34. cancelAcquire(node);
  35. }
  36. }

此方法的注释还是很清楚的,我们接着看下此方法的核心:shouldParkAfterFailedAcquire,***这个方法的主要目的就是把前一个节点的状态置为 SIGNAL,只要前一个节点的状态是 SIGNAL,当前节点就可以阻塞了(parkAndCheckInterrupt 就是使节点阻塞的方法)***,源码如下:

  1. // 当前线程可以安心阻塞的标准,就是前一个节点线程状态是 SIGNAL 了。
  2. // 入参 pred 是前一个节点,node 是当前节点。
  3. // 关键操作:
  4. // 1:确认前一个节点是否有效,无效的话,一直往前找到状态不是取消的节点。
  5. // 2: 把前一个节点状态置为 SIGNAL。
  6. // 1、2 两步操作,有可能一次就成功,有可能需要外部循环多次才能成功(外面是个无限的 for 循环),但最后一定是可以成功的
  7. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  8. int ws = pred.waitStatus;
  9. // 如果前一个节点 waitStatus 状态已经是 SIGNAL 了,直接返回,不需要在自旋了
  10. if (ws == Node.SIGNAL)
  11. return true;
  12. // 如果当前节点状态已经被取消了。
  13. if (ws > 0) {
  14. // 找到前一个状态不是取消的节点,因为把当前 node 挂在有效节点身上
  15. // 因为节点状态是取消的话,是无效的,是不能作为 node 的前置节点的,所以必须找到 node 的有效节点才行
  16. do {
  17. node.prev = pred = pred.prev;
  18. } while (pred.waitStatus > 0);
  19. pred.next = node;
  20. // 否则直接把节点状态置 为SIGNAL
  21. } else {
  22. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  23. }
  24. return false;
  25. }
  26. // 进入等待状态,等待唤醒
  27. private final boolean parkAndCheckInterrupt() {
  28. LockSupport.park(this);
  29. return Thread.interrupted(); // 判断自己是否被中断唤醒
  30. }

e)整个acquire()流程

image-20191217145308748

  1. 使用 tryAcquire 方法尝试获得锁,获得锁直接返回,获取不到锁的走 2;
  2. 把当前线程组装成节点(Node),追加到同步队列的尾部(addWaiter);
  3. ***自旋,使同步队列中当前节点的前置节点状态为 signal 后,然后阻塞自己。***

2)release()

也分为释放排他锁和共享锁

  1. public final boolean release(int arg) {
  2. // tryRelease 交给实现类去实现,一般就是用当前同步器状态减去 arg,如果返回 true 说明成功释放锁。
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. // 头节点不为空,并且非初始化状态
  6. if (h != null && h.waitStatus != 0)
  7. // 从头开始唤醒等待锁的节点
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }
  13. // 共享模式下,释放当前线程的共享锁
  14. public final boolean releaseShared(int arg) {
  15. if (tryReleaseShared(arg)) {
  16. // 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法
  17. doReleaseShared();
  18. return true;
  19. }
  20. return false;
  21. }

5、条件队列相关

锁 + 队列结合的场景

§ 入队等待await方法

获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,这个动作我们叫做入条件队列:

  1. // 线程入条件队列
  2. public final void await() throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // 加入到条件队列的队尾
  6. Node node = addConditionWaiter();
  7. // 标记位置 A
  8. // 加入条件队列后,会释放 lock 时申请的资源,唤醒同步队列队列头的节点
  9. // 自己马上就要阻塞了,必须马上释放之前 lock 的资源,不然自己不被唤醒的话,别的线程永远得不到该共享资源了
  10. int savedState = fullyRelease(node);
  11. int interruptMode = 0;
  12. // 确认node不在同步队列上,再阻塞,如果 node 在同步队列上,是不能够上锁的
  13. // 目前想到的只有两种可能:
  14. // 1:node 刚被加入到条件队列中,立马就被其他线程 signal 转移到同步队列中去了
  15. // 2:线程之前在条件队列中沉睡,被唤醒后加入到同步队列中去
  16. while (!isOnSyncQueue(node)) {
  17. // this = AbstractQueuedSynchronizer$ConditionObject
  18. // 阻塞在条件队列上
  19. LockSupport.park(this);
  20. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  21. break;
  22. }
  23. // 标记位置 B
  24. // 其他线程通过 signal 已经把 node 从条件队列中转移到同步队列中的数据结构中去了
  25. // 所以这里节点苏醒了,直接尝试 acquireQueued
  26. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  27. interruptMode = REINTERRUPT;
  28. if (node.nextWaiter != null) // clean up if cancelled
  29. // 如果状态不是CONDITION,就会自动删除
  30. unlinkCancelledWaiters();
  31. if (interruptMode != 0)
  32. reportInterruptAfterWait(interruptMode);
  33. }
  1. 上述代码标记位置 A 处,节点在准备进入条件队列之前,一定会先释放当前持有的锁,不然自己进去条件队列了,其余的线程都无法获得锁了;
  2. 上述代码标记位置 B 处,***此时节点是被 Condition.signal 或者 signalAll 方法唤醒的,此时节点已经成功的被转移到同步队列中去了(整体架构图中蓝色流程),所以可以直接执行 acquireQueued 方法;***s
  3. Node 在条件队列中的命名,源码喜欢用 Waiter 来命名,所以我们在条件队列中看到 Waiter,其实就是 Node。

§ 单个唤醒signal方法

signal 方法是唤醒的意思,比如之前队列满了,有了一些线程因为 take 操作而被阻塞进条件队列中,突然队列中的元素被线程 A 消费了,线程 A 就会调用 signal 方法,唤醒之前阻塞的线程,会从条件队列的头节点开始唤醒(流程见整体架构图中蓝色部分),源码如下:

  1. // 唤醒阻塞在条件队列中的节点
  2. public final void signal() {
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. // 从头节点开始唤醒
  6. Node first = firstWaiter;
  7. if (first != null)
  8. // doSignal 方法会把条件队列中的节点转移到同步队列中去
  9. doSignal(first);
  10. }
  11. // 把条件队列头节点转移到同步队列去
  12. private void doSignal(Node first) {
  13. do {
  14. // nextWaiter为空,说明到队尾了
  15. if ( (firstWaiter = first.nextWaiter) == null)
  16. lastWaiter = null;
  17. // 从队列头部开始唤醒,所以直接把头节点.next 置为 null,这种操作其实就是把 node 从条件队列中移除了
  18. // 这里有个重要的点是,每次唤醒都是从队列头部开始唤醒,所以把 next 置为 null 没有关系,如果唤醒是从任意节点开始唤醒的话,就会有问题,容易造成链表的割裂
  19. first.nextWaiter = null;
  20. // transferForSignal 方法会把节点转移到同步队列中去
  21. // 通过 while 保证 transferForSignal 能成功
  22. // 等待队列的 node 不用管他,在 await 的时候,会自动清除状态不是 Condition 的节点(通过 unlinkCancelledWaiters 方法)
  23. // (first = firstWaiter) != null = true 的话,表示还可以继续循环, = false 说明队列中的元素已经循环完了
  24. } while (!transferForSignal(first) &&
  25. (first = firstWaiter) != null);
  26. }

***唤醒条件队列中的节点,实际上就是把条件队列中的节点转移到同步队列中,并把其前置节点状态置为 SIGNAL。***


§ 全部唤醒signalAll

  1. public final void signalAll() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. // 拿到头节点
  5. Node first = firstWaiter;
  6. if (first != null)
  7. // 从头节点开始唤醒条件队列中所有的节点
  8. doSignalAll(first);
  9. }
  10. // 把条件队列所有节点依次转移到同步队列去
  11. private void doSignalAll(Node first) {
  12. lastWaiter = firstWaiter = null;
  13. do {
  14. // 拿出条件队列队列头节点的下一个节点
  15. Node next = first.nextWaiter;
  16. // 把头节点从条件队列中删除
  17. first.nextWaiter = null;
  18. // 头节点转移到同步队列中去
  19. transferForSignal(first);
  20. // 开始循环头节点的下一个节点
  21. first = next;
  22. } while (first != null);
  23. }

从源码中可以看出,其本质就是 for 循环调用 transferForSignal 方法,将条件队列中的节点循环转移到同步队列中去。

6. AQS与ReentantLock的关联

ReentrantLock 类本身是不继承 AQS 的,实现了 Lock 接口,如下:

  1. public class ReentrantLock implements Lock, java.io.Serializable { }

Lock 接口定义了各种加锁,释放锁的方法,接口有如下几个:

  1. // 获得锁方法,获取不到锁的线程会到同步队列中阻塞排队
  2. void lock();
  3. // 获取可中断的锁
  4. void lockInterruptibly() throws InterruptedException;
  5. // 尝试获得锁,如果锁空闲,立马返回 true,否则返回 false
  6. boolean tryLock();
  7. // 带有超时等待时间的锁,如果超时时间到了,仍然没有获得锁,返回 false
  8. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  9. // 释放锁
  10. void unlock();
  11. // 得到新的 Condition
  12. Condition newCondition();

ReentrantLock 就负责实现这些接口,我们使用时,直接面对的也是这些方法,这些方法的底层实现都是交给 Sync 内部类去实现的,Sync 类的定义如下:

  1. abstract static class Sync extends AbstractQueuedSynchronizer { }

Sync 继承了 AbstractQueuedSynchronizer ,所以 Sync 就具有了锁的框架,根据 AQS 的框架,Sync 只需要实现 AQS 预留的几个方法即可,但 Sync 也只是实现了部分方法,还有一些交给子类 NonfairSync 和 FairSync 去实现了,NonfairSync 是非公平锁,FairSync 是公平锁,定义如下:

  1. // 同步器 Sync 的两个子类锁
  2. static final class FairSync extends Sync { }
  3. static final class NonfairSync extends Sync { }

图片描述

image-20191217144549565


img

ReentrantLock重写了tryAcquire-tryRelease方法。并且根据公平锁和非公平锁分了两种锁的获取方式。

1)公平锁加锁重写tryAcquire()

  1. final void lock() {
  2. // 内部调用了重写过的tryAcquire()方法
  3. acquire(1);
  4. }
  5. protected final boolean tryAcquire(int acquires) {
  6. final Thread current = Thread.currentThread();
  7. int c = getState();
  8. if (c == 0) {
  9. // hasQueuedPredecessors 是实现公平的关键
  10. // 会判断当前线程是不是属于同步队列的头节点的下一个节点(头节点是释放锁的节点)
  11. // 如果是(返回false),符合先进先出的原则,可以获得锁
  12. // 如果不是(返回true),则继续等待
  13. if (!hasQueuedPredecessors() &&
  14. compareAndSetState(0, acquires)) {
  15. setExclusiveOwnerThread(current);
  16. return true;
  17. }
  18. }
  19. // 可重入锁
  20. else if (current == getExclusiveOwnerThread()) {
  21. int nextc = c + acquires;
  22. if (nextc < 0)
  23. throw new Error("Maximum lock count exceeded");
  24. setState(nextc);
  25. return true;
  26. }
  27. return false;
  28. }

2)非公平锁加锁重写tryAcquire()

  1. final void lock() {
  2. // 一上来先尝试获取锁,“插队”
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. // 插队失败才调用,注意该方法内部调用了重写过的tryAcquire()方法
  7. acquire(1);
  8. }
  9. protected final boolean tryAcquire(int acquires) {
  10. return nonfairTryAcquire(acquires);
  11. }
  12. final boolean nonfairTryAcquire(int acquires) {
  13. final Thread current = Thread.currentThread();
  14. int c = getState();
  15. // 同步器的状态是 0,表示同步器的锁没有人持有
  16. if (c == 0) {
  17. // 当前线程持有锁
  18. if (compareAndSetState(0, acquires)) {
  19. // 标记当前持有锁的线程是谁
  20. setExclusiveOwnerThread(current);
  21. return true;
  22. }
  23. }
  24. // 如果当前线程已经持有锁了,同一个线程可以对同一个资源重复加锁,代码实现的是可重入锁
  25. else if (current == getExclusiveOwnerThread()) {
  26. // 当前线程持有锁的数量 + acquires
  27. int nextc = c + acquires;
  28. // int 是有最大值的,<0 表示持有锁的数量超过了 int 的最大值
  29. if (nextc < 0) // overflow
  30. throw new Error("Maximum lock count exceeded");
  31. setState(nextc);
  32. return true;
  33. }
  34. //否则线程进入同步队列
  35. return false;
  36. }

图片描述

3)tryRelease()

  1. // 方法返回当前锁是不是空闲
  2. protected final boolean tryRelease(int releases) {
  3. // 1. 减少可重入次数
  4. int c = getState() - releases;
  5. // 当前线程不是持有锁的线程,抛出异常
  6. if (Thread.currentThread() != getExclusiveOwnerThread())
  7. throw new IllegalMonitorStateException();
  8. boolean free = false;
  9. // 2. 如果持有线程全部释放,将当前独占锁所有线程设置为null
  10. if (c == 0) {
  11. free = true;
  12. setExclusiveOwnerThread(null);
  13. }
  14. // 3. 更新state
  15. setState(c);
  16. return free;
  17. }

参考地址:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

https://www.cnblogs.com/waterystone/p/4920797.html

发表评论

表情:
评论列表 (有 0 条评论,52人围观)

还没有评论,来说两句吧...

相关阅读

    相关 AQS

    ![在这里插入图片描述][20200509130408277.jpeg] > addWaiter往队列里添加线程节点的CAS操作: ![在这里插入图片描述][waterma

    相关 分析:AQS

    在开始这篇源码之前,最好先看下转载整理的[这篇文章][Link 1],有很多值得学习的地方。AQS是用来构建锁或者其他同步组件的基础框架。总体来说,它使用一个 int 成员变量

    相关 ReentrantLock

    Java的内置锁一直都是备受争议的,在JDK 1.6之前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略,但是与Lock相