Java中的并发工具类 2022-05-28 08:41 108阅读 0赞 ## 等待多线程完成的 CountDownLatch ## CountDownLatch 允许一个或多个线程等待其他线程完成操作。 需求:我们需要解析一个 Excel 里多个 sheet 的数据,此时可以考虑使用多线程,每个线程解析一个 sheet 里的数据,等待所有的 sheet 都解析完之后,程序需要提示解析完成。 * 解决方案一:最简单的做法是使用 join() 方法,让当前执行线程等待 join 线程执行结束。其实现原理是不停的检查 join 线程是否存活,如果 join 线程存活则让当前线程等待。 public class JoinTest { public static void main(String[] args) throws InterruptedException { Thread thread0 = new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 正在处理文件"); } }, "线程:" + 0); Thread thread1 = new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 正在处理文件"); } }, "线程:" + 1); thread0.start(); thread1.start(); thread0.join(); thread1.join(); System.out.println("解析完成"); } } * 解决方案二:使用 CountDownLatch 创建带计数器的闭锁,计数器为主线程需要等待的 N 个点。当调用 countDown 方法时,N 就会减 1,await 方法会阻塞当前线程,直到 N 编程 0。由于 countDowm 方法可以用在任何地方,所以这里说的 N 个点,可以是 N 个线程,也可以是 1 个线程里的 N 个步骤。 import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { // 创建闭锁,CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器,也就是说主线程需要等待计数器减少到 0 时,才能继续执行 static CountDownLatch latch = new CountDownLatch(3); public static void main(String[] args) throws InterruptedException { // 创建 3 个线程 for (int i = 0; i < 3; i++) { new Thread(new Runnable() { public void run() { try { // 等待锁释放 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 正在处理文件"); } }, "线程:" + i).start(); // 计数器减 1 latch.countDown(); } TimeUnit.SECONDS.sleep(10); System.out.println("解析完成"); } } ## 同步屏障 CyclicBarrier ## CyclicBarrier 默认的构造方法是 CyclicBarrier (int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景 * 简单用法,保证调用 await 方法的线程的数量等于定义屏障拦截的线程数 import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { // 创建两个拦截的屏障 static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { new Thread(new Runnable() { @Override public void run() { try { // 线程 1 到达 c.await(); } catch (InterruptedException | BrokenBarrierException e) { } System.out.println(1); } }).start(); // 主线程到达 c.await(); System.out.println(2); } } * 高级用法,构造方法 CyclicBarrier(int N , Runnable runnable),创建一个带有 N 个拦截并且 完成 N 个拦截后,调用 runnable 线程 需求:用一个 excel 保存了 4 个sheet 的数据,每个 sheet 保存一些数字,分别计算每个 sheet 中的总和,然后合计 4 个 sheet 的总和。 import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest implements Runnable { CyclicBarrier c = new CyclicBarrier(4, this); ConcurrentHashMap<String, Integer> hashMap = new ConcurrentHashMap<String, Integer>(); Random random = new Random(); ExecutorService executor = Executors.newFixedThreadPool(4); public void doCount() { for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { // 用随机数代替 sheet 数据 hashMap.put(Thread.currentThread().getName(), random.nextInt(100)); try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { } } }); } } @Override public void run() { Integer count = 0; for (String name : hashMap.keySet()) { System.out.println(name + " 处理的数据结果为:" + hashMap.get(name)); count = count + hashMap.get(name); } System.out.println("合计:" + count); } public static void main(String[] args) throws Exception { CyclicBarrierTest cbt = new CyclicBarrierTest(); cbt.doCount(); cbt.executor.shutdown(); } } **CyclicBarrier 与 CountDownLatch 的区别**: 1. CountDownLatch 计数器只能使用一次,CyclicBarrier 的计数器可以使用 reset() 方法重置,所以 CyclicBarrier 能处理更为复杂的业务场景,例如,如果计算发生错误,可以充值计数器,并让线程重新执行一次 2. 计数器的释放不同,CountDownLatch是手动释放,CyclicBarrier是自动释放 3. CyclicBarrier高级用法中,当所有的线程到达阻塞点时,会优先执行barrierAction,执行完后再释放所有拦截,如果barrierAction中是永久阻塞的,那么拦截的其他线程永远不能执行 4. CyclicBarrier高级用法中,如果你的业务代码在await前,那么可以在barrierAction中进行统计数据等操作;如果业务代码在await后,那么会优先执行barrierAction,然后释放所有拦截,执行你的业务代码 5. CyclicBarrier 中还提供了其他的方法,通过 getNumberWaitting 可以获得阻塞的线程数量,也可以用 isBroken() 判断阻塞的线程是否被中断 ## 线程间交换数据的 Exchanger ## Exchanger 是用于线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点,这两个线程就可以交换数据。 Exchanger 可以用于遗传算法,即选取两个人作为交配对象,交换两人的数据,并使用交叉规则得出 2 个交配结果。Exchanger 也可以用于校对工作,比如我们需要将纸质银行流水通过人工录入的方式录入电子银行流水,为了避免错误,采用 AB 岗两人进行录入,录入到 excel 之后,系统需要加载这两个 Excel,并对两个 Excel 进行校对。 import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class ExchangerTest { private static final Exchanger<String> exchanger = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new A(exchanger)); threadPool.execute(new B(exchanger)); threadPool.shutdown(); } } class A implements Runnable { private Exchanger<String> exchanger; public A(Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { System.out.println("this is A platform"); try { // 设置最大等待时间为 1s String exchange = exchanger.exchange("这是A的数据", 1, TimeUnit.SECONDS); System.out.println("A收到的数据:" + exchange); } catch (InterruptedException e) { } catch (TimeoutException e) { System.out.println("交换数据超时"); } } } class B implements Runnable { private Exchanger<String> exchanger; public B(Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { System.out.println("this is B platform"); try { Thread.sleep(1000L); // 设置最大等待时间为 1s String exchange = exchanger.exchange("这是B的数据", 1, TimeUnit.SECONDS); System.out.println("B收到的数据:" + exchange); } catch (InterruptedException e) { } catch (TimeoutException e) { System.out.println("交换数据超时"); } } }
相关 聊聊JAVA中的并发工具类 在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchang 水深无声/ 2023年09月23日 17:56/ 0 赞/ 12 阅读
相关 彻底理解Java并发:Java并发工具类 > 本篇内容包括:Java 并发工具类的介绍、使用方式与 Demo,包括了 CountDownLatch(线程计数器)、CyclicBarrier(回环栅栏)、Semaphor 布满荆棘的人生/ 2023年09月23日 13:07/ 0 赞/ 13 阅读
相关 Java中的并发工具类 等待多线程完成的 CountDownLatch CountDownLatch 允许一个或多个线程等待其他线程完成操作。 需求:我们需要解析一个 Exc ゞ 浴缸里的玫瑰/ 2022年05月28日 08:41/ 0 赞/ 109 阅读
相关 Java并发——同步工具类 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore 同步工具类包括信号量 超、凢脫俗/ 2022年05月18日 08:56/ 0 赞/ 162 阅读
相关 java中的并发工具类 Semaphore与Exchanger ![这里写图片描述][70] [70]: /images/20220517/1e3e4fd0a9b04cdd944c2a503b 旧城等待,/ 2022年05月17日 05:17/ 0 赞/ 97 阅读
相关 Java并发工具类 在Java1.5中,提供了一些很有用的辅助类来帮助我们进行并发编程,比如CountDownLatch\[计数器\],CyclicBarrier\[循环屏障\],Semaphor 迈不过友情╰/ 2022年05月08日 13:24/ 0 赞/ 157 阅读
相关 Java并发工具类Phaser Phaser由java7中推出,是Java SE 7中新增的一个使用同步工具,在功能上面它与[CyclicBarrier][]、[CountDownLatch][]有些重叠,但 忘是亡心i/ 2022年04月04日 03:22/ 0 赞/ 150 阅读
相关 Java并发工具类的使用 在JDK中的`java.util.concurrent`包里提供了几个实用的并发工具类,下面我们一起来了解下。 CountDownLatch 功能: CountDown 我会带着你远行/ 2022年02月28日 14:42/ 0 赞/ 132 阅读
相关 Java 中的并发工具类 From: [https://blog.wuwii.com/juc-utils.html][https_blog.wuwii.com_juc-utils.html] `jav Dear 丶/ 2022年02月25日 19:44/ 0 赞/ 151 阅读
相关 Java并发工具类 - CountDownLatch 一.CountDownLatch用法 1、简介 CountDownLatch是Java1.5之后引入的Java并发工具类,放在java.util.concurr 电玩女神/ 2022年01月23日 09:01/ 0 赞/ 152 阅读
还没有评论,来说两句吧...