【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream

深碍√TFBOYSˉ_ 2022-05-26 05:50 261阅读 0赞

一、前言

  1. 在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。
  2. 在这篇博客中,小白向大家介绍一个消息事件驱动框架——Spring Cloud Stream

二、什么是Spring Cloud Stream?

  1. 首先要说说消息驱动和事件驱动:

假设系统是这样的:处理A事后还有B事要处理

  • 事件驱动,告诉处理A事的程序B事是如何做的,在A事处理完后,直接调用处理B事的程序(或接口)来处理B事。
  • 消息驱动,处理完A事,放个消息在某个地方,意思是我处理完A事了,此时,处理A的程序已经完事大吉了。至于何时,如何处理B事,由另一个程序根据那个消息来处理。

    事件模式耦合高,同模块内好用;消息模式耦合低,跨模块好用。事件模式集成其它语言比较繁琐,消息模式集成其他语言比较轻松。事件是侵入式设计,霸占你的主循环;消息是非侵入式设计,将主循环该怎样设计的自由留给用户。

    Spring Cloud Stream 就是一个可以使得微服务拥有消息驱动的能力的框架。提供了消息驱动的机制,它通过Spring Integration来连接消息中间件以实现消息驱动。并为消息中间件提供了个性化的自动化配置,引入了发布-订阅、消费组和分区三个核心概念。

三、快速上手

3.1 准备工作

  1. 还是老样子,依旧是依托上一篇博客整理的内容,小编把上几次的demo都已经上传到git,大家可以自行下载。
  2. [https://github.com/AresKingCarry/SpringCloudDemo][https_github.com_AresKingCarry_SpringCloudDemo]
  3. 另外既然是消息驱动,自然需要用到消息中间件,目前为止,spring cloud stream只支持rabbitmqkafuka。这里呢,小编继续使用rabbitmq

3.2 消息接收端

  1. 建立一个新的springboot项目,命名为stream

这里写图片描述

3.3 添加对stream 的依赖

  1. 因为使用的是rabbitmq,所以添加`spring-cloud-starter-stream-rabbit`
  2. spring-cloud-starter-stream-rabbit依赖是Spring Cloud StreamRabbitMQ的封装,这里边也包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址就是localhost,默认端口就是5672,默认用户名是guest,默认密码也是guest,由于我们的RabbitMQ都是采用了默认配置,所以这里的配置可以不去修改,一样也可以运行。如果小伙伴需要修改,则和上篇文章一样,直接在application.properties中修改即可。
  3. <dependency>
  4. <groupId>org.springframework.cloud</groupId>
  5. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  6. </dependency>

3.4 创建接收器

  1. 建立一个类SinkRecevier,用来接收RabbitMQ发送来的消息:
  2. package com.wl.stream.listener;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.annotation.StreamListener;
  5. import org.springframework.cloud.stream.messaging.Sink;
  6. /** * Created by Ares on 2018/4/18. */
  7. @EnableBinding(Sink.class)
  8. public class SinkRecevier {
  9. @StreamListener(Sink.INPUT)
  10. public void receive(Object payload){
  11. System.out.println("接收到消息:"+payload);
  12. }
  13. }
  14. 代码说明:
  15. `@EnableBinding`注解,绑定消息通道。该注解用来指定一个或者多个定义了`@Input``@Output`注解的接口。
  16. 在代码中,我们通过@EnableBinding(Sink.class),绑定了Sink接口,Sink接口是Spring Cloud 中默认绑定输入通道,除此之外,还有绑定输出通道Source,还有绑定输入输出通道的Processor通道。除了Spring Cloud定义的接口外,我们也可以自定义。
  17. @StreamListener注解是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。
  18. 在代码中,我们通过receive方注册为input消息通道的处理方法,当监听到input消息通道的消息的时候,receive方法会运行。

3.7 修改配置文件

  1. 在配置文件中,我们要配置要监听的rabbitmq的地址,用户名等。
  2. server:
  3. port: 10000
  4. spring:
  5. rabbitmq:
  6. host: 192.168.137.16
  7. port: 5672
  8. username: admin
  9. password: admin
  10. application:
  11. name: stream
  12. cloud:
  13. stream:
  14. bindings:
  15. input:
  16. destination: trade
  17. contentType: 'application/json'
  18. 其中:
  19. cloud.stream.bindings.input.destination: trade
  20. cloud.stream.bindings.input.contentType: application/json
  21. 分别指明了要监听的队列为trade,监听的数据格式为json

3.6 运行项目

  1. 我们运行项目,启动代码为默认代码,不用修改,直接运行就可以了
  2. 运行后,在mq的监控平台会发现,队列中多了一条记录,`trade.anonymous.XK5Pb5dZTwSoly1iowTDVQ`,这个就是我们监听的队列。

这里写图片描述

  1. 我们点击这条队列,在队列中发送一条消息:

这里写图片描述

  1. 在控制平台中,会看到监听打印出来的信息:

这里写图片描述

  1. 这样就简单的实现了接收端的搭建。下面我们进行发送端的搭建。

3.7 搭建发送端

  1. 同样我们需要建立一个springboot项目,streamsend

这里写图片描述

3.8 添加依赖

  1. 通接收端,添加对rabbitmq的依赖:
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  5. </dependency>

3.9 编写发送端的代码

  1. 建立一个类sender
  2. package com.wl.streamsend.sender;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.messaging.Source;
  5. import org.springframework.integration.annotation.InboundChannelAdapter;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. /** * Created by Ares on 2018/4/18. */
  9. @EnableBinding(Source.class)
  10. public class Sender {
  11. @InboundChannelAdapter(value = Source.OUTPUT)
  12. public String timerMessageSource() {
  13. String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  14. System.out.println(format);
  15. return format;
  16. }
  17. }
  18. 代码说明:
  19. 同样用到了上文提到的注解@EnableBinding,因为是要发送消息,所以绑定了发送接口Source
  20. 另外,还有@InboundChannelAdapter注解,发送接口。

3.10 修改配置文件

  1. 和接收端一样,绑定指定的队列。
  2. server:
  3. port: 10001
  4. spring:
  5. rabbitmq:
  6. host: 192.168.137.16
  7. port: 5672
  8. username: admin
  9. password: admin
  10. application:
  11. name: stream-send
  12. cloud:
  13. stream:
  14. bindings:
  15. output:
  16. destination: trade
  17. contentType: 'application/json'

3.11 发送消息

  1. 启动类也是默认的,没有做修改。启动项目。
  2. 会看到接收和发送控制台打印出日志。

这里写图片描述

四、小结

  1. 通过这次学习,发现springcloud已经利用注解把很多代码省略,效果非常好,开发人员通过简单的配置注解,就可以简单的开发,约定大于配置。

发表评论

表情:
评论列表 (有 0 条评论,261人围观)

还没有评论,来说两句吧...

相关阅读