AQS 源码分析 ゝ一世哀愁。 2023-10-04 08:23 2阅读 0赞 > 本文基于版本为 1.8.0\_281 的 JDK 对 AbstractQueuedSynchronizer 的源码进行分析 > ![在这里插入图片描述][20210311015938265.png] > 不知道比较新的 JDK 内 AbstractQueuedSynchronizer 内实现咋样的,应该大差不差吧,研究好主流的 JDK 1.8 一通百通。 ## 全局概览 ## AQS,其实就是 `AbstractQueuedSynchronizer` 的缩写,直译为「抽象队列同步器」,做下解释: 1. Abstract:抽象,说明这是个抽象类 2. Queued:队列,说明有队列的特征(先入先出,FIFO「First In First Out」) 3. Synchronizer:同步,实现了同步的功能。 这里先解释下,其实是会有两种队列存在的,在 `AbstractQueuedSynchronizer` 的内部类 `Node` 的注释中可以看到,分别是 `condition queue` 和 `sync queue` 的,暂时先说下后者吧。 再看看大概的结构吧,大概就是下图的样子: > 有一点小错误:head 结点是一定没存线程的,具体可以看 `AbstractQueuedSynchronizer #setHead(Node node)` 方法。 > ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L01yQmF5bWF4_size_16_color_FFFFFF_t_70] 如上图所示,是一个双向链表,关联阅读:[双向链表][Link 1]。 再来看看主类 `AbstractQueuedSynchronizer` 吧,`AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable` 继承与 `AbstractOwnableSynchronizer` 类和实现了 `Serializable` 接口,能够序列化,能够设置独占线程和获取独占线程(这两个方法在 `AbstractOwnableSynchronizer` 中)。 再用 IDEA 看下 `AbstractQueuedSynchronizer` 的 Structure 吧,如下图(其中C 是类、I 是接口、m 是方法、f 是属性): ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L01yQmF5bWF4_size_16_color_FFFFFF_t_70 1] 下面将对「内部类」、「成员变量」、「构造方法」、以及一些「常用方法」进行一些源码分析。 ## 内部类 ## 简单说下 Node 这个内部类吧: static final class Node { /** * 标记一个在共享模式的结点 */ static final Node SHARED = new Node(); /** * 标记一个在独占模式的结点 */ static final Node EXCLUSIVE = null; /** * waitStatus 的值,表示该结点已被取消 */ static final int CANCELLED = 1; /** * waitStatus 的值,表示当前结点的后继结点需要被唤醒 */ static final int SIGNAL = -1; /** * waitStatus 的值,表示该结点在等待某一条件 */ static final int CONDITION = -2; /** * waitStatus 的值,表示有资源可用 */ static final int PROPAGATE = -3; /** * 等待状态,取值:1、-1、-2、-3、0(初始化为 0) */ volatile int waitStatus; /** * 前驱结点 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 结点对应的线程 */ volatile Thread thread; /** * condition queue 中的后继节点 */ Node nextWaiter; /** * 判断结点是否在共享模式下。 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前驱结点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } ## 成员变量 ## /** * 等待队列的头结点 */ private transient volatile Node head; /** * 等待队列的尾结点 */ private transient volatile Node tail; /** * 状态 */ private volatile int state; // Queuing utilities /** * 自旋时间 */ static final long spinForTimeoutThreshold = 1000L; /** * Unsafe 类实例 */ private static final Unsafe unsafe = Unsafe.getUnsafe(); /** * state 内存偏移地址 */ private static final long stateOffset; /** * head 内存偏移地址 */ private static final long headOffset; /** * tail 内存偏移地址 */ private static final long tailOffset; /** * waitStatus 内存偏移地址 */ private static final long waitStatusOffset; /** * next 内存偏移地址 */ private static final long nextOffset; 其中有个 `state`,咱们再来详细说下。咱们可以通过 `getState()` 方法来获取 `state` 的值,也可以通过 `setState(int newState)` 来设置 `state` 的值。另外还有一个方法: /** * 期望值 expect,新值 update,使用 unsafe.compareAndSwapInt() 方法具体实现。 */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } ## 方法 ## 就重点介绍下资源的获取和释放吧。 ### 获取资源 acquire(int arg) ### /** * 在独占模式下获取资源,忽略中断 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 从上述的方法咱们可以看到 `acquire()` 方法调用其它方法的流程如下: ![AQS][] #### tryAcquire() #### 先说下 `tryAcquire`,这个暂时没什么说的,会让子类重写此方法实现,以后的博客中会说到。 /** * 尝试在独占模式下获取,具体由子类实现 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } #### addWaiter(Node mode) #### 再说下 `addWaiter(Node mode)`: /** * 将调用此方法的线程封装为一个结点,并放在 Sync Queue 中 */ private Node addWaiter(Node mode) { // 创建一个结点,默认为独占模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 保留尾结点 Node pred = tail; // 判断尾结点是否为空 if (pred != null) { // 如果不为空,则将 node 结点的 prev 指向之前的尾节点 node.prev = pred; // 使用 CAS 将 node 设置为尾节点 if (compareAndSetTail(pred, node)) { // 如果成功,则将之前尾节点的 next 指向 node 结点 pred.next = node; return node; } } // 尾结点为空(队列为空,未初始化),或者 CAS 执行失败(尾结点不是预期的),通过自旋方式将结点插入队列 enq(node); return node; } 还得专门说下 `enq(final Node node)`: /** * 将结点插入队列 */ private Node enq(final Node node) { for (;;) { // 保存尾结点 Node t = tail; // 看尾结点是否为 null if (t == null) { // Must initialize // 尾节点为空/队列为空/队列未初始化,初始化一个结点 if (compareAndSetHead(new Node())) tail = head; } else { // 尾结点不为空,将 node 的前驱指向之前的尾节点 node.prev = t; // 将 node 设置为尾结点 if (compareAndSetTail(t, node)) { // 将之前尾结点的后继指向 node t.next = node; return t; } } } } #### acquireQueued(final Node node, int arg) #### /** * Sync Queue 中的结点不断去尝试获取资源。 * 被中断返回 true,否则返回 false。 */ final boolean acquireQueued(final Node node, int arg) { // 是否成功拿到资源的标志 boolean failed = true; try { // 等待过程中是否中断的标志 boolean interrupted = false; // 死循环 for (;;) { // 获取 node 的前驱结点 final Node p = node.predecessor(); // 前驱结点是头结点并且获取到锁(说明 node 是第二个结点) if (p == head && tryAcquire(arg)) { // 将 node 结点设置为头结点,后续可以出队 setHead(node); // 端口头结点的后继,利于 GC p.next = null; // help GC // 拿到了资源 failed = false; // 说明被中断 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 再专门说下 `shouldParkAfterFailedAcquire(Node pred, Node node)` 方法: /** * 结点获取资源失败后,检查并更新结点状态(当前线程没有抢到锁,是否需要挂起当前线程) */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取 node 前驱结点的状态 int ws = pred.waitStatus; // 结点处于唤醒状态 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 可以进行 park 操作(阻塞操作),挂起当前线程 return true; // 如果结点状态为取消 if (ws > 0) { do { // 循环向前查找状态是取消的结点,直到找到状态不为取消的结点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 此时的 pred 状态不为 取消,将其后继指向 node pred.next = node; } else { // 前驱结点状态不为取消,将 pred 结点的状态设置为唤醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 不可以进行 park 操作(阻塞/挂起操作) return false; } 接着分析 `parkAndCheckInterrupt()`: /** * 挂起线程,并检查是否中断 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 再分析下 `cancelAcquire(node)` 方法,这个方法是获取锁出异常调用的: /** * 取消正在获取锁的线程,将其所在结点的状态设置为 cancel */ private void cancelAcquire(Node node) { // 忽略不存在的结点 if (node == null) return; // 线程要取消获取锁,将结点内的 thread 清空 node.thread = null; // 获取当前结点的前驱结点 Node pred = node.prev; // 跳过取消状态的结点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取过滤之后的前驱结点的后继结点 // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 将当前结点状态设置为取消 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. // 如果当前结点是尾结点,将从后往前的第一个非取消状态的结点设置为尾结点(换句话说就是将尾结点的前驱结点设置为尾结点0) if (node == tail && compareAndSetTail(node, pred)) { // 将尾结点的 next 指针置空 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果当前结点是 head 的后继结点,或上述条件不满足,唤醒当前结点的后继结点 unparkSuccessor(node); } node.next = node; // help GC } } ### 待办 ### 释放资源(这个好说) Condition 判断(这个需要重点说),方便 线程池 博客 ### 推荐阅读 ### -------------------- 写在最后: 本来写得差不多了,后来发现想要读懂线程池代码得先把 Condition 说清楚,等空了更新。 [20210311015938265.png]: https://img-blog.csdnimg.cn/20210311015938265.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L01yQmF5bWF4_size_16_color_FFFFFF_t_70]: https://img-blog.csdnimg.cn/20210316010640688.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L01yQmF5bWF4,size_16,color_FFFFFF,t_70 [Link 1]: https://mindartisan.blog.csdn.net/article/details/114501996 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L01yQmF5bWF4_size_16_color_FFFFFF_t_70 1]: https://img-blog.csdnimg.cn/20210315180133508.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L01yQmF5bWF4,size_16,color_FFFFFF,t_70 [AQS]: https://img-blog.csdnimg.cn/20210315233237409.png
还没有评论,来说两句吧...