【并发编程】ReentrantReadWriteLock 源码分析

短命女 2023-01-12 01:48 264阅读 0赞

前言

Github:https://github.com/yihonglei/jdk-source-code-reading(java-concurrent)

一 ReentrantReadWriteLock 概述

ReentrantReadWriteLock 基于AQS 实现的读写锁,应用于读多写少的场景。

1、写锁(互斥锁)

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lobF9qeHk_size_16_color_FFFFFF_t_70

1)判断是否已经存在锁,如果有锁,判断是否是重入锁,是,加锁成功,否则加锁失败,基于当前线程构建独占式节点进入等待队列,

线程挂起,等待被唤醒尝试重新获取锁;

2)如果无锁,尝试加锁,加锁成功返回,否则基于当前线程构建独占式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁;

2、读锁(共享锁)

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lobF9qeHk_size_16_color_FFFFFF_t_70 1

1)判断是否有写锁,如果存在写锁,基于当前线程构建共享式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁;

2)尝试获取锁,如果获取锁失败,基于当前线程构建共享式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁;

二 ReentrantReadWriteLock 实例

  1. package com.jpeony.concurrent.locks.reentrantreadwritelock;
  2. import java.util.concurrent.locks.ReadWriteLock;
  3. import java.util.concurrent.locks.ReentrantReadWriteLock;
  4. /**
  5. * 写锁(互斥锁):当有读或写锁,不能加写锁,进入队列等待;
  6. * 读锁(共享锁):当有写锁,不能加读锁,进入队列等待;
  7. * @author yihonglei
  8. */
  9. public class ReentrantReadWriteLockSimple {
  10. private final static ReadWriteLock lock = new ReentrantReadWriteLock();
  11. public static void main(String[] args) {
  12. Thread writeThread = new Thread(new WriteTask());
  13. Thread readThread1 = new Thread(new ReadTask1());
  14. Thread readThread2 = new Thread(new ReadTask2());
  15. // writeThread.start();
  16. readThread1.start();
  17. readThread2.start();
  18. }
  19. private static class WriteTask implements Runnable {
  20. @Override
  21. public void run() {
  22. try {
  23. System.out.println("写-WriteTask-lock");
  24. lock.writeLock().lock();
  25. System.out.println("写-WriteTask-start");
  26. Thread.sleep(10000);
  27. System.out.println("写-WriteTask-end");
  28. } catch (InterruptedException e) {
  29. // ignore
  30. } finally {
  31. lock.writeLock().unlock();
  32. }
  33. }
  34. }
  35. private static class ReadTask1 implements Runnable {
  36. @Override
  37. public void run() {
  38. try {
  39. System.out.println("读-ReadTask1-lock");
  40. lock.readLock().lock();
  41. System.out.println("读-ReadTask1-start");
  42. Thread.sleep(5000);
  43. System.out.println("读-ReadTask1-end");
  44. } catch (InterruptedException e) {
  45. // ignore
  46. } finally {
  47. lock.readLock().unlock();
  48. }
  49. }
  50. }
  51. private static class ReadTask2 implements Runnable {
  52. @Override
  53. public void run() {
  54. try {
  55. System.out.println("读-ReadTask2-lock");
  56. lock.readLock().lock();
  57. System.out.println("读-ReadTask2-start");
  58. Thread.sleep(1000);
  59. System.out.println("读-ReadTask2-end");
  60. } catch (InterruptedException e) {
  61. // ignore
  62. } finally {
  63. lock.readLock().unlock();
  64. }
  65. }
  66. }
  67. }

启动 writeThread 和 readThread1 可以测试 加写锁未释放,这个时候加读锁被阻塞,直到写锁释放,读锁才能获取成功;

启动 readThread1 和 readThread2 可以测试 读锁共享的特性,可以感受在线程加读锁未释放时,别的线程也可以加读锁成功;

启动 readThread1 和 writeThread 调下顺序,并调整读线程 sleep 时间,通常情况下可以按顺序执行,也可以控制执行顺序,

可以看到读锁加锁时,写锁被阻塞;

也可以增加多个写线程,测试写锁互斥被阻塞。

三 ReentrantReadWriteLock 源码分析

1、构造方法

默认fair 是 false,为非公平锁,如果想用公平锁,fair 传 true。

  1. public ReentrantReadWriteLock(boolean fair) {
  2. sync = fair ? new FairSync() : new NonfairSync();
  3. readerLock = new ReadLock(this);
  4. writerLock = new WriteLock(this);
  5. }

ReentrantReadWriteLock 实现了 WriteLock 和 ReadLock,共享 valatile 的 state 资源。

