Spring Boot异步消息之AMQP讲解及实战(附源码)

我就是我 2023-09-30 16:18 63阅读 0赞

觉得有帮助请点赞关注收藏~~~

AMQP(高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可 传递消息,并不受客户端/中间件的不同产品,不同开发语言等条件的限制。

下面实现主要用RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ和erlang语言

erlang下载地址 https://www.erlang.org/downloads

RabbitMQ下载地址 https://www.rabbitmq.com/download.html

使用RabbitMQ实现发布/订阅异步消息模式

1:创建发布者应用ch8_2Sender

2:在pom.xml文件中添加依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
  3. <modelVersion>4.0.0</modelVersion>
  4. -<parent>
  5. <groupId>org.springframework.boot</groupId>
  6. <artifactId>spring-boot-starter-parent</artifactId>
  7. <version>2.1.8.RELEASE</version>
  8. <relativePath/>
  9. <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.ch</groupId>
  12. <artifactId>ch8_2Sender</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>ch8_2Sender</name>
  15. <description>Demo project for Spring Boot</description>
  16. +<properties>
  17. -<dependencies>
  18. -<dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-amqp</artifactId>
  21. </dependency>
  22. +<dependency>
  23. -<dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-test</artifactId>
  26. <scope>test</scope>
  27. </dependency>
  28. -<dependency>
  29. <groupId>org.springframework.amqp</groupId>
  30. <artifactId>spring-rabbit-test</artifactId>
  31. <scope>test</scope>
  32. </dependency>
  33. </dependencies>
  34. -<build>
  35. -<plugins>
  36. -<plugin>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-maven-plugin</artifactId>
  39. </plugin>
  40. </plugins>
  41. </build>
  42. </project>

3:创建Weather实体类

  1. package com.ch.ch8_2Sender.entity;
  2. import java.io.Serializable;
  3. public class Weather implements Serializable{
  4. private static final long serialVersionUID = -8221467966772683998L;
  5. private String id;
  6. private String city;
  7. private String weatherDetail;
  8. public String getCity() {
  9. return city;
  10. }
  11. public void setCity(String city) {
  12. this.city = city;
  13. }
  14. public String getWeatherDetail() {
  15. return weatherDetail;
  16. }
  17. public void setWeatherDetail(String weatherDetail) {
  18. this.weatherDetail = weatherDetail;
  19. }
  20. public String getId() {
  21. return id;
  22. }
  23. public void setId(String id) {
  24. this.id = id;
  25. }
  26. @Override
  27. public String toString() {
  28. return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]";
  29. }
  30. }

4:重写Ch82SenderApplication主类

  1. package com.ch.ch8_2Sender;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageBuilder;
  4. import org.springframework.amqp.core.MessageDeliveryMode;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.CommandLineRunner;
  10. import org.springframework.boot.SpringApplication;
  11. import org.springframework.boot.autoconfigure.SpringBootApplication;
  12. import com.ch.ch8_2Sender.entity.Weather;
  13. import com.fasterxml.jackson.databind.ObjectMapper;
  14. @SpringBootApplication
  15. public class Ch82SenderApplication implements CommandLineRunner{
  16. @Autowired
  17. private ObjectMapper objectMapper;
  18. @Autowired
  19. RabbitTemplate rabbitTemplate;
  20. public static void main(String[] args) {
  21. SpringApplication.run(Ch82SenderApplication.class, args);
  22. }
  23. /**
  24. * 定义发布者
  25. */
  26. @Override
  27. public void run(String... args) throws Exception {
  28. //定义消息对象
  29. Weather weather = new Weather();
  30. weather.setId("010");
  31. weather.setCity("北京");
  32. weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C");
  33. //指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息
  34. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  35. //objectMapper将weather对象转换为JSON字节数组
  36. Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather))
  37. .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
  38. .build();
  39. // 消息唯一ID
  40. CorrelationData correlationData = new CorrelationData(weather.getId());
  41. //使用已封装好的convertAndSend(String exchange , String routingKey , Object message, CorrelationData correlationData)
  42. //将特定的路由key发送消息到指定的交换机
  43. rabbitTemplate.convertAndSend(
  44. "weather-exchange", //分发消息的交换机名称
  45. "weather.message", //用来匹配消息的路由Key
  46. msg, //消息体
  47. correlationData);
  48. }
  49. }

5:创建订阅者应用ch8_2Receiver-1

  1. package com.ch.ch8_2Receiver1;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.messaging.handler.annotation.Payload;
  9. import org.springframework.stereotype.Component;
  10. import com.fasterxml.jackson.databind.ObjectMapper;
  11. /**
  12. * 定义订阅者Receiver1
  13. */
  14. @Component
  15. public class Receiver1 {
  16. @Autowired
  17. private ObjectMapper objectMapper;
  18. @RabbitListener(
  19. bindings =
  20. @QueueBinding(
  21. //队列名weather-queue1保证和别的订阅者不一样
  22. value = @Queue(value = "weather-queue1",durable = "true"),
  23. //weather-exchange与发布者的交换机名相同
  24. exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
  25. //weather.message与发布者的消息的路由Key相同
  26. key = "weather.message"
  27. )
  28. )
  29. @RabbitHandler
  30. public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
  31. System.out.println("-----------订阅者Receiver1接收到消息--------");
  32. //将JSON字节数组转换为Weather对象
  33. Weather w=objectMapper.readValue(weatherMessage, Weather.class);
  34. System.out.println("Receiver1收到的消息内容:"+w);
  35. }
  36. }

6:创建订阅者应用ch8_2Receiver-2

  1. package com.ch.ch8_2Receiver1;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.messaging.handler.annotation.Payload;
  9. import org.springframework.stereotype.Component;
  10. import com.fasterxml.jackson.databind.ObjectMapper;
  11. /**
  12. * 定义订阅者Receiver2
  13. */
  14. @Component
  15. public class Receiver2 {
  16. @Autowired
  17. private ObjectMapper objectMapper;
  18. @RabbitListener(
  19. bindings =
  20. @QueueBinding(
  21. //队列名weather-queue2保证和别的订阅者不一样
  22. value = @Queue(value = "weather-queue2",durable = "true"),
  23. //weather-exchange与发布者的交换机名相同
  24. exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
  25. //weather.message与发布者的消息的路由Key相同
  26. key = "weather.message"
  27. )
  28. )
  29. @RabbitHandler
  30. public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
  31. System.out.println("-----------订阅者Receiver2接收到消息--------");
  32. Weather w=objectMapper.readValue(weatherMessage, Weather.class);
  33. //将JSON字节数组转换为Weather对象
  34. System.out.println("Receiver2收到的消息内容:"+w);
  35. }
  36. }

接下来分别运行发布者和订阅者的主类即可,发现一个发布者发布的消息可以被多个订阅者订阅。

发表评论

表情:
评论列表 (有 0 条评论,63人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring消息JMS与AMQP

    异步消息是应用程序之间通用的交流方式。异步消息通信与同步RPC相比有几个优点。间接通信带来了应用之间的松散耦合,因此减轻了其中任意一个应用崩溃所带来的影响。此外,因为消息转发给