ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)

电玩女神 2023-01-18 09:26 96阅读 0赞
  • 上一篇文章介绍过put方法以及其相关的方法,接下来,本篇就介绍一下transfer这个方法(比较难),最好能动手结合着源码进行分析,并仔细理解前面几篇文章的内容~
  • :代码分析的注释中的CASE0、CASE1… ,这些并没有直接关联关系,只是为了给每个if逻辑判断加一个标识,方便在其他逻辑判断的地方进行引用而已。

再复习一下并发Map的结构图:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzU5MTk4MA_size_16_color_FFFFFF_t_70


1、transfer方法

transfer方法的作用是:迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中。该方法迁移数据的流程就是在发生扩容时,从散列表原table中,按照桶位下标,依次将每个桶中的元素(结点、链表、树)迁移到新表nextTable中。

另外还有与其相关的方法helpTransfer:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。

下图为并发扩容流程图,在分析源码前熟悉一下流程:

请添加图片描述

链表迁移示意图:

在这里插入图片描述

下面就开始分析代码把:

  1. /** * 迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中: */
  2. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3. // n 表示扩容之前table数组的长度
  4. // stride 表示分配给线程任务的步长
  5. int n = tab.length, stride;
  6. // 这里为了方便分析,根据CUP核数,stride 固定为MIN_TRANSFER_STRIDE 16
  7. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  8. // 控制线程迁移数据的最小步长(桶位的跨度~)
  9. stride = MIN_TRANSFER_STRIDE; // subdivide range
  10. // ------------------------------------------------------------------------------
  11. // CASE0:nextTab == null
  12. // 条件成立:表示当前线程为触发本次扩容的线程,需要做一些扩容准备工作:(在if代码块中)
  13. // 条件不成立:表示当前线程是协助扩容的线程...
  14. if (nextTab == null) { // initiating
  15. try {
  16. // 创建一个比扩容之前容量n大一倍的新table
  17. @SuppressWarnings("unchecked")
  18. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  19. nextTab = nt;
  20. } catch (Throwable ex) { // try to cope with OOME
  21. sizeCtl = Integer.MAX_VALUE;
  22. return;
  23. }
  24. // 赋值nextTab对象给 nextTable ,方便协助扩容线程 拿到新表
  25. nextTable = nextTab;
  26. // 记录迁移数据整体位置的一个标记,初始值是原table数组的长度。
  27. // 可以理解为:全局范围内散列表的数据任务处理到哪个桶的位置了
  28. transferIndex = n;
  29. }
  30. // 表示新数组的长度
  31. int nextn = nextTab.length;
  32. // fwd节点,当某个桶位数据迁移处理完毕后,将此桶位设置为fwd节点,其它写线程或读线程看到后,会有不同逻辑。fwd结点指向新表nextTab的引用
  33. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  34. // 推进标记(后面讲数据迁移的时候再分析~)
  35. boolean advance = true;
  36. // 完成标记(后面讲数据迁移的时候再分析~)
  37. boolean finishing = false; // to ensure sweep before committing nextTab
  38. // i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~)
  39. // bound 表示分配给当前线程任务的下界限制。(线程的数据迁移任务先从高位开始迁移,迁移到下界位置结束)
  40. int i = 0, bound = 0;
  41. // 自旋~ 非常长的一个for循环...
  42. for (;;) {
  43. // f 桶位的头结点
  44. // fh 头结点的hash
  45. Node<K,V> f; int fh;
  46. /** * while循环的任务如下:(循环的条件为推进标记advance为true) * 1.给当前线程分配任务区间 * 2.维护当前线程任务进度(i 表示当前处理的桶位) * 3.维护map对象全局范围内的进度 */
  47. // 整个while循环就是在算i的值,过程太复杂,不用太关心
  48. // i的值会从n-1依次递减,感兴趣的可以打下断点就知道了
  49. // 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素
  50. while (advance) {
  51. // nextIndex~nextBound分配任务的区间:
  52. // 分配任务的开始下标
  53. // 分配任务的结束下标
  54. int nextIndex, nextBound;
  55. // -----------------------------------------------------------------------
  56. // CASE1:
  57. // 条件一:--i >= bound
  58. // 成立:表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位.
  59. // 不成立:表示当前线程任务已完成 或者 未分配
  60. if (--i >= bound || finishing)
  61. // 推进标记设置为flase
  62. advance = false;
  63. // -----------------------------------------------------------------------
  64. // CASE2: (nextIndex = transferIndex) <= 0
  65. // transferIndex:可以理解为,全局范围内散列表的数据任务处理到哪个桶的位置了~
  66. // 前置条件:当前线程任务已完成 或 者未分配,即没有经过CASE1
  67. // 条件成立:表示对象全局范围内的桶位都分配完毕了,没有区间可分配了,设置当前线程的i变量为-1 跳出循环,然后执行退出迁移任务相关的程序
  68. // 条件不成立:表示对象全局范围内的桶位尚未分配完毕,还有区间可分配~
  69. else if ((nextIndex = transferIndex) <= 0) {
  70. i = -1;
  71. // 推进标记设置为flase
  72. advance = false;
  73. }
  74. // -----------------------------------------------------------------------
  75. // CASE3,其实就是移动i的一个过程:CAS更新成功,则i从当前位置-1,再复习一下i的含义:
  76. // i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~)
  77. // CASE3前置条件(未经过CASE1-2):1、当前线程需要分配任务区间 2.全局范围内还有桶位尚未迁移
  78. // 条件成立:说明给当前线程分配任务成功
  79. // 条件失败:说明分配任务给当前线程失败,应该是和其它线程发生了竞争~
  80. else if (U.compareAndSwapInt
  81. (this, TRANSFERINDEX, nextIndex,
  82. nextBound = (nextIndex > stride ?
  83. nextIndex - stride : 0))) {
  84. bound = nextBound;
  85. i = nextIndex - 1;
  86. advance = false;
  87. }
  88. }
  89. /** 处理线程任务完成后,线程退出transfer方法的逻辑: **/
  90. // -------------------------------------------------------------------------
  91. // CASE4:
  92. // 条件一:i < 0
  93. // 成立:表示当前线程未分配到任务,或已完成了任务
  94. // 条件二、三:一般情况下不会成里~
  95. if (i < 0 || i >= n || i + n >= nextn) {
  96. // 如果一次遍历完成了
  97. // 也就是整个map所有桶中的元素都迁移完成了
  98. // 保存sizeCtl的变量
  99. int sc;
  100. // 扩容结束条件判断
  101. if (finishing) {
  102. // 如果全部桶中数据迁移完成了,则替换旧桶数组
  103. // 并设置下一次扩容门槛为新桶数组容量的0.75倍
  104. nextTable = null;
  105. table = nextTab;
  106. sizeCtl = (n << 1) - (n >>> 1);
  107. return;
  108. }
  109. // -------------------------------------------------------------------------
  110. // CASE5:当前线程扩容完成,把并发扩容的线程数-1,该操作基于CAS,修改成功返回true
  111. // 条件成立:说明,更新 sizeCtl低16位 -1 操作成功,当前线程可以正常退出了~
  112. // 如果条件不成立:说明CAS更新操作时,与其他线程发生竞争了~
  113. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  114. // 条件成立:说明当前线程不是最后一个退出transfer任务的线程
  115. // eg:
  116. // 1000 0000 0001 1011 0000 0000 0000 0000
  117. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  118. // 扩容完成两边肯定相等
  119. // 正常退出
  120. return;
  121. // 完成标记finishing置为true,,finishing为true才会走到上面的if扩容结束条件判断
  122. // 推进标记advance置为true
  123. finishing = advance = true;
  124. // i重新赋值为n
  125. // 这样会再重新遍历一次桶数组,看看是不是都迁移完成了
  126. // 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
  127. i = n; // recheck before commit
  128. }
  129. }
  130. /** 线程处理一个桶位数据的迁移工作,处理完毕后, 设置advance为true: 表示继续推荐,然后就会回到for,继续循环 **/
  131. // -------------------------------------------------------------------------
  132. // CASE6:
  133. // 【CASE6~CASE8】前置条件:当前线程任务尚未处理完,正在进行中!
  134. // 条件成立:说明当前桶位未存放数据,只需要将此处设置为fwd节点即可。
  135. else if ((f = tabAt(tab, i)) == null)
  136. // 如果桶中无数据,直接放入FWD,标记该桶已迁移:
  137. // casTabAt通过CAS的方式去向Node数组指定位置i设置节点值,设置成功返回true,否则返回false
  138. // 即:将tab数组下标为i的结点设置为FWD结点
  139. advance = casTabAt(tab, i, null, fwd);
  140. // -------------------------------------------------------------------------
  141. // CASE7: (fh = f.hash) == MOVED 如果桶中第一个元素的hash值为MOVED
  142. // 条件成立:说明头节f它是ForwardingNode节点,
  143. // 则,当前桶位已经迁移过了,当前线程不用再处理了,直接再次更新当前线程任务索引,再次处理下一个桶位 或者 其它操作~
  144. else if ((fh = f.hash) == MOVED)
  145. advance = true; // already processed
  146. // -------------------------------------------------------------------------
  147. // CASE8:
  148. // 前置条件:**当前桶位有数据**,而且node节点 不是 fwd节点,说明这些数据需要迁移。
  149. else {
  150. // 锁定该桶并迁移元素:(sync 锁加在当前桶位的头结点)
  151. synchronized (f) {
  152. // 再次判断当前桶第一个元素是否有修改,(可能出现其它线程抢先一步迁移/修改了元素)
  153. // 防止在你加锁头对象之前,当前桶位的头对象被其它写线程修改过,导致你目前加锁对象错误...
  154. if (tabAt(tab, i) == f) {
  155. // 把一个链表拆分成两个链表(规则按照是桶中各元素的hash与桶大小n进行与操作):
  156. // 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中
  157. // 其中低位链表迁移到新桶中的位置相对旧桶不变
  158. // 高位链表迁移到新桶中位置正好是其在旧桶的位置加n
  159. // 这也正是为什么扩容时容量在变成两倍的原因 (链表迁移原理参考上面的图)
  160. // ln 表示低位链表引用
  161. // hn 表示高位链表引用
  162. Node<K,V> ln, hn;
  163. // ---------------------------------------------------------------
  164. // CASE9:
  165. // 条件成立:表示当前桶位是链表桶位
  166. if (fh >= 0) {
  167. // 第一个元素的hash值大于等于0,说明该桶中元素是以链表形式存储的
  168. // 这里与HashMap迁移算法基本类似,唯一不同的是多了一步寻找lastRun
  169. // 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表
  170. // 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0
  171. // 则最后后面三个0对应的元素肯定还是在同一个桶中
  172. // 这时lastRun对应的就是倒数第三个节点
  173. // 至于为啥要这样处理,我也没太搞明白
  174. // lastRun机制:
  175. // 可以获取出 当前链表 末尾连续高位不变的 node
  176. int runBit = fh & n;
  177. Node<K,V> lastRun = f;
  178. for (Node<K,V> p = f.next; p != null; p = p.next) {
  179. int b = p.hash & n;
  180. if (b != runBit) {
  181. runBit = b;
  182. lastRun = p;
  183. }
  184. }
  185. // 看看最后这几个元素归属于低位链表还是高位链表
  186. // 条件成立:说明lastRun引用的链表为 低位链表,那么就让 ln 指向 低位链表
  187. if (runBit == 0) {
  188. ln = lastRun;
  189. hn = null;
  190. }
  191. // 否则,说明lastRun引用的链表为 高位链表,就让 hn 指向 高位链表
  192. else {
  193. hn = lastRun;
  194. ln = null;
  195. }
  196. // 遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中
  197. // 循环跳出条件:当前循环结点!=lastRun
  198. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  199. int ph = p.hash; K pk = p.key; V pv = p.val;
  200. if ((ph & n) == 0)
  201. ln = new Node<K,V>(ph, pk, pv, ln);
  202. else
  203. hn = new Node<K,V>(ph, pk, pv, hn);
  204. }
  205. // 低位链表的位置不变
  206. setTabAt(nextTab, i, ln);
  207. // 高位链表的位置是:原位置 + n
  208. setTabAt(nextTab, i + n, hn);
  209. // 标记当前桶已迁移
  210. setTabAt(tab, i, fwd);
  211. // advance为true,返回上面进行--i操作
  212. advance = true;
  213. }
  214. // ---------------------------------------------------------------
  215. // CASE10:
  216. // 条件成立:表示当前桶位是 红黑树 代理结点TreeBin
  217. else if (f instanceof TreeBin) {
  218. // 如果第一个元素是树节点,也是一样,分化成两颗树
  219. // 也是根据hash&n为0放在低位树中,不为0放在高位树中
  220. // 转换头结点为 treeBin引用 t
  221. TreeBin<K,V> t = (TreeBin<K,V>)f;
  222. // 低位双向链表 lo 指向低位链表的头 loTail 指向低位链表的尾巴
  223. TreeNode<K,V> lo = null, loTail = null;
  224. // 高位双向链表 lo 指向高位链表的头 loTail 指向高位链表的尾巴
  225. TreeNode<K,V> hi = null, hiTail = null;
  226. // lc 表示低位链表元素数量
  227. // hc 表示高位链表元素数量
  228. int lc = 0, hc = 0;
  229. // 遍历整颗树(TreeBin中的双向链表,从头结点至尾节点),根据hash&n是否为0分化成两颗树:
  230. for (Node<K,V> e = t.first; e != null; e = e.next) {
  231. // h 表示循环处理当前元素的 hash
  232. int h = e.hash;
  233. // 使用当前节点 构建出来的新的TreeNode
  234. TreeNode<K,V> p = new TreeNode<K,V>
  235. (h, e.key, e.val, null, null);
  236. // ---------------------------------------------------
  237. // CASE11:
  238. // 条件成立:表示当前循环节点 属于低位链节点
  239. if ((h & n) == 0) {
  240. // 条件成立:说明当前低位链表还没有数据
  241. if ((p.prev = loTail) == null)
  242. lo = p;
  243. // 说明低位链表已经有数据了,此时当前元素追加到低位链表的末尾就行了
  244. else
  245. loTail.next = p;
  246. // 将低位链表尾指针指向 p 节点
  247. loTail = p;
  248. ++lc;
  249. }
  250. // ---------------------------------------------------
  251. // CASE12:
  252. // 条件成立:当前节点属于高位链节点
  253. else {
  254. if ((p.prev = hiTail) == null)
  255. hi = p;
  256. else
  257. hiTail.next = p;
  258. hiTail = p;
  259. ++hc;
  260. }
  261. }
  262. // 如果分化的树中元素个数小于等于6,则退化成链表
  263. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  264. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  265. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  266. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  267. // 低位树的位置不变
  268. setTabAt(nextTab, i, ln);
  269. // 高位树的位置是原位置加n
  270. setTabAt(nextTab, i + n, hn);
  271. // 标记该桶已迁移
  272. setTabAt(tab, i, fwd);
  273. // advance为true,返回上面进行--i操作
  274. advance = true;
  275. }
  276. }
  277. }
  278. }
  279. }
  280. }

