RabbitMQ 最常用的三大模式

浅浅的花香味﹌ 2022-01-23 08:09 195阅读 0赞

Java开发交流 2019-06-02 16:00:42

Direct 模式

  • 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。
  • Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作。
  • 消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃,

RabbitMQ 最常用的三大模式

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class DirectProducer {
  5. public static void main(String[] args) throws Exception {
  6. //1. 创建一个 ConnectionFactory 并进行设置
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. factory.setVirtualHost("/");
  10. factory.setUsername("guest");
  11. factory.setPassword("guest");
  12. //2. 通过连接工厂来创建连接
  13. Connection connection = factory.newConnection();
  14. //3. 通过 Connection 来创建 Channel
  15. Channel channel = connection.createChannel();
  16. //4. 声明
  17. String exchangeName = "test_direct_exchange";
  18. String routingKey = "item.direct";
  19. //5. 发送
  20. String msg = "this is direct msg";
  21. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  22. System.out.println("Send message : " + msg);
  23. //6. 关闭连接
  24. channel.close();
  25. connection.close();
  26. }
  27. }
  28. import com.rabbitmq.client.*;
  29. import java.io.IOException;
  30. public class DirectConsumer {
  31. public static void main(String[] args) throws Exception {
  32. //1. 创建一个 ConnectionFactory 并进行设置
  33. ConnectionFactory factory = new ConnectionFactory();
  34. factory.setHost("localhost");
  35. factory.setVirtualHost("/");
  36. factory.setUsername("guest");
  37. factory.setPassword("guest");
  38. factory.setAutomaticRecoveryEnabled(true);
  39. factory.setNetworkRecoveryInterval(3000);
  40. //2. 通过连接工厂来创建连接
  41. Connection connection = factory.newConnection();
  42. //3. 通过 Connection 来创建 Channel
  43. Channel channel = connection.createChannel();
  44. //4. 声明
  45. String exchangeName = "test_direct_exchange";
  46. String queueName = "test_direct_queue";
  47. String routingKey = "item.direct";
  48. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  49. channel.queueDeclare(queueName, false, false, false, null);
  50. //一般不用代码绑定,在管理界面手动绑定
  51. channel.queueBind(queueName, exchangeName, routingKey);
  52. //5. 创建消费者并接收消息
  53. Consumer consumer = new DefaultConsumer(channel) {
  54. @Override
  55. public void handleDelivery(String consumerTag, Envelope envelope,
  56. AMQP.BasicProperties properties, byte[] body)
  57. throws IOException {
  58. String message = new String(body, "UTF-8");
  59. System.out.println(" [x] Received '" + message + "'");
  60. }
  61. };
  62. //6. 设置 Channel 消费者绑定队列
  63. channel.basicConsume(queueName, true, consumer);
  64. }
  65. }
  66. Send message : this is direct msg
  67. [x] Received 'this is direct msg'

Topic 模式

可以使用通配符进行模糊匹配

  • 符号’#“ 匹配一个或多个词
  • 符号”*”匹配不多不少一个词

例如:

  • ‘log.#“能够匹配到’log.info.oa”
  • “log.*“只会匹配到”log.erro“

RabbitMQ 最常用的三大模式

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class TopicProducer {
  5. public static void main(String[] args) throws Exception {
  6. //1. 创建一个 ConnectionFactory 并进行设置
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. factory.setVirtualHost("/");
  10. factory.setUsername("guest");
  11. factory.setPassword("guest");
  12. //2. 通过连接工厂来创建连接
  13. Connection connection = factory.newConnection();
  14. //3. 通过 Connection 来创建 Channel
  15. Channel channel = connection.createChannel();
  16. //4. 声明
  17. String exchangeName = "test_topic_exchange";
  18. String routingKey1 = "item.update";
  19. String routingKey2 = "item.delete";
  20. String routingKey3 = "user.add";
  21. //5. 发送
  22. String msg = "this is topic msg";
  23. channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
  24. channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
  25. channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
  26. System.out.println("Send message : " + msg);
  27. //6. 关闭连接
  28. channel.close();
  29. connection.close();
  30. }
  31. }
  32. import com.rabbitmq.client.*;
  33. import java.io.IOException;
  34. public class TopicConsumer {
  35. public static void main(String[] args) throws Exception {
  36. //1. 创建一个 ConnectionFactory 并进行设置
  37. ConnectionFactory factory = new ConnectionFactory();
  38. factory.setHost("localhost");
  39. factory.setVirtualHost("/");
  40. factory.setUsername("guest");
  41. factory.setPassword("guest");
  42. factory.setAutomaticRecoveryEnabled(true);
  43. factory.setNetworkRecoveryInterval(3000);
  44. //2. 通过连接工厂来创建连接
  45. Connection connection = factory.newConnection();
  46. //3. 通过 Connection 来创建 Channel
  47. Channel channel = connection.createChannel();
  48. //4. 声明
  49. String exchangeName = "test_topic_exchange";
  50. String queueName = "test_topic_queue";
  51. String routingKey = "item.#";
  52. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  53. channel.queueDeclare(queueName, false, false, false, null);
  54. //一般不用代码绑定,在管理界面手动绑定
  55. channel.queueBind(queueName, exchangeName, routingKey);
  56. //5. 创建消费者并接收消息
  57. Consumer consumer = new DefaultConsumer(channel) {
  58. @Override
  59. public void handleDelivery(String consumerTag, Envelope envelope,
  60. AMQP.BasicProperties properties, byte[] body)
  61. throws IOException {
  62. String message = new String(body, "UTF-8");
  63. System.out.println(" [x] Received '" + message + "'");
  64. }
  65. };
  66. //6. 设置 Channel 消费者绑定队列
  67. channel.basicConsume(queueName, true, consumer);
  68. }
  69. }
  70. Send message : this is topc msg
  71. [x] Received 'this is topc msg'
  72. [x] Received 'this is topc msg'

