并发编程——ReentrantReadWriteLock 我会带着你远行 2023-10-16 11:56 2阅读 0赞 #### 文章目录 #### * 为什么要出现读写锁 * 读写锁的核心思想 * 写锁的操作 * * 写锁加锁-acquire * 写锁-释放锁操作 * 读锁的操作 * * 读锁的加锁操作 * 总结 ## 为什么要出现读写锁 ## 因为ReentrantLock是互斥锁,如果有一个操作是读多写少,同时还需要保证线程安全,那么使用ReentrantLock会导致效率比较低。因为多个线程在对同一个数据进行读操作时,也不会造成线程安全问题。所以出现了ReentrantReadWriteLock锁: * 读读操作是共享的。 * 写写操作是互斥的。 * 读写操作是互斥的。 * 写读操作是互斥的。 单个线程获取写锁后,再次获取读锁,可以拿到。(写读可重入) 单个线程获取读锁后,再次获取写锁,拿不到。(读写不可重入) 使用方式: public class XxxTest { // 读写锁 static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // 写锁 static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); // 读锁 static ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); public static void main(String[] args) throws InterruptedException { readLock.lock(); try { System.out.println("拿到读锁!"); } finally { readLock.unlock(); } writeLock.lock(); try { System.out.println("拿到写锁!"); } finally { writeLock.unlock(); } } } ## 读写锁的核心思想 ## ReentrantReadWriteLock基于AQS实现的,很多功能的实现和ReentrantLock类似。还是基于AQS的state来确定当前线程是否拿到锁资源。将state的高16位作为读锁的标识,将state的低16位作为写锁的标识 锁重入问题: * 写锁重入:因为写操作和其他操作是互斥的,代表同一时间,只有一个线程持有着写锁,只要锁重入,就对低位+1即可。 * 读锁重入:读锁的重入不能仿照写锁的方式,因为写锁属于互斥锁,同一时间只会有一个线程持有写锁,但是读锁是共享锁,同一时间会有多个线程持有读锁。所以每个获取到读锁的线程,记录锁重入的方式都是基于自己的ThreadLocal存储锁重入次数。 读锁重入修改state,只是记录当前线程锁重入的次数,需要基于ThreadLocal记录。 state二进制表示:00000000 00000000 00000000 00000000 将state的高16位作为读锁的标识,将state的低16位作为写锁的标识。 写锁:00000000 00000000 00000000 00000001 写锁重入,低16位+1:00000000 00000000 00000000 00000010 读锁: 00000000 00000001 00000000 00000000 读锁重入,高16位+1: 00000000 00000010 00000000 00000000 每个读操作的线程,在获取读锁时,都需要开辟一个ThreadLocal。读写锁为了优化这个事情,做了两手操作: * 第一个拿到读锁的线程,不用ThreadLocal记录重入次数,在读写锁内有有一个firstRead记录重入次数 * 记录了最后一个拿到读锁的线程的重入次数,交给cachedHoldCounter属性标识,可以避免频繁的在锁重入时,从TL中获取。 ## 写锁的操作 ## ### 写锁加锁-acquire ### public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } tryAcquire:尝试获取锁资源,能否以CAS的方式将state 从0 改为 1,改成功,拿锁成功。 addWaiter:将当前没按到锁资源的,封装成Node,排到AQS里。 acquireQueued:当前排队的能否竞争锁资源,不能挂起线程阻塞。 因为都是AQS的实现,主要看tryAcquire // state,高16:读,低16:写 00000000 00000000 00000000 00000000 00000000 00000001 00000000 00000000 - SHARED_UNIT 00000000 00000000 11111111 11111111 - MAX_COUNT 00000000 00000000 11111111 11111111 - EXCLUSIVE_MASK & 00000000 00000000 00000000 00000001 static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 只拿到表示读锁的高16位。 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 只拿到表示写锁的低16位。 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 读写锁的写锁,获取流程 protected final boolean tryAcquire(int acquires) { // 拿到当前线程 Thread current = Thread.currentThread(); // 拿到state int c = getState(); // 拿到了写锁的低16位标识w int w = exclusiveCount(c); // c != 0:要么有读操作拿着锁,要么有写操作拿着锁 if (c != 0) { // 如果w == 0,代表没有写锁,拿不到 // 如果w != 0,代表有写锁,看一下拿占用写锁是不是当前线程,如果不是,拿不到 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 到这,说明肯定是写锁,并且是当前线程持有 // 判断对低位 + 1,是否会超过MAX_COUNT,超过抛Error if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 如果没超过锁重入次数, + 1,返回true,拿到锁资源。 setState(c + acquires); return true; } // 到这,说明c == 0 // 读写锁也分为公平锁和非公平锁 // 公平:看下排队不,排队就不抢了 // 走hasQueuedPredecessors方法,有排队的返回true,没排队的返回false // 非公平:直接抢! // 方法实现直接返回false if (writerShouldBlock() || // 以CAS的方式,将state从0修改为 1 !compareAndSetState(c, c + acquires)) // 要么不让抢,要么CAS操作失败,返回false return false; // 将当前持有互斥锁的线程,设置为自己 setExclusiveOwnerThread(current); return true; } addWaiter和acquireQueued和ReentrantLock看的一样,都是AQS自身提供的方法。 ### 写锁-释放锁操作 ### 读写锁的释放操作,跟ReentrantLock一致,只是需要单独获取低16位,判断是否为0,为0就释放成功 // 写锁的释放锁 public final boolean release(int arg) { // 只有tryRealse是读写锁重新实现的方法,其他的和ReentrantLock一致 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 读写锁的真正释放 protected final boolean tryRelease(int releases) { // 判断释放锁的线程是不是持有锁的线程 if (!isHeldExclusively()) // 不是抛异常 throw new IllegalMonitorStateException(); // 对state - 1 int nextc = getState() - releases; // 拿着next从获取低16位的值,判断是否为0 boolean free = exclusiveCount(nextc) == 0; // 返回true if (free) // 将持有互斥锁的线程信息置位null setExclusiveOwnerThread(null); // 将-1之后的nextc复制给state setState(nextc); return free; } ## 读锁的操作 ## ### 读锁的加锁操作 ### // 读锁加锁操作 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } tryAcquireShared,尝试获取锁资源,获取到返回1,没获取到返回-1 doAcquireShared,前面没拿到锁,这边需要排队 // tryAcquireShared方法 protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 拿到state int c = getState(); // 那写锁标识,如果 !=0,代表有写锁 if (exclusiveCount(c) != 0 && // 如果持有写锁的不是当前线程,排队去! getExclusiveOwnerThread() != current) // 排队! return -1; // 没有写锁! // 获取读锁信息 int r = sharedCount(c); // 公平锁: 有人排队,返回true,直接拜拜,没人排队,返回false // 非公平锁:正常的逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功 // 这就会出现问题,写操作无法在读锁的情况抢占资源,导致写线程饥饿,一直阻塞 // 非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回false if (!readerShouldBlock() && // 查看读锁是否已经达到了最大限制 r < MAX_COUNT && // 以CAS的方式,对state的高16位+1 compareAndSetState(c, c + SHARED_UNIT)) { // 拿到锁资源成功!!! if (r == 0) { // 第一个拿到锁资源的线程,用first存储 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 锁重入,第一个拿到读锁的线程,直接对firstReaderHoldCount++记录重入的次数 firstReaderHoldCount++; } else { // 不是第一个拿到锁资源的 // 先拿到cachedHoldCounter,最后一个线程的重入次数 HoldCounter rh = cachedHoldCounter; // rh == null: 第二个拿到读锁的! // 或者发现之前有最后一个来的,但是不我,将我设置为最后一个。 if (rh == null || rh.tid != getThreadId(current)) // 获取自己的重入次数,并赋值给cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); // 之前拿过,现在如果为0,赋值给TL else if (rh.count == 0) readHolds.set(rh); // 重入次数+1, // 第一个:可能是第一次拿 // 第二个:可能是重入操作 rh.count++; } return 1; } return fullTryAcquireShared(current); } // 通过tryAcquireShared没拿到锁资源,也没返回-1,就走这 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { // 拿state int c = getState(); // 现在有互斥锁,不是自己,拜拜! if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 公平:有排队的,进入逻辑。 没排队的,过! // 非公平:head的next是写不,是,进入逻辑。 如果不是,过! } else if (readerShouldBlock()) { // 这里代码特别乱,因为这里的代码为了处理JDK1.5的内存泄漏问题,修改过~ // 这个逻辑里不会让你拿到锁,做被阻塞前的准备 if (firstReader == current) { // 什么都不做 } else { if (rh == null) { // 获取最后一个拿到读锁资源的 rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { // 拿到我自己的记录重入次数的。 rh = readHolds.get(); // 如果我的次数是0,绝对不是重入操作! if (rh.count == 0) // 将我的TL中的值移除掉,不移除会造成内存泄漏 readHolds.remove(); } } // 如果我的次数是0,绝对不是重入操作! if (rh.count == 0) // 返回-1,等待阻塞吧! return -1; } } // 超过读锁的最大值了没? if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 到这,就CAS竞争锁资源 if (compareAndSetState(c, c + SHARED_UNIT)) { // 跟tryAcquireShared一模一样 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1; } } } 加锁-扔到队列准备阻塞操作 // 没拿到锁,准备挂起 private void doAcquireShared(int arg) { // 将当前线程封装为Node,当前Node为共享锁,并添加到队列的模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取上一个节点 final Node p = node.predecessor(); if (p == head) { // 如果我的上一个是head,尝试再次获取锁资源 int r = tryAcquireShared(arg); if (r >= 0) { // 如果r大于等于0,代表获取锁资源成功 // 唤醒AQS中我后面的要获取读锁的线程(SHARED模式的Node) setHeadAndPropagate(node, r); p.next = null; if (interrupted) selfInterrupt(); failed = false; return; } } // 能否挂起当前线程,需要保证我前面Node的状态为-1,才能执行后面操作 if (shouldParkAfterFailedAcquire(p, node) && //LockSupport.park挂起~~ parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ## 总结 ## ReentrantReadWriteLock的使用可以提高并发性,特别适用于读操作远多于写操作的情况。它相对于排他锁,因为排他锁在同一时间只允许一个线程访问。ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程同时访问,也不允许写线程和写线程同时访问。
还没有评论,来说两句吧...