RabbitMQ整合Spring Booot【消费者补偿幂等问题】 素颜马尾好姑娘i 2022-12-08 04:27 78阅读 0赞 如果消费者 运行时候 报错了 package com.toov5.msg.SMS; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues="fanout_sms_queue") public class SMSConsumer { @RabbitHandler public void process(String mString) { System.out.println("短信消费者获取生产者消息msg"+mString); int i = 1/0; } } 当生产者投递消息后: 消费者会不停的进行打印: ![ba1e68811a1414fac6848e110b014ec8.png][] 消息一直没有被消费 ![bc3c1333da5bc2ebc88654ce9fbce6b5.png][] 原因 Rabbitmq 默认情况下 如果消费者程序出现异常情况 会自动实现补偿机制 也就是 重试机制 @RabbitListener底层使用AOP进行拦截,如果程序没有抛出异常,自动提交事务。 如果Aop使用异常通知 拦截获取异常信息的话 , 自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器端进行重放,一直重试到不抛出异常为准。 可以修改重试策略 一般来说默认5s重试一次, 消费者配置: listener: simple: retry: ####开启消费者重试 enabled: true ####最大重试次数(默认无数次) max-attempts: 5 ####重试间隔次数 initial-interval: 3000 效果: 充实5次 不行就放弃了 ![fe75addec6d3b2d08140d4698d2a7760.png][] ![8fb257dfb625d8d5a0856554527d2c18.png][] MQ重试机制机制 需要注意的问题 #### 如何合适选择重试机制 #### 情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试 别人的问题不是我自己的问题 情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试 充实一亿次也是如此 木有必要 需要发布版本解决 **总结:** * 对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用 日志记录+定时任务job健康检查+人工进行补偿 * 把错误记录在日志里面,通过定时Job去自动的补偿,或通过人工去补偿。 传统的HTTP请求 如果失败了没法自动重试 ,当然自己可以写个循环实现。MQ完全自己自带的。 情况2的拓展延申: 将之前的案例改为 邮件消费者 调用邮件第三方接口 **伪代码:** 在consumer 中 调用接口后 判断返回值 由于RabbitMQ 在消费者异常时候 会进行重试机制 进行补偿 所以可以抛出个异常 来实现 Consumer: String result = template.Email(); if(result == null)\{ throw new Exception("调用第三方邮件服务器接口失败!"); \} producer: ![ffb1b094be957bbca3d2aea81956c260.png][] pom: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>rabbitmq_producer_springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project> config: package com.itmayiedu.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; //Fanout 类型 发布订阅模式 @Component public class FanoutConfig { // 邮件队列 private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信队列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交换机 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定义邮件队列 @Bean public Queue fanOutEamilQueue() { return new Queue(FANOUT_EMAIL_QUEUE); } // 2.定义短信队列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定义交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.队列与交换机绑定邮件队列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.队列与交换机绑定短信队列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } } Producer: package com.itmayiedu.rabbitmq; import java.util.UUID; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; @Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { JSONObject jsonObject = new JSONObject(); jsonObject.put("email", "xx@163.com"); jsonObject.put("timestamp", System.currentTimeMillis()); String jsonString = jsonObject.toJSONString(); System.out.println("jsonString:" + jsonString); // 设置消息唯一id 保证每次重试消息id唯一 /*Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build();*/ amqpTemplate.convertAndSend(queueName, jsonString); } } Controller: package com.itmayiedu.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.itmayiedu.rabbitmq.FanoutProducer; @RestController public class ProducerController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/sendFanout") public String sendFanout(String queueName) { fanoutProducer.send(queueName); return "success"; } } yml: spring: rabbitmq: ####连接地址 host: 192.168.91.6 ####端口号 port: 5672 ####账号 username: admin ####密码 password: admin ### 地址 virtual-host: /admin_toov5 启动类: package com.itmayiedu; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } } Consumer: pom: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>rabbitmq_consumer_springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project> utils: package com.itmayiedu.rabbitmq.utils; import com.alibaba.fastjson.JSONObject; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * HttpClient4.3工具类 * * @author hang.luo */ public class HttpClientUtils { private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志记录 private static RequestConfig requestConfig = null; static { // 设置请求和传输超时时间 requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build(); } /** * post请求传输json参数 * * @param url * url地址 * @param json * 参数 * @return */ public static JSONObject httpPost(String url, JSONObject jsonParam) { // post请求返回结果 CloseableHttpClient httpClient = HttpClients.createDefault(); JSONObject jsonResult = null; HttpPost httpPost = new HttpPost(url); // 设置请求和传输超时时间 httpPost.setConfig(requestConfig); try { if (null != jsonParam) { // 解决中文乱码问题 StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8"); entity.setContentEncoding("UTF-8"); entity.setContentType("application/json"); httpPost.setEntity(entity); } CloseableHttpResponse result = httpClient.execute(httpPost); // 请求发送成功,并得到响应 if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { String str = ""; try { // 读取服务器返回过来的json字符串数据 str = EntityUtils.toString(result.getEntity(), "utf-8"); // 把json字符串转换成json对象 jsonResult = JSONObject.parseObject(str); } catch (Exception e) { logger.error("post请求提交失败:" + url, e); } } } catch (IOException e) { logger.error("post请求提交失败:" + url, e); } finally { httpPost.releaseConnection(); } return jsonResult; } /** * post请求传输String参数 例如:name=Jack&sex=1&type=2 * Content-type:application/x-www-form-urlencoded * * @param url * url地址 * @param strParam * 参数 * @return */ public static JSONObject httpPost(String url, String strParam) { // post请求返回结果 CloseableHttpClient httpClient = HttpClients.createDefault(); JSONObject jsonResult = null; HttpPost httpPost = new HttpPost(url); httpPost.setConfig(requestConfig); try { if (null != strParam) { // 解决中文乱码问题 StringEntity entity = new StringEntity(strParam, "utf-8"); entity.setContentEncoding("UTF-8"); entity.setContentType("application/x-www-form-urlencoded"); httpPost.setEntity(entity); } CloseableHttpResponse result = httpClient.execute(httpPost); // 请求发送成功,并得到响应 if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { String str = ""; try { // 读取服务器返回过来的json字符串数据 str = EntityUtils.toString(result.getEntity(), "utf-8"); // 把json字符串转换成json对象 jsonResult = JSONObject.parseObject(str); } catch (Exception e) { logger.error("post请求提交失败:" + url, e); } } } catch (IOException e) { logger.error("post请求提交失败:" + url, e); } finally { httpPost.releaseConnection(); } return jsonResult; } /** * 发送get请求 * * @param url * 路径 * @return */ public static JSONObject httpGet(String url) { // get请求返回结果 JSONObject jsonResult = null; CloseableHttpClient client = HttpClients.createDefault(); // 发送get请求 HttpGet request = new HttpGet(url); request.setConfig(requestConfig); try { CloseableHttpResponse response = client.execute(request); // 请求发送成功,并得到响应 if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { // 读取服务器返回过来的json字符串数据 HttpEntity entity = response.getEntity(); String strResult = EntityUtils.toString(entity, "utf-8"); // 把json字符串转换成json对象 jsonResult = JSONObject.parseObject(strResult); } else { logger.error("get请求提交失败:" + url); } } catch (IOException e) { logger.error("get请求提交失败:" + url, e); } finally { request.releaseConnection(); } return jsonResult; } } consumer: package com.itmayiedu.rabbitmq; import java.util.Map; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.itmayiedu.rabbitmq.utils.HttpClientUtils; import com.rabbitmq.client.Channel; //邮件队列 @Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(String msg) throws Exception { System.out.println("邮件消费者获取生产者消息msg:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 获取email参数 String email = jsonObject.getString("email"); // 请求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因为网络原因,造成无法访问,继续重试 throw new Exception("调用接口失败!"); } System.out.println("执行结束...."); } } yml: spring: rabbitmq: ####连接地址 host: 192.168.91.6 ####端口号 port: 5672 ####账号 username: admin ####密码 password: admin ### 地址 virtual-host: /admin_toov5 listener: simple: retry: ####开启消费者异常重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 2000 server: port: 8081 启动类: package com.itmayiedu.rabbitmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class AppConsumer { public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); } } 邮件服务器: package com.mayikt.controller; import java.util.HashMap; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @RestController public class MsgController { // 模拟第三方发送邮件 @RequestMapping("/sendEmail") public Map<String, Object> sendEmail(String email) { System.out.println("开始发送邮件:" + email); Map<String, Object> result = new HashMap<String, Object>(); result.put("code", "200"); result.put("msg", "发送邮件成功.."); System.out.println("发送邮件成功"); return result; } public static void main(String[] args) { SpringApplication.run(MsgController.class, args); } } yml: server: port: 8083 pom: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mayikt</groupId> <artifactId>mayikt_sms</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project> 在没有启动邮件服务器时候,消费者调用接口失败会一直重试,重试五次。 在此期间,如果启动成功,则重试成功,不再重试, 不再进行补偿机制。 #### 消费者如果保证消息幂等性,不被重复消费 #### **背景:** 网络延迟传输中,或者消费出现异常或者是消费延迟,会造成进行MQ重试进行重试补偿机制,在重试过程中,可能会造成重复消费。 **解决办法:** 使用全局MessageID判断消费方使用同一个,解决幂等性。 只要重试过程中,判断如果已经走完了 不能再继续走 继续执行了 MQ消费者的幂等行的解决 一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单号 改进: producer: 添加: // 设置消息唯一id 保证每次重试消息id唯一 Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID amqpTemplate.convertAndSend(queueName, message); 全部代码: package com.itmayiedu.rabbitmq; import java.util.UUID; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; @Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { JSONObject jsonObject = new JSONObject(); jsonObject.put("email", "xx@163.com"); jsonObject.put("timestamp", System.currentTimeMillis()); String jsonString = jsonObject.toJSONString(); System.out.println("jsonString:" + jsonString); // 设置消息唯一id 保证每次重试消息id唯一 Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID amqpTemplate.convertAndSend(queueName, message); } } 同样的 消费者也需要修改: 方法参数类型为 Message 然后可以获取这个ID 然后可以进行业务逻辑操作 @RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); //id获取之 String msg = new String(message.getBody(), "UTF-8"); //消息内容获取之 System.out.println("-----邮件消费者获取生产者消息-----------------" + "messageId:" + messageId + ",消息内容:" + msg); if (messageId == null) { return; } JSONObject jsonObject = JSONObject.parseObject(msg); // 获取email参数 String email = jsonObject.getString("email"); // 请求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因为网络原因,造成无法访问,继续重试 throw new Exception("调用接口失败!"); } System.out.println("执行结束...."); //messId 的情况写入到redis 中 成功就修改为空 } 重试机制都是间隔性的 每次都是一个线程 单线程重试 ** 关于应答模式:** Spring boot 中进行 AOP拦截 自动帮助做重试 手动应答的话 ,如果不告诉服务器已经消费成功,则服务器不会删除 消息。告诉消费成功了才会删除。 消费者的yml加入: acknowledge-mode: manual spring: rabbitmq: ####连接地址 host: 192.168.91.6 ####端口号 port: 5672 ####账号 username: admin ####密码 password: admin ### 地址 virtual-host: /admin_toov5 listener: simple: retry: ####开启消费者异常重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 2000 ####开启手动ack acknowledge-mode: manual server: port: 8081 开启模式之后: 消费者参数需要加入: @Headers Map<String, Object> headers, Channel channel 代码逻辑最后面加入: // // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 告诉RabbitMQ 消费成功了 消息可以删除了 channel.basicAck(deliveryTag, false); 代码如下: @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 获取email参数 String email = jsonObject.getString("email"); // 请求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因为网络原因,造成无法访问,继续重试 throw new Exception("调用接口失败!"); } // // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); System.out.println("执行结束...."); } [ba1e68811a1414fac6848e110b014ec8.png]: /images/20221123/f77e118480cc4d1995806b628009ac58.png [bc3c1333da5bc2ebc88654ce9fbce6b5.png]: /images/20221123/2628a10c75484522a7610df9ed37b459.png [fe75addec6d3b2d08140d4698d2a7760.png]: /images/20221123/6a8bf0cdf6ec4ec6a38c38c36e38ec75.png [8fb257dfb625d8d5a0856554527d2c18.png]: /images/20221123/67d01139cf354cfe9186dba189419d18.png [ffb1b094be957bbca3d2aea81956c260.png]: /images/20221123/08bb444014b74fbbbb48f23d76527859.png
还没有评论,来说两句吧...