分布式MQTT消息订阅-发布框架:高可用性ActiveMQ
分布式MQTT消息订阅-发布框架:高可用性ActiveMQ
ActiveMQ是MQTT的一种实现。ActiveMQ基于JMS。ActiveMQ开发包下载地址:http://activemq.apache.org/download.html
ActiveMQ最适合做消息推送。国内很多厂商基于ActiveMQ改造出多种消息推送平台。下载完成ActiveMQ压缩包后解压,解压后直接在Java环境中运行apache-activemq-5.15.9\bin\win64\activemq.bat。运行后,如图:
此时可以通过网页
http://localhost:8161/admin/
观察ActiveMQ的控制台,用户名和密码默认都是admin。
下面做一个消息发送的例子:
//消息生产,生产5条持久化消息,发送、存放到远程服务器ActiveMQ队列。
public static void main(String[] args){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
try {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("fly");
MessageProducer messageProducer = session.createProducer(destination);
//发送的消息将持久化保存到ActiveMQ,直到有接收者消费掉。
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 5; i++) {
TextMessage textMessage = session.createTextMessage("fly:" + i + " " + System.currentTimeMillis());
messageProducer.send(textMessage);
System.out.println("发送消息:" + textMessage);
}
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
运行输出:
发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066115, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:0 1562327066115}
发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:1 1562327066209}
发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:2 1562327066209}
发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:3 1562327066209}
发送消息:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:4 1562327066209}
再写一个消息消费者:
//接收消息。从ActiveMQ远程服务器拉取消息。
public static void main(String[] args) {
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("fly");
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到消息:" + textMessage);
}
});
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
运行输出:
收到消息:ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066115, arrival = 0, brokerInTime = 1562327066131, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@78acc912, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:0 1562327066115}
收到消息:ActiveMQTextMessage {commandId = 7, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@56c6daaf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:1 1562327066209}
收到消息:ActiveMQTextMessage {commandId = 8, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@13733638, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:2 1562327066209}
收到消息:ActiveMQTextMessage {commandId = 9, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@553a6f8f, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:3 1562327066209}
收到消息:ActiveMQTextMessage {commandId = 10, responseRequired = false, messageId = ID:P80253699-65130-1562327065803-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = ID:P80253699-65130-1562327065803-1:1:1:1, destination = queue://fly, transactionId = TX:ID:P80253699-65130-1562327065803-1:1:1, expiration = 0, timestamp = 1562327066209, arrival = 0, brokerInTime = 1562327066209, brokerOutTime = 1562327192793, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4b40c863, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = fly:4 1562327066209}
附:
ActiveMQ安装配置和使用简例
https://blog.csdn.net/zhangphil/article/details/48173665
还没有评论,来说两句吧...