Spring Cloud Stream 消息驱动微服务 相关图形,dljd,cat - spring cloud alibaba

分手后的思念是犯贱 2023-09-27 09:13 184阅读 0赞

一 业务

  1. 多个微服务

二 需求

  1. 多个微服务之间需要通过消息中间件互相传递消息。
  2. 在面对不同的MQ产品(RabbitMQ、Kafka、RocketMQ)时,做到消息中间件和微服务的代码之间耦合性降到最低,甚至任意切换消息中间件而不用动微服务的代码。

三 解决方案

1 功能“集”:降低微服务和消息中间件的耦合性

2 “神”冰箱:spring cloud stream整合消息中间件(RabbitMQ、Kafka、RocketMQ)

四 完成学习

1 思想、思路

  1. 按照官方的定义,Spring Cloud Stream 是一个构建消息驱动微服务的框架

    1. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的问题。Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为rocketmq或者kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程;

      1. String cloud stream对各个MQ产品(RabbitMQ、Kafka、RocketMQ)的连接和各种操作,做了一层高度的抽象。在String cloud stream的加持下,我们可任意选用某一种消息中间件而不用修改微服务的代码。
      2. 相当于ORM框架(hibernate、mybatis),对底层的多种数据库产品的连接和各种操作做了一层高度的抽象。在ORM框架的加持下,我们可任意选用某一种数据库而不用修改代码。
  2. Spring Cloud Stream官方只支持rabbitmq 和 kafka,spring cloud alibaba新写了一个starter可以使Spring Cloud Stream支持RocketMQ;

2 体系组织

59bde2a5dad54e3c983e8d38ff42cb71.png

  1. spring cloud stream是spring cloud提供的一个子项目(或称 框架)。
  2. application core是我们写的微服务的代码,可以有多个application core,即有多个微服务。

    1. output:微服务发消息到消息中间件
    2. input:微服务接收消息中间件的消息。
  3. binder:绑定器

    1. spring cloud stream子项目(框架)给我们封装的东西叫binder。
    2. spring cloud stream封装的binder对微服务和消息中间件之间发消息和接消息的操作做了高度的抽象、封装。
    3. binder用于创建binding,从而绑定具体的消息中间件。
    4. 消息中间件都有自己的Binder实现;比如Kafka 的实现KafkaMessageChannelBinder,RabbitMQ的实现RabbitMessageChannelBinder以及RocketMQ 的实现 RocketMQMessageChannelBinder;
  4. binding

    1. 包括 Input Binding 和 Output Binding;
    2. Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触;
  5. 常用的消息中间件:kafka、rabbimq、rokectmq
  6. 开发过程中使用到的类、注解:




























    组成

    说明

    Binder 绑定器(各个消息中间件的绑定器不同)

    Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现;

    @Input 注解,表明是一个输入管道

    该注解标识输入通道,通过该输入通道接收消息进入应用程序

    @Output 注解,表明是一个输出管道

    该注解标识输出通道,发布的消息将通过该通道离开应用程序

    @StreamListener 注解

    监听队列,用于消费者的队列的消息接收

    @EnableBinding 注解,开启(Enable)绑定功能

    将信道channel和exchange、topic绑定在一起

3 工作流程

aad10ed1893c443d9d2d9aee98a3c220.png

  1. 第一步:Pom.xml引入依赖
  2. 第二步:application.properties配置

    1. ➌Binder

      1. RocketMQ服务器地址
    2. ➍Bindings

      1. input的3要素键值对:String:BindingProperties对象的属性(这里的string是input)
      2. output的3要素键值对:String:BindingProperties对象的属性(这里的string是output)
  3. 第三步:编码

    1. ➊Source

      1. ➋输出信道,发送消费
    2. ➊Sink

      1. ➋输入信道,消费消息
  4. 第四步:验证

4 源码分析:弊端

