Java并发 多线程实现计数功能(线程安全)

左手的ㄟ右手 2021-11-22 12:48 927阅读 0赞

在上篇,我们利用线程池,信号量,倒计时相关类实现计数的功能,但运行结果总不能达到目标,我们将做以下改进。

1.首先附上源码,红色标注,是我们此次修改的地方

  1. import javax.annotation.concurrent.ThreadSafe;
  2. import java.util.concurrent.CountDownLatch;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.Semaphore;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. @ThreadSafe
  8. public class SafeCountExample {
  9. //请求总数
  10. private static int clientTotal = 5000;
  11. //线程数量
  12. private static int threadTotal = 200;
  13. private static AtomicInteger count = new AtomicInteger(0);
  14. public static void main(String[] args) throws Exception{
  15. ExecutorService exec = Executors.newCachedThreadPool();
  16. final Semaphore semaphore = new Semaphore(threadTotal);
  17. //每次固定数量的线程获取许可
  18. final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
  19. for (int i = 0; i < clientTotal ; i++) {
  20. exec.execute(()->{
  21. try {
  22. semaphore.acquire();
  23. add();
  24. semaphore.release();
  25. }catch (Exception e){
  26. e.printStackTrace();
  27. }
  28. countDownLatch.countDown();
  29. });
  30. }
  31. countDownLatch.await();
  32. exec.shutdown();
  33. System.out.println(count.get());
  34. }
  35. private static void add(){
  36. count.incrementAndGet();
  37. }
  38. }

2.AtomicInteger详解

  1. 2.1 AtomicInteger是一个支持原子操作的Integer类,它提供了原子自增方法、原子自减方法以及原子赋值方法等。其底层是通过volatileCAS实现的,其中volatile保证了内存可见性,CAS算法保证了原子性。因此接下来我们先了解下volatileCAS,然后在研究下AtomicIntegerunsafe相关的源码。
  2. 2.2 CAS
  3. CASCompare And Swap)即比较并交换,CAS是乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。它包含三个参数:V内存值,预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。原理图如下所示:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3k1MzI3OTgxMTM_size_16_color_FFFFFF_t_70

  1. 2.3 volatile变量
  2. volatile是一种稍弱的同步机制,用来确保将变量的更新操作通知到其他线程。当把变量声明为volatile类型后,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取volatile类型的变量时总返回最新写入的值。在访问volatile变量时不会执行加锁操作,因此也就不会使执行线程阻塞,因此volatile变量是一种比sychronized关键字更轻量级的同步机制。
  3. 2.4 源码分析
  4. 2.4.1 AtomicInteger 部分源码:
  5. public class AtomicInteger extends Number implements java.io.Serializable {
  6. private static final Unsafe unsafe = Unsafe.getUnsafe();//调用类Unsafe
  7. private static final long valueOffset;//变量value的内存偏移量
  8. private volatile int value;//volatile修饰的int变量value
  9. static {
  10. try {
  11. valueOffset = unsafe.objectFieldOffset
  12. (AtomicInteger.class.getDeclaredField("value"));
  13. } catch (Exception ex) { throw new Error(ex); }
  14. }
  15. public final int getAndSet(int newValue) {//设置新值并返回旧值
  16. return unsafe.getAndSetInt(this, valueOffset, newValue);
  17. }
  18. public final boolean compareAndSet(int expect, int update) {//如果当前值为expect,则设置为update
  19. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  20. }
  21. public final int getAndIncrement() {//当前值加1返回旧值
  22. return unsafe.getAndAddInt(this, valueOffset, 1);
  23. }
  24. public final int getAndDecrement() {//当前值减1返回旧值
  25. return unsafe.getAndAddInt(this, valueOffset, -1);
  26. }
  27. public final int getAndAdd(int delta) {//当前值增加delta,返回旧值
  28. return unsafe.getAndAddInt(this, valueOffset, delta);
  29. }
  30. public final int incrementAndGet() {//当前值增加1返回新值
  31. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  32. }
  33. public final int decrementAndGet() {//当前值减1,返回新值
  34. return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
  35. }
  36. }
  37. 2.4.1 unsafe部分源码
  38. public final class Unsafe {
  39. private static final Unsafe theUnsafe;
  40. public final int getAndAddInt(Object var1, long var2, int var4) {
  41. int var5;
  42. do
  43. //变量var5 调用底层的方法获取底层当前的值
  44. var5 = this.getIntVolatile(var1, var2);
  45. //变量var1位count计数对象,变量var2当前值,变量为var4增加量
  46. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  47. return var5;
  48. }
  49. public final long getAndAddLong(Object var1, long var2, long var4) {
  50. long var6;
  51. do {
  52. var6 = this.getLongVolatile(var1, var2);
  53. } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
  54. return var6;
  55. }
  56. public final int getAndSetInt(Object var1, long var2, int var4) {
  57. int var5;
  58. do {
  59. var5 = this.getIntVolatile(var1, var2);
  60. } while(!this.compareAndSwapInt(var1, var2, var5, var4));
  61. return var5;
  62. }
  63. public final long getAndSetLong(Object var1, long var2, long var4) {
  64. long var6;
  65. do {
  66. var6 = this.getLongVolatile(var1, var2);
  67. } while(!this.compareAndSwapLong(var1, var2, var6, var4));
  68. return var6;
  69. }
  70. public final Object getAndSetObject(Object var1, long var2, Object var4) {
  71. Object var5;
  72. do {
  73. var5 = this.getObjectVolatile(var1, var2);
  74. } while(!this.compareAndSwapObject(var1, var2, var5, var4));
  75. return var5;
  76. }
  77. }

分析:我们着重关注compareAndSwapInt(var1, var2, var5, var4),这个方法,我们看到这是一个底层的实现方法

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3k1MzI3OTgxMTM_size_16_color_FFFFFF_t_70 1

//变量var5 调用底层的方法获取底层当前的值

//变量var1位count计数对象,变量var2当前值,变量为var4增加量

每次传入的count对象var1时,将底层的值var5与当前的值var2进行比较

i. 若相同就将底层的值更新为var5+var4

ii. 若不相同,则循环从底层获取最新的值,原因:在多线程的前提下,底层的值,可能已被别的线程修改。

因此:这样就保证变量的值为最新。

发表评论

表情:
评论列表 (有 0 条评论,927人围观)

还没有评论,来说两句吧...

相关阅读