Spring Cloud Stream消息总线

布满荆棘的人生 2022-01-28 11:37 356阅读 0赞

Spring Cloud Stream消息总线

Springcloud 里面对于MQ的整合一个是前一篇的消息总线一个是本文介绍的消息驱动

大体要学习这么几个知识点:

课题:SpringCloud消息驱动Stream
1.什么是SpringCloud消息驱动
2.消息驱动Stream实现原理
3.消息驱动Stream与传统MQ区别
4.基于消息驱动整合Kafka
5.基于消息驱动整合Rabbitmq
6.基于消息驱动Stream消息分组

什么是消息驱动?

SpringCloud Stream消息驱动可以简化开发人员对消息中间件的使用复杂度,让系统开发人员更多尽力专注与核心业务逻辑的开发。SpringCloud Stream基于SpringBoot实现,自动配置化的功能可以帮助我们快速上手学习,类似与我们之前学习的orm框架,可以平滑的切换多种不同的数据库。

目前SpringCloud Stream 目前只支持 RabbitMQ和kafka

在topic模式上 区别很大的 这两个MQ SpringCloud去整合了 类似于Hibernate 通过对象得到sql语句

开发人员不需要知道具体的MQ底层实现,只需要关心业务逻辑编码就OK了

底层是如何实现的? Stream组件对rabbitMQ和kafka进行封装成同一个API,开发人员只需要对接Stream即可 !

消息驱动原理

绑定器

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。

1179709-20190119234815728-261348111.png

在该模型图上有如下几个核心概念:

  • Source: 当需要发送消息时,我们就需要通过Source,Source将会把我们所要发送的消息(POJO对象)进行序列化(默认转换成JSON格式字符串),然后将这些数据发送到Channel中;
  • Sink: 当我们需要监听消息时就需要通过Sink来,Sink负责从消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),然后交给具体的消息监听处理进行业务处理;
  • Channel: 消息通道是Stream的抽象之一。通常我们向消息中间件发送消息或者监听消息时需要指定主题(Topic)/消息队列名称,但这样一旦我们需要变更主题名称的时候需要修改消息发送或者消息监听的代码,但是通过Channel抽象,我们的业务代码只需要对Channel就可以了,具体这个Channel对应的是那个主题,就可以在配置文件中来指定,这样当主题变更的时候我们就不用对代码做任何修改,从而实现了与具体消息中间件的解耦;
  • Binder: Stream中另外一个抽象层。通过不同的Binder可以实现与不同消息中间件的整合,比如上面的示例我们所使用的就是针对Kafka的Binder,通过Binder提供统一的消息收发接口,从而使得我们可以根据实际需要部署不同的消息中间件,或者根据实际生产中所部署的消息中间件来调整我们的配置。

消息驱动有通道,绑定MQ。

生产者消息传递到通道里面之后,通道是跟MQ做绑定,封装的。消息一旦到MQ之后,发送给消费者通道,然后消费者进行消费 。绑定部分是底层帮助实现的。

封装也只是实现了部分功能。MQ的功能不是百分百都实现了的

消息驱动环境搭建

  1. 生产者:
  2. mavenpom

1179709-20190120004830969-1744921365.png

引入配置文件pom:

其实就是对springboot整合rabbitmq再进行了一层封装

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  2. <modelVersion>4.0.0</modelVersion>
  3. <groupId>com.toov5</groupId>
  4. <artifactId>SpringCloud-stream-producer</artifactId>
  5. <version>0.0.1-SNAPSHOT</version>
  6. <parent>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-parent</artifactId>
  9. <version>2.0.1.RELEASE</version>
  10. </parent>
  11. <dependencies>
  12. <!-- SpringBoot整合Web组件 -->
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.cloud</groupId>
  19. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  20. <version>2.0.1.RELEASE</version>
  21. </dependency>
  22. </dependencies>
  23. </project>

然后编码:

在rebbitmq中需要有交换机 队列 底层都帮助实现了! 以通道名称创建交换机 消费者启动时候 随机创建一个队列名称

创建通道:

  1. package com.toov5.stream;
  2. //创建发送通道
  3. import org.springframework.cloud.stream.annotation.Output;
  4. import org.springframework.messaging.SubscribableChannel;
  5. public interface SendMsgInterface {
  6. //创建发送通道
  7. @Output("my_stream_channel")
  8. SubscribableChannel sendMsg();
  9. }

controller:

  1. package com.toov5.controller;
  2. import java.util.UUID;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.support.MessageBuilder;
  6. import com.toov5.stream.SendMsgInterface;
  7. public class SendMsgController {
  8. //生产者流程
  9. // 1 生产者发送消息通道 获取通道
  10. @Autowired
  11. private SendMsgInterface sendMsgInterface;
  12. //2. 生产者投递消息 往通道发送消息
  13. public String sendMsg() {
  14. String msg = UUID.randomUUID().toString();
  15. System.out.println("生产者发送内容msg:" + msg);
  16. Message build = MessageBuilder.withPayload(msg.getBytes()).build();
  17. sendMsgInterface.sendMsg().send(build);
  18. return "success";
  19. }
  20. //3. 开启绑定 然后业务逻辑中就可以从Springboot中拿对象了 启动类去绑定
  21. }

启动类: 开启通道绑定

  1. package com.toov5;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.stream.annotation.EnableBinding;
  5. import com.toov5.stream.SendMsgInterface;
  6. @SpringBootApplication
  7. @EnableBinding(SendMsgInterface.class) //开启绑定通道(通道接口) 然后业务逻辑中就可以从Springboot中拿对象了
  8. public class AppProducer {
  9. public static void main(String[] args) {
  10. SpringApplication.run(AppProducer.class, args);
  11. }
  12. }

