ConcurrentHashMap源码分析

一时失言乱红尘 2023-09-29 15:15 100阅读 0赞

ConcurrentHashMap关系图

在这里插入图片描述

JDK1.7中的实现

JDK1.7 中的ConcurrentHashMap采用了分段锁的设计,先来看一下它的数据结构。
ConcurrentHashMap中含有几个Segment数组。每个Segment中又含有几个HashEntry数组。

Segment是一种可重入锁,在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里面包含多个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。

ConcurrentHashMap通过使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
在这里插入图片描述

1、存储结构

  1. static final class HashEntry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V value;
  5. volatile HashEntry<K,V> next;
  6. }

ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是 Segment 的个数)。
Segment 继承自 ReentrantLock。

  1. static final class Segment<K,V> extends ReentrantLock implements Serializable {
  2. private static final long serialVersionUID = 2249069246763182397L;
  3. static final int MAX_SCAN_RETRIES =
  4. Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
  5. transient volatile HashEntry<K,V>[] table;
  6. transient int count;
  7. transient int modCount;
  8. transient int threshold;
  9. final float loadFactor;
  10. }
  11. final Segment<K,V>[] segments;

可以看到几个熟悉的字段。HashEntry(哈希数组),threshold(扩容阈值),loadFactor(负载因子)表示segment是一个完整的HashMap.
接下来我们看看ConcurrentHashMap的构造函数

  1. public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)

三个参数分别代表:

  • 初始容量:初始容量表示所有的segment数组中,一共含有多少个hashentry。若initialCapacity不为2的幂,会取一个大于initialCapacity的2的幂。
  • 负载因子:默认0.75
  • 并发级别:可以同时允许多少个线程并发。concurrencyLevel为多少,就有多少个segment,当然也会取一个大于等于这个值的2的幂。

默认的并发级别为 16,也就是说默认创建 16 个 Segment。

  1. static final int DEFAULT_CONCURRENCY_LEVEL = 16;

在这里插入图片描述
接下来我们看一下ConcurrentHashMap中的几个关键函数,get,put,rehash(扩容), size方法,看看他是如何实现并发的。

2、get操作

在这里插入图片描述
get实现过程:
1、根据key,计算出hashCode;
2、根据步骤1计算出的hashCode定位segment,如果segment不为null && segment.table也不为null,跳转到步骤3,否则,返回null,该key所对应的value不存在;
3、根据hashCode定位table中对应的hashEntry,遍历hashEntry,如果key存在,返回key对应的value;
4、步骤3结束仍未找到key所对应的value,返回null,该key对应的value不存在。

比起Hashtable,ConcurrentHashMap的get操作高效之处在于整个get操作不需要加锁。如果不加锁,ConcurrentHashMap的get操作是如何做到线程安全的呢?原因是volatile,所有的value都定义成了volatile类型,volatile可以保证线程之间的可见性,这也是用volatile替换锁的经典应用场景

3、put操作

ConcurrentHashMap提供两个方法put和putIfAbsent来完成put操作,它们之间的区别在于put方法做插入时key存在会更新key所对应的value,而putIfAbsent不会更新。
在这里插入图片描述
put实现过程:
1、参数校验,value不能为null,为null时抛出空指针异常;
2、计算key的hashCode;
3、定位segment,如果segment不存在,创建新的segment;
4、调用segment的put方法在对应的segment做插入操作。

segment的put方法实现
segment的put方法是整个put操作的核心,它实现了在segment的HashEntry数组中做插入(segment的HashEntry数组采用拉链法来处理冲突)。
在这里插入图片描述
segment put实现过程:
1、获取锁,保证put操作的线程安全;
2、 定位到HashEntry数组中具体的HashEntry;
3、遍历HashEntry链表,假若待插入key已存在:
* 需要更新key所对应value,更新oldValue=newValue,跳转到步骤5;
* 否则,直接跳转到步骤5;
4、遍历完HashEntry链表,key不存在,插入HashEntry节点,oldValue=null,跳转到步骤5、释放锁,返回oldValue。

步骤4做插入的时候实际上经历了两个步骤:
1、第一:HashEntry数组扩容;

  • 是否需要扩容
    在插入元素前会先判断Segment的HashEntry数组是否超过threshold,如果超过阀值,则需要对HashEntry数组扩容;
  • 如何扩容
    在扩容的时候,首先创建一个容量是原来容量两倍的数组,将原数组的元素再散列后插入到新的数组里。为了高效,ConcurrentHashMap只对某个Segment进行扩容,不会对整个容器扩容

JDK1.8的改动

JDK1.8中,出现了较大的改动。没有使用段锁,改成了Node数组 + 链表 + 红黑树的方式。

put()方法