Fanout 模式

不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

Fanout交换机转发消息是最快的。

RabbitMQ 最常用的三大模式

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. public class FanoutConsumer {
  4. public static void main(String[] args) throws Exception {
  5. //1. 创建一个 ConnectionFactory 并进行设置
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("localhost");
  8. factory.setVirtualHost("/");
  9. factory.setUsername("guest");
  10. factory.setPassword("guest");
  11. factory.setAutomaticRecoveryEnabled(true);
  12. factory.setNetworkRecoveryInterval(3000);
  13. //2. 通过连接工厂来创建连接
  14. Connection connection = factory.newConnection();
  15. //3. 通过 Connection 来创建 Channel
  16. Channel channel = connection.createChannel();
  17. //4. 声明
  18. String exchangeName = "test_fanout_exchange";
  19. String queueName = "test_fanout_queue";
  20. String routingKey = "item.#";
  21. channel.exchangeDeclare(exchangeName, "fanout", true, false, null);
  22. channel.queueDeclare(queueName, false, false, false, null);
  23. //一般不用代码绑定,在管理界面手动绑定
  24. channel.queueBind(queueName, exchangeName, routingKey);
  25. //5. 创建消费者并接收消息
  26. Consumer consumer = new DefaultConsumer(channel) {
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope,
  29. AMQP.BasicProperties properties, byte[] body)
  30. throws IOException {
  31. String message = new String(body, "UTF-8");
  32. System.out.println(" [x] Received '" + message + "'");
  33. }
  34. };
  35. //6. 设置 Channel 消费者绑定队列
  36. channel.basicConsume(queueName, true, consumer);
  37. }
  38. }
  39. import com.rabbitmq.client.Channel;
  40. import com.rabbitmq.client.Connection;
  41. import com.rabbitmq.client.ConnectionFactory;
  42. public class FanoutProducer {
  43. public static void main(String[] args) throws Exception {
  44. //1. 创建一个 ConnectionFactory 并进行设置
  45. ConnectionFactory factory = new ConnectionFactory();
  46. factory.setHost("localhost");
  47. factory.setVirtualHost("/");
  48. factory.setUsername("guest");
  49. factory.setPassword("guest");
  50. //2. 通过连接工厂来创建连接
  51. Connection connection = factory.newConnection();
  52. //3. 通过 Connection 来创建 Channel
  53. Channel channel = connection.createChannel();
  54. //4. 声明
  55. String exchangeName = "test_fanout_exchange";
  56. String routingKey1 = "item.update";
  57. String routingKey2 = "";
  58. String routingKey3 = "ookjkjjkhjhk";//任意routingkey
  59. //5. 发送
  60. String msg = "this is fanout msg";
  61. channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
  62. channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
  63. channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
  64. System.out.println("Send message : " + msg);
  65. //6. 关闭连接
  66. channel.close();
  67. connection.close();
  68. }
  69. }
  70. Send message : this is fanout msg
  71. [x] Received 'this is fanout msg'
  72. [x] Received 'this is fanout msg'
  73. [x] Received 'this is fanout msg'

发表评论

表情:
评论列表 (有 0 条评论,195人围观)

还没有评论,来说两句吧...

相关阅读

    相关 常用设计模式

    设计模式是对设计原则的具体化。用江湖话说就是武林秘籍,总结出来的一些固定套路,可以帮助有根基的程序员迅速打通任督二脉,从此做什么都特别快。常用的模式及其场景如下。 1) 单例