第四章 Spring集成和实战笔记

比眉伴天荒 2023-06-06 04:53 123阅读 0赞

一、与 Spring 集成—生产者端

具体代码实现,参见rq-spring-with和rq-spring-with-consumer模块

1、pom文件

使用Maven,这里项目中使用的4.3.11,所以这里引入的是rabbit是2.0.0,如果兼容性的话请自行去Spring的官网上去查。

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

这里补充一下,spring的引入也是对原生进行包装:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L20wXzM3NjYxNDU4_size_16_color_FFFFFF_t_70

2、统一配置

配置文件中增加命名空间:


<beans xmlns**=”http://www.springframework.org/schema/beans"**
**xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance"**
** xmlns:rabbit=”http://www.springframework.org/schema/rabbit"**
**xmlns:context=”http://www.springframework.org/schema/context"**
**xsi:schemaLocation=”http://www.springframework.org/schema/beans**
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
**http://www.springframework.org/schema/rabbit**
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd“>

3、连接相关配置:

客户端连接:


<bean id**=”rabbitConnectionFactory” class**=”org.springframework.amqp.rabbit.connection.CachingConnectionFactory”>
<constructor-arg value**=”10.200.169.5”/>
<
property name**=”username” value**=”chj”/>
<
property name**=”password” value**=”123456”/>
<
property name**=”virtualHost” value**=”chj_vhost” />
<
property name**=”channelCacheSize” value**=”8”/>
<
property name**=”port” value**=”5672”></property>
</
bean**>

管理配置:

  1. <!--Spring的rabbitmq admin-->
  2. <rabbit:admin connection-factory="rabbitConnectionFactory"/>

4、生产者端(基础配置)

RabbitTemplate配置:


<bean id**=”rabbitTemplate” class**=”org.springframework.amqp.rabbit.core.RabbitTemplate”>
<constructor-arg ref**=”rabbitConnectionFactory”/>
</
bean**>

或下面这种声明方式也是可以的。


<rabbit**:templete id**=”rabbitTemplate2” connection-factory**=”rabbitConnectionFactory”** />

5、队列和交换器

可以在生产者配置文件中增加队列和交换器:


<rabbit**:queue name**=”h4_queue” durable**=”false”></rabbit**:queue>

<rabbit**:fanout-exchange name**=”fanout-exchange”

xmlns**=”http://www.springframework.org/schema/rabbit"** durable**=”false”>
<
bindings>
<
binding queue**=”h4_queue”></binding>
</bindings>
</rabbit**:fanout-exchange>

<
rabbit**:topic-exchange name**=”topic-exchange”**

xmlns**=”http://www.springframework.org/schema/rabbit"** durable**=”false”**>

</rabbit**:topic-exchange**>

6、代码实现

发送消息时,使用rabbitTemplate即可。同时还可以给消息配置属性MessageProperties。

  1. /**
  2. *类说明:spring生产者,模拟两种消息发送,一种fanout、一种topic
  3. */
  4. @Controller
  5. @RequestMapping("/rabbitmq")
  6. public class RabbitMqController {
  7. private Logger logger = LoggerFactory.getLogger(RabbitMqController.class);
  8. @Autowired
  9. RabbitTemplate rabbitTemplate;
  10. @ResponseBody
  11. @RequestMapping("/fanoutSender")
  12. public String fanoutSender(@RequestParam("message")String message){
  13. String opt="";
  14. try {
  15. for(int i=0;i<3;i++){
  16. String str = "Fanout,the message_"+i+" is : "+message;
  17. logger.info("**************************Send Message:["+str+"]");
  18. //TODO 生产者发送消息
  19. rabbitTemplate.send("fanout-exchange","", new Message(str.getBytes(),new MessageProperties()));
  20. }
  21. opt = "suc";
  22. } catch (Exception e) {
  23. opt = e.getCause().toString();
  24. }
  25. return opt;
  26. }
  27. @ResponseBody
  28. @RequestMapping("/topicSender")
  29. public String topicSender(@RequestParam("message")String message){
  30. String opt="";
  31. try {
  32. String[] routekeys={"king","mark","james"};
  33. String[] modules={"kafka","jvm","redis"};
  34. for(int i=0;i<routekeys.length;i++){
  35. for(int j=0;j<modules.length;j++){
  36. String routeKey = routekeys[i]+"."+modules[j];
  37. String str = "Topic,the message_["+i+","+j+"] is [rk:"+routeKey+"][msg:"+message+"]";
  38. logger.info("**************************Send Message:["+str+"]");
  39. //TODO 生产者发送消息 属性可以自由配置
  40. MessageProperties messageProperties = new MessageProperties();
  41. rabbitTemplate.send("topic-exchange",routeKey, new Message(str.getBytes(), messageProperties));
  42. }
  43. }
  44. opt = "suc";
  45. } catch (Exception e) {
  46. opt = e.getCause().toString();
  47. }
  48. return opt;
  49. }
  50. }

