消息队列消费端性能优化
消费消息,避免拉取
RabbitMQ实现了两个不同的AMQP RPC命令来获取队列中的消息:
Basic.Get:是一个轮询模型(这种模式的性能通常比后低二倍以上)
Basic.Cunsume:是一个推送模型(即 发布-订阅模式)
所以使用推送模型,能很大程度提高RabbitMQ的消费能力。
注:RPC是一种C/S方式,跨语言跨系统的远程调用的方式。
RabbitMQ提供一下的方式来提升消费消息的能力(性能依次降低)
1.基于no-ack模式进行消费
ack模式即为应答模式,即消费者确认,我们可以通过关闭ack模式(默认是打开的)的方式实现更快的吞吐量。
但是我们要知道RabbitMQ将数据投递到消费者的过程中会进过系统自带的数据缓冲区。
由于缺少消费者确认,当系统出现网络波动会导致当前操作系统的套接字接收缓冲区爆满,从而影响程序的正常运行。
在linux系统可以通过增加net_core.rmem_default和net.core.rmem_max值,通常设置为16M即可。
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.rmem_default=16777216
2.基于确认和Qos>1进行消费(Qos 默认是1 ,默认每条消息都会确认)
Qos服务质量设置,即在确认消息接收之前,消费者可以预先要求接收一定数量的消息。QoS设置允许RabbitMQ通过为消费者预先分配一定数量的消息来实现更高效地消息发送。
这种方式不可以与no-ack同时设置。
它本身可以视为ack的一种优化,它不需要对每条消息进行确认,可以通过设置multiple为true,对所有以前没有进行确认的消息进行确认。
RabbitMQ可以设置basicQoS(Consumer Prefetch)来对consumer进行流控,从而限制未ack的消息数量
//每个consumer单独流控
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
//多个consumer共享流控
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
3.使用事务来批量进行
事务可能会对消息吞吐量产生负面影响,但有一个例外。如果你不适用QoS设置,那么在使用事务来批量确认消息时,实际上可能会看到略微的性能提升。
并且事务不适用于已禁止确认的消费者。
拒绝消息
当消息本身或消息处理的过程中出现问题,RabbitMQ提供了两种将消息踢回代理服务器的机制:
Basic.Reject: 一次只允许拒绝一个消息
Basic.Nack:一次可以拒绝多个消息
@Component
@Slf4j
public class RabbitConsumer {
/**
* 监听 queue-rabbit-springboot-advance 队列
*
* @param receiveMessage 接收到的消息
* @param message
* @param channel
*/
@RabbitListener(queues = "queue-rabbit-springboot-advance")
public void receiveMessage(String receiveMessage, Message message, Channel channel) {
//拒绝多个消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
//拒绝一个消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false, false);
//进行消息应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
死信交换器:
过期的消息、basic.nack或basic.reject且requeue参数为false或队列满的消息将进入此交换器。
RabbitMQ通过死信交换器将消息路由到绑定的队列,就像正常发送给交换器的任何其他消息一样。
死信功能还允许你使用预先指定的值覆盖路由键(死信交换器即可以绑定一个非死信的队列和一个死信的队列)。这样可以允许你使用一个交换器同时处理死信消息和非死信消息,
但需要确保死信消息不被投递到非死信队列中,需要在声明队列时指定一个额外的参数 x-dead-letter-routing-key
控制队列
控制队列的方向有以下几点:
对于临时队列(可以被删除的队列,比如一个聊天房)
自动删除队列
只允许单个消费者
自动过期队列
对永久队列(始终存在的队列,如用来处理信用卡操作的队列)
队列持久性
队列中消息自动过期
其他可以设置的参数
在springboot项目中生命一个队列的方式
- 队列通常与交换器绑定,所以在定义生产者的时候,就会绑定声明队列
Queue的创建可以new,也可以借助QueueBuider.durable来创建
@Bean(“deadLetterQueue”)
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信队列Exchange
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)
args.put("x-dead-letter-routing-key", DEAD_LETTER_REDIRECT_ROUTING_KEY);
return QueueBuilder.durable(DEAD_LETTER_QUEUE).withArguments(args).build();
}
还没有评论,来说两句吧...