《Rabbit MQ 实战》读书笔记 (二:事务与消息确认模式) 傷城~ 2022-01-05 04:47 168阅读 0赞 AMQP协议的一个亮点就是对消息的可靠性投递-事务。 事务在AMQP-0-9-1中正式成为规范的一部分。虽然AMQP事务保证了在信道开启了事务模式后,全部命令的执行成功。但AMQP事务大大降低了Rabbit的吞吐量,性能极低。同时使用AMQP事务使得生产者与应用程序之间产生同步。 为了解决这个可靠性投递和性能的兼容性问题。RabbitMQ提供了发送方确认模式。 在客户端与Rabbit服务端的链接信道中,设置其属性为confirm模式。一旦信道开启了这个模式变不能再取消,只能通过重新创建来取消。 信道在confirm模式中,所有发布的消息都会被指派一个唯一的ID号(从1开始)。一旦消息投递成功,投递到所匹配的队列后,信道会发送一个包含消息唯一id的消息给生产者应用程序。如果消息和队列是可持久化的,那么确认消息会在队列将消息写入磁盘后发出。 与AMQP自身事务相比,发送方确认模式的是异步,在发送了消息等待确认的同时可以继续处理下一条消息。接受到确认消息时,生产者的回调函数处理。 如果Rabbit发生了内部错误导致了消息的丢失,Rabbit会发送一条nack(未确认)消息如同发送确认消息一样,说明消息已经丢失。 分别用java和Go两种编程语言来体验RabbitMQ的消息确认模式: 生产者: ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(32769); Connection conn = null; try { conn = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { Channel channel = conn.createChannel(); channel.exchangeDeclare("exchangeWx", "direct", true); channel.queueBind("Queuewx", "exchangeWx","Queuewx"); //开启发送方确认模式 AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener(){ @Override public void handleAck(long l, boolean b) throws IOException { //参数2对boolean为是否该信道批量发送的消息 System.out.println("消息成功送达,id:"+l); } @Override public void handleNack(long l, boolean b) throws IOException { //参数2对boolean为是否该信道批量发送的消息 System.out.println("消息丢失,id:"+l); } }); //参数三指定了deliveryMode 为2,即对消息持久化 channel.basicPublish("exchangeWx", "Queuewx", MessageProperties.PERSISTENT_TEXT_PLAIN, "messageBodyBytes".getBytes()); } catch (IOException e) { e.printStackTrace(); } 消费者: ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(32769); Connection conn = null; try { conn = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { Channel channel = conn.createChannel(); channel.exchangeDeclare("exchangeWx", "direct", true); channel.queueBind("Queuewx", "exchangeWx","Queuewx"); //参数2 boolean 为autoAck,是否自动确认消息已送达 channel.basicConsume("Queuewx", false, "myConsumerTag", new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long messageId = envelope.getDeliveryTag(); // (process the message components here ...) System.out.println("收到routingKey:"+routingKey+"的消息"+new String(body,"UTF-8")); //basicConsume 的 autoAck 自动回复参数为false,故必须手动确认消息 channel.basicAck(messageId, false); } }); } catch (IOException e) { e.printStackTrace(); } 如果消费者没有手动处理消息已接收,则消息会一直保存在队列中。
还没有评论,来说两句吧...