Java: Monitoring Connection State Events in an MQTT Client

约定不等于承诺〃 2021-06-24 13:57

The project uses the paho.mqtt.java client and needs to monitor connection state change events so that faults can be diagnosed and handled.

The library provides the MqttCallback interface, which is registered through the following method:

org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\MqttClient.java

    public void setCallback(MqttCallback callback) {
        aClient.setCallback(callback);
    }

The MqttCallback interface is defined as follows:

    public interface MqttCallback {
        /** Connection lost.
         * This method is called when the connection to the server is lost.
         *
         * @param cause the reason behind the loss of connection.
         */
        public void connectionLost(Throwable cause);

        /** Default callback for received messages. If an IMqttMessageListener is
         * passed to subscribe(String topicFilter, int qos, IMqttMessageListener messageListener),
         * it takes precedence over this callback: matching messages go to the
         * IMqttMessageListener and MqttCallback.messageArrived is not invoked for them.
         * This method is called when a message arrives from the server.
         *
         * <p>
         * This method is invoked synchronously by the MQTT client. An
         * acknowledgment is not sent back to the server until this
         * method returns cleanly.</p>
         * <p>
         * If an implementation of this method throws an <code>Exception</code>, then the
         * client will be shut down. When the client is next re-connected, any QoS
         * 1 or 2 messages will be redelivered by the server.</p>
         * <p>
         * Any additional messages which arrive while an
         * implementation of this method is running, will build up in memory, and
         * will then back up on the network.</p>
         * <p>
         * If an application needs to persist data, then it
         * should ensure the data is persisted prior to returning from this method, as
         * after returning from this method, the message is considered to have been
         * delivered, and will not be reproducible.</p>
         * <p>
         * It is possible to send a new message within an implementation of this callback
         * (for example, a response to this message), but the implementation must not
         * disconnect the client, as it will be impossible to send an acknowledgment for
         * the message being processed, and a deadlock will occur.</p>
         *
         * @param topic name of the topic on the message was published to
         * @param message the actual message.
         * @throws Exception if a terminal error has occurred, and the client should be
         * shut down.
         */
        public void messageArrived(String topic, MqttMessage message) throws Exception;

        /** Message publish completed.
         * Called when delivery for a message has been completed, and all
         * acknowledgments have been received. For QoS 0 messages it is
         * called once the message has been handed to the network for
         * delivery. For QoS 1 it is called when PUBACK is received and
         * for QoS 2 when PUBCOMP is received. The token will be the same
         * token as that returned when the message was published.
         *
         * @param token the delivery token associated with the message.
         */
        public void deliveryComplete(IMqttDeliveryToken token);
    }

The connectionLost callback can be used to report a disconnection, but where is the callback for a reconnection?

(Note: automatic reconnect is enabled on the options object passed to mqttClient.connect(mqttConnectOptions); by setting

    mqttConnectOptions.setAutomaticReconnect(true); // automatic reconnect

before connecting.)

paho.mqtt.java handles this in an interesting and rather clever way:

    /**
     * Extension of {@link MqttCallback} to allow new callbacks
     * without breaking the API for existing applications.
     * Classes implementing this interface can be registered on
     * both types of client: {@link IMqttClient#setCallback(MqttCallback)}
     * and {@link IMqttAsyncClient#setCallback(MqttCallback)}
     */
    public interface MqttCallbackExtended extends MqttCallback {
        /**
         * Called when the connection to the server is completed successfully.
         * @param reconnect If true, the connection was the result of automatic reconnect.
         * @param serverURI The server URI that the connection was made to.
         */
        public void connectComplete(boolean reconnect, String serverURI);
    }

Usage: call mqttClient.setCallback(callback); and pass a callback that implements MqttCallbackExtended. The connectComplete callback will then be delivered. See the source code for the details.

In short: MqttCallbackExtended appears to be an interface added later. To keep the existing API unchanged and existing code working, it extends the original MqttCallback interface. When the internal implementation detects that the callback passed in is also an instance of MqttCallbackExtended, it invokes connectComplete:

    // If we are using the MqttCallbackExtended, set it on the
    // connectActionListener
    if (this.mqttCallback instanceof MqttCallbackExtended) {
        connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
    }

This could be called an "extension pattern".
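To make the idea concrete outside of Paho, here is a minimal, self-contained sketch of the same approach. All names in it (ExtensionPatternSketch, Callback, CallbackExtended, Client, simulateConnect, simulateLoss) are hypothetical stand-ins, not Paho types, and the reconnect flag is simplified to "this client has connected before" rather than Paho's "result of automatic reconnect".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration only; these are NOT the Paho types. */
class ExtensionPatternSketch {

    /** The original callback interface that existing applications already implement. */
    interface Callback {
        void connectionLost(Throwable cause);
    }

    /** Added later: extends the old interface instead of changing it. */
    interface CallbackExtended extends Callback {
        void connectComplete(boolean reconnect, String serverURI);
    }

    /** Toy client whose registration method still accepts the base type. */
    static class Client {
        private Callback callback;
        private boolean connectedBefore = false;

        void setCallback(Callback callback) {
            this.callback = callback;
        }

        /** Simulates a successful connect; only extended callbacks are notified. */
        void simulateConnect(String serverURI) {
            boolean reconnect = connectedBefore; // simplification, see lead-in
            connectedBefore = true;
            if (callback instanceof CallbackExtended) {
                ((CallbackExtended) callback).connectComplete(reconnect, serverURI);
            }
        }

        /** Simulates a dropped connection; every registered callback is notified. */
        void simulateLoss(Throwable cause) {
            if (callback != null) {
                callback.connectionLost(cause);
            }
        }
    }

    /** Runs connect, loss, reconnect and returns the events the callback recorded. */
    static List<String> runDemo() {
        List<String> events = new ArrayList<>();
        Client client = new Client();
        client.setCallback(new CallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                events.add("connectComplete reconnect=" + reconnect + " serverURI=" + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                events.add("connectionLost cause=" + cause.getMessage());
            }
        });
        client.simulateConnect("tcp://localhost:1883");
        client.simulateLoss(new RuntimeException("timeout"));
        client.simulateConnect("tcp://localhost:1883");
        return events;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

The point of the design is that setCallback keeps its old signature, so code written against the base interface compiles and behaves as before, while implementations of the extended interface opt in to the extra notification.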

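One additional note: messageArrived in MqttCallback is the default callback for received messages. If an IMqttMessageListener is passed to subscribe(String topicFilter, int qos, IMqttMessageListener messageListener), it takes precedence over the default: matching messages are delivered to the IMqttMessageListener, and MqttCallback.messageArrived is no longer invoked for them.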

    /* (non-Javadoc)
     * @see org.eclipse.paho.client.mqttv3.IMqttClient#subscribe(java.lang.String, int)
     */
    public void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException {
        this.subscribe(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});
    }

The reason can be seen in the source:

org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\internal\CommsCallback.java

    protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
    {
        boolean delivered = false;
        Enumeration keys = callbacks.keys();
        while (keys.hasMoreElements()) {
            String topicFilter = (String)keys.nextElement();
            if (MqttTopic.isMatched(topicFilter, topicName)) {
                aMessage.setId(messageId);
                ((IMqttMessageListener)(callbacks.get(topicFilter))).messageArrived(topicName, aMessage);
                delivered = true;
            }
        }
        /* if the message hasn't been delivered to a per subscription handler, give it to the default handler */
        if (mqttCallback != null && !delivered) {
            aMessage.setId(messageId);
            mqttCallback.messageArrived(topicName, aMessage);
            delivered = true;
        }
        return delivered;
    }
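
The dispatch rule in deliverMessage can be tried out in isolation. The following is a rough sketch under stated assumptions, not Paho code: DispatchSketch, MessageListener, deliver and matches are hypothetical names, payloads are plain strings, and matches only handles exact equality and a trailing "#" wildcard rather than full MQTT topic matching.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "per-subscription listener first, default callback as fallback". */
class DispatchSketch {

    /** Stand-in for a message listener; the payload is simplified to a String. */
    interface MessageListener {
        void messageArrived(String topic, String payload);
    }

    private final Map<String, MessageListener> listeners = new LinkedHashMap<>();
    private MessageListener defaultCallback;

    void setDefaultCallback(MessageListener callback) {
        this.defaultCallback = callback;
    }

    void subscribe(String topicFilter, MessageListener listener) {
        listeners.put(topicFilter, listener);
    }

    /** Simplified matching: exact match, "#", or a trailing "/#". Not full MQTT matching. */
    static boolean matches(String filter, String topic) {
        if (filter.equals(topic) || filter.equals("#")) {
            return true;
        }
        if (filter.endsWith("/#")) {
            String prefix = filter.substring(0, filter.length() - 2);
            return topic.equals(prefix) || topic.startsWith(prefix + "/");
        }
        return false;
    }

    /** Gives the message to every matching listener, else to the default callback. */
    boolean deliver(String topic, String payload) {
        boolean delivered = false;
        for (Map.Entry<String, MessageListener> entry : listeners.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                entry.getValue().messageArrived(topic, payload);
                delivered = true;
            }
        }
        // Fallback: only reached when no per-subscription listener matched.
        if (defaultCallback != null && !delivered) {
            defaultCallback.messageArrived(topic, payload);
            delivered = true;
        }
        return delivered;
    }

    /** Delivers one message with a matching listener and one without. */
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.setDefaultCallback((topic, payload) -> log.add("default:" + topic));
        dispatcher.subscribe("sensors/#", (topic, payload) -> log.add("listener:" + topic));
        dispatcher.deliver("sensors/temp", "21.5");
        dispatcher.deliver("alerts/fire", "!");
        return log;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

In this sketch the message on sensors/temp reaches only the per-subscription listener, while the message on alerts/fire, which no listener matches, falls through to the default callback.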

OK. Verified as follows:

    taskClient.getMqttClient().setCallback(new MqttCallbackExtended() {
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            AlLog.getLogger().warn("TaskClient connectComplete reconnect={} serverURI={}", reconnect, serverURI);
        }

        @Override
        public void connectionLost(Throwable cause) {
            AlLog.getLogger().error("TaskClient connectionLost " + taskClient.isConnected() + " cause:", cause);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            AlLog.getLogger().debug("TaskClient messageArrived message:{}", message);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            AlLog.getLogger().debug("TaskClient deliveryComplete token:{}", token);
        }
    });

Log output:

02-28 14:45:09.668 E/29252.64,MQTT Ping: devXXX SubscribeTask$1.connectionLost.41: TaskClient connectionLost false cause:
02-28 14:45:10.738 W/29252.63,MQTT Call: devXXX SubscribeTask$1.connectComplete.63: TaskClient connectComplete reconnect=true serverURI=tcp://XXX.XXX.XXX.XXX:1883
02-28 14:45:26.514 W/29252.35,Timer-0 SubscribeTask$3.run.93: TimerTask taskClient isConnected true
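
Beyond logging, the two callbacks can feed a small state holder that the rest of the application queries. The sketch below is one possible shape for that, not something from the original project: ConnectionStateTracker and its methods are hypothetical names, and the idea is that a MqttCallbackExtended implementation would forward connectionLost and connectComplete to it. The main method replays the sequence seen in the log above (a loss followed by an automatic reconnect).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper that records connection state from callback events. */
class ConnectionStateTracker {

    enum State { NEVER_CONNECTED, CONNECTED, DISCONNECTED }

    // Atomics are used because callbacks may arrive on the client's own threads.
    private final AtomicReference<State> state = new AtomicReference<>(State.NEVER_CONNECTED);
    private final AtomicInteger reconnectCount = new AtomicInteger();
    private final AtomicInteger lossCount = new AtomicInteger();

    /** Intended to be called from connectComplete(reconnect, serverURI). */
    void onConnectComplete(boolean reconnect) {
        if (reconnect) {
            reconnectCount.incrementAndGet();
        }
        state.set(State.CONNECTED);
    }

    /** Intended to be called from connectionLost(cause). */
    void onConnectionLost(Throwable cause) {
        lossCount.incrementAndGet();
        state.set(State.DISCONNECTED);
    }

    State state() {
        return state.get();
    }

    int reconnectCount() {
        return reconnectCount.get();
    }

    int lossCount() {
        return lossCount.get();
    }

    public static void main(String[] args) {
        ConnectionStateTracker tracker = new ConnectionStateTracker();
        // Replay of the logged sequence: connection lost, then reconnect=true.
        tracker.onConnectionLost(new RuntimeException("simulated loss"));
        tracker.onConnectComplete(true);
        System.out.println(tracker.state() + " losses=" + tracker.lossCount()
                + " reconnects=" + tracker.reconnectCount());
    }
}
```

Keeping the counters separate from the current state makes it possible to raise an alert on repeated reconnects even when the client happens to be connected at the moment it is polled.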
