CountDownLatch源码分析

ゝ一纸荒年。 2023-10-01 16:42 76阅读 0赞

一、CountDownLatch简介

CountDownLatch(闭锁)是一个同步协助类,允许一个线程或多个线程阻塞等待,直到其他线程完成操作后,被阻塞的线程才会被唤醒,然后执行后面的业务逻辑。

CountDownLatch的构造方法需要给定一个计数值(count)进行初始化。CountDownLatch简单理解就是一个倒计数器,调用它的await()会被阻塞,直到它的计数值(count)为0时,被阻塞的线程才会被唤醒,而调用它的countDown()方法会对计数器值-1,直到计数值为0后,之前被阻塞的线程都会被唤醒,同时后续再调用await()方法时就不会再被阻塞,因为计数值count是一次性的,当它的值减为0后就不会再变化了,所以后续调用await()方法时不会被阻塞,而是立即返回

如果使用场景中,需要计数值能够被重置,可以考虑使用CyclicBarrier,关于CyclicBarrier会在下一篇文章进行详细介绍。

下图展示了CountDownLatch的原理:
watermark_type_d3F5LXplbmhlaQ_shadow_50_text_Q1NETiBAc2VybW9ubGl6aGk_size_13_color_FFFFFF_t_70_g_se_x_16

count数值为0时会去恢复线程A继续执行

二、使用场景

CountDownLatch一般用于多线程倒计时的计数器,强制它们等待其他一组线程(由CountDownLatch的初始化决定)任务执行完成。

CountDownLatch有两种使用场景:

  • 让多个线程等待
  • 让单个线程等待

2.1 多线程等待:并发线程同时执行

这类场景就很像田径远动员比赛,每一个运动员就是一个线程,然后发号员是一个主线程,每个远动员准备就绪后都需要等待发号员发令才能开始,发号员准备完毕后开枪,然后所有远动员同时开始开跑。

下面的代码就模拟了远动员比赛的场景:

三个远动员都阻塞在调用await()方法这里,当发号员准备两秒后发令,这是三个远动员同时往下进行

  1. CountDownLatch countDownLatch = new CountDownLatch(1);
  2. for (int num = 0; num < 3; num++){
  3. new Thread(() ->{
  4. try {
  5. //准备完毕,运动员都阻塞在这里,等待号令
  6. countDownLatch.await();
  7. System.out.println(Thread.currentThread().getName()+"开始跑……");
  8. } catch (InterruptedException e) {
  9. log.error(e);
  10. }
  11. }).start();
  12. }
  13. // 发号员准备发令
  14. Thread.sleep(2000);
  15. // 发令枪:发令
  16. countDownLatch.countDown();

2.2 单线程等待:并发线程完成后合并

在有些场景中,并发任务前后存在依赖关系,比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后,需要对结果进行合并。这就是典型的并发线程完成后需要合并的场景。

等待合并的线程调用await()方法进行阻塞等待,当每个任务执行完成后都会调用countDown()方法,将计数值-1,当最后一个线程执行完任务后,计数值被减为0,这个时候就去唤醒等待汇总的线程。

  1. CountDownLatch countDownLatch = new CountDownLatch(3);
  2. for (int num = 0; num < 3; num++){
  3. new Thread(() ->{
  4. try {
  5. Thread.sleep(1000+ ThreadLocalRandom.current().nextInt(1000));
  6. System.out.println(Thread.currentThread().getName() + "finish task");
  7. countDownLatch.countDown();
  8. } catch (InterruptedException e) {
  9. log.error(e);
  10. }
  11. }).start();
  12. }
  13. countDownLatch.await();
  14. System.out.println("所有线程执行完成,对结果进行汇总");

三、源码分析

在理解CountDownLatch的源码之前,可以先看一下《Semaphore源码分析》这篇文章,后面理解CountDownLatch的源码就非常简单了。

CountDownLatch的源码相比ReentrantLock和Semaphore要简单很多,因为它就是一个到计数器的功能,所以也不会存在公平与非公平的概念,同时它的底层实现与Semaphore是一模一样的,只是它重写了tryAcquireShared()tryReleaseShared()两个方法

