Atomic Variables and ConcurrentMap

2022-05-14 01:51

### AtomicInteger ###

The package `java.util.concurrent.atomic` contains many useful classes to perform atomic operations. An operation is atomic when you can safely perform it in parallel on multiple threads without using the `synchronized` keyword or locks as shown in my [previous tutorial][previous tutorial].

Internally, the atomic classes make heavy use of [compare-and-swap][compare-and-swap] (CAS), an atomic instruction directly supported by most modern CPUs. Those instructions are usually much faster than synchronizing via locks. So my advice is to prefer atomic classes over locks whenever you just have to change a single mutable variable concurrently.

Now let’s pick one of the atomic classes for a few examples: `AtomicInteger`

    AtomicInteger atomicInt = new AtomicInteger(0);

    ExecutorService executor = Executors.newFixedThreadPool(2);

    IntStream.range(0, 1000)
        .forEach(i -> executor.submit(atomicInt::incrementAndGet));

    // stop() is assumed to be a helper that shuts the executor down and waits for its tasks
    stop(executor);

    System.out.println(atomicInt.get());    // => 1000

By using `AtomicInteger` as a replacement for `Integer` we’re able to increment the number concurrently in a thread-safe manner without synchronizing the access to the variable. The method `incrementAndGet()` is an atomic operation so we can safely call this method from multiple threads.

`AtomicInteger` supports various kinds of atomic operations. The method `updateAndGet()` accepts a lambda expression in order to perform arbitrary arithmetic operations upon the integer:

    AtomicInteger atomicInt = new AtomicInteger(0);

    ExecutorService executor = Executors.newFixedThreadPool(2);

    IntStream.range(0, 1000)
        .forEach(i -> {
            Runnable task = () ->
                atomicInt.updateAndGet(n -> n + 2);
            executor.submit(task);
        });

    stop(executor);

    System.out.println(atomicInt.get());    // => 2000

The method `accumulateAndGet()` accepts another kind of lambda expression of type `IntBinaryOperator`.
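
To make the shape of an `IntBinaryOperator` concrete, here is a minimal sketch that uses `accumulateAndGet()` to keep a running maximum. The class `RunningMax` and the helper `maxOf` are hypothetical names chosen for this illustration, and the executor is shut down inline instead of through a `stop()` helper.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntBinaryOperator;

class RunningMax {
    // Hypothetical helper: returns the largest value seen by two worker threads.
    static int maxOf(int[] values) {
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        // First argument: current value of the atomic; second: the value passed in.
        IntBinaryOperator keepLarger = Math::max;

        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int v : values) {
            executor.submit(() -> max.accumulateAndGet(v, keepLarger));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(new int[] {3, 9, 4, 1}));    // => 9
    }
}
```

Because taking the maximum does not depend on the order in which the tasks run, the result is the same no matter how the two threads interleave.
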
We use this method to sum up all values from 0 to 999 concurrently in the next sample:

    AtomicInteger atomicInt = new AtomicInteger(0);

    ExecutorService executor = Executors.newFixedThreadPool(2);

    IntStream.range(0, 1000)
        .forEach(i -> {
            Runnable task = () ->
                atomicInt.accumulateAndGet(i, (n, m) -> n + m);
            executor.submit(task);
        });

    stop(executor);

    System.out.println(atomicInt.get());    // => 499500

Other useful atomic classes are [AtomicBoolean][AtomicBoolean], [AtomicLong][AtomicLong] and [AtomicReference][AtomicReference].

### LongAdder ###

The class `LongAdder`, an alternative to `AtomicLong`, can be used to consecutively add values to a number.

    LongAdder adder = new LongAdder();

    ExecutorService executor = Executors.newFixedThreadPool(2);

    IntStream.range(0, 1000)
        .forEach(i -> executor.submit(adder::increment));

    stop(executor);

    System.out.println(adder.sumThenReset());   // => 1000

`LongAdder` provides the methods `add()` and `increment()` just like the atomic number classes and is also thread-safe. But instead of updating a single value, this class maintains a set of variables internally to reduce contention between threads. The actual result can be retrieved by calling `sum()` or `sumThenReset()`.

This class is usually preferable over atomic numbers when updates from multiple threads are more common than reads. This is often the case when capturing statistical data, e.g. when you want to count the number of requests served on a web server. The drawback of `LongAdder` is higher memory consumption because a set of variables is held in memory.

### LongAccumulator ###

`LongAccumulator` is a more generalized version of `LongAdder`.
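
As a rough illustration of that relationship, the sketch below counts the same events with a `LongAdder` and with a `LongAccumulator` built from `Long::sum` and an identity of zero. The class `AdderVsAccumulator` and the helper `countBoth` are hypothetical names, and a parallel stream stands in for the executor used elsewhere in this article.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class AdderVsAccumulator {
    // Hypothetical helper: counts n events with both classes and returns both totals.
    static long[] countBoth(int n) {
        LongAdder adder = new LongAdder();
        // Summation with identity 0 makes the accumulator behave like an adder.
        LongAccumulator accumulator = new LongAccumulator(Long::sum, 0L);

        IntStream.range(0, n).parallel().forEach(i -> {
            adder.increment();
            accumulator.accumulate(1L);
        });
        return new long[] {adder.sum(), accumulator.get()};
    }

    public static void main(String[] args) {
        long[] totals = countBoth(1000);
        System.out.println(totals[0] + " " + totals[1]);    // => 1000 1000
    }
}
```
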
Instead of performing simple add operations, the class `LongAccumulator` is built around a lambda expression of type `LongBinaryOperator` as demonstrated in this code sample:

    LongBinaryOperator op = (x, y) -> 2 * x + y;
    LongAccumulator accumulator = new LongAccumulator(op, 1L);

    ExecutorService executor = Executors.newFixedThreadPool(2);

    IntStream.range(0, 10)
        .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

    stop(executor);

    System.out.println(accumulator.getThenReset());     // => 2539 in the original run; may vary

We create a `LongAccumulator` with the function `2 * x + y` and an initial value of one. With every call to `accumulate(i)` both the current result and the value `i` are passed as parameters to the lambda expression. Note that `2 * x + y` is neither associative nor commutative, and `LongAccumulator` does not guarantee the order of accumulation, so the printed value can differ between runs. For reproducible results use an order-independent function such as `Long::sum` or `Long::max`.

A `LongAccumulator`, just like `LongAdder`, maintains a set of variables internally to reduce contention between threads.

### ConcurrentMap ###

The interface `ConcurrentMap` extends the `Map` interface and defines one of the most useful concurrent collection types. Java 8 introduces functional programming by adding new methods to this interface.

In the next code snippets we use the following sample map to demonstrate those new methods:

    ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
    map.put("foo", "bar");
    map.put("han", "solo");
    map.put("r2", "d2");
    map.put("c3", "p0");

The method `forEach()` accepts a lambda expression of type `BiConsumer` with both the key and value of the map passed as parameters. It can be used as a replacement for for-each loops to iterate over the entries of the concurrent map. The iteration is performed sequentially on the current thread.

    map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));

The method `putIfAbsent()` puts a new value into the map only if no value exists for the given key.
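
The "only if no value exists" behavior can be observed under contention with a small sketch: many callers race to insert a value for the same key, and exactly one of them succeeds. The class `PutIfAbsentRace` and the helper `winners` are hypothetical names, and a parallel stream is used here purely to generate concurrent calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class PutIfAbsentRace {
    // Hypothetical helper: counts how many of n concurrent callers stored their value.
    static int winners(int n) {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
        AtomicInteger inserted = new AtomicInteger();

        IntStream.range(0, n).parallel().forEach(i -> {
            // putIfAbsent returns null only for the call that actually stored its value.
            if (map.putIfAbsent("key", i) == null) {
                inserted.incrementAndGet();
            }
        });
        return inserted.get();
    }

    public static void main(String[] args) {
        System.out.println(winners(1000));    // => 1
    }
}
```
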
At least for the `ConcurrentHashMap` implementation, this method is thread-safe just like `put()`, so you don’t have to synchronize when accessing the map concurrently from different threads:

    String value = map.putIfAbsent("c3", "p1");
    System.out.println(value);    // p0

The method `getOrDefault()` returns the value for the given key. In case no entry exists for this key the passed default value is returned:

    String value = map.getOrDefault("hi", "there");
    System.out.println(value);    // there

The method `replaceAll()` accepts a lambda expression of type `BiFunction`. BiFunctions take two parameters and return a single value. In this case the function is called with the key and the value of each map entry and returns a new value to be assigned for the current key:

    map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
    System.out.println(map.get("r2"));    // d3

Instead of replacing all values of the map, `compute()` lets us transform a single entry. The method accepts both the key to be computed and a bi-function to specify the transformation of the value.

    map.compute("foo", (key, value) -> value + value);
    System.out.println(map.get("foo"));   // barbar

In addition to `compute()` two variants exist: `computeIfAbsent()` and `computeIfPresent()`. The functional parameters of these methods only get called if the key is absent or present respectively.

Finally, the method `merge()` can be utilized to unify a new value with an existing value in the map. Merge accepts a key, the new value to be merged into the existing entry and a bi-function to specify the merging behavior of both values:

    map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
    System.out.println(map.get("foo"));   // boo was barbar (the value of "foo" after the compute() call above)

### ConcurrentHashMap ###

All those methods above are part of the `ConcurrentMap` interface, thereby available to all implementations of that interface.
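
As a quick check of that claim, here is a minimal sketch that calls `merge()`, `computeIfAbsent()` and `computeIfPresent()` on a `ConcurrentSkipListMap`, another `ConcurrentMap` implementation from the JDK. The class `InterfaceMethodsDemo`, the helper `countWords` and the `"total"` and `"missing"` keys are made up for this illustration.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

class InterfaceMethodsDemo {
    // Hypothetical helper: counts word occurrences using only ConcurrentMap methods.
    static ConcurrentMap<String, Integer> countWords(String... words) {
        ConcurrentMap<String, Integer> counts = new ConcurrentSkipListMap<>();
        for (String word : words) {
            // Inserts 1 for a new word, otherwise adds 1 to the existing count.
            counts.merge(word, 1, Integer::sum);
        }
        // The function runs because "total" is absent.
        counts.computeIfAbsent("total", key -> words.length);
        // The function does not run because "missing" is absent.
        counts.computeIfPresent("missing", (key, value) -> value + 1);
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("a", "b", "a"));    // => {a=2, b=1, total=3}
    }
}
```

The skip-list map keeps its keys sorted, which is why the printed order is stable in this sketch.
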
In addition, the most important implementation, `ConcurrentHashMap`, has been further enhanced with a couple of new methods to perform parallel operations upon the map.

Just like parallel streams, those methods use a special `ForkJoinPool` available via `ForkJoinPool.commonPool()` in Java 8. This pool uses a preset parallelism which depends on the number of available cores. Four CPU cores are available on my machine, which results in a parallelism of three:

    System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

This value can be decreased or increased by setting the following JVM parameter:

    -Djava.util.concurrent.ForkJoinPool.common.parallelism=5

We use the same example map for demonstration purposes, but this time we work upon the concrete implementation `ConcurrentHashMap` instead of the interface `ConcurrentMap`, so we can access all public methods from this class:

    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    map.put("foo", "bar");
    map.put("han", "solo");
    map.put("r2", "d2");
    map.put("c3", "p0");

Java 8 introduces three kinds of parallel operations: `forEach`, `search` and `reduce`. Each of those operations is available in four forms accepting functions with keys, values, entries and key-value pair arguments.

All of those methods use a common first argument called `parallelismThreshold`. This threshold indicates the minimum collection size at which the operation should be executed in parallel. E.g. if you pass a threshold of 500 and the actual size of the map is 499, the operation will be performed sequentially on a single thread. In the next examples we use a threshold of one to always force parallel execution for demonstration purposes.

#### ForEach ####

The method `forEach()` is capable of iterating over the key-value pairs of the map in parallel. The lambda expression of type `BiConsumer` is called with the key and value of the current iteration step.
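
The effect of `parallelismThreshold` on `forEach()` can be checked with a small sketch that records which threads ran the action. Passing `Long.MAX_VALUE` suppresses all parallelism, so only the calling thread shows up; with a threshold of one, worker threads may appear as well. The class `ThresholdDemo` and the helper `threadsUsed` are hypothetical names for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ThresholdDemo {
    // Hypothetical helper: returns the names of all threads that executed the action.
    static Set<String> threadsUsed(long parallelismThreshold) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("han", "solo");
        map.put("r2", "d2");
        map.put("c3", "p0");

        // Thread-safe set, since the action may run on several threads at once.
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        map.forEach(parallelismThreshold,
            (key, value) -> threadNames.add(Thread.currentThread().getName()));
        return threadNames;
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE: everything runs on the calling thread.
        System.out.println(threadsUsed(Long.MAX_VALUE));
        // Threshold of one: the set may also contain common-pool worker threads.
        System.out.println(threadsUsed(1));
    }
}
```
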
In order to visualize parallel execution we print the current thread’s name to the console. Keep in mind that in my case the underlying `ForkJoinPool` uses up to a maximum of three threads.

    map.forEach(1, (key, value) ->
        System.out.printf("key: %s; value: %s; thread: %s\n",
            key, value, Thread.currentThread().getName()));

    // key: r2; value: d2; thread: main
    // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
    // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
    // key: c3; value: p0; thread: main

#### Search ####

The method `search()` accepts a `BiFunction` returning a non-null search result for the current key-value pair or `null` if the current iteration doesn’t match the desired search criteria. As soon as a non-null result is returned, further processing is suppressed. Keep in mind that `ConcurrentHashMap` is unordered. The search function should not depend on the actual processing order of the map. If multiple entries of the map match the given search function, the result may be non-deterministic.

    String result = map.search(1, (key, value) -> {
        System.out.println(Thread.currentThread().getName());
        if ("foo".equals(key)) {
            return value;
        }
        return null;
    });
    System.out.println("Result: " + result);

    // ForkJoinPool.commonPool-worker-2
    // main
    // ForkJoinPool.commonPool-worker-3
    // Result: bar

Here’s another example searching solely on the values of the map:

    String result = map.searchValues(1, value -> {
        System.out.println(Thread.currentThread().getName());
        if (value.length() > 3) {
            return value;
        }
        return null;
    });

    System.out.println("Result: " + result);

    // ForkJoinPool.commonPool-worker-2
    // main
    // main
    // ForkJoinPool.commonPool-worker-1
    // Result: solo

#### Reduce ####

The method `reduce()`, already known from Java 8 Streams, accepts two lambda expressions of type `BiFunction`. The first function transforms each key-value pair into a single value of any type.
The second function combines all those transformed values into a single result, ignoring any possible `null` values.

    String result = map.reduce(1,
        (key, value) -> {
            System.out.println("Transform: " + Thread.currentThread().getName());
            return key + "=" + value;
        },
        (s1, s2) -> {
            System.out.println("Reduce: " + Thread.currentThread().getName());
            return s1 + ", " + s2;
        });

    System.out.println("Result: " + result);

    // Transform: ForkJoinPool.commonPool-worker-2
    // Transform: main
    // Transform: ForkJoinPool.commonPool-worker-3
    // Reduce: ForkJoinPool.commonPool-worker-3
    // Transform: main
    // Reduce: main
    // Reduce: main
    // Result: r2=d2, c3=p0, han=solo, foo=bar

[https://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/][https_winterbe.com_posts_2015_05_22_java8-concurrency-tutorial-atomic-concurrent-map-examples]

[previous tutorial]: https://blog.csdn.net/kangkanglou/article/details/82254969
[compare-and-swap]: http://en.wikipedia.org/wiki/Compare-and-swap
[AtomicBoolean]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicBoolean.html
[AtomicLong]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicLong.html
[AtomicReference]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicReference.html
[https_winterbe.com_posts_2015_05_22_java8-concurrency-tutorial-atomic-concurrent-map-examples]: https://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/