RabbitMQ:订阅模型-消息订阅模式

淡淡的烟草味﹌ 2023-09-23 20:12 116阅读 0赞

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

RabbitMQ 单生产单消费模型主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

文章目录

    • 一、RabbitMQ 订阅模型-消息订阅(Fanout)模式
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、消息订阅(Fanout)模式组成
        • 3、消息订阅(Fanout)模式流程
    • 二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现
        • 1、添加 Maven 依赖
        • 2、封装工具类 ConnectionUtil
        • 3、生产者实现
        • 4、消费者-1 实现
        • 5、消费者-2 实现
        • 6、消费者-3 实现
    • 三、订阅模型 三种模式区别
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、RabbitMQ 路由(direct)模式
        • 3、RabbitMQ 主题(topic)模式

一、RabbitMQ 订阅模型-消息订阅(Fanout)模式

1、RabbitMQ 消息订阅(Fanout)模式

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

image-20221201012238266

2、消息订阅(Fanout)模式组成

RabbitMQ 订阅模型-消息订阅(Fanout)模式主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

3、消息订阅(Fanout)模式流程

消息订阅(Fanout)模式流程:

  1. 消息订阅(Fanout)模式 可以有多个消费者
  2. 每个消费者有自己的 queue(队列)
  3. 每个队列都要绑定到 Exchange(交换机)
  4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给那个队列,生产者无法决定
  5. 交换机把消息发送给绑定过的所有队列
  6. 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.16.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>junit</groupId>
  9. <artifactId>junit</artifactId>
  10. <version>4.13.2</version>
  11. <scope>test</scope>
  12. </dependency>
  13. ...
  14. </dependencies>

2、封装工具类 ConnectionUtil

  1. package com.lizhengi.fanout;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. /**
  6. * @author liziheng
  7. * @version 1.0.0
  8. * @description 连接工具类封装
  9. * @date 2022-12-26 11:39 上午
  10. **/
  11. public class ConnectionUtil {
  12. /**
  13. * 创建 MQ 连接工厂对象
  14. */
  15. private static final ConnectionFactory CONNECTION_FACTORY;
  16. // 类加载时,重量级资源加载
  17. static {
  18. CONNECTION_FACTORY = new ConnectionFactory();
  19. //设置连接MQ主机
  20. CONNECTION_FACTORY.setHost("127.0.0.1");
  21. //设置连接MQ端口号
  22. CONNECTION_FACTORY.setPort(5672);
  23. //设置连接MQ虚拟主机
  24. CONNECTION_FACTORY.setVirtualHost("/test");
  25. //设置连接MQ虚拟主机的用户名密码
  26. CONNECTION_FACTORY.setUsername("admin");
  27. CONNECTION_FACTORY.setPassword("123456");
  28. }
  29. /**
  30. * 建立与RabbitMQ连接 工具方法
  31. *
  32. * @return Connection
  33. */
  34. public static Connection getConnection() {
  35. try {
  36. //返回连接对象
  37. return CONNECTION_FACTORY.newConnection();
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. }
  41. return null;
  42. }
  43. /**
  44. * 关闭通道和连接 工具方法
  45. *
  46. * @param channel Channel
  47. * @param connection Connection
  48. */
  49. public static void closeConnectionAndChanel(Channel channel, Connection connection) {
  50. try {
  51. if (channel != null) {
  52. channel.close();
  53. }
  54. if (connection != null) {
  55. connection.close();
  56. }
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }

3、生产者实现

  1. package com.lizhengi.fanout;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import java.io.IOException;
  5. /**
  6. * @author liziheng
  7. * @version 1.0.0
  8. * @description 订阅模型-消息订阅(Fanout)模式 生产者
  9. * @date 2022-12-26 11:44 上午
  10. **/
  11. public class Producer {
  12. public static void main(String[] args) throws IOException {
  13. // 获取连接对象
  14. Connection connection = ConnectionUtil.getConnection();
  15. assert connection != null;
  16. Channel channel = connection.createChannel();
  17. /* 为通道声明交换机
  18. * 参数1:交换机名称;参数2:交换机类型
  19. */
  20. channel.exchangeDeclare("logs", "fanout");
  21. //发送消息
  22. for (int i = 0; i < 100; i++) {
  23. channel.basicPublish("logs", "", null, ("fanout type message" + i).getBytes());
  24. }
  25. //释放资源
  26. ConnectionUtil.closeConnectionAndChanel(channel, connection);
  27. }
  28. }

4、消费者-1 实现

  1. package com.lizhengi.fanout;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. /**
  5. * @author liziheng
  6. * @version 1.0.0
  7. * @description 订阅模型-消息订阅(Fanout)模式 消费者
  8. * @date 2022-12-26 11:45 上午
  9. **/
  10. public class Customer1 {
  11. public static void main(String[] args) throws IOException {
  12. // 获取连接对象
  13. Connection connection = ConnectionUtil.getConnection();
  14. assert connection != null;
  15. Channel channel = connection.createChannel();
  16. // 绑定交换机
  17. channel.exchangeDeclare("logs", "fanout");
  18. // 为单个 Customer 创建临时队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 绑定交换机和队列
  21. // 参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
  22. channel.queueBind(queueName, "logs", "");
  23. // 消费消息
  24. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
  27. System.out.println("消费者1:" + new String(body));
  28. }
  29. });
  30. }
  31. }

