Spring Cloud Bus(消息总线)(1)

红太狼 2022-05-09 08:58 333阅读 0赞

消息代理

消息代理是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接受和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。常用的场景有:

1.将消息路由到一个或多个目的地。

2.消息转化为其他的表现方式。

3.执行消息的聚集、消息的分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户。

4.调用Web服务来检索数据。

5.响应事件或错误。

6.使用发布订阅模式来提供内容或基于主题的消息路由。

现成的有:

ActiveMQ

Kafka

RabbitMQ

RocketMQ

……

当前版本的Spring Cloud Bus仅支持RibbitMQ和Kafka。

RabbitMQ

基本概念

Broker:可以理解为消息队列服务器的实体,他是一个中间件应用,负载接受消息生产者的消息,然后将消息发送至消息接受者或者其他的Broker。

Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定路由规则,分发到不同的消息队列中。

Queue:消息队列,消息通过发送和路由之后最终到达的地方,到达Queue的消息即进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。

Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来,也就是Exchange和Queue之间的虚拟连接。

Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。

Virtual host:虚拟主机,它是对Broker的虚拟划分,将消费者、生产者和它们依赖的AMQP相关结构进行隔离,一般都是为了安全考虑。比如在Broker中设置多个虚拟主机,对不同用户进行权限的分离。

Connection:连接,代表生产者、消费者、Broker之间进行通信的物理网络。

Channel:消息通道,用于连接生产者、消费者、Broker之间进行通信的物理网络。

Producer:消息生产者,制造消息并发送消息的程序。

Consumer:消息消费者,接受消息并处理消息的程序。

消息投递到队列的过程如下:

1.客户端连接到消息队列服务器,打开一个Channel。

2.客户端声明一个Exchange,并设置相关属性。

3.客户端声明一个Queue,并设置相关属性。

4.客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系。

5.客户端投递消息到Exchange。

6.Exchange接受到消息后,根据消息的Key和已设置的Binding,进行消息路由,将消息投递到一个或多个Queue里。

Exchange有三种类型:

1.Direct交换机:完全根据Key进行投递。比如绑定时设置Routing Key为abc,那么客户端提交信息时,只有设置了Key为abc才会被投递到队列。

2.Topic交换机:对Key进行模式匹配后进行投递,可以使用#匹配一个或多个词,符号*匹配正好一个词。

3.Fanout交换机:不需要任何Key,它采取广播的模式,一个消息进来时,投递到与该交换机绑定的所有队列。

SpringBoot整合RabbitMQ

(在此之前需要先行安装RabbitMQ,安装:RabbitMQ安装)

1.新建一个Spring Boot工程,命名为rabbitmq-hello。

2.在pom.xml中引入spring-boot-starter-amqp支持RabbitMQ。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.example</groupId>
  6. <artifactId>rabbitmq-hello</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9. <name>rabbitmq-hello</name>
  10. <description>Demo project for Spring Boot</description>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>2.0.5.RELEASE</version>
  15. <relativePath/> <!-- lookup parent from repository -->
  16. </parent>
  17. <properties>
  18. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  19. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  20. <java.version>1.8</java.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-amqp</artifactId>
  35. </dependency>
  36. </dependencies>
  37. <build>
  38. <plugins>
  39. <plugin>
  40. <groupId>org.springframework.boot</groupId>
  41. <artifactId>spring-boot-maven-plugin</artifactId>
  42. </plugin>
  43. </plugins>
  44. </build>
  45. </project>

3.在application.properties中配置关于RabbitMQ的连接信息和用户信息。

  1. spring.application.name=rabbitmq-hello
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=SpringCloud
  5. spring.rabbitmq.password=123456789

4.创建消息生产者,通过注入AmqpTemplate接口的实例来实现消息的发送,AmqpTemplate接口定义了一套针对AMQP协议的基础操作。

  1. @Component
  2. public class Sender {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(){
  6. String context="hello "+new Date();
  7. System.out.println();
  8. System.out.println("--------------------------------------------------------");
  9. System.out.println("Sender:"+context);
  10. System.out.println("--------------------------------------------------------");
  11. System.out.println();
  12. amqpTemplate.convertAndSend("hello",context);
  13. }
  14. }

5.创建消息消费者Receiver。通过@RabbitListener注解定义该类对hello队列的监听,并用@RabbitHandler注解来指定对消息的处理方法:

  1. @Component
  2. @RabbitListener(queues = "hello")
  3. public class Receiver {
  4. @RabbitHandler
  5. public void process(String hello){
  6. System.out.println();
  7. System.out.println("--------------------------------------------------------");
  8. System.out.println("Receiver:"+hello);
  9. System.out.println("--------------------------------------------------------");
  10. System.out.println();
  11. }
  12. }

创建RabbitMQ的配置类RabbitConfig用来配置队列,交换器路由等信息:

  1. @Configuration
  2. public class RabbitConfig {
  3. @Bean
  4. public Queue helloQueue(){
  5. return new Queue("hello");
  6. }
  7. }

最后的目录结构如下:

70

下面创建一个单元测试类用于调用消息生产:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class RabbitmqHelloApplicationTests {
  4. @Autowired
  5. private Sender sender;
  6. @Test
  7. public void contextLoads() {
  8. }
  9. @Test
  10. public void hello(){
  11. sender.send();
  12. }
  13. }

启动应用主类后:

70 1

70 2

70 3

运行单元测试类:

70 4

可以看到它发送了一条队列消息。

然后转到应用主类的控制台,可以看到消息消费者对hello队列的监听程序执行了并输出接收到的消息。

70 5

Ok一个简单的RabbitMQ与Spring Boot的程序就简单整合了。

整合Spring Cloud Bus

1.在config-server和config-service中新增:

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  4. <version>2.0.0.RELEASE</version>
  5. </dependency>

在配置文件中加上配置信息:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=SpringCloud
  4. spring.rabbitmq.password=123456789
  5. management.endpoints.web.exposure.include=*

然后修改github上的文件后访问:http://localhost:7001/bus/refresh

它会对文件进行更新:

70 6

70 7

然后再访问客户端就可以发现配置已经更新了。

70 8

如果需要服务端对某个客户端单个刷新则需要这么访问:http://localhost:7001/bus/refresh?destination=configClient:7002

destination参数除了可以定位具体的实例之外还能用来定位具体的服务。如:/bus/refresh?destination=customers:**,该请求会触发customers服务的所有实例进行刷新。

参考《Spring Cloud 微服务实战》

发表评论

表情:
评论列表 (有 0 条评论,333人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Cloud Bus(消息总线)(1

    消息代理 消息代理是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产