AQS-CountDownLatch&CyclicBarrier&Semaphore

淩亂°似流年 2022-12-08 14:24 197阅读 0赞

AQS-CountDownLatch&CyclicBarrier&Semaphore

文章目录

  • AQS-CountDownLatch&CyclicBarrier&Semaphore
    • CountDownLatch
      • 源码分析
    • Semaphore
      • 源码分析
    • CyclicBarrier
      • 源码分析

CountDownLatch

CountDownLatch是JUC包下的一个基于AQS实现的并发工具类,利用他可以实现类似计数器的功能,比如有一个任务A,他要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现

简单使用demo:

  1. public static void main(String[] args) throws Exception{
  2. final CountDownLatch countDownLatch = new CountDownLatch(2);
  3. Thread thread1 = new Thread(()->{
  4. try {
  5. System.out.println("线程一执行中。。。。");
  6. Thread.sleep(2000);
  7. System.out.println("线程一执行完成。");
  8. countDownLatch.countDown();
  9. }catch (Exception e){
  10. }
  11. },"t1");
  12. Thread thread2 = new Thread(()->{
  13. try {
  14. System.out.println("线程二执行中。。。。");
  15. Thread.sleep(2000);
  16. System.out.println("线程二执行完成。");
  17. countDownLatch.countDown();
  18. }catch (Exception e){
  19. }
  20. },"t2");
  21. thread1.start();
  22. thread2.start();
  23. countDownLatch.await();
  24. System.out.println("----------线程一二执行完成,继续执行主线程");
  25. }

执行结果:

在这里插入图片描述

CountDownLatch中最重要的三个方法:

  1. public void countDown() {
  2. }; //将count值减1
  3. public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
  4. }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
  5. public void await() throws InterruptedException {
  6. }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行

源码分析

构造方法:

传入计数大小,并且实例化同步器

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

同步器实现:

  1. // 基于AQS实现
  2. private static final class Sync extends AbstractQueuedSynchronizer {
  3. private static final long serialVersionUID = 4982264981922014374L;
  4. Sync(int count) {
  5. setState(count);
  6. }
  7. int getCount() {
  8. return getState();
  9. }
  10. // 获取共享锁
  11. protected int tryAcquireShared(int acquires) {
  12. return (getState() == 0) ? 1 : -1;
  13. }
  14. // 释放共享锁
  15. protected boolean tryReleaseShared(int releases) {
  16. // Decrement count; signal when transition to zero
  17. for (;;) {
  18. int c = getState();
  19. if (c == 0)
  20. return false;
  21. int nextc = c-1;
  22. if (compareAndSetState(c, nextc))
  23. return nextc == 0;
  24. }
  25. }
  26. }

