springBoot整合activeMQ,并绑定生产者ip

我不是女神ヾ 2023-07-23 09:59 13阅读 0赞

springBoot整合activeMQ,并绑定生产者ip

    • 前言
    • springBoot整合
      • 1.依赖
      • 2.配置参数
      • 3.配置转化器
      • 4.生产消息
      • 5.通配符订阅消息
    • 绑定生产者ip
        • 插件工程依赖
        • activemq.xml

前言

因为物联网项目的毕业设计需要,最好使用mqtt协议通信。
有两个需求:
1.获得物联网设备的ip(生产者ip)
2.使用通配符订阅Queue(因为设备很多,而且是动态新增,主题是不确定的)

springBoot整合

1.依赖

  1. <dependency>
  2. <groupId>org.messaginghub</groupId>
  3. <artifactId>pooled-jms</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-activemq</artifactId>
  8. </dependency>
  9. <!--消息队列连接池-->
  10. <dependency>
  11. <groupId>org.apache.activemq</groupId>
  12. <artifactId>activemq-pool</artifactId>
  13. </dependency>

2.配置参数

  1. #activeMQ
  2. spring.activemq.broker-url=tcp://localhost:61616
  3. #true 表示使用内置的MQ,false则连接服务器
  4. spring.activemq.in-memory=false
  5. #true表示使用连接池;false时,每发送一条数据创建一个连接,内存销毁巨大
  6. spring.activemq.pool.enabled=true
  7. #连接池最大连接数
  8. spring.activemq.pool.max-connections=10

3.配置转化器

springBoot的自动配置默认使用的转化器是在监听到消息时转化为字符串,为了拿到更多信息,我们可以自己定义转化器,转化器需要实现MessageConverter 接口,toMessage方法在发送前会执行,fromMessage方法在监听回调前会执行

  1. @Component
  2. public class IOTMQConverter implements MessageConverter {
  3. @Autowired
  4. private ObjectMapper objectMapper;
  5. @Override
  6. public Message toMessage(Object o, Session session) throws JMSException, MessageConversionException {
  7. try {
  8. String ret = objectMapper.writeValueAsString(o);
  9. return session.createTextMessage(ret);
  10. }catch (Exception e){
  11. return null;
  12. }
  13. }
  14. @Override
  15. public Object fromMessage(Message message) throws JMSException, MessageConversionException {
  16. ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
  17. return textMessage;
  18. }
  19. }

然后使用自己定义的convert创建一个DefaultJmsListenerContainerFactory的bean对象就可以了

  1. @Configuration
  2. @AutoConfigureAfter(ActiveMQAutoConfiguration.class)
  3. public class MQTTConfig{
  4. @Bean
  5. public JmsListenerContainerFactory queueContainer(ConnectionFactory connectionFactory, IOTMQConverter iotmqConverter){
  6. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  7. factory.setConnectionFactory(connectionFactory);
  8. factory.setPubSubDomain(Boolean.FALSE);//支持queue
  9. factory.setMessageConverter(iotmqConverter);
  10. return factory;
  11. }
  12. }

4.生产消息

  1. @Autowired
  2. private JmsMessagingTemplate jmsMessagingTemplate;
  3. public void solve(String queueName, String message){
  4. jmsMessagingTemplate.convertAndSend(queueName,message);
  5. }

只需要注入自动生成的jmsMessagingTemplate,使用convertAndSend方法即可完成生产

5.通配符订阅消息

activemq支持使用相同前缀,以.结束加上通配符的名称作为主题名来批量订阅(实现原理是他的虚拟主题)。如下订阅 A.*的所有主题(A.1,A.b,A.2,A.ss,A.xxx…)。此处的方法的参数,就是convert的fromMessage最终返回的对象,方法的参数类型需要兼容该对象(是其类型或父类),否则此条消息不会被消费

  1. @Component
  2. public class JmsListener{
  3. @JmsListener(destination = "A.*")
  4. public void solve(ActiveMQTextMessage message){
  5. //todo
  6. }
  7. }

绑定生产者ip

activeMQ提供通过plugin扩展开发的方式,来在mq服务端做一些开发扩展。其使用的是责任链的设计模式,需要先定义一个实现BrokerPlugin的类,其只有一个 installPlugin方法等待重写,在里面使用自己定义的broker过滤器包装当前broker。
自己定义的broker过滤器需要继承BrokerFilter,然后可以重写其中的方法,在消息的不同阶段进行回调处理。创建一个新的maven工程,添加activemq的依赖(可以直接使用all),我这里因为自己要用序列化还用了阿里巴巴的fastjson

