SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

男娘i 2022-05-09 11:28 366阅读 0赞

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

1.添加SpringBoot集成ActiveMQ所需依赖

  1. <!-- activeMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>

2.配置application.properties文件

  1. ## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
  2. # failover:(tcp://localhost:61616,tcp://localhost:61617)
  3. # tcp://localhost:61616
  4. spring.activemq.broker-url=tcp://localhost:61616
  5. spring.activemq.in-memory=true
  6. #如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
  7. spring.activemq.pool.enabled=false
  8. #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
  9. #spring.jms.pub-sub-domain=true

3.在启动类中使用同步、异步消息队列

加入@EnableJms注解就是异步,没有加 @EnableJms注解则默认是同步。

  1. @EnableJms
  2. @SpringBootApplication
  3. public class QuartzsApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(QuartzsApplication.class, args);
  6. }
  7. }

4.点对点模式和发布订阅模式

  • 点对点模式:生产者发送一条消息到queue,只有一个消费者能收到。
  • 发布订阅模式:发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。

在JMS中,TOPIC实现了分发和订阅,当你分发一个消息,所有订阅这个消息的服务都能得到这个服务,所以从0到许多个订阅者都能得到一个消息的拷贝,只有在消息代理收到消息时有一个有效订阅时的订阅者才能得到这个消息的拷贝。

JMS Queue实现了负载均衡,一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,一个queue可以有很多消费者,他们之间实现了负载均衡,所以Queue实现了一个可靠的JMS负载均衡。

5.Producer消息生产者

  1. package com.primeton.quartzs.activeMQ;
  2. import javax.jms.Destination;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.jms.annotation.JmsListener;
  5. import org.springframework.jms.core.JmsMessagingTemplate;
  6. import org.springframework.stereotype.Service;
  7. import java.util.ArrayList;
  8. @Service("producer")
  9. public class Producer {
  10. @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
  11. private JmsMessagingTemplate jmsTemplate;
  12. // 发送消息,destination是发送到的队列,message是待发送的消息
  13. public void sendMessage(Destination destination, final String message){
  14. jmsTemplate.convertAndSend(destination, message);
  15. }
  16. @JmsListener(destination="out.queue")//实现双向队列
  17. public void consumerMessage(String text){
  18. System.out.println("从out.queue队列收到的回复报文为:"+text);
  19. }
  20. }

6.Consumer两个消费者

  1. package com.primeton.quartzs.activeMQ;
  2. import org.springframework.jms.annotation.JmsListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Consumer {
  6. // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
  7. @JmsListener(destination = "mytest.queue")
  8. public void receiveQueue(String text) {
  9. System.out.println("Consumer收到的报文为:"+text);
  10. }
  11. }
  12. package com.primeton.quartzs.activeMQ;
  13. import org.springframework.jms.annotation.JmsListener;
  14. import org.springframework.messaging.handler.annotation.SendTo;
  15. import org.springframework.stereotype.Component;
  16. @Component
  17. public class Consumer2 {
  18. @JmsListener(destination = "mytest.queue")
  19. @SendTo("out.queue")
  20. public String receiveQueue(String text) {
  21. System.out.println("Consumer2收到的报文为:"+text);
  22. return "return message"+text;
  23. }
  24. }

7.实现消息队列和双向队列

在生产者上加入out.queue

  1. @JmsListener(destination="out.queue")//实现双向队列
  2. public void consumerMessage(String text){
  3. System.out.println("从out.queue队列收到的回复报文为:"+text);
  4. }

消费者注解@SendTo(“out.queue”)

  1. @JmsListener(destination = "mytest.queue")
  2. @SendTo("out.queue")
  3. public String receiveQueue(String text) {
  4. System.out.println("Consumer2收到的报文为:"+text);
  5. return "return message"+text;
  6. }

8.实现发布订阅

在消费者的注解@JmsListener加上containerFactory

  1. // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
  2. @JmsListener(destination = "mytest.queue", containerFactory = "jmsListenerContainerQueue")
  3. public void receiveQueue(String text) {
  4. System.out.println("Consumer收到的报文为:"+text);
  5. }
  6. @JmsListener(destination = "mytest.topic", containerFactory = "jmsListenerContainerTopic")
  7. public void testTopicCusumer(String test){
  8. System.out.println(test);
  9. }

9.编写测试类

  1. package com.primeton.quartzs;
  2. import com.primeton.quartzs.activeMQ.Producer;
  3. import org.apache.activemq.command.ActiveMQQueue;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.test.context.junit4.SpringRunner;
  9. import javax.jms.Destination;
  10. @RunWith(SpringRunner.class)
  11. @SpringBootTest
  12. public class QuartzsApplicationTests {
  13. @Autowired
  14. private Producer producer;
  15. @Test
  16. public void contextLoads() {
  17. Destination destination = new ActiveMQQueue("mytest.queue");
  18. for(int i=0; i<10; i++){
  19. producer.sendMessage(destination, "myname is chhliu!!!");
  20. }
  21. }
  22. }

10.查看测试结果

  1. Consumer2收到的报文为:myname is chhliu!!!
  2. out.queue队列收到的回复报文为:return messagemyname is chhliu!!!
  3. Consumer收到的报文为:myname is chhliu!!!
  4. Consumer2收到的报文为:myname is chhliu!!!
  5. out.queue队列收到的回复报文为:return messagemyname is chhliu!!!
  6. Consumer收到的报文为:myname is chhliu!!!
  7. Consumer2收到的报文为:myname is chhliu!!!
  8. out.queue队列收到的回复报文为:return messagemyname is chhliu!!!
  9. Consumer收到的报文为:myname is chhliu!!!
  10. Consumer2收到的报文为:myname is chhliu!!!
  11. Consumer收到的报文为:myname is chhliu!!!
  12. out.queue队列收到的回复报文为:return messagemyname is chhliu!!!
  13. Consumer2收到的报文为:myname is chhliu!!!
  14. out.queue队列收到的回复报文为:return messagemyname is chhliu!!!
  15. Consumer收到的报文为:myname is chhliu!!!

11.在application.properties下配置activeMQ时需要注意地方

  1. ## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
  2. # failover:(tcp://localhost:61616,tcp://localhost:61617)
  3. # tcp://localhost:61616
  4. #spring.activemq.broker-url=tcp://localhost:61616
  5. #true时用内置activeMQ,否则用自己本机安装的activeMQ
  6. spring.activemq.in-memory=true
  7. #如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
  8. spring.activemq.pool.enabled=false
  9. #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
  10. #spring.jms.pub-sub-domain=true

如果大家有什么问题,可以在下方留言,想要源码的可以到我的资源库下载:链接地址

-————————————————————————————一个人的态度,决定他的高度。————————————————————————————-

发表评论

表情:
评论列表 (有 0 条评论,366人围观)

还没有评论,来说两句吧...

相关阅读