【JUC并发编程】BlockingQueue实现原理(BlockingQueue接口/ Java阻塞队列)

旧城等待, 2023-09-29 13:34 65阅读 0赞

目录

    1. 简单回顾数据结构
    1. 数组结构
    1. 链表结构
    1. Lock锁使用回顾
    1. 什么是阻塞队列
    1. BlockingQueue接口
    1. 有界与无界区别
    1. Java里的阻塞队列
    • 8.1 ArrayBlockingQueue
    • 8.2 ArrayBlockingQueue
    • 8.3 ArrayBlockingQueue 实现生产者与消费者模型
    • 8.4 纯手写ArrayBlockingQueue
    • 8.5 LinkedBlockingQueue
    • 8.6 ArrayBlockingQueue 与LinkedBlockingQueue 区别

1. 简单回顾数据结构

队列:基于数组或者链表实现,先进先出,后进后出规则。

2. 数组结构

连续固定的内存空间,对内存要求较高;
在这里插入图片描述

优点:可以直接根据下标查询 时间复杂度为0(1) 支持随机访问;
缺点:增加、删除元素效率慢;

3. 链表结构

在这里插入图片描述
优点:插入删除速度快
缺点:不支持随机访问,需要从头查询到尾部 时间复杂度为o(n)

4. Lock锁使用回顾

ReentrantLock
lock():加锁操作,如果此时有竞争会进入等待队列中阻塞直到获取锁。
lockInterruptibly():加锁操作,但是优先支持响应中断。
tryLock():尝试获取锁,不等待,获取成功返回true,获取不成功直接返回false。
tryLock(long timeout, TimeUnit unit):尝试获取锁,在指定的时间内获取成功返回true,获取失败返回false。
unlock():释放锁。

