RabbitMQ订阅者模式

ゝ一纸荒年。 2022-05-16 04:39 312阅读 0赞

订阅者模式

一个生产者,多个消费者
每一个消费者,都有一个独立的队列
生产者没有将消息直接发送到队列,而是发送到了交换机

每个队列都要绑定到交换机
生产者发送的消息,经过交换机,到达队列
实现,一个消息被多个消费者获取的目的

注意
消息发送到没有队列绑定的交换机时,消息将丢失
因为,交换机没有存储消息的能力,消息只能存在在队列中
这里写图片描述

Send

生产者

  1. package cn.itcast.rabbitmq.ps;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接以及mq通道
  9. Connection connection = ConnectionUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. // 声明exchange
  12. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  13. // 消息内容
  14. String message = "消息已经修改,商品id=1000";
  15. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  16. System.out.println(" [x] Sent '" + message + "'");
  17. channel.close();
  18. connection.close();
  19. }
  20. }

Recv

消费者1

  1. package cn.itcast.rabbitmq.ps;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv {
  7. private final static String QUEUE_NAME = "test_queue_work";
  8. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  17. // 同一时刻服务器只会发一条消息给消费者
  18. channel.basicQos(1);
  19. // 定义队列的消费者
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. // 监听队列,手动返回完成
  22. channel.basicConsume(QUEUE_NAME, false, consumer);
  23. // 获取消息
  24. while (true) {
  25. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  26. String message = new String(delivery.getBody());
  27. System.out.println(" [x] Received '" + message + "'");
  28. Thread.sleep(10);
  29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  30. }
  31. }
  32. }

Recv2

消费者2

  1. package cn.itcast.rabbitmq.ps;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv2 {
  7. private final static String QUEUE_NAME = "test_queue_work2";
  8. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  17. // 同一时刻服务器只会发一条消息给消费者
  18. channel.basicQos(1);
  19. // 定义队列的消费者
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. // 监听队列,手动返回完成
  22. channel.basicConsume(QUEUE_NAME, false, consumer);
  23. // 获取消息
  24. while (true) {
  25. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  26. String message = new String(delivery.getBody());
  27. System.out.println(" [x] Received '" + message + "'");
  28. Thread.sleep(10);
  29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  30. }
  31. }
  32. }

启动发送者

这里写图片描述
此时
没有绑定队列
这里写图片描述
把消息发送到,没有绑定队列的交换机时
消息将会丢失

启动两个消费者

这里写图片描述

测试

生产者,发送消息
消费者1,拿到了消息
这里写图片描述
消费者2,拿到了消息
这里写图片描述
同一个消息
可以被多个消费者获取

发表评论

表情:
评论列表 (有 0 条评论,312人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ:发布订阅模式

    P:生产者,发送消息给交换机C:消费者,接收消息X:交换机,一方面接收生产者发送的消息,另一方面知道怎么处理消息,是否应将其附加到特定队列?是否应将其附加到多个队列中?或...

    相关 发布订阅模式

    使用售楼处与买楼的一个例子太好理解了: > eg: 买房的人(订阅者)和售楼部(发布者): 在房子开售的时候,售楼部发布可以买房的信息,买房的人收到可以买房的信息。 发

    相关 rabbitmq(5)订阅模式

    1、模型 一个生产者绑定一个交换机,每个消费者绑定一个队列。生产者将消息通过交换器分发给所有在线的消费者。 交换机没有消息存储的能力,只能向当前在线的消费者发送消息。

    相关 RabbitMQ订阅模式

    订阅者模式 一个生产者,多个消费者 每一个消费者,都有一个独立的队列 生产者没有将消息直接发送到队列,而是发送到了交换机 每个队列都要绑定到交换机 生产者发

    相关 发布订阅模式(观察模式

    设计模式的目的就是使类成为可复用的组件。 在观察者模式中观察者接口只注重被观察者,而被观察者接口只注重观察者,具体是观察者接口实现类中的哪一个并不在意,而被观察者也是如此。这