这里重申一下,生产者和消费都可以申明交换器、申明队列、绑定关系,一般处理是生产者和消费者都相同配置,这样以防止万一,如果生产者或者消费者单独启动,发送或者消费数据不会出现问题。

二、与 Spring 集成—消费者端

1、消费者端(基础配置)

队列和交换器**:**消费者中也可配置队列和交换器,以及指定队列和交换器绑定的路由键



<rabbit**:queue name**=”h1_queue” durable**=”false”/>
<
rabbit**:queue name**=”h2_queue” durable**=”false”/>
<rabbit**:queue name**=”h3_queue” durable**=”false”/>

<
rabbit**:fanout-exchange name**=”fanout-exchange” xmlns**=”http://www.springframework.org/schema/rabbit“ durable**=”false”>
<
rabbit**:bindings>
<rabbit**:binding queue**=”h1_queue”></rabbit**:binding>
<
rabbit**:binding queue**=”h2_queue”></rabbit**:binding>
<rabbit**:binding queue**=”h3_queue”></rabbit**:binding>
</
rabbit**:bindings>
</rabbit**:fanout-exchange*> *



<rabbit**:queue name**=”all_queue” durable**=”false”/>
<
rabbit**:queue name**=”all_kafka_queue” durable**=”false”/>
<rabbit**:queue name**=”king_kafka_queue” durable**=”false”/>
<
rabbit**:queue name**=”king_all_queue” durable**=”false”/>

<rabbit**:topic-exchange name**=”topic-exchange” xmlns**=”http://www.springframework.org/schema/rabbit"** durable**=”false”>
<
rabbit**:bindings>
<binding pattern**=”#“ queue**=”all_queue”></binding>
<binding pattern**=”*.kafka” queue**=”all_kafka_queue”></binding>
<binding pattern**=”king.kafka” queue**=”king_kafka_queue”></binding>
<binding pattern**=”king.*“ queue**=”king_all_queue”></binding>
</rabbit**:bindings>
</
rabbit**:topic-exchange>

2、消费者bean

两种方式:一种配置文件,一种注解定义

配置文件:


<bean id**=”h1_Service” class**=”com.chj.service.fanout.H1_Service”></bean>
<bean id**=”h2_Service” class**=”com.chj.service.fanout.H2_Service”></bean>
<bean id**=”h3_Service” class**=”com.chj.service.fanout.H3_Service”></bean>

注解定义:

  1. /**
  2. *类说明:订阅所有的消息
  3. */
  4. @Component
  5. public class AllTopicService implements MessageListener {
  6. private Logger logger = LoggerFactory.getLogger(AllTopicService.class);
  7. public void onMessage(Message message) {
  8. logger.info("Get message: "+new String( message.getBody()));
  9. }
  10. }

3、监听容器

将消费者bean和队列联系起来:


<rabbit**:listener-container connection-factory**=”rabbitConnectionFactory”>

  • <rabbit**:listener ref**=”h1_Service” queues**=”h1_queue” method**=”onMessage”/>
    <rabbit**:listener ref**=”h2_Service” queues**=”h2_queue” method**=”onMessage”/>
    <rabbit**:listener ref**=”h3_Service” queues**=”h3_queue” method**=”onMessage”/>
    *
  • <rabbit**:listener ref**=”allTopicService” queues**=”all_queue” method**=”onMessage”/>
    <rabbit**:listener ref**=”allKafkaTopicService” queues**=”all_kafka_queue” method**=”onMessage”/>
    <rabbit**:listener ref**=”kingKafkaTopicService” queues**=”king_kafka_queue” method**=”onMessage”/>
    <rabbit**:listener ref**=”kingAllTopicService” queues**=”king_all_queue” method**=”onMessage”/>
    </rabbit**:listener-container*
    >

