Spring Cloud Stream 消息驱动

短命女 2022-05-24 13:52 320阅读 0赞

1、什么是Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。

什么是Spring Integration ? Integration 集成

企业应用集成(EAI)是集成应用之间数据和服务的一种应用技术。四种集成风格:

  1. 文件传输:两个系统生成文件,文件的有效负载就是由另一个系统处理的消息。该类风格的例子之一是针对文件轮询目录或FTP目录,并处理该文件。
  2. 共享数据库:两个系统查询同一个数据库以获取要传递的数据。一个例子是你部署了两个EAR应用,它们的实体类(JPA、Hibernate等)共用同一个表。
  3. 远程过程调用:两个系统都暴露另一个能调用的服务。该类例子有EJB服务,或SOAP和REST服务。
  4. 消息:两个系统连接到一个公用的消息系统,互相交换数据,并利用消息调用行为。该风格的例子就是众所周知的中心辐射式的(hub-and-spoke)JMS架构。
    Spring Integration作为一种企业级集成框架,遵从现代经典书籍《企业集成模式》,为开发者提供了一种便捷的实现模式。Spring Integration构建在Spring控制反转设计模式之上,抽象了消息源和目标,利用消息传送和消息操作来集成应用环境下的各种组件。消息和集成关注点都被框架处理,所以业务组件能更好地与基础设施隔离,从而降低开发者所要面对的复杂的集成职责。

模型

Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。
业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
这里写图片描述

绑定器

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。
目前只提供了RabbitMQ和Kafka的Binder实现

消费组

由于发布-订阅模型使得共享主题的应用之间连接更简便,创建给定应用的不同实例来进行弹性扩张的能力也同样重要。如果存在多个应用实例,那么同一应用的额不同实例便会成为相互竞争的消费者,其中应该只有一个实例处理给定消息。
Spring Cloud Stream通过消费者组的概念给这种情况进行建模。每一个单独的消费者可以使用spring.cloud.stream.bindings.input.group属性来指定一个组名字。下图中展示的消费者们,这一属性被设置为spring.cloud.stream.bindings.input.group=hdfsWrite或者spring.cloud.stream.bindings.input.group=average。
所有订阅给定目标的组都会收到发布消息的一个拷贝,但是每一个组内只有一个成员会收到该消息。默认情况下,如果没有指定组,Spring Cloud Stream 会将该应用指定给一个匿名的独立的单成员消费者组,后者与所有其他组都处于一个发布-订阅关系中。
这里写图片描述

消息分区

Spring Cloud Stream对给定应用的多个实例之间分隔数据予以支持。在分隔方案中,物理交流媒介(如:代理主题)被视为分隔成了多个片(partitions)。一个或者多个生产者应用实例给多个消费者应用实例发送消息并确保相同特征的数据被同一消费者实例处理。
Spring Cloud Stream对分割的进程实例实现进行了抽象。使得Spring Cloud Stream 为不具备分区功能的消息中间件(RabbitMQ)也增加了分区功能扩展。
这里写图片描述

pom.xml

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-test</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.cloud</groupId>
  7. <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
  8. </dependency>
  9. </dependencies>
  10. <repositories>
  11. <repository>
  12. <id>spring-milestones</id>
  13. <name>Spring Milestones</name>
  14. <url>https://repo.spring.io/milestone</url>
  15. <snapshots>
  16. <enabled>false</enabled>
  17. </snapshots>
  18. </repository>
  19. </repositories>
  20. <dependencyManagement>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.cloud</groupId>
  24. <artifactId>spring-cloud-dependencies</artifactId>
  25. <version>Dalston.SR1</version>
  26. <type>pom</type>
  27. <scope>import</scope>
  28. </dependency>
  29. </dependencies>

接收者接口

  1. import org.springframework.cloud.stream.annotation.Input;
  2. import org.springframework.messaging.SubscribableChannel;
  3. public interface Sink {
  4. String INPUT="input";
  5. @Input(Sink.INPUT)
  6. SubscribableChannel input();
  7. }

它通过@Input注解绑定了一个名为input的通道。除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口,实际使用时我们也可以自己通过@Input和@Output注解来定义绑定消息通道的接口。当我们需要为@EnableBinding指定多个接口来绑定消息通道的时候,可以这样定义:@EnableBinding(value = {Sink.class, Source.class})。

接受者实现

  1. @EnableBinding(Sink.class) //绑定@input注解的接口 实现消息通道的绑定
  2. public class SinkReceiver {
  3. @StreamListener(Sink.INPUT)////将被修饰的方法注册为消息中间件上数据流的事件监听器,
  4. public void receive(Object object){
  5. System.out.println(object+" .........");
  6. }
  7. }

生产者接口

  1. @Component
  2. public interface SinkSender {
  3. String OUTPUT = "input";
  4. @Output(SinkSender.OUTPUT)
  5. MessageChannel output();
  6. }

生产者实现

  1. @RunWith(SpringRunner.class)
  2. @EnableBinding(value = {SinkSender.class})
  3. public class SinkApplicationTests {
  4. @Autowired
  5. private SinkSender sinkSender;
  6. @org.junit.Test
  7. public void sindMessage(){
  8. sinkSender.output().send(MessageBuilder.withPayload("啥").build());
  9. }
  10. }

先启动消费者 在启动生产者

Spring integration支持

@ServiceActivator 和 @InboundChannelAdapter
@ServiceActivator注解 和 @StreamListener 都实现了对消息的监听,ServiceActivator 没有内置消息转换,需要自己实现转换
@StreamListener 不需要自己实现,只需要在配置文件增加spring.cloud.stream.bindings.input.content-type=application/json 属性(默认支持json,json格式的可以不用配置)
详细内容可运行demo:metadata-rabbit-stream-integration-consumer和metadata-rabbit-stream-integration-server

生产者

  1. @RunWith(SpringRunner.class)
  2. @EnableBinding(value = {SinkSender.class})
  3. public class SinkApplicationTests {
  4. @Autowired
  5. private SinkSender sinkSender;
  6. @org.junit.Test
  7. public void sindMessage(){
  8. Student student=new Student();
  9. student.setId(1);
  10. student.setName("tom");
  11. sinkSender.output().send(MessageBuilder.withPayload(student).build());
  12. }
  13. }

消费者

  1. @EnableBinding({Sink.class}) //绑定@input注解的接口 实现消息通道的绑定
  2. public class SinkReceiver {
  3. @StreamListener(Sink.INPUT)////将被修饰的方法注册为消息中间件上数据流的事件监听器,
  4. public void receive(Student object){
  5. System.out.println(object+" .........");
  6. }
  7. }

发表评论

表情:
评论列表 (有 0 条评论,320人围观)

还没有评论,来说两句吧...

相关阅读