RabbitMQ五种消息队列学习(五)--路由模式

野性酷女 2022-06-03 03:59 271阅读 0赞

RabbitMQ五种消息队列学习(五)–路由模式

标签(空格分隔): RabbitMQ


发布订阅模式只是利用路由这个功能,将消息全部分发给所有与路由绑定的队列中,不能对不同的消息进行选择性的分发。

模型结构

这里写图片描述

路由根据Routing Key进行动态的绑定分发
消息路径如下:
这里写图片描述

实现

1、生产者

发送一个key:channel.basicPublish(EXCHANGE_NAME, “delete”, null, message.getBytes());

  1. private final static String EXCHANGE_NAME = "test_exchange_direct";
  2. public static void main(String[] argv) throws Exception {
  3. // 获取到连接以及mq通道
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. // 声明exchange
  7. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  8. // 消息内容
  9. String message = "id = 1000";
  10. channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
  11. System.out.println(" [x] Sent '" + message + "'");
  12. channel.close();
  13. connection.close();
  14. }

2、消费者1
绑定路由key

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “update”);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);

  1. private final static String QUEUE_NAME = "test_queue_direct_1";
  2. private final static String EXCHANGE_NAME = "test_exchange_direct";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  12. // 同一时刻服务器只会发一条消息给消费者
  13. channel.basicQos(1);
  14. // 定义队列的消费者
  15. QueueingConsumer consumer = new QueueingConsumer(channel);
  16. // 监听队列,手动返回完成
  17. channel.basicConsume(QUEUE_NAME, false, consumer);
  18. // 获取消息
  19. while (true) {
  20. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  21. String message = new String(delivery.getBody());
  22. System.out.println(" [x] Received '" + message + "'");
  23. Thread.sleep(10);
  24. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  25. }
  26. }

3、消费者2

  1. private final static String QUEUE_NAME = "test_queue_direct_2";
  2. private final static String EXCHANGE_NAME = "test_exchange_direct";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  13. // 同一时刻服务器只会发一条消息给消费者
  14. channel.basicQos(1);
  15. // 定义队列的消费者
  16. QueueingConsumer consumer = new QueueingConsumer(channel);
  17. // 监听队列,手动返回完成
  18. channel.basicConsume(QUEUE_NAME, false, consumer);
  19. // 获取消息
  20. while (true) {
  21. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  22. String message = new String(delivery.getBody());
  23. System.out.println(" [x] Received '" + message + "'");
  24. Thread.sleep(10);
  25. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  26. }
  27. }

测试

总结:

该模式可以实现对绑定在同一个路由器的地方实现不同的分发模式。那有没有更灵活的处理方式呢?—通配符

发表评论

表情:
评论列表 (有 0 条评论,271人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ实战教程()-模式

    1. 路由模式 > 跟订阅模式类似,只不过在订阅模式的基础上加上路由,订阅模式是分发到所有绑定到该交换机的队列,路由模式只分发到绑定在该交换机上面指定的路由键队列. !