代码:消费者实现 MessageListener 接口即可。

  1. @Component
  2. public class AllKafkaTopicService implements MessageListener {
  3. private Logger logger = LoggerFactory.getLogger(AllKafkaTopicService.class);
  4. public void onMessage(Message message) {
  5. logger.info("Get message: "+new String( message.getBody()));
  6. }
  7. }

4、生产者端(高级配置)

发送者确认的回调,实现方式和原生差不多,如下图,代码见rq-order包中的配置

<bean id**=”rabbitConnectionFactory” class=”org.springframework.amqp.rabbit.connection.CachingConnectionFactory”>
<
constructor-arg value**=”127.0.0.1”/>
<property name**=”username” value**=”guest”/>
<property name**=”password” value**=”guest”/>
<property name**=”channelCacheSize” value**=”8”/>
<property name**=”port” value**=”5672”></property>

  • <property name=”publisherConfirms” value=”true”></property>
    </*bean
    >

失败通知的回调,实现方式和原生差不多,如下所示:

<rabbit**:template id**=”rabbitTemplate” connection-factory**=”rabbitConnectionFactory” mandatory**=”true”
**return-callback**=”sendReturnCallback” confirm-callback**=”confirmCallback”>

</
rabbit**:template>

代码见rq-order包中的配置:

  1. /**
  2. * 类说明:失败通知的回调
  3. */
  4. @Component
  5. public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
  6. public void returnedMessage(Message message, int replyCode,String replyText,
  7. String exchange,String routingKey) {
  8. String msg = new String(message.getBody());
  9. System.out.println("返回的replyText :"+replyText);
  10. System.out.println("返回的exchange :"+exchange);
  11. System.out.println("返回的routingKey :"+routingKey);
  12. System.out.println("返回的message :"+message);
  13. }
  14. }

5、消费者端(高级配置)

手动确认,实现方式和原生差不多,如下:acknowledge=”manual”

  • <rabbit**:listener-container connection-factory**=”rabbitConnectionFactory” acknowledge=”manual”>
    <rabbit**:listener queues**=”depot_queue” ref**=”processDepot” method**=”onMessage” />
    </rabbit**:listener-container*
    >

代码见rq-depot包中的配置:

  1. @Service
  2. public class ProcessDepot implements ChannelAwareMessageListener {
  3. private static Logger logger = LoggerFactory.getLogger(ProcessDepot.class);
  4. @Autowired
  5. private DepotManager depotManager;
  6. private static Gson gson = new Gson();
  7. public void onMessage(Message message, Channel channel) throws Exception {
  8. try {
  9. String msg = new String(message.getBody());
  10. logger.info(">>>>>>>>>>>>>>接收到消息:"+msg);
  11. GoodTransferVo goodTransferVo = gson.fromJson(msg,GoodTransferVo.class);
  12. try {
  13. depotManager.operDepot(goodTransferVo);
  14. //throw new RuntimeException("库存系统异常了!!!!!");
  15. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  16. logger.info(">>>>>>>>>>>>>>库存处理完成,应答Mq服务");
  17. } catch (Exception e) {
  18. logger.error(e.getMessage());
  19. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
  20. logger.info(">>>>>>>>>>>>>>库存处理失败,拒绝消息,要求Mq重新派发");
  21. throw e;
  22. }
  23. } catch (Exception e) {
  24. logger.error(e.getMessage());
  25. }
  26. }
  27. }

这里能够拿到信道Channel的话,具体操作就和原生一样,前面讲过的原生中的各种情况就可以根据你的业务场景来处理了。

Qos**方式配置:**


<rabbit**:listener-container connection-factory**=”rabbitConnectionFactory” acknowledge=”manual” prefetch=”150” >
<rabbit**:listener queues**=”depot_queue” ref**=”processDepot” method**=”onMessage” />
</rabbit**:listener-container**>

