RabbitMQ(五):路由模式

爱被打了一巴掌 2022-05-23 04:18 304阅读 0赞

一、路由模式

官方内容参考:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

跟订阅模式类似,只不过在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列。

image

二、direct交换机

生产者申明一个direct类型交换机,然后发送消息到这个交换机指定路由键。
消费者指定消费这个交换机的这个路由键,即可接收到消息,其他消费者收不到。

上章节中的生产者:

  1. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

第二个参数就是路由键

消费者:

  1. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

第三个参数就是路由键

三、代码演示

同样的,只是交换机类型改为direct,加了个路由键而已。

这里按图演示两个,即表示一个日志事件,根据日志类型进行处理。

连接RabbitMQ工具类

  1. package cn.saytime.rabbitmq.util;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /** * RabbitMQ连接工具类 */
  7. public class ConnectionUtil {
  8. private static final String host = "192.168.239.128";
  9. private static final int port = 5672;
  10. /** * 获取RabbitMQ Connection连接 * @return * @throws IOException * @throws TimeoutException */
  11. public static Connection getConnection() throws IOException, TimeoutException {
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. connectionFactory.setHost(host);
  14. connectionFactory.setPort(port);
  15. // connectionFactory.setUsername("test");
  16. // connectionFactory.setPassword("123456");
  17. // connectionFactory.setVirtualHost("/vhost_test");
  18. return connectionFactory.newConnection();
  19. }
  20. }

如上所示,如果配置有用户名密码以及vhost,则配置即可。

生产者
  1. package cn.saytime.rabbitmq.routing;
  2. import cn.saytime.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /** * 生产者 */
  9. public class Send {
  10. private static final String EXCHANGE_NAME = "test_exchange_fanout";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. // 获取连接
  13. Connection connection = ConnectionUtil.getConnection();
  14. // 从连接开一个通道
  15. Channel channel = connection.createChannel();
  16. // 声明一个direct路由交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  18. // 发送info路由键消息
  19. String infoMessage = "hello, info";
  20. channel.basicPublish(EXCHANGE_NAME, "info", null, infoMessage.getBytes());
  21. System.out.println(" [x] Sent routing info message : '" + infoMessage + "'");
  22. // 发送error路由键消息
  23. String errorMessage = "hello, error";
  24. channel.basicPublish(EXCHANGE_NAME, "error", null, errorMessage.getBytes());
  25. System.out.println(" [x] Sent routing error message : '" + errorMessage + "'");
  26. channel.close();
  27. connection.close();
  28. }
  29. }

info日志消费者

  1. package cn.saytime.rabbitmq.routing;
  2. import cn.saytime.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /** * info日志消费者 */
  7. public class Recv {
  8. // info日志队列
  9. private static final String QUEUE_NAME = "test_queue_routing_info";
  10. private static final String EXCHANGE_NAME = "test_exchange_routing";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. // 获取连接
  13. Connection connection = ConnectionUtil.getConnection();
  14. // 打开通道
  15. Channel channel = connection.createChannel();
  16. // 申明要消费的队列
  17. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  18. // 绑定队列到交换机
  19. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
  20. // 这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
  21. channel.basicQos(1);
  22. // 创建一个回调的消费者处理类
  23. Consumer consumer = new DefaultConsumer(channel) {
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. // 接收到的消息
  27. String message = new String(body);
  28. System.out.println(" [1] Received '" + message + "'");
  29. try {
  30. Thread.sleep(1000);
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. } finally {
  34. System.out.println(" [1] done ");
  35. channel.basicAck(envelope.getDeliveryTag(), false);
  36. }
  37. }
  38. };
  39. // 消费消息
  40. channel.basicConsume(QUEUE_NAME, false, consumer);
  41. }
  42. }
error日志消费者
  1. package cn.saytime.rabbitmq.routing;
  2. import cn.saytime.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /** * error日志消费者 */
  7. public class Recv2 {
  8. // error日志队列
  9. private static final String QUEUE_NAME = "test_queue_routing_error";
  10. private static final String EXCHANGE_NAME = "test_exchange_routing";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. // 获取连接
  13. Connection connection = ConnectionUtil.getConnection();
  14. // 打开通道
  15. Channel channel = connection.createChannel();
  16. // 申明要消费的队列
  17. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  18. // 绑定队列到交换机
  19. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  20. // 这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
  21. channel.basicQos(1);
  22. // 创建一个回调的消费者处理类
  23. Consumer consumer = new DefaultConsumer(channel) {
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. // 接收到的消息
  27. String message = new String(body);
  28. System.out.println(" [2] Received '" + message + "'");
  29. try {
  30. Thread.sleep(1000);
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. } finally {
  34. System.out.println(" [2] done ");
  35. channel.basicAck(envelope.getDeliveryTag(), false);
  36. }
  37. }
  38. };
  39. // 消费消息
  40. channel.basicConsume(QUEUE_NAME, false, consumer);
  41. }
  42. }

四、测试结果

提前在管理控制台创建一个direct交换机,或者先执行一次生产者(执行时会判断交换机是否存在,不存在则创建交换机),这样保证交换机存在,不然直接启动消费者会提示交换机不存在。

这里写图片描述

注意点

如果在没有队列绑定在交换机上面时,往交换机发送消息会丢失,之后绑定在交换机上面的队列接收不到之前的消息,也就是先执行第一次发送,创建了交换机,但是还没有队列绑定在交换机上面,如果这次发送的消息就会丢失。

然后启动两个消费者,再执行生产者。

Send

  1. [x] Sent routing info message : 'hello, info'
  2. [x] Sent routing error message : 'hello, error'

Recv

  1. [1] Received 'hello, info'
  2. [1] done

Recv2

  1. [2] Received 'hello, error'
  2. [2] done

我们可以看到生产者往info路由键发送消息时,只有执行消费info路由键的消费者才能接收到消息,error路由键同样。

五、多绑定情况

1. 同一个消费者绑定队列多个路由键
  1. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  3. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");

如果一个消费者绑定了这三个路由键,那么当生产者发送其中一个路由键消息时,该消费者都能接收到消息。

2. 多个消费者绑定相同路由键

即消费者1绑定info,消费者2 绑定info, error

那么生产者发送info路由键消息时,消费者1, 2都能接收到消息,发送error路由键消息时,只有消费者2能接收到消息。

发表评论

表情:
评论列表 (有 0 条评论,304人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ:模式

    在路由工作模式中,我们需要配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,由消费...

    相关 rabbitmq(6)模式

    1、模型 路由模式为升级版的订阅模式,增加了消费者选择性接收消息的功能。 每个消费者可以绑定多个routingKey,生产者在发送消息时指定routingKey致使只

    相关 RabbitMQ模式

    路由模式 一个生产者,发送消息 每个消费者,都有一个独立的队列 消息发送到交换机,交换机发送到每个队列 根据key,是否相等,来接收消息 ![这里写图片描述

    相关 RabbitMQ实战教程()-模式

    1. 路由模式 > 跟订阅模式类似,只不过在订阅模式的基础上加上路由,订阅模式是分发到所有绑定到该交换机的队列,路由模式只分发到绑定在该交换机上面指定的路由键队列. !