分布式MQTT消息订阅-发布框架:高可用性ActiveMQ

深碍√TFBOYSˉ_ 2024-02-19 20:25 113阅读 0赞

分布式MQTT消息订阅-发布框架:高可用性ActiveMQ

ActiveMQ是Apache开源的消息中间件(消息代理),它实现了JMS规范,并同时支持MQTT、AMQP、STOMP、OpenWire等多种协议,因此可以作为MQTT消息的服务端使用。ActiveMQ开发包下载地址:http://activemq.apache.org/download.html

ActiveMQ最适合做消息推送。国内很多厂商基于ActiveMQ改造出多种消息推送平台。下载完成ActiveMQ压缩包后解压,解压后直接在Java环境中运行apache-activemq-5.15.9\bin\win64\activemq.bat。运行后,如图:

(图:ActiveMQ启动成功后的命令行窗口截图)

此时可以通过网页

http://localhost:8161/admin/

访问ActiveMQ的Web管理控制台,默认的用户名和密码都是admin。

下面做一个消息发送的例子:

  1. //消息生产,生产5条持久化消息,发送、存放到远程服务器ActiveMQ队列。
  2. public static void main(String[] args){
  3. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
  4. ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
  5. try {
  6. Connection connection = connectionFactory.createConnection();
  7. Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  8. Destination destination = session.createQueue("fly");
  9. MessageProducer messageProducer = session.createProducer(destination);
  10. //发送的消息将持久化保存到ActiveMQ,直到有接收者消费掉。
  11. messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
  12. for (int i = 0; i < 5; i++) {
  13. TextMessage textMessage = session.createTextMessage("fly:" + i + " " + System.currentTimeMillis());
  14. messageProducer.send(textMessage);
  15. System.out.println("发送消息:" + textMessage);
  16. }
  17. session.commit();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. }

运行输出:

  1. 发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066115, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:0 1562327066115}
  2. 发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:1 1562327066209}
  3. 发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:2 1562327066209}
  4. 发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:3 1562327066209}
  5. 发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:4 1562327066209}

再写一个消息消费者:

  1. //接收消息。从ActiveMQ远程服务器拉取消息。
  2. public static void main(String[] args) {
  3. try {
  4. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
  5. Connection connection = connectionFactory.createConnection();
  6. connection.start();
  7. Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  8. Destination destination = session.createQueue("fly");
  9. MessageConsumer messageConsumer = session.createConsumer(destination);
  10. messageConsumer.setMessageListener(new MessageListener() {
  11. @Override
  12. public void onMessage(Message message) {
  13. TextMessage textMessage = (TextMessage) message;
  14. System.out.println("收到消息:" + textMessage);
  15. }
  16. });
  17. session.commit();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. }

运行输出:

  1. 收到消息:ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066115, arrival = 0, brokerInTime = 1562327066131, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@78acc912, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:0 1562327066115}
  2. 收到消息:ActiveMQTextMessage {commandId = 7, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@56c6daaf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:1 1562327066209}
  3. 收到消息:ActiveMQTextMessage {commandId = 8, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@13733638, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:2 1562327066209}
  4. 收到消息:ActiveMQTextMessage {commandId = 9, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@553a6f8f, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:3 1562327066209}
  5. 收到消息:ActiveMQTextMessage {commandId = 10, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4b40c863, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:4 1562327066209}

附:

ActiveMQ安装配置和使用简例

https://blog.csdn.net/zhangphil/article/details/48173665

发表评论

表情:
评论列表 (有 0 条评论,113人围观)

还没有评论,来说两句吧...

相关阅读