ReentrantLock Source Code Analysis: How AQS Works

向右看齐 2023-09-26 17:18

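Contents

(I) Introduction to AQS

(II) How AQS works

(III) Source walkthrough: locking

(1) Constructors

(2) The nonfair lock's lock method (core)

(3) acquire: acquiring the resource (core)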


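(I) Introduction to AQS

AQS is short for AbstractQueuedSynchronizer, a framework for building blocking locks and related synchronizers on top of a FIFO wait queue. It is built around a single int field, state, declared private volatile. A class that wants to use the framework extends AQS (usually through a private inner class) and overrides the tryAcquire/tryRelease family of methods. Because state is private, subclasses can read and write it only through the protected methods AQS itself provides: getState(), setState() and compareAndSetState(). Any update that may race with other threads has to go through the CAS-based compareAndSetState().
AQS is used by ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore and others.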
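
To make the contract above concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. It is not taken from the JDK; SimpleMutex, Sync and countWithTwoThreads are names made up for this illustration, and the class assumes state 0 means "free" and 1 means "held".

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Illustrative only: a non-reentrant mutex on top of AQS (all names are hypothetical). */
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = free, 1 = held; under contention only a CAS may flip it.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // a plain write is enough: only the owner gets here
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }  // queueing and parking are handled by AQS
    void unlock() { sync.release(1); }

    /** Two threads each add perThread to a shared counter while holding the mutex. */
    static int countWithTwoThreads(int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10_000)); // 20000
    }
}
```

The subclass only decides what state means; blocking, queueing and wake-up all come from acquire() and release() in AQS, which is the same division of labour ReentrantLock uses.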

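(II) How AQS works

We use ReentrantLock as the entry point for reading the AQS source. ReentrantLock has one core member field, sync (private final Sync sync). Sync is an inner class of ReentrantLock and extends AbstractQueuedSynchronizer. ReentrantLock goes through sync to call both the methods inherited from AQS and the methods its subclasses override.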

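(III) Source walkthrough: **locking**

(1) Constructors

The constructor decides whether the lock is fair (FairSync) or nonfair (NonfairSync). The rest of this article follows the nonfair lock.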


    // Both constructors initialize sync with a FairSync or a NonfairSync. Both classes
    // extend Sync, which is how the lock is tied to AQS.
    // The no-arg constructor creates a nonfair lock.
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    // Choose explicitly between a fair and a nonfair lock.
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
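
As a quick check of the two constructors, the following sketch builds one lock each way and reads the policy back through the public isFair() method. FairnessDemo and fairness() are hypothetical names used only for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: FairnessDemo is a made-up wrapper class. */
final class FairnessDemo {
    /** Returns isFair() for: no-arg constructor, ReentrantLock(false), ReentrantLock(true). */
    static boolean[] fairness() {
        ReentrantLock byDefault = new ReentrantLock();      // backed by NonfairSync
        ReentrantLock nonfair   = new ReentrantLock(false); // backed by NonfairSync
        ReentrantLock fair      = new ReentrantLock(true);  // backed by FairSync
        return new boolean[] { byDefault.isFair(), nonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] r = fairness();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // false false true
    }
}
```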

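(2) The nonfair lock's lock method (core)

Calling lock() tries to take the lock. NonfairSync first attempts a direct CAS (compareAndSwapInt), which succeeds only if state is 0. For this walkthrough we assume that attempt fails; lock() then falls back to AQS's acquire() method.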

    // NonfairSync (the nonfair lock class):
    final void lock() {
        if (compareAndSetState(0, 1)) // CAS helper provided by AbstractQueuedSynchronizer: try to move state from 0 to 1
            setExclusiveOwnerThread(Thread.currentThread()); // CAS succeeded: record the current thread as owner (method AQS inherits from AOS)
        else
            // acquire is the core method. It combines tryAcquire, addWaiter and acquireQueued,
            // plus selfInterrupt to restore the interrupt status.
            acquire(1); // the CAS lost the race: retry, and enqueue if that fails too (core)
    }

    // AbstractOwnableSynchronizer (AOS):
    /**
     * This abstract class stores the owning thread in the field
     * private transient Thread exclusiveOwnerThread. It is factored out as a base class,
     * and AQS extends AbstractOwnableSynchronizer.
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    // AbstractQueuedSynchronizer:
    // Wraps the CAS operation, which is implemented with Unsafe's atomic compare-and-swap.
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

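(3) acquire: acquiring the resource (core)

On entering AQS's acquire method, the thread first tries again to take the lock through tryAcquire. NonfairSync overrides tryAcquire and delegates to **nonfairTryAcquire in Sync, which retries the CAS if state is 0 (we again assume it fails), and otherwise checks whether the lock is already held by the current thread**. That second check is what makes the lock reentrant (we assume here that the current thread is not the owner).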


    /**
     * AQS's acquire method consists of:
     *   tryAcquire: try once more to acquire the resource
     *   addWaiter: create a wait node
     *   acquireQueued: wait in the queue
     *   selfInterrupt: restore the interrupt status if the thread was interrupted while waiting
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // tryAcquire is not abstract: the AQS default throws UnsupportedOperationException, so subclasses must override it
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt(); // acquireQueued cleared the interrupt flag while waiting, so set it again here
    }

    // NonfairSync:
    // Overrides AQS's tryAcquire. Without the override, the inherited default would throw
    // UnsupportedOperationException.
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires); // defined in the direct superclass Sync
    }

    // Sync:
    // Nonfair attempt to acquire; acquires is 1 here.
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread(); // the calling thread
        // state is private in AQS, so it is read through the protected getter getState().
        int c = getState();
        if (c == 0) { // 0 means the lock is free; any other value means it is held
            if (compareAndSetState(0, acquires)) { // AQS's CAS helper
                // Record the current thread in exclusiveOwnerThread (method inherited from AOS).
                // The reentrant branch below compares against this field, so a thread that
                // locks again before unlocking does not deadlock on itself.
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) { // reentrant case: the caller already owns the lock
            int nextc = c + acquires; // hold count + 1
            if (nextc < 0) // int overflow: the count went past Integer.MAX_VALUE
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // plain write, no CAS needed: only the owning thread can reach this branch
            return true;
        }
        return false; // neither case applied: the lock is contended and this thread lost
    }
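
The reentrant branch of nonfairTryAcquire can be observed from the outside through getHoldCount(), which reports state for the owning thread. The sketch below is only an illustration; ReentrancyDemo and holdCounts() are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: ReentrancyDemo is a made-up wrapper class. */
final class ReentrancyDemo {
    /** Returns the hold count seen after lock, nested lock, inner unlock and outer unlock. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];
        lock.lock();                      // free lock: state goes 0 -> 1
        try {
            seen[0] = lock.getHoldCount();
            lock.lock();                  // same owner re-enters: state goes 1 -> 2
            try {
                seen[1] = lock.getHoldCount();
            } finally {
                lock.unlock();            // 2 -> 1, lock still held
            }
            seen[2] = lock.getHoldCount();
        } finally {
            lock.unlock();                // 1 -> 0, lock is free again
        }
        seen[3] = lock.getHoldCount();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // [1, 2, 1, 0]
    }
}
```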

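(4) addWaiter: creating the waiter (core)

When tryAcquire fails, acquire goes on to call addWaiter, which creates a waiter node in exclusive mode. This covers both creating the sentinel node (when the queue is still empty) and linking the node for the current thread into the queue.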

    // AQS:
    /**
     * Adds a waiter: wraps the current thread in a node and links it into the queue.
     * addWaiter runs only after tryAcquire has failed to take the lock.
     * mode = Node.EXCLUSIVE (exclusive mode); the constant Node.EXCLUSIVE is null.
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // node for the current thread
        Node pred = tail; // current tail of the queue
        if (pred != null) { // the queue is already initialized
            node.prev = pred; // the new node's predecessor is the old tail
            if (compareAndSetTail(pred, node)) { // CAS tail to the new node
                pred.next = node; // complete the link: the old tail's next is the new node
                return node;
            }
        }
        enq(node); // reached when tail is null (queue not initialized) or the CAS above lost a race
        return node;
    }
    /**
     * Enqueues the node with a CAS retry loop, creating the sentinel node first
     * if the queue has not been initialized.
     */
    private Node enq(final Node node) {
        for (;;) { // retry until the node is enqueued
            Node t = tail; // current tail
            if (t == null) { // no tail yet: initialize the queue
                if (compareAndSetHead(new Node())) // install an empty sentinel node as head
                    tail = head; // tail points at the sentinel too
            } else { // already initialized
                node.prev = t; // this node's predecessor is the current tail
                if (compareAndSetTail(t, node)) { // CAS tail to this node
                    t.next = node; // the old tail's next is this node
                    return t; // note: returns the predecessor, not the node
                }
            }
        }
    }
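
The enqueue logic above can be imitated outside the JDK with two AtomicReference fields. The following is a simplified sketch, not the AQS implementation: CasQueueSketch, its Node class and the name field (which stands in for the waiting thread) are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: a stripped-down imitation of the enq() loop. */
final class CasQueueSketch {
    static final class Node {
        final String name;     // stands in for the waiting thread
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Lazily installs a sentinel, then CASes the node in as the new tail. */
    Node enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: whoever wins this CAS installs the sentinel.
                if (head.compareAndSet(null, new Node("sentinel"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // backward link first
                if (tail.compareAndSet(t, node)) {  // publish the node as tail
                    t.next = node;                  // forward link last
                    return t;                       // like enq(), return the predecessor
                }
            }
        }
    }

    /** Walks the next pointers from the head and joins the node names. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head.get(); n != null; n = n.next) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }

    static String demo() {
        CasQueueSketch q = new CasQueueSketch();
        q.enqueue(new Node("A"));
        q.enqueue(new Node("B"));
        return q.describe();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // sentinel -> A -> B
    }
}
```

Because prev is set before the CAS and next only after it, a node is always reachable backwards from the tail as soon as it is published, which is why unparkSuccessor scans from the tail when it cannot trust next.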

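(5) acquireQueued: waiting in the queue (core)

Once the waiter node exists, acquireQueued takes over. It checks whether the node's predecessor is the head; if so, it calls tryAcquire again. Otherwise, or if that attempt fails, it marks the predecessor with the SIGNAL status and then actually blocks the thread with LockSupport.park(this).
Note: every waiter node starts with waitStatus 0. Before a thread really parks, shouldParkAfterFailedAcquire sets its predecessor's waitStatus to -1 (SIGNAL). The flag tells the predecessor that a thread is waiting behind it, so that when the predecessor releases the lock it wakes its nearest successor.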

    // AQS:
    /**
     * Re-checks whether the thread really has to wait, that is, park.
     * node is the newly created waiter node; arg is 1, the number of holds to acquire.
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // assume failure until the lock is acquired; checked in finally
        try {
            boolean interrupted = false; // no interrupt seen yet
            for (;;) {
                final Node p = node.predecessor(); // this node's predecessor
                if (p == head && tryAcquire(arg)) { // first in line: try to take the lock
                    setHead(node); // this node becomes the new head
                    p.next = null; // help GC
                    failed = false; // acquired, so there is nothing to cancel
                    return interrupted; // leave the loop and report whether an interrupt arrived while waiting
                }
                // Set SIGNAL on the predecessor, then actually block the current thread.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; // only records the interrupt; the thread keeps waiting, and acquire() restores the flag afterwards
            }
        } finally {
            // finally always runs, but failed is still true only if the try block exits with an exception.
            // doAcquireInterruptibly, whose code is almost identical, throws InterruptedException explicitly;
            // here the check is a last safeguard against an unexpected exception in the try block.
            if (failed)
                cancelAcquire(node); // give up the attempt: mark the node CANCELLED and unlink it from the queue
        }
    }
    // Sets the predecessor's waitStatus from 0 to -1 (SIGNAL), meaning its successor needs to be woken.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // the predecessor is already -1 and will unpark us: safe to park
            return true;
        if (ws > 0) { // the predecessor was cancelled: skip over cancelled nodes
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node; // relink past the cancelled nodes
        } else {
            // waitStatus here is 0 (initial) or -3 (PROPAGATE, shared mode); set it to -1 (SIGNAL).
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false; // only the first case returns true; otherwise this call just fixed links or set the flag, and the caller retries
    }
    // Called only after shouldParkAfterFailedAcquire returned true; this is where the thread really blocks.
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // block the thread
        return Thread.interrupted(); // reports whether the thread was interrupted and clears the interrupt flag
    }
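
One consequence of the code above is that lock() never throws on interruption: the interrupt is remembered and the flag is set again once the lock has been acquired. The sketch below tries to show that from user code. InterruptWhileQueuedDemo and interruptFlagAfterLock() are hypothetical names, and the polling loop on hasQueuedThread is just a simple way to wait until the second thread has enqueued.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: InterruptWhileQueuedDemo is a made-up wrapper class. */
final class InterruptWhileQueuedDemo {
    /** Returns the waiter's interrupt flag as seen right after its lock() call returned. */
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagSeen = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock(); // queues behind the main thread; does not throw when interrupted
            try {
                flagSeen[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        lock.lock(); // hold the lock so the waiter has to enqueue
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1); // poll until the waiter's node is in the queue
            }
            waiter.interrupt(); // wakes the waiter if it is parked; it still has to wait for the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagSeen[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptFlagAfterLock()); // true
    }
}
```

A caller that wants an InterruptedException instead would use lockInterruptibly(), which goes through doAcquireInterruptibly.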

(6) cancelAcquire: cancelling an acquire attempt (not core)

    // Cancels this node's attempt to acquire and unlinks it from the queue.
    // This is not the thread-interruption mechanism itself.
    private void cancelAcquire(Node node) {
        if (node == null)
            return;
        // Clear the thread held by the node.
        node.thread = null;
        // Walk backwards to the nearest predecessor that is not CANCELLED.
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // The predecessor's successor (this is node itself if the original predecessor was not CANCELLED).
        Node predNext = pred.next;
        // Mark this node CANCELLED.
        node.waitStatus = Node.CANCELLED;
        // If this node is the tail, CAS tail back to the non-cancelled predecessor
        // and CAS that predecessor's next pointer to null.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            // If the predecessor is not the head, its waitStatus is SIGNAL (or can be
            // CAS-ed to SIGNAL), and it still holds a thread:
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // This node's successor.
                Node next = node.next;
                // If the successor is not CANCELLED, CAS the predecessor's next pointer to it.
                if (next != null && next.waitStatus <= 0)
                    // The successor's prev pointer is not updated here. That is not needed:
                    // unparkSuccessor() skips cancelled nodes, and the successor repairs its
                    // prev link the next time it runs shouldParkAfterFailedAcquire.
                    compareAndSetNext(pred, predNext, next);
            } else {
                // The predecessor is the head (or cannot be relied on to signal):
                // wake this node's successor so that it unlinks this node itself.
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
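
In the JDK 8 sources shown here, a timed tryLock that runs out of time goes through this cancellation path. The sketch below only checks the visible effect, namely that the timed-out thread is no longer reported as queued. CancelledWaiterDemo and run() are hypothetical names, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: CancelledWaiterDemo is a made-up wrapper class. */
final class CancelledWaiterDemo {
    /** Returns {acquired, stillQueued} for a timed tryLock against a held lock. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] out = new boolean[2];
        lock.lock(); // keep the lock held so the timed attempt has to queue
        try {
            Thread waiter = new Thread(() -> {
                try {
                    out[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                    if (out[0]) {
                        lock.unlock(); // not expected here, but never leak a hold
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            waiter.join(); // returns once the timeout has expired
            out[1] = lock.hasQueuedThreads(); // the cancelled node must not count as a waiter
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // false false
    }
}
```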

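Unlocking:

(2) AQS's release method first calls tryRelease, the preparatory step of releasing the lock. It decrements state; once every lock() has been matched by an unlock(), state is back to 0;
(3) If tryRelease succeeds and the head node's waitStatus is not 0, release calls unparkSuccessor to wake the head's successor;
(4) unparkSuccessor skips cancelled nodes: if the direct successor is missing or cancelled, it searches backwards from the tail for the node closest to the head whose waitStatus is <= 0 (normally -1, SIGNAL) and wakes it.

Source logic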

(1) unlock

ReentrantLock's unlock method uses the core sync field to call AQS's release method;

    // ReentrantLock:
    public void unlock() {
        sync.release(1); // the core sync field calls AQS's release method
    }

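(2) release and tryRelease

AQS's release method first calls tryRelease, the preparatory step of releasing the lock. tryRelease decrements state and reports success only when state reaches 0, that is, when every lock() has been matched by an unlock();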

    // AQS:
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // true only when the lock is now completely free
            Node h = head; // current head
            // The head exists and its waitStatus has been changed: a waiter in acquireQueued
            // sets its predecessor's status from 0 to -1 before parking.
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // wake a blocked waiter
            return true;
        }
        return false;
    }

    // Sync: Sync extends AQS. The tryRelease in AQS is not abstract; its default implementation
    // throws UnsupportedOperationException, so subclasses must override it.
    // Prepares the actual release: decrements state and reports whether it is back to 0.
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // the new state: the current state minus 1
        if (Thread.currentThread() != getExclusiveOwnerThread()) // only the owning thread may release
            throw new IllegalMonitorStateException();
        boolean free = false; // not fully released by default
        if (c == 0) { // every hold has been released
            free = true;
            setExclusiveOwnerThread(null); // clear the owner
        }
        setState(c); // write back the new count; it stays above 0 while reentrant holds remain
        return free; // release() wakes a successor only when this is true
    }
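
Both branches of tryRelease are visible through the public API: a reentrant holder stays locked until its last unlock(), and a thread that does not own the lock gets IllegalMonitorStateException. The following sketch is only an illustration; ReleaseDemo and run() are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: ReleaseDemo is a made-up wrapper class. */
final class ReleaseDemo {
    /** Returns {lockedAfterFirstUnlock, lockedAfterSecondUnlock, strangerRejected}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] out = new boolean[3];
        lock.lock();
        lock.lock();                  // hold count is now 2
        lock.unlock();                // state 2 -> 1: tryRelease returns false
        out[0] = lock.isLocked();     // still held

        Thread stranger = new Thread(() -> {
            try {
                lock.unlock();        // not the owner
            } catch (IllegalMonitorStateException expected) {
                out[2] = true;
            }
        });
        stranger.start();
        try {
            stranger.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();            // state 1 -> 0: owner cleared, tryRelease returns true
        }
        out[1] = lock.isLocked();     // free
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```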

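(3) unparkSuccessor: waking the successor

If tryRelease succeeds and the head node's waitStatus is not 0, release calls unparkSuccessor to wake the head's successor;

unparkSuccessor skips cancelled nodes: if the direct successor is missing or cancelled, it searches backwards from the tail for the node closest to the head whose waitStatus is <= 0 (normally -1, SIGNAL) and wakes it.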

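    // Wakes the successor of the head node.
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus; // waitStatus of the head node passed in
        if (ws < 0) // normally -1 (SIGNAL) when a waiter is queued behind the head
            // When the owner releases, the head is either the empty sentinel or the node of
            // the releasing thread itself. The head is effectively a placeholder: the initial
            // sentinel, or the node of the thread that currently holds the lock.
            compareAndSetWaitStatus(node, ws, 0); // reset the head's waitStatus to 0
        Node s = node.next; // direct successor of the head
        if (s == null || s.waitStatus > 0) { // no successor reachable via next, or it was cancelled
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) // scan backwards from the tail for the frontmost node with waitStatus <= 0
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // wake the frontmost waiter that is not cancelled
    }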
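
Even on a nonfair lock, threads that are already queued are woken from the front of the queue, one release at a time; "nonfair" only means that a newly arriving thread may barge in ahead of them. The sketch below assumes no such barging thread exists and enqueues the waiters one by one. WakeOrderDemo and acquisitionOrder() are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: WakeOrderDemo is a made-up wrapper class. */
final class WakeOrderDemo {
    /** Queues waiters 1..n in order behind a held lock and records who acquires when. */
    static List<Integer> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock(); // nonfair, as in the article
        List<Integer> order = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[waiters];
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i + 1;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock(); // release() unparks the next queued node
                    }
                });
                threads[i].start();
                // Start the next thread only after this one is in the queue,
                // so the queue order is 1, 2, 3, ...
                while (!lock.hasQueuedThread(threads[i])) {
                    Thread.sleep(1);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock(); // wakes the head's successor: waiter 1
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3)); // [1, 2, 3]
    }
}
```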

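Summary

Drawbacks of CAS to keep in mind:
1. Under contention a failed CAS is retried in a loop; when contention is heavy, this spinning can cost a lot of CPU time.
2. A single CAS can guarantee atomicity for only one shared variable.
3. CAS is exposed to the ABA problem.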
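
The ABA problem from point 3 can be reproduced in a few lines. In the sketch below a plain AtomicReference cannot tell that the value went from A to B and back to A, while an AtomicStampedReference, one common countermeasure, can because it also compares a version stamp. AbaDemo and run() are hypothetical names, and the single-threaded sequence stands in for what another thread would do in between.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

/** Illustrative only: AbaDemo is a made-up wrapper class. */
final class AbaDemo {
    /** Returns {plainCasSucceeded, stampedCasSucceeded} after an A -> B -> A change. */
    static boolean[] run() {
        String a = "A";
        String b = "B";

        // Plain reference: the reader remembers "A", the value changes A -> B -> A,
        // and the reader's CAS still succeeds because only the reference is compared.
        AtomicReference<String> plain = new AtomicReference<>(a);
        String seen = plain.get();
        plain.compareAndSet(a, b);
        plain.compareAndSet(b, a);
        boolean plainCas = plain.compareAndSet(seen, "C");

        // Stamped reference: every change also bumps a stamp, so the stale stamp 0
        // no longer matches the current stamp 2 and the CAS fails.
        AtomicStampedReference<String> stamped = new AtomicStampedReference<>(a, 0);
        int seenStamp = stamped.getStamp();
        stamped.compareAndSet(a, b, 0, 1);
        stamped.compareAndSet(b, a, 1, 2);
        boolean stampedCas = stamped.compareAndSet(a, "C", seenStamp, seenStamp + 1);

        return new boolean[] { plainCas, stampedCas };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // true false
    }
}
```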

Side note:

In the JDK 8 sources analyzed here, the CAS methods all come from the Unsafe class.
Unsafe provides three CAS methods:

    // Arguments: the object instance, the field's memory offset, the expected value, the new value
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    // CAS example (CasExample is just a wrapper class name chosen here)
    import java.util.concurrent.atomic.AtomicInteger;

    public class CasExample {
        public static void main(String[] args) {
            AtomicInteger atomicInteger = new AtomicInteger(1);
            // In JDK 8 this method is implemented with compareAndSwapInt()
            boolean b = atomicInteger.compareAndSet(1, 3);
            System.out.println("Result of first update: " + b + ", value afterwards: " + atomicInteger.get());
            boolean b1 = atomicInteger.compareAndSet(1, 6);
            System.out.println("Result of second update: " + b1 + ", value afterwards: " + atomicInteger.get());
            // Result of first update: true, value afterwards: 3
            // Result of second update: false, value afterwards: 3
        }
    }

Finally, here is the ReentrantLock source itself. It lives in the java.util.concurrent.locks package in rt.jar:

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */

    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */

    package java.util.concurrent.locks;
    import java.util.concurrent.TimeUnit;
    import java.util.Collection;

    /**
     * A reentrant mutual exclusion {@link Lock} with the same basic
     * behavior and semantics as the implicit monitor lock accessed using
     * {@code synchronized} methods and statements, but with extended
     * capabilities.
     *
     * <p>A {@code ReentrantLock} is <em>owned</em> by the thread last
     * successfully locking, but not yet unlocking it. A thread invoking
     * {@code lock} will return, successfully acquiring the lock, when
     * the lock is not owned by another thread. The method will return
     * immediately if the current thread already owns the lock. This can
     * be checked using methods {@link #isHeldByCurrentThread}, and {@link
     * #getHoldCount}.
     *
     * <p>The constructor for this class accepts an optional
     * <em>fairness</em> parameter. When set {@code true}, under
     * contention, locks favor granting access to the longest-waiting
     * thread. Otherwise this lock does not guarantee any particular
     * access order. Programs using fair locks accessed by many threads
     * may display lower overall throughput (i.e., are slower; often much
     * slower) than those using the default setting, but have smaller
     * variances in times to obtain locks and guarantee lack of
     * starvation. Note however, that fairness of locks does not guarantee
     * fairness of thread scheduling. Thus, one of many threads using a
     * fair lock may obtain it multiple times in succession while other
     * active threads are not progressing and not currently holding the
     * lock.
     * Also note that the untimed {@link #tryLock()} method does not
     * honor the fairness setting. It will succeed if the lock
     * is available even if other threads are waiting.
     *
     * <p>It is recommended practice to <em>always</em> immediately
     * follow a call to {@code lock} with a {@code try} block, most
     * typically in a before/after construction such as:
     *
     * <pre> {@code
     * class X {
     *   private final ReentrantLock lock = new ReentrantLock();
     *   // ...
     *
     *   public void m() {
     *     lock.lock();  // block until condition holds
     *     try {
     *       // ... method body
     *     } finally {
     *       lock.unlock();
     *     }
     *   }
     * }}</pre>
     *
     * <p>In addition to implementing the {@link Lock} interface, this
     * class defines a number of {@code public} and {@code protected}
     * methods for inspecting the state of the lock. Some of these
     * methods are only useful for instrumentation and monitoring.
     *
     * <p>Serialization of this class behaves in the same way as built-in
     * locks: a deserialized lock is in the unlocked state, regardless of
     * its state when serialized.
     *
     * <p>This lock supports a maximum of 2147483647 recursive locks by
     * the same thread. Attempts to exceed this limit result in
     * {@link Error} throws from locking methods.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer providing all implementation mechanics */
        private final Sync sync;

        /**
         * Base of synchronization control for this lock. Subclassed
         * into fair and nonfair versions below. Uses AQS state to
         * represent the number of holds on the lock.
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;

            /**
             * Performs {@link Lock#lock}. The main reason for subclassing
             * is to allow fast path for nonfair version.
             */
            abstract void lock();

            /**
             * Performs non-fair tryLock. tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }

            protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }

            final ConditionObject newCondition() {
                return new ConditionObject();
            }

            // Methods relayed from outer class

            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }

            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }

            final boolean isLocked() {
                return getState() != 0;
            }

            /**
             * Reconstitutes the instance from a stream (that is, deserializes it).
             */
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }

        /**
         * Sync object for non-fair locks
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;

            /**
             * Performs lock. Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1)) // CAS helper provided by AQS: try to move state from 0 to 1
                    setExclusiveOwnerThread(Thread.currentThread()); // CAS succeeded: record the current thread as owner (method AQS inherits from AOS)
                else
                    // acquire is the core method. It combines tryAcquire, addWaiter and acquireQueued,
                    // plus selfInterrupt to restore the interrupt status.
                    acquire(1); // the CAS lost the race: retry, and enqueue if that fails too (core)
            }

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

        /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;

            final void lock() {
                acquire(1);
            }

            /**
             * Fair version of tryAcquire. Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }

        /**
         * Creates an instance of {@code ReentrantLock}.
         * This is equivalent to using {@code ReentrantLock(false)}.
         */
        // Both constructors initialize sync with a FairSync or a NonfairSync. Both classes
        // extend Sync, which is how the lock is tied to AQS.
        // The no-arg constructor creates a nonfair lock.
        public ReentrantLock() {
            sync = new NonfairSync();
        }

        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        // Choose explicitly between a fair and a nonfair lock.
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }

        /**
         * Acquires the lock.
         *
         * <p>Acquires the lock if it is not held by another thread and returns
         * immediately, setting the lock hold count to one.
         *
         * <p>If the current thread already holds the lock then the hold
         * count is incremented by one and the method returns immediately.
         *
         * <p>If the lock is held by another thread then the
         * current thread becomes disabled for thread scheduling
         * purposes and lies dormant until the lock has been acquired,
         * at which time the lock hold count is set to one.
         */
        public void lock() {
            sync.lock();
        }

        /**
         * Acquires the lock unless the current thread is
         * {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires the lock if it is not held by another thread and returns
         * immediately, setting the lock hold count to one.
         *
         * <p>If the current thread already holds this lock then the hold count
         * is incremented by one and the method returns immediately.
         *
         * <p>If the lock is held by another thread then the
         * current thread becomes disabled for thread scheduling
         * purposes and lies dormant until one of two things happens:
         *
         * <ul>
         *
         * <li>The lock is acquired by the current thread; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
         * current thread.
         *
         * </ul>
         *
         * <p>If the lock is acquired by the current thread then the lock hold
         * count is set to one.
         *
         * <p>If the current thread:
         *
         * <ul>
         *
         * <li>has its interrupted status set on entry to this method; or
         *
         * <li>is {@linkplain Thread#interrupt interrupted} while acquiring
         * the lock,
         *
         * </ul>
         *
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>In this implementation, as this method is an explicit
         * interruption point, preference is given to responding to the
         * interrupt over normal or reentrant acquisition of the lock.
         *
         * @throws InterruptedException if the current thread is interrupted
         */
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        /**
         * Acquires the lock only if it is not held by another thread at the time
         * of invocation.
         *
         * <p>Acquires the lock if it is not held by another thread and
         * returns immediately with the value {@code true}, setting the
         * lock hold count to one. Even when this lock has been set to use a
         * fair ordering policy, a call to {@code tryLock()} <em>will</em>
         * immediately acquire the lock if it is available, whether or not
         * other threads are currently waiting for the lock.
         * This "barging" behavior can be useful in certain
         * circumstances, even though it breaks fairness. If you want to honor
         * the fairness setting for this lock, then use
         * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }
         * which is almost equivalent (it also detects interruption).
         *
         * <p>If the current thread already holds this lock then the hold
         * count is incremented by one and the method returns {@code true}.
         *
         * <p>If the lock is held by another thread then this method will return
         * immediately with the value {@code false}.
         *
         * @return {@code true} if the lock was free and was acquired by the
         *         current thread, or the lock was already held by the current
         *         thread; and {@code false} otherwise
         */
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }

        /**
         * Acquires the lock if it is not held by another thread within the given
         * waiting time and the current thread has not been
         * {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires the lock if it is not held by another thread and returns
         * immediately with the value {@code true}, setting the lock hold count
         * to one. If this lock has been set to use a fair ordering policy then
         * an available lock <em>will not</em> be acquired if any other threads
         * are waiting for the lock. This is in contrast to the {@link #tryLock()}
         * method. If you want a timed {@code tryLock} that does permit barging on
         * a fair lock then combine the timed and un-timed forms together:
         *
         * <pre> {@code
         * if (lock.tryLock() ||
         *     lock.tryLock(timeout, unit)) {
         *   ...
         * }}</pre>
         *
         * <p>If the current thread
         * already holds this lock then the hold count is incremented by one and
         * the method returns {@code true}.
         *
         * <p>If the lock is held by another thread then the
         * current thread becomes disabled for thread scheduling
         * purposes and lies dormant until one of three things happens:
         *
         * <ul>
         *
         * <li>The lock is acquired by the current thread; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The specified waiting time elapses
         *
         * </ul>
         *
         * <p>If the lock is acquired then the value {@code true} is returned and
         * the lock hold count is set to one.
         *
         * <p>If the current thread:
         *
         * <ul>
         *
         * <li>has its interrupted status set on entry to this method; or
         *
         * <li>is {@linkplain Thread#interrupt interrupted} while
         * acquiring the lock,
         *
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>If the specified waiting time elapses then the value {@code false}
         * is returned. If the time is less than or equal to zero, the method
         * will not wait at all.
         *
         * <p>In this implementation, as this method is an explicit
         * interruption point, preference is given to responding to the
         * interrupt over normal or reentrant acquisition of the lock, and
         * over reporting the elapse of the waiting time.
         *
         * @param timeout the time to wait for the lock
         * @param unit the time unit of the timeout argument
         * @return {@code true} if the lock was free and was acquired by the
         *         current thread, or the lock was already held by the current
         *         thread; and {@code false} if the waiting time elapsed before
         *         the lock could be acquired
         * @throws InterruptedException if the current thread is interrupted
         * @throws NullPointerException if the time unit is null
         */
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        /**
         * Attempts to release this lock.
         *
         * <p>If the current thread is the holder of this lock then the hold
         * count is decremented. If the hold count is now zero then the lock
         * is released. If the current thread is not the holder of this
         * lock then {@link IllegalMonitorStateException} is thrown.
         *
         * @throws IllegalMonitorStateException if the current thread does not
         *         hold this lock
         */
        public void unlock() {
            sync.release(1);
        }

        /**
         * Returns a {@link Condition} instance for use with this
         * {@link Lock} instance.
         *
         * <p>The returned {@link Condition} instance supports the same
         * usages as do the {@link Object} monitor methods ({@link
         * Object#wait() wait}, {@link Object#notify notify}, and {@link
         * Object#notifyAll notifyAll}) when used with the built-in
         * monitor lock.
         *
         * <ul>
         *
         * <li>If this lock is not held when any of the {@link Condition}
         * {@linkplain Condition#await() waiting} or {@linkplain
         * Condition#signal signalling} methods are called, then an {@link
         * IllegalMonitorStateException} is thrown.
         *
         * <li>When the condition {@linkplain Condition#await() waiting}
         * methods are called the lock is released and, before they
         * return, the lock is reacquired and the lock hold count restored
         * to what it was when the method was called.
         *
         * <li>If a thread is {@linkplain Thread#interrupt interrupted}
         * while waiting then the wait will terminate, an {@link
         * InterruptedException} will be thrown, and the thread's
         * interrupted status will be cleared.
         *
         * <li> Waiting threads are signalled in FIFO order.
         *
         * <li>The ordering of lock reacquisition for threads returning
         * from waiting methods is the same as for threads initially
         * acquiring the lock, which is in the default case not specified,
         * but for <em>fair</em> locks favors those threads that have been
         * waiting the longest.
         *
         * </ul>
         *
         * @return the Condition object
         */
        public Condition newCondition() {
            return sync.newCondition();
        }

        /**
         * Queries the number of holds on this lock by the current thread.
         *
         * <p>A thread has a hold on a lock for each lock action that is not
         * matched by an unlock action.
         *
         * <p>The hold count information is typically only used for testing and
         * debugging purposes. For example, if a certain section of code should
         * not be entered with the lock already held then we can assert that
         * fact:
         *
         * <pre> {@code
         * class X {
         *   ReentrantLock lock = new ReentrantLock();
         *   // ...
         *   public void m() {
         *     assert lock.getHoldCount() == 0;
         *     lock.lock();
         *     try {
         *       // ... method body
         *     } finally {
         *       lock.unlock();
         *     }
         *   }
         * }}</pre>
         *
         * @return the number of holds on this lock by the current thread,
         *         or zero if this lock is not held by the current thread
         */
        public int getHoldCount() {
            return sync.getHoldCount();
        }

        /**
         * Queries if this lock is held by the current thread.
         *
         * <p>Analogous to the {@link Thread#holdsLock(Object)} method for
         * built-in monitor locks, this method is typically used for
         * debugging and testing. For example, a method that should only be
         * called while a lock is held can assert that this is the case:
         *
         * <pre> {@code
         * class X {
         *   ReentrantLock lock = new ReentrantLock();
         *   // ...
         *
         *   public void m() {
         *       assert lock.isHeldByCurrentThread();
         *       // ... method body
         *   }
         * }}</pre>
         *
         * <p>It can also be used to ensure that a reentrant lock is used
         * in a non-reentrant manner, for example:
         *
         * <pre> {@code
         * class X {
         *   ReentrantLock lock = new ReentrantLock();
         *   // ...
         *
         *   public void m() {
         *       assert !lock.isHeldByCurrentThread();
         *       lock.lock();
         *       try {
         *           // ... method body
         *       } finally {
         *           lock.unlock();
         *       }
         *   }
         * }}</pre>
         *
         * @return {@code true} if current thread holds this lock and
         *         {@code false} otherwise
         */
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }

  554. /**
  555. * Queries if this lock is held by any thread. This method is
  556. * designed for use in monitoring of the system state,
  557. * not for synchronization control.
  558. *
  559. * @return {@code true} if any thread holds this lock and
  560. * {@code false} otherwise
  561. */
  562. public boolean isLocked() {
  563. return sync.isLocked();
  564. }
  565. /**
  566. * Returns {@code true} if this lock has fairness set true.
  567. *
  568. * @return {@code true} if this lock has fairness set true
  569. */
  570. public final boolean isFair() {
  571. return sync instanceof FairSync;
  572. }
  573. /**
  574. * Returns the thread that currently owns this lock, or
  575. * {@code null} if not owned. When this method is called by a
  576. * thread that is not the owner, the return value reflects a
  577. * best-effort approximation of current lock status. For example,
  578. * the owner may be momentarily {@code null} even if there are
  579. * threads trying to acquire the lock but have not yet done so.
  580. * This method is designed to facilitate construction of
  581. * subclasses that provide more extensive lock monitoring
  582. * facilities.
  583. *
  584. * @return the owner, or {@code null} if not owned
  585. */
  586. protected Thread getOwner() {
  587. return sync.getOwner();
  588. }
  589. /**
  590. * Queries whether any threads are waiting to acquire this lock. Note that
  591. * because cancellations may occur at any time, a {@code true}
  592. * return does not guarantee that any other thread will ever
  593. * acquire this lock. This method is designed primarily for use in
  594. * monitoring of the system state.
  595. *
  596. * @return {@code true} if there may be other threads waiting to
  597. * acquire the lock
  598. */
  599. public final boolean hasQueuedThreads() {
  600. return sync.hasQueuedThreads();
  601. }
  602. /**
  603. * Queries whether the given thread is waiting to acquire this
  604. * lock. Note that because cancellations may occur at any time, a
  605. * {@code true} return does not guarantee that this thread
  606. * will ever acquire this lock. This method is designed primarily for use
  607. * in monitoring of the system state.
  608. *
  609. * @param thread the thread
  610. * @return {@code true} if the given thread is queued waiting for this lock
  611. * @throws NullPointerException if the thread is null
  612. */
  613. public final boolean hasQueuedThread(Thread thread) {
  614. return sync.isQueued(thread);
  615. }
  616. /**
  617. * Returns an estimate of the number of threads waiting to
  618. * acquire this lock. The value is only an estimate because the number of
  619. * threads may change dynamically while this method traverses
  620. * internal data structures. This method is designed for use in
  621. * monitoring of the system state, not for synchronization
  622. * control.
  623. *
  624. * @return the estimated number of threads waiting for this lock
  625. */
  626. public final int getQueueLength() {
  627. return sync.getQueueLength();
  628. }
  629. /**
  630. * Returns a collection containing threads that may be waiting to
  631. * acquire this lock. Because the actual set of threads may change
  632. * dynamically while constructing this result, the returned
  633. * collection is only a best-effort estimate. The elements of the
  634. * returned collection are in no particular order. This method is
  635. * designed to facilitate construction of subclasses that provide
  636. * more extensive monitoring facilities.
  637. *
  638. * @return the collection of threads
  639. */
  640. protected Collection<Thread> getQueuedThreads() {
  641. return sync.getQueuedThreads();
  642. }
  643. /**
  644. * Queries whether any threads are waiting on the given condition
  645. * associated with this lock. Note that because timeouts and
  646. * interrupts may occur at any time, a {@code true} return does
  647. * not guarantee that a future {@code signal} will awaken any
  648. * threads. This method is designed primarily for use in
  649. * monitoring of the system state.
  650. *
  651. * @param condition the condition
  652. * @return {@code true} if there are any waiting threads
  653. * @throws IllegalMonitorStateException if this lock is not held
  654. * @throws IllegalArgumentException if the given condition is
  655. * not associated with this lock
  656. * @throws NullPointerException if the condition is null
  657. */
  658. public boolean hasWaiters(Condition condition) {
  659. if (condition == null)
  660. throw new NullPointerException();
  661. if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
  662. throw new IllegalArgumentException("not owner");
  663. return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
  664. }
  665. /**
  666. * Returns an estimate of the number of threads waiting on the
  667. * given condition associated with this lock. Note that because
  668. * timeouts and interrupts may occur at any time, the estimate
  669. * serves only as an upper bound on the actual number of waiters.
  670. * This method is designed for use in monitoring of the system
  671. * state, not for synchronization control.
  672. *
  673. * @param condition the condition
  674. * @return the estimated number of waiting threads
  675. * @throws IllegalMonitorStateException if this lock is not held
  676. * @throws IllegalArgumentException if the given condition is
  677. * not associated with this lock
  678. * @throws NullPointerException if the condition is null
  679. */
  680. public int getWaitQueueLength(Condition condition) {
  681. if (condition == null)
  682. throw new NullPointerException();
  683. if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
  684. throw new IllegalArgumentException("not owner");
  685. return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
  686. }
  687. /**
  688. * Returns a collection containing those threads that may be
  689. * waiting on the given condition associated with this lock.
  690. * Because the actual set of threads may change dynamically while
  691. * constructing this result, the returned collection is only a
  692. * best-effort estimate. The elements of the returned collection
  693. * are in no particular order. This method is designed to
  694. * facilitate construction of subclasses that provide more
  695. * extensive condition monitoring facilities.
  696. *
  697. * @param condition the condition
  698. * @return the collection of threads
  699. * @throws IllegalMonitorStateException if this lock is not held
  700. * @throws IllegalArgumentException if the given condition is
  701. * not associated with this lock
  702. * @throws NullPointerException if the condition is null
  703. */
  704. protected Collection<Thread> getWaitingThreads(Condition condition) {
  705. if (condition == null)
  706. throw new NullPointerException();
  707. if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
  708. throw new IllegalArgumentException("not owner");
  709. return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
  710. }
  711. /**
  712. * Returns a string identifying this lock, as well as its lock state.
  713. * The state, in brackets, includes either the String {@code "Unlocked"}
  714. * or the String {@code "Locked by"} followed by the
  715. * {@linkplain Thread#getName name} of the owning thread.
  716. *
  717. * @return a string identifying this lock, as well as its lock state
  718. */
  719. public String toString() {
  720. Thread o = sync.getOwner();
  721. return super.toString() + ((o == null) ?
  722. "[Unlocked]" :
  723. "[Locked by thread " + o.getName() + "]");
  724. }
  725. }
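The inspection methods at the end of the ReentrantLock listing above (isFair, getHoldCount, isHeldByCurrentThread, hasQueuedThread, getQueueLength, isLocked) are easiest to understand by watching their values change. The following is a minimal sketch, not part of the JDK source: the class name LockInspectionDemo, the Snapshot holder and the "waiter" helper thread are assumptions made up for illustration. It takes a fair lock twice on one thread, lets a second thread queue up behind it, and records what the query methods report.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockInspectionDemo {

    /** Values observed at different points of the demo (hypothetical holder type). */
    static final class Snapshot {
        final boolean fair;
        final int holdCount;
        final boolean heldByCaller;
        final int queueLength;
        final boolean lockedAfterRelease;

        Snapshot(boolean fair, int holdCount, boolean heldByCaller,
                 int queueLength, boolean lockedAfterRelease) {
            this.fair = fair;
            this.holdCount = holdCount;
            this.heldByCaller = heldByCaller;
            this.queueLength = queueLength;
            this.lockedAfterRelease = lockedAfterRelease;
        }
    }

    static Snapshot inspect() throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock(true); // true selects FairSync

        // Hypothetical helper thread: it only acquires and releases the lock.
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.unlock();
        }, "waiter");

        int holdCount;
        boolean heldByCaller;
        int queueLength;

        lock.lock();
        lock.lock(); // reentrant acquire by the same thread
        try {
            waiter.start();
            // Poll until the helper thread has been enqueued behind us.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            holdCount = lock.getHoldCount();              // two lock() calls, no unlock() yet
            heldByCaller = lock.isHeldByCurrentThread();
            queueLength = lock.getQueueLength();          // only the helper is waiting
        } finally {
            lock.unlock();
            lock.unlock(); // one unlock() per lock()
        }

        waiter.join(); // the helper acquires and releases, then finishes
        return new Snapshot(lock.isFair(), holdCount, heldByCaller,
                            queueLength, lock.isLocked());
    }

    public static void main(String[] args) throws InterruptedException {
        Snapshot s = inspect();
        System.out.println("fair=" + s.fair
                + " holdCount=" + s.holdCount
                + " heldByCaller=" + s.heldByCaller
                + " queueLength=" + s.queueLength
                + " lockedAfterRelease=" + s.lockedAfterRelease);
    }
}
```

As the Javadoc above notes, getQueueLength and hasQueuedThread are meant for monitoring, not for synchronization control. The polling loop is used here only to make the demo deterministic.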
  726. AbstractQueuedSynchronizer source code (package java.util.concurrent.locks)
  727. /*
  728. * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  729. *
  730. *
  731. *
  732. *
  733. *
  734. *
  735. *
  736. *
  737. *
  738. *
  739. *
  740. *
  741. *
  742. *
  743. *
  744. *
  745. *
  746. *
  747. *
  748. *
  749. */
  750. /*
  756. * Written by Doug Lea with assistance from members of JCP JSR-166
  757. * Expert Group and released to the public domain, as explained at
  758. * http://creativecommons.org/publicdomain/zero/1.0/
  759. */
  760. package java.util.concurrent.locks;
  761. import java.util.concurrent.TimeUnit;
  762. import java.util.ArrayList;
  763. import java.util.Collection;
  764. import java.util.Date;
  765. import sun.misc.Unsafe;
  766. /**
  767. * Provides a framework for implementing blocking locks and related
  768. * synchronizers (semaphores, events, etc) that rely on
  769. * first-in-first-out (FIFO) wait queues. This class is designed to
  770. * be a useful basis for most kinds of synchronizers that rely on a
  771. * single atomic {@code int} value to represent state. Subclasses
  772. * must define the protected methods that change this state, and which
  773. * define what that state means in terms of this object being acquired
  774. * or released. Given these, the other methods in this class carry
  775. * out all queuing and blocking mechanics. Subclasses can maintain
  776. * other state fields, but only the atomically updated {@code int}
  777. * value manipulated using methods {@link #getState}, {@link
  778. * #setState} and {@link #compareAndSetState} is tracked with respect
  779. * to synchronization.
  780. *
  781. * <p>Subclasses should be defined as non-public internal helper
  782. * classes that are used to implement the synchronization properties
  783. * of their enclosing class. Class
  784. * {@code AbstractQueuedSynchronizer} does not implement any
  785. * synchronization interface. Instead it defines methods such as
  786. * {@link #acquireInterruptibly} that can be invoked as
  787. * appropriate by concrete locks and related synchronizers to
  788. * implement their public methods.
  789. *
  790. * <p>This class supports either or both a default <em>exclusive</em>
  791. * mode and a <em>shared</em> mode. When acquired in exclusive mode,
  792. * attempted acquires by other threads cannot succeed. Shared mode
  793. * acquires by multiple threads may (but need not) succeed. This class
  794. * does not "understand" these differences except in the
  795. * mechanical sense that when a shared mode acquire succeeds, the next
  796. * waiting thread (if one exists) must also determine whether it can
  797. * acquire as well. Threads waiting in the different modes share the
  798. * same FIFO queue. Usually, implementation subclasses support only
  799. * one of these modes, but both can come into play for example in a
  800. * {@link ReadWriteLock}. Subclasses that support only exclusive or
  801. * only shared modes need not define the methods supporting the unused mode.
  802. *
  803. * <p>This class defines a nested {@link ConditionObject} class that
  804. * can be used as a {@link Condition} implementation by subclasses
  805. * supporting exclusive mode for which method {@link
  806. * #isHeldExclusively} reports whether synchronization is exclusively
  807. * held with respect to the current thread, method {@link #release}
  808. * invoked with the current {@link #getState} value fully releases
  809. * this object, and {@link #acquire}, given this saved state value,
  810. * eventually restores this object to its previous acquired state. No
  811. * {@code AbstractQueuedSynchronizer} method otherwise creates such a
  812. * condition, so if this constraint cannot be met, do not use it. The
  813. * behavior of {@link ConditionObject} depends of course on the
  814. * semantics of its synchronizer implementation.
  815. *
  816. * <p>This class provides inspection, instrumentation, and monitoring
  817. * methods for the internal queue, as well as similar methods for
  818. * condition objects. These can be exported as desired into classes
  819. * using an {@code AbstractQueuedSynchronizer} for their
  820. * synchronization mechanics.
  821. *
  822. * <p>Serialization of this class stores only the underlying atomic
  823. * integer maintaining state, so deserialized objects have empty
  824. * thread queues. Typical subclasses requiring serializability will
  825. * define a {@code readObject} method that restores this to a known
  826. * initial state upon deserialization.
  827. *
  828. * <h3>Usage</h3>
  829. *
  830. * <p>To use this class as the basis of a synchronizer, redefine the
  831. * following methods, as applicable, by inspecting and/or modifying
  832. * the synchronization state using {@link #getState}, {@link
  833. * #setState} and/or {@link #compareAndSetState}:
  834. *
  835. * <ul>
  836. * <li> {@link #tryAcquire}
  837. * <li> {@link #tryRelease}
  838. * <li> {@link #tryAcquireShared}
  839. * <li> {@link #tryReleaseShared}
  840. * <li> {@link #isHeldExclusively}
  841. * </ul>
  842. *
  843. * Each of these methods by default throws {@link
  844. * UnsupportedOperationException}. Implementations of these methods
  845. * must be internally thread-safe, and should in general be short and
  846. * not block. Defining these methods is the <em>only</em> supported
  847. * means of using this class. All other methods are declared
  848. * {@code final} because they cannot be independently varied.
  849. *
  850. * <p>You may also find the inherited methods from {@link
  851. * AbstractOwnableSynchronizer} useful to keep track of the thread
  852. * owning an exclusive synchronizer. You are encouraged to use them
  853. * -- this enables monitoring and diagnostic tools to assist users in
  854. * determining which threads hold locks.
  855. *
  856. * <p>Even though this class is based on an internal FIFO queue, it
  857. * does not automatically enforce FIFO acquisition policies. The core
  858. * of exclusive synchronization takes the form:
  859. *
  860. * <pre>
  861. * Acquire:
  862. * while (!tryAcquire(arg)) {
  863. * <em>enqueue thread if it is not already queued</em>;
  864. * <em>possibly block current thread</em>;
  865. * }
  866. *
  867. * Release:
  868. * if (tryRelease(arg))
  869. * <em>unblock the first queued thread</em>;
  870. * </pre>
  871. *
  872. * (Shared mode is similar but may involve cascading signals.)
  873. *
  874. * <p id="barging">Because checks in acquire are invoked before
  875. * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
  876. * others that are blocked and queued. However, you can, if desired,
  877. * define {@code tryAcquire} and/or {@code tryAcquireShared} to
  878. * disable barging by internally invoking one or more of the inspection
  879. * methods, thereby providing a <em>fair</em> FIFO acquisition order.
  880. * In particular, most fair synchronizers can define {@code tryAcquire}
  881. * to return {@code false} if {@link #hasQueuedPredecessors} (a method
  882. * specifically designed to be used by fair synchronizers) returns
  883. * {@code true}. Other variations are possible.
  884. *
  885. * <p>Throughput and scalability are generally highest for the
  886. * default barging (also known as <em>greedy</em>,
  887. * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
  888. * While this is not guaranteed to be fair or starvation-free, earlier
  889. * queued threads are allowed to recontend before later queued
  890. * threads, and each recontention has an unbiased chance to succeed
  891. * against incoming threads. Also, while acquires do not
  892. * "spin" in the usual sense, they may perform multiple
  893. * invocations of {@code tryAcquire} interspersed with other
  894. * computations before blocking. This gives most of the benefits of
  895. * spins when exclusive synchronization is only briefly held, without
  896. * most of the liabilities when it isn't. If so desired, you can
  897. * augment this by preceding calls to acquire methods with
  898. * "fast-path" checks, possibly prechecking {@link #hasContended}
  899. * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
  900. * is likely not to be contended.
  901. *
  902. * <p>This class provides an efficient and scalable basis for
  903. * synchronization in part by specializing its range of use to
  904. * synchronizers that can rely on {@code int} state, acquire, and
  905. * release parameters, and an internal FIFO wait queue. When this does
  906. * not suffice, you can build synchronizers from a lower level using
  907. * {@link java.util.concurrent.atomic atomic} classes, your own custom
  908. * {@link java.util.Queue} classes, and {@link LockSupport} blocking
  909. * support.
  910. *
  911. * <h3>Usage Examples</h3>
  912. *
  913. * <p>Here is a non-reentrant mutual exclusion lock class that uses
  914. * the value zero to represent the unlocked state, and one to
  915. * represent the locked state. While a non-reentrant lock
  916. * does not strictly require recording of the current owner
  917. * thread, this class does so anyway to make usage easier to monitor.
  918. * It also supports conditions and exposes
  919. * one of the instrumentation methods:
  920. *
  921. * <pre> {@code
  922. * class Mutex implements Lock, java.io.Serializable {
  923. *
  924. * // Our internal helper class
  925. * private static class Sync extends AbstractQueuedSynchronizer {
  926. * // Reports whether in locked state
  927. * protected boolean isHeldExclusively() {
  928. * return getState() == 1;
  929. * }
  930. *
  931. * // Acquires the lock if state is zero
  932. * public boolean tryAcquire(int acquires) {
  933. * assert acquires == 1; // Otherwise unused
  934. * if (compareAndSetState(0, 1)) {
  935. * setExclusiveOwnerThread(Thread.currentThread());
  936. * return true;
  937. * }
  938. * return false;
  939. * }
  940. *
  941. * // Releases the lock by setting state to zero
  942. * protected boolean tryRelease(int releases) {
  943. * assert releases == 1; // Otherwise unused
  944. * if (getState() == 0) throw new IllegalMonitorStateException();
  945. * setExclusiveOwnerThread(null);
  946. * setState(0);
  947. * return true;
  948. * }
  949. *
  950. * // Provides a Condition
  951. * Condition newCondition() { return new ConditionObject(); }
  952. *
  953. * // Deserializes properly
  954. * private void readObject(ObjectInputStream s)
  955. * throws IOException, ClassNotFoundException {
  956. * s.defaultReadObject();
  957. * setState(0); // reset to unlocked state
  958. * }
  959. * }
  960. *
  961. * // The sync object does all the hard work. We just forward to it.
  962. * private final Sync sync = new Sync();
  963. *
  964. * public void lock() { sync.acquire(1); }
  965. * public boolean tryLock() { return sync.tryAcquire(1); }
  966. * public void unlock() { sync.release(1); }
  967. * public Condition newCondition() { return sync.newCondition(); }
  968. * public boolean isLocked() { return sync.isHeldExclusively(); }
  969. * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  970. * public void lockInterruptibly() throws InterruptedException {
  971. * sync.acquireInterruptibly(1);
  972. * }
  973. * public boolean tryLock(long timeout, TimeUnit unit)
  974. * throws InterruptedException {
  975. * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  976. * }
  977. * }}</pre>
  978. *
  979. * <p>Here is a latch class that is like a
  980. * {@link java.util.concurrent.CountDownLatch CountDownLatch}
  981. * except that it only requires a single {@code signal} to
  982. * fire. Because a latch is non-exclusive, it uses the {@code shared}
  983. * acquire and release methods.
  984. *
  985. * <pre> {@code
  986. * class BooleanLatch {
  987. *
  988. * private static class Sync extends AbstractQueuedSynchronizer {
  989. * boolean isSignalled() { return getState() != 0; }
  990. *
  991. * protected int tryAcquireShared(int ignore) {
  992. * return isSignalled() ? 1 : -1;
  993. * }
  994. *
  995. * protected boolean tryReleaseShared(int ignore) {
  996. * setState(1);
  997. * return true;
  998. * }
  999. * }
  1000. *
  1001. * private final Sync sync = new Sync();
  1002. * public boolean isSignalled() { return sync.isSignalled(); }
  1003. * public void signal() { sync.releaseShared(1); }
  1004. * public void await() throws InterruptedException {
  1005. * sync.acquireSharedInterruptibly(1);
  1006. * }
  1007. * }}</pre>
  1008. *
  1009. * @since 1.5
  1010. * @author Doug Lea
  1011. */
  1012. public abstract class AbstractQueuedSynchronizer
  1013. extends AbstractOwnableSynchronizer
  1014. implements java.io.Serializable {
  1015. private static final long serialVersionUID = 7373984972572414691L;
  1016. /**
  1017. * Creates a new {@code AbstractQueuedSynchronizer} instance
  1018. * with initial synchronization state of zero.
  1019. */
  1020. protected AbstractQueuedSynchronizer() { }
  1021. /**
  1022. * Wait queue node class.
  1023. *
  1024. * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
  1025. * Hagersten) lock queue. CLH locks are normally used for
  1026. * spinlocks. We instead use them for blocking synchronizers, but
  1027. * use the same basic tactic of holding some of the control
  1028. * information about a thread in the predecessor of its node. A
  1029. * "status" field in each node keeps track of whether a thread
  1030. * should block. A node is signalled when its predecessor
  1031. * releases. Each node of the queue otherwise serves as a
  1032. * specific-notification-style monitor holding a single waiting
  1033. * thread. The status field does NOT control whether threads are
  1034. * granted locks etc though. A thread may try to acquire if it is
  1035. * first in the queue. But being first does not guarantee success;
  1036. * it only gives the right to contend. So the currently released
  1037. * contender thread may need to rewait.
  1038. *
  1039. * <p>To enqueue into a CLH lock, you atomically splice it in as new
  1040. * tail. To dequeue, you just set the head field.
  1041. * <pre>
  1042. * +------+ prev +-----+ +-----+
  1043. * head | | <---- | | <---- | | tail
  1044. * +------+ +-----+ +-----+
  1045. * </pre>
  1046. *
  1047. * <p>Insertion into a CLH queue requires only a single atomic
  1048. * operation on "tail", so there is a simple atomic point of
  1049. * demarcation from unqueued to queued. Similarly, dequeuing
  1050. * involves only updating the "head". However, it takes a bit
  1051. * more work for nodes to determine who their successors are,
  1052. * in part to deal with possible cancellation due to timeouts
  1053. * and interrupts.
  1054. *
  1055. * <p>The "prev" links (not used in original CLH locks), are mainly
  1056. * needed to handle cancellation. If a node is cancelled, its
  1057. * successor is (normally) relinked to a non-cancelled
  1058. * predecessor. For explanation of similar mechanics in the case
  1059. * of spin locks, see the papers by Scott and Scherer at
  1060. * http://www.cs.rochester.edu/u/scott/synchronization/
  1061. *
  1062. * <p>We also use "next" links to implement blocking mechanics.
  1063. * The thread id for each node is kept in its own node, so a
  1064. * predecessor signals the next node to wake up by traversing
  1065. * next link to determine which thread it is. Determination of
  1066. * successor must avoid races with newly queued nodes to set
  1067. * the "next" fields of their predecessors. This is solved
  1068. * when necessary by checking backwards from the atomically
  1069. * updated "tail" when a node's successor appears to be null.
  1070. * (Or, said differently, the next-links are an optimization
  1071. * so that we don't usually need a backward scan.)
  1072. *
  1073. * <p>Cancellation introduces some conservatism to the basic
  1074. * algorithms. Since we must poll for cancellation of other
  1075. * nodes, we can miss noticing whether a cancelled node is
  1076. * ahead or behind us. This is dealt with by always unparking
  1077. * successors upon cancellation, allowing them to stabilize on
  1078. * a new predecessor, unless we can identify an uncancelled
  1079. * predecessor who will carry this responsibility.
  1080. *
  1081. * <p>CLH queues need a dummy header node to get started. But
  1082. * we don't create them on construction, because it would be wasted
  1083. * effort if there is never contention. Instead, the node
  1084. * is constructed and head and tail pointers are set upon first
  1085. * contention.
  1086. *
  1087. * <p>Threads waiting on Conditions use the same nodes, but
  1088. * use an additional link. Conditions only need to link nodes
  1089. * in simple (non-concurrent) linked queues because they are
  1090. * only accessed when exclusively held. Upon await, a node is
  1091. * inserted into a condition queue. Upon signal, the node is
  1092. * transferred to the main queue. A special value of status
  1093. * field is used to mark which queue a node is on.
  1094. *
  1095. * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
  1096. * Scherer and Michael Scott, along with members of JSR-166
  1097. * expert group, for helpful ideas, discussions, and critiques
  1098. * on the design of this class.
  1099. */
  1100. static final class Node {
  1101. /** Marker to indicate a node is waiting in shared mode */
  1102. static final Node SHARED = new Node();
  1103. /** Marker to indicate a node is waiting in exclusive mode */
  1104. static final Node EXCLUSIVE = null;
  1105. /** waitStatus value to indicate thread has cancelled */
  1106. static final int CANCELLED = 1;
  1107. /** waitStatus value to indicate successor's thread needs unparking */
  1108. static final int SIGNAL = -1;
  1109. /** waitStatus value to indicate thread is waiting on condition */
  1110. static final int CONDITION = -2;
  1111. /**
  1112. * waitStatus value to indicate the next acquireShared should
  1113. * unconditionally propagate
  1114. */
  1115. static final int PROPAGATE = -3;
  1116. /**
  1117. * Status field, taking on only the values:
  1118. * SIGNAL: The successor of this node is (or will soon be)
  1119. * blocked (via park), so the current node must
  1120. * unpark its successor when it releases or
  1121. * cancels. To avoid races, acquire methods must
  1122. * first indicate they need a signal,
  1123. * then retry the atomic acquire, and then,
  1124. * on failure, block.
  1125. * CANCELLED: This node is cancelled due to timeout or interrupt.
  1126. * Nodes never leave this state. In particular,
  1127. * a thread with cancelled node never again blocks.
  1128. * CONDITION: This node is currently on a condition queue.
  1129. * It will not be used as a sync queue node
  1130. * until transferred, at which time the status
  1131. * will be set to 0. (Use of this value here has
  1132. * nothing to do with the other uses of the
  1133. * field, but simplifies mechanics.)
  1134. * PROPAGATE: A releaseShared should be propagated to other
  1135. * nodes. This is set (for head node only) in
  1136. * doReleaseShared to ensure propagation
  1137. * continues, even if other operations have
  1138. * since intervened.
  1139. * 0: None of the above
  1140. *
  1141. * The values are arranged numerically to simplify use.
  1142. * Non-negative values mean that a node doesn't need to
  1143. * signal. So, most code doesn't need to check for particular
  1144. * values, just for sign.
  1145. *
  1146. * The field is initialized to 0 for normal sync nodes, and
  1147. * CONDITION for condition nodes. It is modified using CAS
  1148. * (or when possible, unconditional volatile writes).
  1149. */
  1150. volatile int waitStatus;
  1151. /**
  1152. * Link to predecessor node that current node/thread relies on
  1153. * for checking waitStatus. Assigned during enqueuing, and nulled
  1154. * out (for sake of GC) only upon dequeuing. Also, upon
  1155. * cancellation of a predecessor, we short-circuit while
  1156. * finding a non-cancelled one, which will always exist
  1157. * because the head node is never cancelled: A node becomes
  1158. * head only as a result of successful acquire. A
  1159. * cancelled thread never succeeds in acquiring, and a thread only
  1160. * cancels itself, not any other node.
  1161. */
  1162. volatile Node prev;
  1163. /**
  1164. * Link to the successor node that the current node/thread
  1165. * unparks upon release. Assigned during enqueuing, adjusted
  1166. * when bypassing cancelled predecessors, and nulled out (for
  1167. * sake of GC) when dequeued. The enq operation does not
  1168. * assign next field of a predecessor until after attachment,
  1169. * so seeing a null next field does not necessarily mean that
  1170. * node is at end of queue. However, if a next field appears
  1171. * to be null, we can scan prev's from the tail to
  1172. * double-check. The next field of cancelled nodes is set to
  1173. * point to the node itself instead of null, to make life
  1174. * easier for isOnSyncQueue.
  1175. */
  1176. volatile Node next;
  1177. /**
  1178. * The thread that enqueued this node. Initialized on
  1179. * construction and nulled out after use.
  1180. */
  1181. volatile Thread thread;
  1182. /**
  1183. * Link to next node waiting on condition, or the special
  1184. * value SHARED. Because condition queues are accessed only
  1185. * when holding in exclusive mode, we just need a simple
  1186. * linked queue to hold nodes while they are waiting on
  1187. * conditions. They are then transferred to the queue to
  1188. * re-acquire. And because conditions can only be exclusive,
  1189. * we save a field by using special value to indicate shared
  1190. * mode.
  1191. */
  1192. Node nextWaiter;
  1193. /**
  1194. * Returns true if node is waiting in shared mode.
  1195. */
  1196. final boolean isShared() {
  1197. return nextWaiter == SHARED;
  1198. }
  1199. /**
  1200. * Returns previous node, or throws NullPointerException if null.
  1201. * Use when predecessor cannot be null. The null check could
  1202. * be elided, but is present to help the VM.
  1203. *
  1204. * @return the predecessor of this node
  1205. */
  1206. final Node predecessor() throws NullPointerException {
  1207. Node p = prev;
  1208. if (p == null)
  1209. throw new NullPointerException();
  1210. else
  1211. return p;
  1212. }
  1213. Node() { // Used to establish initial head or SHARED marker
  1214. }
  1215. Node(Thread thread, Node mode) { // Used by addWaiter
  1216. this.nextWaiter = mode;
  1217. this.thread = thread;
  1218. }
  1219. Node(Thread thread, int waitStatus) { // Used by Condition
  1220. this.waitStatus = waitStatus;
  1221. this.thread = thread;
  1222. }
  1223. }
    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead. Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized. Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
    /**
     * The synchronization state.
     */
    private volatile int state;
    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }
    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }
    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    // Queuing utilities
    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }
    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    // Utilities for various versions of acquire
    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops. Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // Main exported methods
    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread. This method is invoked
     * upon each call to a non-waiting {@link ConditionObject} method.
     * (Waiting methods instead invoke {@link #release}.)
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}. This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success. Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted. This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success. Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses. This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    /**
     * Releases in exclusive mode. Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    /**
     * Acquires in shared mode, ignoring interrupts. Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    /**
     * Acquires in shared mode, aborting if interrupted. Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success. Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success. Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
    /**
     * Releases in shared mode. Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    // Queue inspection methods
    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }
    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    public final boolean hasContended() {
        return head != null;
    }
    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }
  2051. /**
  2052. * Version of getFirstQueuedThread called when fastpath fails
  2053. */
  2054. private Thread fullGetFirstQueuedThread() {
  2055. /*
  2056. * The first node is normally head.next. Try to get its
  2057. * thread field, ensuring consistent reads: If thread
  2058. * field is nulled out or s.prev is no longer head, then
  2059. * some other thread(s) concurrently performed setHead in
  2060. * between some of our reads. We try this twice before
  2061. * resorting to traversal.
  2062. */
  2063. Node h, s;
  2064. Thread st;
  2065. if (((h = head) != null && (s = h.next) != null &&
  2066. s.prev == head && (st = s.thread) != null) ||
  2067. ((h = head) != null && (s = h.next) != null &&
  2068. s.prev == head && (st = s.thread) != null))
  2069. return st;
  2070. /*
  2071. * Head's next field might not have been set yet, or may have
  2072. * been unset after setHead. So we must check to see if tail
  2073. * is actually first node. If not, we continue on, safely
  2074. * traversing from tail back to head to find first,
  2075. * guaranteeing termination.
  2076. */
  2077. Node t = tail;
  2078. Thread firstThread = null;
  2079. while (t != null && t != head) {
  2080. Thread tt = t.thread;
  2081. if (tt != null)
  2082. firstThread = tt;
  2083. t = t.prev;
  2084. }
  2085. return firstThread;
  2086. }
  2087. /**
  2088. * Returns true if the given thread is currently queued.
  2089. *
  2090. * <p>This implementation traverses the queue to determine
  2091. * presence of the given thread.
  2092. *
  2093. * @param thread the thread
  2094. * @return {@code true} if the given thread is on the queue
  2095. * @throws NullPointerException if the thread is null
  2096. */
  2097. public final boolean isQueued(Thread thread) {
  2098. if (thread == null)
  2099. throw new NullPointerException();
  2100. for (Node p = tail; p != null; p = p.prev)
  2101. if (p.thread == thread)
  2102. return true;
  2103. return false;
  2104. }
  2105. /**
  2106. * Returns {@code true} if the apparent first queued thread, if one
  2107. * exists, is waiting in exclusive mode. If this method returns
  2108. * {@code true}, and the current thread is attempting to acquire in
  2109. * shared mode (that is, this method is invoked from {@link
  2110. * #tryAcquireShared}) then it is guaranteed that the current thread
  2111. * is not the first queued thread. Used only as a heuristic in
  2112. * ReentrantReadWriteLock.
  2113. */
  2114. final boolean apparentlyFirstQueuedIsExclusive() {
  2115. Node h, s;
  2116. return (h = head) != null &&
  2117. (s = h.next) != null &&
  2118. !s.isShared() &&
  2119. s.thread != null;
  2120. }

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread. Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire). For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     * current thread, and {@code false} if the current thread
     * is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
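The Javadoc above only outlines a fair `tryAcquire`. A fuller sketch, assuming a reentrant exclusive synchronizer modeled on the approach of `ReentrantLock`'s fair mode, might look like the following. `FairMutexSketch`, `Sync`, and `holds()` are hypothetical names for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class FairMutexSketch {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Fairness: refuse to barge past threads that queued earlier.
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // Reentrant acquire: only the owner writes state here, so no CAS is needed.
                setState(c + acquires);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free; // wake a successor only when fully released
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        int holds() {
            return getState();
        }
    }

    public static void main(String[] args) {
        Sync sync = new Sync();
        sync.acquire(1);
        sync.acquire(1);
        System.out.println(sync.holds());
        sync.release(1);
        sync.release(1);
        System.out.println(sync.holds());
    }
}
```

The only difference from a non-fair variant is the `hasQueuedPredecessors()` check in the `c == 0` branch; dropping it would let a newly arriving thread take the lock ahead of queued waiters.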

    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire. The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures. This method is designed for use in
     * monitoring system state, not for synchronization
     * control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }
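These monitoring methods are reachable from application code through `ReentrantLock.getQueueLength()`. As a rough illustration, the sketch below holds a lock, lets several threads queue up behind it, and reads the length once all of them have enqueued. The class and method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

final class QueueLengthSketch {
    // Holds the lock on the calling thread, blocks `waiters` threads behind it,
    // and returns the queue length observed once all of them have enqueued.
    static int observedQueueLength(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();
                    lock.unlock();
                });
                threads[i].start();
            }
            while (lock.getQueueLength() < waiters) {
                Thread.yield(); // the value is an estimate, so poll until it settles
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observedQueueLength(3));
    }
}
```

The count skips nodes whose `thread` field is null, so the dummy head node is not included in the result.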

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in shared mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to a shared acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a string identifying this synchronizer, as well as its state.
     * The state, in brackets, includes the String {@code "State ="}
     * followed by the current value of {@link #getState}, and either
     * {@code "nonempty"} or {@code "empty"} depending on whether the
     * queue is empty.
     *
     * @return a string identifying this synchronizer, as well as its state
     */
    public String toString() {
        int s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
            "[State = " + s + ", " + q + "empty queue]";
    }

    // Internal support methods for Conditions

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
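One visible consequence of `fullyRelease` is that a thread holding a reentrant lock several times gives up every hold while it waits on a condition, and gets the same hold count back afterwards. The sketch below, with hypothetical class and method names, checks this through `ReentrantLock`: the signalling thread can only take the lock if the waiter has released both holds.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class FullyReleaseSketch {
    // Returns the waiter's hold count right after it comes back from the wait.
    static int holdCountAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] signalled = new boolean[1]; // guarded by lock
        int[] holds = new int[1];
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold count is now 2
            try {
                while (!signalled[0]) {
                    ready.awaitUninterruptibly(); // releases both holds while waiting
                }
                holds[0] = lock.getHoldCount();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        boolean done = false;
        while (!done) {
            lock.lock(); // succeeds only while the waiter holds nothing
            try {
                if (lock.hasWaiters(ready)) {
                    signalled[0] = true;
                    ready.signal();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield();
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holds[0];
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfterAwait());
    }
}
```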

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
     *
     * @param condition the condition
     * @return {@code true} if owned
     * @throws NullPointerException if the condition is null
     */
    public final boolean owns(ConditionObject condition) {
        return condition.isOwnedBy(this);
    }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this synchronizer. Note that because timeouts
     * and interrupts may occur at any time, a {@code true} return
     * does not guarantee that a future {@code signal} will awaken
     * any threads. This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @param condition the condition
     * @return {@code true} if there are any waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     * is not held
     * @throws IllegalArgumentException if the given condition is
     * not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this synchronizer. Note that
     * because timeouts and interrupts may occur at any time, the
     * estimate serves only as an upper bound on the actual number of
     * waiters. This method is designed for use in monitoring of the
     * system state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     * is not held
     * @throws IllegalArgumentException if the given condition is
     * not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this
     * synchronizer. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     * is not held
     * @throws IllegalArgumentException if the given condition is
     * not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }
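The two failure modes documented above (condition not owned by this synchronizer, lock not held) can be observed through the corresponding `ReentrantLock` wrappers. This is only a small probe with made-up class and method names; it reports the exception type instead of asserting anything about the message text.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class ConditionOwnershipSketch {
    // Queries lock `a` about a condition created by a different lock.
    // Returns the simple name of the exception thrown, or "none".
    static String queryForeignCondition() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        Condition fromB = b.newCondition();
        a.lock();
        try {
            a.hasWaiters(fromB);
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        } finally {
            a.unlock();
        }
    }

    // Queries a lock about its own condition without holding the lock.
    static String queryWithoutLock() {
        ReentrantLock a = new ReentrantLock();
        Condition own = a.newCondition();
        try {
            a.getWaitQueueLength(own);
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(queryForeignCondition());
        System.out.println(queryWithoutLock());
    }
}
```

The ownership check runs first, so a foreign condition is rejected with `IllegalArgumentException` whether or not the lock is held.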

    /**
     * Condition implementation for a {@link
     * AbstractQueuedSynchronizer} serving as the basis of a {@link
     * Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users. Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * {@code AbstractQueuedSynchronizer}.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         * returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         * returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
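As a usage-level view of `signalAll()`, the following sketch parks several threads on one condition and releases them together. The names are hypothetical; the signalling thread uses `ReentrantLock.getWaitQueueLength` only to know when every waiter has reached the condition queue.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class SignalAllSketch {
    // Parks `waiters` threads on one condition, wakes them with a single
    // signalAll(), and returns how many of them resumed.
    static int wokenBySignalAll(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition go = lock.newCondition();
        boolean[] released = new boolean[1]; // guarded by lock
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!released[0]) {
                        go.awaitUninterruptibly();
                    }
                    woken.incrementAndGet();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(go) == waiters) {
                    released[0] = true;
                    go.signalAll(); // moves every waiter to the lock's sync queue
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield();
            }
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(wokenBySignalAll(3));
    }
}
```

With `signal()` in place of `signalAll()`, only the longest-waiting thread would be transferred and the remaining waiters would stay parked, so the joins in this sketch would not complete.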

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         * throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled.
         * <li> Reacquire by invoking specialized version of
         * {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }
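The `interrupted` flag in the method above means an interrupt does not end the wait but is re-asserted once the lock has been reacquired. A small sketch of that behavior, using hypothetical names and `ReentrantLock` as the lock, could look like this:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class UninterruptibleWaitSketch {
    // Interrupts a thread that is inside awaitUninterruptibly(), then signals it,
    // and reports whether its interrupt status is set after the wait returns.
    static boolean interruptPreserved() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = new boolean[1]; // guarded by lock
        boolean[] sawInterrupt = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.awaitUninterruptibly();
                }
                sawInterrupt[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    waiter.interrupt(); // does not end the wait by itself
                    go[0] = true;
                    cond.signal();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield();
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptPreserved());
    }
}
```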

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT = 1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         * throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         * {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
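From the caller's side, `await()` is normally wrapped in a `while` loop that rechecks the guarded state after every wake-up. The bounded buffer below is a minimal sketch of that idiom with two conditions on one `ReentrantLock`; `BoundedBufferSketch` and `sumThroughBuffer` are hypothetical names chosen for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class BoundedBufferSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final int[] items;
    private int putIndex, takeIndex, count;

    BoundedBufferSketch(int capacity) {
        items = new int[capacity];
    }

    void put(int x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting, reacquires before returning
            }
            items[putIndex] = x;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            int x = items[takeIndex];
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..n through a two-slot buffer from a producer thread and sums them on the caller.
    static int sumThroughBuffer(int n) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughBuffer(5));
    }
}
```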

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         * throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         * {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }
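Because `awaitNanos` returns the time remaining, callers can feed the result back into the next wait and keep an overall timeout across wake-ups. The sketch below follows the idiom described in the `Condition.awaitNanos` documentation; the class, field, and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class TimedWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean flag; // guarded by lock

    void set() {
        lock.lock();
        try {
            flag = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits up to the given timeout for the flag; returns false if time ran out first.
    boolean awaitFlag(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = changed.awaitNanos(nanos); // remaining time; <= 0 means timed out
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns {resultWhenNeverSet, resultWhenAlreadySet}.
    static boolean[] demo() {
        TimedWaitSketch neverSet = new TimedWaitSketch();
        TimedWaitSketch alreadySet = new TimedWaitSketch();
        alreadySet.set();
        try {
            return new boolean[] {
                neverSet.awaitFlag(10, TimeUnit.MILLISECONDS),
                alreadySet.awaitFlag(10, TimeUnit.MILLISECONDS)
            };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] { false, false };
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```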

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         * throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         * {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }
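A quick way to see the `!timedout` return value is to wait on a condition that nobody signals with a deadline that has already passed. In that case the loop above cancels the wait on its first iteration and the call returns `false`. The helper below is a hypothetical probe written for this note, not an excerpt from the JDK.

```java
import java.util.Date;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class DeadlineWaitSketch {
    // Waits on a condition that is never signalled until the given deadline.
    // Returns the value reported by awaitUntil (false means the deadline elapsed).
    static boolean waitUntil(Date deadline) {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        lock.lock();
        try {
            return neverSignalled.awaitUntil(deadline);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Date past = new Date(System.currentTimeMillis() - 1000L);
        System.out.println(waitUntil(past));
    }
}
```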

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         * throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         * {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         * returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         * returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         * returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }
    /**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
}
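
The comment above the Unsafe setup notes that the same CAS operations "could otherwise be done with atomic field updaters". Application code cannot normally call Unsafe.getUnsafe(), so the following is a minimal sketch of that alternative, not the way AQS itself is written. The class name CasStateSketch and its methods are hypothetical and exist only for illustration; AtomicIntegerFieldUpdater is the standard JDK class.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical illustration class: mimics AQS's state + compareAndSetState
// using a field updater instead of sun.misc.Unsafe.
class CasStateSketch {
    // The updater requires the target field to be a volatile int.
    private volatile int state;

    private static final AtomicIntegerFieldUpdater<CasStateSketch> STATE =
        AtomicIntegerFieldUpdater.newUpdater(CasStateSketch.class, "state");

    // Atomically sets state to update only if it currently equals expect.
    public boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

    public int getState() {
        return state;
    }

    public static void main(String[] args) {
        CasStateSketch s = new CasStateSketch();
        System.out.println(s.compareAndSetState(0, 1)); // true: 0 -> 1 succeeds
        System.out.println(s.compareAndSetState(0, 1)); // false: state is already 1
        System.out.println(s.getState());               // 1
    }
}
```

The second call fails because the expected value no longer matches. This is the same situation in which NonfairSync.lock() sees its compareAndSetState(0, 1) fail and falls through to acquire(1).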

This article references https://blog.csdn.net/txk396879586/article/details/123055812
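
The ConditionObject query methods in the listing (hasWaiters, getWaitQueueLength) are protected, but ReentrantLock exposes them through its public hasWaiters(Condition) and getWaitQueueLength(Condition) methods. The sketch below shows how they behave from the caller's side, assuming a single waiting thread. The class ConditionQuerySketch and its method names are hypothetical; only the ReentrantLock and Condition calls are real JDK API.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class for the Condition query methods.
class ConditionQuerySketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean go = false; // guarded by lock

    // Starts one waiter, then reports the queue length seen while it is waiting.
    public int observeOneWaiter() {
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go) {
                    ready.awaitUninterruptibly(); // releases the lock while waiting
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        int observed = -1;
        try {
            while (observed < 0) {
                lock.lock(); // the queries below require holding the lock
                try {
                    if (lock.hasWaiters(ready)) {
                        observed = lock.getWaitQueueLength(ready);
                        go = true;
                        ready.signalAll(); // let the waiter finish
                    }
                } finally {
                    lock.unlock();
                }
                if (observed < 0) {
                    Thread.sleep(1); // waiter has not reached await() yet
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    // Calling the query without holding the lock hits the isHeldExclusively() check.
    public boolean queryWithoutLockThrows() {
        try {
            lock.hasWaiters(ready);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConditionQuerySketch s = new ConditionQuerySketch();
        System.out.println(s.observeOneWaiter());       // 1
        System.out.println(s.queryWithoutLockThrows()); // true
    }
}
```

The IllegalMonitorStateException in the second method comes from the `if (!isHeldExclusively())` guard at the top of each query method in the listing.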
