十三. 等待通知机制之Condition接口 布满荆棘的人生 2024-03-29 15:08 13阅读 0赞 ### 前言 ### `Condition`接口定义了一组方法用于配合`Lock`实现**等待/通知**模式,与之作为对比的是,用于配合`synchronized`关键字实现**等待/通知**模式的定义在`java.lang.Object`上的监视器方法`wait()`和`notify()`等。 ### 正文 ### 通常基于`Lock`的`newCondition()`方法创建`Condition`对象并作为对象成员变量来使用,如下所示。 public class MyCondition { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); ...... } 队列同步器`AbstractQueuedSynchronizer`的内部类`ConditionObject`实现了`Condition`接口,后续将基于`ConditionObject`的实现进行讨论。首先给出`Condition`接口定义的方法。 public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); } 上述方法的说明如下表所示。 <table> <thead> <tr> <th>方法</th> <th>说明</th> </tr> </thead> <tbody> <tr> <td><code>await()</code></td> <td>调用此方法的线程进入等待状态,响应中断,也可以被<code>signal()</code>和<code>signalAll()</code>方法唤醒并返回,唤醒并返回前需要获取到锁资源。</td> </tr> <tr> <td><code>awaitUninterruptibly()</code></td> <td>同<code>await()</code>,但不响应中断。</td> </tr> <tr> <td><code>awaitNanos()</code></td> <td>同<code>await()</code>,并可指定等待时间,响应中断。该方法有返回值,表示剩余等待时间。</td> </tr> <tr> <td><code>awaitUntil()</code></td> <td>同<code>await()</code>,并可指定等待截止时间点,响应中断。该方法有返回值,<strong>true</strong>表示没有到截止时间点就被唤醒并返回。</td> </tr> <tr> <td><code>signal()</code></td> <td>唤醒<strong>等待队列</strong>中的第一个节点。</td> </tr> <tr> <td><code>signalAll()</code></td> <td>唤醒等待队列中的所有节点。</td> </tr> </tbody> </table> 针对上面的方法再做两点补充说明: * 等待队列是`Condition`对象内部维护的一个**FIFO**队列,当有线程进入等待状态后会被封装成等待队列的一个节点并添加到队列尾; * 从等待队列唤醒并返回的线程一定已经获取到了与`Condition`对象关联的锁资源,`Condition`对象与创建`Condition`对象的锁关联。 下面将结合`ConditionObject`类的源码来对**等待/通知**模式的实现进行说明。`await()`方法的实现如下所示。 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 基于当前线程创建Node并添加到等待队列尾 // 这里创建的Node的等待状态为CONDITION,表示等待在等待队列中 Node node = addConditionWaiter(); // 释放锁资源 int savedState = fullyRelease(node); int interruptMode = 0; // Node从等待返回后会被添加到同步队列中 // Node成功被添加到同步队列中则退出while循环 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 让Node进入自旋状态,竞争锁资源 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 遍历等待队列,将已经取消等待的节点从等待队列中去除链接 if (node.nextWaiter != null) unlinkCancelledWaiters(); // Node如果是被中断而从等待返回,则抛出中断异常 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 理解`await()`方法的整个执行流程前,先看一下等待队列的一个示意图,如下所示。 ![在这里插入图片描述][b12490e83fe24d21993149e30883f7eb.jpeg_pic_center] `Condition`对象分别持有等待队列头节点和尾节点的引用,新添加的节点会添加到等待队列尾,同时**lastWaiter**会指向新的尾节点。 现在回到`await()`方法,在`await()`方法中,会做如下事情。 * **首先**,会基于当前线程创建`Node`并添加到等待队列尾,创建`Node`有两个注意点:1. 这里创建的`Node`复用了同步队列中的`Node`定义;2. 在创建`Node`前会判断等待队列的尾节点是否已经结束等待(即等待状态不为**Condition**),如果是则会遍历等待队列并将所有已经取消等待的节点从等待队列中去除链接; * **然后**,当前线程会释放锁资源,并基于`LockSupport.park()`进入等待状态; * **再然后**,当前线程被其它线程唤醒,或者当前线程被中断,无论哪种方式,当前线程对应的`Node`都会被添加到同步队列尾并进入自旋状态竞争锁资源,注意,此时当前线程对应的`Node`还存在于等待队列中; * **再然后**,判断当前线程对应的`Node`是否是等待队列尾节点,如果不是则触发一次清除逻辑,即遍历等待队列,将已经取消等待的节点从等待队列中去除链接,如果是等待队列尾节点,那么当前线程对应的`Node`会在下一次创建`Node`时从等待队列中被清除链接; * **最后**,判断当前线程从等待返回的原因是否是因为被中断,如果是,则抛出中断异常。 上面讨论了**等待**的实现,下面再结合源码看一下**通知**的实现。首先是`signal()`方法,如下所示。 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } 由`signal()`方法可知,调用`signal()`方法的线程需要持有锁,其次`signal()`方法会唤醒等待队列的头节点,即可以理解为唤醒等待时间最久的节点。下面再看一下`signalAll()`方法,如下所示。 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } 可以发现,`signalAll()`与`signal()`方法大体相同,只不过前者最后会调用`doSignalAll()`方法来唤醒所有等待节点,后者会调用`doSignal()`方法来唤醒头节点,下面以`doSignal()`方法进行说明。 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } 实际就是在`transferForSignal()`方法中将头节点添加到同步队列尾,然后再调用`LockSupport.unpark()`进行唤醒。 ### 总结 ### 《`Java`并发编程的艺术》5.6小节对两者的差异进行了对比和总结,这里直接贴过来作参考。 <table> <thead> <tr> <th>对比项</th> <th>Object Monitor Methods</th> <th>Condition</th> </tr> </thead> <tbody> <tr> <td>当前线程释放锁并进入等待状态</td> <td>支持</td> <td>支持</td> </tr> <tr> <td>当前线程释放锁并进入等待状态,等待过程中不响应中断</td> <td>不支持</td> <td>支持</td> </tr> <tr> <td>当前线程释放锁并进入超时等待状态</td> <td>支持</td> <td>支持</td> </tr> <tr> <td>当前线程释放锁并等待至将来某个时间点</td> <td>不支持</td> <td>支持</td> </tr> <tr> <td>唤醒队列中的一个线程</td> <td>支持</td> <td>支持</td> </tr> <tr> <td>唤醒队列中的多个线程</td> <td>支持</td> <td>支持</td> </tr> </tbody> </table> [b12490e83fe24d21993149e30883f7eb.jpeg_pic_center]: https://image.dandelioncloud.cn/pgy_files/images/2024/03/28/a181ea7baeed44c3a3b571d10e6074b4.png
还没有评论,来说两句吧...