【并发编程】ReentrantReadWriteLock 源码分析 短命女 2023-01-12 01:48 147阅读 0赞 # 前言 # Github:[https://github.com/yihonglei/jdk-source-code-reading][https_github.com_yihonglei_jdk-source-code-reading](java-concurrent) # 一 ReentrantReadWriteLock 概述 # ReentrantReadWriteLock 基于[AQS][] 实现的读写锁,应用于读多写少的场景。 ## 1、写锁(互斥锁) ## ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lobF9qeHk_size_16_color_FFFFFF_t_70][] 1)判断是否已经存在锁,如果有锁,判断是否是重入锁,是,加锁成功,否则加锁失败,基于当前线程构建独占式节点进入等待队列, 线程挂起,等待被唤醒尝试重新获取锁; 2)如果无锁,尝试加锁,加锁成功返回,否则基于当前线程构建独占式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁; ## 2、读锁(共享锁) ## ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lobF9qeHk_size_16_color_FFFFFF_t_70 1][] 1)判断是否有写锁,如果存在写锁,基于当前线程构建共享式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁; 2)尝试获取锁,如果获取锁失败,基于当前线程构建共享式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁; # 二 ReentrantReadWriteLock 实例 # package com.jpeony.concurrent.locks.reentrantreadwritelock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 写锁(互斥锁):当有读或写锁,不能加写锁,进入队列等待; * 读锁(共享锁):当有写锁,不能加读锁,进入队列等待; * @author yihonglei */ public class ReentrantReadWriteLockSimple { private final static ReadWriteLock lock = new ReentrantReadWriteLock(); public static void main(String[] args) { Thread writeThread = new Thread(new WriteTask()); Thread readThread1 = new Thread(new ReadTask1()); Thread readThread2 = new Thread(new ReadTask2()); // writeThread.start(); readThread1.start(); readThread2.start(); } private static class WriteTask implements Runnable { @Override public void run() { try { System.out.println("写-WriteTask-lock"); lock.writeLock().lock(); System.out.println("写-WriteTask-start"); Thread.sleep(10000); System.out.println("写-WriteTask-end"); } catch (InterruptedException e) { // ignore } finally { lock.writeLock().unlock(); } } } private static class ReadTask1 implements Runnable { @Override public void run() { try { System.out.println("读-ReadTask1-lock"); lock.readLock().lock(); System.out.println("读-ReadTask1-start"); Thread.sleep(5000); System.out.println("读-ReadTask1-end"); } catch (InterruptedException e) { // ignore } finally { lock.readLock().unlock(); } } } private static class ReadTask2 implements Runnable { @Override public void run() { try { System.out.println("读-ReadTask2-lock"); lock.readLock().lock(); System.out.println("读-ReadTask2-start"); Thread.sleep(1000); System.out.println("读-ReadTask2-end"); } catch (InterruptedException e) { // ignore } finally { lock.readLock().unlock(); } } } } 启动 writeThread 和 readThread1 可以测试 加写锁未释放,这个时候加读锁被阻塞,直到写锁释放,读锁才能获取成功; 启动 readThread1 和 readThread2 可以测试 读锁共享的特性,可以感受在线程加读锁未释放时,别的线程也可以加读锁成功; 启动 readThread1 和 writeThread 调下顺序,并调整读线程 sleep 时间,通常情况下可以按顺序执行,也可以控制执行顺序, 可以看到读锁加锁时,写锁被阻塞; 也可以增加多个写线程,测试写锁互斥被阻塞。 # 三 ReentrantReadWriteLock 源码分析 # ## 1、构造方法 ## 默认fair 是 false,为非公平锁,如果想用公平锁,fair 传 true。 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } ReentrantReadWriteLock 实现了 WriteLock 和 ReadLock,共享 valatile 的 state 资源。 ## 2、如何实现加写锁互斥? ## public final void acquire(int arg) { // 加锁 if (!tryAcquire(arg) && // 加锁失败,构建独占式节点进入队列,线程挂起,等待被唤醒尝试重新获取锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ // 获取当前线程 Thread current = Thread.currentThread(); // 获取共享资源 int c = getState(); int w = exclusiveCount(c); // c 大于 0,说明这个时候有已经存在读锁或写锁,不进行加锁操作 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 如果不存在写锁 或 不是重入锁,返回加锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果是重入锁,判断重入次数是否大于最大次数,异常则返回 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 重入锁 state 加 1,返回成功 setState(c + acquires); return true; } // 加锁操作,writerShouldBlock 如果非公平,返回 false,如果是公平需要判断 // 等待队列头结点是不是当前线程,不是返回 false,因为要保证公平的顺序性, // CAS 操作加锁状态 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 写锁是独占的,构建独占线程 setExclusiveOwnerThread(current); return true; } ## 3、如何实现读锁共享加锁? ## public final void acquireShared(int arg) { // 加锁 if (tryAcquireShared(arg) < 0) // 获取失败,构建共享式节点进入队列,线程挂起,等待被缓存尝试重新获取锁 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); // 获取共享资源 int c = getState(); // 判断是否有独占线程,即是否有写锁存在,如果有,返回获取锁失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取锁并返回 int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } ## 4、写锁释放锁? ## public final boolean release(int arg) { // 释放锁成功 if (tryRelease(arg)) { Node h = head; // 如果头结点是即将被唤醒的线程,直接进行唤醒操作尝试获取锁 if (h != null && h.waitStatus != 0) // 唤醒线程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; // 释放独占对象引用 if (free) setExclusiveOwnerThread(null); // 释放共享资源状态,减 1 setState(nextc); return free; } ## 5、读锁释放锁? ## public final void acquireShared(int arg) { // 释放共享锁 if (tryAcquireShared(arg) < 0) // 唤醒队列线程尝试获取锁 doAcquireShared(arg); } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 每次释放锁,共享资源减1 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } } # 四 总结 # 1)ReentantReadWriteLock 基于 AQS 实现,内部读写锁共同使用 valatile state 共享资源实现了 WriteLock(写互斥) 和 ReadLock (读共享)的逻辑; 2)用于读多,写少的场景,如果写很多,还不如用 [ReentrantLock][],也是基于 AQS 模板方法实现的,加锁和解锁实现简单,性能更高; 3)理解了中心思想,内部很多实现细节可以参考 AQS 去抠; [https_github.com_yihonglei_jdk-source-code-reading]: https://github.com/yihonglei/jdk-source-code-reading [AQS]: https://jpeony.blog.csdn.net/article/details/102535590 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lobF9qeHk_size_16_color_FFFFFF_t_70]: /images/20221119/f9471b0858334353bedaf3cb6f007925.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lobF9qeHk_size_16_color_FFFFFF_t_70 1]: /images/20221119/f5eba72717ea4c628421f45d97a8f317.png [ReentrantLock]: https://jpeony.blog.csdn.net/article/details/87088314
还没有评论,来说两句吧...