消息队列消费端性能优化

Myth丶恋晨 2023-07-11 12:54 69阅读 0赞

消费消息,避免拉取

RabbitMQ实现了两个不同的AMQP RPC命令来获取队列中的消息:
Basic.Get:是一个轮询模型(这种模式的性能通常比后低二倍以上)
Basic.Cunsume:是一个推送模型(即 发布-订阅模式)
所以使用推送模型,能很大程度提高RabbitMQ的消费能力。

注:RPC是一种C/S方式,跨语言跨系统的远程调用的方式。

RabbitMQ提供一下的方式来提升消费消息的能力(性能依次降低)

1.基于no-ack模式进行消费
ack模式即为应答模式,即消费者确认,我们可以通过关闭ack模式(默认是打开的)的方式实现更快的吞吐量。

但是我们要知道RabbitMQ将数据投递到消费者的过程中会进过系统自带的数据缓冲区。
由于缺少消费者确认,当系统出现网络波动会导致当前操作系统的套接字接收缓冲区爆满,从而影响程序的正常运行。

在linux系统可以通过增加net_core.rmem_default和net.core.rmem_max值,通常设置为16M即可。

  1. sysctl -w net.core.rmem_max=16777216
  2. sysctl -w net.core.rmem_default=16777216

2.基于确认和Qos>1进行消费(Qos 默认是1 ,默认每条消息都会确认)
Qos服务质量设置,即在确认消息接收之前,消费者可以预先要求接收一定数量的消息。QoS设置允许RabbitMQ通过为消费者预先分配一定数量的消息来实现更高效地消息发送。
这种方式不可以与no-ack同时设置。
它本身可以视为ack的一种优化,它不需要对每条消息进行确认,可以通过设置multiple为true,对所有以前没有进行确认的消息进行确认。
RabbitMQ可以设置basicQoS(Consumer Prefetch)来对consumer进行流控,从而限制未ack的消息数量

  1. //每个consumer单独流控
  2. channel.basicQos(10); // Per consumer limit
  3. channel.basicConsume("my-queue1", false, consumer1);
  4. channel.basicConsume("my-queue2", false, consumer2);
  5. //多个consumer共享流控
  6. channel.basicQos(10, false); // Per consumer limit
  7. channel.basicQos(15, true); // Per channel limit
  8. channel.basicConsume("my-queue1", false, consumer1);
  9. channel.basicConsume("my-queue2", false, consumer2);

3.使用事务来批量进行
事务可能会对消息吞吐量产生负面影响,但有一个例外。如果你不适用QoS设置,那么在使用事务来批量确认消息时,实际上可能会看到略微的性能提升。
并且事务不适用于已禁止确认的消费者。

拒绝消息

当消息本身或消息处理的过程中出现问题,RabbitMQ提供了两种将消息踢回代理服务器的机制:
Basic.Reject: 一次只允许拒绝一个消息
Basic.Nack:一次可以拒绝多个消息

  1. @Component
  2. @Slf4j
  3. public class RabbitConsumer {
  4. /**
  5. * 监听 queue-rabbit-springboot-advance 队列
  6. *
  7. * @param receiveMessage 接收到的消息
  8. * @param message
  9. * @param channel
  10. */
  11. @RabbitListener(queues = "queue-rabbit-springboot-advance")
  12. public void receiveMessage(String receiveMessage, Message message, Channel channel) {
  13. //拒绝多个消息
  14. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  15. //拒绝一个消息
  16. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false, false);
  17. //进行消息应答
  18. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  19. }

死信交换器:
过期的消息、basic.nack或basic.reject且requeue参数为false或队列满的消息将进入此交换器。
RabbitMQ通过死信交换器将消息路由到绑定的队列,就像正常发送给交换器的任何其他消息一样。
死信功能还允许你使用预先指定的值覆盖路由键(死信交换器即可以绑定一个非死信的队列和一个死信的队列)。这样可以允许你使用一个交换器同时处理死信消息和非死信消息,
但需要确保死信消息不被投递到非死信队列中,需要在声明队列时指定一个额外的参数 x-dead-letter-routing-key

控制队列

控制队列的方向有以下几点:

对于临时队列(可以被删除的队列,比如一个聊天房)
自动删除队列
在这里插入图片描述
只允许单个消费者
在这里插入图片描述
自动过期队列
在这里插入图片描述

对永久队列(始终存在的队列,如用来处理信用卡操作的队列)
队列持久性
在这里插入图片描述
队列中消息自动过期
在这里插入图片描述
其他可以设置的参数
在这里插入图片描述

在springboot项目中生命一个队列的方式

  • 队列通常与交换器绑定,所以在定义生产者的时候,就会绑定声明队列
  • Queue的创建可以new,也可以借助QueueBuider.durable来创建

    @Bean(“deadLetterQueue”)

    1. public Queue deadLetterQueue() {
    2. Map<String, Object> args = new HashMap<>(2);

    // x-dead-letter-exchange 声明 死信队列Exchange

    1. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

    // x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)

    1. args.put("x-dead-letter-routing-key", DEAD_LETTER_REDIRECT_ROUTING_KEY);
    2. return QueueBuilder.durable(DEAD_LETTER_QUEUE).withArguments(args).build();
    3. }

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,69人围观)

还没有评论,来说两句吧...

相关阅读

    相关 队列消息重复消费解决

    如何保证消息不被重复消费(如何保证消息消费时的幂等性)? 其实这个很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑考虑会不会重复消费?能不能避免重