Java8 并发之原子变量与ConcurrentMap 2022-06-12 09:29 289阅读 0赞 # 前言 # > 点击查看原文 [原文地址][Link 1] * 第一部分:[线程(Thread)与执行体(Executors)][Thread_Executors] * 第二部分:[同步(Synchronization)与锁(Locks)][Synchronization_Locks] * 第三部分:原子变量与ConcurrentMap > 欢迎浏览Java 8 并发教程的第二部分.本教程致力于使用简单而易于理解的代码实例来教授你关于java8中并发编程一些知识。接下来你会学到在并发编程中使用`synchronized`关键字,`锁`,`信号`来同步可变的共享变量。 > > Java 并发API于Java5首次加入,在后来发布的版本中不断迭代完善。本文中出现的大部分概念也适合java8以下的版本,不单单针对java8。但是本文将大量使用java8 中的 `lambda`表达式以及新的并发功能,如果你对`lambda`表达式不是很熟悉的话可以查看这个教程:[Java8 教程][Java8] > > 本文的代码示例中使用了两个帮助函数:`sleep(seconds)` 和`stop(executor)` public static void stop(ExecutorService executor) { try { executor.shutdown(); executor.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { System.err.println("termination interrupted"); } finally { if (!executor.isTerminated()) { System.err.println("killing non-finished tasks"); } executor.shutdownNow(); } } public static void sleep(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { throw new IllegalStateException(e); } } ### AtomicInteger ### `java.concurrent.atomic`包里面包含了很多非常有用的进行原子操作的类。当一个操作的原子性的,我们就可以在多线程中安全的执行这个操作而不用使用`synchronized`关键字或者锁。 原子类内部严重依赖 **比较-交换**这种指令([compare-and-swap (CAS)][compare-and-swap _CAS]),一种被大部分现代CPU直接支持的原子操作指令。这些指令通常比使用锁同步的指令快的多。所以当我们仅仅是想在多线程的情况下修改一个可变变量的值,强烈建议优先使用原子类,而不是锁。 下面让我们一起看一下`AtomicInteger`类的几个例子 AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(atomicInt::incrementAndGet)); stop(executor); System.out.println(atomicInt.get()); // => 1000 上例使用`AtomicInteger`替代`Integer`使的我们可以在不用同步的情况下依然可以线程安全方式同步使一个整形变量数值递增。`incrementAndGet()`方法是原子操作的,所以我们可以在多线程的情况下安全的调用这个方法。 AtomicInteger 支持很多种原子操作。`updateAndGet()`方法接受一个Lambda表达式,在这个lambda表达式中我们可以对整形变量做各种处理。 AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.updateAndGet(n -> n + 2); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 2000 `accumulateAndGet()`方法接受一个`IntBinaryOperator`类型的Lambda表达式。下面的例子演示了使用此方法并发的将0到1000的数相加。 AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500 其他比较有用的原子类还有 [AtomicBoolean][], [ AtomicLong][AtomicLong], [AtomicReference][] ### LongAdder ### `LongAdder`类可以代替`AtomicLong`完成并发求和的功能。 ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(adder::increment)); stop(executor); System.out.println(adder.sumThenReset()); // => 1000 类似于原子数字类, LongAdder也 提供了`add()`和`increment()`方法,而且也是线程安全的。不过他们之间也有所不同,原子数字类是将结果累加到一个变量中,而LongAdder为了降低线程的争用而内部维护了一个变量集合。我们可以通过调用`sum()`或者`sumThenReset()`方法来获取真实结果。 这个类在更新频率大于读取频率的多线程场景下优于原子数字类。而获取分析数据就属于这种场景,例如你要统计服务器的访问量.`LongAdder`类的缺点是占用内存较高,这是由于其在内存中维护了一个变量集合。 ### LongAccumulator ### LongAccumulator 是LongAdder的一种广义版本。`LongAccumulator`结合`LongBinaryOperator`类型的Lambda表达式使用,如下例所示 LongBinaryOperator op = (x, y) -> 2 * x + y; LongAccumulator accumulator = new LongAccumulator(op, 1L); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 10) .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539 我们使用函数 `2 * x + y`以及初始值1创建了LongAccumlator. 每次调用`accumulate(i)`时,当前结果和`i`的值都会作为参数传入lambda表达式中。 与`LongAdder`类似,`LongAccumlator`在内部也维护了一个变量的集合用来降低线程之间的争用。 ### ConcurrentMap ### `ConcurrentMap`接口扩展至`map`接口,并且邓毅了很多非常有用的同步集合类型。Java 8 通过向这个接口添加新的方法进入了函数编程。 接下来的代码片段,我们准备使用下面这个简单的map来说明这些新方法: ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0"); `forEach()`方法接受一个`BiConsumer`类型的lambda表达式,这个表达式需要连个参数,一个是map的key,一个value。这个方法可以替代同步map中使用entries迭代的 for-each 循环。 map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value)); `putIfAbsent()`方法只有当此map中不存在使用给定key获取的value时才压入一个新值。在`ConcurrentHashMap`中,这个方法被实现为线程安全的,所以在多线程并发访问下不需要同步。 String value = map.putIfAbsent("c3", "p1"); System.out.println(value); // p0 `getOrDefault()`方法当map中存在给定key值的value时就返回那个value,不然就返回默认值 String value = map.getOrDefault("hi", "there"); System.out.println(value); // there `replaceAll()`方法接受一个`BiFunction`类型的lambda表达式,这种表达式接受两个参数,返回一个value. 下面的代码将key值为”r2”的value替换成“d3”。 map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); System.out.println(map.get("r2")); // d3 如果你只是想改变map中某一个`entry`,则可以使用 `compute()`方法。这个方法接受两个参数,一个是用于计算的key,另一个是`BiFunction`类型的lambda表达式,这个表达式负责转化工作。下面的代码将key是“foo”的value转化成了value+value。 map.compute("foo", (key, value) -> value + value); System.out.println(map.get("foo")); // barbar `compute()`方法还存在两个变种:`computeIfAbsent()`和`computeIfPresent()`. 最后介绍一个方法`merge()`,这个方法可以将一个新值合并到map中已经存在的value上面去。这个方法接受三个参数,第一个是要合并的entry的key值,第二个是要合并的新值,第三个是一个`BiFunction`类型的lambda表达式,这个表达式负责合并规则。例如下面的例子将“boo”合并到了key值为“foo”的value中去了。 map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal); System.out.println(map.get("foo")); // boo was foo ### ConcurrentHashMap ### 上面介绍的所有方法都是`ConcurrentMap`接口中的方法,所以所有实现了此接口的类都可以使用这些方法。现在我们介绍一个最重要的实现类`ConcurrentHashMap`,其中加入了很多关于并发操作的方法。 在Java 8中, 类似于平行流(parallel streams),这些方法通过`ForkJoinPool.commonPool()`获取了一个`ForkJoinPool`线程池。这个线程池根据你电脑的CPU核数来预设平行参数。例如我们的CPU是4核的,JVM就帮我预设了3平行度。 System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3 我们可以通过设置来改变着平行度 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 我们使用下面的这个map来演示相关的方法 ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0"); Java 8 引入了3种类型的平行操作:`forEach`,`search`,`reduce`.每一种操作都有四种类型,四种类型区别去参入参数的类型上:key、values、entries、key-value pair. 所有操作的第一个参数都是`parallelismThreshold`.这个参数表明启动平行操作的最小集合数量,例如设置为500,只有当要操作的集合里面item的数量大于等于500才启用平行操作,不然都是单线程线性执行。 #### ForEach #### map.forEach(1, (key, value) -> System.out.printf("key: %s; value: %s; thread: %s\n", key, value, Thread.currentThread().getName())); // key: r2; value: d2; thread: main // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1 // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2 // key: c3; value: p0; thread: main #### Search #### 下面的例子演示了`search()`方法接受一个`BiFunction`类型的lambda表达式,如果有符合条件的值则返回这个值,不然则返回null. String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if ("foo".equals(key)) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar 值的注意的是:当首次达到条件,后面就不会再继续搜索了,而且`ConcurrentHashMap`是无序的,所以如果一个map中存在好几个符合搜索条件的值,返回结果不一定是第一个。 #### Reduce #### `reduce()`方法接受两个`BiFunction`类型的lambda表达式作为参数,下面的例子演示了:第一个lambda表达式将没一个key-value 对连接起来成为一个值,第二个表达式又将第一个表达式连接起来的值再连接到一起。 String result = map.reduce(1, (key, value) -> { System.out.println("Transform: " + Thread.currentThread().getName()); return key + "=" + value; }, (s1, s2) -> { System.out.println("Reduce: " + Thread.currentThread().getName()); return s1 + ", " + s2; }); System.out.println("Result: " + result); // Transform: ForkJoinPool.commonPool-worker-2 // Transform: main // Transform: ForkJoinPool.commonPool-worker-3 // Reduce: ForkJoinPool.commonPool-worker-3 // Transform: main // Reduce: main // Reduce: main // Result: r2=d2, c3=p0, han=solo, foo=bar 教程的代码托管在GitHub上[地址][Link 2] [Link 1]: http://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/ [Thread_Executors]: http://blog.csdn.net/shusheng0007/article/details/74352005 [Synchronization_Locks]: http://blog.csdn.net/shusheng0007/article/details/74838604 [Java8]: http://winterbe.com/posts/2014/03/16/java-8-tutorial/ [compare-and-swap _CAS]: https://en.wikipedia.org/wiki/Compare-and-swap [AtomicBoolean]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicBoolean.html [AtomicLong]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicLong.html [AtomicReference]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicReference.html [Link 2]: https://github.com/winterbe/java8-tutorial
相关 并发系列之CAS与原子操作 并发系列之CAS与原子操作 1、CAS的概念 2、Java实现CAS的原理 - Unsafe类 3、原子操作-AtomicInte r囧r小猫/ 2022年11月18日 02:12/ 0 赞/ 146 阅读
相关 【并发】java并发之可见性与原子性 一、定义 1.可见性 在多核处理器中,如果多个线程对一个变量(假设)进行操作,但是这多个线程有可能被分配到多个处理器中运行,那么编译器会对代码进行优化,当线程要处理该 梦里梦外;/ 2022年07月16日 15:51/ 0 赞/ 202 阅读
相关 Java 8 并发之同步与锁 前言 > 点击查看原文: [原文地址][Link 1] 第一部分:[线程(Thread)与执行体(Executors)][Thread_Executors] 川长思鸟来/ 2022年06月13日 13:52/ 0 赞/ 203 阅读
相关 Java8 并发之原子变量与ConcurrentMap 前言 > 点击查看原文 [原文地址][Link 1] 第一部分:[线程(Thread)与执行体(Executors)][Thread_Executors] 小灰灰/ 2022年06月12日 09:29/ 0 赞/ 290 阅读
相关 Java并发编程---并发类容器(ConcurrentMap容器) 一.背景前奏 jdk5.0以后提供了多种并发类容器来替代同步类容器从而改善性能.同步类容器的状态都是串行化的.他们虽然实现了线程安全.但是严重降低了并发性,在多线程环境时, 我不是女神ヾ/ 2022年06月02日 02:57/ 0 赞/ 370 阅读
相关 【java并发编程】原子变量和CAS 我们知道锁的实现可以分为乐观锁和悲观锁,具体可以参照我的这篇文章[数据库的锁机制及原理][Link 1]。java中也有对应的乐观锁和悲观锁的实现,在之前的文章中我们讨论了[R 小鱼儿/ 2022年05月25日 01:43/ 0 赞/ 208 阅读
相关 Java并发编程之原子性Synchronized(九) 一.前言 线程安全是并发编程中的重要关注点,应该注意到的是,造成线程安全问题的主要诱因有两点,一是存在共享数据(也称临界资源),二是存在多条线程共同操作共享数据。因此为了解 谁践踏了优雅/ 2022年04月15日 05:29/ 0 赞/ 233 阅读
相关 Java并发编程之原子性Atomic(八) 一.Atomic包简介 Atomic包中的类基本的特性就是在多线程环境下,当有多个线程同时对单个(包括基本类型及引用类型)变量进行操作时,具有排他性,即当多个线程同时对 旧城等待,/ 2022年04月15日 02:41/ 0 赞/ 224 阅读
相关 Java并发编程之原子变量 原子变量最主要的一个特点就是所有的操作都是原子的,synchronized关键字也可以做到对变量的原子操作。只是synchronized的成本相对较高,需要获取锁对象,释放 ゞ 浴缸里的玫瑰/ 2022年03月18日 14:30/ 0 赞/ 205 阅读
相关 java并发:原子类之AtomicLong 原子类之AtomicLong java线程中的操作,需要满足原子性、可见性等原则,比如i++这样的操作不具备原子性, A线程读取了i,另一个线程执行i++,A线程再执 约定不等于承诺〃/ 2021年11月01日 09:42/ 0 赞/ 284 阅读
还没有评论,来说两句吧...