【JUC】二十三、LongAdder:多线程计数的更优解

冷不防 2024-02-05 12:21 87阅读 0赞

文章目录

  • 1、常用API
  • 2、热点商品点赞计算器
  • 3、LongAdder高性能的原理
  • 4、源码:LongAdder-add方法
  • 5、源码:LongAdder-longAccumulate方法
  • 6、源码:LongAdder-sum方法
  • 7、AtomicLong和LongAdder的对比

Since 1.8,新加原子操作增强类:

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

API文档:runoob.com/manual/jdk11api/java.base/java/util/concurrent/atomic/LongAdder.html

1、常用API

常用方法:
在这里插入图片描述

LongAdder只能用来计算加法,且从零开始计算,LongAccumulator则更灵活,可传入一个函数式接口和初始值,函数式接口中自定义计算逻辑,加减乘除。

  1. LongAdder longAdder = new LongAdder();
  2. longAdder.increment(); //+1
  3. longAdder.increment();
  4. longAdder.increment();
  5. System.out.println(longAdder.sum()); //3
  6. //new LongAccumulator((x, y) -> x * y, 1)
  7. LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
  8. @Override
  9. //left:初始值,right:传进来的值
  10. public long applyAsLong(long left, long right) {
  11. return left * right;
  12. }
  13. }, 1);
  14. longAccumulator.accumulate(2); //2
  15. longAccumulator.accumulate(3); //6
  16. System.out.println(longAccumulator.get()); //6

2、热点商品点赞计算器

热点商品点赞计算器,点赞数加加统计,不要求实时精确。比如:50个线程,每个线程100W次,总点赞数出来。分析下,点赞一次就+1,本质是多线程下的并发的i++:

  1. class ClickNumber{
  2. int number = 0;
  3. //方式一
  4. public synchronized void clickBySynchronized(){
  5. number++;
  6. }
  7. //方式二
  8. AtomicLong atomicLong = new AtomicLong(0);
  9. public void clickByAtomicLong(){
  10. atomicLong.getAndIncrement();
  11. }
  12. //方式三
  13. LongAdder longAdder = new LongAdder();
  14. public void clickByLongAdder(){
  15. longAdder.increment();
  16. }
  17. //方式四
  18. LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x+y,0);
  19. public void clickByLongAccumulator(){
  20. longAccumulator.accumulate(1);
  21. }
  22. }

