分析与实战:高性能读写印戳锁StampedLock

向右看齐 2024-03-17 11:14 98阅读 0赞

文章目录

    • 前言
    • StampedLock原理
    • StampedLock源码解读
      • CLH双向队列缓存阻塞节点
      • 加解锁使用CAS修改STATE
      • 乐观读和锁的转换
    • 实战演示
      • 1、创建缓存工具类
      • 2、创建测试用例
      • 3、查看测试结果
    • 写在最后

前言

在之前的博文中我们讲到了可重入读写锁ReentrantReadWriteLock,读写锁机制是读写互斥、写写互斥、读读共享。这里面就有一个问题,在线程加的读锁是一个悲观读锁,会阻塞其他线程加写锁。在我们高并发读多写少的场景下会影响写操作,验证将阻塞写操作以致于影响业务操作。那有没有一种方式可以在线程对资源加读锁后,资源还能够被加写锁呢,只要读锁线程在操作数据之前进行票据验证,不就能够达到数据的一致性了吗。所以,我们引入今天的主角——高性能读写印戳锁StampedLock。

StampedLock原理

StampedLock就如同它的含义一样,是一个贴上邮票印记的锁,我们叫它印戳锁。印戳印戳就是在进行加锁的时候会会返回一个印戳,在解锁的时候会传入这个印戳来保证加解锁为同一个线程。

和其他的锁机制一样,印戳锁StampedLock和AQS一样都是用CLH虚拟双向FIFO队列实现同步功能,并定义STATE值标识队列头部线程占用资源状态。由于StampedLock都是加的非公平锁,当队列头部节点线程获取到资源后会增加STATE值,释放锁会减少STATE值,如果STATE == 0会唤醒后续节点的自旋线程获取资源。

当然,印戳锁StampedLock最重要的是它的乐观读锁。大家都知道普通读写锁是互斥的,加了读锁是不能够加写锁的。但是印戳锁加了乐观读锁,其他线程是可以继续加写锁,只是在乐观读锁线程进行数据操作时候需要进行数据验证,以保证数据的一致性。

StampedLock源码解读

CLH双向队列缓存阻塞节点

对于StampedLock的源码解读,其实意义并不是很大。大概就是提供了虚拟CLH双向队列来保存阻塞节点,通过STATE值来表示资源是否能够被加锁。如下源码所示,提供了CLH双向队列来满足锁机制:

  1. //等待节点
  2. static final class WNode {
  3. volatile WNode prev;
  4. volatile WNode next;
  5. volatile WNode cowait; // list of linked readers
  6. volatile Thread thread; // non-null while possibly parked
  7. volatile int status; // 0, WAITING, or CANCELLED
  8. final int mode; // RMODE or WMODE
  9. WNode(int m, WNode p) { mode = m; prev = p; }
  10. }

加解锁使用CAS修改STATE

