RabbitMQ基本概念及使用

朱雀 2022-11-15 11:38 362阅读 0赞

RabbitMQ

文章目录

  • RabbitMQ
    • 一、MQ的介绍
        1. MQ的基本概念
        1. MQ的优势和劣势
        • 1). 优势
        • 2). 劣势
        • 3). 何时使用MQ
        1. 常见的MQ产品
    • 二、RabbitMQ的介绍
        1. AMQP介绍
        1. RabbitMQ基础架构
        1. RabbitMQ架构图中概念介绍
        1. RabbitMQ工作模式简介
        1. JMS介绍
        1. 小结
    • 三、RabbitMQ的安装和配置
        1. 直接安装
        1. 使用Docker安装
        1. 配置虚拟主机及用户
        • 3.1. 用户角色
        • 3.2. Virtual Hosts配置
          • 3.2.1. 创建Virtual Hosts
          • 3.2.2. 设置Virtual Hosts权限
    • 四、RabbitMQ快速入门
        1. 简单模式的介绍
        1. 代码实现
    • 五、RabbitMQ工作模式的使用
        1. 工作队列模式
        1. 发布/订阅模式
        1. 路由模式
        1. 通配符模式

一、MQ的介绍

1. MQ的基本概念

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

没有使用MQ的通信方式:

在这里插入图片描述

使用了MQ之后的通信方式:

在这里插入图片描述

小结:

  • MQ,消息队列,存储消息的中间件
  • 分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信
  • 发送方称为生产者,接收方称为消费者

2. MQ的优势和劣势

1). 优势

应用解耦、异步提速、削峰填谷

  • 应用解耦

    没有使用MQ的情况:

    在这里插入图片描述

    • 系统耦合度高,库存系统出问题会导致上层的订单系统出问题,对整个系统都有影响
    • 如果要扩展一个X系统,还需修改订单系统
    • 系统的耦合性越高,容错性就越低,可维护性就越低

    使用了MQ之后:

    在这里插入图片描述

    • 用户发送消息给订单系统,订单系统将消息发送给消息队列,此时不等下层系统是否执行操作,直接将返回值返回给用户
    • 下层系统从消息队列中取出消息执行相关的操作,即使其中某一功能出现故障也不影响其余功能的实现
    • 使用 MQ 使得应用间解耦,提升容错性和可维护性
  • 异步提速

    没有使用MQ的情况:

    在这里插入图片描述

    使用了MQ之后:

    在这里插入图片描述

  • 削峰填谷

    没有使用MQ的情况:

在这里插入图片描述

使用了MQ之后:

在这里插入图片描述

填谷的概念:

在这里插入图片描述

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”

小结:

  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

2). 劣势

  • 系统可用性降低

    • 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响
  • 系统复杂度提高

    • MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用
  • 一致性问题

    • A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败,消息数据处理不一致

3). 何时使用MQ

  • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能
  • 容许短暂的不一致性。上层系统返回之后,下层系统还需要从MQ中读取消息进行处理,无法保证完全一致
  • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本

3. 常见的MQ产品

在这里插入图片描述

二、RabbitMQ的介绍

1. AMQP介绍

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

在这里插入图片描述

2. RabbitMQ基础架构

在这里插入图片描述

Connection中使用管道进行通信,并不需要每次都建立Connection,节约资源

3. RabbitMQ架构图中概念介绍

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

4. RabbitMQ工作模式简介

RabbitMQ 提供了 6 种工作模式:简单模式、work queues 工作队列模式、Publish/Subscribe 发布/订阅模式、Routing 路由模式、Topics 主题模式(通配符模式)、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)

官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

在这里插入图片描述

5. JMS介绍

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

6. 小结

  • RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  • RabbitMQ提供了6种工作模式,但只学习5种。
  • AMQP 是协议,类比HTTP。
  • JMS 是 API 规范接口,类比 JDBC。
  • 即AMQP与JMS不是一种东西,JMS是接口、API;AMQP是一种协议

三、RabbitMQ的安装和配置

1. 直接安装

详见博客 / 文档 RabbitMQ的安装.pdf

2. 使用Docker安装

详见博客 / 文档 Docker安装RabbitMQ.pdf

3. 配置虚拟主机及用户

3.1. 用户角色

