rabbitmq基础知识介绍及总结 Dear 丶 2021-10-01 05:02 312阅读 0赞 **Table of Contents** 简介 rabbitmq当中的术语 rabbitmq hello world/queue Sender解析 Receiver解析 rabbitmq轮询调度机制 消息确认 消息持久性 公平派遣 Publish/Subscribe Exchanges 临时队列Temporary queues Bindings Routing Direct exchange Multiple bindings Topics -------------------- # 简介 # RabbitMQ是一个message broker:它接收和转发消息。 我们可以将其视为一个邮局:当我们把将要发出的邮件放在邮箱中时,我们可以确定邮差会最终将邮件发送给正确的收件人。 在这个类比当中,RabbitMQ扮演的角色是一个邮箱,邮局和邮递员。 RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据 -> 即消息。 ## rabbitmq当中的术语 ## RabbitMQ和一般的消息传递机制类似,使用了一些术语。主要包括producer,queue,consumer。 * produce就是发送的意思。 发送消息的程序就是producer: ![aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvcHJvZHVjZXIucG5n][] * queue就是RabbitMQ中的邮箱的名称。 消息流经RabbitMQ和我们的应用程序,但它们只能存储在一个具体的queue中。 queue只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。 不同的producer可以发送消息到同一个队列当中,并且不同的consumer消费者也可以尝试从一个队列接收数据。 下面的图可以用来表示queue ![aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvcXVldWUucG5n][] * Consume的意思和接受不了的意思相近。一个接收消息的应用程序就是一个consumer。 ![aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvY29uc3VtZXIucG5n][] 值得注意的是,生产者,消费者和broker不必都存在同一个主机上; 实际上在大多数应用中他们没有在一个主机上。 同一个应用程序也可以既是生产者又是消费者。 # rabbitmq hello world/queue # 本例子基于java 消息发送代码: import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } 消息接收代码: import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.pri channel.queueDeclare(QUEUE_NAME, false, false, false, null); ntln(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } ### Sender解析 ### 上面代码的功能如下图所示: ![(P) -> \[|||\] -> (C)][P_ -_ _ -_ _C] P代表消息生产者producer,C代表消息消费者consumer。上面代码实现的功能就是生产者发送一条消息,consumer接收这条消息并打印出来 Sender代码解析: 首先我们定义了queue的名字: private final static String QUEUE_NAME = "hello"; 接着,既然我们想要往rabbitmq上发送消息,那么我们必须要连接到rabbitmq上,因此我们创建一个连接到rabbitmq server上的connection: onnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { } Connection抽象了socket连接,并为我们负责协议版本协商和身份验证等。这里我们连接到本地机器上的broker ->因此传入的参数是localhost。 如果我们想要连接到不同机器上的broker,我们只需在此处指定其名称或IP地址。 接下来,我们创建一个channel,这是完成broker任务的大部分API所在的位置。 注意我们可以使用try-with-resources语句,因为Connection和Channel都实现了java.io.Closeable接口。这样我们就不需要在代码中再去关闭它们。 要发送消息,我们必须声明一个queue供我们发送;声明了queue之后我们可以在try-with-resources语句中将消息发布到队列中 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); 声明队列是幂等的 ->即只有在该队列不存在的情况下才能够创建它。消息内容是一个字节数组,因此可以编码我们想发送的任何内容。 ### Receiver解析 ### 消息消费者会监听rabbitmq里的message,所以与消息发布者不一样的是,消息消费者需要一直running并监听消息并打印他们。消息消费者处于相对被动的位置。 Recv类中的代码与Send类中的代码几乎是类似的。建立一个connection和一个channel,声明一个我们要消费的queue。 注意这里的queue要和消息生产者中发布的相匹配。 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 请注意,我们也消费者里也声明了队列。 原因在于在代码中我们并没有强制要求生产者是先启动的,从技术上来说先启动消息消费者不会有任何问题,因此我们需要在尝试使用消息之前确保队列存在。 为什么我们不使用try-with-resource语句来自动关闭通道和连接? 通过这样做,我们只需让程序继续运行,关闭所有内容,然后退出!原因在于我们的消息机制是异步的,我们需要当消息到达时,消费者进程保持活动状态。 我们需要告诉rabbitmq服务器从队列中传递消息。 因为它会异步地向我们发送消息,所以我们以对象的形式提供一个回调,它将缓冲收到的消息,直到我们准备好使用它们。 这就是DeliverCallback子类的作用。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); # rabbitmq轮询调度机制 # 当有多个consumer都监听同一个queue上的message的时候,rabbitmq每次发送的消息会轮流发给这些consumer,这样的机制叫做Round-robin dispatching->轮询调度。 这保证每个消费者得到的消息数量都是大致一样多的。 例子程序比较简单,只需要起两个consumer监听同一个queue,然后不停的用生产者发送消息就可以了。 ## 消息确认 ## 消息确认即Message acknowledgment。这是一种机制。其目的是用于保证消息不会丢失。 在上面的例子程序中,当rabbitmq向消费者发送消息之后,它就会将这条消息从rabbitmq当中删除。这里的问题在于虽然向消费者发送了这条消息,但是我们却无从知晓消费者是否真的收到了这条消息。 为了确保消息永不丢失,RabbitMQ支持消息确认机制。 消费者在在收到,处理了消息之后发回ack(nowledgement)告诉RabbitMQ,之后RabbitMQ就可以删除它。 如果消费者因为某种原因挂掉(通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将理解为消息未完全处理并会将消息重新排队。 如果同时有其他在线消费者,则会迅速将其重新发送给其他消费者。 这样就可以确保没有消息丢失,即使在消息消费者挂掉的情况下。 在rabbitmq当中没有任何消息超时, 即使处理消息需要非常长的时间,也没关系。 ## 消息持久性 ## 我们已经学会了如何确保即使消费者挂掉的时候,任务也不会丢失。 但是如果RabbitMQ服务器本身停止,我们的任务仍然会丢失。 当RabbitMQ退出或崩溃时,它将丢失所有的队列和消息,但是我们可以通过调整设置来确保消息不会丢失,为了达到这个效果我们需要做两件事:**将队列和消息都标记为持久(durable)的**。 首先,我们需要让RabbitMQ永远不会丢失我们的队列。 为此,我们需要声明它是durable持久的: boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null); queueDeclare方法的需要在生产者和消费者代码中都更改才能保证queue的更改生效。 在这样修改之后,即使RabbitMQ重新启动,task\_queue队列也不会丢失。 现在还需要做的是将消息标记为durable的,方法是通过将MessageProperties(继承自BasicProperties)的属性设置为PERSISTENT\_TEXT\_PLAIN。 import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 通过这样做能保证所有的消息都决不会丢失了吗?答案是否定的。 当RabbitMQ接受到消息并且尚未保存消息时,仍然有一个很短的时间窗口存在消息丢失的可能性。 此外,RabbitMQ不会为每条消息执行fsync - 即它可能只是保存到缓存而不是真正写入磁盘。 虽然持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果需要更强的保证,那么我们可以使用发布者确认。 **问题:持久化的消息存在哪里?** ![format_png][] ## 公平派遣 ## 如果细心的话我们不难发现此时消息调度仍然无法完全按照我们的意愿运行。 例如,在有两个consumer的情况下,无论两个consumer是否以及处理完了收到的消息发回确认,RabbitMQ对此一无所知,仍然会均匀地发送消息。 发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。 它不会查看消费者未确认消息的数量。 而只是盲目地向第n个消费者发送每个第n条消息。 为了避免这种情况,我们可以使用basicQos方法并设置prefetchCount = 1。 这个配置将告诉RabbitMQ一次不向一个worker发送多于一条消息。 换句话说,在处理并确认前一个消息之前,不要向consumer发送新消息。 相反,它会将该消息发送给下一个不那么忙的的consumer,也就是处理完了上一个消息并发回确认的consumer。 queue是有大小限制的,当消息过多时,queue会被填满,我们在实际使用时需要留意这样的情况,当遇到时可以考虑增加consumer或者调整策略等。 # Publish/Subscribe # 上面我们介绍的所有的知识,接收消息的消费者始终只有一个consumer,从这个角度来说,这种消息模式是一对一的,其消息发布和接收是通过queue。除了这种消息模式之外,我们还有一种重要的消息模式,即同样的消息被发送给多个consumer,这种消息模式叫做发布/订阅模式。 ### Exchanges ### RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。 实际上,生产者通常甚至不知道消息是否会被发送到队列。 相反,生产者只能向exchaneg发送消息。 exchange是一个非常简单的抽象。 一方面,它接收来自生产者的消息,另一方面将它们推送到队列。 exchange必须确切知道如何处理它收到的消息。 它应该发送消息到特定队列吗? 它应该发送消息到许多队列吗? 或者它应该将消息丢弃。 其规则由exchange的类型定义。 ![format_png 1][] 有几种exchange类型可供选择:direct, topic, headers和 fanout。 我们以最后一个 - fanout为例。 创建一个这种类型的exchange,代码如下: channel.exchangeDeclare("logs", "fanout"); fanout类型的exchange非常简单。 它只是将收到的所有消息广播到它知道的所有队列中。 而这正是我们记日志的logger所需要的。 接下来我们就可以向我们创建的这个exchange发送消息了 channel.basicPublish( "logs", "", null, message.getBytes()); ### 临时队列Temporary queues ### 在上面我们使用过具有特定名称的队列(hello和task\_queue)。 能够命名队列对我们来说至关重要 - 我们需要将消费者指向同一个队列。 当想要在生产者和消费者之间**共享一个队列时,为队列命名非常重要。** 但logger并非如此。 我们希望了解所有日志消息,而不仅仅是它们的一部分。 我们也只对目前流动的消息感兴趣,而不是旧消息。 要解决这个问题,需要做两件事。 首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。 为此,我们可以使用随机名称创建队列,或者更好 - 让服务器为我们选择随机队列名称。 其次,一旦我们断开消费者,就应该自动删除队列。 在Java的rabbitmq client中,当我们没有向queueDeclare()提供参数时,我们使用生成的名称创建一个非持久的,独占的自动删除队列: String queueName = channel.queueDeclare().getQueue(); 此时,queueName就是一个随机队列名称。它的格式类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg。 ### Bindings ### binding的示意图如下: ![format_png 2][] 在上面的代码当中我们已经创建了一个fanout类型的exchange和一个队列。 现在我们需要告诉exchange将消息发送到我们的队列。 **exchange和队列queue之间的关系就称为绑定binding**。 channel.queueBind(queueName, "logs", ""); 现在logs这个exchange就会将消息发送到我们定义的queue上面了,如下图所示: ![format_png 3][] ## Routing ## 在上面的介绍当中我们已经知道了binding是exchange和queue之间的一种关系。 queue会监听从exchaneg发过来的消息。在binding这个概念当中我们可以配置一个额外的参数:binding key(注意和routing key区分,routing key有一种动态的意思在里面)。我们可以按如下方式创建一个有bindingkey参数的queue: channel.queueBind(queueName, EXCHANGE_NAME, "black"); 这里的black就是binding key。binding key的具体作用取决于它所依赖的exchange的类型,例如对于fanout类型的exchange,这个参数就是无意义的,即便配置了也会被直接忽略。因为我们知道fanout类型的exchange会发消息给所有binding的queue。 ### Direct exchange ### Direct exchange是区别于fanout exhcnage的另一种exchange 的类型,它的作用在于它可以支持配置binding key,当direct exchange收到消息时,它只会把消息发送给binding key和message的routing key匹配的queue。以下图为例,假设我们现在有一套queue和exchange定义如下: ![format_png 4][] 我们可以看到X是一个direct类型的exchange,它绑定了两个queue。 第一个queue的binding key是orange,第二个queue有两个bingding,其binding key分别是back和green。 在这样的配置当中,一条消息的routing key假如是orange则会被routed到Q1,消息的routing key假如是black或者green就会去Q2.其他所有的消息则会被忽略。 ### Multiple bindings ### 如下图所示: ![format_png 5][] 一个exchange用同样的binding key绑定了Q1和Q2,则消息既会发送到Q1,也会发送到Q2. ## Topics ## 上面提到的direct exchange本质上给我们提供了一种筛选消息的能力,但是它仍然是不够的。我们需要另一个叫做topic exchange的东西。 发送到一个topic exchange的消息不能只有一个唯一的的routing key。它必须是一个list的消息,这些消息以逗号分割。binding key也是一样的形式。它和direct exchange在消息发送时的机制是类似的,将消息发给routing key和 binding key一致的queue。但是对于topic exchange来说有两个重要的case: * \* (star) 是通配符可以匹配任意字符 * \# (hash) 可以匹配0到多个字符 我们可以通过下图来理解这个机制: # ![format_png 6][] # 在这个例子当中,我们发送的消息都是拿来描述动物的。这些消息发送的时候会包含三个单词,由两个逗号来分割。第一个单词描述celerity速度,第二个单词描述colour颜色,第三个单词描述species种类。其格式就如下所示: <celerity>.<colour>.<species> 我们创建的binding则有三个:Q1的binding key是 "\*.orange.\*",Q2 是"\*.\*.rabbit"和"lazy.\#"。即下面的意思: * Q1 对所有orange颜色的动物感兴趣. * Q2 想知道所有的兔子,以及所有懒惰的动物。 此时,如果一条routing key为 qucik.orange.rabbit的消息发送过来,则它会被发送到两个queue当中。routing key为lazy.orange.elephant的消息也会到达这两条queue。 quick.orange.fox只会去第一条queue lazy.brown.fox只会去第二天queue。 lazy.pink.rabbit只会去第二条queue,且只会被发送一次,**尽管它满足这个条queue的两个binding** quick.brown.fox则会被忽略掉。 如果我们不按这个格式的binding key发送消息,比如发送一个单词或四个单词例如orange" 或者 "quick.orange.male.rabbit"。则这些消息全都会丢失掉。 然而lazy.orange.male.rabbit的routing key会被发送到第二条queue,因为它满足lazy.\# 这个binding。尽管它有四个单词仍然会被视作match。 我们不难发现topic exchange具有非常强大的功能,它也可以达到和其他类型exchange一样的效果。例如我们将binding key设置为\#的话则它就会接收所有消息,就达到了和fanout exchange一样的效果。 当\*和\#没有被使用时,topic exhcange的行为和direct exhcange就是一样的了。 ## RPC ## 我们前面介绍的通信方式,发送消息到某一个queue上面,有一个限制条件事实上,那就是我们的消息生产者和消费者是要在同一个环境里面的。现在,假如我们要发送一个消息到另外一台机器上,应该怎么来实现呢。这种通信形式我们一般称之为remote procedure call简称为rpc。 在rabbitmq里面我们是通过回调的方式来实现rpc。rabbitmq发送一个request给远端,同时在callback里面等待和处理response 逻辑示意图如下: ![format_png 7][] 其执行流程大致如下: * 对于RPC请求,客户端发送带有两个属性的消息:replyTo和correlationId(唯一) * 请求被发送到rpc\_queue队列。 * RPC worker(aka:server)正在等待该队列上的请求。 当出现请求时,它会执行该作业,并使用来自replyTo字段的队列将带有结果的消息发送回客户端。 * 客户端等待回复队列上的数据。 出现消息时,它会检查correlationId属性。 如果它与请求中的值匹配,则将响应返回给应用程序。 [aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvcHJvZHVjZXIucG5n]: /images/20210920/3b09915cfe7941e68229530cee3e475e.png [aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvcXVldWUucG5n]: /images/20210920/32cc56c6903f489eaaeca15bb7af9041.png [aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvY29uc3VtZXIucG5n]: /images/20210920/e5ee24e09c3c4428bf791417ee4771d2.png [P_ -_ _ -_ _C]: /images/20210920/bd5b2a860b4e47f294793133618a5bc0.png [format_png]: /images/20210920/e4df299e770e4e76a65093670e80936a.png [format_png 1]: /images/20210920/1c1bb72a03744c3ab81510e0fd1b50cd.png [format_png 2]: /images/20210920/6f3deecd630a4711ade2078af6979bc7.png [format_png 3]: /images/20210920/5c25755fd93e45a89ce993cfe75aaf09.png [format_png 4]: /images/20210920/aaf4ae7b4ccf4666ad124d67baadc159.png [format_png 5]: /images/20210920/066536f9b76f41a4bedc0588bc908e12.png [format_png 6]: /images/20210920/ffb0896ac2af44c2b4866d65aa4278bd.png [format_png 7]: /images/20210920/62a3e581c22b4fbf8c5448b40bfa0f56.png
还没有评论,来说两句吧...