SpringBoot整合ActiveMQ实现延时消息队列 r囧r小猫 2022-12-03 09:08 613阅读 0赞 1、修改mq配置文件 activemq.xml <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true"> 添加上 schedulerSupport="true" 2、pom文件添加 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.16.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 3、springboot配置文件添加 spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.in-memory=true spring.activemq.pool.enabled=false spring.activemq.password=admin spring.activemq.user=admin 4、添加MQ代码配置 必须步骤 package com.zsj.daka.daka.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.List; /** * @Description TODO * @Date 2020/9/2 18:04 * @Author zsj */ @Configuration public class ActiveMqConfig { @Bean public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); // 设置信任序列化包集合 List<String> models = new ArrayList<>(); models.add("com.zsj.daka.daka.model"); factory.setTrustedPackages(models); return factory; } } 5、消息类 package com.zsj.daka.daka.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * @Description TODO * @Date 2020/9/2 17:56 * @Author zsj */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class MessageModel implements Serializable { private String message; private String title; } 6、消息生产者 package com.zsj.daka.daka.quartz; /** * @Description TODO * @Date 2020/9/2 17:43 * @Author zsj */ import lombok.extern.slf4j.Slf4j; import org.apache.activemq.ScheduledMessage; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jms.JmsProperties; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.*; import java.io.Serializable; @Slf4j @Service("producer") public class Producer { @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装 private JmsMessagingTemplate jmsTemplate; /** * 发送消息 * * @param destination destination是发送到的队列 * @param message message是待发送的消息 */ public <T extends Serializable> void send(Destination destination, T message) { jmsTemplate.convertAndSend(destination, message); } /** * 延时发送 * * @param destination 发送的队列 * @param data 发送的消息 * @param time 延迟时间 */ public <T extends Serializable> void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 获取连接工厂 ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory(); try { // 获取连接 connection = connectionFactory.createConnection(); connection.start(); // 获取session,true开启事务,false关闭事务 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //设置延迟时间 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 发送消息 producer.send(message); log.info("发送消息:{}", data); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } } 7、消息消费者 package com.zsj.daka.daka.quartz; /** * @Description TODO * @Date 2020/9/2 17:44 * @Author zsj */ import com.zsj.daka.daka.model.MessageModel; import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Consumer { // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息 @JmsListener(destination = "mytest.queue") public void receiveQueue(MessageModel messageModel) throws InterruptedException { System.out.println("consumer接收到"+messageModel.getTitle()+"的请求并处理完毕,时间是"+new Date()); } } 8、生产消息入口 package com.zsj.daka.daka.quartz; import com.zsj.daka.daka.model.MessageModel; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.jms.Destination; /** * @Description TODO * @Date 2020/9/2 17:44 * @Author zsj */ @Controller public class UserController { @Autowired private Producer producer; /** * 消息队列实现方式 */ @RequestMapping("/queue") @ResponseBody public String queue() { Destination destination = new ActiveMQQueue("mytest.queue"); for (int i = 0; i < 10; i++) { MessageModel messageModel = MessageModel.builder() .message("测试消息" + i) .title("消息" + i) .build(); // 发送消息 producer.send(destination, messageModel); } return "queue 发送成功"; } @RequestMapping("/delay") @ResponseBody public String delay() { Destination destination = new ActiveMQQueue("mytest.queue"); for (int i = 0; i < 10; i++) { MessageModel messageModel = MessageModel.builder() .message("测试消息" + i) .title("消息" + i) .build(); // 延时发送消息 单位毫秒 producer.delaySend(destination, messageModel,60*1000L); } return "delay 发送成功"; } }
还没有评论,来说两句吧...