RabbitMQ:发布订阅模式

£神魔★判官ぃ 2024-04-21 08:08 174阅读 0赞

✨ RabbitMQ:发布订阅模式

  • 1.订阅模式基本介绍
  • 2.交换机
  • 3.发布订阅模式
    • 3.1基本介绍
    • 3.2生产者
    • 3.3消费者
    • 3.4测试

?个人主页:不断前进的皮卡丘
?博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
?个人专栏:消息中间件

1.订阅模式基本介绍

在这里插入图片描述

在这里插入图片描述

  • P:生产者,发送消息给交换机
  • C:消费者,接收消息
  • X:交换机,一方面接收生产者发送的消息,另一方面知道怎么处理消息,是否应将其附加到特定队列?是否应将其附加到多个队列中?或者它应该被丢弃。其规则由*交换类型*定义。**
  • Queue:消息队列,接收消息,缓存消息
  • 每个消费者都监听自己的队列
  • 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

2.交换机

  • RabbitMQ 中消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,很多时候,生产者甚至不知道消息是否会传递到任何队列。相反,生产者只能将消息发送到_交换机_。交换机的工作是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面则将它们推送到队列。交换必须确切地知道如何处理它收到的消息。是否应将其附加到特定队列?是否应将其附加到多个队列中?或者它应该被丢弃。其规则由_交换类型_定义。
  • 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机类型

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

3.发布订阅模式

3.1基本介绍

在这里插入图片描述

要配置一个fanout类型的交换机,不需要指定对应的路由key,同时会把消息路由到每一个消息队列中,每个消息队列都可以对相同的消息进行存储,然被由各自的消息队列相关联的消费者消费

3.2生产者

  1. public class Producer {
  2. public static String FANOUT_EXCHANGE = " fanout_exchange";
  3. public static String FANOUT_QUEUE_1 = "fanout_queue_1";
  4. public static String FANOUT_QUEUE_2 = "fanout_queue_2";
  5. public static void main(String[] args) {
  6. try {
  7. Channel channel = ConnectUtil.getChannel();
  8. //声明交换机(交换机名称,交换机类型)
  9. channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
  10. //声明队列
  11. channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
  12. channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
  13. //把交换机和队列进行绑定
  14. channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
  15. channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
  16. //发送消息
  17. for (int i = 1; i <=10 ; i++) {
  18. String msg="你好,小兔子,发布订阅模式 : "+i;
  19. channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());
  20. }
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. } catch (TimeoutException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

3.3消费者

消费者1

  1. public class Consumer1 {
  2. public static void main(String[] args) {
  3. try {
  4. Channel channel = ConnectUtil.getChannel();
  5. channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
  6. channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
  7. //把队列和交换机绑定 队列名称,交换机名称,路由key
  8. channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");
  9. //接受消息
  10. DefaultConsumer consumer = new DefaultConsumer(channel) {
  11. /**
  12. * 消费回调函数,当收到消息以后,会自动执行这个方法
  13. * @param consumerTag 消费者标识
  14. * @param envelope 消息包的内容(比如交换机,路由key,消息id等)
  15. * @param properties 属性信息
  16. * @param body 消息数据
  17. * @throws IOException
  18. */
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
  22. }
  23. };
  24. //监听消息(队列名称,是否自动确认消息,消费对象)
  25. channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. } catch (TimeoutException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

消费者2

  1. public class Consumer2 {
  2. public static void main(String[] args) {
  3. try {
  4. Channel channel = ConnectUtil.getChannel();
  5. channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
  6. channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
  7. //把队列和交换机绑定 队列名称,交换机名称,路由key
  8. channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");
  9. //接受消息
  10. DefaultConsumer consumer = new DefaultConsumer(channel) {
  11. /**
  12. * 消费回调函数,当收到消息以后,会自动执行这个方法
  13. * @param consumerTag 消费者标识
  14. * @param envelope 消息包的内容(比如交换机,路由key,消息id等)
  15. * @param properties 属性信息
  16. * @param body 消息数据
  17. * @throws IOException
  18. */
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
  22. }
  23. };
  24. //监听消息(队列名称,是否自动确认消息,消费对象)
  25. channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. } catch (TimeoutException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

3.4测试

在这里插入图片描述

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,174人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ:发布订阅模式

    P:生产者,发送消息给交换机C:消费者,接收消息X:交换机,一方面接收生产者发送的消息,另一方面知道怎么处理消息,是否应将其附加到特定队列?是否应将其附加到多个队列中?或...