【JUC源码】JUC核心:AQS(四)条件队列源码分析

叁歲伎倆 2022-10-31 01:43 284阅读 0赞

AQS 系列:

  • 【JUC源码】JUC核心:AQS(一)底层结构分析
  • 【JUC源码】JUC核心:AQS(二)同步队列源码分析(独占锁)
  • 【JUC源码】JUC核心:AQS(三)同步队列源码分析(共享锁)
  • 【JUC源码】JUC核心:AQS(四)条件队列源码分析
  • 【JUC源码】JUC核心:关于AQS的几个问题

条件队列:

  • 作用

    • 实现类似 synchronized 的 wait 与 signal,实现在使用锁时对线程管理
    • 而且由于实现了 Condition,对线程的管理可以更加细化
  • 命名:条件队列中将 node 叫做 waiter
  • 策略:

    • 要加入阻塞队列的前提是,当前线程已经拿到了锁,并处于运行状态
    • 加入阻塞队列前要释放锁,即唤醒同步队列的队二结点拿锁运行
    • 条件队列中的所有结点都是在阻塞状态
    • 唤醒操作实际是将一个node从条件队列,移动到同步队列尾,让它去返回同步队列去休眠,并不是随机就唤醒(unpark)一个线程。
  • 两个状态

    • CONDITION(-2):条件队列中所有正常节点
    • 初始化(0):要移到同步队列的节点

1.休眠

await(核心方法)

将线程的 node 放入条件队列,但该方法实质上整个休眠-唤醒的整体逻辑

  1. 为当前线程新建一个 node,并加入条件队列队尾(state=CONDITION)
  2. 将 AQS 的状态置为 0,释放当前线程的锁,然后再唤醒一个新线程(此时当前线程还未被阻塞)
  3. 将当前线程通过 park() 阻塞,等待被唤醒
  4. 待 signal 方法被调用后,就会有结点被移动到同步队列队尾(一般是条件队列队首结点),修改该 node 的状态为初始状态0,修改该节点前一个结点状态为 SIGNAL
  5. 被移动到同步队列后,如果阻塞在条件队列的线程被唤醒了,由于此时状态已经不是 CONDITION 了,所以就可以推退出循环
  6. 但是有一个问题,条件队列的这个 node 的线程虽然被唤醒了,也退出循环等待了。但是,此刻他还没有拿到锁,所以这里需要手动调用一次 acquireQueued 尝试一次拿锁(注:独占锁的 acquireQueued 是在 acquire 方法调的)

    public final void await() throws InterruptedException {

    1. if (Thread.interrupted())
    2. throw new InterruptedException();
    3. // 加入到条件队列的队尾
    4. Node node = addConditionWaiter();
    5. // 加入条件队列后,会释放锁,并唤醒队二去tryLock,然后删掉自己(head)
    6. // 自己马上就要阻塞了,必须释放之前 lock 的资源,不然自己不被唤醒的话,别的线程永远得不到该共享资源了
    7. int savedState = fullyRelease(node);
    8. int interruptMode = 0;
    9. // 当一个node刚进入条件队列,它会被阻塞再这里
    10. // 当某个线程被唤醒,即某个node被移动到同步队列,并且在同步队列中被唤醒了就会就会退出当前循环
    11. while (!isOnSyncQueue(node)) {
  1. // 阻塞在条件队列上
  2. LockSupport.park(this);
  3. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  4. break;
  5. }
  6. // 线程虽然已经被唤醒,但是还没有锁,所以手动 acquireQueued() 尝试拿锁
  7. // 如果失败,该线程又会进入休眠
  8. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  9. interruptMode = REINTERRUPT;
  10. if (node.nextWaiter != null)
  11. // 如果状态不是CONDITION,就会自动删除
  12. unlinkCancelledWaiters();
  13. if (interruptMode != 0)
  14. reportInterruptAfterWait(interruptMode);
  15. }

addConditionWaiter()

将node加入到条件队列尾,返回新添加的 waiter

  • 如果尾节点状态不是 CONDITION 状态,删除条件队列中所有状态不是 CONDITION 的节点
  • 如果队列为空,新增节点作为队列头节点,否则追加到尾节点上

    private Node addConditionWaiter() {

    1. Node t = lastWaiter;
    2. // 如果尾部的 waiter 不是 CONDITION 状态了,删除所有不是Condition的节点
    3. if (t != null && t.waitStatus != Node.CONDITION) {
    4. unlinkCancelledWaiters();
    5. t = lastWaiter;
    6. }
    7. // 新建node,这个node与同步对列的队首不同,但都存着同一个thread
    8. Node node = new Node(Thread.currentThread(), Node.CONDITION);
    9. // 队列是空的,直接放到队列头
    10. if (t == null)
    11. firstWaiter = node;
    12. // 队列不为空,直接到队列尾部
    13. else
    14. t.nextWaiter = node;
    15. lastWaiter = node;
    16. return node;

    }

