生产者-消费者问题 柔情只为你懂 2021-09-29 20:32 406阅读 0赞 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者 / 消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步;(2)在生产者和消费者之间建立一个管道。第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。因此本文只介绍同步机制实现的生产者 / 消费者问题。 同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java 语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。在 Java 中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。 (1)wait() / notify() 方法 (2)await() / signal() 方法 (3)BlockingQueue 阻塞队列方法 (4)PipedInputStream / PipedOutputStream 本文只介绍最常用的前三种,第四种暂不做讨论,有兴趣的读者可以自己去网上找答案。 **补充:实现代码,我在原来的代码的基础上引入了继承关系,实现对扩展开放,对修改封闭的开闭设计原则。** ### 一、wait() / notify() 方法 ### wait() / nofity() 方法是基类 Object 的两个方法,也就意味着所有 Java 类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。 wait() 方法:当缓冲区已满 / 空时,生产者 / 消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。 notify() 方法:当生产者 / 消费者向缓冲区放入 / 取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。 **【Storage仓库抽象类】** public abstract class Storage { /** * 生产方法 * @param num 生产数量 * */ abstract public void produce(int num); /** * 消费方法 * @param num 消费数量 * */ abstract public void consume(int num); } **【StorageByWait仓库实现类】** /** * @author * 仓库类 * 通过object超类的wait/notify方法实现同步 */ public class StorageByWait extends Storage { /** * 仓库容量 * */ final private int MAX_SIZE = 100; private LinkedList<Object> list = new LinkedList<>(); /** * 生产方法 * */ @Override public void produce(int num) { synchronized (list) { // 无法放入仓库,一直阻塞 while(list.size() + num > MAX_SIZE) { System.out.println("仓库容量:" + list.size() + " 生产物品:" + num); System.out.println("此时无法执行生产任务。。。"); try { // 阻塞生产 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生产并放入仓库 for (int i=0; i<num; i++) { list.add(new Object()); } System.out.println("生产:" + num + " 当前仓库容量:" + list.size()); // 唤醒所有符合条件 list.notifyAll(); } } /** * 消费方法 * */ @Override public void consume(int num) { synchronized (list) { // 无法获取物品,一直阻塞 while (list.size() < num) { System.out.println("仓库容量:" + list.size() + " 消费物品:" + num); System.out.println("此时无法执行消费任务!!!"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 消费物品 for (int i=0; i<num; i++) { list.remove(); } System.out.println("消费:" + num + " 当前仓库容量:" + list.size()); // 唤醒所有符合条件 list.notifyAll(); } } public void setList(LinkedList<Object> list) { this.list = list; } public int getMAX_SIZE() { return MAX_SIZE; } public LinkedList<Object> getList() { return list; } } **【Producer生产者类】** /** * @author * 生产者类 */ public class Producer extends Thread { /** * 生产数量 * */ private int num; /** * 仓库类 * */ private Storage storage; public Producer(Storage storage) { this.storage = storage; } public void produce() { storage.produce(num); } @Override public void run() { produce(); } /** * setter and getter * */ public void setNum(int num) { this.num = num; } public int getNum() { return num; } } 【Consume消费者类】 /** * @author */ public class Consumer extends Thread{ /** * 消费数量 * */ private int num; /** * 仓库类 * */ private Storage storage; public Consumer(Storage storage) { this.storage = storage; } public void consume() { storage.consume(num); } @Override public void run() { consume(); } /** * setter and getter * */ public void setNum(int num) { this.num = num; } public int getNum() { return num; } } 【Test测试类】 /** * @author * 测试环境 */ public class Test { public static void main(String[] args) { Storage storage = new StorageByWait(); // Storage storage = new StorageByAwait(); // Storage storage = new StorageByBlockingQueue(); Producer producer1 = new Producer(storage); Producer producer2 = new Producer(storage); Producer producer3 = new Producer(storage); Producer producer4 = new Producer(storage); Consumer consumer1 = new Consumer(storage); Consumer consumer2 = new Consumer(storage); Consumer consumer3 = new Consumer(storage); producer1.setNum(20); producer2.setNum(30); producer3.setNum(40); producer4.setNum(90); consumer1.setNum(40); consumer2.setNum(40); consumer3.setNum(60); consumer1.start(); consumer2.start(); consumer3.start(); producer1.start(); producer2.start(); producer3.start(); producer4.start(); } } 看完上述代码,对 wait() / notify() 方法实现的同步有了了解。 当然,当生产和消费设置的数量不合适的时候,会产生死锁,这个是随机的,因为在main线程中,消费者线程和生产者线程获取CPU的机会不是程序控制的,是由调度算法决定。 -------------------- ### 二、await() / signal() 方法 ### 在 JDK5.0 之后,Java 提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。await() 和 signal() 就是其中用来做同步的两种方法,它们的功能基本上和 wait() / nofity() 相同,完全可以取代它们,但是它们和新引入的锁定机制 Lock 直接挂钩,具有更大的灵活性。\*\*通过在 Lock 对象上调用 newCondition() 方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。\*\*下面来看代码: **【StorageByAwait 仓库实现类】** /** * @author * 仓库类 * 通过lock类的await/single实现同步 */ public class StorageByAwait extends Storage { private final int MAX_SIZE = 100; LinkedList<Object> list = new LinkedList<>(); /** * 锁 */ Lock lock = new ReentrantLock(); /** * 声明满状态和空状态 */ private final Condition full = lock.newCondition(); private final Condition empty = lock.newCondition(); /** * 生产方法 */ @Override public void produce(int num) { // 加锁 lock.lock(); while (list.size() + num > MAX_SIZE) { System.out.println("仓库容量:" + list.size() + " 生产物品:" + num); System.out.println("此时无法执行生产任务。。。"); try { // 满状态时 阻塞 full.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生产 for (int i = 0; i < num; i++) { list.add(new Object()); } System.out.println("生产:" + num + " 当前仓库容量:" + list.size()); // 唤醒其他任务 full.signalAll(); empty.signalAll(); // 解锁 lock.unlock(); } /** * 消费方法 */ @Override public void consume(int num) { lock.lock(); // 无法获取物品,一直阻塞 while (list.size() < num) { System.out.println("仓库容量:" + list.size() + " 消费物品:" + num); System.out.println("此时无法执行消费任务!!!"); try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 消费物品 for (int i = 0; i < num; i++) { list.remove(); } System.out.println("消费:" + num + " 当前仓库容量:" + list.size()); // 唤醒所有符合条件 full.signalAll(); empty.signalAll(); lock.unlock(); } } 你可能会对 Storage 类中为什么要定义 public void produce(int num); 和 public void consume(int num); 方法感到不解,为什么不直接在生产者类 Producer 和消费者类 Consumer 中实现这两个方法,却要调用 Storage 类中的实现呢? 通过这种方式,我们添加其他的实现方式,只需要新增仓库类 Storage 的实现类代码即可,生产者 Producer、消费者 Consumer、测试类 Test 的代码均不需要进行任何更改。这样我们就知道为神马我要在 Storage 类中定义 public void produce(int num); 和 public void consume(int num); 方法,并在生产者类 Producer 和消费者类 Consumer 中调用 Storage 类中的实现了吧。 ### 三、BlockingQueue 阻塞队列方法 ### BlockingQueue 是 JDK5.0 的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第 2 种 await() / signal() 方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是 put() 和 take() 方法。 put() 方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。 take() 方法:类似于我们上面的消费者线程,容量为 0 时,自动阻塞。 关于 BlockingQueue 的内容网上有很多,大家可以自己搜,我在这不多介绍。下面直接看代码,跟以往一样,我们只需要更改仓库类 Storage 的代码即可: 【StorageByBlockingQueue 仓库实现类】 /** * @author */ public class StorageByBlockingQueue extends Storage{ /** * 仓库容量 * */ final private int MAX_SIZE = 100; /** * 阻塞队列作为仓库 * 自动实现满阻塞和空阻塞 * */ private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(100); @Override public void produce(int num) { if(list.size() == MAX_SIZE) { System.out.println("仓库已满!不能存货!存货为:" + MAX_SIZE); } for (int i=0; i<num; i++) { try { list.put(new Object()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("存货成功,当前仓库存货为:" + list.size()); } } @Override public void consume(int num) { if(list.size() == 0) { System.out.println("仓库已空!不能取货!存货为:" + 0); } for (int i=0; i<num; i++) { try { list.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("取货成功,当前仓库存货为:" + list.size()); } } } 当然,你会发现这时对于 public void produce(int num); 和 public void consume(int num); 方法业务逻辑上的实现跟前面两个例子不太一样,没关系,这个例子只是为了说明 BlockingQueue 阻塞队列的使用。 有时使用 BlockingQueue 可能会出现 put() 和 System.out.println() 输出不匹配的情况,这是由于它们之间没有同步造成的。当缓冲区已满,生产者在 put() 操作时,put() 内部调用了 await() 方法,放弃了线程的执行,然后消费者线程执行,调用 take() 方法,take() 内部调用了 signal() 方法,通知生产者线程可以执行,致使在消费者的 println() 还没运行的情况下生产者的 println() 先被执行,所以有了输出不匹配的情况。 对于 BlockingQueue 大家可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。
还没有评论,来说两句吧...