假设table已经初始化完成,put操作采用CAS+synchronized实现并发插入或更新操作,具体实现如下。

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. /** Implementation for put and putIfAbsent */
  5. final V putVal(K key, V value, boolean onlyIfAbsent) {
  6. //不允许key、value为空
  7. if (key == null || value == null) throw new NullPointerException();
  8. //返回 (h ^ (h >>> 16)) & HASH_BITS;
  9. int hash = spread(key.hashCode());
  10. int binCount = 0;
  11. //循环,直到插入成功
  12. for (Node[] tab = table;;) {
  13. Node f; int n, i, fh;
  14. if (tab == null || (n = tab.length) == 0)
  15. //table为空,初始化table
  16. tab = initTable();
  17. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  18. //索引处无值
  19. if (casTabAt(tab, i, null,
  20. new Node(hash, key, value, null)))
  21. break; // no lock when adding to empty bin
  22. }
  23. else if ((fh = f.hash) == MOVED)// MOVED=-1;
  24. //检测到正在扩容,则帮助其扩容
  25. tab = helpTransfer(tab, f);
  26. else {
  27. V oldVal = null;
  28. //上锁(hash值相同的链表的头节点)
  29. synchronized (f) {
  30. if (tabAt(tab, i) == f) {
  31. if (fh >= 0) {
  32. //遍历链表节点
  33. binCount = 1;
  34. for (Node e = f;; ++binCount) {
  35. K ek;
  36. // hash和key相同,则修改value
  37. if (e.hash == hash &&
  38. ((ek = e.key) == key ||
  39. (ek != null && key.equals(ek)))) {
  40. oldVal = e.val;
  41. //仅putIfAbsent()方法中onlyIfAbsent为true
  42. if (!onlyIfAbsent)
  43. //putIfAbsent()包含key则返回get,否则put并返回
  44. e.val = value;
  45. break;
  46. }
  47. Node pred = e;
  48. //已遍历到链表尾部,直接插入
  49. if ((e = e.next) == null) {
  50. pred.next = new Node(hash, key,
  51. value, null);
  52. break;
  53. }
  54. }
  55. }
  56. else if (f instanceof TreeBin) {
  57. // 树节点
  58. Node p;
  59. binCount = 2;
  60. if ((p = ((TreeBin)f).putTreeVal(hash, key,
  61. value)) != null) {
  62. oldVal = p.val;
  63. if (!onlyIfAbsent)
  64. p.val = value;
  65. }
  66. }
  67. }
  68. }
  69. if (binCount != 0) {
  70. //判断是否要将链表转换为红黑树,临界值和HashMap一样也是8
  71. if (binCount >= TREEIFY_THRESHOLD)
  72. //若length<64,直接tryPresize,两倍table.length;不转树
  73. treeifyBin(tab, i);
  74. if (oldVal != null)
  75. return oldVal;
  76. break;
  77. }
  78. }
  79. }
  80. addCount(1L, binCount);
  81. return null;
  82. }

get()方法

get方法不用加锁。利用CAS操作,可以达到无锁的访问。

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. //tabAt(i),获取索引i处Node
  7. // 判断头结点是否就是我们需要的节点
  8. if ((eh = e.hash) == h) {
  9. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  10. return e.val;
  11. }
  12. // 如果头结点的 hash<0,说明正在扩容,或者该位置是红黑树
  13. else if (eh < 0)
  14. return (p = e.find(h, key)) != null ? p.val : null;
  15. //遍历链表
  16. while ((e = e.next) != null) {
  17. if (e.hash == h &&
  18. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  19. return e.val;
  20. }
  21. }
  22. return null;
  23. }

get() 执行过程:

  1. 计算 hash 值
  2. 根据 hash 值找到数组对应位置: (n – 1) & h
  3. 根据该位置处结点性质进行相应查找

    • 如果该位置为 null,那么直接返回 null 就可以了
    • 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
    • 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树
    • 如果以上 3 条都不满足,那就是链表,进行遍历比对即可

JDK8中的ConcurrentHashMap是怎么保证并发安全的?

主要利用Unsafe操作+synchronized关键字。Unsafe操作的使用仍然和JDK7中的类似,主要负责并发安全的修改对象的属性或数组某个位置的值。

synchronized主要负责在需要操作某个位置时进行加锁 (该位置不为空),比如向某个位置的链表进行插入结点,向某个位置的红黑树插入结点。JDK8中其实仍然有分段锁的思想,只不过JDK7中段数是可以控制的,而JDK8中是数组的每一个位置都有 一把锁。

当向ConcurrentHashMap中put一 个key ,value时
1.首先根据key计算对应的数组下标i, 如果该位置没有元素, 则通过自旋的方法去向该位置赋值;

2.如果该位置有元素, 则synchronized会加锁;

3.加锁成功之后, 在判断该元素的类型a. 如果是链表节点则进行添加节点到链表中b. 如果是红黑树则添加节点到红黑树;

4.添加成功后,判断是否需要进行树化;

5.addCount,这个方法的意思是ConcurrentHashMap的元素个数加1, 但是这个操作也是需要并发安全的,并且元素个数加1成功后,会继续判断是否要进行扩容, 如果需要,则会进行扩容,所以这个方法很重要。

6、同时一个线程在put时如果发现当前ConcurrentHashMap正在进行扩容则会帮助扩容。

发表评论

表情:
评论列表 (有 0 条评论,100人围观)

还没有评论,来说两句吧...

相关阅读