消息总线Bus - Spring Cloud系列(八)
本文章基于spring-boot-starter-parent 2.0.6RELEASE,spring-cloud-dependencies Finchley.SR2。
Spring Cloud Bus是什么
Spring Cloud Bus用于为微服务架构系统提供消息总线功能,它使用轻量级消息代理将分布式系统的各节点连接起来,然后可以使用此代理广播状态变更,例如配置信息的变更或其他一些管理操作等,总线可以作为应用程序之间的沟通渠道。当前版本的Spring CLoud Bus支持两款中间件产品:RabbitMQ和Kafka,并提供了相应的starters
。
Spring Cloud Bus使用
RabbitMQ实现消息总线
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。RabbitMQ服务器是用高性能、可伸缩而闻名的Erlang语言编写而成,RabbitMQ支持多种操作系统、多种编程语言,在微服务架构消息中间件的选型中,它是一个非常适合且优秀的选择。
RabbitMQ安装和使用
因为系统是MAC所以使用homebrew工具来安装,命令行中执行如下命令:
> brew update
> brew install rabbitmq
正常情况下,RabbitMQ Server会被安装到/usr/local/sbin
,我们需要在 .bash_profile 文件中添加环境变量,因为我本机使用的是zsh终端,所以在 .zshrc 文件中添加:PATH=$PATH:/usr/local/sbin
。
注意:需要确保
/usr/local/sbin
文件夹有写权限,不然RabbitMQ的安装会不完全,后续还需要执行brew link
命令。
命令行中执行rabbitmq-server
命令来启动RabbitMQ服务
浏览器访问http://localhost:15672
即可看到管理页面,用户名、密码默认都是guest。
注意:项目启动的时候如果报 epmd error for host localhost: nxdomain (non-existing domain) 错误,需要绑定127.0.0.1这个host为localhost。我之前搭建集群的时候改过hosts文件,导致无法正常启动。
点击Admin选项卡,如下图所示,我们来建一个名为springcloud的用户,用于后面的例子演示:
新建后的用户是没有访问virtual hosts权限的,需要我们点击用户,进去后手动点击下方的Set permission
。
到目前为止,我们已安装并启动了RabbitMQ,并在管理控制台中添加了一个用户。下面我们结合前面的分布式配置中心Config里的示例,来看下如何整合Spring Cloud Bus,并以RabbitMQ作为消息代理来实现对配置信息的动态更新。
- eureka: 服务注册中心
- config-server: 配置了github仓库,并注册到了Eureka的服务端。
- order: 通过Eureka发现Config Server的客户端,用来访问配置服务器以获取配置信息。
扩展order应用
第一步:pom.xml中添加spring-cloud-starter-bus-amqp
和spring-boot-starter-actuator
依赖,后者用来提供刷新端点。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
第二步:配置文件中增加关于RabbitMQ的相关信息
这里是直接放在了application.yml
文件中,也可以直接放到github远程配置文件里
spring:
rabbitmq:
host: localhost
port: 5672
username: springcloud
password: 12345
management:
endpoints:
web:
exposure:
include: info,health,refresh,bus-refresh # 端点必须要开放出来,不然无法使用
bootstrap.yml
文件保持不动
spring:
cloud:
config:
name: order
profile: dev
label: master
discovery:
enabled: true
service-id: config-server
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8081/eureka/
github上order-dev.yml
配置文件内容如下:
server:
port: 7000
spring:
application:
name: order-micro
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8081/eureka
instance:
instance-id: order #此实例注册到eureka服务端的唯一的实例ID
prefer-ip-address: true #是否显示IP地址
from: github-order-dev-1.0
第三步:新建TestController用于测试读取远程配置文件里的信息,这里我们读取from配置项的数据
@RefreshScope
@RestController
public class TestController {
@Value("${from}'")
private String from;
@Autowired
private Environment env;
// 第一种方式
@RequestMapping("/from")
public String from() {
return this.from;
}
// 第二种方式
@RequestMapping("/from2")
public String from2() {
return env.getProperty("from");
}
}
先后启动eureka(8081)、config-server(8080)、order(7000)工程,可以发现order控制台中多了一个/actuator/bus-refresh
请求。
浏览器访问,http
,可以得到此时github上配置文件的信息7000/from
接着修改github上order-dev.yml中的from属性值,比如修改为:from: github-order-dev-2.0
访问http
,发现页面中展示的依然是之前的信息。7000/from
使用Postman或其它工具发送一个POST请求,请求地址http://localhost:7000/actuator/bus-refresh
再次访问http
,就会发现页面返回了最新的配置信息7000/from
到这里,我们已经能够通过Spring Cloud Bus结合RabbitMQ来动态更新总线上的属性配置了。
原理说明:
为了更好的说明原理,我们假设启动了多个order实例,来说明下是如何利用消息总线来动态更新属性的。
当系统启动后,图上”order“服务的多个实例会请求Config Server获取配置信息。此时若我们修改github上的from属性,这个修改是不会触发”order“实例的属性更新的。当我们向”order-1“(任意一个连接在总线上的实例都行)服务实例发送POST请求,访问/actuator/bus-refresh
接口。此时”order-1“服务实例就会将刷新请求发送到消息总线中,该消息事件会被“order”的其它连接在消息总线上的实例获取到,并重新从Config Server中获取它们的配置信息,从而实现配置信息的动态更新。
另外,从Git仓库中配置的修改到发起/actuator/bus-refresh
的POST请求这一步可以配合Github的Web Hook来自动触发,所有连接到消息总线上的应用都会收到更新请求。
指定刷新范围/actuator/bus-refresh?destination=XXX
,指定destination参数可以用来定位具体要刷新的应用程序,有两种方式:
- 定位具体实例
/actuator/bus-refresh?destination=customers:9000
,此时总线上的各应用实例会根据destination指定的属性值来判断是否是自己的实例ID,符合才进行配置刷新。这里我理解的是customers服务下,端口号为9000的实例才会进行配置更新。 - 定位具体服务
/actuator/bus-refresh?destination=customers:**
,该请求会触发customers服务的所有实例来进行刷新。
架构优化
上面我们是通过向order服务的某个实例来发送配置更新请求,从而触发整个服务集群的配置请求。为了让服务集群中的各个节点对等,降低将来的运维工作。我们可以将触发实例配置更新的请求,移到config-server工程中,让Config Server也连接到消息总线上,由Config Server来触发请求。改造方案如下:
- 在config-server也引入Spring Cloud Bus
/actuator/bus-refresh
请求发送到config-server上,并通过destination参数来制定需要更新配置的服务或实例。
读者请自行尝试,这里不再演示。
Kafka实现消息总线
Kafka是一个由LinkedIn开发的分布式消息系统,用于构建实时数据管道和流媒体应用。它是水平可伸缩的、容错的、非常快速,并已在数千家公司中使用。
Kafka安装和使用
第一步:官网下载后,解压缩。
> tar -xzf kafka_2.11-2.1.1.tgz
> cd kafka_2.11-2.1.1
第二步:启动测试
因为Kafka的设计中依赖了Zookeeper,所以我们要先启动Zookeeper。这里Kafka和Zookeeper均使用默认配置。
启动ZooKeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka
> bin/kafka-server-start.sh config/server.properties
第三步:创建Topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
该命令创建了一个名为test的topic,该Topic包含一个分区和一个Replica。
> bin/kafka-topics.sh --list --zookeeper localhost:2181
该命令用来查看当存在的Topic。
第四步:创建消息生产者
上面是我们手动创建Topic,直接使用下面的内容进行消息创建时也会自动创建Topics。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka-console-producer命令启动Kafka基于命令行的消息生产客户端,启动后可直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。
第五步:创建消息消费者
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
kafka-console-consumer命令启动Kafka基于命令行的消息消费客户端。可以在上一步打开的消息生产者客户端中输入信息,并观察消费者客户端的消息输出。
红框上面的内容是之前发送的信息,请忽略!
工程改造
将之前的order工程引入的spring-cloud-starter-bus-amqp
替换成spring-cloud-starter-bus-kafka
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
如果启动Kafka时采用的均是默认配置,那么就不需要在做任何其他配置就能在本地实现RabbitMQ到Kafka的切换。重启order等工程,可以发现控制台中输出了连接到Kafka的相关信息。
命令行中输入> bin/kafka-topics.sh --list --zookeeper localhost:2181
,会发现多出了一个名为springCloudBus的Topic。
浏览器请求http://localhost:7000/from
获取到配置内容,然后修改github上配置文件中from的值,再次访问http://localhost:7000/
发现内容没有更新。发送POST请求,http://localhost:7000/actuator/bus-refresh
,此时再去访问http://localhost:7000/from
即可发现已经加载到了最新的配置内容。
从order工程的控制台中,我们可以看到如下内容:
深入理解:
下面我们来进一步了解下当执行刷新请求时,总线上的消息消费者都收到了什么信息?可以通过Kafka提供的控制台消费者来监测。从上文可以看出,order工程启动后,使用了名为springCloudBus的Topic,所以执行如下命令来启动消费者控制台:> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic springCloudBus --from-beginning
通过向order工程发送POST请求/actuator/bus-refresh
,消费者控制台中可以看到如下内容(Json工具解析后):
{
"type": "RefreshRemoteApplicationEvent",
"timestamp": 1551402226681,
"originService": "order-micro:7000:2dc075303c1d3a65588392821f4478bb",
"destinationService": "**",
"id": "4ed2b6f0-65fd-4e1c-aad5-147188a5f32f"
} {
"type": "AckRemoteApplicationEvent",
"timestamp": 1551402226690,
"originService": "order-micro:7000:2dc075303c1d3a65588392821f4478bb",
"destinationService": "**",
"id": "0f4d853e-1618-4df2-96f2-4933e7743b8d",
"ackId": "4ed2b6f0-65fd-4e1c-aad5-147188a5f32f",
"ackDestinationService": "**",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
下面我们来看下消息中收到的信息内容:
- type: 消息的事件类型。在上文中可以看到有两种事件,其中RefreshRemoteApplicationEvent就是我们用来刷新配置的事件,而AckRemoteApplicationEvent是相应消息已经正确接收的告知消息事件。
- timestamp: 消息的时间戳
- originService: 消息的来源服务实例(我们是通过POST order实例触发的刷新请求)
- destinationService: 消息的目标服务实例。
"**"
代表了总线上的所有服务实例。如果要指定服务或实例,使用我们之前提到的destination参数配置即可。 - id: 消息的唯一标识
- ackId: Ack消息对应的消息来源,我们可以看到下面的AckRemoteApplicationEvent对应了上面RefreshRemoteApplicationEvent的ID。告知RefreshRemoteApplicationEvent事件的消息已收到。
- ackDestinationService: Ack消息的目标服务实例,
"**"
代表了总线上的所有服务实例都会收到该Ack消息。 - event: Ack消息的来源事件。
参考资料
《Spring Cloud 微服务实战》 翟永超著
Spring Cloud Bus官网
-—————-本文结束感谢您的阅读——————
还没有评论,来说两句吧...