fullRelease()

调用release,释放当前要加入条件对列的node的锁

  • release后就会有线程在acquireQueued方法中醒来
  • 醒来拿到锁后就会将head(保存当前wait线程的node,注:与条件队列的node不是同一个)

    final int fullyRelease(Node node) {

    1. boolean failed = true;
    2. try {
    3. // 一般为1,重入为n
    4. int savedState = getState();
    5. // 调用release
    6. if (release(savedState)) {
    7. failed = false;
    8. return savedState;
    9. } else {
    10. throw new IllegalMonitorStateException();
    11. }
    12. } finally {
    13. // 若失败,就将node置为CANCELLED,并删除
    14. if (failed)
    15. node.waitStatus = Node.CANCELLED;
    16. }

    }

isOnSyncQueue()

确认node不在同步队列上,再阻塞,如果 node 在同步队列上,是不能够上锁的。

  • node 刚被加入到条件队列中,立马就被其他线程 signal 转移到同步队列中去了
  • 线程之前在条件队列中沉睡,被唤醒后加入到同步队列中去

    final boolean isOnSyncQueue(Node node) {

    1. if (node.waitStatus == Node.CONDITION || node.prev == null)
    2. return false;
    3. if (node.next != null) // If has successor, it must be on queue
    4. return true;
    5. return findNodeFromTail(node);

    }

2.唤醒

signal()

唤醒阻塞在条件队列中的一个节点

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. // 从头节点开始唤醒
  5. Node first = firstWaiter;
  6. if (first != null)
  7. // doSignal 方法会把条件队列中的节点转移到同步队列中去
  8. doSignal(first);
  9. }

doSignal()

寻找被唤醒节点,从队首开始尝试转移到同步队列

  1. private void doSignal(Node first) {
  2. do {
  3. // firstWaiter(head)依次后移,若nextWaiter为空,说明到队尾了
  4. // 将firstWaiter置为条件队列中的第二个节点
  5. // 若第二个节点是null,即当前条件队列中只有一个节点,就将lastWaiter也置为null
  6. if ( (firstWaiter = first.nextWaiter) == null)
  7. lastWaiter = null;
  8. // 将第一个节点(first)的下一个节点置为null,作用是从条件队列中删除first
  9. first.nextWaiter = null;
  10. // 通过while保证一定能转移成功,若失败,就后移一位
  11. } while (!transferForSignal(first) && (first = firstWaiter) != null);
  12. }

transferForSignal(核心方法)

将当前节点移动加到同步队列队尾

  1. 将node的state改为0(初始化),若失败return false
  2. 调用enq将node加到同步队列队尾

    1. 由于node(引用),所以相当于直接将此node从条件队列删除->接到同步队列队尾
    2. 返回的是node在同步队列的前一个结点pre
  3. 将pre设置为SIGNAL,标识后面有待唤醒的节点
  4. 如果设置失败,直接唤醒当前线程

    final boolean transferForSignal(Node node) {

  1. // 将 node 的状态从 CONDITION 修改成初始化,失败返回 false
  2. // 等下次while循环,再尝试下一个结点
  3. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  4. return false;
  5. // 将node尾插到同步队列(从条件队列删除),返回的 p(pre) 是 node 在同步队列中的前一个节点
  6. Node p = enq(node);
  7. int ws = p.waitStatus;
  8. // 将前一个结点的状态修改成 SIGNAL,如果成功直接返回
  9. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  10. // 如果前一个结点被取消,或者状态不能修改成SIGNAL,直接唤醒(不一定能拿到锁)
  11. LockSupport.unpark(node.thread);
  12. return true;
  13. }

signalAll()

唤醒所有结点

  1. public final void signalAll() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. // 拿到头节点
  5. Node first = firstWaiter;
  6. if (first != null)
  7. // 从头节点开始唤醒条件队列中所有的节点
  8. doSignalAll(first);
  9. }

doSignalAll()

把条件队列所有节点依次转移到同步队列去,即循环调用transferForSignal

  1. private void doSignalAll(Node first) {
  2. lastWaiter = firstWaiter = null;
  3. do {
  4. // 拿出条件队列队列头节点的下一个节点
  5. Node next = first.nextWaiter;
  6. // 把头节点从条件队列中删除
  7. first.nextWaiter = null;
  8. // 头节点转移到同步队列中去
  9. transferForSignal(first);
  10. // 开始循环头节点的下一个节点
  11. first = next;
  12. } while (first != null);
  13. }

发表评论

表情:
评论列表 (有 0 条评论,284人围观)

还没有评论,来说两句吧...

相关阅读