【并发编程】ReentrantReadWriteLock 源码分析
前言
Github:https://github.com/yihonglei/jdk-source-code-reading(java-concurrent)
一 ReentrantReadWriteLock 概述
ReentrantReadWriteLock 基于AQS 实现的读写锁,应用于读多写少的场景。
1、写锁(互斥锁)
1)判断是否已经存在锁,如果有锁,判断是否是重入锁,是,加锁成功,否则加锁失败,基于当前线程构建独占式节点进入等待队列,
线程挂起,等待被唤醒尝试重新获取锁;
2)如果无锁,尝试加锁,加锁成功返回,否则基于当前线程构建独占式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁;
2、读锁(共享锁)
1)判断是否有写锁,如果存在写锁,基于当前线程构建共享式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁;
2)尝试获取锁,如果获取锁失败,基于当前线程构建共享式节点进入等待队列,线程挂起,等待被唤醒尝试重新获取锁;
二 ReentrantReadWriteLock 实例
package com.jpeony.concurrent.locks.reentrantreadwritelock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 写锁(互斥锁):当有读或写锁,不能加写锁,进入队列等待;
* 读锁(共享锁):当有写锁,不能加读锁,进入队列等待;
* @author yihonglei
*/
public class ReentrantReadWriteLockSimple {
private final static ReadWriteLock lock = new ReentrantReadWriteLock();
public static void main(String[] args) {
Thread writeThread = new Thread(new WriteTask());
Thread readThread1 = new Thread(new ReadTask1());
Thread readThread2 = new Thread(new ReadTask2());
// writeThread.start();
readThread1.start();
readThread2.start();
}
private static class WriteTask implements Runnable {
@Override
public void run() {
try {
System.out.println("写-WriteTask-lock");
lock.writeLock().lock();
System.out.println("写-WriteTask-start");
Thread.sleep(10000);
System.out.println("写-WriteTask-end");
} catch (InterruptedException e) {
// ignore
} finally {
lock.writeLock().unlock();
}
}
}
private static class ReadTask1 implements Runnable {
@Override
public void run() {
try {
System.out.println("读-ReadTask1-lock");
lock.readLock().lock();
System.out.println("读-ReadTask1-start");
Thread.sleep(5000);
System.out.println("读-ReadTask1-end");
} catch (InterruptedException e) {
// ignore
} finally {
lock.readLock().unlock();
}
}
}
private static class ReadTask2 implements Runnable {
@Override
public void run() {
try {
System.out.println("读-ReadTask2-lock");
lock.readLock().lock();
System.out.println("读-ReadTask2-start");
Thread.sleep(1000);
System.out.println("读-ReadTask2-end");
} catch (InterruptedException e) {
// ignore
} finally {
lock.readLock().unlock();
}
}
}
}
启动 writeThread 和 readThread1 可以测试 加写锁未释放,这个时候加读锁被阻塞,直到写锁释放,读锁才能获取成功;
启动 readThread1 和 readThread2 可以测试 读锁共享的特性,可以感受在线程加读锁未释放时,别的线程也可以加读锁成功;
启动 readThread1 和 writeThread 调下顺序,并调整读线程 sleep 时间,通常情况下可以按顺序执行,也可以控制执行顺序,
可以看到读锁加锁时,写锁被阻塞;
也可以增加多个写线程,测试写锁互斥被阻塞。
三 ReentrantReadWriteLock 源码分析
1、构造方法
默认fair 是 false,为非公平锁,如果想用公平锁,fair 传 true。
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReentrantReadWriteLock 实现了 WriteLock 和 ReadLock,共享 valatile 的 state 资源。
2、如何实现加写锁互斥?
public final void acquire(int arg) {
// 加锁
if (!tryAcquire(arg) &&
// 加锁失败,构建独占式节点进入队列,线程挂起,等待被唤醒尝试重新获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
// 获取当前线程
Thread current = Thread.currentThread();
// 获取共享资源
int c = getState();
int w = exclusiveCount(c);
// c 大于 0,说明这个时候有已经存在读锁或写锁,不进行加锁操作
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果不存在写锁 或 不是重入锁,返回加锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果是重入锁,判断重入次数是否大于最大次数,异常则返回
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 重入锁 state 加 1,返回成功
setState(c + acquires);
return true;
}
// 加锁操作,writerShouldBlock 如果非公平,返回 false,如果是公平需要判断
// 等待队列头结点是不是当前线程,不是返回 false,因为要保证公平的顺序性,
// CAS 操作加锁状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 写锁是独占的,构建独占线程
setExclusiveOwnerThread(current);
return true;
}
3、如何实现读锁共享加锁?
public final void acquireShared(int arg) {
// 加锁
if (tryAcquireShared(arg) < 0)
// 获取失败,构建共享式节点进入队列,线程挂起,等待被缓存尝试重新获取锁
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
// 获取共享资源
int c = getState();
// 判断是否有独占线程,即是否有写锁存在,如果有,返回获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取锁并返回
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
4、写锁释放锁?
public final boolean release(int arg) {
// 释放锁成功
if (tryRelease(arg)) {
Node h = head;
// 如果头结点是即将被唤醒的线程,直接进行唤醒操作尝试获取锁
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 释放独占对象引用
if (free)
setExclusiveOwnerThread(null);
// 释放共享资源状态,减 1
setState(nextc);
return free;
}
5、读锁释放锁?
public final void acquireShared(int arg) {
// 释放共享锁
if (tryAcquireShared(arg) < 0)
// 唤醒队列线程尝试获取锁
doAcquireShared(arg);
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 每次释放锁,共享资源减1
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
四 总结
1)ReentantReadWriteLock 基于 AQS 实现,内部读写锁共同使用 valatile state 共享资源实现了 WriteLock(写互斥) 和 ReadLock (读共享)的逻辑;
2)用于读多,写少的场景,如果写很多,还不如用 ReentrantLock,也是基于 AQS 模板方法实现的,加锁和解锁实现简单,性能更高;
3)理解了中心思想,内部很多实现细节可以参考 AQS 去抠;
还没有评论,来说两句吧...