springboot集成ActiveMQ

悠悠 2023-07-18 03:17 72阅读 0赞

一、出现以下异常时,需要注意,receive获取消息时不可有返回值,否则循环报此异常

  1. Execution of JMS message listener failed, and no ErrorHandler has been set.

二、如果使用pool,记得设置为false否则报错

  1. #是否用Pooledconnectionfactory代替普通的ConnectionFactory
  2. spring.activemq.pool.enabled=false

ActiveMQ页面列表说明:
Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量

定时调度功能:
直接在发送消息类名之上加上@EnableScheduling,
发送消息的方法之上加上注解@Scheduled(fixedDelay = 5000),fixedDelay单位为毫秒

说明:
需要建立两个springboot项目
广播模式:群发
订阅模式:点对点

  1. <!--消息队列-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>
  6. <!--消息队列连接池-->
  7. <dependency>
  8. <groupId>org.apache.activemq</groupId>
  9. <artifactId>activemq-pool</artifactId>
  10. <version>5.15.0</version>
  11. </dependency>

发布方配置

  1. server:
  2. #Tomcat端口
  3. port: 8099
  4. spring:
  5. activemq:
  6. # MQ所在的服务器的地址
  7. broker-url: tcp://localhost:61616

发布方配置代码

  1. import javax.jms.Queue;
  2. import javax.jms.Topic;
  3. import org.apache.activemq.command.ActiveMQQueue;
  4. import org.apache.activemq.command.ActiveMQTopic;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class BeanConfig {
  9. //定义存放消息的队列
  10. @Bean
  11. public Queue queue() {
  12. return new ActiveMQQueue("ActiveMQQueue");
  13. }
  14. //定义存放消息的队列
  15. @Bean
  16. public Topic topic() {
  17. return new ActiveMQTopic("ActiveMQTopic");
  18. }
  19. }

发布方接口代码

  1. import javax.jms.Queue;
  2. import javax.jms.Topic;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.jms.core.JmsMessagingTemplate;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class ProviderController {
  9. @Autowired
  10. private JmsMessagingTemplate jmsMessagingTemplate;
  11. @Autowired
  12. private Queue queue;
  13. @Autowired
  14. private Topic topic;
  15. @RequestMapping("/sendqueue")
  16. public void sendqueue(String msg) {
  17. // 指定消息发送的目的地及内容
  18. System.out.println("成功发送订阅消息:" + msg);
  19. this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
  20. }
  21. @RequestMapping("/sendtopic")
  22. public void sendtopic(String msg) {
  23. // 指定消息发送的目的地及内容
  24. System.out.println("成功发送广播消息:" + msg);
  25. this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
  26. }
  27. }

消费方配置

  1. server:
  2. #Tomcat端口
  3. port: 8077
  4. spring:
  5. activemq:
  6. # MQ所在的服务器的地址
  7. broker-url: tcp://localhost:61616
  8. #true开启广播模式,订阅模式无法获取消息
  9. #false开启订阅模式,广播模式无法获取消息
  10. jms:
  11. pub-sub-domain: false

消费方代码

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.jms.annotation.JmsListener;
  3. import org.springframework.jms.core.JmsMessagingTemplate;
  4. import org.springframework.messaging.handler.annotation.SendTo;
  5. import org.springframework.stereotype.Component;
  6. import javax.jms.JMSException;
  7. import javax.jms.TextMessage;
  8. @Component
  9. public class ConsumerService {
  10. @Autowired
  11. private JmsMessagingTemplate jmsMessagingTemplate;
  12. @JmsListener(destination = "ActiveMQQueue")// 使用JmsListener配置消费者监听的队列,destination是队列名称
  13. // @SendTo("SQueue")// SendTo 会将此方法返回的数据,发送到消息队列 ,否则不设置返回类型-void
  14. public void receviceQueueMsg(String msg) {
  15. System.out.println("成功接受订阅消息:" + msg);
  16. }
  17. @JmsListener(destination="ActiveMQTopic")
  18. public void receviceTopicMsg(TextMessage msg) { //注:此处方法不可有返回值
  19. String str = "";
  20. try {
  21. str = msg.getText();
  22. System.out.println("成功接收广播消息:"+ msg.getText());
  23. } catch (JMSException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

发表评论

表情:
评论列表 (有 0 条评论,72人围观)

还没有评论,来说两句吧...

相关阅读