ReentrantLock源码解读(condition原理) 缺乏、安全感 2021-09-02 10:35 480阅读 0赞 #### 回顾 #### [ReentrantLock源码解读(lock、unlock原理)][ReentrantLock_lock_unlock] 前面我们看了ReentrantLock的加锁解锁原理,明白了ReentrantLock通过AbstractQueuedSynchronizer实现了保护临界区的功能。**让一个线程尝试去获取锁的时候,如果当前锁已经被其他线程占用,那么该线程就会阻塞,加入到阻塞队列中。直到持有锁的线程调用unlock,释放锁之后才会从阻塞队列中唤醒一个线程进入临界区**。 #### 阻塞队列不止一个 #### 上面我们提到,如果锁被其他线程占用,尝试加锁的线程将会被加入到阻塞队列中,来实现只允许一个线程进入临界区。事实上,在ReentrantLock中,阻塞队列不止一个,可以有n个。说到这里,引入我们今天要讲的,Condition。Condition的引入,使得ReentrantLock可以实现非常多的现实场景问题。 * 举一个过闸机的例子,闸机就像一把锁,每次只允许一个人(线程)经过,想要过闸机的人排成队(阻塞队列)。 此时小明发现自己没带身份证,无法过闸机(被阻塞了),可此时小明已经进入闸机了(已经占有锁了),其他人都不能进去了。那该怎么办呢?总不能所有人都过不去吧。 好,现在工作人员来了:小明不要着急!你是身份证没带是吧,**你先从原来的队伍离开(释放原来的锁),排到另一个队**,让家人先把身份证送过来,你再过闸机。 这样,就不会影响其他人过闸机了(获得锁),而当小明拿到身份证之后,又可以重新过闸机。 上面就提到的利用另外一个阻塞队列,解决了过闸机的问题,这里的condition同理。 再看一个condition解决经典的**生产者消费者模式**的代码: package juc; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockConditionTest { /** * 管程(包含了共享变量、操作过程、数据结构) */ static class MyMonitor { private static final int MAX = 5; // 定义重入锁 private ReentrantLock rl = new ReentrantLock(); // 共享区 未满条件 private Condition notFull = rl.newCondition(); // 共享区 非空条件 private Condition notEmpty = rl.newCondition(); /** * 缓冲区 * 缓冲区用什么数据结构,根据实际需求选择 */ private Queue<String> container = new ArrayBlockingQueue<>(50); /** * 生产方法 */ public void produce() { rl.lock(); try { // 当共享区大小超过最大值,就不能再生产。未满条件就要阻塞 if (getSize() >= MAX) notFull.await(); doProduce(); // 当共享区有元素时,非空条件满足,唤醒因非空条件阻塞的线程 if (getSize() >= 1) notEmpty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { rl.unlock(); } } /** * 消费方法 */ public void consume() { rl.lock(); try { // 共享区元素消费完,非空条件就不满足,要阻塞 if (getSize() == 0) notEmpty.await(); doConsume(); // 共享区元素小于MAX最大值,要因未满条件阻塞的线程 if (getSize() < MAX) notFull.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { rl.unlock(); } } private int getSize() { return container.size(); } /** * 真实生产方法 */ private void doProduce() { // 这里其实可以直接采用blockQueue的put方法实现 container.offer("product"); System.out.println("生产者生产product到缓冲区,当前大小:" + container.size() + " 当前时间:" + System.currentTimeMillis()); } /** * 真实消费方法方法 */ private void doConsume(){ // // 这里其实可以直接采用blockQueue的take方法实现 container.poll(); System.out.println("消费者从缓冲区消费product,当前大小:" + container.size() + " 当前时间:" + System.currentTimeMillis()); } } /** * 主线程,启动生产者和消费者 * @param args */ public static void main(String[] args) { MyMonitor myMonitor = new MyMonitor(); Thread produce = new Thread(() -> { while (true) { // 生产者线程 myMonitor.produce(); } }); Thread consume = new Thread(() -> { while (true) { // 消费者线程 myMonitor.consume(); } }); consume.start(); produce.start(); } } 这里使用管程实现(管程可以参考[通过管程和信号量分别实现生产者消费者模式][Link 1]) 我们可以看到,定义了两个condition,notFull和notEmpty。 * 当共享区满了的时候,生产者线程就不能继续生产了,需要排队,就排在notFull队伍里。当消费者线程消费了一个之后,共享区留出空间,未满条件成立,就从notFull队伍里唤醒一个线程进行生产。 * 而当共享区为空,消费者线程就不能继续消费,就排在notEmpty队伍里。当生产者生产一个产品之后,共享区有产品了,非空条件成立,则从notEmpty队伍了唤醒一个消费者线程。 #### 上源码 #### 看了上面的讲解,其实我们大概已经清楚。condition的await()方法,是一个排队的过程。(将线程加入到等待队列中。)而singal方法,是从队伍里唤醒一个线程继续执行(将线程从等待队列中取出)。**singal与singalAll的区别就是唤醒一个与唤醒全部**。 先看await: 在AbstractQueuedSynchronizer内部类ConditionObject中对await的实现: public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 新增条件等待节点到队尾 Node node = addConditionWaiter(); // 释放当前持有的锁 int savedState = fullyRelease(node); int interruptMode = 0; // 如果释放锁成功(判断是否还在锁的同步队列里面),则挂起当前线程 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 后续处理 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 上面主要需要关注的是:fullyRelease,调用await之后会释放当前线程持有的锁。 final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 可以看到,await方法会将当前线程加入阻塞队列,通过释放当前线程持有的锁。 下面看一下signal与signalAll: public final void signal() { // 如果不是当前持有锁的线程,会抛出非法管程状态错误,所有await、signal要在临界区中使用 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 取出第一个等待线程,并唤醒 Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { // 如果只有一个等待线程,把lastWaiter 赋为null。同时firstWaiter 指向下一个 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 只唤醒一个线程 } while (!transferForSignal(first) && (first = firstWaiter) != null); } 我们看一下transferForSignal方法: final boolean transferForSignal(Node node) { // 尝试移除condition阻塞状态,失败会返回false,尝试唤醒下一个 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 移除成功之后,就让该线程去竞争锁,加入竞争锁的阻塞队列中 Node p = enq(node); int ws = p.waitStatus; // 如果ws是取消状态,或将ws置为signal失败的话,重新挂起线程。此时就放弃signal操作了,认为成功 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } 我们可以看到上面这段: if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; 如果线程重新挂起,我们还是认为signal是成功的,所以某些特殊场景下,可能是有问题的。即使你await和signal的是同一个线程,但**很多源码还是将signal改为了signalAll更加稳健**。 看下signalAll: public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; // 尝试对所有阻塞线程进行唤醒,去竞争锁 do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } #### 小结: #### * await:将线程加入到对应condition的阻塞队列,等待signal或signal唤醒。其中使用了LockSupprt.park挂起线程,释放线程拥有的锁。 * signal和signalAll:唤醒condition的阻塞队列的线程重新去竞争锁。区别是唤醒阻塞队列中的一个与全部。 * 需要注意的是:以上的方法必须在lock与unlock之间使用,否则会抛出IllegalMonitorStateException异常。这个与Object的wait、notify、notifyAll只能在synchronized关键字内使用一样同理。 [ReentrantLock_lock_unlock]: https://blog.csdn.net/weixin_37968613/article/details/109388223 [Link 1]: https://blog.csdn.net/weixin_37968613/article/details/106303293?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522160421951419724822539911%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=160421951419724822539911&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_v2~rank_blog_default-1-106303293.pc_v2_rank_blog_default&utm_term=%E7%AE%A1%E7%A8%8B&spm=1018.2118.3001.4450
还没有评论,来说两句吧...