三、与SpringBoot集成

具体代码实现,参见rq-springboot-with模块

1、pom文件

这里SpringBoot的版本我们使用2.1.1,SpringBoot也是对原生进行包装,同理与Spring中引入RabbitMQ

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

2、统一配置

2.1、配置连接相关配置

这里包括虚拟机、发送方确认我们都加上:

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=10.200.169.5
spring.rabbitmq.username=chj
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 消息发布确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/chj_vhost

2.2、连接工厂

使用一个配置类RabbitConfig :

  1. @Configuration
  2. public class RabbitConfig {
  3. @Value("${spring.rabbitmq.host}")
  4. private String addresses;
  5. @Value("${spring.rabbitmq.port}")
  6. private String port;
  7. @Value("${spring.rabbitmq.username}")
  8. private String username;
  9. @Value("${spring.rabbitmq.password}")
  10. private String password;
  11. @Value("${spring.rabbitmq.virtual-host}")
  12. private String virtualHost;
  13. @Value("${spring.rabbitmq.publisher-confirms}")
  14. private boolean publisherConfirms;
  15. public ConnectionFactory connectionFactory(){
  16. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  17. connectionFactory.setAddresses(addresses+":"+port);
  18. connectionFactory.setUsername(username);
  19. connectionFactory.setPassword(password);
  20. connectionFactory.setVirtualHost(virtualHost);
  21. //TODO 消息发送确认 如果要进行消息回调,则这里必须要设置为true
  22. connectionFactory.setPublisherConfirms(publisherConfirms);
  23. return connectionFactory;
  24. }

2.3、RabbitTemplate:

  1. @Bean
  2. public RabbitTemplate newRabbitTemplate(){
  3. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  4. //TODO 失败通知
  5. template.setMandatory(true);
  6. //TODO 发送方确认
  7. template.setConfirmCallback(confirmCallback());
  8. ///TODO 失败回调
  9. template.setReturnCallback(returnCallback());
  10. return template;
  11. }

2.4、队列和交换器及绑定关系:

可以在生产者配置RabbitConfig类中增加队列和交换器

默认交换器(direct)**:**默认情况下,申明一个队列,如果没有建立与交换器的绑定关系,系统默认分配一个 Default 交换器(多个队列也是这一个),默认匹配队列名称

  1. //TODO 申明队列(最简单的方式)
  2. @Bean
  3. public Queue helloQueue(){
  4. return new Queue(RmConst.QUEUE_HELLO);
  5. }
  6. @Bean
  7. public Queue userQueue() {
  8. return new Queue(RmConst.QUEUE_USER);
  9. }

Topic类型**:**

  1. @Bean
  2. public Queue queueEmailMessage() {
  3. return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
  4. }
  5. @Bean
  6. public Queue queueUserMessages() {
  7. return new Queue(RmConst.QUEUE_TOPIC_USER);
  8. }
  9. // TODO 申明交换器(topic交换器)
  10. @Bean
  11. public TopicExchange exchange(){
  12. return new TopicExchange(RmConst.EXCHANGE_TOPIC);
  13. }
  14. //TODO 绑定关系
  15. @Bean
  16. public Binding bindingEmailExchangeMessage(){
  17. return BindingBuilder.bind(queueEmailMessage()).to(exchange()).with("hankin.email");
  18. }
  19. @Bean
  20. public Binding bindingUserExchangeMessages() {
  21. return BindingBuilder.bind(queueUserMessages()).to(exchange()).with("hankin.*.user");
  22. }

Fanout类型**:**

  1. //TODO 申明队列
  2. @Bean
  3. public Queue AMessage() {
  4. return new Queue("sb.fanout.A");
  5. }
  6. //TODO 申明交换器(fanout交换器)
  7. @Bean
  8. public FanoutExchange fanoutExchange() {
  9. return new FanoutExchange(RmConst.EXCHANGE_FANOUT);
  10. }
  11. //TODO 绑定关系
  12. @Bean
  13. Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
  14. return BindingBuilder.bind(AMessage).to(fanoutExchange);
  15. }
  16. //TODO ===============消费者确认==========
  17. public SimpleMessageListenerContainer messageContainer(){
  18. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  19. //TODO 绑定了这个hankin.user队列
  20. container.setQueues(userQueue());
  21. //TODO 手动提交
  22. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  23. //TODO 消费确认方法
  24. container.setMessageListener(userReceiver);
  25. return container;
  26. }