2、如何实现加写锁互斥?

  1. public final void acquire(int arg) {
  2. // 加锁
  3. if (!tryAcquire(arg) &&
  4. // 加锁失败,构建独占式节点进入队列,线程挂起,等待被唤醒尝试重新获取锁
  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }
  8. protected final boolean tryAcquire(int acquires) {
  9. /*
  10. * Walkthrough:
  11. * 1. If read count nonzero or write count nonzero
  12. * and owner is a different thread, fail.
  13. * 2. If count would saturate, fail. (This can only
  14. * happen if count is already nonzero.)
  15. * 3. Otherwise, this thread is eligible for lock if
  16. * it is either a reentrant acquire or
  17. * queue policy allows it. If so, update state
  18. * and set owner.
  19. */
  20. // 获取当前线程
  21. Thread current = Thread.currentThread();
  22. // 获取共享资源
  23. int c = getState();
  24. int w = exclusiveCount(c);
  25. // c 大于 0,说明这个时候有已经存在读锁或写锁,不进行加锁操作
  26. if (c != 0) {
  27. // (Note: if c != 0 and w == 0 then shared count != 0)
  28. // 如果不存在写锁 或 不是重入锁,返回加锁失败
  29. if (w == 0 || current != getExclusiveOwnerThread())
  30. return false;
  31. // 如果是重入锁,判断重入次数是否大于最大次数,异常则返回
  32. if (w + exclusiveCount(acquires) > MAX_COUNT)
  33. throw new Error("Maximum lock count exceeded");
  34. // Reentrant acquire
  35. // 重入锁 state 加 1,返回成功
  36. setState(c + acquires);
  37. return true;
  38. }
  39. // 加锁操作,writerShouldBlock 如果非公平,返回 false,如果是公平需要判断
  40. // 等待队列头结点是不是当前线程,不是返回 false,因为要保证公平的顺序性,
  41. // CAS 操作加锁状态
  42. if (writerShouldBlock() ||
  43. !compareAndSetState(c, c + acquires))
  44. return false;
  45. // 写锁是独占的,构建独占线程
  46. setExclusiveOwnerThread(current);
  47. return true;
  48. }

3、如何实现读锁共享加锁?

  1. public final void acquireShared(int arg) {
  2. // 加锁
  3. if (tryAcquireShared(arg) < 0)
  4. // 获取失败,构建共享式节点进入队列,线程挂起,等待被缓存尝试重新获取锁
  5. doAcquireShared(arg);
  6. }
  7. protected final int tryAcquireShared(int unused) {
  8. /*
  9. * Walkthrough:
  10. * 1. If write lock held by another thread, fail.
  11. * 2. Otherwise, this thread is eligible for
  12. * lock wrt state, so ask if it should block
  13. * because of queue policy. If not, try
  14. * to grant by CASing state and updating count.
  15. * Note that step does not check for reentrant
  16. * acquires, which is postponed to full version
  17. * to avoid having to check hold count in
  18. * the more typical non-reentrant case.
  19. * 3. If step 2 fails either because thread
  20. * apparently not eligible or CAS fails or count
  21. * saturated, chain to version with full retry loop.
  22. */
  23. Thread current = Thread.currentThread();
  24. // 获取共享资源
  25. int c = getState();
  26. // 判断是否有独占线程,即是否有写锁存在,如果有,返回获取锁失败
  27. if (exclusiveCount(c) != 0 &&
  28. getExclusiveOwnerThread() != current)
  29. return -1;
  30. // 获取锁并返回
  31. int r = sharedCount(c);
  32. if (!readerShouldBlock() &&
  33. r < MAX_COUNT &&
  34. compareAndSetState(c, c + SHARED_UNIT)) {
  35. if (r == 0) {
  36. firstReader = current;
  37. firstReaderHoldCount = 1;
  38. } else if (firstReader == current) {
  39. firstReaderHoldCount++;
  40. } else {
  41. HoldCounter rh = cachedHoldCounter;
  42. if (rh == null || rh.tid != getThreadId(current))
  43. cachedHoldCounter = rh = readHolds.get();
  44. else if (rh.count == 0)
  45. readHolds.set(rh);
  46. rh.count++;
  47. }
  48. return 1;
  49. }
  50. return fullTryAcquireShared(current);
  51. }

4、写锁释放锁?

  1. public final boolean release(int arg) {
  2. // 释放锁成功
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. // 如果头结点是即将被唤醒的线程,直接进行唤醒操作尝试获取锁
  6. if (h != null && h.waitStatus != 0)
  7. // 唤醒线程
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }
  13. protected final boolean tryRelease(int releases) {
  14. if (!isHeldExclusively())
  15. throw new IllegalMonitorStateException();
  16. int nextc = getState() - releases;
  17. boolean free = exclusiveCount(nextc) == 0;
  18. // 释放独占对象引用
  19. if (free)
  20. setExclusiveOwnerThread(null);
  21. // 释放共享资源状态,减 1
  22. setState(nextc);
  23. return free;
  24. }

5、读锁释放锁?

  1. public final void acquireShared(int arg) {
  2. // 释放共享锁
  3. if (tryAcquireShared(arg) < 0)
  4. // 唤醒队列线程尝试获取锁
  5. doAcquireShared(arg);
  6. }
  7. protected final boolean tryReleaseShared(int unused) {
  8. Thread current = Thread.currentThread();
  9. if (firstReader == current) {
  10. // assert firstReaderHoldCount > 0;
  11. if (firstReaderHoldCount == 1)
  12. firstReader = null;
  13. else
  14. firstReaderHoldCount--;
  15. } else {
  16. HoldCounter rh = cachedHoldCounter;
  17. if (rh == null || rh.tid != getThreadId(current))
  18. rh = readHolds.get();
  19. int count = rh.count;
  20. if (count <= 1) {
  21. readHolds.remove();
  22. if (count <= 0)
  23. throw unmatchedUnlockException();
  24. }
  25. --rh.count;
  26. }
  27. // 每次释放锁,共享资源减1
  28. for (;;) {
  29. int c = getState();
  30. int nextc = c - SHARED_UNIT;
  31. if (compareAndSetState(c, nextc))
  32. // Releasing the read lock has no effect on readers,
  33. // but it may allow waiting writers to proceed if
  34. // both read and write locks are now free.
  35. return nextc == 0;
  36. }
  37. }

四 总结

1)ReentantReadWriteLock 基于 AQS 实现,内部读写锁共同使用 valatile state 共享资源实现了 WriteLock(写互斥) 和 ReadLock (读共享)的逻辑;

2)用于读多,写少的场景,如果写很多,还不如用 ReentrantLock,也是基于 AQS 模板方法实现的,加锁和解锁实现简单,性能更高;

3)理解了中心思想,内部很多实现细节可以参考 AQS 去抠;

发表评论

表情:
评论列表 (有 0 条评论,264人围观)

还没有评论,来说两句吧...

相关阅读