【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream
一、前言
在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。
在这篇博客中,小白向大家介绍一个消息事件驱动框架——Spring Cloud Stream。
二、什么是Spring Cloud Stream?
首先要说说消息驱动和事件驱动:
假设系统是这样的:处理A事后还有B事要处理
- 事件驱动,告诉处理A事的程序B事是如何做的,在A事处理完后,直接调用处理B事的程序(或接口)来处理B事。
消息驱动,处理完A事,放个消息在某个地方,意思是我处理完A事了,此时,处理A的程序已经完事大吉了。至于何时,如何处理B事,由另一个程序根据那个消息来处理。
事件模式耦合高,同模块内好用;消息模式耦合低,跨模块好用。事件模式集成其它语言比较繁琐,消息模式集成其他语言比较轻松。事件是侵入式设计,霸占你的主循环;消息是非侵入式设计,将主循环该怎样设计的自由留给用户。
Spring Cloud Stream 就是一个可以使得微服务拥有消息驱动的能力的框架。提供了消息驱动的机制,它通过Spring Integration来连接消息中间件以实现消息驱动。并为消息中间件提供了个性化的自动化配置,引入了发布-订阅、消费组和分区三个核心概念。
三、快速上手
3.1 准备工作
还是老样子,依旧是依托上一篇博客整理的内容,小编把上几次的demo都已经上传到git,大家可以自行下载。
[https://github.com/AresKingCarry/SpringCloudDemo][https_github.com_AresKingCarry_SpringCloudDemo]
另外既然是消息驱动,自然需要用到消息中间件,目前为止,spring cloud stream只支持rabbitmq和kafuka。这里呢,小编继续使用rabbitmq。
3.2 消息接收端
建立一个新的springboot项目,命名为stream。
3.3 添加对stream 的依赖
因为使用的是rabbitmq,所以添加`spring-cloud-starter-stream-rabbit`。
spring-cloud-starter-stream-rabbit依赖是Spring Cloud Stream对RabbitMQ的封装,这里边也包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址就是localhost,默认端口就是5672,默认用户名是guest,默认密码也是guest,由于我们的RabbitMQ都是采用了默认配置,所以这里的配置可以不去修改,一样也可以运行。如果小伙伴需要修改,则和上篇文章一样,直接在application.properties中修改即可。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.4 创建接收器
建立一个类SinkRecevier,用来接收RabbitMQ发送来的消息:
package com.wl.stream.listener;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/** * Created by Ares on 2018/4/18. */
@EnableBinding(Sink.class)
public class SinkRecevier {
@StreamListener(Sink.INPUT)
public void receive(Object payload){
System.out.println("接收到消息:"+payload);
}
}
代码说明:
`@EnableBinding`注解,绑定消息通道。该注解用来指定一个或者多个定义了`@Input`或`@Output`注解的接口。
在代码中,我们通过@EnableBinding(Sink.class),绑定了Sink接口,Sink接口是Spring Cloud 中默认绑定输入通道,除此之外,还有绑定输出通道Source,还有绑定输入输出通道的Processor通道。除了Spring Cloud定义的接口外,我们也可以自定义。
@StreamListener注解是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。
在代码中,我们通过receive方注册为input消息通道的处理方法,当监听到input消息通道的消息的时候,receive方法会运行。
3.7 修改配置文件
在配置文件中,我们要配置要监听的rabbitmq的地址,用户名等。
server:
port: 10000
spring:
rabbitmq:
host: 192.168.137.16
port: 5672
username: admin
password: admin
application:
name: stream
cloud:
stream:
bindings:
input:
destination: trade
contentType: 'application/json'
其中:
cloud.stream.bindings.input.destination: trade
cloud.stream.bindings.input.contentType: ‘application/json’
分别指明了要监听的队列为trade,监听的数据格式为json。
3.6 运行项目
我们运行项目,启动代码为默认代码,不用修改,直接运行就可以了
运行后,在mq的监控平台会发现,队列中多了一条记录,`trade.anonymous.XK5Pb5dZTwSoly1iowTDVQ`,这个就是我们监听的队列。
我们点击这条队列,在队列中发送一条消息:
在控制平台中,会看到监听打印出来的信息:
这样就简单的实现了接收端的搭建。下面我们进行发送端的搭建。
3.7 搭建发送端
同样我们需要建立一个springboot项目,streamsend。
3.8 添加依赖
通接收端,添加对rabbitmq的依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.9 编写发送端的代码
建立一个类sender:
package com.wl.streamsend.sender;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import java.text.SimpleDateFormat;
import java.util.Date;
/** * Created by Ares on 2018/4/18. */
@EnableBinding(Source.class)
public class Sender {
@InboundChannelAdapter(value = Source.OUTPUT)
public String timerMessageSource() {
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println(format);
return format;
}
}
代码说明:
同样用到了上文提到的注解@EnableBinding,因为是要发送消息,所以绑定了发送接口Source。
另外,还有@InboundChannelAdapter注解,发送接口。
3.10 修改配置文件
和接收端一样,绑定指定的队列。
server:
port: 10001
spring:
rabbitmq:
host: 192.168.137.16
port: 5672
username: admin
password: admin
application:
name: stream-send
cloud:
stream:
bindings:
output:
destination: trade
contentType: 'application/json'
3.11 发送消息
启动类也是默认的,没有做修改。启动项目。
会看到接收和发送控制台打印出日志。
四、小结
通过这次学习,发现springcloud已经利用注解把很多代码省略,效果非常好,开发人员通过简单的配置注解,就可以简单的开发,约定大于配置。
还没有评论,来说两句吧...