RabbitMQ - Spring Boot 整合 RabbitMQ

Love The Way You Lie 2021-06-10 20:39 757阅读 0赞

课程:https://www.cnblogs.com/guchunchao/p/13173406.html

阅读目录

  • 配置生产者
  • 配置消费者

导入pom.xml

961ddebeb323a10fe0623af514929fc1.png

复制代码

  <?xml version="1.0" encoding="UTF-8"?>
  <!-- Maven build for the Spring Boot + RabbitMQ demo (producer and consumer share these dependencies). -->
  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.3.1.RELEASE</version>
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
      <groupId>com.everjiankang</groupId>
      <artifactId>rabbitmqDemo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>rabbitmqDemo</name>
      <description>Demo project for Spring Boot</description>
      <properties>
          <java.version>1.8</java.version>
      </properties>
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-devtools</artifactId>
              <scope>runtime</scope>
              <optional>true</optional>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
          <!-- Declared once only: the listener code imports com.rabbitmq.client.Channel directly.
               The version is managed by the Spring Boot parent. -->
          <dependency>
              <groupId>com.rabbitmq</groupId>
              <artifactId>amqp-client</artifactId>
          </dependency>
          <!-- The JUnit Vintage engine is intentionally NOT excluded: the tutorial's test class uses
               JUnit 4 (org.junit.Test, @RunWith(SpringRunner.class)), and JUnit 4 reaches the test
               classpath through junit-vintage-engine. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
          <!-- Configuration processor: generates metadata so that bound configuration properties get
               completion hints in the IDE. Only needed at build time, hence optional. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-configuration-processor</artifactId>
              <optional>true</optional>
          </dependency>
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <optional>true</optional>
          </dependency>
          <!-- NOTE(review): fastjson is not used by any code shown in this tutorial, and old 1.2.x
               releases have published deserialization vulnerabilities; consider upgrading or removing. -->
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>fastjson</artifactId>
              <version>1.2.71</version>
          </dependency>
      </dependencies>
      <build>
          <plugins>
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
          </plugins>
      </build>
  </project>

复制代码

回到顶部

配置生产者

第一步:配置application.properties

复制代码

  # Broker connection used by the producer.
  spring.rabbitmq.addresses=127.0.0.1:5672
  spring.rabbitmq.username=xiaochao
  spring.rabbitmq.password=root
  spring.rabbitmq.virtual-host=/
  spring.rabbitmq.connection-timeout=15000
  # Producer-side settings
  # Publisher confirms: the boolean property below is deprecated and has been replaced by
  # spring.rabbitmq.publisher-confirm-type.
  # spring.rabbitmq.publisher-confirms=true
  #
  # "correlated" is the confirm type meant for RabbitTemplate.ConfirmCallback together with
  # CorrelationData, which is what the sender class in this tutorial uses.
  spring.rabbitmq.publisher-confirm-type=correlated
  # Have the broker return messages that could not be routed to any queue.
  spring.rabbitmq.publisher-returns=true
  # Publish with the mandatory flag so that the return callback is actually invoked.
  spring.rabbitmq.template.mandatory=true

复制代码

第二步:开发启动类Application.java

复制代码

  1. package com.dwz.springboot;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class Application {
  6. public static void main(String[] args) {
  7. SpringApplication.run(Application.class, args);
  8. }
  9. }

复制代码

第三步:加入实体类Order

复制代码

  1. package com.everjiankang.dependency.model;
  2. import java.io.Serializable;
  /**
   * Simple serializable order payload carrying an id and a name.
   * Used as the message body sent through RabbitMQ in this tutorial.
   */
  public class Order implements Serializable {

      private static final long serialVersionUID = 1L;

      /** Order identifier. */
      private String id;

      /** Human-readable order name. */
      private String name;

      /** Creates an empty order; both fields stay {@code null} until set. */
      public Order() {
      }

      /**
       * Creates an order with both fields populated.
       *
       * @param id   order identifier
       * @param name order name
       */
      public Order(String id, String name) {
          this.id = id;
          this.name = name;
      }

      /** @return the order identifier, possibly {@code null} */
      public String getId() {
          return this.id;
      }

      /** @param id the new order identifier */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the order name, possibly {@code null} */
      public String getName() {
          return this.name;
      }

      /** @param name the new order name */
      public void setName(String name) {
          this.name = name;
      }
  }