Condition
通常和ReentrantLock一起使用的
await():阻塞当前线程,并释放锁。
signal():唤醒一个等待时间最长的线程。

  1. private static ReentrantLock lock = new ReentrantLock();
  2. private static Condition condition = lock.newCondition();
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. try {
  6. lock.lock();
  7. System.out.println("1");
  8. condition.await();
  9. System.out.println("2");
  10. } catch (Exception e) {
  11. } finally {
  12. lock.unlock();
  13. }
  14. }).start();
  15. try {
  16. Thread.sleep(2000);
  17. } catch (Exception e) {
  18. }
  19. new Thread(new Runnable() {
  20. @Override
  21. public void run() {
  22. try {
  23. lock.lock();
  24. condition.signal();
  25. } catch (Exception e) {
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
  30. }).start();
  31. }

5. 什么是阻塞队列

Java中的BlockingQueue接口是一个线程安全的存取队列,适用于生产者消费者的应用场景中,支持两个附加操作:
1.生产者线程会一直不断的往阻塞队列中放入数据,直到队列满了为止。队列满了后,生产者线程阻塞等待消费者线程取出数据。
2.消费者线程会一直不断的从阻塞队列中取出数据,直到队列空了为止。队列空了后,消费者线程阻塞等待生产者线程放入数据。

6. BlockingQueue接口

BlockingQueue提供四种不同的处理方法。


































方法 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除方法 remove(o) poll() take(o) poll(o, timeout, timeunit)
检查方法 element() peek()

抛出异常:

  • add: 插入数据时,如果阻塞队列满,那么抛出异常IllegalStateException,否则插入成功返回true。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer方法。
    llegalStateException - if the element cannot be added at this time due to capacity restrictions
    ClassCastException - if the class of the specified element prevents it from being added to this queue
    NullPointerException - if the specified element is null
    IllegalArgumentException - if some property of the specified element prevents it from being added to this queue
  • remove: 删除数据时,如果队列中有此数据,删除成功返回true,否则返回false。如果包含一个或者多个object,那么只移除一个就返回true。注意:remove(o)是BlockingQueue接口的方法,remove()是Queue接口的方法。
  • element: 如果队列为空,那么抛出异常NoSuchElementException。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove(),element同样是Queue接口的方法。

返回特殊值:

  • offer: 插入数据时,如果阻塞队列没满,那么插入成功返回true,否则返回false。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer方法,不建议会抛出异常的add方法。
  • poll: 此方法是Queue接口的。如果队列不为空,查询、移除并返回队列头部元素。如果队列为空,那么返回null。
  • peek: 此方法是Queue接口的。如果队列为空,返回null,这点不同于poll。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove()。

一直阻塞:

  • put: 插入数据时,如果队列已满,那么阻塞等待队列可用,等待期间如果被中断,那么抛出InterruptedException。
  • take: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞等待队列可用,等待期间如果被中断,那么抛出InterruptedException。

超时退出:

  • offer: 插入数据时,如果队列已满,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出InterruptedException。如果插入成功,那么返回true,如果在达到指定时间后仍然队列不可用,那么返回false。
  • poll: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出InterruptedException。如果删除成功,那么返回队列头部元素,如果在达到指定时间后仍然队列不可用,那么返回null。

Queue队列不能插入null,否则会抛出NullPointerException。

7. 有界与无界区别

有界就是队列有容量限制;
无界就是队列没有容量限制;—
如果当前队列容量限制是为(Integer.MAX_VALUE)
该队列容量是为无界队列

8. Java里的阻塞队列

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
在这里插入图片描述

8.1 ArrayBlockingQueue

ArrayBlockingQueue是基于数组(array-based)的先进先出(FIFO)有界(bounded)阻塞队列。

  1. ArrayBlockingQueue是基于数组实现
  2. 存入方法 采用lock锁保证存取线程安全问题
  3. ArrayBlockingQueue 属于有界队列 默认的情况下 会创建指定
    大小的数组创建一个数组 名称=items
    如果现在设置队列容量限制过大的话,(缺陷 有可能会引发内存溢出的问题)
  4. ArrayBlockingQueue 读写都会使用到同一把锁
    2个线程 A线程做写的操作 B线程做读的操作

8.2 ArrayBlockingQueue

ArrayBlockingQueue是基于数组(array-based)的先进先出(FIFO)有界(bounded)阻塞队列。

  1. ArrayBlockingQueue是基于数组实现
  2. 存入方法 采用lock锁保证存取线程安全问题
  3. ArrayBlockingQueue 属于有界队列 默认的情况下 会创建指定
    大小的数组创建一个数组 名称=items
    如果现在设置队列容量限制过大的话,(缺陷 有可能会引发内存溢出的问题)
  4. ArrayBlockingQueue 读写都会使用到同一把锁
    2个线程 A线程做写的操作 B线程做读的操作

    // 有界
    BlockingQueue strings = new ArrayBlockingQueue(1);
    strings.offer(“xiaowang”);
    strings.offer(“xiaochao”);
    // 先进先出原则 取出xiaowang同时从队列中删除
    System.out.println(strings.poll());
    // 先进先出原则 取出xiaochao同时从队列中删除
    System.out.println(strings.poll());
    // 先进先出原则 null
    System.out.println(strings.poll());

strings.poll(3, TimeUnit.SECONDS)—如果3s内没有从队列中获取到内容,则当前线程会阻塞等待,超时时间为3s。
当队列满了,继续投递数据在队列中 当前线程会阻塞等待。
strings.offer(“xiaowang”, 3, TimeUnit.SECONDS);

8.3 ArrayBlockingQueue 实现生产者与消费者模型

在这里插入图片描述

  1. private static ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(20);
  2. public static void main(String[] args) {
  3. new Thread(() -> {
  4. for (int i = 0; i <= 30; i++) {
  5. try {
  6. // 模拟生产者存入的线程速率 30毫秒
  7. Thread.sleep(30);
  8. String msg = i + "";
  9. boolean result = arrayBlockingQueue.offer(msg, 1, TimeUnit.SECONDS);
  10. System.out.println(Thread.currentThread().getName() + "生产者线程存入" + msg + "," + (result ? "成功" : "失败"));
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }, "生产者线程").start();
  16. new Thread(() -> {
  17. while (true) {
  18. String msg = arrayBlockingQueue.poll();
  19. if (msg != null)
  20. System.out.println(Thread.currentThread().getName() + "消费者消费:" + msg);
  21. try {
  22. // 模拟处理消费者线程处理业务逻辑的时间3s
  23. Thread.sleep(3000);
  24. } catch (Exception e) {
  25. }
  26. }
  27. }, "消费者线程").start();
  28. }

8.4 纯手写ArrayBlockingQueue

  1. public class DemoArrayBlockingQueue<E> {
  2. /**
  3. * 基于数组形式实现队列
  4. */
  5. private ArrayList<E> blockingQueue;
  6. private Lock lock = new ReentrantLock();
  7. private Condition condition = lock.newCondition();
  8. /**
  9. * 初始化队列容量
  10. */
  11. private int items;
  12. public DemoArrayBlockingQueue(int capacity) {
  13. this.items = capacity;
  14. blockingQueue = new ArrayList<E>(capacity);
  15. }
  16. public boolean offer(E e) {
  17. lock.lock();
  18. try {
  19. if (blockingQueue.size() == items)
  20. return false;
  21. else {
  22. blockingQueue.add(e);
  23. return true;
  24. }
  25. } finally {
  26. lock.unlock();
  27. }
  28. }
  29. /**
  30. * 阻塞队列
  31. *
  32. * @param e
  33. * @param timeout
  34. * @param unit
  35. * @return
  36. */
  37. public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  38. lock.lock();
  39. try {
  40. long nanos = unit.toNanos(timeout);
  41. while (blockingQueue.size() == items) {
  42. // 如果当前队列满了 则阻塞等待
  43. if (nanos <= 0) {
  44. return false;
  45. }
  46. nanos = condition.awaitNanos(nanos);
  47. }
  48. blockingQueue.add(e);
  49. return true;
  50. } finally {
  51. lock.unlock();
  52. }
  53. }
  54. public E poll() {
  55. lock.lock();
  56. try {
  57. return (blockingQueue.size() == 0) ? null : dequeue();
  58. } finally {
  59. lock.unlock();
  60. }
  61. }
  62. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  63. lock.lock();
  64. try {
  65. long nanos = unit.toNanos(timeout);
  66. // 没有获取到内容 则阻塞等待
  67. while (blockingQueue.size() == 0) {
  68. if (nanos <= 0) {
  69. return null;
  70. }
  71. nanos = condition.awaitNanos(nanos);
  72. }
  73. return dequeue();
  74. } finally {
  75. lock.unlock();
  76. }
  77. }
  78. private E dequeue() {
  79. E e = blockingQueue.get(0);// 取出该元素
  80. blockingQueue.remove(0);// 同时删除该元素
  81. return e;
  82. }
  83. public static void main(String[] args) throws InterruptedException {
  84. DemoArrayBlockingQueue<String> blockingQueue = new DemoArrayBlockingQueue<String>(2);
  85. blockingQueue.offer("xiaowang");
  86. blockingQueue.offer("xiaochao");
  87. // blockingQueue.offer("xiaodan", 3, TimeUnit.SECONDS);
  88. System.out.println(">2<<");
  89. System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));
  90. System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));
  91. System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));
  92. System.out.println("结束");
  93. }
  94. }

