消息总线Bus - Spring Cloud系列(八) た 入场券 2022-03-16 13:22 232阅读 0赞 本文章基于spring-boot-starter-parent 2.0.6RELEASE,spring-cloud-dependencies Finchley.SR2。 ### Spring Cloud Bus是什么 ### Spring Cloud Bus用于为微服务架构系统提供消息总线功能,它使用轻量级消息代理将分布式系统的各节点连接起来,然后可以使用此代理广播状态变更,例如配置信息的变更或其他一些管理操作等,总线可以作为应用程序之间的沟通渠道。当前版本的Spring CLoud Bus支持两款中间件产品:**RabbitMQ**和**Kafka**,并提供了相应的`starters`。 ### Spring Cloud Bus使用 ### #### RabbitMQ实现消息总线 #### RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。RabbitMQ服务器是用高性能、可伸缩而闻名的Erlang语言编写而成,RabbitMQ支持多种操作系统、多种编程语言,在微服务架构消息中间件的选型中,它是一个非常适合且优秀的选择。 ##### RabbitMQ安装和使用 ##### 因为系统是MAC所以使用homebrew工具来安装,命令行中执行如下命令: > brew update > brew install rabbitmq 正常情况下,RabbitMQ Server会被安装到`/usr/local/sbin`,我们需要在 **.bash\_profile** 文件中添加环境变量,因为我本机使用的是zsh终端,所以在 **.zshrc** 文件中添加:`PATH=$PATH:/usr/local/sbin`。 > 注意:需要确保`/usr/local/sbin`文件夹有写权限,不然RabbitMQ的安装会不完全,后续还需要执行`brew link`命令。 命令行中执行`rabbitmq-server`命令来启动RabbitMQ服务 ![rabbitmq-server][] 浏览器访问`http://localhost:15672`即可看到管理页面,用户名、密码默认都是guest。 ![rabbitmq管理控制台][rabbitmq] > 注意:项目启动的时候如果报 epmd error for host localhost: nxdomain (non-existing domain) 错误,需要绑定127.0.0.1这个host为localhost。我之前搭建集群的时候改过hosts文件,导致无法正常启动。 点击Admin选项卡,如下图所示,我们来建一个名为springcloud的用户,用于后面的例子演示: ![rabbitmy增加用户][rabbitmy] 新建后的用户是没有访问virtual hosts权限的,需要我们点击用户,进去后手动点击下方的`Set permission`。 ![set权限][set] 到目前为止,我们已安装并启动了RabbitMQ,并在管理控制台中添加了一个用户。下面我们结合前面的[分布式配置中心Config][Config]里的示例,来看下如何整合Spring Cloud Bus,并以RabbitMQ作为消息代理来实现对配置信息的动态更新。 * **eureka:** 服务注册中心 * **config-server:** 配置了github仓库,并注册到了Eureka的服务端。 * **order:** 通过Eureka发现Config Server的客户端,用来访问配置服务器以获取配置信息。 ##### 扩展order应用 ##### **第一步:pom.xml中添加`spring-cloud-starter-bus-amqp`和`spring-boot-starter-actuator`依赖,后者用来提供刷新端点。** <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.0.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> <version>2.0.0.RELEASE</version> </dependency> **第二步:配置文件中增加关于RabbitMQ的相关信息** 这里是直接放在了`application.yml`文件中,也可以直接放到github远程配置文件里 spring: rabbitmq: host: localhost port: 5672 username: springcloud password: 12345 management: endpoints: web: exposure: include: info,health,refresh,bus-refresh # 端点必须要开放出来,不然无法使用 `bootstrap.yml`文件保持不动 spring: cloud: config: name: order profile: dev label: master discovery: enabled: true service-id: config-server eureka: client: serviceUrl: defaultZone: http://localhost:8081/eureka/ github上`order-dev.yml`配置文件内容如下: server: port: 7000 spring: application: name: order-micro eureka: client: serviceUrl: defaultZone: http://localhost:8081/eureka instance: instance-id: order #此实例注册到eureka服务端的唯一的实例ID prefer-ip-address: true #是否显示IP地址 from: github-order-dev-1.0 **第三步:新建TestController用于测试读取远程配置文件里的信息,这里我们读取from配置项的数据** @RefreshScope @RestController public class TestController { @Value("${from}'") private String from; @Autowired private Environment env; // 第一种方式 @RequestMapping("/from") public String from() { return this.from; } // 第二种方式 @RequestMapping("/from2") public String from2() { return env.getProperty("from"); } } 先后启动eureka(8081)、config-server(8080)、order(7000)工程,可以发现order控制台中多了一个`/actuator/bus-refresh`请求。![bus-refresh][] 浏览器访问,`http:localhost:7000/from`,可以得到此时github上配置文件的信息 ![refresh 1.0][] 接着修改github上**order-dev.yml**中的from属性值,比如修改为:`from: github-order-dev-2.0` 访问`http:localhost:7000/from`,发现页面中展示的依然是之前的信息。 **使用Postman或其它工具发送一个POST请求,请求地址`http://localhost:7000/actuator/bus-refresh`** 再次访问`http:localhost:7000/from`,就会发现页面返回了最新的配置信息 ![refresh 2.0][] 到这里,我们已经能够通过Spring Cloud Bus结合RabbitMQ来动态更新总线上的属性配置了。 **原理说明:** 为了更好的说明原理,我们假设启动了多个order实例,来说明下是如何利用消息总线来动态更新属性的。 ![bus原理分析][bus] 当系统启动后,图上"**order**"服务的多个实例会请求Config Server获取配置信息。此时若我们修改github上的from属性,这个修改是不会触发"**order**"实例的属性更新的。当我们向"**order-1**"(任意一个连接在总线上的实例都行)服务实例发送POST请求,访问`/actuator/bus-refresh`接口。此时"**order-1**"服务实例就会将刷新请求发送到消息总线中,该消息事件会被“**order**”的其它连接在消息总线上的实例获取到,并重新从Config Server中获取它们的配置信息,从而实现配置信息的动态更新。 另外,从Git仓库中配置的修改到发起`/actuator/bus-refresh`的POST请求这一步可以配合Github的Web Hook来自动触发,所有连接到消息总线上的应用都会收到更新请求。 **指定刷新范围** `/actuator/bus-refresh?destination=XXX`,指定destination参数可以用来定位具体要刷新的应用程序,有两种方式: * 定位具体实例 `/actuator/bus-refresh?destination=customers:9000`,此时总线上的各应用实例会根据destination指定的属性值来判断是否是自己的实例ID,符合才进行配置刷新。这里我理解的是customers服务下,端口号为9000的实例才会进行配置更新。 * 定位具体服务 `/actuator/bus-refresh?destination=customers:**`,该请求会触发customers服务的所有实例来进行刷新。 **架构优化** 上面我们是通过向order服务的某个实例来发送配置更新请求,从而触发整个服务集群的配置请求。为了让服务集群中的各个节点对等,降低将来的运维工作。我们可以将触发实例配置更新的请求,移到config-server工程中,让Config Server也连接到消息总线上,由Config Server来触发请求。改造方案如下: * 在config-server也引入Spring Cloud Bus * `/actuator/bus-refresh`请求发送到config-server上,并通过destination参数来制定需要更新配置的服务或实例。 读者请自行尝试,这里不再演示。 #### Kafka实现消息总线 #### Kafka是一个由LinkedIn开发的分布式消息系统,用于构建实时数据管道和流媒体应用。它是水平可伸缩的、容错的、非常快速,并已在数千家公司中使用。 ##### Kafka安装和使用 ##### **第一步:[官网下载][Link 1]后,解压缩。** > tar -xzf kafka_2.11-2.1.1.tgz > cd kafka_2.11-2.1.1 **第二步:启动测试** 因为Kafka的设计中依赖了Zookeeper,所以我们要先启动Zookeeper。这里Kafka和Zookeeper均使用默认配置。 启动ZooKeeper > bin/zookeeper-server-start.sh config/zookeeper.properties 启动Kafka > bin/kafka-server-start.sh config/server.properties **第三步:创建Topic** > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 该命令创建了一个名为test的topic,该Topic包含一个分区和一个Replica。 > bin/kafka-topics.sh --list --zookeeper localhost:2181 该命令用来查看当存在的Topic。 **第四步:创建消息生产者** 上面是我们手动创建Topic,直接使用下面的内容进行消息创建时也会自动创建Topics。 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test **kafka-console-producer**命令启动Kafka基于命令行的消息生产客户端,启动后可直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。 **第五步:创建消息消费者** > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning **kafka-console-consumer**命令启动Kafka基于命令行的消息消费客户端。可以在上一步打开的消息生产者客户端中输入信息,并观察消费者客户端的消息输出。 ![kafka1][] ![kafka2][] 红框上面的内容是之前发送的信息,请忽略! ##### 工程改造 ##### 将之前的order工程引入的`spring-cloud-starter-bus-amqp`替换成`spring-cloud-starter-bus-kafka` <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> <version>2.0.0.RELEASE</version> </dependency> 如果启动Kafka时采用的均是默认配置,那么就不需要在做任何其他配置就能在本地实现RabbitMQ到Kafka的切换。重启order等工程,可以发现控制台中输出了连接到Kafka的相关信息。 ![kafka][] 命令行中输入`> bin/kafka-topics.sh --list --zookeeper localhost:2181`,会发现多出了一个名为springCloudBus的Topic。 浏览器请求`http://localhost:7000/from`获取到配置内容,然后修改github上配置文件中from的值,再次访问`http://localhost:7000/`发现内容没有更新。发送POST请求,`http://localhost:7000/actuator/bus-refresh`,此时再去访问`http://localhost:7000/from`即可发现已经加载到了最新的配置内容。 从order工程的控制台中,我们可以看到如下内容: ![kafka更新配置信息][kafka 1] **深入理解:** 下面我们来进一步了解下当执行刷新请求时,总线上的消息消费者都收到了什么信息?可以通过Kafka提供的控制台消费者来监测。从上文可以看出,order工程启动后,使用了名为springCloudBus的Topic,所以执行如下命令来启动消费者控制台: `> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic springCloudBus --from-beginning` 通过向order工程发送POST请求`/actuator/bus-refresh`,消费者控制台中可以看到如下内容(Json工具解析后): { "type": "RefreshRemoteApplicationEvent", "timestamp": 1551402226681, "originService": "order-micro:7000:2dc075303c1d3a65588392821f4478bb", "destinationService": "**", "id": "4ed2b6f0-65fd-4e1c-aad5-147188a5f32f" } { "type": "AckRemoteApplicationEvent", "timestamp": 1551402226690, "originService": "order-micro:7000:2dc075303c1d3a65588392821f4478bb", "destinationService": "**", "id": "0f4d853e-1618-4df2-96f2-4933e7743b8d", "ackId": "4ed2b6f0-65fd-4e1c-aad5-147188a5f32f", "ackDestinationService": "**", "event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent" } 下面我们来看下消息中收到的信息内容: * **type:** 消息的事件类型。在上文中可以看到有两种事件,其中RefreshRemoteApplicationEvent就是我们用来刷新配置的事件,而AckRemoteApplicationEvent是相应消息已经正确接收的告知消息事件。 * **timestamp:** 消息的时间戳 * **originService:** 消息的来源服务实例(我们是通过POST order实例触发的刷新请求) * **destinationService:** 消息的目标服务实例。`"**"`代表了总线上的所有服务实例。如果要指定服务或实例,使用我们之前提到的destination参数配置即可。 * **id:** 消息的唯一标识 * **ackId:** Ack消息对应的消息来源,我们可以看到下面的AckRemoteApplicationEvent对应了上面RefreshRemoteApplicationEvent的ID。告知RefreshRemoteApplicationEvent事件的消息已收到。 * **ackDestinationService:** Ack消息的目标服务实例,`"**"`代表了总线上的所有服务实例都会收到该Ack消息。 * **event:** Ack消息的来源事件。 -------------------- **参考资料** 《Spring Cloud 微服务实战》 翟永超著 [Spring Cloud Bus官网][Spring Cloud Bus] -------------------- \------------本文结束感谢您的阅读------------ [rabbitmq-server]: /images/20220316/55ff9ecd361b498e9659abbf944c5a40.png [rabbitmq]: /images/20220316/1c0f42a525264af8a52aa86a2ee3fcd6.png [rabbitmy]: /images/20220316/00880ff2aeca440683f789c14ac05d11.png [set]: /images/20220316/1ae55304b48c47fa892a294e2ff01fee.png [Config]: https://blog.csdn.net/hzygcs/article/details/87250528 [bus-refresh]: /images/20220316/f2a3c4eed5674cf6a0c66361e1cca9b7.png [refresh 1.0]: /images/20220316/524b503a2abe42b29670024dd2e08608.png [refresh 2.0]: /images/20220316/996cf98dcf924b5dbd0c750646bb7f35.png [bus]: /images/20220316/bb89c14a41a84b87abfaa67f0049f121.png [Link 1]: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz [kafka1]: /images/20220316/27f62101a2e3430585bc557d24653ec8.png [kafka2]: /images/20220316/4e8005ccdfe34c00a3bf5a24b07d9c6f.png [kafka]: /images/20220316/ea0b3277c09e4a5b859c870fc83a5e5a.png [kafka 1]: /images/20220316/c9ed0d7e6352416aac600d58a79dedd2.png [Spring Cloud Bus]: https://cloud.spring.io/spring-cloud-static/Finchley.SR2/single/spring-cloud.html#_spring_cloud_bus
还没有评论,来说两句吧...