补充,lastRun机制示意图:

在这里插入图片描述

小节

(1)新桶数组大小是旧桶数组的两倍;

(2)迁移元素先从靠后的桶开始;

(3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;

(4)迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;

(5)低位链表(树)存储在原来的位置;

(6)高们链表(树)存储在原来的位置加n的位置;

(7)迁移元素时会锁住当前桶,也是分段锁的思想;

2、helpTransfer方法

helpTransfer:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。当前桶元素迁移完成了才去协助迁移其它桶元素!

  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2. // nextTab 引用的是 fwd.nextTable == map.nextTable 理论上是这样。
  3. // sc 保存map.sizeCtl
  4. Node<K,V>[] nextTab; int sc;
  5. // CASE0: 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空
  6. // 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素
  7. // 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组
  8. // 条件一:tab != null 恒成立 true
  9. // 条件二:(f instanceof ForwardingNode) 恒成立 true
  10. // 条件三:((ForwardingNode<K,V>)f).nextTable) != null 恒成立 true
  11. if (tab != null && (f instanceof ForwardingNode) &&
  12. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  13. // 根据前表的长度tab.length去获取扩容唯一标识戳,假设 16 -> 32 扩容:1000 0000 0001 1011
  14. int rs = resizeStamp(tab.length);
  15. // 条件一:nextTab == nextTable
  16. // 成立:表示当前扩容正在进行中
  17. // 不成立:1.nextTable被设置为Null了,扩容完毕后,会被设为Null
  18. // 2.再次出发扩容了...咱们拿到的nextTab 也已经过期了...
  19. // 条件二:table == tab
  20. // 成立:说明 扩容正在进行中,还未完成
  21. // 不成立:说明扩容已经结束了,扩容结束之后,最后退出的线程 会设置 nextTable 为 table
  22. // 条件三:(sc = sizeCtl) < 0
  23. // 成立:说明扩容正在进行中
  24. // 不成立:说明sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束。
  25. while (nextTab == nextTable && table == tab &&
  26. (sc = sizeCtl) < 0) {
  27. // 条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
  28. // true -> 说明当前线程获取到的扩容唯一标识戳 非 本批次扩容
  29. // false -> 说明当前线程获取到的扩容唯一标识戳 是 本批次扩容
  30. // 条件二:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs << 16 ) + 1
  31. // true -> 表示扩容完毕,当前线程不需要再参与进来了
  32. // false -> 扩容还在进行中,当前线程可以参与
  33. // 条件三:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs<<16) + MAX_RESIZERS
  34. // true -> 表示当前参与并发扩容的线程达到了最大值 65535 - 1
  35. // false -> 表示当前线程可以参与进来
  36. // 条件四:transferIndex <= 0
  37. // true -> 说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干..
  38. // false -> 还有任务可以分配。
  39. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  40. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  41. break;
  42. // 扩容线程数加1
  43. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  44. // 当前线程帮忙迁移元素
  45. transfer(tab, nextTab);
  46. break;
  47. }
  48. }
  49. return nextTab;
  50. }
  51. return table;
  52. }

这块代码逻辑很难理解,可以结合着ConcurrentHashMap面试题去巩固一下,重新吸收~

发表评论

表情:
评论列表 (有 0 条评论,96人围观)

还没有评论,来说两句吧...

相关阅读