2.5、发送者失败通知、发送者确认的回调

  1. //TODO ===============生产者发送确认==========
  2. @Bean
  3. public RabbitTemplate.ConfirmCallback confirmCallback() {
  4. return new RabbitTemplate.ConfirmCallback() {
  5. @Override
  6. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  7. if (ack){
  8. System.out.println("发送者确认发送给mq成功");
  9. }else {//处理失败的消息
  10. System.out.println("发送者发送给mq失败,考虑重发:"+cause);
  11. }
  12. }
  13. };
  14. }
  15. //TODO ===============失败通知==========
  16. @Bean
  17. public RabbitTemplate.ReturnCallback returnCallback() {
  18. return new RabbitTemplate.ReturnCallback() {
  19. @Override
  20. public void returnedMessage(Message message, int replyCode, String replyText,
  21. String exchange, String routingKey) {
  22. System.out.println("无法路由的消息,需要考虑另外处理。");
  23. System.out.println("Returned replyText:"+replyText);
  24. System.out.println("Returned exchange:"+exchange);
  25. System.out.println("Returned routingKey:"+routingKey);
  26. String msgJson = new String(message.getBody());
  27. System.out.println("Returned Message:"+msgJson);
  28. }
  29. };
  30. }

3、生产者

默认情况下(direct交换器) :DefaultSender

Topic交换器 :TopicSenderFanout交换器

FanoutSender:

4、消费者

默认情况下(direct 交换器绑定的队列):

HelloReceiver、UserReceiver

Topic交换器(绑定的队列):

TopicEmailMessageReceiver、TopicUserMessageReceiver

Fanout 交换器(绑定的队列):

FanoutReceiver

5、演示效果

5.1、普通类型(direct交换)测试

  1. /**
  2. *类说明: localhost:8080/rabbit/hello
  3. */
  4. @RestController
  5. @RequestMapping("/rabbit")
  6. public class RabbitController {
  7. @Autowired
  8. private DefaultSender defaultSender;
  9. @Autowired
  10. private TopicSender topicSender;
  11. @Autowired
  12. private FanoutSender fanoutSender;
  13. /**
  14. * 普通类型测试
  15. */
  16. @GetMapping("/hello")
  17. public void hello(){ //mq的消息发送
  18. String message = "hello rabbitmq!!";
  19. defaultSender.send(message);
  20. }
  21. /**
  22. * topic exchange类型rabbitmq测试
  23. */
  24. @GetMapping("/topicTest")
  25. public void topicTest() {
  26. topicSender.send();
  27. }
  28. /**
  29. * fanout exchange类型rabbitmq测试
  30. */
  31. @GetMapping("/fanoutTest")
  32. public void fanoutTest() {
  33. fanoutSender.send("hellomsg:OK");
  34. }
  35. }

生产者**代码:**

  1. @Component
  2. public class DefaultSender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(String message){
  6. String sendMsg = message +"---"+ System.currentTimeMillis();;
  7. System.out.println("Sender : " + sendMsg);
  8. //TODO 普通消息处理
  9. //this.rabbitTemplate.convertAndSend(RmConst.QUEUE_HELLO, sendMsg);
  10. //TODO 消息处理--(消费者处理时,有手动应答)
  11. this.rabbitTemplate.convertAndSend(RmConst.QUEUE_USER,message);
  12. }
  13. }

消费者**代码:简单消费者**

  1. @Component
  2. @RabbitListener(queues = "hankin.hello")
  3. public class HelloReceiver {
  4. @RabbitHandler
  5. public void process(String hello){
  6. System.out.println("HelloReceiver : " + hello);
  7. }
  8. }