RabbitMQ在安装好后,可以访问http://ip地址:15672 进入RabbitMQ的管理界面;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,进行如下操作:

在这里插入图片描述

“/”并不表示所有虚拟机,“/”只是一个默认的虚拟机

角色说明

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

3.2. Virtual Hosts配置

mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通 (相当于mysql的db)

Virtual Hosts 的名称一般以 “/” 开头

3.2.1. 创建Virtual Hosts

在这里插入图片描述

3.2.2. 设置Virtual Hosts权限

在这里插入图片描述

四、RabbitMQ快速入门

1. 简单模式的介绍

在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

2. 代码实现

需求:使用简单模式完成消息传递

步骤:

  • 创建工程(生产者、消费者)
  • 分别添加依赖
  • 编写生产者发送消息
  • 编写消费者接收消息

代码实现:

  1. 创建工程(生产者、消费者)

    在这里插入图片描述

  2. 分别添加依赖




    com.rabbitmq
    amqp-client
    5.6.0




    org.slf4j
    slf4j-simple
    1.7.25
    compile




    org.apache.maven.plugins
    maven-compiler-plugin
    3.8.0

    1.8
    1.8



  3. 编写生产者发送消息

    public class Producer_HelloWorld {

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. //1. 创建连接工厂
    3. ConnectionFactory factory = new ConnectionFactory();
    4. //2. 设置参数
    5. factory.setHost("172.16.98.133"); //安装MQ的ip地址值,默认localhost
    6. factory.setPort(5672); //RabbitMQ的默认端口
    7. factory.setVirtualHost("/itcast"); //之前创建好的虚拟机,默认为/
    8. factory.setUsername("heima"); //之前创建的用户名,默认为guest
    9. factory.setPassword("heima"); //之前创建的密码,默认为guest
    10. //3. 创建连接
    11. Connection connection = factory.newConnection();
    12. //4. 创建Channel
    13. Channel channel = connection.createChannel();
    14. //与RabbitMQ Server的交互都是通过channel完成
    15. //简单模式下不用自定义交换机,使用默认的即可
    16. //5. 创建队列Queue,使用channel.queueDeclare方法
    17. //队列是由生产者创建的,消费者从队列中读取消息
    18. /** queueDeclare方法的介绍: queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable: 是否持久化到硬盘,当mq重启之后队列还存在 3. exclusive: * 功能一:是否独占,即只能有一个消费者监听此队列 * 功能二:当Connection关闭时,是否删除队列 4. autoDelete: 是否自动删除;最后一个消费者和该队列断开连接后,自动删除队列,必须是与消费者连接之后才能删除 5. arguments:参数信息,暂不配置 */
    19. channel.queueDeclare("hello_world", true, false, false, null);
    20. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
    21. //6. 发送消息,使用channel.basicPublish方法
    22. /** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称;简单模式下交换机使用默认的"" 2. routingKey:路由名称;默认交换机的情况,路由名称只要与队列相同就可以绑定至队列 3. props:配置信息,暂不配置 4. body:发送的消息数据;字节数组 */
    23. //定义发送的消息
    24. String body = "hello rabbitmq~~~";
    25. //发送消息
    26. channel.basicPublish("", "hello_world", null, body.getBytes());
    27. //7.释放资源
    28. channel.close();
    29. connection.close();
    30. }

    }

运行结果:

