第四章 Spring集成和实战笔记
一、与 Spring 集成—生产者端
具体代码实现,参见rq-spring-with和rq-spring-with-consumer模块
1、pom文件
使用Maven,这里项目中使用的4.3.11,所以这里引入的是rabbit是2.0.0,如果兼容性的话请自行去Spring的官网上去查。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
这里补充一下,spring的引入也是对原生进行包装:
2、统一配置
配置文件中增加命名空间:
<beans xmlns**=”http://www.springframework.org/schema/beans"**
**xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance"**
** xmlns:rabbit=”http://www.springframework.org/schema/rabbit"**
**xmlns:context=”http://www.springframework.org/schema/context"**
**xsi:schemaLocation=”http://www.springframework.org/schema/beans**
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
**http://www.springframework.org/schema/rabbit**
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd“>
3、连接相关配置:
客户端连接:
<bean id**=”rabbitConnectionFactory” class**=”org.springframework.amqp.rabbit.connection.CachingConnectionFactory”>
<constructor-arg value**=”10.200.169.5”/>
<property name**=”username” value**=”chj”/>
<property name**=”password” value**=”123456”/>
<property name**=”virtualHost” value**=”chj_vhost” />
<property name**=”channelCacheSize” value**=”8”/>
<property name**=”port” value**=”5672”></property>
</bean**>
管理配置:
<!--Spring的rabbitmq admin-->
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
4、生产者端(基础配置)
RabbitTemplate配置:
<bean id**=”rabbitTemplate” class**=”org.springframework.amqp.rabbit.core.RabbitTemplate”>
<constructor-arg ref**=”rabbitConnectionFactory”/>
</bean**>
或下面这种声明方式也是可以的。
<rabbit**:templete id**=”rabbitTemplate2” connection-factory**=”rabbitConnectionFactory”** />
5、队列和交换器
可以在生产者配置文件中增加队列和交换器:
<rabbit**:queue name**=”h4_queue” durable**=”false”></rabbit**:queue>
<rabbit**:fanout-exchange name**=”fanout-exchange”
xmlns**=”http://www.springframework.org/schema/rabbit"** durable**=”false”>
<bindings>
<binding queue**=”h4_queue”></binding>
</bindings>
</rabbit**:fanout-exchange>
<rabbit**:topic-exchange name**=”topic-exchange”**
xmlns**=”http://www.springframework.org/schema/rabbit"** durable**=”false”**>
</rabbit**:topic-exchange**>
6、代码实现
发送消息时,使用rabbitTemplate即可。同时还可以给消息配置属性MessageProperties。
/**
*类说明:spring生产者,模拟两种消息发送,一种fanout、一种topic
*/
@Controller
@RequestMapping("/rabbitmq")
public class RabbitMqController {
private Logger logger = LoggerFactory.getLogger(RabbitMqController.class);
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@RequestMapping("/fanoutSender")
public String fanoutSender(@RequestParam("message")String message){
String opt="";
try {
for(int i=0;i<3;i++){
String str = "Fanout,the message_"+i+" is : "+message;
logger.info("**************************Send Message:["+str+"]");
//TODO 生产者发送消息
rabbitTemplate.send("fanout-exchange","", new Message(str.getBytes(),new MessageProperties()));
}
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
@ResponseBody
@RequestMapping("/topicSender")
public String topicSender(@RequestParam("message")String message){
String opt="";
try {
String[] routekeys={"king","mark","james"};
String[] modules={"kafka","jvm","redis"};
for(int i=0;i<routekeys.length;i++){
for(int j=0;j<modules.length;j++){
String routeKey = routekeys[i]+"."+modules[j];
String str = "Topic,the message_["+i+","+j+"] is [rk:"+routeKey+"][msg:"+message+"]";
logger.info("**************************Send Message:["+str+"]");
//TODO 生产者发送消息 属性可以自由配置
MessageProperties messageProperties = new MessageProperties();
rabbitTemplate.send("topic-exchange",routeKey, new Message(str.getBytes(), messageProperties));
}
}
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
}
这里重申一下,生产者和消费都可以申明交换器、申明队列、绑定关系,一般处理是生产者和消费者都相同配置,这样以防止万一,如果生产者或者消费者单独启动,发送或者消费数据不会出现问题。
二、与 Spring 集成—消费者端
1、消费者端(基础配置)
队列和交换器**:**消费者中也可配置队列和交换器,以及指定队列和交换器绑定的路由键
<rabbit**:queue name**=”h1_queue” durable**=”false”/>
<rabbit**:queue name**=”h2_queue” durable**=”false”/>
<rabbit**:queue name**=”h3_queue” durable**=”false”/>
<rabbit**:fanout-exchange name**=”fanout-exchange” xmlns**=”http://www.springframework.org/schema/rabbit“ durable**=”false”>
<rabbit**:bindings>
<rabbit**:binding queue**=”h1_queue”></rabbit**:binding>
<rabbit**:binding queue**=”h2_queue”></rabbit**:binding>
<rabbit**:binding queue**=”h3_queue”></rabbit**:binding>
</rabbit**:bindings>
</rabbit**:fanout-exchange*>
*
<rabbit**:queue name**=”all_queue” durable**=”false”/>
<rabbit**:queue name**=”all_kafka_queue” durable**=”false”/>
<rabbit**:queue name**=”king_kafka_queue” durable**=”false”/>
<rabbit**:queue name**=”king_all_queue” durable**=”false”/>
<rabbit**:topic-exchange name**=”topic-exchange” xmlns**=”http://www.springframework.org/schema/rabbit"** durable**=”false”>
<rabbit**:bindings>
<binding pattern**=”#“ queue**=”all_queue”></binding>
<binding pattern**=”*.kafka” queue**=”all_kafka_queue”></binding>
<binding pattern**=”king.kafka” queue**=”king_kafka_queue”></binding>
<binding pattern**=”king.*“ queue**=”king_all_queue”></binding>
</rabbit**:bindings>
</rabbit**:topic-exchange>
2、消费者bean
两种方式:一种配置文件,一种注解定义
配置文件:
<bean id**=”h1_Service” class**=”com.chj.service.fanout.H1_Service”></bean>
<bean id**=”h2_Service” class**=”com.chj.service.fanout.H2_Service”></bean>
<bean id**=”h3_Service” class**=”com.chj.service.fanout.H3_Service”></bean>
注解定义:
/**
*类说明:订阅所有的消息
*/
@Component
public class AllTopicService implements MessageListener {
private Logger logger = LoggerFactory.getLogger(AllTopicService.class);
public void onMessage(Message message) {
logger.info("Get message: "+new String( message.getBody()));
}
}
3、监听容器
将消费者bean和队列联系起来:
<rabbit**:listener-container connection-factory**=”rabbitConnectionFactory”>
- <rabbit**:listener ref**=”h1_Service” queues**=”h1_queue” method**=”onMessage”/>
<rabbit**:listener ref**=”h2_Service” queues**=”h2_queue” method**=”onMessage”/>
<rabbit**:listener ref**=”h3_Service” queues**=”h3_queue” method**=”onMessage”/> * - <rabbit**:listener ref**=”allTopicService” queues**=”all_queue” method**=”onMessage”/>
<rabbit**:listener ref**=”allKafkaTopicService” queues**=”all_kafka_queue” method**=”onMessage”/>
<rabbit**:listener ref**=”kingKafkaTopicService” queues**=”king_kafka_queue” method**=”onMessage”/>
<rabbit**:listener ref**=”kingAllTopicService” queues**=”king_all_queue” method**=”onMessage”/>
</rabbit**:listener-container*>
代码:消费者实现 MessageListener 接口即可。
@Component
public class AllKafkaTopicService implements MessageListener {
private Logger logger = LoggerFactory.getLogger(AllKafkaTopicService.class);
public void onMessage(Message message) {
logger.info("Get message: "+new String( message.getBody()));
}
}
4、生产者端(高级配置)
发送者确认的回调,实现方式和原生差不多,如下图,代码见rq-order包中的配置
<bean id**=”rabbitConnectionFactory” class=”org.springframework.amqp.rabbit.connection.CachingConnectionFactory”>
<constructor-arg value**=”127.0.0.1”/>
<property name**=”username” value**=”guest”/>
<property name**=”password” value**=”guest”/>
<property name**=”channelCacheSize” value**=”8”/>
<property name**=”port” value**=”5672”></property>
- <property name=”publisherConfirms” value=”true”></property>
</*bean>
失败通知的回调,实现方式和原生差不多,如下所示:
<rabbit**:template id**=”rabbitTemplate” connection-factory**=”rabbitConnectionFactory” mandatory**=”true”
**return-callback**=”sendReturnCallback” confirm-callback**=”confirmCallback”>
</rabbit**:template>
代码见rq-order包中的配置:
/**
* 类说明:失败通知的回调
*/
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
public void returnedMessage(Message message, int replyCode,String replyText,
String exchange,String routingKey) {
String msg = new String(message.getBody());
System.out.println("返回的replyText :"+replyText);
System.out.println("返回的exchange :"+exchange);
System.out.println("返回的routingKey :"+routingKey);
System.out.println("返回的message :"+message);
}
}
5、消费者端(高级配置)
手动确认,实现方式和原生差不多,如下:acknowledge=”manual”
- <rabbit**:listener-container connection-factory**=”rabbitConnectionFactory” acknowledge=”manual”>
<rabbit**:listener queues**=”depot_queue” ref**=”processDepot” method**=”onMessage” />
</rabbit**:listener-container*>
代码见rq-depot包中的配置:
@Service
public class ProcessDepot implements ChannelAwareMessageListener {
private static Logger logger = LoggerFactory.getLogger(ProcessDepot.class);
@Autowired
private DepotManager depotManager;
private static Gson gson = new Gson();
public void onMessage(Message message, Channel channel) throws Exception {
try {
String msg = new String(message.getBody());
logger.info(">>>>>>>>>>>>>>接收到消息:"+msg);
GoodTransferVo goodTransferVo = gson.fromJson(msg,GoodTransferVo.class);
try {
depotManager.operDepot(goodTransferVo);
//throw new RuntimeException("库存系统异常了!!!!!");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
logger.info(">>>>>>>>>>>>>>库存处理完成,应答Mq服务");
} catch (Exception e) {
logger.error(e.getMessage());
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
logger.info(">>>>>>>>>>>>>>库存处理失败,拒绝消息,要求Mq重新派发");
throw e;
}
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
这里能够拿到信道Channel的话,具体操作就和原生一样,前面讲过的原生中的各种情况就可以根据你的业务场景来处理了。
Qos**方式配置:**
<rabbit**:listener-container connection-factory**=”rabbitConnectionFactory” acknowledge=”manual” prefetch=”150” >
<rabbit**:listener queues**=”depot_queue” ref**=”processDepot” method**=”onMessage” />
</rabbit**:listener-container**>
三、与SpringBoot集成
具体代码实现,参见rq-springboot-with模块
1、pom文件
这里SpringBoot的版本我们使用2.1.1,SpringBoot也是对原生进行包装,同理与Spring中引入RabbitMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2、统一配置
2.1、配置连接相关配置
这里包括虚拟机、发送方确认我们都加上:
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=10.200.169.5
spring.rabbitmq.username=chj
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 消息发布确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/chj_vhost
2.2、连接工厂
使用一个配置类RabbitConfig :
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses+":"+port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
//TODO 消息发送确认 如果要进行消息回调,则这里必须要设置为true
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
2.3、RabbitTemplate:
@Bean
public RabbitTemplate newRabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory());
//TODO 失败通知
template.setMandatory(true);
//TODO 发送方确认
template.setConfirmCallback(confirmCallback());
///TODO 失败回调
template.setReturnCallback(returnCallback());
return template;
}
2.4、队列和交换器及绑定关系:
可以在生产者配置RabbitConfig类中增加队列和交换器
默认交换器(direct)**:**默认情况下,申明一个队列,如果没有建立与交换器的绑定关系,系统默认分配一个 Default 交换器(多个队列也是这一个),默认匹配队列名称
//TODO 申明队列(最简单的方式)
@Bean
public Queue helloQueue(){
return new Queue(RmConst.QUEUE_HELLO);
}
@Bean
public Queue userQueue() {
return new Queue(RmConst.QUEUE_USER);
}
Topic类型**:**
@Bean
public Queue queueEmailMessage() {
return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
}
@Bean
public Queue queueUserMessages() {
return new Queue(RmConst.QUEUE_TOPIC_USER);
}
// TODO 申明交换器(topic交换器)
@Bean
public TopicExchange exchange(){
return new TopicExchange(RmConst.EXCHANGE_TOPIC);
}
//TODO 绑定关系
@Bean
public Binding bindingEmailExchangeMessage(){
return BindingBuilder.bind(queueEmailMessage()).to(exchange()).with("hankin.email");
}
@Bean
public Binding bindingUserExchangeMessages() {
return BindingBuilder.bind(queueUserMessages()).to(exchange()).with("hankin.*.user");
}
Fanout类型**:**
//TODO 申明队列
@Bean
public Queue AMessage() {
return new Queue("sb.fanout.A");
}
//TODO 申明交换器(fanout交换器)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RmConst.EXCHANGE_FANOUT);
}
//TODO 绑定关系
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
//TODO ===============消费者确认==========
public SimpleMessageListenerContainer messageContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
//TODO 绑定了这个hankin.user队列
container.setQueues(userQueue());
//TODO 手动提交
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//TODO 消费确认方法
container.setMessageListener(userReceiver);
return container;
}
2.5、发送者失败通知、发送者确认的回调
//TODO ===============生产者发送确认==========
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("发送者确认发送给mq成功");
}else {//处理失败的消息
System.out.println("发送者发送给mq失败,考虑重发:"+cause);
}
}
};
}
//TODO ===============失败通知==========
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.out.println("无法路由的消息,需要考虑另外处理。");
System.out.println("Returned replyText:"+replyText);
System.out.println("Returned exchange:"+exchange);
System.out.println("Returned routingKey:"+routingKey);
String msgJson = new String(message.getBody());
System.out.println("Returned Message:"+msgJson);
}
};
}
3、生产者
默认情况下(direct交换器) :DefaultSender
Topic交换器 :TopicSenderFanout交换器
FanoutSender:
4、消费者
默认情况下(direct 交换器绑定的队列):
HelloReceiver、UserReceiver
Topic交换器(绑定的队列):
TopicEmailMessageReceiver、TopicUserMessageReceiver
Fanout 交换器(绑定的队列):
FanoutReceiver
5、演示效果
5.1、普通类型(direct交换)测试
/**
*类说明: localhost:8080/rabbit/hello
*/
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private DefaultSender defaultSender;
@Autowired
private TopicSender topicSender;
@Autowired
private FanoutSender fanoutSender;
/**
* 普通类型测试
*/
@GetMapping("/hello")
public void hello(){ //mq的消息发送
String message = "hello rabbitmq!!";
defaultSender.send(message);
}
/**
* topic exchange类型rabbitmq测试
*/
@GetMapping("/topicTest")
public void topicTest() {
topicSender.send();
}
/**
* fanout exchange类型rabbitmq测试
*/
@GetMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send("hellomsg:OK");
}
}
生产者**代码:**
@Component
public class DefaultSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message){
String sendMsg = message +"---"+ System.currentTimeMillis();;
System.out.println("Sender : " + sendMsg);
//TODO 普通消息处理
//this.rabbitTemplate.convertAndSend(RmConst.QUEUE_HELLO, sendMsg);
//TODO 消息处理--(消费者处理时,有手动应答)
this.rabbitTemplate.convertAndSend(RmConst.QUEUE_USER,message);
}
}
消费者**代码:简单消费者**
@Component
@RabbitListener(queues = "hankin.hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello){
System.out.println("HelloReceiver : " + hello);
}
}
消费者手动确认:
@Component
public class UserReceiver implements ChannelAwareMessageListener {
// @RabbitHandler
// public void process(String hello) {
// System.out.println("Receiver2 : " + hello);
// }
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String msg = new String(message.getBody());
System.out.println("UserReceiver>>>>>>>接收到消息:"+msg);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("UserReceiver>>>>>>消息已消费");
}catch (Exception e){
System.out.println(e.getMessage());
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
System.out.println("UserReceiver>>>>>>拒绝消息,要求Mq重新派发");
throw e;
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}
5.2、广播类型(Fanout 交换)测试:
http://localhost:8080/rabbit/fanoutTest
生产者代码:
@Component
public class FanoutSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg +"---"+ System.currentTimeMillis();;
System.out.println("FanoutSender : " + sendMsg);
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_FANOUT, "",sendMsg);
}
}
消费者代码,简单消费者:
@Component
@RabbitListener(queues = "hankin.fanout.A")
public class FanoutReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("FanoutReceiver : " + hello);
}
}
5.3、Topic类型(topic交换)测试
http://localhost:8080/rabbit/topicTest
生产者代码:
@Component
public class TopicSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
String msg1 = "I am email mesaage msg======";
System.out.println("TopicSender send the 1st : " + msg1);
// String exchange, String routingKey, Object object
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC,RmConst.RK_EMAIL,msg1);
String msg2 = "I am user mesaages msg########";
System.out.println("TopicSender send the 2nd : " + msg2);
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.RK_USER, msg2);
String msg3 = "I am error mesaages msg";
System.out.println("TopicSender send the 3rd : " + msg3);
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, "errorkey", msg3);
}
}
消费者代码:
@Component
@RabbitListener(queues = "hankin.info.user")
public class TopicUserMessageReceiver {
@RabbitHandler
public void process(String message){
System.out.println("TopicUser Message Receiver : " +message);
}
}
四、实战-应用解耦
1、场景:
用户下订单买商品,订单处理成功后,去扣减库存,在这个场景里,订单系统是生产者,库存系统是消费者。
库存是必须扣减的,在业务上来说,有库存直接扣减即可,没库存或者低于某个阈值,可以扣减成功,不过要通知其他系统(如通知采购系统尽快采购,通知用户订单系统我们会尽快调货)。
2、RPC实现
通过RPC的实现,可以看到 RPC会造成耦合。一旦库存系统失败,订单系统也会跟着失败。我们希望库存系统本身的失败,不影响订单系统的继续执行,在业务流程上,进行订单系统和库存系统的解耦。
3、RabbitMQ的实现
对于我们消息模式的实现,为保证库存必须有扣减,我们要考虑几个问题:
1、订单系统发给Mq服务的扣减库存的消息必须要被Mq服务器接收到,意味着需要使用发送者确认。
2、Mq服务器在扣减库存的消息被库存服务正确处理前必须一直保存,那么需要消息进行持久化。
3、某个库存服务器出了问题,扣减库存的消息要能够被其他正常的库存服务处理,需要我们自行对消费进行确认,意味着不能使用消费者自动确认,而应该使用手动确认。
所以生产者订单系统这边需要 ,配置文件中队列和交换器进行持久化,消息发送时的持久化,发送者确认的相关配置和代码。所以消费者库存系统这边要进行手动确认。
代码实现具体参考:rq-depot、rq-order
还没有评论,来说两句吧...