消息队列 : spring-boot 集成 rabbitmq

r囧r小猫 2021-10-09 10:14 478阅读 0赞

介绍:

RabbitMQ是基于Erlang语言编写的开源消息队列,通过Erlang的Actor模型实现了数据的稳定可靠传输。本身是实现AMQP的消息队列,因此官方推荐,如果仅仅是使用RabbitMQ的话,建议使用AMQP 0-9-1的协议。不过,因为其可扩展性,可以通过插件的形式使用STOMP、XMPP、AMQP 1.0,还可以通过插件使用HTTP这种非消息的传输协议。所以,RabbitMQ可以说是适应性非常强的一个消息队列中间件了。

当然,不仅是协议支持的多,还因为它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大,如需配置RabbitMQ则需要在目标机器上安装Erlang环境。

总的来说,RabbitMQ在数据一致性、稳定性和可靠性方面比较优秀,而且直接或间接的支持多种协议,对多种语言支持良好。但是其性能和吞吐量差强人意,由于Erlang语言本身的限制,二次开发成本较高

代码部分:

1、增加rabbitmq的依赖包

  1. <!-- ampq 依赖包 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2、application.xml文件中配置

  1. #RabbitMQ配置信息
  2. #spring.application.name=spirng-boot-rabbitmq
  3. spring.rabbitmq.host=39.108.***.****
  4. spring.rabbitmq.port=*****
  5. spring.rabbitmq.username=****
  6. spring.rabbitmq.password=******
  7. spring.rabbitmq.virtual-host=/hotcomm_manager
  8. spring.rabbitmq.queue=hotcomm.manager.qingdao

3、RabbitMq的工厂连接和模板创建

  1. @Configuration
  2. public class RabbitMQConfig {
  3. /**
  4. * 注入配置文件属性
  5. */
  6. @Value("${spring.rabbitmq.host}")
  7. private String host;
  8. @Value("${spring.rabbitmq.port}")
  9. private Integer port;
  10. @Value("${spring.rabbitmq.username}")
  11. private String username;
  12. @Value("${spring.rabbitmq.password}")
  13. private String password;
  14. @Value("${spring.rabbitmq.virtual-host}")
  15. private String virtualhost;
  16. @Value("${spring.rabbitmq.queue}")
  17. private String queue;
  18. @Autowired
  19. SocketService socketService;
  20. @Autowired
  21. EventService eventService;
  22. @Autowired
  23. TMessageLogService MessageLogService;
  24. @Autowired
  25. AppPushService appPushService;
  26. @Autowired
  27. AppPushMsgMapper appPushMsgMapper;
  28. /**
  29. * 创建 ConnectionFactory
  30. *
  31. * @return
  32. * @throws Exception
  33. */
  34. @Bean
  35. public ConnectionFactory creatConnectionFactory() throws Exception {
  36. CachingConnectionFactory factory = new CachingConnectionFactory();
  37. factory.setHost(host);
  38. factory.setVirtualHost(virtualhost);
  39. factory.setPort(port);
  40. factory.setUsername(username);
  41. factory.setPassword(password);
  42. return factory;
  43. }
  44. //rabbitmq的模板配置
  45. @Bean
  46. public RabbitTemplate receiveRabbitTemplate(ConnectionFactory connectionFactory) {
  47. RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
  48. return firstRabbitTemplate;
  49. }
  50. //消费者
  51. @Bean
  52. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
  53. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  54. container.setQueues(new Queue[] { new Queue(queue) }); //设置监听的队列
  55. container.setExposeListenerChannel(true);
  56. container.setMaxConcurrentConsumers(5);
  57. container.setConcurrentConsumers(1);
  58. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  59. container.setMessageListener(new Receiver());
  60. return container;
  61. }
  62. class Receiver implements ChannelAwareMessageListener {
  63. @Override
  64. public void onMessage(Message message, Channel channel) throws Exception {
  65. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  66. byte[] body = message.getBody();
  67. String ss = new String(body);
  68. System.out.println(ss);
  69. publishMessage(ss);
  70. }
  71. }
  72. /**
  73. * 解析message信息,确定设备类型,查询指定表,确定责任人,发送信息(测试)
  74. *
  75. * @param message
  76. */
  77. public void publishMessage(String message) throws MyException {
  78. TMessageLog tMessageLog = new TMessageLog();
  79. Gson gson = new Gson();
  80. T_dev_alarm dev_alarm = gson.fromJson(message, T_dev_alarm.class);
  81. // 查询当前设备的责任人(安装位置等)
  82. AlarmDev dev = eventService.selectDevForUser(dev_alarm.getDeviceid(), dev_alarm.getModuleid());
  83. dev.setAlarmtime(ConverUtil.dateForString(new Date()));
  84. String json = JSONUtil.toJson(dev);
  85. tMessageLog.setMessage(json);
  86. tMessageLog.setReceiverid(dev.getOwnid());
  87. tMessageLog.setSendtime(ConverUtil.dateForString(new Date()));
  88. tMessageLog.setUserid(0);
  89. // String s="{\"systate\":,\"message\":"+tMessageLog+"}";
  90. // 推送指定用户
  91. // socketService.sendMessageToOne(tMessageLog);
  92. // 广播
  93. socketService.sendMessageToAll(tMessageLog);
  94. if (dev.getOwnid() != null && !dev.getOwnid().equals(null) && !dev.getOwnid().equals("")) {
  95. String[] uid = dev.getOwnid().split(",");
  96. List<String> list = new ArrayList<>();
  97. for (String u : uid) {
  98. if (!list.contains(u)) {
  99. list.add(u);
  100. }
  101. }
  102. for (int i = 0; i < list.size(); i++) {
  103. // 查出用户regid
  104. T_hk_apppush t_hk_apppush = appPushService.selectRegid(Integer.valueOf(list.get(i)));
  105. int code = 0;
  106. if (t_hk_apppush != null) {
  107. // 向单个用户推送报警消息
  108. dev.setId(dev_alarm.getId());
  109. code = PushUtil.sendAllsetNotification("报警消息", "设备:" + dev.getDevnum() + "发生报警,请及时查看处理",
  110. JSONUtil.toJson(ApiResult.resultInfo("0", "报警", dev)), t_hk_apppush.getRegid(), 86400);
  111. }
  112. if (code == 0 || code == 201) {// 201推送失败
  113. // 推送失败的时候,把推送信息存进数据库,等下次登录的时候从数据库取出推送
  114. T_hk_apppush_msg t_hk_apppush_msg = new T_hk_apppush_msg();// 推送消息储存表
  115. t_hk_apppush_msg.setTitle("报警消息");
  116. t_hk_apppush_msg.setContent("设备:" + dev.getDevnum() + "发生报警,请及时查看处理");
  117. t_hk_apppush_msg.setMessage(JSONUtil.toJson(ApiResult.resultInfo("0", "报警", dev)));
  118. t_hk_apppush_msg.setRegids("0");
  119. t_hk_apppush_msg.setTimeToLive(String.valueOf(86400));
  120. t_hk_apppush_msg.setUserid(Integer.valueOf(list.get(i)));
  121. // 插入推送消息数据库
  122. appPushMsgMapper.insertSelective(t_hk_apppush_msg);
  123. }
  124. }
  125. }
  126. }
  127. }

发表评论

表情:
评论列表 (有 0 条评论,478人围观)

还没有评论,来说两句吧...

相关阅读