8.5 LinkedBlockingQueue

LinkedBlockingQueue是基于链表(linked nodes)的先进先出(FIFO)的可选界(optionally-bounded)的阻塞队列。

  1. //LinkedBlockingDeque默认是无界队列 底层采用链表实现
  2. LinkedBlockingDeque<String> strings = new LinkedBlockingDeque<>();
  3. strings.offer("xiaowang");
  4. strings.offer("xiaochao");
  5. System.out.println(strings.poll());
  6. System.out.println(strings.poll());
  7. System.out.println(strings.poll());

8.6 ArrayBlockingQueue 与LinkedBlockingQueue 区别

ArrayBlockingQueue 与LinkedBlockingQueue 区别:

  1. ArrayBlockingQueue 底层基于数组实现;
  2. LinkedBlockingQueue 底层基于链表实现;
  3. ArrayBlockingQueue 默认是有界队列;
  4. LinkedBlockingQueue 默认是无界队列 容量为 Integer.MAX_VALUE;
  5. ArrayBlockingQueue 读写采用同一把锁, LinkedBlockingQueue 锁是读写分离;
  6. LinkedBlockingQueue clear方法 同时清理两把锁
  7. LinkedBlockingQueue使用AtomicInteger计入个数,ArrayBlockingQueue int count计数

发表评论

表情:
评论列表 (有 0 条评论,65人围观)

还没有评论,来说两句吧...

相关阅读

    相关 BlockingQueue阻塞队列

    BlockingQueue简介 阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出; 当队列是空的,从队列中获取元

    相关 阻塞队列BlockingQueue

    > 固定长度的队列往里放数据,如果放满了还要放,阻塞式队列就会等待,直到有数据取出,空出位置后才继续放;非阻塞式队列不能等待就只能报错了。 多线程环境中,通过队列可以很容易实