cloud stream 官方文档阅读笔记1
1、创建项目
在 spring initialzar 中选择 Rabbitmq和cloud Stream两个模块,最好的方法是搜索就可以出来了
2、一个简单的例子
修改生成的主程序为下面的形式
@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void handle(Person person) {
System.out.println("Received: " + person);
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
这里使用了@EnableBinding(Sink.class),使得Sink能够绑定消息的 input和output。初始化了框架绑定到消息中间件
而且它自动生成了配置(包含队列,主题和其他的组件)通过 Sink.INPUT 消息通道。
这里加入了一个handler方法去接收收到的Person类型的消息,该方法将直接将接收到的消息转化成Person对象。
现在,我们已经有一个完整的springcloud Stream项目来监听消息队列了。不过在此之前,我们默认你选择了RabbitMQ作为
消息中间件。默认你RabbitMQ服务已经安装且正在运行(注:默认在本机的RabbitMQ默认端口打开服务,而没有修改端口),现在你可以通过运行
主程序的 main 方法来开始这个程序了。
你在控制台中可能会看见这些输出:
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
--- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
--- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
. . .
--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
. . .
--- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
现在区你的 RabbitMQ web管理界面或者其他 消息队列客户端向当前队列发送一个消息,发送的队列就是以上控制台输出中 input
绑定的队列(注:rabbitmq支持动态绑定,根据程序的实现进行绑定)。例如,根据以上信息,我应该向
input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg 队列发送检测信息。其中,anonymous.CbMIwdkJSBO1ZoPDOtHtCg 代表的是一个消息
组而且被注册了。所以它在你的环境中可能是不一样的,更优秀的做法是,你可以给它指派一个确切的名字,只要你喜欢,你可以这样做:
spring.cloud.stream.bindings.input.group=hello
指派你的组名为 hello
在消息队列中我们发一个JSON格式的消息来代表一个Person对象,就像下面这样:
{ "name":"Sam Spade"}
然后在你的控制台下面就应该会输出
Received: Sam Spade
还没有评论,来说两句吧...