CountDownLatch同样也是基于AbstractQueuedSynchronizer实现的,所以的通过内部类Sync继承了AbstractQueuedSynchronizer,然后使用AQS的state属性来记录count的计数值。

CountDownLatch的整个结构是非常简单了,它只提供了以下几个方法:

  1. // 构造方法需要指定count值
  2. public CountDownLatch(int count) {
  3. if (count < 0) throw new IllegalArgumentException("count < 0");
  4. this.sync = new Sync(count);
  5. }
  6. public void await() throws InterruptedException {
  7. sync.acquireSharedInterruptibly(1);
  8. }
  9. // 阻塞等待timeout时长后,count值还没变为0,则不再等待,继续执行
  10. public boolean await(long timeout, TimeUnit unit)
  11. throws InterruptedException {
  12. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  13. }
  14. // 将count值-1,直至为0
  15. public void countDown() {
  16. sync.releaseShared(1);
  17. }
  18. public long getCount() {
  19. return sync.getCount();
  20. }

3.1 阻塞等待

在上面方法的源码中,可以看到await()方法是调用的内部类SyncacquireSharedInterruptibly()方法,该方法在AQS中已经实现,Semaphore获取共享资源时也是调用该方法

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireSharedInterruptibly(arg);
  7. }

是否需要进行阻塞的核心在于tryAcquireShared(),只要这个方法的返回值小于-1,就会调用doAcquireSharedInterruptibly()方法将线程进行阻塞

而CountDownLatch的内部类Sync中对tryAcquireShared()方法的实现也非常简单,只要count的数值不等于0,就返回-1,表示需要进行阻塞。

  1. protected int tryAcquireShared(int acquires) {
  2. return (getState() == 0) ? 1 : -1;
  3. }

然后就是去调用doAcquireSharedInterruptibly()对线程进行阻塞,该方法的详细介绍在Semaphore源码分析的文章中已经做了详细的介绍。

3.2 唤醒执行

CountDownLatch中因调用awati()方法被阻塞的线程,能否被唤醒完全取决于countDown()方法,该方法会使count的计数值-1。

CountDownLatch的countDown()方法调用的是SyncreleaseShared()方法,同样,该方法也在AQS中已经实现了

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

能不能调用doReleaseShared()唤醒线程,取决于CountDownLatch中SynctryReleaseShared()方法

该方法中,先判断count的值是否为0,如果为0说明已经唤醒过了,不需要重复唤醒,所以直接返回false

如果不为0,就利用自旋+CAS将state的属性值-1,修改后的count值为0,就会返回true

  1. protected boolean tryReleaseShared(int releases) {
  2. // Decrement count; signal when transition to zero
  3. for (;;) {
  4. int c = getState();
  5. if (c == 0)
  6. return false;
  7. int nextc = c-1;
  8. if (compareAndSetState(c, nextc))
  9. return nextc == 0;
  10. }
  11. }

返回true之后,就会调用AQS的doReleaseShared()唤醒线程,该方法也已经在Semaphore的源码中详细介绍过了。

从上面CountDownLatch的源码可以看出,它完全就是借助Semaphore的特性(被唤醒的线程会去尝试唤醒后面的线程)来唤醒所有被阻塞的线程

总结:从功能上来看,CountDownLatch的功能与Thread.join()方法非常相似,都是等待其他线程执行完成之后再执行后续的逻辑,但在实现上,CountDownLatch提供了比join()方法更灵活的API。同时CountDownLatch即可以手动控制在一个线程调用多次countDown()方法,也可以在多个线程调用多次。join()方法的实现原理是不停检查join线程是否存在,只要存活就让当前线程一直阻塞。

发表评论

表情:
评论列表 (有 0 条评论,76人围观)

还没有评论,来说两句吧...

相关阅读

    相关 并发工具类CountDownLatch分析

    > 同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore)、栅栏