de8f4c2eb90846d585832c2070b71840.png

  1. 组id是com.alibaba.cloud

    1. pom.xml中依赖是:spring-cloud-starter-rocketmq

      1. 对应的jar包是:com.alibaba.cloud.spring-cloud-starter-stream-rocketmq.jar

        1. 自动装配解析:META-INF/spring.factories

          1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
          2. com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
          1. 自动装配的类:RocketMQComponent4BinderAutoConfiguration.java

            1. RocketMQ的自动装配:@AutoConfigureAfter({RocketMQAutoConfiguration.class})

              1. RocketMQ属性的配置:@EnableConfigurationProperties({RocketMQProperties.class})

              2. 命名服务的地址:nameServer

              3. 生产者:producer
        2. 源码解析:3eff3e12d3ea49afb636f7598ed256ca.png

          1. Binder属性配置:RocketMQBinderConfigurationProperties.java
            applicatoin.properties中

            # 客户端接入点,必填
            spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876
            对应的源码,如下图所示

            a1377db55800446ea0e473bbc046b011.png

          2. Bindings属性配置:RocketMQBindingProperties.java

            1. 生产者Bindings属性配置
            2. 消费者Bindings属性配置
          3. 消费者属性配置:RocketMQConsumerProperties.java

            1. 标签属性配置
            2. sql属性配置
            3. 集群方式:广播模式 或 集群模式
          4. 生产者属性配置:RocketMQProducerProperties.java

            1. 组属性配置
            2. 事务消息属性配置
            3. 同步异步属性配置
          5. RocketMQExtendedBindingProperties
  2. 分析总结:

    1. 配置项目太少,所以在spring cloud stream加持下RocketMQ的一些功能是实现不了的。即使用spring cloud stream + RocketMQ比单独使用RocketMQ时,功能要少一些,只提供了核心的常用的功能配置属性。
    2. 以等alibaba升级了版本后,直接去看生产者或消费者的属性配置类(RocketMQProducerProperties.java、RocketMQConsumerProperties.java)多加了哪些属性,多加了属性就表明相应地会增加一些对应的功能。

五 配置应用

1 stream + rocketmq的HelloWorld程序

  1. 第一步:新建springboot项目、pom.xml

    1. 第1步:引入spring cloud stream的起步依赖:


      com.alibaba.cloud
      spring-cloud-starter-stream-rocketmq
    2. 第2步:依赖父项目spring cloud alibaba,解决了spring cloud stream的起步依赖没有版本号从而不能引入的问题:
      2.2.1.RELEASE52957f26d9c741229cd21566343df8ec.png
    3. 注意:兼容性问题:

      注意版本需要使用springboot2.2.5

      2.2.5.RELEASE
      2.2.1.RELEASE

    4. 第3步:依赖spring-cloud-starter-web,有
  2. 第二步:application.properties配置文件:组一样竞争关系

    ########## RocketMQ 通用配置
    # 配置绑定器binder连接某个消息中间件(由ip和port指定),高度抽象,用于屏蔽不同消息中间件之间的差异(相当于ORM框架中的hibernate和mybatis)
    spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876

    # 日志级别
    logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO

    ########## Consumer Config### bindings是微服务与具体消息中间件沟通(发消息,收消息)的桥梁 #######
    # input 的配置:
    spring.cloud.stream.bindings.input.destination=test-topic
    spring.cloud.stream.bindings.input.content-type=text/plain
    spring.cloud.stream.bindings.input.group=test-group

    ########## Produce Config
    # output 的配置如下:
    spring.cloud.stream.bindings.output.destination=test-topic
    spring.cloud.stream.bindings.output.content-type=text/plain
    spring.cloud.stream.bindings.output.group=test-group

  3. 第三步:编码

    1. 第1步:SendService.java

      1. 发送消息:

        @EnableBinding(Source.class)
        @Service
        public class SenderService {

        //Source是spring cloud stream封装的
        @Autowired
        private Source source;
        //发送消息的方法
        public void send(String msg) throws Exception {

        //获取订阅通(信)道:source.output()==MessageChannel

        //发送消息MessageChannel.send()的参数是:Message.java

        // 返回值:布尔类型
        boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
        System.out.println(“消息发送:” + flag);
        }
        }

    2. 第2步:Application.java发送消息的方式(或使用XxxController.java)26f78b9943fa498badf18af4cb557a7d.png
    3. ReceiveService.java接收消

      1. 方案1:手动调用方法去接收(while true死循环):Sink.input获取订阅信道

        //获取消息通(信)道:source.input()==SubscribableChannel

        //发送消息SubscribableChannel.subscrible()的参数是:Message.java

      2. 方案2:自动监听方式@EnableBinding(Sink.class) + @StreamListener(“input”)

        @EnableBinding(Sink.class)
        public class ReceiveService {

        @StreamListener(“input”)
        public void receiveInput1(String receiveMsg) {
        System.out.println(“input 接收到的消息: “ + receiveMsg);
        }
        }

  4. 第四步:验证

    1. MQ管控台

      1. 目的地有消息:spring.cloud.stream.bindings.input.destination=test-topic
      2. 消息被消费
      3. 控制台打印

