Spring Cloud Stream 消息驱动微服务 相关图形,dljd,cat - spring cloud alibaba
一 业务
- 多个微服务
二 需求
- 多个微服务之间需要通过消息中间件互相传递消息。
- 在面对不同的MQ产品(RabbitMQ、Kafka、RocketMQ)时,做到消息中间件和微服务的代码之间耦合性降到最低,甚至任意切换消息中间件而不用动微服务的代码。
三 解决方案
1 功能“集”:降低微服务和消息中间件的耦合性
2 “神”冰箱:spring cloud stream整合消息中间件(RabbitMQ、Kafka、RocketMQ)
四 完成学习
1 思想、思路
按照官方的定义,Spring Cloud Stream 是一个构建消息驱动微服务的框架
Spring Cloud Stream解决了开发人员无感知的使用消息中间件的问题。Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为rocketmq或者kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程;
- String cloud stream对各个MQ产品(RabbitMQ、Kafka、RocketMQ)的连接和各种操作,做了一层高度的抽象。在String cloud stream的加持下,我们可任意选用某一种消息中间件而不用修改微服务的代码。
- 相当于ORM框架(hibernate、mybatis),对底层的多种数据库产品的连接和各种操作做了一层高度的抽象。在ORM框架的加持下,我们可任意选用某一种数据库而不用修改代码。
- Spring Cloud Stream官方只支持rabbitmq 和 kafka,spring cloud alibaba新写了一个starter可以使Spring Cloud Stream支持RocketMQ;
2 体系组织
- spring cloud stream是spring cloud提供的一个子项目(或称 框架)。
application core是我们写的微服务的代码,可以有多个application core,即有多个微服务。
- output:微服务发消息到消息中间件
- input:微服务接收消息中间件的消息。
binder:绑定器
- spring cloud stream子项目(框架)给我们封装的东西叫binder。
- spring cloud stream封装的binder对微服务和消息中间件之间发消息和接消息的操作做了高度的抽象、封装。
- binder用于创建binding,从而绑定具体的消息中间件。
- 消息中间件都有自己的Binder实现;比如Kafka 的实现KafkaMessageChannelBinder,RabbitMQ的实现RabbitMessageChannelBinder以及RocketMQ 的实现 RocketMQMessageChannelBinder;
binding
- 包括 Input Binding 和 Output Binding;
- Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触;
- 常用的消息中间件:kafka、rabbimq、rokectmq
开发过程中使用到的类、注解:
组成
说明
Binder 绑定器(各个消息中间件的绑定器不同)
Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现;
@Input 注解,表明是一个输入管道
该注解标识输入通道,通过该输入通道接收消息进入应用程序
@Output 注解,表明是一个输出管道
该注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 注解
监听队列,用于消费者的队列的消息接收
@EnableBinding 注解,开启(Enable)绑定功能
将信道channel和exchange、topic绑定在一起
3 工作流程
- 第一步:Pom.xml引入依赖
第二步:application.properties配置
➌Binder
- RocketMQ服务器地址
➍Bindings
- input的3要素键值对:String:BindingProperties对象的属性(这里的string是input)
- output的3要素键值对:String:BindingProperties对象的属性(这里的string是output)
第三步:编码
➊Source
- ➋输出信道,发送消费
➊Sink
- ➋输入信道,消费消息
- 第四步:验证
4 源码分析:弊端
组id是com.alibaba.cloud
pom.xml中依赖是:
spring-cloud-starter-rocketmq 对应的jar包是:com.alibaba.cloud.spring-cloud-starter-stream-rocketmq.jar
自动装配解析:META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
自动装配的类:RocketMQComponent4BinderAutoConfiguration.java
RocketMQ的自动装配:@AutoConfigureAfter({RocketMQAutoConfiguration.class})
RocketMQ属性的配置:@EnableConfigurationProperties({RocketMQProperties.class})
命名服务的地址:nameServer
- 生产者:producer
源码解析:
Binder属性配置:RocketMQBinderConfigurationProperties.java
applicatoin.properties中# 客户端接入点,必填
spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876
对应的源码,如下图所示Bindings属性配置:RocketMQBindingProperties.java
- 生产者Bindings属性配置
- 消费者Bindings属性配置
消费者属性配置:RocketMQConsumerProperties.java
- 标签属性配置
- sql属性配置
- 集群方式:广播模式 或 集群模式
生产者属性配置:RocketMQProducerProperties.java
- 组属性配置
- 事务消息属性配置
- 同步异步属性配置
- RocketMQExtendedBindingProperties
分析总结:
- 配置项目太少,所以在spring cloud stream加持下RocketMQ的一些功能是实现不了的。即使用spring cloud stream + RocketMQ比单独使用RocketMQ时,功能要少一些,只提供了核心的常用的功能配置属性。
- 以等alibaba升级了版本后,直接去看生产者或消费者的属性配置类(RocketMQProducerProperties.java、RocketMQConsumerProperties.java)多加了哪些属性,多加了属性就表明相应地会增加一些对应的功能。
五 配置应用
1 stream + rocketmq的HelloWorld程序
第一步:新建springboot项目、pom.xml
第1步:引入spring cloud stream的起步依赖:
com.alibaba.cloud
spring-cloud-starter-stream-rocketmq - 第2步:依赖父项目spring cloud alibaba,解决了spring cloud stream的起步依赖没有版本号从而不能引入的问题:
2.2.1.RELEASE 注意:兼容性问题:
注意版本需要使用springboot2.2.5
2.2.5.RELEASE 2.2.1.RELEASE - 第3步:依赖spring-cloud-starter-web,有
第二步:application.properties配置文件:组一样竞争关系
########## RocketMQ 通用配置
# 配置绑定器binder连接某个消息中间件(由ip和port指定),高度抽象,用于屏蔽不同消息中间件之间的差异(相当于ORM框架中的hibernate和mybatis)
spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876# 日志级别
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO########## Consumer Config### bindings是微服务与具体消息中间件沟通(发消息,收消息)的桥梁 #######
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group########## Produce Config
# output 的配置如下:
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.group=test-group第三步:编码
第1步:SendService.java
发送消息:
@EnableBinding(Source.class)
@Service
public class SenderService {//Source是spring cloud stream封装的
@Autowired
private Source source;
//发送消息的方法
public void send(String msg) throws Exception {//获取订阅通(信)道:source.output()==MessageChannel
//发送消息MessageChannel.send()的参数是:Message.java
// 返回值:布尔类型
boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
System.out.println(“消息发送:” + flag);
}
}
- 第2步:Application.java发送消息的方式(或使用XxxController.java)
ReceiveService.java接收消
方案1:手动调用方法去接收(while true死循环):Sink.input获取订阅信道
//获取消息通(信)道:source.input()==SubscribableChannel
//发送消息SubscribableChannel.subscrible()的参数是:Message.java
方案2:自动监听方式@EnableBinding(Sink.class) + @StreamListener(“input”)
@EnableBinding(Sink.class)
public class ReceiveService {@StreamListener(“input”)
public void receiveInput1(String receiveMsg) {
System.out.println(“input 接收到的消息: “ + receiveMsg);
}
}
第四步:验证
MQ管控台
- 目的地有消息:spring.cloud.stream.bindings.input.destination=test-topic
- 消息被消费
- 控制台打印
2 自定义信道:input1 + output1 + 扩展多个inputX多个outputX
在前面的案例中,我们已经实现了一个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使用的是系统提供的 Source (output)、Sink(input),接下来我们来看一下自定义信道名称;
- 第一步:pom.xml
第二步:application.properties::组一样竞争关系
########## 自定义
# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1# output 的配置:
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.bindings.output1.content-type=text/plain- spring.cloud.stream.bindings.input1.group=test-group1
第三步:编码
第1步:自定义Source接口
public interface MySource {
String OUTPUT1 = “output1”;
@Output(MySource.*OUTPUT1*)
MessageChannel output1();
\}
2. 第2步:发送消息业务层,SendService.java
3. 第3步:自定义Sink接口
public interface MySink \{
String *INPUT1* = "input1";
@Input(MySink.*INPUT1*)
SubscribableChannel input1();
\}
4. 第4步:接收消息业务层,ReceiveService.java
5. 第5步:applicaton.java 或 XxxController.java中发送、消费消息
1. 发送消息
2. 接收消息
1. 传统方式:手动调用receive方法(while死循环)
2. 自动监听方式:@EnableBinding(Sink.class) + @StreamListener("input")
- 第四步:验证
第五步:扩展
第1步:多个发送
第2步:多个消费
3 Stream + RocketMQ事务消息
(1)流原
第一步:生产者MQ Producer发送一条“半”消息Send Half Msg到消息中间件服务器MQ Server,即并不提交。
- 注:此时这条“半”消息对于消费者MQ Subscriber来说是不能消费、甚至说是不可见的
- 第二步:消息中间件服务器MQ Server返回给生产者MQ Producer一个响应,如200 OK。
第三步:生产者MQ Producer执行本地的事务Local Transaction
- 注:本地事务指:你做业务逻辑处理,比如说你要插入订单、比如说你要修改订单状态。
第四步:如果本地的事务Local Transaction提交成功,返回提交Commit,否则返回回滚Rollback,给到消息服务器MQ Server。消息服务器MQ Server接收到提交Commit后,把“半”消息Send Half Msg变成了“可用的”消息。如果消息服务器MQ Server接收到回滚Rollback,那么消息服务器MQ Server就会把这条消息删除掉Rollback Delete Msg。
- 注:此时这条“可用消息”就可以让消费者MQ Subscriber可见、可消费了。
- 第五步:如果本地的事务Local Transaction一直没有给消息服务器MQ Server发送状态(提交或回滚状态),即超时。那么消息服务器MQ Server会回调(5 Check back)生产者MQ Producer的方法。
- 第六步:生产者MQ Producer的回调(5 Check back)方法中,要写代码去检查本地事务(6 check the state of local Transaction)。检查什么呢?即检查本地事务插入订单操作有没有成功,我们可以通过查询数据库来得到结果。
- 第七步:如果检查的结果是插入成功,可以返回提交Commit,否则返回回滚Rollback,给到消息服务器MQ Server。
(2)配置应用:Stream + RocketMQ事务消息
- 第一步:创建新的springboot项目
- 第二步:pom.xml加依赖::
spring-cloud-starter-rocketmq 第三步:application.properties
#————————————— 事务消息 begin ————————————————
#**生产的配置
spring.cloud.stream.bindings.outputTX.destination=TransactionTopic
spring.cloud.stream.bindings.outputTX.content-type=application/json
spring.cloud.stream.rocketmq.bindings.outputTX.producer.group=myTxProducerGroup
#是否为事务消息,默认为false表示不是事务消息,true**表示是事务消息
spring.cloud.stream.rocketmq.bindings.outputTX.producer.transactional=true#**消费的配置:
spring.cloud.stream.bindings.inputTX.destination=TransactionTopic
spring.cloud.stream.bindings.inputTX.content-type=text/plain
spring.cloud.stream.bindings.inputTX.group=transaction-group
spring.cloud.stream.rocketmq.bindings.inputTX.consumer.broadcasting=false
1#————————————— 事务消息 end ————————————————第四步:编码
第1步:生产者
a.自定义信道
public interface MySource {
String OUTPUTTx= “outpuTx”;
@Output(MySource.*OUTPUTTX*)
MessageChannel outputTX();
\}
2. b.发送事务消息,SenderService.java
@Autowired
private MySource mySource;
public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
// 需要构建的消息对象MessageBuilder
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
builder.setHeader("test", String.valueOf(num));
Message message = builder.build();
boolean flag = mySource.outputTX().send(message);
System.out.println("inputTX 事务消息发送:" + flag);
}
3. c.执行本地事务和本地事务检查
// 执行本地事务和本地事务检查:
// 监听器
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
// 未知,需要二次检查
return RocketMQLocalTransactionState.UNKNOWN;
} else if ("2".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
// 回调检查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) msg.getPayload()));
// 假设回调检查没有异常,则返回commit
return RocketMQLocalTransactionState.COMMIT;
}
}
4. d.程序入口类application.java![b0caaccf747e43aeb11f126b9f0dd43b.png][]
2. 第2步:消费者
1. a.MySink.java
2. b.ReceiveService.java
1. 监听器自动接收消费
2. .output 或 .input手动调用消息
3. 第3步:程序入口类![631967e803934e1fba6d1bcb413c7d02.png][]
第五步:测试验证
生产者
- 返回commit,事务正常提交,消费者可见、也可消费消息
- 返回rallback,事务回滚,消息删除
- 超时,运行二次检查方法
消费者
- 成功接收到所有的消息
4 纲:生产者和消费者按标签tag发送或消费消息
六 开发的时候有两种选择
- 一种就是 直接SpringBoot + RocketMQ整合实现消息传送;
- 一种就是 使用Spring Cloud Stream对消息中间件的包装,来实现消息传送;
还没有评论,来说两句吧...