MQTT fusesource 的 Callback API 消息发布的实现

╰+攻爆jí腚メ 2022-08-14 04:51 159阅读 0赞

当QoS level 1的时候 发布消息时,Server会回调通知客户端。fusesource 有实现带callback的发布方式。

所有QoS level 1都要在可变头部中附加一个16位的消息ID。

SUBSCRIBE和UNSUBSCRIBE消息使用QoS level 1。

针对消息的发布,Qos level 1,意味着消息至少被传输一次。

发送者若在一段时间内接收不到PUBACK消息,发送者需要打开DUB标记为1,然后重新发送PUBLISH消息。因此会导致接收方可能会收到两次PUBLISH消息。针对客户端发布消息到服务器的消息流:





















Client Message and direction Server
QoS = 1
DUP = 0
Message ID = x

Action: Store message

PUBLISH 
—————>


Actions:

  • Store message


  • Publish message to subscribers

  • Delete message





Reception: >=1
Action: Discard message PUBACK 
<—————
Message ID = x

针对服务器发布到订阅者的消息流:





















Server Message and direction Subscriber
QoS = 1
DUP = 0
Message ID = x
PUBLISH 
—————>


Actions:

  • Store message


  • Make message available                       




Reception: >=1

PUBACK 
<—————
Message ID = x

发布者(客户端/服务器)若因种种异常接收不到PUBACK消息,会再次重新发送PUBLISH消息,同时设置DUP标记为1。接收者以服务器为例,这可能会导致服务器收到重复消息,按照流程,broker(服务器)发布消息到订阅者(会导致订阅者接收到重复消息),然后发送一条PUBACK确认消息到发布者。

在业务层面,或许可以弥补MQTT协议的不足之处:重试的消息ID一定要一致接收方一定判断当前接收的消息ID是否已经接受过

但一样不能够完全确保,消息一定到达了。

  1. import org.fusesource.hawtbuf.Buffer;
  2. import org.fusesource.hawtbuf.UTF8Buffer;
  3. import org.fusesource.mqtt.client.*;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.net.URISyntaxException;
  7. /**
  8. * Created by tiantao on 14-12-2.
  9. */
  10. public class MQTTCallbackServer {
  11. private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);
  12. private final static String CONNECTION_STRING = "tcp://localhost:1883";
  13. private final static boolean CLEAN_START = true;
  14. private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
  15. public final static long RECONNECTION_ATTEMPT_MAX=6;
  16. public final static long RECONNECTION_DELAY=2000;
  17. public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
  18. public static void main(String[] args) {
  19. //创建MQTT对象
  20. MQTT mqtt = new MQTT();
  21. try {
  22. //设置mqtt broker的ip和端口
  23. mqtt.setHost("localhost", 1883);
  24. //连接前清空会话信息
  25. mqtt.setCleanSession(CLEAN_START);
  26. //设置重新连接的次数
  27. mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
  28. //设置重连的间隔时间
  29. mqtt.setReconnectDelay(RECONNECTION_DELAY);
  30. //设置心跳时间
  31. mqtt.setKeepAlive(KEEP_ALIVE);
  32. //设置缓冲的大小
  33. mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
  34. //获取mqtt的连接对象BlockingConnection
  35. final CallbackConnection connection = mqtt.callbackConnection();
  36. //添加连接的监听事件
  37. connection.listener(new Listener() {
  38. public void onDisconnected() {
  39. }
  40. public void onConnected() {
  41. }
  42. public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
  43. // You can now process a received message from a topic.
  44. // Once process execute the ack runnable.
  45. ack.run();
  46. System.out.println("topic"+topic.toString()+"="+new String(payload.getData()));
  47. }
  48. public void onFailure(Throwable value) {
  49. }
  50. });
  51. //添加连接事件
  52. connection.connect(new Callback<Void>() {
  53. /**
  54. * 连接失败的操作
  55. */
  56. public void onFailure(Throwable value) {
  57. // If we could not connect to the server.
  58. System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"连接失败......"+value.getMessage());
  59. value.printStackTrace();
  60. }
  61. /**
  62. * 连接成功的操作
  63. * @param v
  64. */
  65. public void onSuccess(Void v) {
  66. int count=1;
  67. // 用于发布消息,目前手机段不需要向服务端发送消息
  68. //主题的内容
  69. final String message="hello "+count+"chinese people !";
  70. final String topic = "tokudu/aedb222fdf736c94";
  71. System.out.println("MQTTCallbackServer publish topic="+topic+" message :"+message);
  72. connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
  73. public void onSuccess(Void v) {
  74. System.out.println("OK");
  75. // the pubish operation completed successfully.
  76. }
  77. public void onFailure(Throwable value) {
  78. value.printStackTrace();
  79. }
  80. });
  81. try {
  82. Thread.sleep(2000);
  83. } catch (InterruptedException e) {
  84. // TODO Auto-generated catch block
  85. e.printStackTrace();
  86. }
  87. }
  88. });
  89. } catch (URISyntaxException e) {
  90. // TODO Auto-generated catch block
  91. e.printStackTrace();
  92. } catch (Exception e) {
  93. // TODO Auto-generated catch block
  94. e.printStackTrace();
  95. }finally{
  96. }
  97. }
  98. }

发表评论

表情:
评论列表 (有 0 条评论,159人围观)

还没有评论,来说两句吧...

相关阅读