消费者手动确认:

  1. @Component
  2. public class UserReceiver implements ChannelAwareMessageListener {
  3. // @RabbitHandler
  4. // public void process(String hello) {
  5. // System.out.println("Receiver2 : " + hello);
  6. // }
  7. @Override
  8. public void onMessage(Message message, Channel channel) throws Exception {
  9. try {
  10. String msg = new String(message.getBody());
  11. System.out.println("UserReceiver>>>>>>>接收到消息:"+msg);
  12. try {
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  14. System.out.println("UserReceiver>>>>>>消息已消费");
  15. }catch (Exception e){
  16. System.out.println(e.getMessage());
  17. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
  18. System.out.println("UserReceiver>>>>>>拒绝消息,要求Mq重新派发");
  19. throw e;
  20. }
  21. }catch (Exception e){
  22. System.out.println(e.getMessage());
  23. }
  24. }
  25. }

5.2、广播类型(Fanout 交换)测试:

http://localhost:8080/rabbit/fanoutTest

生产者代码:

  1. @Component
  2. public class FanoutSender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(String msg) {
  6. String sendMsg = msg +"---"+ System.currentTimeMillis();;
  7. System.out.println("FanoutSender : " + sendMsg);
  8. this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_FANOUT, "",sendMsg);
  9. }
  10. }

消费者代码,简单消费者:

  1. @Component
  2. @RabbitListener(queues = "hankin.fanout.A")
  3. public class FanoutReceiver {
  4. @RabbitHandler
  5. public void process(String hello) {
  6. System.out.println("FanoutReceiver : " + hello);
  7. }
  8. }

5.3、Topic类型(topic交换)测试

http://localhost:8080/rabbit/topicTest

生产者代码:

  1. @Component
  2. public class TopicSender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(){
  6. String msg1 = "I am email mesaage msg======";
  7. System.out.println("TopicSender send the 1st : " + msg1);
  8. // String exchange, String routingKey, Object object
  9. this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC,RmConst.RK_EMAIL,msg1);
  10. String msg2 = "I am user mesaages msg########";
  11. System.out.println("TopicSender send the 2nd : " + msg2);
  12. this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.RK_USER, msg2);
  13. String msg3 = "I am error mesaages msg";
  14. System.out.println("TopicSender send the 3rd : " + msg3);
  15. this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, "errorkey", msg3);
  16. }
  17. }

消费者代码:

  1. @Component
  2. @RabbitListener(queues = "hankin.info.user")
  3. public class TopicUserMessageReceiver {
  4. @RabbitHandler
  5. public void process(String message){
  6. System.out.println("TopicUser Message Receiver : " +message);
  7. }
  8. }

四、实战-应用解耦

1、场景:

用户下订单买商品,订单处理成功后,去扣减库存,在这个场景里,订单系统是生产者,库存系统是消费者。

库存是必须扣减的,在业务上来说,有库存直接扣减即可,没库存或者低于某个阈值,可以扣减成功,不过要通知其他系统(如通知采购系统尽快采购,通知用户订单系统我们会尽快调货)。

2、RPC实现

通过RPC的实现,可以看到 RPC会造成耦合。一旦库存系统失败,订单系统也会跟着失败。我们希望库存系统本身的失败,不影响订单系统的继续执行,在业务流程上,进行订单系统和库存系统的解耦。

3、RabbitMQ的实现

对于我们消息模式的实现,为保证库存必须有扣减,我们要考虑几个问题:

1、订单系统发给Mq服务的扣减库存的消息必须要被Mq服务器接收到,意味着需要使用发送者确认。

2、Mq服务器在扣减库存的消息被库存服务正确处理前必须一直保存,那么需要消息进行持久化。

3、某个库存服务器出了问题,扣减库存的消息要能够被其他正常的库存服务处理,需要我们自行对消费进行确认,意味着不能使用消费者自动确认,而应该使用手动确认。

所以生产者订单系统这边需要 ,配置文件中队列和交换器进行持久化,消息发送时的持久化,发送者确认的相关配置和代码。所以消费者库存系统这边要进行手动确认。

代码实现具体参考:rq-depot、rq-order

发表评论

表情:
评论列表 (有 0 条评论,123人围观)

还没有评论,来说两句吧...

相关阅读