Springboot整合activemq(二):收发具有优先级的队列方法 客官°小女子只卖身不卖艺 2022-04-03 11:11 777阅读 1赞 在使用mq作为中间件做异步消息推送时,可能会遇到一个场景,就是消息在消费后执行一系列的逻辑到一半,突然遇到异常或者是断电等之类问题,这时消息在mq的队列中已经出队列,而正常逻辑没有执行完就异常终止,这样就可能会造成数据的缺失和数据的不完整,如何解决这个问题?其实挺简单的,就是在消息进入消费者消费的同时做一个记录,再在逻辑执行完成后再删除这条记录或者是改变这条记录的状态,同时,在项目初始化时或者是执行一个定时任务扫描这个记录表,如果存在则产生一条相同的记录发送到activemq中(相同的队列)。这样就能解决这个问题。然而,今天在开发中我又思考到一个问题,如图:![20181221222343887.jpg][] 假设消息1没有消费完成,则消息1要重新进入队列,此时有一条消息4跟消息1操作的是同一条数据,如果消息1再进入队列,则4会在1之前消费掉,此时数据就会发生错乱,如果要保证消息1在消息4之前消费,则就需要对重新入队列的消息1进行一些操作来使得消息1优先消费。 activemq是提供对队列设置为具有优先级消息属性的功能,那么下面就要来实现具有优先级属性的消息队列: 第一步,进入activemq的conf文件夹下,修改activemq.xml配置文件,在<policyEntries>标签下插入一个配置: <policyEntry queue="queue1" strictOrderDispatch="true" useCache="false" queuePrefetch="1" prioritizedMessages="true" /> 完成后保存,并重启mq服务。这条配置就配置了对于queue1这个队列内的消息是具有优先级的属性的。 第二步,整合一个springboot+activemq的demo项目用于测试。 第三步,就是具体的代码实现了,首先我们来关注下springboot整合activemq的源码: 1.JmsMessagingTemplate这个类是实现发送消息的主要类,正常我们只需要在producer中注入这个类,调用其中发送消息的方法就能够实现消息的发送。具体代码如下: import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; @Service public class TestProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; public void sendMsg(Destination destination, String text){ jmsMessagingTemplate.convertAndSend(destination,text); } } 但是,此消息是不具有优先级的,接着往下看源码。 2.在JmsMessagingTemplate类中实际上是封装了一个JmsTemplate这个类,实际上这个类才是真正实现消息发送的类,它只是将它进一步封装而已。 package org.springframework.jms.core; import java.util.Map; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; import org.springframework.beans.factory.InitializingBean; import org.springframework.jms.InvalidDestinationException; import org.springframework.jms.JmsException; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessagingMessageConverter; import org.springframework.jms.support.destination.DestinationResolutionException; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.core.AbstractMessagingTemplate; import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.util.Assert; public class JmsMessagingTemplate extends AbstractMessagingTemplate<Destination> implements JmsMessageOperations, InitializingBean { @Nullable private JmsTemplate jmsTemplate; private MessageConverter jmsMessageConverter = new MessagingMessageConverter(); private boolean converterSet; @Nullable private String defaultDestinationName; public JmsMessagingTemplate() { } public JmsMessagingTemplate(ConnectionFactory connectionFactory) { this.jmsTemplate = new JmsTemplate(connectionFactory); } public JmsMessagingTemplate(JmsTemplate jmsTemplate) { Assert.notNull(jmsTemplate, "JmsTemplate must not be null"); this.jmsTemplate = jmsTemplate; } public void setConnectionFactory(ConnectionFactory connectionFactory) { if (this.jmsTemplate != null) { this.jmsTemplate.setConnectionFactory(connectionFactory); } else { this.jmsTemplate = new JmsTemplate(connectionFactory); } } @Nullable public ConnectionFactory getConnectionFactory() { return this.jmsTemplate != null ? this.jmsTemplate.getConnectionFactory() : null; } public void setJmsTemplate(@Nullable JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Nullable public JmsTemplate getJmsTemplate() { return this.jmsTemplate; } public void setJmsMessageConverter(MessageConverter jmsMessageConverter) { Assert.notNull(jmsMessageConverter, "MessageConverter must not be null"); this.jmsMessageConverter = jmsMessageConverter; this.converterSet = true; } public MessageConverter getJmsMessageConverter() { return this.jmsMessageConverter; } public void setDefaultDestinationName(@Nullable String defaultDestinationName) { this.defaultDestinationName = defaultDestinationName; } @Nullable public String getDefaultDestinationName() { return this.defaultDestinationName; } public void afterPropertiesSet() { Assert.notNull(this.jmsTemplate, "Property 'connectionFactory' or 'jmsTemplate' is required"); if (!this.converterSet && this.jmsTemplate.getMessageConverter() != null) { ((MessagingMessageConverter)this.jmsMessageConverter).setPayloadConverter(this.jmsTemplate.getMessageConverter()); } } private JmsTemplate obtainJmsTemplate() { Assert.state(this.jmsTemplate != null, "No JmsTemplate set"); return this.jmsTemplate; } public void send(Message<?> message) { Destination defaultDestination = (Destination)this.getDefaultDestination(); if (defaultDestination != null) { this.send(defaultDestination, message); } else { this.send(this.getRequiredDefaultDestinationName(), message); } } public void convertAndSend(Object payload) throws MessagingException { this.convertAndSend((Object)payload, (MessagePostProcessor)null); } public void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException { Destination defaultDestination = (Destination)this.getDefaultDestination(); if (defaultDestination != null) { this.convertAndSend((Object)defaultDestination, payload, (MessagePostProcessor)postProcessor); } else { this.convertAndSend(this.getRequiredDefaultDestinationName(), payload, postProcessor); } } public void send(String destinationName, Message<?> message) throws MessagingException { this.doSend(destinationName, message); } public void convertAndSend(String destinationName, Object payload) throws MessagingException { this.convertAndSend(destinationName, payload, (Map)null); } public void convertAndSend(String destinationName, Object payload, @Nullable Map<String, Object> headers) throws MessagingException { this.convertAndSend(destinationName, payload, headers, (MessagePostProcessor)null); } public void convertAndSend(String destinationName, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException { this.convertAndSend(destinationName, payload, (Map)null, postProcessor); } public void convertAndSend(String destinationName, Object payload, @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException { Message<?> message = this.doConvert(payload, headers, postProcessor); this.send(destinationName, message); } @Nullable public Message<?> receive() { Destination defaultDestination = (Destination)this.getDefaultDestination(); return defaultDestination != null ? this.receive(defaultDestination) : this.receive(this.getRequiredDefaultDestinationName()); } @Nullable public <T> T receiveAndConvert(Class<T> targetClass) { Destination defaultDestination = (Destination)this.getDefaultDestination(); return defaultDestination != null ? this.receiveAndConvert(defaultDestination, targetClass) : this.receiveAndConvert(this.getRequiredDefaultDestinationName(), targetClass); } @Nullable public Message<?> receive(String destinationName) throws MessagingException { return this.doReceive(destinationName); } @Nullable public <T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException { Message<?> message = this.doReceive(destinationName); return message != null ? this.doConvert(message, targetClass) : null; } @Nullable public Message<?> sendAndReceive(Message<?> requestMessage) { Destination defaultDestination = (Destination)this.getDefaultDestination(); return defaultDestination != null ? this.sendAndReceive(defaultDestination, requestMessage) : this.sendAndReceive(this.getRequiredDefaultDestinationName(), requestMessage); } @Nullable public Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) throws MessagingException { return this.doSendAndReceive(destinationName, requestMessage); } @Nullable public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass) throws MessagingException { return this.convertSendAndReceive(destinationName, request, (Map)null, (Class)targetClass); } @Nullable public <T> T convertSendAndReceive(Object request, Class<T> targetClass) { return this.convertSendAndReceive((Object)request, (Class)targetClass, (MessagePostProcessor)null); } @Nullable public <T> T convertSendAndReceive(String destinationName, Object request, @Nullable Map<String, Object> headers, Class<T> targetClass) throws MessagingException { return this.convertSendAndReceive(destinationName, request, headers, targetClass, (MessagePostProcessor)null); } @Nullable public <T> T convertSendAndReceive(Object request, Class<T> targetClass, @Nullable MessagePostProcessor postProcessor) { Destination defaultDestination = (Destination)this.getDefaultDestination(); return defaultDestination != null ? this.convertSendAndReceive((Object)defaultDestination, request, (Class)targetClass, (MessagePostProcessor)postProcessor) : this.convertSendAndReceive(this.getRequiredDefaultDestinationName(), request, targetClass, postProcessor); } @Nullable public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass, @Nullable MessagePostProcessor requestPostProcessor) throws MessagingException { return this.convertSendAndReceive(destinationName, request, (Map)null, targetClass, requestPostProcessor); } @Nullable public <T> T convertSendAndReceive(String destinationName, Object request, @Nullable Map<String, Object> headers, Class<T> targetClass, @Nullable MessagePostProcessor postProcessor) { Message<?> requestMessage = this.doConvert(request, headers, postProcessor); Message<?> replyMessage = this.sendAndReceive(destinationName, requestMessage); return replyMessage != null ? this.getMessageConverter().fromMessage(replyMessage, targetClass) : null; } protected void doSend(Destination destination, Message<?> message) { try { this.obtainJmsTemplate().send(destination, this.createMessageCreator(message)); } catch (JmsException var4) { throw this.convertJmsException(var4); } } protected void doSend(String destinationName, Message<?> message) { try { this.obtainJmsTemplate().send(destinationName, this.createMessageCreator(message)); } catch (JmsException var4) { throw this.convertJmsException(var4); } } @Nullable protected Message<?> doReceive(Destination destination) { try { javax.jms.Message jmsMessage = this.obtainJmsTemplate().receive(destination); return this.convertJmsMessage(jmsMessage); } catch (JmsException var3) { throw this.convertJmsException(var3); } } @Nullable protected Message<?> doReceive(String destinationName) { try { javax.jms.Message jmsMessage = this.obtainJmsTemplate().receive(destinationName); return this.convertJmsMessage(jmsMessage); } catch (JmsException var3) { throw this.convertJmsException(var3); } } @Nullable protected Message<?> doSendAndReceive(Destination destination, Message<?> requestMessage) { try { javax.jms.Message jmsMessage = this.obtainJmsTemplate().sendAndReceive(destination, this.createMessageCreator(requestMessage)); return this.convertJmsMessage(jmsMessage); } catch (JmsException var4) { throw this.convertJmsException(var4); } } @Nullable protected Message<?> doSendAndReceive(String destinationName, Message<?> requestMessage) { try { javax.jms.Message jmsMessage = this.obtainJmsTemplate().sendAndReceive(destinationName, this.createMessageCreator(requestMessage)); return this.convertJmsMessage(jmsMessage); } catch (JmsException var4) { throw this.convertJmsException(var4); } } private JmsMessagingTemplate.MessagingMessageCreator createMessageCreator(Message<?> message) { return new JmsMessagingTemplate.MessagingMessageCreator(message, this.getJmsMessageConverter()); } protected String getRequiredDefaultDestinationName() { String name = this.getDefaultDestinationName(); if (name == null) { throw new IllegalStateException("No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsMessagingTemplate."); } else { return name; } } @Nullable protected Message<?> convertJmsMessage(@Nullable javax.jms.Message message) { if (message == null) { return null; } else { try { return (Message)this.getJmsMessageConverter().fromMessage(message); } catch (Exception var3) { throw new MessageConversionException("Could not convert '" + message + "'", var3); } } } protected MessagingException convertJmsException(JmsException ex) { if (!(ex instanceof DestinationResolutionException) && !(ex instanceof InvalidDestinationException)) { return (MessagingException)(ex instanceof org.springframework.jms.support.converter.MessageConversionException ? new MessageConversionException(ex.getMessage(), ex) : new MessagingException(ex.getMessage(), ex)); } else { return new org.springframework.messaging.core.DestinationResolutionException(ex.getMessage(), ex); } } private static class MessagingMessageCreator implements MessageCreator { private final Message<?> message; private final MessageConverter messageConverter; public MessagingMessageCreator(Message<?> message, MessageConverter messageConverter) { this.message = message; this.messageConverter = messageConverter; } public javax.jms.Message createMessage(Session session) throws JMSException { try { return this.messageConverter.toMessage(this.message, session); } catch (Exception var3) { throw new MessageConversionException("Could not convert '" + this.message + "'", var3); } } } } 我们可以跟踪方法的执行,进入到JmsTemplate这个类中,发现里面336行封装着一个方法。 protected void doSend(MessageProducer producer, Message message) throws JMSException { if (this.deliveryDelay >= 0L) { producer.setDeliveryDelay(this.deliveryDelay); } if (this.isExplicitQosEnabled()) { producer.send(message, this.getDeliveryMode(), this.getPriority(), this.getTimeToLive()); } else { producer.send(message); } } 这里就可以看到producer这个对象调用的send方法中有设置几个参数: (1)、message不用说了,就是要发送的消息内容; (2)、Delivery Mode翻译过来就是发送模式发送模式; (3)、priority!这就是我们要找的优先级,传入0-9整数,数字越大优先级越高; (4)、timeToLive延时发送时间; Ok,到这里我们就可以看到,如果要执行这个方法就只需要isExplicitQosEnabled()这个方法返回值为true。 再看代码的第175行的方法: public void setQosSettings(QosSettings settings) { Assert.notNull(settings, "Settings must not be null"); this.setExplicitQosEnabled(true); this.setDeliveryMode(settings.getDeliveryMode()); this.setPriority(settings.getPriority()); this.setTimeToLive(settings.getTimeToLive()); } 这个方法就是设置上面几个参数的设置,只需传入QosSettings这个对象就行。 来看JmsTemplate这个类的构造方法: public JmsTemplate() { this.transactionalResourceFactory = new JmsTemplate.JmsTemplateResourceFactory(); this.messageIdEnabled = true; this.messageTimestampEnabled = true; this.pubSubNoLocal = false; this.receiveTimeout = 0L; this.deliveryDelay = -1L; this.explicitQosEnabled = false; this.deliveryMode = 2; this.priority = 4; this.timeToLive = 0L; this.initDefaultStrategies(); } 可以看到几个参数的默认值,也就是没有执行任何操作时如果将explicitQosEnabled设置为true则该消息发送出去优先级默认为4。 3.所以我们将生产者代码进行修改: import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.QosSettings; import org.springframework.stereotype.Service; import javax.jms.DeliveryMode; import javax.jms.Destination; @Service public class TestProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; public void sendMsg(Destination destination, String text, int priority){ //获取jmsTemplate对象 JmsTemplate jmsTemplate = jmsMessagingTemplate.getJmsTemplate(); //创建QosSettings对象 QosSettings settings = new QosSettings(); //设置优先级 settings.setPriority(priority); //设置发送模式 settings.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置延时发送时间 settings.setTimeToLive(1000L); //将设置传入 jmsTemplate.setQosSettings(settings); //发送消息 jmsMessagingTemplate.convertAndSend(destination,text); } } 这样生产者就写完了。 4.测试 首先,先写一个消费者,监听队列queue1: import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class TestConsumer { @JmsListener(destination = "queue1") public void receiveTest(String text){ System.out.println("接收到queue1发送的消息:"+text); } } 再写一个controller类可以通过前端调用发送消息的方法: import com.example.springboot.mq.demo.producer.TestProducer; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.Destination; import java.util.Random; @RestController public class TestController { @Autowired private TestProducer producer; @RequestMapping("/test") public String test(){ Destination destination = new ActiveMQQueue("queue1"); Random random = new Random(10); for (int i=0;i < 100;i++){ //随机产生优先级数字 int priority = random.nextInt(9); producer.sendMsg(destination,"消息No."+i+"优先级为:"+priority,priority); } return "发送成功"; } } 然后启动项目进行测试,测试结果: 发送消息优先级为:0 发送消息优先级为:6 发送消息优先级为:6 发送消息优先级为:0 发送消息优先级为:4 发送消息优先级为:1 发送消息优先级为:1 发送消息优先级为:4 发送消息优先级为:3 发送消息优先级为:1 发送消息优先级为:1 发送消息优先级为:2 发送消息优先级为:6 发送消息优先级为:6 发送消息优先级为:3 发送消息优先级为:3 发送消息优先级为:1 发送消息优先级为:1 发送消息优先级为:4 发送消息优先级为:7 发送消息优先级为:8 发送消息优先级为:7 发送消息优先级为:6 发送消息优先级为:8 发送消息优先级为:5 发送消息优先级为:1 发送消息优先级为:5 发送消息优先级为:5 发送消息优先级为:8 发送消息优先级为:8 发送消息优先级为:2 发送消息优先级为:8 发送消息优先级为:6 发送消息优先级为:8 发送消息优先级为:7 发送消息优先级为:1 发送消息优先级为:7 发送消息优先级为:1 发送消息优先级为:0 发送消息优先级为:4 发送消息优先级为:8 发送消息优先级为:1 发送消息优先级为:8 发送消息优先级为:0 发送消息优先级为:7 发送消息优先级为:5 发送消息优先级为:6 发送消息优先级为:0 发送消息优先级为:2 发送消息优先级为:4 接收到的消息: 消息内容:1优先级为:6 消息内容:20优先级为:8 消息内容:23优先级为:8 消息内容:28优先级为:8 消息内容:29优先级为:8 消息内容:31优先级为:8 消息内容:33优先级为:8 消息内容:40优先级为:8 消息内容:42优先级为:8 消息内容:19优先级为:7 消息内容:21优先级为:7 消息内容:34优先级为:7 消息内容:36优先级为:7 消息内容:44优先级为:7 消息内容:2优先级为:6 消息内容:12优先级为:6 消息内容:13优先级为:6 消息内容:22优先级为:6 消息内容:32优先级为:6 消息内容:46优先级为:6 消息内容:24优先级为:5 消息内容:26优先级为:5 消息内容:27优先级为:5 消息内容:45优先级为:5 消息内容:4优先级为:4 消息内容:7优先级为:4 消息内容:18优先级为:4 消息内容:39优先级为:4 消息内容:49优先级为:4 消息内容:8优先级为:3 消息内容:14优先级为:3 消息内容:15优先级为:3 消息内容:11优先级为:2 消息内容:30优先级为:2 消息内容:48优先级为:2 消息内容:5优先级为:1 消息内容:6优先级为:1 消息内容:9优先级为:1 消息内容:10优先级为:1 消息内容:16优先级为:1 消息内容:17优先级为:1 消息内容:25优先级为:1 消息内容:35优先级为:1 消息内容:37优先级为:1 消息内容:41优先级为:1 消息内容:0优先级为:0 消息内容:3优先级为:0 消息内容:38优先级为:0 消息内容:43优先级为:0 消息内容:47优先级为:0 结果显示,除第一条外,其他消息的接收全部是按消息的优先级出队列。所以大功告成,至于第一个消息为什么没有按优先级出队列后续我再研究一下。 [20181221222343887.jpg]: /images/20220403/c796e12254be4a9eba1aa47c2e1bc6fb.png
相关 SpringBoot整合ActiveMQ ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQy 谁借莪1个温暖的怀抱¢/ 2022年12月31日 13:21/ 0 赞/ 150 阅读
相关 SpringBoot整合ActiveMQ实现延时消息队列 1、修改mq配置文件 activemq.xml <broker xmlns="http://activemq.apache.org/schema/core" brok r囧r小猫/ 2022年12月03日 09:08/ 0 赞/ 677 阅读
相关 SpringBoot整合ActiveMQ ActiveMQ简介 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS 川长思鸟来/ 2022年05月24日 06:46/ 0 赞/ 299 阅读
相关 springboot整合activemq 第一步:导入jar包 <dependency> <groupId>org.springframework.boot</groupId> 悠悠/ 2022年05月14日 08:36/ 0 赞/ 303 阅读
相关 具有优先级的线程池 问题由来: 多线程接收kafka的消息,有时消息几乎同时达到,先简单处理后提交给线程池再次处理,结果出现当先到达的消息msgA和后到达的消息msgB到达时间相差很小时,例如 小灰灰/ 2022年05月11日 13:34/ 0 赞/ 264 阅读
相关 Springboot整合activemq(二):收发具有优先级的队列方法 在使用mq作为中间件做异步消息推送时,可能会遇到一个场景,就是消息在消费后执行一系列的逻辑到一半,突然遇到异常或者是断电等之类问题,这时消息在mq的队列中已经出队列,而正常逻辑 客官°小女子只卖身不卖艺/ 2022年04月03日 11:11/ 1 赞/ 778 阅读
相关 springboot 整合activeMq 消息队列概述 常用的消息队列的对比 整合的步骤 业务场景,用户登录之后 给用户 发送短信,邮件,增加积分 pom文件配置 <dependenc 太过爱你忘了你带给我的痛/ 2022年03月11日 14:52/ 0 赞/ 366 阅读
相关 SpringBoot整合ActiveMQ 1. 引入Jar包 <!-- ActiveMQ --> <dependency> <groupId>org.springframework. ╰半橙微兮°/ 2021年11月04日 19:32/ 0 赞/ 402 阅读
相关 SpringBoot--整合ActiveMQ 1. 创建项目,添加ActiveMQ依赖 <dependency> <groupId>org.springframework.bo 「爱情、让人受尽委屈。」/ 2021年08月24日 15:59/ 0 赞/ 482 阅读
相关 ActiveMQ--SpringBoot整合ActiveMQ队列 整合生产者 创建Maven工程 pom.xml <?xml version="1.0" encoding="UTF-8"?> <project x 叁歲伎倆/ 2021年08月13日 17:40/ 0 赞/ 511 阅读