消息总线SpringCloud Bus

迈不过友情╰ 2022-05-08 16:12 421阅读 0赞

消息总线SpringCloud Bus

文章目录

    • 消息总线SpringCloud Bus
      • RabbitMQ 实现消息总线
        • 快速入门
        • 整合 Spring Cloud Bus
      • kafka 实现消息总线
        • 快速入门

RabbitMQ 实现消息总线

快速入门

新建 rabbitmq-hello 项目

pom 依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>com.zk.springcloud.rabbitmq</groupId>
  5. <artifactId>springcloud-rabbitmq</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <packaging>jar</packaging>
  8. <name>springcloud-rabbitmq</name>
  9. <description>Demo project for Spring Boot</description>
  10. <parent>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-parent</artifactId>
  13. <version>2.0.5.RELEASE</version>
  14. <relativePath/> <!-- lookup parent from repository -->
  15. </parent>
  16. <properties>
  17. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  18. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  19. <java.version>1.8</java.version>
  20. </properties>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-amqp</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-test</artifactId>
  29. <scope>test</scope>
  30. </dependency>
  31. </dependencies>
  32. <build>
  33. <plugins>
  34. <plugin>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-maven-plugin</artifactId>
  37. </plugin>
  38. </plugins>
  39. </build>
  40. </project>

application.properties

  1. spring.application.name=rabbitmq-hello
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=springcloud
  5. spring.rabbitmq.password=123456

创建消息生产者

  1. package com.zk.springcloud.rabbitmq;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Date;
  6. @Component
  7. public class Sender {
  8. @Autowired
  9. AmqpTemplate rabbitTemplate;
  10. public void send(){
  11. String context = "hello " + new Date();
  12. System.out.println("Sender : " + context);
  13. this.rabbitTemplate.convertAndSend("hello",context);
  14. }
  15. }

创建消息消费者

  1. package com.zk.springcloud.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "hello")
  7. public class Receiver {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("Receiver : " + hello);
  11. }
  12. }

创建 RabbitMQ 的配置类 RabbitConfig

  1. package com.zk.springcloud.rabbitmq;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitConfig {
  7. @Bean
  8. public Queue helloQueue(){
  9. return new Queue("hello");
  10. }
  11. }

应用主类

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. @SpringBootApplication
  4. public class SpringcloudRabbitmqApplication {
  5. public static void main(String[] args) {
  6. SpringApplication.run(SpringcloudRabbitmqApplication.class, args);
  7. }
  8. }

单元测试类,用来调用消息生产

  1. package com.zk.springcloud.rabbitmq;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class SpringcloudRabbitmqApplicationTests {
  10. @Autowired
  11. private Sender sender;
  12. @Test
  13. public void contextLoads() {
  14. sender.send();
  15. }
  16. }

启动应用主类,在控制台看到如下内容:

  1. 2018-10-15 10:59:17.592 INFO 30292 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#32c726ee:0/SimpleConnection@54c28ee5 [delegate=amqp://springcloud@127.0.0.1:5672/, localPort= 46069]

同时,通过 RabbitMQ 控制面板,看到 Connections 和 Channels 中包含当前连接的条目

运行单元测试类,在控制台输出如下内容:

  1. Sender : hello Mon Oct 15 11:10:36 CST 2018

在应用主类的控制台,输出如下内容:

  1. Receiver : hello Mon Oct 15 11:10:36 CST 2018

整合 Spring Cloud Bus

也可以将 SpringCloudBus 放在 config 9012 进行配置和刷新配置

config-client 9013 项目添加依赖,用来刷新端点

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  4. </dependency>

在配置文件中添加关于 RabbitMQ 的链接和用户信息

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=springcloud
  4. spring.rabbitmq.password=123456

启动 config 9012 再启动 config-client 9013 ,可以在 config-client 控制台看到多了 /bus/refresh 请求

  1. 2018-10-15 11:16:35.060 INFO 32424 --- [ main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/bus-refresh/{destination}],methods=[POST]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String, java.lang.String>)
  2. 2018-10-15 11:16:35.060 INFO 32424 --- [ main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/bus-refresh],methods=[POST]}" onto public java.lang.Object
  • 访问 http://localhost:9013/from 返回 git-default-1.0
  • 修改 git 仓库 config-repo/didispace.properties 为 git-default-2.0 ,并发送 POST 请求 http://localhost:9013/actuator/bus-refresh
  • 再访问 http://localhost:9013/from 返回 git-default-2.0

已经能够通过 Spring Cloud Bus 来更新总线上的属性配置了。

kafka 实现消息总线

快速入门

kafka在windows上的安装、运行

修改 config-client 9013 上的依赖,将 spring-cloud-starter-bus-amqp 替换成 spring-cloud-starter-bus-kafka

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bus-kafka</artifactId>
  4. </dependency>

kafka 采用默认配置,启动 config-client 9013 查看日志,订阅了名为 springCloudBus 的 Topic

  1. o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [springCloudBus-0]
  • 访问 http://localhost:9013/from 返回 git-default-1.0
  • 修改 git 仓库 config-repo/didispace.properties 为 git-default-2.0 ,并发送 POST 请求 http://localhost:9013/actuator/bus-refresh
  • 再访问 http://localhost:9013/from 返回 git-default-2.0

config-client 9013 查看日志

  1. com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1539586923629, current=UP, previous=DOWN]
  2. o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [config.client.version, from]

发表评论

表情:
评论列表 (有 0 条评论,421人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringCloud 消息总线Bus

    Spring Cloud Bus 将分布式的节点用轻量的消息代理连接起来。它可以实现通知微服务架构的配置文件的更改等操作,也可用于监控等。 开始本例之前需要安装rabbit