在这里插入图片描述

  1. 编写消费者接收消息

    public class Consumer_HelloWorld {

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. //消费者和生产者都有自己的Connection和Channel
    3. //1. 创建连接工厂
    4. ConnectionFactory factory = new ConnectionFactory();
    5. //2. 设置参数
    6. factory.setHost("172.16.98.133");
    7. factory.setPort(5672);
    8. factory.setVirtualHost("/itcast");
    9. factory.setUsername("heima");
    10. factory.setPassword("heima");
    11. //3. 创建连接
    12. Connection connection = factory.newConnection();
    13. //4. 创建Channel
    14. Channel channel = connection.createChannel();
    15. //生产者已经创建了队列,故消费者无需再创建队列,从队列中取出数据即可
    16. //但也可以写和生产者相同的声明队列,如下
    17. //channel.queueDeclare("hello_world", true, false, false, null);
    18. //5. 创建回调对象(消费者),作为参数传递到第6步的方法中
    19. Consumer consumer = new DefaultConsumer(channel) {
    20. /* 重写回调方法handleDelivery:当消费者从队列中读取消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key等 3. properties: 配置信息,生产者并未定义任何配置信息,故为null 4. body:消费者读取到的消息 */
    21. @Override
    22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23. System.out.println("consumerTag:" + consumerTag);
    24. System.out.println("Exchange:" + envelope.getExchange());
    25. System.out.println("RoutingKey:" + envelope.getRoutingKey());
    26. System.out.println("properties:" + properties);
    27. System.out.println("body:" + new String(body));
    28. }
    29. };
    30. //6. 消费者取出数据
    31. /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认,消费者收到消息之后自动的向MQ通知已收到消息 3. callback:回调对象,读取到消息后可以自动执行第5步的回调方法 */
    32. channel.basicConsume("hello_world", true, consumer);
    33. //消费者处于监听状态,不能关闭资源
    34. }

    }

运行结果:

一旦启动了消费者,就会自动的从指定的消息队列中读取消息

不要停止消费者程序,再次执行生产者程序往消息队列中添加消息,则消费者自动的又进行了一次输出

在这里插入图片描述

管理界面的运行结果:

在这里插入图片描述

五、RabbitMQ工作模式的使用

1. 工作队列模式

  • 示意图

    在这里插入图片描述

  • 与入门程序的简单模式相比,多了一个或一些消费者,多个消费端共同消费同一个队列中的消息

    • 这些消费者处于竞争关系,一条消息只能被一个消费者取出
    • 消费者是顺序读取消息的,C1取一条然后C2取一条…依次类推
  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
  • 代码实现:与简单模式几乎一模一样,只需要多定义几个消费者类即可完成(不再演示)

2. 发布/订阅模式

  • 示意图

    在这里插入图片描述

  • 与工作队列模式相比

    • 每个消费者监听自己的队列,而不再是共享一个队列
  • 添加了一个 Exchange 角色,即交换机

    • 生产者将消息不再发送到队列中,而是发给 X(交换机)
    • 对于交换机,一方面,接收生产者发送的消息;另一方面,决定如何处理消息,例如递交消息给某个特定的队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型;Exchange常见有以下4种类型:

      • Fanout:广播,把消息交给所有绑定到交换机的队列(一条消息可以被多个消费者读取)
      • Direct:定向,把消息交给特定的一个队列(一条消息只可以被一个消费者读取)
      • Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列
      • HEADERS:参数匹配,使用较少
  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)

代码实现

  1. 生产者代码

    1. public class Producer_PubSub {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory = new ConnectionFactory();
    5. //2. 设置参数
    6. factory.setHost("172.16.98.133");
    7. factory.setPort(5672);
    8. factory.setVirtualHost("/itcast");
    9. factory.setUsername("heima");
    10. factory.setPassword("heima");
    11. //3. 创建连接 Connection
    12. Connection connection = factory.newConnection();
    13. //4. 创建Channel
    14. Channel channel = connection.createChannel();
    15. //5. 创建交换机,调用channel.exchangeDeclare(),方法对应的参数列表如下
    16. /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数: 1. exchange:自定义交换机名称 2. type:交换机类型(枚举类型) DIRECT("direct"):定向 FANOUT("fanout"):扇形(广播),发送消息到所有与之绑定队列 TOPIC("topic"):通配符的方式 HEADERS("headers"):参数匹配 3. durable:是否持久化 4. autoDelete:是否自动删除 5. internal:是否内部使用,一般为false 6. arguments:参数,此处设置为null */
    17. // 定义交换机名称
    18. String exchangeName = "test_fanout";
    19. // 创建交换机,类型为广播类型
    20. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
    21. //6. 创建两个队列,调用channel.queueDeclare()
    22. String queue1Name = "test_fanout_queue1";
    23. String queue2Name = "test_fanout_queue2";
    24. channel.queueDeclare(queue1Name, true, false, false, null);
    25. channel.queueDeclare(queue2Name, true, false, false, null);
    26. //7. 将两个队列绑定到交换机,调用channel.queueBind(),方法参数如下:
    27. /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键(绑定规则) 如果交换机的类型为fanout,则routingKey设置为"",表示转发消息到所有绑定的队列 */
    28. channel.queueBind(queue1Name, exchangeName, "");
    29. channel.queueBind(queue2Name, exchangeName, "");
    30. //8. 发送消息
    31. String body = "发送消息啦"; //定义消息的内容
    32. channel.basicPublish(exchangeName, "", null, body.getBytes());
    33. //9. 释放资源
    34. channel.close();
    35. connection.close();
    36. }
    37. }

    运行结果:

    在这里插入图片描述

    两个队列中都存在一条消息

  2. 消费者代码

    消费者1:channel.basicConsume("test_fanout_queue1", true, consumer);

    消费者2:channel.basicConsume("test_fanout_queue2", true, consumer);

    运行结果:

    ​ 两个消费者都获取到 “发送消息啦” 的消息

