Java锁详解之改进读写锁StampedLock
文章目录
- 先了解一下ReentrantReadWriteLock
- StampedLock重要方法
- StampedLock示例
- StampedLock可能出现的性能问题
- StampedLock原理
- StampedLock源码分析
先了解一下ReentrantReadWriteLock
当系统存在读和写两种操作的时候,读和读之间并不会对程序结果产生影响。所以后来设计了ReentrantReadWriteLock这种读写分离锁,它做到了读与读之间不用等待。
示例:
// 读写锁
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读锁
private static ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
// 写锁
private static ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
// 共享资源
private static Integer count = 0;
// 读操作
public Integer readCount() throws InterruptedException {
readLock.lock();
try{
Thread.sleep(1000);
return count;
}finally {
readLock.unlock();
}
}
// 写操作
public void addOne() throws InterruptedException {
writeLock.lock();
try{
Thread.sleep(1000);
++count;
}finally {
writeLock.unlock();
}
}
这种读写分离锁的缺点是,只有读读操作不会竞争锁,即读与读操作是并行的,而读写、写写都会竞争锁。很明显读写其实大部分情况也都可以不竞争锁的,这就是后来StampedLock的优化点。
StampedLock重要方法
方法名 | 方法说明 |
---|---|
long readLock() | 获取读锁(悲观锁),如果资源正在被修改,则当前线程被阻塞 。返回值是在释放锁时会用到的stamp |
long tryReadLock() | 获取读锁 ,如果拿到锁则返回一个在释放锁时会用到的stamp。如果拿不到锁,直接返回0,且不阻塞线程 |
long readLockInterruptibly() | 获取读锁,可中断 |
void unlockRead(long stamp) | 如果参数里的stamp与该读锁的stamp一致,则释放读锁 |
long writeLock() | 获取写锁(悲观锁),如果拿不到锁则当前线程被阻塞。返回值在释放锁时会用到 |
long tryWriteLock() | 获取写锁,如果拿到锁则返回一个在释放锁时会用到的stamp。如果拿不到锁,则直接返回0,且不阻塞线程 |
long writeLockInterruptibly() | 获取写锁,可中断 |
void unlockWrite(long stamp) | 如果参数里的stamp与该写锁的stamp一致,则释放写锁 |
long tryOptimisticRead() | 乐观读,返回一个后面会被验证的stamp,如果资源被锁住了,则返回0 |
boolean validate(long stamp) | stamp值没有被修改过,返回true,否则返回false。如果stamp为0,始终返回false。 |
StampedLock示例
以下是官方例子:
public class Point {
//内部定义表示坐标点
private double x, y;
private final StampedLock s1 = new StampedLock();
void move(double deltaX, double deltaY) {
// 获取写锁,并拿到此时的stamp
long stamp = s1.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
// 释放写锁时,传入了获取写锁的stamp,
// 这也就是下面读方法里面validate能判断stamp是否被修改的原因
s1.unlockWrite(stamp);
}
}
//只读方法
double distanceFormOrigin() {
// 试图尝试一次乐观读 返回一个类似于时间戳的整数stamp,它在后面的validate方法中将被验证。
// 如果资源已经被锁住了,则返回0
long stamp = s1.tryOptimisticRead();
//读取x和y的值。这时候我们并不确定x和y是否是一致的
double currentX = x, currentY = y;
// 判断这个stamp在读的过程中是否被修改过
// 如果stamp没有被修改过返回true,被修改过返回false。如果stamp为0,是返回false。
if (!s1.validate(stamp)) {
// 发现被修改了,这里使用readLock()获得悲观的读锁,并进一步读取数据。
// 如果当前对象正在被修改,则读锁的申请可能导致线程挂起。
stamp = s1.readLock();
try {
currentX = x;
currentY = y;
} finally {
s1.unlockRead(stamp);//退出临界区,释放读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
上面读方法中,如果发现资源被修改了,还可以通过像JDK7中AtomicInteger类里的CAS操作那样写一个死循环(JDK8不是这样写的),通过不断的尝试使得最终拿到锁:
double distanceFormOrigin() {
double currentX , currentY ;
for(;;){
long stamp = s1.tryOptimisticRead();
currentX = x;
currentY = y;
if(s1.validate(stamp)){
break;
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
从上面代码也能看出,在读方法里面,必须是以下顺序写代码:
long stamp = s1.tryOptimisticRead();
- 读取共享资源
- 判断在第2步时,共享资源是否被更改:
s1.validate(stamp)
StampedLock锁适用于读多写少的场景
StampedLock可能出现的性能问题
StampedLock内部实现时,使用类似于CAS操作的死循环反复尝试的策略。
在它挂起线程时,使用的是Unsafe.park()函数,而park()函数在遇到线程中断时,会直接返回(不会抛出异常)。而在StampedLock的死循环逻辑中,没有处理有关中断的逻辑。因此,这就会导致阻塞在park()上的线程被中断后,会再次进入循环。而当退出条件得不到满足时,就会发生疯狂占用CPU的情况。
下面演示了这个问题:
public class StampedLockCUPDemo {
static Thread[] holdCpuThreads = new Thread[3];
static final StampedLock lock = new StampedLock();
public static void main(String[] args) throws InterruptedException {
new Thread() {
public void run(){
long readLong = lock.writeLock();
// 一直占着锁,可使其他线程被挂起
LockSupport.parkNanos(6100000000L);
lock.unlockWrite(readLong);
}
}.start();
Thread.sleep(100);
for( int i = 0; i < 3; ++i) {
holdCpuThreads [i] = new Thread(new HoldCPUReadThread());
holdCpuThreads [i].start();
}
Thread.sleep(10000);
// 中断三个线程:中断是问题的关键原因
for(int i=0; i<3; i++) {
holdCpuThreads [i].interrupt();
}
}
private static class HoldCPUReadThread implements Runnable {
public void run() {
// 获取读锁,将被阻塞,循环
long lockr = lock.readLock();
System.out.println(Thread.currentThread().getName() + " get read lock");
lock.unlockRead(lockr);
}
}
}
上面获取读锁被park()阻塞的线程由于中断操作,导致线程再次进入死循环,导致线程一直处于RUNNABLE状态,消耗着CPU资源。当拿到锁后,这种情况就会消失。这种情况在实际上出现的概率是较低的。
StampedLock原理
针对ReentrantReadWriteLock只能读与读并行,而读与写不能并行的问题,JDK8实现了StampedLock。
StampedLock的内部实现是基于CLH锁的。CLH锁是一种自旋锁,它保证没有饥饿发生,并且可以保证FIFO的服务顺序。
CLH锁的基本思想如下:锁维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程),保存一个标记位(locked),用于判断当前线程是否已经释放锁。
当一个线程试图获得锁时,取得当前等待队列的尾部节点作为其前序节点,并使用类似如下代码判断前序节点是否已经成功释放:
while(pred.locked) {
}
只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行,因此会自旋等待。
反之,如果前序线程已经释放锁,则当前线程可以继续执行。
释放锁时,也遵循这个逻辑,线程会将自身节点的locked位置标记为false,那么后续等待的线程就能继续执行了。
在StampedLock内部,为维护一个等待链表队列:
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { // 构造器
mode = m; prev = p;
}
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;
上述代码中,WNode(wait node)为链表的基本元素,每一个WNode表示一个等待线程。字段whead和wtail分别指向等待链表的头部和尾部。
另外一个重要的字段为state:
private transient volatile long state;
字段state表示当前锁的状态。它是一个long型,有64位,其中,倒数第8位表示写锁状态,如果该位为1,表示当前由写锁占用。
对于一次乐观读的操作,它会执行如下操作:
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
一次成功的乐观读必须保证当前锁没有写锁占用。其中WBIT用来获取写锁状态位,值为0x80。如果成功,则返回当前state的值(末尾7位清零,末尾7位表示当前正在读取的线程数量)。
如果在乐观读后,有线程申请了写锁,那么state的状态就会改变:
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
上述代码中第4行,设置写锁位为1(通过加上WBIT(0x80))。这样,就会改变state的取值。那么在乐观锁确认(validate)时,就会发现这个改动,而导致乐观锁失败。
public boolean validate(long stamp) {
// See above about current use of getLongVolatile here
return (stamp & SBITS) == (U.getLongVolatile(this, STATE) & SBITS);
}
上述validate()函数比较当前stamp和发生乐观锁时取得的stamp,如果不一致,则宣告乐观锁失败。
乐观锁失败后,则可以提升锁级别,使用悲观读锁。
public long readLock() {
long s, next; // bypass acquireRead on fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
悲观读会尝试设置state状态,它会将state加1,用于统计读线程的数量。如果失败,则进入acquireRead()二次尝试锁获取。
在acquireRead()中,线程会在不同条件下进行若干次自旋,试图通过CAS操作获得锁。如果自旋宣告失败,则会启用CLH队列,将自己加到队列中。之后再进行自旋,如果发现自己成功获得了读锁,则会进一步把自己cowait队列中的读线程全部激活(使用Usafe.unpark()方法)。如果最终依然无法成功获得读锁,则会使用Unsafe.park()方法挂起当前线程。
方法acquireWrite()和acquireRead()也非常类似,也是通过自旋尝试、加入等待队列、直至最终Unsafe.park()挂起线程的逻辑进行的。释放锁时与加锁动作相反,以unlockWrite()为例:
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//将写标记位清零
if ((h = whead) != null && h.status != 0)
release(h);
}
上述代码,将写标记位清零,如果state发生溢出,则退回到初始值。
接着,如果等待队列不为空,则从等待队列中激活一个线程(绝大部分情况下是第1个等待线程)继续执行release(h)
。
StampedLock源码分析
https://blog.csdn.net/ryo1060732496/article/details/88973923
https://blog.csdn.net/huzhiqiangCSDN/article/details/76694836
还没有评论,来说两句吧...