5、消费者-2 实现

  1. package com.lizhengi.fanout;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. /**
  5. * @author liziheng
  6. * @version 1.0.0
  7. * @description 订阅模型-消息订阅(Fanout)模式 消费者
  8. * @date 2022-12-26 11:45 上午
  9. **/
  10. public class Customer2 {
  11. public static void main(String[] args) throws IOException {
  12. // 获取连接对象
  13. Connection connection = ConnectionUtil.getConnection();
  14. assert connection != null;
  15. Channel channel = connection.createChannel();
  16. // 绑定交换机
  17. channel.exchangeDeclare("logs", "fanout");
  18. // 为单个 Customer 创建临时队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 绑定交换机和队列
  21. // 参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
  22. channel.queueBind(queueName, "logs", "");
  23. // 消费消息
  24. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
  27. System.out.println("消费者2:" + new String(body));
  28. }
  29. });
  30. }
  31. }

6、消费者-3 实现

  1. package com.lizhengi.fanout;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. /**
  5. * @author liziheng
  6. * @version 1.0.0
  7. * @description 订阅模型-消息订阅(Fanout)模式 消费者
  8. * @date 2022-12-26 11:45 上午
  9. **/
  10. public class Customer3 {
  11. public static void main(String[] args) throws IOException {
  12. // 获取连接对象
  13. Connection connection = ConnectionUtil.getConnection();
  14. assert connection != null;
  15. Channel channel = connection.createChannel();
  16. // 绑定交换机
  17. channel.exchangeDeclare("logs", "fanout");
  18. // 为单个 Customer 创建临时队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 绑定交换机和队列
  21. // 参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
  22. channel.queueBind(queueName, "logs", "");
  23. // 消费消息
  24. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
  27. System.out.println("消费者3:" + new String(body));
  28. }
  29. });
  30. }
  31. }

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是”绑定”到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

发表评论

表情:
评论列表 (有 0 条评论,116人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ:发布订阅模式

    P:生产者,发送消息给交换机C:消费者,接收消息X:交换机,一方面接收生产者发送的消息,另一方面知道怎么处理消息,是否应将其附加到特定队列?是否应将其附加到多个队列中?或...

    相关 RabbitMQ: 订阅

    多个订阅者同时订阅一个队列时,消息交替发送给多个订阅者.每条消息只发送给一个订阅者,而不是每个订阅者发一份 使用Topic和Fanout交换器时,要先运行consumer,再

    相关 rabbitmq(5)订阅模式

    1、模型 一个生产者绑定一个交换机,每个消费者绑定一个队列。生产者将消息通过交换器分发给所有在线的消费者。 交换机没有消息存储的能力,只能向当前在线的消费者发送消息。

    相关 RabbitMQ订阅模式

    订阅者模式 一个生产者,多个消费者 每一个消费者,都有一个独立的队列 生产者没有将消息直接发送到队列,而是发送到了交换机 每个队列都要绑定到交换机 生产者发