3. 路由模式

  • 示意图

    在这里插入图片描述

  • 队列与交换机绑定时需要指定一个RoutingKey
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 一致时,才会将消息转发到此队列
  • 此时交换机的类型必须是 Direct(定向)

代码实现

  1. 生产者代码

    1. // 定义Connection、channel等代码与之前一致,省略
    2. // 自定义交换机名称
    3. String exchangeName = "test_direct";
    4. // 创建交换机,类型为DIRECT
    5. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
    6. // 创建队列
    7. String queue1Name = "test_direct_queue1";
    8. String queue2Name = "test_direct_queue2";
    9. channel.queueDeclare(queue1Name, true, false, false, null);
    10. channel.queueDeclare(queue2Name, true, false, false, null);
    11. // 绑定队列和交换机,指定队列的RoutingKey
    12. // 队列1绑定error
    13. channel.queueBind(queue1Name, exchangeName, "error");
    14. // 队列2绑定info、error、warning
    15. channel.queueBind(queue2Name, exchangeName, "info");
    16. channel.queueBind(queue2Name, exchangeName, "error");
    17. channel.queueBind(queue2Name, exchangeName, "warning");
    18. // 发送消息
    19. String body = "发送消息啦";
    20. // 定义消息的RoutingKey
    21. channel.basicPublish(exchangeName, "error", null, body.getBytes());
    22. // 此消息会被发送到两个队列中
    23. // channel.basicPublish(exchangeName, "info", null, body.getBytes());
    24. // 此消息仅会转发至队列2
    25. // 释放资源
    26. channel.close();
    27. connection.close();
  2. 消费者代码,与之前一致,自行编写

4. 通配符模式

  • 示意图

    在这里插入图片描述

  • 与路由模式类似,都是可以根据 RoutingKey 把消息路由到不同的队列
  • 通配符模式的优点是可以在指定队列的 Routing key 时使用通配符
  • Routingkey 一般由一个或多个单词组成,多个单词之间以 ”.” 分割,例如: item.insert
  • 通配符规则

    • # 匹配零个或多个单词
    • * 匹配恰好一个单词
    • 例如:item.# 能够匹配 item.insert.abc 或者 item.insert;item.* 只能匹配 item.insert
  • 路由规则图示

    在这里插入图片描述

  • 此时交换机的类型必须是 Topic(通配符)

代码实现

  1. 生产者代码

    1. // 创建交换机,类型为TOPIC
    2. String exchangeName = "test_topic";
    3. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
    4. // 创建队列
    5. String queue1Name = "test_topic_queue1";
    6. String queue2Name = "test_topic_queue2";
    7. channel.queueDeclare(queue1Name, true, false, false, null);
    8. channel.queueDeclare(queue2Name, true, false, false, null);
    9. // 绑定队列和交换机,定义队列的通配符
    10. channel.queueBind(queue1Name,exchangeName, "#.error");
    11. channel.queueBind(queue1Name,exchangeName, "order.*");
    12. channel.queueBind(queue2Name,exchangeName, "*.*");
    13. // 发送消息
    14. String body = "发送消息啦";
    15. channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());
    16. // 此消息将会被转发至两个队列
    17. // 释放资源
    18. channel.close();
    19. connection.close();
  2. 消费者代码,与之前一致,自行编写

发表评论

表情:
评论列表 (有 0 条评论,362人围观)

还没有评论,来说两句吧...

相关阅读