多线程并发AQS源码分析

古城微笑少年丶 2022-09-12 14:53 298阅读 0赞

一、业务场景描述

业务场景,银行中存在一个办理业务的窗口,A、B、C三个用户(线程)去办理业务,先是A用户去窗口办理业务,然后B、C在等候区排队等候,过一会儿,A用户(线程)办理完业务,B用户(线程)去业务窗口办理业务,以此类推C用户一样。

二、分析之前上分析图

watermark_type_ZHJvaWRzYW5zZmFsbGJhY2s_shadow_50_text_Q1NETiBA6ZOB5rGJ5p-U5oOFbGk_size_20_color_FFFFFF_t_70_g_se_x_16

watermark_type_ZHJvaWRzYW5zZmFsbGJhY2s_shadow_50_text_Q1NETiBA6ZOB5rGJ5p-U5oOFbGk_size_20_color_FFFFFF_t_70_g_se_x_16 1

三、源码分析

  1. #(1)A线程开始工作,第一步调用lock方法
  2. public void lock() {
  3. sync.lock();
  4. }
  5. #其中:sync继承 AbstractQueuedSynchronizer——(AQS)
  6. abstract static class Sync extends AbstractQueuedSynchronizer
  7. #(2)进入lock方法体
  8. final void lock() {
  9. #比较并设置state的状态为1;1-表示占用,0-表示未占用
  10. if (compareAndSetState(0, 1))
  11. setExclusiveOwnerThread(Thread.currentThread());
  12. else
  13. acquire(1);
  14. }
  15. #(3)进入setExclusiveOwnerThread()设置当前拥有独占访问权限的线程。
  16. 到此A线程开始在窗口执行工作,此代码块到此执行完成。
  17. #(4)用户(线程)B开启
  18. 进入方法
  19. final void lock() {
  20. if (compareAndSetState(0, 1))
  21. setExclusiveOwnerThread(Thread.currentThread());
  22. else
  23. #进入这个代码块
  24. acquire(1);
  25. }
  26. #(5)acquire的方法结构
  27. public final void acquire(int arg) {
  28. if (!tryAcquire(arg) &&
  29. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  30. selfInterrupt();
  31. }
  32. protected final boolean tryAcquire(int acquires) {
  33. return nonfairTryAcquire(acquires);
  34. }
  35. #(6)先进入tryAcquire()方法解析
  36. protected final boolean tryAcquire(int acquires) {
  37. return nonfairTryAcquire(acquires);
  38. }
  39. #(7)进入nonfairTryAcquire方法 传入参数为1
  40. final boolean nonfairTryAcquire(int acquires) {
  41. final Thread current = Thread.currentThread();
  42. #c=2不进入
  43. int c = getState();
  44. if (c == 0) {
  45. if (compareAndSetState(0, acquires)) {
  46. setExclusiveOwnerThread(current);
  47. return true;
  48. }
  49. }
  50. #current为B线程,getExclusiveOwnerThread()为A线程
  51. #所以也不进入
  52. else if (current == getExclusiveOwnerThread()) {
  53. int nextc = c + acquires;
  54. if (nextc < 0) // overflow
  55. throw new Error("Maximum lock count exceeded");
  56. setState(nextc);
  57. return true;
  58. }
  59. #返回false
  60. return false;
  61. }
  62. #(8)向下执行addWaiter()方法
  63. public final void acquire(int arg) {
  64. if (!tryAcquire(arg) &&
  65. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  66. selfInterrupt();
  67. }
  68. #(9)进入方法体addWaiter()
  69. private Node addWaiter(Node mode) {
  70. Node node = new Node(Thread.currentThread(), mode);
  71. // Try the fast path of enq; backup to full enq on failure
  72. Node pred = tail;
  73. #第一次进入pred 为空所以不进入
  74. if (pred != null) {
  75. node.prev = pred;
  76. if (compareAndSetTail(pred, node)) {
  77. pred.next = node;
  78. return node;
  79. }
  80. }
  81. #进入下面enq
  82. enq(node);
  83. return node;
  84. }
  85. #(10)进入enq方法
  86. private Node enq(final Node node) {
  87. for (;;) {
  88. Node t = tail;
  89. #满足条件进入
  90. if (t == null) { // Must initialize
  91. #创建一个哨兵节点,头节点的地址指向一个空node(哨兵节点)
  92. if (compareAndSetHead(new Node()))
  93. #刚刚创建的头节点指向尾节点
  94. tail = head;
  95. } else {
  96. node.prev = t;
  97. if (compareAndSetTail(t, node)) {
  98. t.next = node;
  99. return t;
  100. }
  101. }
  102. }
  103. }
  104. #进入二次循环
  105. private Node enq(final Node node) {
  106. for (;;) {
  107. Node t = tail;
  108. #条件不满足,因为上次循环已经初始化
  109. if (t == null) { // Must initialize
  110. if (compareAndSetHead(new Node()))
  111. #刚刚创建的头节点指向尾节点
  112. tail = head;
  113. #满足else条件进入下面代码块
  114. } else {
  115. #哨兵节点的前指针指向初始节点
  116. node.prev = t;
  117. #进入下面代码compareAndSetTail,将尾指针指向新的B节点
  118. #
  119. if (compareAndSetTail(t, node)) {
  120. #将哨兵节点的后指针指向node(B节点)
  121. t.next = node;
  122. #返回哨兵节点,然后此方法结束
  123. return t;
  124. }
  125. }
  126. }
  127. }
  128. #(11)addWaiter方法结束,放回B的node节点
  129. #(12)进入acquireQueued方法,注意当前还处于B线程中
  130. public final void acquire(int arg) {
  131. if (!tryAcquire(arg) &&
  132. #arg=1,addWaiter(Node.EXCLUSIVE)为B节点
  133. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  134. selfInterrupt();
  135. }
  136. #(13)进入acquireQueued方法
  137. final boolean acquireQueued(final Node node, int arg) {
  138. boolean failed = true;
  139. try {
  140. boolean interrupted = false;
  141. for (;;) {
  142. #返回前置节点,也就是哨兵节点
  143. final Node p = node.predecessor();
  144. #p == head 结果为true
  145. if (p == head && tryAcquire(arg)) {
  146. setHead(node);
  147. p.next = null; // help GC
  148. failed = false;
  149. return interrupted;
  150. }
  151. if (shouldParkAfterFailedAcquire(p, node) &&
  152. parkAndCheckInterrupt())
  153. interrupted = true;
  154. }
  155. } finally {
  156. if (failed)
  157. cancelAcquire(node);
  158. }
  159. }
  160. #(14)分析一下tryAcquire(arg)方法
  161. protected final boolean tryAcquire(int acquires) {
  162. return nonfairTryAcquire(acquires);
  163. }
  164. #进入nonfairTryAcquire(acquires);方法
  165. final boolean nonfairTryAcquire(int acquires) {
  166. final Thread current = Thread.currentThread();
  167. #state = 1;
  168. int c = getState();
  169. #故不满足条件,不进入
  170. if (c == 0) {
  171. if (compareAndSetState(0, acquires)) {
  172. setExclusiveOwnerThread(current);
  173. return true;
  174. }
  175. }
  176. #current为B线程,getExclusiveOwnerThread()为A线程,故不进入这个代码块
  177. else if (current == getExclusiveOwnerThread()) {
  178. int nextc = c + acquires;
  179. if (nextc < 0) // overflow
  180. throw new Error("Maximum lock count exceeded");
  181. setState(nextc);
  182. return true;
  183. }
  184. #返回false
  185. return false;
  186. }
  187. #(15)拿到tryAcquire的结果后继续分析acquireQueued方法
  188. final boolean acquireQueued(final Node node, int arg) {
  189. boolean failed = true;
  190. try {
  191. boolean interrupted = false;
  192. for (;;) {
  193. final Node p = node.predecessor();
  194. #条件不满足,p == head结果为true,tryAcquire(arg)结果为false,所以不进入if
  195. if (p == head && tryAcquire(arg)) {
  196. setHead(node);
  197. p.next = null; // help GC
  198. failed = false;
  199. return interrupted;
  200. }
  201. #进入shouldParkAfterFailedAcquire(p, node) 方法
  202. if (shouldParkAfterFailedAcquire(p, node) &&
  203. parkAndCheckInterrupt())
  204. interrupted = true;
  205. }
  206. } finally {
  207. if (failed)
  208. cancelAcquire(node);
  209. }
  210. }
  211. #(16)分析shouldParkAfterFailedAcquire(p, node)方法,p为哨兵节点,node为B节点
  212. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  213. #ws为0
  214. int ws = pred.waitStatus;
  215. if (ws == Node.SIGNAL)
  216. /*
  217. * This node has already set status asking a release
  218. * to signal it, so it can safely park.
  219. */
  220. return true;
  221. if (ws > 0) {
  222. /*
  223. * Predecessor was cancelled. Skip over predecessors and
  224. * indicate retry.
  225. */
  226. do {
  227. node.prev = pred = pred.prev;
  228. } while (pred.waitStatus > 0);
  229. pred.next = node;
  230. #进入else
  231. } else {
  232. /*
  233. * waitStatus must be 0 or PROPAGATE. Indicate that we
  234. * need a signal, but don't park yet. Caller will need to
  235. * retry to make sure it cannot acquire before parking.
  236. */
  237. #将哨兵节点的waitStatus设置成-1
  238. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  239. }
  240. #最后返回false
  241. return false;
  242. }
  243. #(17)拿到shouldParkAfterFailedAcquire(p, node)的返回结果为false,继续分析acquireQueued(final Node node, int arg)方法
  244. final boolean acquireQueued(final Node node, int arg) {
  245. boolean failed = true;
  246. try {
  247. boolean interrupted = false;
  248. for (;;) {
  249. final Node p = node.predecessor();
  250. if (p == head && tryAcquire(arg)) {
  251. setHead(node);
  252. p.next = null; // help GC
  253. failed = false;
  254. return interrupted;
  255. }
  256. #shouldParkAfterFailedAcquire(p, node) 结果为false不进入
  257. if (shouldParkAfterFailedAcquire(p, node) &&
  258. parkAndCheckInterrupt())
  259. interrupted = true;
  260. }
  261. } finally {
  262. if (failed)
  263. cancelAcquire(node);
  264. }
  265. }
  266. #(18)再次执行acquireQueued的第二次循环
  267. final boolean acquireQueued(final Node node, int arg) {
  268. boolean failed = true;
  269. try {
  270. boolean interrupted = false;
  271. for (;;) {
  272. final Node p = node.predecessor();
  273. #p == head满足,同样tryAcquire(arg)返回false不满足,不进入
  274. if (p == head && tryAcquire(arg)) {
  275. setHead(node);
  276. p.next = null; // help GC
  277. failed = false;
  278. return interrupted;
  279. }
  280. #进入这个节点继续分析
  281. if (shouldParkAfterFailedAcquire(p, node) &&
  282. parkAndCheckInterrupt())
  283. interrupted = true;
  284. }
  285. } finally {
  286. if (failed)
  287. cancelAcquire(node);
  288. }
  289. }
  290. #(19)再次分析shouldParkAfterFailedAcquire(p, node)方法
  291. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  292. #等于-1
  293. int ws = pred.waitStatus;
  294. #符合条件进入
  295. if (ws == Node.SIGNAL)
  296. /*
  297. * This node has already set status asking a release
  298. * to signal it, so it can safely park.
  299. */
  300. #返回true
  301. return true;
  302. if (ws > 0) {
  303. /*
  304. * Predecessor was cancelled. Skip over predecessors and
  305. * indicate retry.
  306. */
  307. do {
  308. node.prev = pred = pred.prev;
  309. } while (pred.waitStatus > 0);
  310. pred.next = node;
  311. } else {
  312. /*
  313. * waitStatus must be 0 or PROPAGATE. Indicate that we
  314. * need a signal, but don't park yet. Caller will need to
  315. * retry to make sure it cannot acquire before parking.
  316. */
  317. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  318. }
  319. return false;
  320. }
  321. #(20)根据shouldParkAfterFailedAcquire返回的结果(true),继续执行下一步parkAndCheckInterrupt()
  322. final boolean acquireQueued(final Node node, int arg) {
  323. boolean failed = true;
  324. try {
  325. boolean interrupted = false;
  326. for (;;) {
  327. final Node p = node.predecessor();
  328. if (p == head && tryAcquire(arg)) {
  329. setHead(node);
  330. p.next = null; // help GC
  331. failed = false;
  332. return interrupted;
  333. }
  334. if (shouldParkAfterFailedAcquire(p, node) &&
  335. parkAndCheckInterrupt())
  336. interrupted = true;
  337. }
  338. } finally {
  339. if (failed)
  340. cancelAcquire(node);
  341. }
  342. }
  343. #当前的B线程开始阻塞,因为执行了LockSupport.park(this);待A线程结束的时候,唤醒B
  344. private final boolean parkAndCheckInterrupt() {
  345. LockSupport.park(this);
  346. return Thread.interrupted();
  347. }
  348. #(21)C线程也是像B一样,最后阻塞等待被唤醒
  349. private final boolean parkAndCheckInterrupt() {
  350. LockSupport.park(this);
  351. #如果当前线程已被中断,则为true ; 否则为false
  352. return Thread.interrupted();
  353. }
  354. #(22)此时A线业务办理完成,调用lock.unlock();方法解锁
  355. public void unlock() {
  356. sync.release(1);
  357. }
  358. #进入release方法,arg = 1
  359. public final boolean release(int arg) {
  360. if (tryRelease(arg)) {
  361. Node h = head;
  362. if (h != null && h.waitStatus != 0)
  363. unparkSuccessor(h);
  364. return true;
  365. }
  366. return false;
  367. }
  368. #(23)进入tryRelease(int releases)方法,releases = 1;
  369. protected final boolean tryRelease(int releases) {
  370. # c = 0;
  371. int c = getState() - releases;
  372. #相等不进入
  373. if (Thread.currentThread() != getExclusiveOwnerThread())
  374. throw new IllegalMonitorStateException();
  375. boolean free = false;
  376. #进入
  377. if (c == 0) {
  378. free = true;
  379. #将原来的A线程变为null
  380. setExclusiveOwnerThread(null);
  381. }
  382. #设置当前状态为0;state = 1
  383. setState(c);
  384. #返回free = true;
  385. return free;
  386. }
  387. #(24)返回release(int arg)方法,继续向下执行
  388. public final boolean release(int arg) {
  389. #返回true进入
  390. if (tryRelease(arg)) {
  391. Node h = head;
  392. #此时h为哨兵节点,h.waitStatus 为 -1;所以进入
  393. if (h != null && h.waitStatus != 0)
  394. #接下来看该方法
  395. unparkSuccessor(h);
  396. return true;
  397. }
  398. return false;
  399. }
  400. #(25)解析unparkSuccessor(Node node)方法,其中node为哨兵节点
  401. private void unparkSuccessor(Node node) {
  402. /*
  403. * If status is negative (i.e., possibly needing signal) try
  404. * to clear in anticipation of signalling. It is OK if this
  405. * fails or if status is changed by waiting thread.
  406. */
  407. int ws = node.waitStatus;
  408. if (ws < 0)
  409. #进入,将哨兵node节点的waitStatus,由-1改为0
  410. compareAndSetWaitStatus(node, ws, 0);
  411. /*
  412. * Thread to unpark is held in successor, which is normally
  413. * just the next node. But if cancelled or apparently null,
  414. * traverse backwards from tail to find the actual
  415. * non-cancelled successor.
  416. */
  417. #s为B结点
  418. Node s = node.next;
  419. #s!=null,所以不进入
  420. if (s == null || s.waitStatus > 0) {
  421. s = null;
  422. for (Node t = tail; t != null && t != node; t = t.prev)
  423. if (t.waitStatus <= 0)
  424. s = t;
  425. }
  426. #进入
  427. if (s != null)
  428. #唤醒B进程
  429. LockSupport.unpark(s.thread);
  430. }
  431. #(26)到此lock.unlock()方法执行完毕,执行结果:A线程执行完成,B线程开始执行;
  432. #(27)B线程被唤醒后,继续执行后面的代码
  433. private final boolean parkAndCheckInterrupt() {
  434. LockSupport.park(this);
  435. return Thread.interrupted();
  436. }
  437. #(28)继续执行for循环,node为B节点,arg为1
  438. final boolean acquireQueued(final Node node, int arg) {
  439. boolean failed = true;
  440. try {
  441. boolean interrupted = false;
  442. for (;;) {
  443. final Node p = node.predecessor();
  444. #再次进入tryAcquire(arg)
  445. if (p == head && tryAcquire(arg)) {
  446. setHead(node);
  447. p.next = null; // help GC
  448. failed = false;
  449. return interrupted;
  450. }
  451. if (shouldParkAfterFailedAcquire(p, node) &&
  452. parkAndCheckInterrupt())
  453. interrupted = true;
  454. }
  455. } finally {
  456. if (failed)
  457. cancelAcquire(node);
  458. }
  459. }
  460. #的内部方法
  461. protected final boolean tryAcquire(int acquires) {
  462. return nonfairTryAcquire(acquires);
  463. }
  464. #(29)再次分析nonfairTryAcquire(acquires)方法;acquires = 1;
  465. final boolean nonfairTryAcquire(int acquires) {
  466. #B线程
  467. final Thread current = Thread.currentThread();
  468. # c = 0;
  469. int c = getState();
  470. #进入条件
  471. if (c == 0) {
  472. #修改状态为1
  473. if (compareAndSetState(0, acquires)) {
  474. #设置当前线程B为有独占访问权限的线程
  475. setExclusiveOwnerThread(current);
  476. #设置返回值为true
  477. return true;
  478. }
  479. }
  480. else if (current == getExclusiveOwnerThread()) {
  481. int nextc = c + acquires;
  482. if (nextc < 0) // overflow
  483. throw new Error("Maximum lock count exceeded");
  484. setState(nextc);
  485. return true;
  486. }
  487. return false;
  488. }
  489. #tryAcquire(int acquires) 返回true
  490. protected final boolean tryAcquire(int acquires) {
  491. return nonfairTryAcquire(acquires);
  492. }
  493. #(30)返回上层方法acquireQueued(final Node node, int arg),node为B节点,arg为1
  494. final boolean acquireQueued(final Node node, int arg) {
  495. boolean failed = true;
  496. try {
  497. boolean interrupted = false;
  498. for (;;) {
  499. final Node p = node.predecessor();
  500. #p==head = true; tryAcquire(arg))=true
  501. if (p == head && tryAcquire(arg)) {
  502. #设置B节点为头节点
  503. setHead(node);
  504. #哨兵节点的后指针为null,此时的哨兵node,为不可达状态,gc时会被回收掉
  505. p.next = null; // help GC
  506. failed = false;
  507. return interrupted;
  508. }
  509. if (shouldParkAfterFailedAcquire(p, node) &&
  510. parkAndCheckInterrupt())
  511. interrupted = true;
  512. }
  513. } finally {
  514. if (failed)
  515. cancelAcquire(node);
  516. }
  517. }
  518. #node为B节点
  519. private void setHead(Node node) {
  520. #B节点为head节点
  521. head = node;
  522. node.thread = null;
  523. #B节点取消前置节点
  524. node.prev = null;
  525. }
  526. #(31)到此lock.lock()方法执行结束。

发表评论

表情:
评论列表 (有 0 条评论,298人围观)

还没有评论,来说两句吧...

相关阅读

    相关 分析:AQS

    在开始这篇源码之前,最好先看下转载整理的[这篇文章][Link 1],有很多值得学习的地方。AQS是用来构建锁或者其他同步组件的基础框架。总体来说,它使用一个 int 成员变量

    相关 并发-AQS分析

    微信搜索:“二十同学” 公众号,欢迎关注一条不一样的成长之路 一、概述   谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