2 自定义信道:input1 + output1 + 扩展多个inputX多个outputX

在前面的案例中,我们已经实现了一个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使用的是系统提供的 Source (output)、Sink(input),接下来我们来看一下自定义信道名称;

  1. 第一步:pom.xml
  2. 第二步:application.properties::组一样竞争关系

    1. ########## 自定义
      # input 的配置:
      spring.cloud.stream.bindings.input1.destination=test-topic1
      spring.cloud.stream.bindings.input1.content-type=text/plain
      spring.cloud.stream.bindings.input1.group=test-group1

      # output 的配置:
      spring.cloud.stream.bindings.output1.destination=test-topic1
      spring.cloud.stream.bindings.output1.content-type=text/plain

    2. spring.cloud.stream.bindings.input1.group=test-group1
  3. 第三步:编码

    1. 第1步:自定义Source接口

      public interface MySource {

      String OUTPUT1 = “output1”;

  1. @Output(MySource.*OUTPUT1*)
  2. MessageChannel output1();
  3. \}
  4. 2. 2步:发送消息业务层,SendService.java
  5. 3. 3步:自定义Sink接口
  6. public interface MySink \{
  7. String *INPUT1* = "input1";
  8. @Input(MySink.*INPUT1*)
  9. SubscribableChannel input1();
  10. \}
  11. 4. 4步:接收消息业务层,ReceiveService.java
  12. 5. 5步:applicaton.java XxxController.java中发送、消费消息
  13. 1. 发送消息
  14. 2. 接收消息
  15. 1. 传统方式:手动调用receive方法(while死循环)
  16. 2. 自动监听方式:@EnableBinding(Sink.class) + @StreamListener("input")
  1. 第四步:验证
  2. 第五步:扩展

    1. 第1步:多个发送

      1. 0c4987aaae2a4c22a0610c2d9cfbf601.png
    2. 第2步:多个消费

      b1de047822b54e4fa289e53e63a664a2.png

3 Stream + RocketMQ事务消息

(1)流原

5056c34962e547d2a82afd10549541cc.png

  1. 第一步:生产者MQ Producer发送一条“半”消息Send Half Msg到消息中间件服务器MQ Server,即并不提交。

    1. 注:此时这条“半”消息对于消费者MQ Subscriber来说是不能消费、甚至说是不可见的
  2. 第二步:消息中间件服务器MQ Server返回给生产者MQ Producer一个响应,如200 OK。
  3. 第三步:生产者MQ Producer执行本地的事务Local Transaction

    1. 注:本地事务指:你做业务逻辑处理,比如说你要插入订单、比如说你要修改订单状态。
  4. 第四步:如果本地的事务Local Transaction提交成功,返回提交Commit,否则返回回滚Rollback,给到消息服务器MQ Server。消息服务器MQ Server接收到提交Commit后,把“半”消息Send Half Msg变成了“可用的”消息。如果消息服务器MQ Server接收到回滚Rollback,那么消息服务器MQ Server就会把这条消息删除掉Rollback Delete Msg。

    1. 注:此时这条“可用消息”就可以让消费者MQ Subscriber可见、可消费了。
  5. 第五步:如果本地的事务Local Transaction一直没有给消息服务器MQ Server发送状态(提交或回滚状态),即超时。那么消息服务器MQ Server会回调(5 Check back)生产者MQ Producer的方法。
  6. 第六步:生产者MQ Producer的回调(5 Check back)方法中,要写代码去检查本地事务(6 check the state of local Transaction)。检查什么呢?即检查本地事务插入订单操作有没有成功,我们可以通过查询数据库来得到结果。
  7. 第七步:如果检查的结果是插入成功,可以返回提交Commit,否则返回回滚Rollback,给到消息服务器MQ Server。

