Message Bus: Spring Cloud Bus
Contents
- Message Bus: Spring Cloud Bus
- Implementing the message bus with RabbitMQ
- Quick start
- Integrating Spring Cloud Bus
- Implementing the message bus with Kafka
- Quick start
Implementing the message bus with RabbitMQ
Quick start
Create a new project named rabbitmq-hello.
pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zk.springcloud.rabbitmq</groupId>
<artifactId>springcloud-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springcloud-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
spring.application.name=rabbitmq-hello
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springcloud
spring.rabbitmq.password=123456
Create the message producer
package com.zk.springcloud.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Sender {
@Autowired
AmqpTemplate rabbitTemplate;
public void send(){
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello",context);
}
}
Create the message consumer
package com.zk.springcloud.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "hello")
public class Receiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
Create the RabbitMQ configuration class RabbitConfig, which declares the hello queue
package com.zk.springcloud.rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}
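Why the queue has to be declared: the two-argument convertAndSend("hello", context) call publishes to the template's default exchange, and RabbitMQ's default exchange delivers a message to the queue whose name equals the routing key. The following is a minimal in-memory sketch of that routing rule only. It does not use the RabbitMQ or Spring AMQP API, and the class and method names (DefaultExchangeSketch, declareQueue, send, receive) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of default-exchange routing; NOT the RabbitMQ client API. */
public class DefaultExchangeSketch {

    // queue name -> pending messages
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Declares a queue if it does not exist yet, similar to what the Queue bean triggers at startup. */
    public void declareQueue(String name) {
        queues.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    /** Routes by exact match of routing key to queue name; returns false if no queue matched. */
    public boolean send(String routingKey, String message) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // unroutable: the message is dropped
        }
        queue.add(message);
        return true;
    }

    /** Takes the next message from a queue, or null if the queue is empty or undeclared. */
    public String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        System.out.println("routed before declare: " + broker.send("hello", "lost"));
        broker.declareQueue("hello");
        System.out.println("routed after declare: " + broker.send("hello", "hello world"));
        System.out.println("received: " + broker.receive("hello"));
    }
}
```

In the real project the Sender's routing key and the Receiver's queues attribute must both be "hello" for the same reason: the match is on the exact queue name.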
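Application main class (placed in the same package as the components above so that component scanning picks them up)
package com.zk.springcloud.rabbitmq;
import org.springframework.boot.SpringApplication;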
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringcloudRabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringcloudRabbitmqApplication.class, args);
}
}
Unit test class, used to invoke the message producer
package com.zk.springcloud.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringcloudRabbitmqApplicationTests {
@Autowired
private Sender sender;
@Test
public void contextLoads() {
sender.send();
}
}
Start the application main class. The console shows the following:
2018-10-15 10:59:17.592 INFO 30292 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#32c726ee:0/SimpleConnection@54c28ee5 [delegate=amqp://springcloud@127.0.0.1:5672/, localPort= 46069]
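At the same time, the RabbitMQ management console lists an entry for the current connection under Connections and Channels.
Run the unit test class. Its console prints the following: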
Sender : hello Mon Oct 15 11:10:36 CST 2018
The console of the application main class prints the following:
Receiver : hello Mon Oct 15 11:10:36 CST 2018
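Integrating Spring Cloud Bus
Spring Cloud Bus can also be added to the config server (config, port 9012), so that configuration and refresh are handled there.
Add the following dependency to the config-client (port 9013) project; it provides the refresh endpoint.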
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
Add the RabbitMQ connection and user settings to the configuration file:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springcloud
spring.rabbitmq.password=123456
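Start config (9012), then config-client (9013). The config-client console shows that the bus refresh endpoints have been mapped. On Spring Boot 2.x the path is /actuator/bus-refresh rather than /bus/refresh, and the endpoint must be exposed over HTTP, for example via management.endpoints.web.exposure.include=bus-refresh: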
2018-10-15 11:16:35.060 INFO 32424 --- [ main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/bus-refresh/{destination}],methods=[POST]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String, java.lang.String>)
2018-10-15 11:16:35.060 INFO 32424 --- [ main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/bus-refresh],methods=[POST]}" onto public java.lang.Object
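- Request http://localhost:9013/from; it returns git-default-1.0
- In the git repository, change the value in config-repo/didispace.properties to git-default-2.0, then send a POST request to http://localhost:9013/actuator/bus-refresh
- Request http://localhost:9013/from again; it now returns git-default-2.0
At this point, property configuration on the bus can be updated through Spring Cloud Bus.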
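The POST request in the steps above can be sent with any HTTP client. As a minimal sketch using only the JDK, assuming the config-client is reachable at http://localhost:9013 and the bus-refresh endpoint is exposed; the class and method names (BusRefreshClient, refreshUrl, postRefresh) are made up for illustration. The optional destination segment corresponds to the /actuator/bus-refresh/{destination} mapping in the startup log; a value such as config-client:9013 is an assumed example, not taken from the original setup.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

/** Hypothetical helper that triggers a bus refresh over HTTP. */
public class BusRefreshClient {

    /** Builds the bus-refresh URL; a non-empty destination is appended as a path segment. */
    static String refreshUrl(String baseUrl, String destination) {
        String url = baseUrl.replaceAll("/+$", "") + "/actuator/bus-refresh";
        return (destination == null || destination.isEmpty()) ? url : url + "/" + destination;
    }

    /** Sends a POST with an empty body and returns the HTTP status code. */
    static int postRefresh(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setFixedLengthStreamingMode(0); // explicit Content-Length: 0
        conn.getOutputStream().close();      // nothing to write
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed base URL of the config-client from this walkthrough.
        String url = refreshUrl("http://localhost:9013", null);
        System.out.println("POST " + url + " -> HTTP " + postRefresh(url));
    }
}
```

Running main requires the config-client and the message broker to be up; the URL helper can be checked on its own.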
Implementing the message bus with Kafka
Quick start
Prerequisite: Kafka installed and running on Windows.
In config-client (9013), change the dependency: replace spring-cloud-starter-bus-amqp with spring-cloud-starter-bus-kafka.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
Kafka uses its default configuration. Start config-client (9013) and check the log: the application has subscribed to a topic named springCloudBus.
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [springCloudBus-0]
- Request http://localhost:9013/from; it returns git-default-1.0
- In the git repository, change the value in config-repo/didispace.properties to git-default-2.0, then send a POST request to http://localhost:9013/actuator/bus-refresh
- Request http://localhost:9013/from again; it now returns git-default-2.0
Check the config-client (9013) log:
com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1539586923629, current=UP, previous=DOWN]
o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [config.client.version, from]
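The RefreshListener line shows an instance receiving a refresh request from the bus and reporting which keys changed. The fan-out idea can be modelled in a few lines of plain Java. This is an in-memory sketch, not Spring Cloud Bus code: the names (BusRefreshSketch, Instance, publishRefresh) and the second instance on port 9014 are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** In-memory model of a refresh broadcast; NOT Spring Cloud Bus code. */
public class BusRefreshSketch {

    /** One application instance holding a local copy of the remote configuration. */
    static class Instance {
        final String name;
        final Map<String, String> local = new HashMap<>();

        Instance(String name) {
            this.name = name;
        }

        /** Reloads from the repository and returns the keys that were added, changed, or removed. */
        Set<String> refresh(Map<String, String> repo) {
            Set<String> changed = new TreeSet<>();
            for (Map.Entry<String, String> e : repo.entrySet()) {
                if (!e.getValue().equals(local.get(e.getKey()))) {
                    changed.add(e.getKey());
                }
            }
            for (String key : local.keySet()) {
                if (!repo.containsKey(key)) {
                    changed.add(key);
                }
            }
            local.clear();
            local.putAll(repo);
            return changed;
        }
    }

    private final List<Instance> subscribers = new ArrayList<>();

    void subscribe(Instance instance) {
        subscribers.add(instance);
    }

    /** Delivers one refresh event to every subscriber; returns the changed keys per instance name. */
    Map<String, Set<String>> publishRefresh(Map<String, String> repo) {
        Map<String, Set<String>> result = new HashMap<>();
        for (Instance instance : subscribers) {
            result.put(instance.name, instance.refresh(repo));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> repo = new HashMap<>();
        repo.put("from", "git-default-1.0");

        BusRefreshSketch bus = new BusRefreshSketch();
        bus.subscribe(new Instance("config-client:9013"));
        bus.subscribe(new Instance("config-client:9014")); // hypothetical second instance
        bus.publishRefresh(repo);                          // initial load

        repo.put("from", "git-default-2.0");               // simulated commit to the git repository
        System.out.println(bus.publishRefresh(repo));
    }
}
```

The point of the model is that a single refresh request reaches every subscribed instance, which is what the shared springCloudBus topic provides in the real setup.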