读写锁ReentrantReadWriteLock

小灰灰 2022-09-11 08:19 310阅读 0赞

读线程与读线程之间不互斥

  1. public interface ReadWriteLock {
  2. Lock readLock();
  3. Lock writeLock();
  4. }

ReentrantReadWriteLock实现了这个接口
ReentrantReadWriteLock的使用方法

  1. ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  2. Lock readLock = readWriteLock.readLock();
  3. readLock.lock();
  4. readLock.unlock();
  5. Lock writeLock = readWriteLock.writeLock();
  6. writeLock.lock();
  7. writeLock.unlock();

ReentrantReadWriteLock的实现
构造方法

  1. public ReentrantReadWriteLock() {
  2. //是否公平
  3. this(false);
  4. }
  5. public ReentrantReadWriteLock(boolean fair) {
  6. //使用公平或者非公平的sync
  7. sync = fair ? new FairSync() : new NonfairSync();
  8. //readerLock 和writerLock 使用同一个sync
  9. readerLock = new ReadLock(this);
  10. writerLock = new WriteLock(this);
  11. }

读写锁和互斥锁一样也是用state来表示锁的状态的,只是操作方式不同

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. //...
  3. static final int SHARED_SHIFT = 16;
  4. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  5. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  6. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  7. //int的长度是32位,这里会把state作为参数传入,然后取state的高16位
  8. static int sharedCount(int c) {
  9. return c >>> SHARED_SHIFT; }
  10. //取state的低16位
  11. static int exclusiveCount(int c) {
  12. return c & EXCLUSIVE_MASK; }
  13. //...
  14. }

ReentrantReadWriteLock会把state变量拆成两半,低16位,用来记录写锁。如果低16位的值等于5,表示一个写线程重入了5次。高16位,用来记录读锁。例如,高16位的值等于5,既可以表示5个读线程都拿到了该锁;也可以表示一个读线程重入了5次

当state=0时,说明既没有线程持有读锁,也没有线程持有写锁;当state != 0时,要么有线程持有读锁,要么有线程持有写锁,两者不能同时成立,因为读和写互斥。这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底是读线程还是写线程持有了该锁。

ReentrantReadWriteLock的两个内部类ReadLock和WriteLock中使用state变量

  1. public static class ReadLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. protected ReadLock(ReentrantReadWriteLock lock) {
  4. sync = lock.sync;
  5. }
  6. public void lock() {
  7. sync.acquireShared(1);
  8. }
  9. public void unlock() {
  10. sync.releaseShared(1);
  11. }
  12. }
  13. public static class WriteLock implements Lock, java.io.Serializable {
  14. private final Sync sync;
  15. protected WriteLock(ReentrantReadWriteLock lock) {
  16. sync = lock.sync;
  17. }
  18. public void lock() {
  19. sync.acquire(1);
  20. }
  21. public void unlock() {
  22. sync.release(1);
  23. }
  24. }