(2)配置应用:Stream + RocketMQ事务消息

  1. 第一步:创建新的springboot项目
  2. 第二步:pom.xml加依赖::spring-cloud-starter-rocketmq
  3. 第三步:application.properties

    #————————————— 事务消息 begin ————————————————
    #**生产的配置
    spring.cloud.stream.bindings.outputTX.destination=TransactionTopic
    spring.cloud.stream.bindings.outputTX.content-type=application/json
    spring.cloud.stream.rocketmq.bindings.outputTX.producer.group=myTxProducerGroup
    #是否为事务消息,默认为false表示不是事务消息,true**表示是事务消息
    spring.cloud.stream.rocketmq.bindings.outputTX.producer.transactional=true

    #**消费的配置:
    spring.cloud.stream.bindings.inputTX.destination=TransactionTopic
    spring.cloud.stream.bindings.inputTX.content-type=text/plain
    spring.cloud.stream.bindings.inputTX.group=transaction-group
    spring.cloud.stream.rocketmq.bindings.inputTX.consumer.broadcasting=false
    1#————————————— 事务消息 end ————————————————

  4. 第四步:编码

    1. 第1步:生产者

      1. a.自定义信道

        public interface MySource {

        String OUTPUTTx= “outpuTx”;

  1. @Output(MySource.*OUTPUTTX*)
  2. MessageChannel outputTX();
  3. \}
  4. 2. b.发送事务消息,SenderService.java
  5. @Autowired
  6. private MySource mySource;
  7. public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
  8. // 需要构建的消息对象MessageBuilder
  9. MessageBuilder builder = MessageBuilder.withPayload(msg)
  10. .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
  11. builder.setHeader("test", String.valueOf(num));
  12. Message message = builder.build();
  13. boolean flag = mySource.outputTX().send(message);
  14. System.out.println("inputTX 事务消息发送:" + flag);
  15. }
  16. 3. c.执行本地事务和本地事务检查
  17. // 执行本地事务和本地事务检查:
  18. // 监听器
  19. @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
  20. public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
  21. @Override
  22. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  23. Object num = msg.getHeaders().get("test");
  24. if ("1".equals(num)) {
  25. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
  26. // 未知,需要二次检查
  27. return RocketMQLocalTransactionState.UNKNOWN;
  28. } else if ("2".equals(num)) {
  29. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
  30. return RocketMQLocalTransactionState.ROLLBACK;
  31. }
  32. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
  33. return RocketMQLocalTransactionState.COMMIT;
  34. }
  35. // 回调检查
  36. @Override
  37. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  38. System.out.println("check: " + new String((byte[]) msg.getPayload()));
  39. // 假设回调检查没有异常,则返回commit
  40. return RocketMQLocalTransactionState.COMMIT;
  41. }
  42. }
  43. 4. d.程序入口类application.java![b0caaccf747e43aeb11f126b9f0dd43b.png][]
  44. 2. 2步:消费者
  45. 1. a.MySink.java
  46. 2. b.ReceiveService.java
  47. 1. 监听器自动接收消费
  48. 2. .output .input手动调用消息
  49. 3. 3步:程序入口类![631967e803934e1fba6d1bcb413c7d02.png][]
  1. 第五步:测试验证

    1. 生产者

      1. 返回commit,事务正常提交,消费者可见、也可消费消息
      2. 返回rallback,事务回滚,消息删除
      3. 超时,运行二次检查方法
    2. 消费者

      1. 成功接收到所有的消息

4 纲:生产者和消费者按标签tag发送或消费消息

六 开发的时候有两种选择

  1. 一种就是 直接SpringBoot + RocketMQ整合实现消息传送;
  2. 一种就是 使用Spring Cloud Stream对消息中间件的包装,来实现消息传送;

发表评论

表情:
评论列表 (有 0 条评论,184人围观)

还没有评论,来说两句吧...

相关阅读