SpringBoot-RabbitMQ篇(2)-消息可靠投递 悠悠 2022-08-31 02:14 119阅读 0赞 ### 文章目录 ### * 一、发送者异常监控 * * 1.1 发送者异常种类 * 1.2 消息发送回调 * 二、消息持久化 * * 2.1 持久化说明 * 2.2 持久化表 * 2.3 持久化实现 * 2.3 补偿机制 * 2.5 消息测试 # 一、发送者异常监控 # ## 1.1 发送者异常种类 ## 1. 基本处理流程 ![ac6cb91389a7737b89e4c77dca1fd7f4.png_height_535_id_sESOk_originHeight_535_originWidth_1048_originalType_binary_size_0_status_done_style_none_width_1048][] * 补偿(兜底)方案 ![8fbca8fe4a9485e489aee091fce40210.png_height_477_id_E9rJZ_originHeight_477_originWidth_1042_originalType_binary_size_0_status_done_style_none_width_1042][] 2. 模拟broker宕机:修改发送者端口如5673,然后启动,发送消息,端口不对无法连接主机 * 错误信息:java.net.ConnectException: Connection timed out: connect * 补偿方案:加入异常处理,如果不可达则返回错误 * 这种错误在发送的时候就已经可以发现,直接将错误返回给调用方即可 @RequestMapping("/direct") public Object sendEmail(String msg) { try { rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "queue.email.routing.key", msg); return msg; } catch (AmqpException e) { System.out.println("发送出现异常:" + e.getMessage()); return "网络中断,请稍后再试"; } } 1. 模拟无交换器异常 * 错误信息 * ERROR 4880 — \[.200.57.39:5672\] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: \#method<channel.close>(reply-code=404, reply-text=NOT\_FOUND - no exchange ‘noExchange’ in vhost ‘/’, class-id=60, method-id=40) * 错误说明:如果没有交换器并不会报错,只会输出一条日志 * 补偿方案:需要采用发送回调来确认是否成功发送消息 2. 模拟无路由异常 * 错误信息:无任何提示,消息直接被丢弃 * 补偿方案:需要采用发送回调来确认是否成功发送消息 ## 1.2 消息发送回调 ## 1. 因为消息是异步发送,所以需要确保消息能正确发送 2. 所以可配置RabbitTemplate然后指定回调信息 3. 步骤01:修改配置文件,配置回调参数 * publisher-confirm-type * org.springframework.boot.autoconfigure.amqp.RabbitProperties\#publisherConfirmType * org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType spring: rabbitmq: host: 127.0.0.1 port: 5672 username: tianxin password: tianxin # 开启消息发broker回调 publisher-confirm-type: correlated # 开启路由消息路由回调 publisher-returns: true # 强制确认,也可以在代码中开启 template: mandatory: true /** * The type of publisher confirms to use. */ public enum ConfirmType { /** * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} * within scoped operations. */ SIMPLE, /** * Use with {@code CorrelationData} to correlate confirmations with sent * messsages. */ CORRELATED, /** * Publisher confirms are disabled (default). */ NONE } 1. 步骤02:配置RabbitTemplate,设置交换器确认回调和路由回调 * setConfirmCallback:无论成功与否都会调用 * setReturnCallback:错误时才调用 import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Objects; @Configuration public class CustomRabbitTemplate { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); // 开启mandatory为true才能触发回调方法,无论消息推送结果如何强制调用回调方法 rabbitTemplate.setMandatory(true); // 设置连接工厂信息 rabbitTemplate.setConnectionFactory(connectionFactory); // 消息发broker回调:发送者到broker的exchange是否正确找到 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("setConfirmCallback 消息数据:" + correlationData); if (Objects.nonNull(correlationData)) { System.out.println("setConfirmCallback 消息数据:" + correlationData.getReturnedMessage()); } System.out.println("setConfirmCallback 消息确认:" + ack); System.out.println("setConfirmCallback 原因:" + cause); System.out.println("-----------------------------------"); }); // 消息路由回调:从交换器路由到队列是否正确发送 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { System.out.println("setReturnCallback 消息:" + message); System.out.println("setReturnCallback 回应码:" + replyCode); System.out.println("setReturnCallback 回应信息:" + replyText); System.out.println("setReturnCallback 交换器:" + exchange); System.out.println("setReturnCallback 路由键:" + routingKey); System.out.println("-----------------------------------"); }); return rabbitTemplate; } } * 路由回调和消息回调 /** * A callback for publisher confirmations. * */ @FunctionalInterface public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause); } /** * A callback for returned messages. * */ @FunctionalInterface public interface ReturnCallback { /** * Returned message callback. * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey); } 1. 步骤03:测试controller * convertAndSend(String exchange, String routingKey, final Object object, @Nullable CorrelationData correlationData) * 指定CorrelationData(关联数据/对比数据) * CorrelationData中可以指定消息id和回调消息 * \{“id”: “dataId”, data: “biz数据”\} 2. 测试无交换器 * http://127.0.0.1:8071/noExchange?message=direct * 找不到交换器:直接回调**setConfirmCallback**,不再继续调用 @RequestMapping("/noExchange") public Object noExchange(String message) { try { // 连接不上路由,则消息直接丢弃 String id = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("noExchange", "springboot.email.routing.key", message, new CorrelationData(id)); return "ok"; } catch (AmqpException e) { System.out.println(e.getMessage()); return e.getMessage(); } } setConfirmCallback 消息数据:CorrelationData [id=9aca9a83-5815-455b-acf0-71b0caed534c] setConfirmCallback 消息数据:null setConfirmCallback 消息确认:false setConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40) 1. 测试无路由 * http://127.0.0.1:8071/noQueue?message=direct * 找不到路由:先回调**setReturnCallback**再回调**setConfirmCallback** @RequestMapping("/noQueue") public Object noQueue(String message) { try { // 发送不到队列 ,则消息直接丢弃 String id = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "noQueue", message, new CorrelationData(id)); return "ok"; } catch (AmqpException e) { System.out.println(e.getMessage()); return e.getMessage(); } } setReturnCallback 消息:(Body:'direct' MessageProperties [headers={ spring_returned_message_correlation=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) setReturnCallback 回应码:312 setReturnCallback 回应信息:NO_ROUTE setReturnCallback 交换器:exchange.direct.springboot.email setReturnCallback 路由键:noQueue ----------------------------------- setConfirmCallback 消息数据:CorrelationData [id=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697] setConfirmCallback 消息数据:(Body:'direct' MessageProperties [headers={ spring_listener_return_correlation=42813c45-b804-4303-b9f0-10a73dad71ca, spring_returned_message_correlation=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct.springboot.email, receivedRoutingKey=noQueue, deliveryTag=0]) setConfirmCallback 消息确认:true setConfirmCallback 原因:null 1. 测试消息正常发送 * http://127.0.0.1:8071/direct/confirm?message=direct * 消息发送成功:只回调**setConfirmCallback** @RequestMapping("/direct/confirm") public Object directConfirm(String message) { try { String id = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", message, new CorrelationData(id)); return "ok"; } catch (AmqpException e) { System.out.println(e.getMessage()); return "网络中断,请稍后再试~"; } } setConfirmCallback 消息数据:CorrelationData [id=9bb8a203-2345-4a7e-8bfd-8ad0226da4dc] setConfirmCallback 消息数据:null setConfirmCallback 消息确认:true setConfirmCallback 原因:null 1. 指定回调消息的id和消息数据 * http://127.0.0.1:8071/correlationData/message?msg=direct * 可以指定更多业务类型 * org.springframework.amqp.core.Message * org.springframework.amqp.core.MessageProperties @RequestMapping("/correlationData/message") public Object correlationDataMessage(String msg) { try { String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(); correlationData.setId(id); // 指定回调更多信息 MessageProperties properties = new MessageProperties(); properties.setMessageId(id); Message message = new Message(msg.getBytes(), properties); correlationData.setReturnedMessage(message); rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", msg, correlationData); return msg; } catch (AmqpException e) { System.out.println(e.getMessage()); return "网络中断,请稍后再试~"; } } setConfirmCallback 消息数据:CorrelationData [id=9f598758-4b0b-4e4a-981a-e7e04eab1335] setConfirmCallback 消息数据:(Body:'[B@1465d3ea(byte[6])' MessageProperties [headers={ }, messageId=9f598758-4b0b-4e4a-981a-e7e04eab1335, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0]) setConfirmCallback 消息确认:true setConfirmCallback 原因:null # 二、消息持久化 # ## 2.1 持久化说明 ## 1. 消息在网络发送、网络传输、存盘等都有可能出现意外而导致消息丢失 2. 例如如果队列、交换器、消息其中一个没有开启持久化,在broker重启后消息丢失 3. 所以需要在消息发送前进行存盘,然后根据状态区分不同的消息种类,可以用来做重试等 ## 2.2 持久化表 ## 1. 持久化需要创建存储消息的表结构 create table msg_log ( id bigint primary key comment '消息唯一标识', msg text null comment '消息体, json格式化', exchange varchar(255) default '' null comment '交换机', routing_key varchar(255) default '' null comment '路由键', status int default -1 null comment '状态: -1新建 0投递中 1投递成功 2投递失败 3已消费 4人工处理 5消费失败', try_count int default 0 null comment '重试次数', next_try_time datetime null comment '下一次重试时间', origin_id varchar(32) null comment '原始id', note varchar(500) null comment '错误信息', create_time datetime null comment '创建时间', update_time datetime null comment '更新时间', ) comment '消息投递日志'; ## 2.3 持久化实现 ## 1. 使用MybatisPlus生成对应的service、mapper、domain信息,标准mybatis使用方式 2. 首先需要配置rabbitTemplate配置回调信息 import com.alibaba.fastjson.JSONObject; import com.codecoord.domain.MsgLog; import com.codecoord.domain.MsgLogStatus; import com.codecoord.serivce.MsgLogService; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.Objects; import java.util.UUID; @Configuration public class CustomRabbitTemplate { @Resource private MsgLogService msgLogService; @Bean public RabbitTemplate callbackRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); // 需要设置连接工程 template.setConnectionFactory(connectionFactory); // 设置强制性 template.setMandatory(true); // setConfirmCallback: 消息发送到 Broker 后触发回调(是否正确到达Exchange中) // 需要在配置文件中开启 publisher-confirm-type: correlated 配置 template.setConfirmCallback((correlationData, ack, cause) -> { if (Objects.nonNull(correlationData) && Objects.nonNull(correlationData.getId())) { MsgLog updateLog = new MsgLog(); updateLog.setId(Long.parseLong(correlationData.getId())); updateLog.setUpdateTime(LocalDateTime.now()); if (ack) { updateLog.setStatus(MsgLogStatus.DELIVERY_SUCCESS); } else { updateLog.setStatus(MsgLogStatus.DELIVERY_FAIL); } msgLogService.updateById(updateLog); } else { System.out.println("消息异常处理"); } // 根据ack判断是否投递成功 System.out.println("setConfirmCallback 消息数据:" + JSONObject.toJSONString(correlationData)); System.out.println("setConfirmCallback 消息确认:" + ack); System.out.println("setConfirmCallback 原因:" + cause); System.out.println("-----------------------------------"); }); // setReturnCallback: 启动消息失败返回,比如路由不到队列时触发回调 // 需要在配置文件中开启 publisher-returns: true 配置 template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 消息无法投递到队列,新建消息人工处理,因为原始消息会在setConfirmCallback中被置为投递成功 MsgLog msgLog = new MsgLog(); msgLog.setMsg(message.toString()); msgLog.setExchange(exchange); msgLog.setRoutingKey(routingKey); msgLog.setStatus(MsgLogStatus.MANUAL_HANDLING); msgLog.setTryCount(0); LocalDateTime currentTime = LocalDateTime.now(); msgLog.setNote(replyText); msgLog.setCreateTime(currentTime); msgLog.setUpdateTime(currentTime); // 处理原始id MsgLog originLog = JSONObject.parseObject(new String(message.getBody()), MsgLog.class); msgLog.setOriginId(originLog.getId().toString()); msgLogService.save(msgLog); System.out.println("setReturnCallback 消息:" + message); System.out.println("setReturnCallback 回应码:" + replyCode); System.out.println("setReturnCallback 回应信息:" + replyText); System.out.println("setReturnCallback 交换器:" + exchange); System.out.println("setReturnCallback 路由键:" + routingKey); System.out.println("-----------------------------------"); }); return template; } } 1. 消息发送前对消息存盘,这里使用的rabbitTemplate为新配置的模板 1. http://localhost:8071/reliable?message=direct&exchange=noExchange&routingKey=reliable.routing.key 2. http://localhost:8071/reliable?message=direct&exchange=exchange.direct.reliable&routingKey=noQueue 3. http://localhost:8071/reliable?message=direct&exchange=exchange.direct.reliable&routingKey=reliable.routing.key import com.alibaba.fastjson.JSONObject; import com.codecoord.domain.MsgLog; import com.codecoord.domain.MsgLogStatus; import com.codecoord.serivce.MsgLogService; import org.springframework.amqp.AmqpException; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.util.IdGenerator; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.time.LocalDateTime; @RestController public class RabbitReliableController { @Resource private RabbitTemplate callbackRabbitTemplate; @Resource private MsgLogService msgLogService; @Resource private IdGenerator idGenerator; @RequestMapping("/reliable") public Object direct(String exchange, String routingKey, String message) { try { // 先存盘再发送,如果存盘失败则没有必要继续发送 MsgLog msgLog = saveMessageLog(exchange, routingKey, message); CorrelationData correlationData = new CorrelationData(msgLog.getId().toString()); callbackRabbitTemplate.convertAndSend(exchange, routingKey, JSONObject.toJSONString(msgLog), correlationData); return msgLog; } catch (AmqpException e) { System.out.println(e.getMessage()); return "网络中断,请稍后再试~"; } } private MsgLog saveMessageLog(String exchange, String routingKey, String msg) { MsgLog msgLog = new MsgLog(); // 测试,生产中使用id生成器 msgLog.setId(System.currentTimeMillis()); msgLog.setMsg(msg); msgLog.setStatus(MsgLogStatus.CREATE); msgLog.setExchange(exchange); msgLog.setRoutingKey(routingKey); msgLog.setTryCount(0); LocalDateTime currentTime = LocalDateTime.now(); msgLog.setCreateTime(currentTime); msgLog.setUpdateTime(currentTime); msgLogService.save(msgLog); return msgLog; } } ## 2.3 补偿机制 ## 1. 可以使用补偿机制来将存盘的消息重新发送 2. 使用springboot自带的定时任务 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.codecoord.domain.MsgLog; import com.codecoord.serivce.MsgLogService; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.List; @Component @EnableScheduling public class RabbitMqJob { @Resource private MsgLogService msgLogService; @Resource private RabbitTemplate callbackRabbitTemplate; @Scheduled(cron = "10/10 * * * * ?") public void msgResend() { // 每个消息最多重试三次 LambdaQueryWrapper<MsgLog> retryMsg = Wrappers.<MsgLog>lambdaQuery() .eq(MsgLog::getStatus, -1) .lt(MsgLog::getTryCount, 3); List<MsgLog> msgLogList = msgLogService.list(retryMsg); for (MsgLog msgLog : msgLogList) { msgLog.setTryCount(msgLog.getTryCount() + 1); msgLog.setUpdateTime(LocalDateTime.now()); LambdaUpdateWrapper<MsgLog> updateWrapper = Wrappers.<MsgLog>lambdaUpdate() .eq(MsgLog::getId, msgLog.getId()); boolean update = msgLogService.update(msgLog, updateWrapper); System.out.println("重试状态更新:" + update); callbackRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(), new CorrelationData(msgLog.getId().toString())); } } } ## 2.5 消息测试 ## 1. 消息存盘之后效果如下 ![bd1fbec785370ea799b10d2fab3dbf44.png_height_250_id_LzWUW_originHeight_250_originWidth_1628_originalType_binary_size_0_status_done_style_none_width_1628][] [ac6cb91389a7737b89e4c77dca1fd7f4.png_height_535_id_sESOk_originHeight_535_originWidth_1048_originalType_binary_size_0_status_done_style_none_width_1048]: /images/20220829/69e41a2f9c9c4c34be196cf81249f779.png [8fbca8fe4a9485e489aee091fce40210.png_height_477_id_E9rJZ_originHeight_477_originWidth_1042_originalType_binary_size_0_status_done_style_none_width_1042]: /images/20220829/9f53331e5bca471da3c3672a6f1ff841.png [bd1fbec785370ea799b10d2fab3dbf44.png_height_250_id_LzWUW_originHeight_250_originWidth_1628_originalType_binary_size_0_status_done_style_none_width_1628]: /images/20220829/a5ded9dbfe944582bbd24f4ad3c443bf.png
还没有评论,来说两句吧...