复制代码

第四步:开发消息发送方法

  (需要先手动创建交换器、队列和绑定关系,或者通过代码声明创建)

复制代码

  1. package com.everjiankang.dependency.springboot.producer;
  2. import java.util.Map;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.messaging.Message;
  9. import org.springframework.messaging.MessageHeaders;
  10. import org.springframework.messaging.support.MessageBuilder;
  11. import org.springframework.stereotype.Component;
  12. import com.everjiankang.dependency.model.Order;
  13. @Component
  14. public class RabbitmqSender {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. /**
  18. * publisher-confirms,实现一个监听器用于监听broker端给我们返回的确认请求:
  19. * RabbitTemplate.ConfirmCallback
  20. *
  21. * publisher-returns,保证消息对broker端是可达的,如果出现路由键不可达的情况,
  22. * 则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:
  23. * RabbitTemplate.ReturnCallback
  24. *
  25. * 注意:在发送消息的时候对template配置mandatory=true保证ReturnCallback监听有效
  26. * 生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。
  27. */
  28. final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
  29. @Override
  30. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  31. System.err.println("confirm correlationData:" + correlationData);
  32. System.err.println("confirm ack:" + ack);
  33. if(!ack) {
  34. System.err.println("异常处理。。。");
  35. }
  36. }
  37. };
  38. final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
  39. @Override
  40. public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
  41. String exchange, String routingKey) {
  42. System.err.println("return exchange:" + exchange);
  43. System.err.println("return routingKey:" + routingKey);
  44. System.err.println("return replyCode:" + replyCode);
  45. System.err.println("return replyText:" + replyText);
  46. }
  47. };
  48. public void send(Object message, Map<String, Object> properties) throws Exception {
  49. MessageHeaders mhs = new MessageHeaders(properties);
  50. Message msg = MessageBuilder.createMessage(message, mhs);
  51. rabbitTemplate.setConfirmCallback(confirmCallback);
  52. rabbitTemplate.setReturnCallback(returnCallback);
  53. CorrelationData correlationData = new CorrelationData();
  54. //实际消息的唯一id
  55. correlationData.setId("dwz123456");//id + 时间戳 (必须是全局唯一的)
  56. rabbitTemplate.convertAndSend("exchange-1", "spring.abc", msg, correlationData);
  57. }
  58. public void sendOrder(Order order) throws Exception {
  59. rabbitTemplate.setConfirmCallback(confirmCallback);
  60. rabbitTemplate.setReturnCallback(returnCallback);
  61. CorrelationData correlationData = new CorrelationData();
  62. correlationData.setId("zheaven123456");//id + 时间戳 (必须是全局唯一的)
  63. rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
  64. }
  65. }

复制代码

第五步:测试发送消息

复制代码

  1. package com.dwz.springboot;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import org.junit.Test;
  7. import org.junit.runner.RunWith;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import org.springframework.test.context.junit4.SpringRunner;
  11. import com.dwz.springboot.entity.Order;
  12. import com.dwz.springboot.producer.RabbitSender;
  13. @RunWith(SpringRunner.class)
  14. @SpringBootTest
  15. public class SpringbootTest {
  16. @Autowired
  17. private RabbitSender send;
  18. private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
  19. @Test
  20. public void testSender1() throws Exception {
  21. Map<String, Object> properties = new HashMap<String, Object>();
  22. properties.put("number", "12345");
  23. properties.put("send_time", simpleDateFormat.format(new Date()));
  24. send.send("Hello Rabbirmq For springboot!", properties);
  25. }
  26. @Test
  27. public void testSender2() throws Exception {
  28. Order order = new Order("001", "第一个订单");
  29. send.sendOrder(order);
  30. }
  31. }

复制代码

回到顶部

配置消费者

第一步:配置application.properties

复制代码