await方法:

  1. public void await() throws InterruptedException {
  2. // await方法就是可中断的获取共享锁
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. protected int tryAcquireShared(int acquires) {
  6. return (getState() == 0) ? 1 : -1;// getState中获取的state,state是我们传入的count,即count != 0返回-1
  7. }
  8. public final void acquireSharedInterruptibly(int arg)
  9. throws InterruptedException {
  10. if (Thread.interrupted())
  11. throw new InterruptedException();
  12. if (tryAcquireShared(arg) < 0)
  13. doAcquireSharedInterruptibly(arg);
  14. }
  15. // 返回-1后就会将其线程封装为节点链接到等待队列中,自旋获取共享锁
  16. private void doAcquireSharedInterruptibly(int arg)
  17. throws InterruptedException {
  18. final Node node = addWaiter(Node.SHARED);
  19. boolean failed = true;
  20. try {
  21. for (;;) {
  22. final Node p = node.predecessor();
  23. if (p == head) {
  24. int r = tryAcquireShared(arg);
  25. if (r >= 0) {
  26. setHeadAndPropagate(node, r);
  27. p.next = null; // help GC
  28. failed = false;
  29. return;
  30. }
  31. }
  32. if (shouldParkAfterFailedAcquire(p, node) &&
  33. parkAndCheckInterrupt())
  34. throw new InterruptedException();
  35. }
  36. } finally {
  37. if (failed)
  38. cancelAcquire(node);
  39. }
  40. }

countDown方法

  1. public void countDown() {
  2. // countDown方法其实就是释放共享锁
  3. sync.releaseShared(1);
  4. }

Semaphore

Semaphore是和CountDownLatch一样,也是JUC包下的并发工具类,他可以控制并发访问资源的线程数通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

  1. public class Test {
  2. public static void main(String[] args) {
  3. int N = 8; //工人数
  4. Semaphore semaphore = new Semaphore(5); //机器数目
  5. for(int i=0;i<N;i++)
  6. new Worker(i,semaphore).start();
  7. }
  8. static class Worker extends Thread{
  9. private int num;
  10. private Semaphore semaphore;
  11. public Worker(int num,Semaphore semaphore){
  12. this.num = num;
  13. this.semaphore = semaphore;
  14. }
  15. @Override
  16. public void run() {
  17. try {
  18. semaphore.acquire();
  19. System.out.println("工人"+this.num+"占用一个机器在生产...");
  20. Thread.sleep(2000);
  21. System.out.println("工人"+this.num+"释放出机器");
  22. semaphore.release();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }

执行结果:

  1. 工人0占用一个机器在生产...
  2. 工人1占用一个机器在生产...
  3. 工人2占用一个机器在生产...
  4. 工人4占用一个机器在生产...
  5. 工人5占用一个机器在生产...
  6. 工人0释放出机器
  7. 工人2释放出机器
  8. 工人3占用一个机器在生产...
  9. 工人7占用一个机器在生产...
  10. 工人4释放出机器
  11. 工人5释放出机器
  12. 工人1释放出机器
  13. 工人6占用一个机器在生产...
  14. 工人3释放出机器
  15. 工人7释放出机器
  16. 工人6释放出机器

源码分析

构造方法:

  1. public Semaphore(int permits) {
  2. // 直接传入许可证数目
  3. sync = new NonfairSync(permits);
  4. }
  5. public Semaphore(int permits, boolean fair) {
  6. // 传入许可证加是否是公平锁的标志,等待时间越久的越先获取许可
  7. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  8. }

获取许可:

  1. public void acquire() throws InterruptedException {
  2. // 获取共享锁
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. public final void acquireSharedInterruptibly(int arg)
  6. throws InterruptedException {
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. if (tryAcquireShared(arg) < 0)
  10. doAcquireSharedInterruptibly(arg);
  11. }
  12. static final class FairSync extends Sync {
  13. private static final long serialVersionUID = 2014338818796000944L;
  14. FairSync(int permits) {
  15. super(permits);
  16. }
  17. protected int tryAcquireShared(int acquires) {
  18. for (;;) {
  19. // 等待队列该线程前面有节点在等待,阻塞
  20. if (hasQueuedPredecessors())
  21. return -1;
  22. // 无线程等待,许可证减少
  23. int available = getState();
  24. int remaining = available - acquires;
  25. if (remaining < 0 ||
  26. compareAndSetState(available, remaining))
  27. return remaining;
  28. }
  29. }
  30. }

释放许可证

  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }
  11. protected final boolean tryReleaseShared(int releases) {
  12. // 当前许可加上释放的许可,CAS更新后返回true
  13. for (;;) {
  14. int current = getState();
  15. int next = current + releases;
  16. if (next < current) // overflow
  17. throw new Error("Maximum permit count exceeded");
  18. if (compareAndSetState(current, next))
  19. return true;
  20. }
  21. }

CyclicBarrier

通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

  1. public class Test {
  2. public static void main(String[] args) {
  3. int N = 4;
  4. CyclicBarrier barrier = new CyclicBarrier(N);
  5. for(int i=0;i<N;i++)
  6. new Writer(barrier).start();
  7. }
  8. static class Writer extends Thread{
  9. private CyclicBarrier cyclicBarrier;
  10. public Writer(CyclicBarrier cyclicBarrier) {
  11. this.cyclicBarrier = cyclicBarrier;
  12. }
  13. @Override
  14. public void run() {
  15. System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
  16. try {
  17. Thread.sleep(5000); //以睡眠来模拟写入数据操作
  18. System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
  19. cyclicBarrier.await();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }catch(BrokenBarrierException e){
  23. e.printStackTrace();
  24. }
  25. System.out.println("所有线程写入完毕,继续处理其他任务...");
  26. }
  27. }
  28. }

结果:

  1. 线程Thread-0正在写入数据...
  2. 线程Thread-3正在写入数据...
  3. 线程Thread-2正在写入数据...
  4. 线程Thread-1正在写入数据...
  5. 线程Thread-2写入数据完毕,等待其他线程写入完毕
  6. 线程Thread-0写入数据完毕,等待其他线程写入完毕
  7. 线程Thread-3写入数据完毕,等待其他线程写入完毕
  8. 线程Thread-1写入数据完毕,等待其他线程写入完毕
  9. 所有线程写入完毕,继续处理其他任务...
  10. 所有线程写入完毕,继续处理其他任务...
  11. 所有线程写入完毕,继续处理其他任务...
  12. 所有线程写入完毕,继续处理其他任务...

CountDownLatch 可以实现多个线程的协调,在所有指定线程完成任务后,主线程才继续任务,但是CountDownLatch 有个缺点就是,不可重用,每次都需要创建新的CountDownLatch 实例

源码分析

构造方法

当parties个线程准备就绪后即都调用await方法后,执行barrierAction

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }

准备就绪后,啥事不干

  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }

await方法

有一个我们常用的方法 await,还有一个内部类,Generation ,仅有一个参数,有什么作用呢?

在 CyclicBarrier 中,有一个 “代” 的概念,因为 CyclicBarrier 是可以复用的,那么每次所有的线程通过了栅栏,就表示一代过去了,就像我们的新年一样。当所有人跨过了元旦,日历就更新了。

CyclicBarrier 支持在所有线程通过栅栏的时候,执行一个线程的任务。

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. // 锁住
  6. lock.lock();
  7. try {
  8. // 当前代
  9. final Generation g = generation;
  10. // 如果这代损坏了,抛出异常
  11. if (g.broken)
  12. throw new BrokenBarrierException();
  13. // 如果线程中断了,抛出异常
  14. if (Thread.interrupted()) {
  15. // 将损坏状态设置为 true
  16. // 并通知其他阻塞在此栅栏上的线程
  17. breakBarrier();
  18. throw new InterruptedException();
  19. }
  20. // 获取下标
  21. int index = --count;
  22. // 如果是 0 ,说明到头了
  23. if (index == 0) {
  24. // tripped
  25. boolean ranAction = false;
  26. try {
  27. final Runnable command = barrierCommand;
  28. // 执行栅栏任务
  29. if (command != null)
  30. command.run();
  31. ranAction = true;
  32. // 更新一代,将 count 重置,将 generation 重置.
  33. // 唤醒之前等待的线程
  34. nextGeneration();
  35. // 结束
  36. return 0;
  37. } finally {
  38. // 如果执行栅栏任务的时候失败了,就将栅栏失效
  39. if (!ranAction)
  40. breakBarrier();
  41. }
  42. }
  43. for (;;) {
  44. try {
  45. // 如果没有时间限制,则直接等待,直到被唤醒
  46. if (!timed)
  47. trip.await();
  48. // 如果有时间限制,则等待指定时间
  49. else if (nanos > 0L)
  50. nanos = trip.awaitNanos(nanos);
  51. } catch (InterruptedException ie) {
  52. // g == generation >> 当前代
  53. // ! g.broken >>> 没有损坏
  54. if (g == generation && ! g.broken) {
  55. // 让栅栏失效
  56. breakBarrier();
  57. throw ie;
  58. } else {
  59. // 上面条件不满足,说明这个线程不是这代的.
  60. // 就不会影响当前这代栅栏执行逻辑.所以,就打个标记就好了
  61. Thread.currentThread().interrupt();
  62. }
  63. }
  64. // 当有任何一个线程中断了,会调用 breakBarrier 方法.
  65. // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
  66. if (g.broken)
  67. throw new BrokenBarrierException();
  68. // g != generation >>> 正常换代了
  69. // 一切正常,返回当前线程所在栅栏的下标
  70. // 如果 g == generation,说明还没有换代,那为什么会醒了?
  71. // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
  72. // 正是因为这个原因,才需要 generation 来保证正确。
  73. if (g != generation)
  74. return index;
  75. // 如果有时间限制,且时间小于等于0,销毁栅栏,并抛出异常
  76. if (timed && nanos <= 0L) {
  77. breakBarrier();
  78. throw new TimeoutException();
  79. }
  80. }
  81. } finally {
  82. lock.unlock();
  83. }
  84. }

发表评论

表情:
评论列表 (有 0 条评论,197人围观)

还没有评论,来说两句吧...

相关阅读