插件工程依赖

  1. <dependency>
  2. <groupId>org.apache.activemq</groupId>
  3. <artifactId>activemq-all</artifactId>
  4. <version>5.15.12</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba</groupId>
  8. <artifactId>fastjson</artifactId>
  9. <version>1.2.61</version>
  10. </dependency>
  11. public interface BrokerPlugin {
  12. Broker installPlugin(Broker broker) throws Exception;
  13. }
  14. public interface Broker extends Region, Service {
  15. Broker getAdaptor(Class<?> type);
  16. BrokerId getBrokerId();
  17. String getBrokerName();
  18. void addBroker(Connection connection, BrokerInfo info);
  19. void removeBroker(Connection connection, BrokerInfo info);
  20. void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
  21. void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception;
  22. void addSession(ConnectionContext context, SessionInfo info) throws Exception;
  23. void removeSession(ConnectionContext context, SessionInfo info) throws Exception;
  24. @Override
  25. void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
  26. @Override
  27. void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
  28. .....
  29. }
  30. public class BrokerFilter implements Broker {
  31. protected final Broker next;
  32. public BrokerFilter(Broker next) {
  33. this.next = next;
  34. }
  35. public Broker getNext() {
  36. return next;
  37. }
  38. .....
  39. }

因为我们要完成对ip的绑定,主要在生产的时候,所以重写他的send方法

  1. public class IpPlugin implements BrokerPlugin {
  2. static class IpBindFilter extends BrokerFilter {
  3. public IpBindFilter(Broker next) {
  4. super(next);
  5. }
  6. @Override
  7. public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
  8. if(!(messageSend instanceof ActiveMQTextMessage)){
  9. throw new RuntimeException("not allow,must be json string message");
  10. }
  11. String remoteAddr = producerExchange.getConnectionContext().getConnection().getRemoteAddress();//tcp://127.0.0.1:61468
  12. String ip = remoteAddr.substring(6,remoteAddr.lastIndexOf(':'));
  13. ActiveMQTextMessage textMessage = (ActiveMQTextMessage) messageSend;
  14. String content = textMessage.getText();
  15. JSONObject jsonObj = JSON.parseObject(content);
  16. jsonObj.put("producerIp",ip);
  17. textMessage.setText(jsonObj.toJSONString());
  18. super.send(producerExchange, textMessage);
  19. }
  20. }
  21. public Broker installPlugin(Broker broker) throws Exception {
  22. return new IpBindFilter(broker);
  23. }
  24. }

这样就完成了插件的开发,接下来需要通过maven打包,将jar包,以及插件除了activeMQ的其他依赖的jar包,都放到activemq的lib目录下(比如:本工程用到fastjson),然后修改activemq的conf目录中的activemq.xml,在broker标签中的plugins标签中,加入以下内容(plugins标签不存在就手动创建),bean的id随便写,class包名不要错就行

activemq.xml

  1. <plugins>
  2. <bean xmlns="http://www.springframework.org/schema/beans" id="ipPlugin" class="iot.plugin.IpPlugin"/>
  3. </plugins>

至此就完成了插件的安装和配置,重启activemq就可以了。
附生产测试代码:

  1. public class Main {
  2. public static void main(String[] args) throws JMSException {
  3. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
  4. //2、使用连接工厂创建一个连接对象
  5. Connection connection = connectionFactory.createConnection();
  6. //3、开启连接
  7. connection.start();
  8. //4、使用连接对象创建会话(session)对象
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
  11. Queue queue = session.createQueue("myQueue");
  12. //6、使用会话对象创建生产者对象
  13. MessageProducer producer = session.createProducer(queue);
  14. //7、使用会话对象创建一个消息对象
  15. TextMessage textMessage = session.createTextMessage("{author:\"eetal\"}");
  16. //8、发送消息
  17. producer.send(textMessage);
  18. //9、关闭资源
  19. producer.close();
  20. session.close();
  21. connection.close();
  22. }
  23. }

测试结果,把ip带过来了
测试结果
这里我上传了一份到gitee上——gitee上的demo仓库

更多文章,请搜索公众号歪歪梯Club
更多资料,请搜索公众号编程宝可梦

发表评论

表情:
评论列表 (有 0 条评论,13人围观)

还没有评论,来说两句吧...

相关阅读