springboot开发笔记(5.2 整合rocketmq) 悠悠 2022-05-31 02:38 125阅读 0赞 ### 1.前言 ### 项目中用到阿里的rocketmq,本以为已经和springboot集成好了,找了一圈也没找到,只好自己写一个凑合先用用。用的还是以前的项目,加入了rocketmq。这里没有具体研究rocketmq,只是单纯的用了单master,pushConsumer做了监听消息。 代码地址:[https://github.com/bjjoy/joy\_bms][https_github.com_bjjoy_joy_bms] ### 2.准备 ### (1)根据springboot开发笔记(1)新建项目 (2)根据springboot开发笔记(5.1)写个producer,这里用的就是5.1里边的 (3)根据springboot开发笔记(5.1)开启rockemq (4)主要工作是根据springboot开发笔记(5.1)中的consumer,写一个类似工具consumer demo ### 3.目录结构 ### ![这里写图片描述][SouthEast] 说明:红框部分是每新加一个不同业务逻辑的consumer,都要创建的两个class。 (1)UserMqConsumer要继承BaseMqConsumer,具体监听消息,属性可配置到properties,这里采用springboot配置方式 (2)UserMqMessageService要继承MqMessageProcessor,具体业务消费信息过程 (3)其它3个class,直接copy使用即可 ### 4.代码 ### (1)pom.xml添加内容(如果有,不需要添加) <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.5.8</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.29</version> </dependency> (2)BaseMqConsumer.class package cn.bjjoy.web.bms.rocketmq; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; /** * 创建consumer基础类 * @author bjjoy * @date 2018/2/4 **/ public abstract class BaseMqConsumer { /** * 消费组名称 * @return */ public abstract String getConsumerGroup(); /** * NameSrvAddr * @return String */ public abstract String getNameSrvAddr(); /** * topic * @return */ public abstract String getTopic(); /** * tag或者subExpression * @return String */ public abstract String getSubExpression(); /** * 添加消费者监听消息 * @param mqMessageProcessor * @return DefaultMQPushConsumer 返回值没有意义,主要是方法里边内容 * @throws InterruptedException * @throws MQClientException */ public DefaultMQPushConsumer getConsumer(MqMessageProcessor mqMessageProcessor) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getConsumerGroup()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr(getNameSrvAddr()); consumer.subscribe(getTopic(), getSubExpression()); consumer.registerMessageListener(new MqMessageListener(mqMessageProcessor)); consumer.start(); return consumer; } } (3)MqMessageListener.class package cn.bjjoy.web.bms.rocketmq; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * 创建consumer需要的messageListener * @author bjjoy * @date 2018/2/4 **/ public class MqMessageListener implements MessageListenerConcurrently{ private MqMessageProcessor mqMessageProcessor; public void setMqMessageProcessor(MqMessageProcessor mqMessageProcessor) { this.mqMessageProcessor = mqMessageProcessor; } public MqMessageListener(MqMessageProcessor mqMessageProcessor) { this.mqMessageProcessor = mqMessageProcessor; } @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs){ boolean result = mqMessageProcessor.handleMessage(msg); if (!result){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } (4)MqMessageProcessor.class package cn.bjjoy.web.bms.rocketmq; import com.alibaba.rocketmq.common.message.MessageExt; /** * 创建MqMessageListener 需要的消息处理过程 * 新建consumer需要实现该接口 * @author bjjoy * @date 2018/2/4 **/ public interface MqMessageProcessor { /** * 处理消息的接口 * @param messageExt * @return */ boolean handleMessage(MessageExt messageExt); } (5)UserMqConsumer.class 这里用了springboot @Configuration,通过@Bean,完成启动后consumer的配置 package cn.bjjoy.web.bms.rocketmq.consumer; import cn.bjjoy.web.bms.rocketmq.BaseMqConsumer; import cn.bjjoy.web.bms.rocketmq.service.UserMqMessageService; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 用户相关mq消费,需要定义自己的属性 * @author bjjoy * @date 2018/2/3 **/ @Configuration @ConfigurationProperties(prefix = "joy.rocketmq.user") public class UserMqConsumer extends BaseMqConsumer { private final String consumerGroup = "user_group"; private final String nameSrvAddr = "127.0.0.1:9876"; private final String topic = "user_topic"; private final String subExpression = "TagA"; /** * 实际消息处理过程 */ @Autowired UserMqMessageService userMqMessageService; @Override public String getConsumerGroup() { return consumerGroup; } @Override public String getNameSrvAddr() { return nameSrvAddr; } @Override public String getTopic() { return topic; } @Override public String getSubExpression() { return subExpression; } @Bean public DefaultMQPushConsumer getUserConsumer() throws MQClientException, InterruptedException { return super.getConsumer(userMqMessageService); } } (6)UserMqMessageService.class 具体消费信息进行的操作,这里啥都没做,就是打印输出了 package cn.bjjoy.web.bms.rocketmq.service; import cn.bjjoy.web.bms.rocketmq.MqMessageProcessor; import com.alibaba.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Service; import java.io.UnsupportedEncodingException; /** * 具体消费信息处理过程,例更新数据库 * @author bjjoy * @date 2018/2/4 **/ @Service public class UserMqMessageService implements MqMessageProcessor { @Override public boolean handleMessage(MessageExt messageExt) { try { System.out.println("consumer-service=============="); System.out.println(new String(messageExt.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return false; } } ### 5.后续 ### (1)感觉这个工具用起来还是复杂,没有那种注解的方式简洁,但是注解方式还需要研究。有新的消息处理时,这里暂时就先按照例子写2个class。 (2)上面内容涉及的rocketmq内容很少,单纯的获取了消息,需要完善consumer功能。 (3)需要加一个producer的整合。这个相对简单一点,按照springboot开发笔记(5.1)写一个BaseProducer,每多加一个producer就新建一个class继承BaseProducer即可,同样可以用@Configuration来完成。 [https_github.com_bjjoy_joy_bms]: https://github.com/bjjoy/joy_bms [SouthEast]: /images/20220531/2cfa65014fab43228edb820aeefef3d8.png
还没有评论,来说两句吧...