java——spring boot集成RabbitMQ——高级特效——可靠性投递——spring boot实现confirm机制...

男娘i 2023-09-29 20:59 17阅读 0赞

生产者:

pom文件:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>springrmqtopicsender</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. </properties>
  13. <parent>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-parent</artifactId>
  16. <version>2.4.5</version>
  17. <relativePath/>
  18. </parent>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-amqp</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-devtools</artifactId>
  31. <scope>runtime</scope>
  32. <optional>true</optional>
  33. </dependency>
  34. </dependencies>
  35. </project>

5544feac801774abddae0788894af70e.png

yml文件:

  1. server:
  2. port: 8082
  3. spring:
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: guest
  8. password: guest
  9. publisher-confirm-type: correlated

3e06411d6060abcf8b3d54e14c8fc39e.png

56fb4243b30db5ecfc0442cad1659286.png

748915b9f4f40c675311111248309c9d.png

522d1916425343621c2b4c698d843d28.png

7ebc2fd4676db98f903c41064b92605f.png

设置config:

  1. package org.example.config;
  2. import org.springframework.amqp.core.TopicExchange;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * 主题交换机
  7. * topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
  8. */
  9. @Configuration
  10. public class RabbitTopicConfig
  11. {
  12. public final static String TOPIC_NAME = "amqp-topic";
  13. @Bean
  14. TopicExchange topicExchange()
  15. {
  16. return new TopicExchange(TOPIC_NAME,true,false);
  17. }
  18. }

cfd7be25ab14e605489a9a407ef68162.png

注入监听器:

  1. package org.example.config;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.PostConstruct;
  7. @Component
  8. public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
  9. //把监听器注入到RabbitTemplate中
  10. @Autowired
  11. RabbitTemplate rabbitTemplate;
  12. @PostConstruct
  13. public void init()
  14. {
  15. rabbitTemplate.setConfirmCallback(this);
  16. }
  17. @Override
  18. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  19. /**
  20. *
  21. * 生产者发送消息给交换机;交换机返回ack的同时,还会带一些消息云数据,云数据从correlationData中获得
  22. *
  23. * correlationData中包括:
  24. *
  25. * private volatile String id;
  26. *
  27. * private volatile ReturnedMessage returnedMessage;
  28. *
  29. * ============================================================================
  30. *
  31. * ack是个布尔值。如果生产者发送一个消息到交换机,交换机签收成功,返回true。否则返回false;
  32. *
  33. * ================================================================================
  34. *
  35. * cause:原因。ack返回false时,返回的具体原因
  36. *
  37. */
  38. String id = correlationData.getId();
  39. if(ack)
  40. {
  41. //消息投递成功
  42. System.out.println("消息投递成功:" + id);
  43. }
  44. else {
  45. //失败。存入到缓存中,通过定时任务,定时发送
  46. System.out.println("消息投递失败,原因" + cause);
  47. }
  48. }
  49. }

08e0d0c0516e34e6e432b7280ec6b59f.png

833a4c73f16cec1cec2c1728360ab767.png

发送消息:

  1. package org.example.sender;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import java.util.UUID;
  7. /**
  8. * 消息生产者 发送消息
  9. */
  10. @Component
  11. public class MessageSender {
  12. @Autowired
  13. RabbitTemplate rabbitTemplate;
  14. /**
  15. * 发送消息
  16. * @param info
  17. */
  18. public void send(String info)
  19. {
  20. System.out.println("发送消息>>>"+info);
  21. CorrelationData correlationData = new CorrelationData();
  22. String uuid = UUID.randomUUID().toString();
  23. System.out.println(uuid);
  24. correlationData.setId(uuid);
  25. rabbitTemplate.convertAndSend("amqp-topic","huawei.a",info,correlationData);
  26. }
  27. }

f91595bf99866f9d97489b98b36e5f23.png

发送消息:

  1. package org.example.controller;
  2. import org.example.sender.MessageSender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @Auther: moerhai@qq.com
  8. * @Date: 2020/10/4 11:34
  9. */
  10. @RestController
  11. public class IndexController {
  12. @Autowired
  13. MessageSender messageSender;
  14. @RequestMapping("/index")
  15. public String index()
  16. {
  17. messageSender.send("中国——路由——华为");
  18. return "SUCCESS";
  19. }
  20. }

72f4a59f1bbd6b7b0fc949788dcb961e.png

启动服务:

  1. package org.example;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class HelloWorldApplication
  6. {
  7. public static void main(String[] args)
  8. {
  9. SpringApplication.run(HelloWorldApplication.class, args);
  10. }
  11. }

eb23018543c7502e82b75133b14714d2.png

bffd8fce35def175661f9453322445e9.png

b662dd7993f382a39e4d40af3edf1428.png

===================================================================

消费者:

pom文件:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>springrmqtopicreceiver</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. </properties>
  13. <parent>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-parent</artifactId>
  16. <version>2.4.5</version>
  17. <relativePath/>
  18. </parent>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-amqp</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-devtools</artifactId>
  31. <scope>runtime</scope>
  32. <optional>true</optional>
  33. </dependency>
  34. </dependencies>
  35. </project>

aa51dea2648e439189bd5f962acb7469.png

yml文件:

  1. server:
  2. port: 8081
  3. spring:
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: guest
  8. password: guest
  9. #关闭自动ack,设置为手动ack
  10. listener:
  11. simple:
  12. acknowledge-mode: manual

ee3961971eb2a02b34cec0e17e986f81.png

配置文件:

  1. package org.example.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * 主题交换机
  10. * topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
  11. */
  12. @Configuration
  13. public class RabbitTopicConfig {
  14. public final static String TOPIC_NAME = "amqp-topic";
  15. @Bean
  16. TopicExchange topicExchange(){
  17. return new TopicExchange(TOPIC_NAME,true,false);
  18. }
  19. @Bean
  20. Queue xiaomi(){
  21. return new Queue("xiaomi",true);
  22. }
  23. @Bean
  24. Queue huawei(){
  25. return new Queue("huawei",true);
  26. }
  27. @Bean
  28. Binding xiaomiBinding(){
  29. //xiaomi.#:表示消息的routingKey是以xiaomi开头的就会路由到xiaomi的队列
  30. return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
  31. }
  32. @Bean
  33. Binding huaweiBinding(){
  34. return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
  35. }
  36. }

b620c1d390a43c295e36cfe476cecaec.png

接收消息:

  1. package org.example.receiver;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. */
  9. @Component
  10. public class TopicReceiver {
  11. //分别监听名称为xiaomi、huawei的队列
  12. @RabbitListener(queues = "xiaomi")
  13. public void handlerXM(Message message,String msg, Channel channel) throws IOException {
  14. System.out.println("小米:"+msg);
  15. //手动签收,不启动批量签收
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  17. System.out.println(message.getMessageProperties().getDeliveryTag());
  18. }
  19. @RabbitListener(queues = "huawei")
  20. public void handlerHW(Message message,String msg, Channel channel) throws IOException {
  21. System.out.println("华为:"+msg);
  22. //告诉rmq签收的消息的id。以及是否批量签收
  23. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  24. }
  25. }

2481963332b68c49f7dad9a6c730e15d.png

启动服务:

  1. package org.example;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class HelloWorldApplication
  6. {
  7. public static void main(String[] args)
  8. {
  9. SpringApplication.run(HelloWorldApplication.class, args);
  10. }
  11. }

87ef88fec0f13402a1302eb4dfb746d6.png

dacd073e15c43cbeb0c1a1a5a545ccdd.png

发表评论

表情:
评论列表 (有 0 条评论,17人围观)

还没有评论,来说两句吧...

相关阅读