查看加解锁源码:

  1. //获取普通阻塞写锁
  2. public long writeLock() {
  3. long s, next; // bypass acquireWrite in fully unlocked case only
  4. return ((((s = state) & ABITS) == 0L &&
  5. U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
  6. next : acquireWrite(false, 0L));
  7. }
  8. /**
  9. *
  10. *获取非阻塞写锁
  11. */
  12. public long tryWriteLock() {
  13. long s, next;
  14. return ((((s = state) & ABITS) == 0L &&
  15. U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
  16. next : 0L);
  17. }
  18. //获取普通阻塞读锁
  19. public long readLock() {
  20. long s = state, next; // bypass acquireRead on common uncontended case
  21. return ((whead == wtail && (s & ABITS) < RFULL &&
  22. U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
  23. next : acquireRead(false, 0L));
  24. }
  25. /**
  26. * 获取非阻塞读锁
  27. */
  28. public long tryReadLock() {
  29. for (;;) {
  30. long s, m, next;
  31. if ((m = (s = state) & ABITS) == WBIT)
  32. return 0L;
  33. else if (m < RFULL) {
  34. if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
  35. return next;
  36. }
  37. else if ((next = tryIncReaderOverflow(s)) != 0L)
  38. return next;
  39. }
  40. }
  41. //写锁解锁
  42. public void unlockWrite(long stamp) {
  43. WNode h;
  44. if (state != stamp || (stamp & WBIT) == 0L)
  45. throw new IllegalMonitorStateException();
  46. state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
  47. if ((h = whead) != null && h.status != 0)
  48. release(h);
  49. }
  50. //读锁解锁
  51. public void unlockRead(long stamp) {
  52. long s, m; WNode h;
  53. for (;;) {
  54. if (((s = state) & SBITS) != (stamp & SBITS) ||
  55. (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
  56. throw new IllegalMonitorStateException();
  57. if (m < RFULL) {
  58. if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
  59. if (m == RUNIT && (h = whead) != null && h.status != 0)
  60. release(h);
  61. break;
  62. }
  63. }
  64. else if (tryDecReaderOverflow(s) != 0L)
  65. break;
  66. }
  67. }

如上源码所示,印戳锁的加解锁都提供了阻塞与非阻塞机制。加锁后通过CAS增加STATE值,并返回stamp票据,在解锁后通过传入的stamp票据进行线程验证保证加解锁为同一个线程,并使用CAS减少STATE值。解锁操作的同时会验证当前节点是否是头节点,是否存在后续节点,如果存在后续阻塞节点会唤醒该节点占用资源。

乐观读和锁的转换

印戳锁提供了乐观读锁和读锁与写锁转换、写锁和读锁转换方法。

  1. //获取乐观锁并返回票据
  2. public long tryOptimisticRead() {
  3. long s;
  4. return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
  5. }
  6. //验证票据是否改变
  7. public boolean validate(long stamp) {
  8. U.loadFence();
  9. return (stamp & SBITS) == (state & SBITS);
  10. }

如上所示,StampedLock提供获取乐观锁的方法,获取到乐观读锁并返回一个票据。我们在实际业务中需要使用时需要通过票据再次验证是否已经更改,如果更改则表示数据已经失效,需要重新获取。

对于锁的转换我们继续查看源码:

  1. /**
  2. * 将锁转为写锁
  3. */
  4. public long tryConvertToWriteLock(long stamp) {
  5. long a = stamp & ABITS, m, s, next;
  6. while (((s = state) & SBITS) == (stamp & SBITS)) {
  7. if ((m = s & ABITS) == 0L) {
  8. if (a != 0L)
  9. break;
  10. if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
  11. return next;
  12. }
  13. else if (m == WBIT) {
  14. if (a != m)
  15. break;
  16. return stamp;
  17. }
  18. else if (m == RUNIT && a != 0L) {
  19. if (U.compareAndSwapLong(this, STATE, s,
  20. next = s - RUNIT + WBIT))
  21. return next;
  22. }
  23. else
  24. break;
  25. }
  26. return 0L;
  27. }
  28. /**
  29. * 将锁转为读锁
  30. */
  31. public long tryConvertToReadLock(long stamp) {
  32. long a = stamp & ABITS, m, s, next; WNode h;
  33. while (((s = state) & SBITS) == (stamp & SBITS)) {
  34. if ((m = s & ABITS) == 0L) {
  35. if (a != 0L)
  36. break;
  37. else if (m < RFULL) {
  38. if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
  39. return next;
  40. }
  41. else if ((next = tryIncReaderOverflow(s)) != 0L)
  42. return next;
  43. }
  44. else if (m == WBIT) {
  45. if (a != m)
  46. break;
  47. state = next = s + (WBIT + RUNIT);
  48. if ((h = whead) != null && h.status != 0)
  49. release(h);
  50. return next;
  51. }
  52. else if (a != 0L && a < WBIT)
  53. return stamp;
  54. else
  55. break;
  56. }
  57. return 0L;
  58. }
  59. /**
  60. * 将锁转为客观锁
  61. */
  62. public long tryConvertToOptimisticRead(long stamp) {
  63. long a = stamp & ABITS, m, s, next; WNode h;
  64. U.loadFence();
  65. for (;;) {
  66. if (((s = state) & SBITS) != (stamp & SBITS))
  67. break;
  68. if ((m = s & ABITS) == 0L) {
  69. if (a != 0L)
  70. break;
  71. return s;
  72. }
  73. else if (m == WBIT) {
  74. if (a != m)
  75. break;
  76. state = next = (s += WBIT) == 0L ? ORIGIN : s;
  77. if ((h = whead) != null && h.status != 0)
  78. release(h);
  79. return next;
  80. }
  81. else if (a == 0L || a >= WBIT)
  82. break;
  83. else if (m < RFULL) {
  84. if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
  85. if (m == RUNIT && (h = whead) != null && h.status != 0)
  86. release(h);
  87. return next & SBITS;
  88. }
  89. }
  90. else if ((next = tryDecReaderOverflow(s)) != 0L)
  91. return next & SBITS;
  92. }
  93. return 0L;
  94. }

如上源码所示,印戳锁提供了读锁转为写锁、写锁转为读锁、写锁可以转为乐观读锁、读锁转为乐观读锁。需要注意的是在进行乐观读锁转换后,需要传入票据验证数据是否已经失效。

实战演示

其实印戳锁StampedLock的使用场景就是读多写少的场景,这种场景普通读写锁也能够满足。但是印戳锁增加了乐观读机制,这样会让写锁更加的高效。

以下实战演示用内存缓存数据进行演示,实际生产中不建议内存保存缓存,应当采用响应的内存缓存数据库,比如redis等等。

1、创建缓存工具类

  1. /**
  2. * StampedLockDemo
  3. * 模拟缓存
  4. * @author senfel
  5. * @version 1.0
  6. * @date 2023/5/25 11:23
  7. */
  8. @Slf4j
  9. public class StampedLockDemo {
  10. /**
  11. * 创建一个印戳锁
  12. */
  13. private static final StampedLock stampedLock = new StampedLock();
  14. /**
  15. * map内存缓存模拟缓存
  16. * 实际使用场景建议用缓存数据库redis等
  17. */
  18. private static final Map<String,String> cacheMap = new HashMap<>();
  19. /**
  20. * 添加缓存
  21. * @param key
  22. * @param value
  23. * @author senfel
  24. * @date 2023/5/25 11:26
  25. * @return void
  26. */
  27. public static Boolean putCache(String key,String value){
  28. long stamp = stampedLock.writeLock();
  29. try{
  30. //获取到写锁
  31. log.error("线程{}获取到写锁,进行缓存写入",Thread.currentThread().getName());
  32. cacheMap.put(key,value);
  33. log.error("线程{}写入数据当前缓存中的数据有:{}",Thread.currentThread().getName(), JSONObject.toJSONString(cacheMap));
  34. return true;
  35. }catch (Exception e){
  36. log.error("线程{}添加缓存异常:{}",Thread.currentThread().getName(),e.getMessage());
  37. return false;
  38. }finally {
  39. stampedLock.unlockWrite(stamp);
  40. }
  41. }
  42. /**
  43. * 获取缓存
  44. * @param key
  45. * @author senfel
  46. * @date 2023/5/25 11:34
  47. * @return java.lang.String
  48. */
  49. public static String getCache(String key){
  50. try{
  51. long stamp = stampedLock.tryOptimisticRead();
  52. String value = null;
  53. if(0 != stamp){
  54. log.error("线程{}获取到乐观读锁",Thread.currentThread().getName());
  55. value = cacheMap.get(key);
  56. if(!stampedLock.validate(stamp)){
  57. //校验不通过降级为悲观读
  58. log.error("线程{}获取到乐观读锁校验不通过降级为悲观读",Thread.currentThread().getName());
  59. try {
  60. stamp = stampedLock.readLock();
  61. value = cacheMap.get(key);
  62. }catch (Exception e){
  63. throw e;
  64. }finally {
  65. stampedLock.unlockRead(stamp);
  66. }
  67. }
  68. }else{
  69. log.error("线程{}未获取到乐观读锁,尝试悲观读",Thread.currentThread().getName());
  70. try {
  71. stamp = stampedLock.readLock();
  72. value = cacheMap.get(key);
  73. }catch (Exception e){
  74. throw e;
  75. }finally {
  76. stampedLock.unlockRead(stamp);
  77. }
  78. }
  79. log.error("线程{}获取到的数据为:{}",Thread.currentThread().getName(), value);
  80. return value;
  81. }catch (Exception e){
  82. log.error("线程{}获取缓存异常:{}",Thread.currentThread().getName(),e.getMessage());
  83. }
  84. return null;
  85. }
  86. }

2、创建测试用例

  1. /**
  2. * 印戳锁测试
  3. * @author senfel
  4. * @date 2023/5/25 12:38
  5. * @return void
  6. */
  7. @Test
  8. public void stampedLockTest() throws Exception{
  9. ExecutorService executorService = Executors.newFixedThreadPool(10);
  10. //等待结束
  11. CountDownLatch countDownLatch = new CountDownLatch(6);
  12. //保证同时执行
  13. CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
  14. for(int i=0;i<3;i++){
  15. int finalI = i;
  16. executorService.execute(new Runnable() {
  17. @Override
  18. public void run() {
  19. try{
  20. cyclicBarrier.await();
  21. }catch (Exception e){
  22. log.error("线程相互等待异常:{}",e.getMessage());
  23. }
  24. StampedLockDemo.putCache(finalI +"", "senfel"+finalI);
  25. countDownLatch.countDown();
  26. }
  27. });
  28. }
  29. for(int i=0;i<3;i++){
  30. int finalI = i;
  31. executorService.execute(new Runnable() {
  32. @Override
  33. public void run() {
  34. try{
  35. cyclicBarrier.await();
  36. }catch (Exception e){
  37. log.error("线程相互等待异常:{}",e.getMessage());
  38. }
  39. StampedLockDemo.getCache(finalI+"");
  40. countDownLatch.countDown();
  41. }
  42. });
  43. }
  44. countDownLatch.await();
  45. executorService.shutdown();
  46. log.error("测试完成,关闭线程池");
  47. }

3、查看测试结果

线程pool-1-thread-4获取到乐观读锁
线程pool-1-thread-2获取到写锁,进行缓存写入
线程pool-1-thread-5获取到乐观读锁
线程pool-1-thread-6获取到乐观读锁
线程pool-1-thread-4获取到乐观读锁校验不通过降级为悲观读
线程pool-1-thread-5获取到乐观读锁校验不通过降级为悲观读
线程pool-1-thread-6获取到乐观读锁校验不通过降级为悲观读
线程pool-1-thread-2写入数据当前缓存中的数据有:{“1”:“senfel1”}
线程pool-1-thread-3获取到写锁,进行缓存写入
线程pool-1-thread-3写入数据当前缓存中的数据有:{“1”:“senfel1”,“2”:“senfel2”}
线程pool-1-thread-1获取到写锁,进行缓存写入
线程pool-1-thread-1写入数据当前缓存中的数据有:{“0”:“senfel0”,“1”:“senfel1”,“2”:“senfel2”}
线程pool-1-thread-5获取到的数据为:senfel1
线程pool-1-thread-6获取到的数据为:senfel2
线程pool-1-thread-4获取到的数据为:senfel0

写在最后

印戳锁StampedLock是一个高性能的读写锁,使用CLH队列来保存等待线程节点,STATE来标识资源加解锁。印戳锁内部提供了读锁、写锁、乐观读锁,当使用乐观读锁的时候其他线程是可以继续加写锁,只是在乐观读锁线程进行数据操作时候需要进行数据验证,以保证数据一致性。

发表评论

表情:
评论列表 (有 0 条评论,98人围观)

还没有评论,来说两句吧...

相关阅读