并发模式(三)——生产者-消费模式

朱雀 2021-09-14 06:24 389阅读 0赞

生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。

一、架构模式图:

SouthEast

类图:

SouthEast 1

生产者:提交用户请求,提取用户任务,并装入内存缓冲区;

消费者:在内存缓冲区中提取并处理任务;

内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;

任务:生产者向内存缓冲区提交的数据结构;

Main:使用生产者和消费者的客户端。

二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

(1)Producer生产者线程:

[java] view plain copy

  1. package ProducerConsumer;
  2. import java.util.Random;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. public class Producer implements Runnable{
  7. //Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。
  8. //而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。
  9. //这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
  10. private volatile boolean isRunning= true;
  11. //内存缓冲区
  12. private BlockingQueue queue;
  13. //总数,原子操作
  14. private static AtomicInteger count = new AtomicInteger();
  15. private static final int SLEEPTIME=1000;
  16. public Producer(BlockingQueue queue) {
  17. this.queue = queue;
  18. }
  19. @Override
  20. public void run() {
  21. PCData data=null;
  22. Random r = new Random();
  23. System.out.println(“start producer id = “+ Thread .currentThread().getId());
  24. try{
  25. while(isRunning){
  26. Thread.sleep(r.nextInt(SLEEPTIME));
  27. //构造任务数据
  28. data= new PCData(count.incrementAndGet());
  29. System.out.println(“data is put into queue “);
  30. //提交数据到缓冲区
  31. if(!queue.offer(data,2,TimeUnit.SECONDS)){
  32. System.out.println(“faile to put data: “+ data);
  33. }
  34. }
  35. }catch (InterruptedException e){
  36. e.printStackTrace();
  37. Thread.currentThread().interrupt();
  38. }
  39. }
  40. public void stop(){
  41. isRunning=false;
  42. }
  43. }

(2)Consumer消费者线程:

[java] view plain copy

  1. package ProducerConsumer;
  2. import java.text.MessageFormat;
  3. import java.util.Random;
  4. import java.util.concurrent.BlockingQueue;
  5. public class Consumer implements Runnable {
  6. //缓冲区
  7. private BlockingQueue queue;
  8. private static final int SLEEPTIME=1000;
  9. public Consumer(BlockingQueue queue) {
  10. this.queue = queue;
  11. }
  12. @Override
  13. public void run() {
  14. System.out.println(“start Consumer id= “+ Thread .currentThread().getId());
  15. Random r = new Random();
  16. try {
  17. //提取任务
  18. while(true){
  19. PCData data= queue.take();
  20. if(null!= data){
  21. //计算平方
  22. int re= data.getData()*data.getData();
  23. System.out.println(MessageFormat.format(“{0}*{1}={2}“,
  24. data.getData(),data.getData(),re
  25. ));
  26. Thread.sleep(r.nextInt(SLEEPTIME));
  27. }
  28. }
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. Thread.currentThread().interrupt();
  32. }
  33. }
  34. }

(3)PCData共享数据模型:

[java] view plain copy

  1. package ProducerConsumer;
  2. public final class PCData {
  3. private final int intData;
  4. public PCData(int d) {
  5. intData=d;
  6. }
  7. public PCData(String d) {
  8. intData=Integer.valueOf(d);
  9. }
  10. public int getData(){
  11. return intData;
  12. }
  13. @Override
  14. public String toString(){
  15. return “data:”+ intData ;
  16. }
  17. }

(4)Main函数:

[java] view plain copy

  1. package ProducerConsumer;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.Executor;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.LinkedBlockingDeque;
  7. public class Main {
  8. /**
  9. * @param args
  10. */
  11. public static void main(String[] args) throws InterruptedException{
  12. //建立缓冲区
  13. BlockingQueue queue = new LinkedBlockingDeque(10);
  14. //建立生产者
  15. Producer producer1 = new Producer(queue);
  16. Producer producer2 = new Producer(queue);
  17. Producer producer3 = new Producer(queue);
  18. //建立消费者
  19. Consumer consumer1 = new Consumer(queue);
  20. Consumer consumer2 = new Consumer(queue);
  21. Consumer consumer3 = new Consumer(queue);
  22. //建立线程池
  23. ExecutorService service = Executors.newCachedThreadPool();
  24. //运行生产者
  25. service.execute(producer1);
  26. service.execute(producer2);
  27. service.execute(producer3);
  28. //运行消费者
  29. service.execute(consumer1);
  30. service.execute(consumer2);
  31. service.execute(consumer3);
  32. Thread.sleep(10*1000);
  33. //停止生产者
  34. producer1.stop();
  35. producer2.stop();
  36. producer3.stop();
  37. Thread.sleep(3000);
  38. service.shutdown();
  39. }
  40. }

三、注意:

  1. volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
  2. 生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。
  3. 由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。

发表评论

表情:
评论列表 (有 0 条评论,389人围观)

还没有评论,来说两句吧...

相关阅读

    相关 并发模式)——生产者-消费模式

     生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信