复制代码

  # Broker connection used by the consumer.
  spring.rabbitmq.addresses=127.0.0.1:5672
  spring.rabbitmq.username=xiaochao
  spring.rabbitmq.password=root
  # Exchanges and queues are scoped per virtual host, so this must be the same virtual host
  # the producer publishes to ("/" in the producer configuration of this tutorial).
  spring.rabbitmq.virtual-host=/
  spring.rabbitmq.connection-timeout=15000
  # Consumer-side settings
  # Manual acknowledge mode: the listener acks each message itself, so a failed message can be
  # requeued, logged, or otherwise handled according to business needs.
  spring.rabbitmq.listener.simple.acknowledge-mode=manual
  # Initial and maximum number of concurrent consumers, used to control consumer-side concurrency.
  spring.rabbitmq.listener.simple.concurrency=5
  spring.rabbitmq.listener.simple.max-concurrency=10
  # Binding settings for the order listener. These are custom keys that the listener's
  # @RabbitListener annotation reads through ${...} placeholders.
  spring.rabbitmq.listener.order.queue.name=queue-2
  spring.rabbitmq.listener.order.queue.durable=true
  spring.rabbitmq.listener.order.exchange.name=exchange-2
  spring.rabbitmq.listener.order.exchange.durable=true
  spring.rabbitmq.listener.order.exchange.type=topic
  spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
  spring.rabbitmq.listener.order.key=springboot.*

复制代码

复制代码

第二步:开发启动类Application.java

复制代码

  1. package com.dwz.springboot;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class Application {
  6. public static void main(String[] args) {
  7. SpringApplication.run(Application.class, args);
  8. }
  9. }

复制代码

第三步:添加主配置类

复制代码

  1. package com.dwz.springboot;
  2. import org.springframework.context.annotation.ComponentScan;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. @ComponentScan({"com.dwz.springboot.*"})
  6. public class MainConfig {
  7. }

复制代码

第四步:添加消息处理方法

复制代码

  1. package com.dwz.springboot.consumer;
  2. import java.util.Map;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.support.AmqpHeaders;
  9. import org.springframework.messaging.Message;
  10. import org.springframework.messaging.handler.annotation.Headers;
  11. import org.springframework.messaging.handler.annotation.Payload;
  12. import org.springframework.stereotype.Component;
  13. import com.dwz.springboot.entity.Order;
  14. import com.rabbitmq.client.Channel;
  15. @Component
  16. public class RabbitReceiver {
  17. //此配置可以直接创建Exchange、Queue、Binding
  18. @RabbitListener(bindings = @QueueBinding(
  19. value = @Queue(
  20. name = "queue-1",
  21. durable = "true"
  22. ),
  23. exchange = @Exchange(
  24. name = "exchange-1",
  25. durable = "true",
  26. type = "topic",
  27. ignoreDeclarationExceptions = "true"
  28. ),
  29. key = "springboot.*"
  30. )
  31. )
  32. @RabbitHandler
  33. public void onMessage(Message message, Channel channel) throws Exception {
  34. System.err.println("------------------------------------");
  35. System.err.println("消费端Payload:" + message.getPayload());
  36. Long deliverytag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  37. //手工ACK
  38. channel.basicAck(deliverytag, false);
  39. }
  40. @RabbitListener(bindings = @QueueBinding(
  41. value = @Queue(
  42. name = "${spring.rabbitmq.listener.order.queue.name}",
  43. durable = "${spring.rabbitmq.listener.order.queue.durable}"
  44. ),
  45. exchange = @Exchange(
  46. name = "${spring.rabbitmq.listener.order.exchange.name}",
  47. durable = "${spring.rabbitmq.listener.order.exchange.durable}",
  48. type = "${spring.rabbitmq.listener.order.exchange.type}",
  49. ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"
  50. ),
  51. key = "${spring.rabbitmq.listener.order.key}"
  52. )
  53. )
  54. @RabbitHandler
  55. //将Message用@Payload和@Headers拆分
  56. //@Payload表示body里面的信息,@Headers表示properties的信息
  57. public void onOrderMessage(@Payload Order order, Channel channel,
  58. @Headers Map<String, Object> headers) throws Exception {
  59. System.err.println("------------------------------------");
  60. System.err.println("消费端Order:" + order.getId());
  61. Long deliverytag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
  62. //手工ACK
  63. channel.basicAck(deliverytag, false);
  64. }
  65. }

复制代码

参考blog:

https://www.cnblogs.com/zheaven/p/11915480.html

参考视频:

https://www.bilibili.com/video/BV1Fz411B77A?p=34

发表评论

表情:
评论列表 (有 0 条评论,757人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合RabbitMQ

    一 RabbitMQ的介绍     RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apac