yml:

  1. server:
  2. port: 9000
  3. spring:
  4. application:
  5. name: spingcloud-stream-producer
  6. rabbitmq:
  7. ####连接地址
  8. host: 192.168.91.6
  9. ####端口号
  10. port: 5672
  11. ####账号
  12. username: admin
  13. ####密码
  14. password: admin
  15. ### 地址 主机独立的virtualhost
  16. virtual-host: /admin_toov5

启动生产者:

1179709-20190120011559448-1709559510.png

此时还没有创建队列 没有进行绑定:

小结:只需要发送信息到通道里面就OK了

下面创建消费者: 对应生产者去写就OK了

maven依赖:

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  2. <modelVersion>4.0.0</modelVersion>
  3. <groupId>com.toov5</groupId>
  4. <artifactId>SpringCloud-stream-consumer</artifactId>
  5. <version>0.0.1-SNAPSHOT</version>
  6. <parent>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-parent</artifactId>
  9. <version>2.0.1.RELEASE</version>
  10. </parent>
  11. <dependencies>
  12. <!-- SpringBoot整合Web组件 -->
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.cloud</groupId>
  19. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  20. <version>2.0.1.RELEASE</version>
  21. </dependency>
  22. </dependencies>
  23. </project>

通道接口:

  1. package com.toov5.stream;
  2. //创建发送通道
  3. import org.springframework.cloud.stream.annotation.Input;
  4. import org.springframework.messaging.SubscribableChannel;
  5. public interface ReadMsgInterface {
  6. //创建发送通道
  7. @Input("my_stream_channel")
  8. SubscribableChannel readMsg();
  9. }

消费者:监听通道

  1. package com.toov5.consumer;
  2. import org.springframework.cloud.stream.annotation.StreamListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Consumer {
  6. @StreamListener("my_stream_channel") //监听生产者响应的通道
  7. public void readMsg(String msg) {
  8. System.out.println("消费者获取到生产者投递的消息:"+msg);
  9. }
  10. }

启动类: 绑定通道

  1. package com.toov5;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.stream.annotation.EnableBinding;
  5. import com.toov5.stream.ReadMsgInterface;
  6. @SpringBootApplication
  7. @EnableBinding(ReadMsgInterface.class)
  8. public class AppConsumer {
  9. public static void main(String[] args) {
  10. SpringApplication.run(AppConsumer.class, args);
  11. }
  12. }

yml:

  1. server:
  2. port: 8000
  3. spring:
  4. application:
  5. name: spingcloud-stream-consumer
  6. rabbitmq:
  7. ####连接地址
  8. host: 192.168.91.6
  9. ####端口号
  10. port: 5672
  11. ####账号
  12. username: admin
  13. ####密码
  14. password: admin
  15. ### 地址 主机独立的virtualhost
  16. virtual-host: /admin_toov5

消费者是有队列的 我们在代码中没有写队列的相关哦 底层会创建的哦 底层自动创建一个队列 绑定交换机 队列名字是随机的

消费者启动后的多了个队列

1179709-20190120013219879-423115665.png

  1. 也绑定上了

1179709-20190120013304378-130032464.png

下面访问接口 发送个消息试试:

1179709-20190120013650781-1893871098.png

1179709-20190120013659635-1953942402.png

如果换成kafka,修改下maven依赖,配置文件依赖名称改成kafka就OK了~~kafka 的连接信息一配置就搞定

关于消息分组:

  1. 生产者投递消息 投递消息到通道后 两个消费者会都进行消费 重复消费了
  2. 通过分组去解决,同一个组的消费者会进行轮训消费,只有一个消费者进行消费
  3. ![1179709-20190120014308057-1422533396.png][]

consumer关闭,控制台中的队列会消失了。连接关闭会消失,因为木有固定队列哦 随机生成的

我们启动两个队列 通过端口号去标识:

1179709-20190120014737373-1010973441.png

并且绑定了同一个交换机:

1179709-20190120014923071-672602391.png

访问接口后:

1179709-20190120015004738-420992295.png

1179709-20190120015026412-1865340570.png

都能收到消息呀

分组配置:

消费者yml加入配置:

  1. cloud:
  2. stream:
  3. bindings:
  4. my_stream_channel: ###指定 管道名称!!!!
  5. #指定该应用实例属于 stream 消费组
  6. group: stream

此时的yml:

  1. server:
  2. port: 8002
  3. spring:
  4. application:
  5. name: spingcloud-stream-consumer
  6. rabbitmq:
  7. ####连接地址
  8. host: 192.168.91.6
  9. ####端口号
  10. port: 5672
  11. ####账号
  12. username: admin
  13. ####密码
  14. password: admin
  15. ### 地址 主机独立的virtualhost
  16. virtual-host: /admin_toov5
  17. cloud:
  18. stream:
  19. bindings:
  20. my_stream_channel: ###指定 管道名称!!!!
  21. #指定该应用实例属于 stream 消费组
  22. group: stream

启动两个consumer:

分组之后只有一个队列!

1179709-20190120015549357-696382427.png

绑定信息依然不变

1179709-20190120015628408-744522995.png

访问:

1179709-20190120015710784-2069909321.png

1179709-20190120015721256-1967634509.png

只有一个能接收消息 且轮训

分组 的概念是由于这个框架搞的 不是原先的MQ固有的

Kafka的策略整合 暂时先不写了 很简单的 有空再补上吧~~~

发表评论

表情:
评论列表 (有 0 条评论,356人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Cloud Bus(消息总线)(1)

    消息代理 消息代理是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产