acquire/release、acquireShared/releaseShared 是AQS里面的两对模板方法。

  1. public final void acquireShared(int arg) {
  2. //由AQS的子类sync实现
  3. if (tryAcquireShared(arg) < 0)
  4. //获得锁失败把自己加入等待获取锁队列,阻塞线程等待唤醒
  5. doAcquireShared(arg);
  6. }
  7. public final boolean releaseShared(int arg) {
  8. //由AQS的子类sync实现
  9. if (tryReleaseShared(arg)) {
  10. //唤醒挂起的线程
  11. doReleaseShared();
  12. return true;
  13. }
  14. return false;
  15. }
  16. public final void acquire(int arg) {
  17. //由AQS的子类sync实现
  18. if (!tryAcquire(arg) &&
  19. //获得写锁失败把自己加入阻塞队列
  20. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  21. //响应中断
  22. selfInterrupt();
  23. }
  24. public final boolean release(int arg) {
  25. //由AQS的子类sync实现
  26. if (tryRelease(arg)) {
  27. Node h = head;
  28. if (h != null && h.waitStatus != 0)
  29. //唤醒线程
  30. unparkSuccessor(h);
  31. return true;
  32. }
  33. return false;
  34. }

ReentrantReadWriteLock中的sync

  1. //非公平的sync
  2. static final class NonfairSync extends Sync {
  3. private static final long serialVersionUID = -8159625535654395037L;
  4. //写线程是否应该阻塞
  5. final boolean writerShouldBlock() {
  6. //非公平锁不阻塞
  7. return false;
  8. }
  9. //读线程是否应该阻塞
  10. final boolean readerShouldBlock() {
  11. //队列中第一个是写线程则阻塞
  12. return apparentlyFirstQueuedIsExclusive();
  13. }
  14. }
  15. //公平的sync
  16. static final class FairSync extends Sync {
  17. private static final long serialVersionUID = -2274990926593161451L;
  18. //写线程是否应该阻塞
  19. final boolean writerShouldBlock() {
  20. //如果队列中已经有线程在排队则阻塞
  21. return hasQueuedPredecessors();
  22. }
  23. //读线程是否应该阻塞
  24. final boolean readerShouldBlock() {
  25. return hasQueuedPredecessors();
  26. }
  27. }

写锁的实现

  1. public boolean tryLock( ) {
  2. return sync.tryWriteLock();
  3. }
  4. final boolean tryWriteLock() {
  5. Thread current = Thread.currentThread();
  6. //获得state
  7. int c = getState();
  8. //当state不等于0的时候说明有线程获取了锁
  9. if (c != 0) {
  10. //获得写线程的数量
  11. int w = exclusiveCount(c);
  12. //如果写线程==0说明持有锁的是读线程,或者当前线程不是排它锁的持有线程
  13. if (w == 0 || current != getExclusiveOwnerThread())
  14. return false;
  15. if (w == MAX_COUNT)
  16. throw new Error("Maximum lock count exceeded");
  17. }
  18. //cas将state+1重入
  19. if (!compareAndSetState(c, c + 1))
  20. return false;
  21. //设置排它锁的持有线程为当前线程
  22. setExclusiveOwnerThread(current);
  23. return true;
  24. }
  25. //lock的时候调用tryAcquire,这里是fair的sync
  26. protected final boolean tryAcquire(int acquires) {
  27. final Thread current = Thread.currentThread();
  28. int c = getState();
  29. if (c == 0) {
  30. //前面没有其他线程
  31. if (!hasQueuedPredecessors() &&
  32. //cas设置state为acquires
  33. compareAndSetState(0, acquires)) {
  34. //设置排它锁线程为当前线程
  35. setExclusiveOwnerThread(current);
  36. return true;
  37. }
  38. }
  39. //如果排它锁的持有线程为当前线程
  40. else if (current == getExclusiveOwnerThread()) {
  41. int nextc = c + acquires;
  42. if (nextc < 0)
  43. throw new Error("Maximum lock count exceeded");
  44. setState(nextc);
  45. return true;
  46. }
  47. return false;
  48. }
  49. public void unlock() {
  50. sync.release(1);
  51. }
  52. public final boolean release(int arg) {
  53. if (tryRelease(arg)) {
  54. Node h = head;
  55. //唤醒队列里面的第一个线程
  56. if (h != null && h.waitStatus != 0)
  57. unparkSuccessor(h);
  58. return true;
  59. }
  60. return false;
  61. }

读锁的实现

  1. public boolean tryLock() {
  2. return sync.tryReadLock();
  3. }
  4. final boolean tryReadLock() {
  5. Thread current = Thread.currentThread();
  6. for (;;) {
  7. int c = getState();
  8. //如果排它锁的持有线程是写线程,并且当前线程不是持有锁线程
  9. if (exclusiveCount(c) != 0 &&
  10. getExclusiveOwnerThread() != current)
  11. return false;
  12. //获得读锁的state
  13. int r = sharedCount(c);
  14. if (r == MAX_COUNT)
  15. throw new Error("Maximum lock count exceeded");
  16. //cas设置读锁
  17. if (compareAndSetState(c, c + SHARED_UNIT)) {
  18. //如果r=0说明当前线程是第一个读线程
  19. if (r == 0) {
  20. firstReader = current;
  21. firstReaderHoldCount = 1;
  22. //第一个读线程是当前线程
  23. } else if (firstReader == current) {
  24. firstReaderHoldCount++;
  25. } else {
  26. // 如果firstReader不是当前线程,则从ThreadLocal中获取当前线程的读锁个数,并设置当前线程持有的读锁个数
  27. HoldCounter rh = cachedHoldCounter;
  28. if (rh == null || rh.tid != getThreadId(current))
  29. cachedHoldCounter = rh = readHolds.get();
  30. else if (rh.count == 0)
  31. readHolds.set(rh);
  32. rh.count++;
  33. }
  34. return true;
  35. }
  36. }
  37. }
  38. public void lock() {
  39. sync.acquireShared(1);
  40. }
  41. public final void acquireShared(int arg) {
  42. if (tryAcquireShared(arg) < 0)
  43. //获取锁失败,将当前线程加入阻塞队列
  44. doAcquireShared(arg);
  45. }
  46. public void unlock() {
  47. sync.releaseShared(1);
  48. }
  49. public final boolean releaseShared(int arg) {
  50. //cas设置读线程的state
  51. if (tryReleaseShared(arg)) {
  52. //唤醒线程
  53. doReleaseShared();
  54. return true;
  55. }
  56. return false;
  57. }

发表评论

表情:
评论列表 (有 0 条评论,310人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ReentrantReadWriteLock

    1. 写锁比读锁的优先级要高,拥有写锁之后还可以再获取读锁,但是拥有读锁的线程在释放前无法再获取写锁。 2. 允许锁降级,即从写锁降级为读锁,实现的步骤是:先获取写锁,再获