回环屏障CyclicBarrier详解

以你之姓@ 2023-01-07 15:23 258阅读 0赞

CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,这就起不到线程同步的效果了。CyclicBarrier可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为所有等待线程执完毕,并重置CyclicBarrier的状态后它可以被重用。之所以叫做屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就成为屏障点,等所有线程都调用了await方法后,线程就会冲破屏障,继续向下运行。

1、案例介绍

使用两个线程去执行一个被分解的任务A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。

  1. public class CycleBarrierTest1 {
  2. // 创建一个CyclicBarrier实例,添加一个所有子线程全部到达屏障后执行的任务
  3. private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
  4. @Override
  5. public void run() {
  6. System.out.println(Thread.currentThread() + "task1 merge result");
  7. }
  8. });
  9. public static void main(String[] args) throws InterruptedException {
  10. // 创建一个线程数固定为2的线程池
  11. ExecutorService executorService = Executors.newFixedThreadPool(2);
  12. // 将线程A添加到线程池
  13. executorService.submit(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. System.out.println(Thread.currentThread() + "task1-1");
  18. System.out.println(Thread.currentThread() + "enter in barrier");
  19. cyclicBarrier.await();
  20. System.out.println(Thread.currentThread() + "enter out barrier");
  21. }catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. });
  26. // 将线程B添加到线程池
  27. executorService.submit(new Runnable() {
  28. @Override
  29. public void run() {
  30. try {
  31. System.out.println(Thread.currentThread() + "task1-2");
  32. System.out.println(Thread.currentThread() + "enter in barrier");
  33. cyclicBarrier.await();
  34. System.out.println(Thread.currentThread() + "enter out barrier");
  35. }catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. });
  40. // 关闭线程池
  41. executorService.shutdown();
  42. }
  43. }

执行程序:

  1. Thread[pool-1-thread-1,5,main]task1-1
  2. Thread[pool-1-thread-2,5,main]task1-2
  3. Thread[pool-1-thread-1,5,main]enter in barrier
  4. Thread[pool-1-thread-2,5,main]enter in barrier
  5. Thread[pool-1-thread-2,5,main]task1 merge result
  6. Thread[pool-1-thread-2,5,main]enter out barrier
  7. Thread[pool-1-thread-1,5,main]enter out barrier
  • 如上代码创建了一个CyclicBarrier对象,其第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。
  • 在main函数里面首先创建了一个大小为2的线程池,然后添加两个子任务到线程池,每个子任务在执行自己的逻辑后会调用await方法。
  • 一开始计数器值为2,当第一个线程调用await方法时,计数器值会递减为1.由于计数器值不为0,所以当前线程就到了屏障点而被阻塞。然后第二个线程调用await时,会进入屏障,计数器值也会递减,现在计数器值为0,这时就会去执行CyclicBarrier构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程,这时候第一个线程也会退出屏障点继续向下执行。

2、案例2

假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行:

  1. import java.util.concurrent.CyclicBarrier;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class CycleBarrierTest2 {
  5. // 创建一个CyclicBarrier实例
  6. private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
  7. public static void main(String[] args) throws InterruptedException {
  8. ExecutorService executorService = Executors.newFixedThreadPool(2);
  9. // 将线程A添加到线程池
  10. executorService.submit(new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. System.out.println(Thread.currentThread() + "step1");
  15. cyclicBarrier.await();
  16. System.out.println(Thread.currentThread() + "step2");
  17. cyclicBarrier.await();
  18. System.out.println(Thread.currentThread() + "step3");
  19. }catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. });
  24. // 将线程B添加到线程池
  25. executorService.submit(new Runnable() {
  26. @Override
  27. public void run() {
  28. try {
  29. System.out.println(Thread.currentThread() + "step1");
  30. cyclicBarrier.await();
  31. System.out.println(Thread.currentThread() + "step2");
  32. cyclicBarrier.await();
  33. System.out.println(Thread.currentThread() + "step3");
  34. }catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. });
  39. // 关闭线程池
  40. executorService.shutdown();
  41. }
  42. }

执行程序:

  1. Thread[pool-1-thread-2,5,main]step1
  2. Thread[pool-1-thread-1,5,main]step1
  3. Thread[pool-1-thread-1,5,main]step2
  4. Thread[pool-1-thread-2,5,main]step2
  5. Thread[pool-1-thread-2,5,main]step3
  6. Thread[pool-1-thread-1,5,main]step3
  • 如上代码,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段1后才会开始执行阶段2.然后在阶段2后面调用了await方法,这保证了所有线程都完成了阶段2后,才开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的。

3、实现原理

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpX3dfY2g_size_16_color_FFFFFF_t_70

由以上类图可知,CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行。

count一开始等于parties,每当有线程调用await方法就递减1,当count为0时就表示所有线程都到了屏障点。

4、主要方法

4.1、int await()方法

当线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:

  • parties个线程都调用了await()方法,也就是线程都到了屏障点;
  • 其他线程调用了interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常而返回;
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回;

4.2、boolean await(long timeout, TimeUnit unit)方法

当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:

  • parties个线程都调用了await()方法,也就是线程都到了屏障点,这时候返回true;
  • 设置的超时时间到了后返回false;
  • 其他线程调用当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常然后返回;
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回。

5、总结

CyclicBarrier与CountDownLatch的不同在于,前者可以复用,并且前者特别适合分段任务有序执行的场景。CyclicBarrier通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。

发表评论

表情:
评论列表 (有 0 条评论,258人围观)

还没有评论,来说两句吧...

相关阅读

    相关 设备

    你有没有想过在Linux的文件管理系统中再创建一个文件系统,就像在Windows中创建一个新的磁盘分区那样(但其实并不必非得直接那么做)。这时你就需要用到回环设备(loop d