调用4种方式,借助辅助类计数器,计算5000000点赞耗时:

  1. public class AccumulatorDemo {
  2. public static final int _1W = 10000;
  3. public static final int threadNum = 50;
  4. public static void main(String[] args) throws InterruptedException {
  5. ClickNumber clickNumber = new ClickNumber(); //共享资源类对象
  6. long startTime;
  7. long endTime;
  8. CountDownLatch countDownLatch1 = new CountDownLatch(threadNum); //计数器
  9. CountDownLatch countDownLatch2 = new CountDownLatch(threadNum);
  10. CountDownLatch countDownLatch3 = new CountDownLatch(threadNum);
  11. CountDownLatch countDownLatch4 = new CountDownLatch(threadNum);
  12. //====方法一的耗时===
  13. startTime = System.currentTimeMillis();
  14. for (int i = 1; i <= threadNum ; i++) {
  15. new Thread(() -> {
  16. try {
  17. for (int j = 0; j < 100 * _1W; j++) {
  18. clickNumber.clickBySynchronized();
  19. }
  20. } finally {
  21. //50个线程,做完一个少一个
  22. countDownLatch1.countDown();
  23. }
  24. },String.valueOf(i)).start();
  25. }
  26. countDownLatch1.await();
  27. endTime = System.currentTimeMillis();
  28. System.out.println("synchronized耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.number);
  29. //====方法2的耗时===
  30. startTime = System.currentTimeMillis();
  31. for (int i = 1; i <= threadNum ; i++) {
  32. new Thread(() -> {
  33. try {
  34. for (int j = 0; j < 100 * _1W; j++) {
  35. clickNumber.clickByAtomicLong();
  36. }
  37. } finally {
  38. //50个线程,做完一个少一个
  39. countDownLatch2.countDown();
  40. }
  41. },String.valueOf(i)).start();
  42. }
  43. countDownLatch2.await();
  44. endTime = System.currentTimeMillis();
  45. System.out.println("AtomicLong耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.atomicLong.get());
  46. //====方法3的耗时===
  47. startTime = System.currentTimeMillis();
  48. for (int i = 1; i <= threadNum ; i++) {
  49. new Thread(() -> {
  50. try {
  51. for (int j = 0; j < 100 * _1W; j++) {
  52. clickNumber.clickByLongAdder();
  53. }
  54. } finally {
  55. //50个线程,做完一个少一个
  56. countDownLatch3.countDown();
  57. }
  58. },String.valueOf(i)).start();
  59. }
  60. countDownLatch3.await();
  61. endTime = System.currentTimeMillis();
  62. System.out.println("LongAdder耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.longAdder.sum());
  63. //====方法4的耗时===
  64. startTime = System.currentTimeMillis();
  65. for (int i = 1; i <= threadNum ; i++) {
  66. new Thread(() -> {
  67. try {
  68. for (int j = 0; j < 100 * _1W; j++) {
  69. clickNumber.clickByLongAccumulator();
  70. }
  71. } finally {
  72. //50个线程,做完一个少一个
  73. countDownLatch4.countDown();
  74. }
  75. },String.valueOf(i)).start();
  76. }
  77. countDownLatch4.await();
  78. endTime = System.currentTimeMillis();
  79. System.out.println("LongAccumulator耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.longAccumulator.get());
  80. }
  81. }

运算结果:

在这里插入图片描述

结论:很大的高并发下,LongAdder的性能优于AtomicLong(减少了乐观锁的重试次数)

在这里插入图片描述

3、LongAdder高性能的原理

LongAdder —> Striped64类 —> Number类,Cell类,单元格类,是Striped64类的一个内部类。

在这里插入图片描述

Striped64类的属性解释:

在这里插入图片描述
前面的AtomicLong,N个线程同时CAS修改一个值,每次只会有一个成功,而其余N-1个线程一定失败而继续不停的自旋,N很大时,就会有大量的失败自旋。

而LongAdder的基本思路就是分散热点,不要逮着一个值自旋,而是将value值分散到一个Cell数组中,不同线程会命中到Cell数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

sum()时会将所有Cel数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,每次并发CAS的失败线程数量就少了。

在这里插入图片描述在这里插入图片描述

LongAdder在无竞争的情况下,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零分散热点的做法,用空间换时间,新加一个数组cells,将一个value的累加拆分进这个数组cells来分担。

在这里插入图片描述

多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到cells数组的某下标,然后对该下标所对应的值进行自增换作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果。

4、源码:LongAdder-add方法

  1. new LongAdder().increment()

各方法底层的调用关系为:

  1. increment() --> add(1L) --> longAccumulate()
  2. //最后累加的结果
  3. --> sum()

在这里插入图片描述
从add方法开始看,方法局部变量有:

  • as:Striped64类的cells数组
  • b:Striped64类的的base值
  • v:期望值
  • m:cells数组的长度
  • a:当前线程命中的cell数组中的cell单元格对象

首次,只有一个线程的时候,base就可以完成,cell == null,casBase方法+1的操作操作成功,if条件不成立,直接跳出循环,但已经在||条件判断的时候顺带着完成了+1的操作。

  1. public void add(long x) {
  2. Cell[] cs; long b, v; int m; Cell c;
  3. if ((cs = cells) != null || !casBase(b = base, b + x)) {
  4. // true无竞争,false表示竟争激烈,多个线hash到同一个CeLL,可能要扩容
  5. boolean uncontended = true;
  6. //||下的四个条件分别为:
  7. //条件1:cell单元格数组为空
  8. //条件2:cell长度小于1,一般不会,因为到这儿说明cell不为null,而其长度2次幂起步
  9. //条件3:getProbe获取当前线程的哈希值,映射到cell后,cell为空,说明当前线程还没更新过cell,应初始化一个cell
  10. //条件4:更新当前线程所映射的cell失败,即多个线程hash到了同一个cell,说明竞争激烈,取反后到longAccumulate继续扩容
  11. if (cs == null || (m = cs.length - 1) < 0 ||
  12. //getProbe方法返回的时线程中的threadLocalRandomProbe字段
  13. //它是通过随机数生成的一个值,对于一个确定的线程,这个值固定,除非刻意修改
  14. (c = cs[getProbe() & m]) == null ||
  15. !(uncontended = c.cas(v = c.value, v + x)))
  16. longAccumulate(x, null, uncontended); //Striped64中的方法扩容
  17. }
  18. }

随着线程增多,CAS判断失败,false,取反后条件成立,进入if体中,uncountened=true,默认没有冲突,此时还没扩容,Cell[] as == null是成立的,进入longAccumulate(),按2次幂阔,出来两个Cell。(longAccumulate方法下面再详细展开)

在这里插入图片描述

此时,base可以+1,cell0、cell1也可以做+1,再调add,此时Cell[] as不再等于null,且length-1=1>0(2次幂) ,继续看||后面的条件,a = as[getProbe() & m],算坑位,比如算到了cell1这个单元格,此时,假设cell1中有值,为1,做个CAS,x为1,则1改为2,返回true,取反为false,跳出方法,但+1也随着条件判断完成了。

在这里插入图片描述

如上图,再并发,竞争激烈,cell0、cell1扛不住了也,cell上cas失败,uncontended为false,取反,true,再进入longAccumulate()扩容,2个变4个。

总结:

  • 最初无竞争时只更新base;
  • 如果更新base失败后,首次新建一个Cell[ ]数组
  • 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容

代码亮点:调用方法做为判断条件,最终的效果就是活儿干了(数据改变了),条件也做了判断了。借鉴!

简略版图解:

在这里插入图片描述

5、源码:LongAdder-longAccumulate方法

Striped64类中的一些属性和方法:getProbe方法,获取线程的hash值,这个值用于判断去cell数组的哪个槽位中去。

在这里插入图片描述

  1. //getProbe方法返回的时线程中的threadLocalRandomProbe字段
  2. static final int getProbe(){
  3. return UNSAFE.getInt(Thread.currentThread(),PROBE);
  4. }

longAccumulate()方法的入参:

  • long x :需要做雷加的值,increment调用下,一般默认都是+1
  • LongBinaryOperator fn :默认传递的是null
  • wasUncontended:竞争标识,如果是false则代表有竞争,只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false

longAccumulate方法开头处理下线程的probe值:

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. //存储线程的probe值
  4. int h;
  5. //getProbe返回为0,即线程随机数未初始化
  6. if ((h = getProbe()) == 0) {
  7. //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
  8. ThreadLocalRandom.current(); // force initialization
  9. //重新获取probe值,hash被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true,表示无竞争
  10. h = getProbe();
  11. //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,背定还不存在竟争激烈,wasUncontended竞争状态为true
  12. wasUncontended = true;
  13. }
  14. //......

然后longAccumulate方法源码大体结构为:首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:

  • CASE1: Cell[ ] 数组已经初始化
  • CASE2:CelI[ ] 数组未初始化(首次新建)
  • CASE3:Cell[ ] 数组正在初始化中

在这里插入图片描述

  1. 先看Case2:未初始化过Cell[ ] 数组,尝试占有锁并首次初始化cells数组

cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁。为0时,抢到锁,&&后面casCellsBusy改为1,初始化创建Cell[2]后,finally中cellsBusy改回0,注意下面有点双重检锁的味道。

在这里插入图片描述

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1]= new Cell(x) 即创建一个新的Cell元素,value是累加的值x,默认为1。h & 1类似于之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1),同hashmap一个意恩。

  1. 再看Case3:上面竞争很激烈,else兜底的,多个线程尝试CAS修改失败的线程会走到这个分支

该分支直接操作base,将值累加到base

在这里插入图片描述

最后看Case1:Cell数组不再为空且可能存在Cell数组扩容,多个线程同时命中一个cell的竞争。此个If分支又分为6中if情况:

在这里插入图片描述

上面代码判断当前线程hash后指向的数据位置元素是否为空,如果为空则将Cell数据放入数组中,跳出循环。如果不空则继续循环。

在这里插入图片描述

wasUncontended表示cells初始化后,当前线程竞争修改失败wasUncontended =false,这里只是重新设置了这个值为true,紧接着执行Striped64类的advanceProbe(h)方法重置当前线程的hash,重新循环,重新再竞争一次。

在这里插入图片描述说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接break跳出循环。

在这里插入图片描述

如果n大于CPU最大数量,不可扩容并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试。

在这里插入图片描述

如果扩容意向collide是false则修改它为true,然后重新算当前线程的hash值继续循环,如果当前数组的长度已经大于了CPU的核数,就会再次设置扩容意向collide=false (见上一步)

在这里插入图片描述

总结:

在这里插入图片描述
在这里插入图片描述

6、源码:LongAdder-sum方法

在这里插入图片描述
sum()会将斯有Cell数组中的value和base累加作为返回值。核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

在这里插入图片描述

为啥并发情况下sum的值不精确?

sum执行时,并没有限制对base和cells的更新(一句要命的话),所以LongAdder不是强一致性的,它是最终一致性的。

首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

7、AtomicLong和LongAdder的对比

AtomicLong:

  • 通过CAS+自旋实现
  • 线程安全,可允许一些性能损耗,要求高精度时选AtomicLong
  • 保证精度,但以性能为代价
  • AtomicLong是多个线程针对单个热点值value进行原子操作
  • 缺点是高并发下,性能急剧下降,且AtomicLong的自旋同时也是瓶颈:因为N个线程同时CAS一个值,只有一个线程成功,其余N-1个线程要不断自旋

LongAdder:

  • 通过CAS+Base +Cell数组分散热点来实现
  • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
  • 保证性能,但以精度为代价
  • LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
  • 缺陷是:sum求和后,还有计算线程修改结果的话,最后返回的结果不够准确

发表评论

表情:
评论列表 (有 0 条评论,87人围观)

还没有评论,来说两句吧...

相关阅读

    相关 JUC_回顾线

    什么是JUC JUC是java.util.concurrent包的简称,在Java5.0添加,目的就是为了更好的支持高并发任务。让开发者进行多线程编程时减少竞争条件和死锁

    相关 线JUC

    进程 进程就是正在运行的程序,是系统进行资源分配和调用的独立单位。每一个进程都有他自己的内存空间和系统资源 多进程意义在于计算机可以执行多个任务,提高cpu使用率

    相关 JUC-LongAdder

    LongAdder只能用来计算加法,且从零开始计算 LongAccumulator提供了自定义的操作函数 ![watermark_type_ZHJvaWRzYW5zZmFs