Java-MQTT客户端监控连接状态事件
MQTT客户端监控连接状态事件
项目采用paho.mqtt.java客户端,需要监控连接状态变更事件,以进行异常维测和处理。
代码中提供了MqttCallback接口如下:
org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\MqttClient.java
public void setCallback(MqttCallback callback) {
aClient.setCallback(callback);
}
其中MqttCallback 接口定义如下:
public interface MqttCallback {
/** 连接断连
* This method is called when the connection to the server is lost.
*
* @param cause the reason behind the loss of connection.
*/
public void connectionLost(Throwable cause);
/** 消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了
* This method is called when a message arrives from the server.
*
* <p>
* This method is invoked synchronously by the MQTT client. An
* acknowledgment is not sent back to the server until this
* method returns cleanly.</p>
* <p>
* If an implementation of this method throws an <code>Exception</code>, then the
* client will be shut down. When the client is next re-connected, any QoS
* 1 or 2 messages will be redelivered by the server.</p>
* <p>
* Any additional messages which arrive while an
* implementation of this method is running, will build up in memory, and
* will then back up on the network.</p>
* <p>
* If an application needs to persist data, then it
* should ensure the data is persisted prior to returning from this method, as
* after returning from this method, the message is considered to have been
* delivered, and will not be reproducible.</p>
* <p>
* It is possible to send a new message within an implementation of this callback
* (for example, a response to this message), but the implementation must not
* disconnect the client, as it will be impossible to send an acknowledgment for
* the message being processed, and a deadlock will occur.</p>
*
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception if a terminal error has occurred, and the client should be
* shut down.
*/
public void messageArrived(String topic, MqttMessage message) throws Exception;
/** 消息publish完成回调
* Called when delivery for a message has been completed, and all
* acknowledgments have been received. For QoS 0 messages it is
* called once the message has been handed to the network for
* delivery. For QoS 1 it is called when PUBACK is received and
* for QoS 2 when PUBCOMP is received. The token will be the same
* token as that returned when the message was published.
*
* @param token the delivery token associated with the message.
*/
public void deliveryComplete(IMqttDeliveryToken token);
}
可以通过connectionLost回调进行断连提示,但是重连的状态回调呢?
(说明:可以通过mqttClient.connect(mqttConnectOptions);中的
mqttConnectOptions.setAutomaticReconnect(true); // 自动重连
设置自动重连)
哈哈。paho.mqtt.java用了一个很有意思且巧妙的方法处理,如下:
/**
* Extension of {@link MqttCallback} to allow new callbacks
* without breaking the API for existing applications.
* Classes implementing this interface can be registered on
* both types of client: {@link IMqttClient#setCallback(MqttCallback)}
* and {@link IMqttAsyncClient#setCallback(MqttCallback)}
*/
public interface MqttCallbackExtended extends MqttCallback {
/**
* Called when the connection to the server is completed successfully.
* @param reconnect If true, the connection was the result of automatic reconnect.
* @param serverURI The server URI that the connection was made to.
*/
public void connectComplete(boolean reconnect, String serverURI);
}
使用时调用:mqttClient.setCallback(callback); 传入的callback采用 MqttCallbackExtended 的实现类就可以得到 connectComplete回调了。具体实现可以详见代码。
一句话:MqttCallbackExtended 应该是后期扩展的接口,为了保持接口不变及已有代码的功能,就对原有MqttCallback接口进行了继承扩展,内部实现检测到传入的callback也是MqttCallbackExtended 的实例,就会回调 connectComplete 接口了。
// If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
}
可以谓之为: “扩展模式”吧!
补充说明一点:MqttCallback 中的messageArrived回调是消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了
/* (non-Javadoc)
* @see org.eclipse.paho.client.mqttv3.IMqttClient#subscribe(java.lang.String, int)
*/
public void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException {
this.subscribe(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});
}
具体原因:详见代码。
org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\internal\CommsCallback.java
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
{
boolean delivered = false;
Enumeration keys = callbacks.keys();
while (keys.hasMoreElements()) {
String topicFilter = (String)keys.nextElement();
if (MqttTopic.isMatched(topicFilter, topicName)) {
aMessage.setId(messageId);
((IMqttMessageListener)(callbacks.get(topicFilter))).messageArrived(topicName, aMessage);
delivered = true;
}
}
/* if the message hasn't been delivered to a per subscription handler, give it to the default handler */
if (mqttCallback != null && !delivered) {
aMessage.setId(messageId);
mqttCallback.messageArrived(topicName, aMessage);
delivered = true;
}
return delivered;
}
OK。验证如下:
taskClient.getMqttClient().setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
AlLog.getLogger().warn("TaskClient connectComplete reconnect={} serverURI={}", reconnect, serverURI);
}
@Override
public void connectionLost(Throwable cause) {
AlLog.getLogger().error("TaskClient connectionLost " + taskClient.isConnected() + " cause:", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
AlLog.getLogger().debug("TaskClient messageArrived message:{}", message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
AlLog.getLogger().debug("TaskClient deliveryComplete token:{}", token);
}
});
日志:
02-28 14:45:09.668 E/29252.64,MQTT Ping: devXXX SubscribeTask$1.connectionLost.41: TaskClient connectionLost false cause:
02-28 14:45:10.738 W/29252.63,MQTT Call: devXXX SubscribeTask$1.connectComplete.63: TaskClient connectComplete reconnect=true serverURI=tcp://XXX.XXX.XXX.XXX:1883
02-28 14:45:26.514 W/29252.35,Timer-0 SubscribeTask$3.run.93: TimerTask taskClient isConnected true
还没有评论,来说两句吧...