Spring Boot Documentation Reading Notes: A Walkthrough of Messaging with RabbitMQ

野性酷女 2022-12-24 04:56

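This tutorial uses RabbitMQ as the message broker and a Spring Boot application to publish and consume messages.

Messages are published with Spring AMQP's RabbitTemplate and consumed through a MessageListenerAdapter.

The Maven POM is as follows: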

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.0</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>cn.it1995</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-amqp</artifactId>
                <version>2.3.1</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.3.1</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

Next, create the receiver that handles the messages sent by the publisher:

    package cn.it1995.demo;

    import org.springframework.stereotype.Component;

    import java.util.concurrent.CountDownLatch;

    @Component
    public class Receiver {

        // Released once the first message has been handled.
        private final CountDownLatch latch = new CountDownLatch(1);

        // Invoked by the MessageListenerAdapter for each incoming message.
        public void receiveMessage(String message) {
            System.out.println("Received <" + message + ">");
            latch.countDown();
        }

        public CountDownLatch getLatch() {
            return latch;
        }
    }

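Receiver is a POJO, that is, a plain Java object that does not implement any framework interface. It defines one method for receiving messages, and because the method is registered by name, it can be called anything you like.

For convenience, the class also holds a CountDownLatch, which lets it signal that a message has been received. A production application is unlikely to need this.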
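The signalling role of the CountDownLatch can be seen in isolation with plain JDK code. The following is a minimal sketch, not part of the original project: the class name `LatchDemo` and the method `awaitSignal` are made up for illustration, and a worker thread stands in for the RabbitMQ listener thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a worker thread plays the role of the listener.
class LatchDemo {

    // Waits up to timeoutMillis for a signal. When sendSignal is true, a worker
    // thread "receives" a message and counts the latch down; otherwise nobody does.
    static boolean awaitSignal(boolean sendSignal, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);

        if (sendSignal) {
            Thread worker = new Thread(() -> {
                System.out.println("Received <hello>");
                latch.countDown(); // count drops from 1 to 0 and releases the waiter
            });
            worker.start();
        }

        try {
            // true if the count reached zero before the timeout elapsed
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("with signal:    " + awaitSignal(true, 1000));
        System.out.println("without signal: " + awaitSignal(false, 100));
    }
}
```

The boolean returned by `await` is what distinguishes "the message arrived" from "the timeout expired", which is useful to check when the latch is used in a test.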

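Register the listener and send a message

Spring AMQP's RabbitTemplate provides everything needed to send and receive messages with RabbitMQ. You still need to:

  1. Configure a message listener container.

  2. Declare the queue, the exchange, and the binding between them.

  3. Configure a component that sends a message, so that the listener has something to receive.

The code is as follows: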

    package cn.it1995.demo;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class DemoApplication {

        static final String topicExchangeName = "spring-boot-exchange";
        static final String queueName = "spring-boot";

        // Non-durable queue (second argument is the durable flag).
        @Bean
        Queue queue() {
            return new Queue(queueName, false);
        }

        @Bean
        TopicExchange exchange() {
            return new TopicExchange(topicExchangeName);
        }

        // Route every message whose routing key matches foo.bar.# to the queue.
        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
        }

        // Listens on the queue and hands each message to the adapter.
        @Bean
        SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                 MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(listenerAdapter);
            return container;
        }

        // Wraps the Receiver POJO and invokes its receiveMessage method.
        @Bean
        MessageListenerAdapter listenerAdapter(Receiver receiver) {
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }

        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }

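The bean defined in listenerAdapter() is registered as the message listener in the container (which is defined in container()) and listens for messages on the spring-boot queue. Because Receiver is a POJO, it has to be wrapped in a MessageListenerAdapter, which is told to invoke receiveMessage.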
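To make the idea of "wrap a POJO and call a method by name" concrete, here is a small JDK-only sketch that uses reflection. It is not Spring AMQP's implementation: the classes `SimpleListenerAdapter` and `EchoReceiver` are hypothetical, and the sketch assumes the handler is a public method taking a single String.

```java
import java.lang.reflect.Method;

// Hypothetical adapter: forwards a String payload to a named method on any object.
class SimpleListenerAdapter {
    private final Object delegate;
    private final Method handler;

    SimpleListenerAdapter(Object delegate, String methodName) {
        this.delegate = delegate;
        try {
            // Look up a public method with the given name and one String parameter.
            this.handler = delegate.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public method " + methodName + "(String)", e);
        }
    }

    // Called once per message; delegates to the POJO's handler method.
    void onMessage(String payload) {
        try {
            handler.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Listener method failed", e);
        }
    }

    public static void main(String[] args) {
        EchoReceiver receiver = new EchoReceiver();
        new SimpleListenerAdapter(receiver, "receiveMessage").onMessage("Hello");
    }
}

// Hypothetical POJO: no framework interface, just a method with a chosen name.
class EchoReceiver {
    String last;

    public void receiveMessage(String message) {
        last = message;
        System.out.println("Received <" + message + ">");
    }
}
```

The point of the design is that the receiver stays free of messaging types; only the adapter knows that messages arrive from a broker.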

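queue() creates an AMQP queue, and exchange() creates a topic exchange.

binding() binds the two with the routing pattern foo.bar.#, so any message published to the exchange with a routing key that begins with foo.bar is routed to the queue.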
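The pattern foo.bar.# follows the topic exchange convention in which routing keys are dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The sketch below models that rule in plain Java so the behaviour can be tried without a broker. `TopicMatcher` is a hypothetical helper written for this note; it is a simplified model, not the broker's actual matching code.

```java
// Hypothetical helper modelling topic-exchange pattern matching.
class TopicMatcher {

    // Returns true if routingKey matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        // Limit -1 keeps trailing empty words, so "foo." is two words, not one.
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern consumed: key must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.bar.#", "foo.bar.baz"));
        System.out.println(matches("foo.bar.#", "foo.baz"));
    }
}
```

Under this model the key foo.bar.baz matches the binding used here, while foo.baz does not, because its second word differs from bar.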

Send a test message to RabbitMQ

    package cn.it1995.demo;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.TimeUnit;

    @Component
    public class Runner implements CommandLineRunner {

        private final RabbitTemplate rabbitTemplate;
        private final Receiver receiver;

        public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
            this.receiver = receiver;
            this.rabbitTemplate = rabbitTemplate;
        }

        @Override
        public void run(String... args) throws Exception {
            System.out.println("Sending message ...");
            // The routing key foo.bar.baz matches the binding pattern foo.bar.#
            rabbitTemplate.convertAndSend(DemoApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
            // Wait up to 10 seconds for the receiver to count the latch down.
            receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        }
    }

The project's application.properties is as follows:

    spring.rabbitmq.host=122.xxx.xxx.xxx
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=xxxx
    spring.rabbitmq.password=xxxxxxx
    spring.rabbitmq.virtual-host=/xxxxx

[Screenshot of the running program]

Source code download:

https://github.com/fengfanchen/Java/tree/master/SpringBootRabbitmq
