MQTT fusesource 的 Callback API 消息发布的实现
当QoS level 1的时候 发布消息时,Server会回调通知客户端。fusesource 有实现带callback的发布方式。
所有QoS level 1都要在可变头部中附加一个16位的消息ID。
SUBSCRIBE和UNSUBSCRIBE消息使用QoS level 1。
针对消息的发布,Qos level 1,意味着消息至少被传输一次。
发送者若在一段时间内接收不到PUBACK消息,发送者需要打开DUB标记为1,然后重新发送PUBLISH消息。因此会导致接收方可能会收到两次PUBLISH消息。针对客户端发布消息到服务器的消息流:
Client | Message and direction | Server |
---|---|---|
QoS = 1 DUP = 0 Message ID = x Action: Store message | PUBLISH —————> | Actions:
Reception: >=1 |
Action: Discard message | PUBACK <————— | Message ID = x |
针对服务器发布到订阅者的消息流:
Server | Message and direction | Subscriber |
---|---|---|
QoS = 1 DUP = 0 Message ID = x | PUBLISH —————> | Actions:
Reception: >=1 |
PUBACK <————— | Message ID = x |
发布者(客户端/服务器)若因种种异常接收不到PUBACK消息,会再次重新发送PUBLISH消息,同时设置DUP标记为1。接收者以服务器为例,这可能会导致服务器收到重复消息,按照流程,broker(服务器)发布消息到订阅者(会导致订阅者接收到重复消息),然后发送一条PUBACK确认消息到发布者。
在业务层面,或许可以弥补MQTT协议的不足之处:重试的消息ID一定要一致接收方一定判断当前接收的消息ID是否已经接受过
但一样不能够完全确保,消息一定到达了。
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
/**
* Created by tiantao on 14-12-2.
*/
public class MQTTCallbackServer {
private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);
private final static String CONNECTION_STRING = "tcp://localhost:1883";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
public final static long RECONNECTION_ATTEMPT_MAX=6;
public final static long RECONNECTION_DELAY=2000;
public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
public static void main(String[] args) {
//创建MQTT对象
MQTT mqtt = new MQTT();
try {
//设置mqtt broker的ip和端口
mqtt.setHost("localhost", 1883);
//连接前清空会话信息
mqtt.setCleanSession(CLEAN_START);
//设置重新连接的次数
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
//设置重连的间隔时间
mqtt.setReconnectDelay(RECONNECTION_DELAY);
//设置心跳时间
mqtt.setKeepAlive(KEEP_ALIVE);
//设置缓冲的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
//获取mqtt的连接对象BlockingConnection
final CallbackConnection connection = mqtt.callbackConnection();
//添加连接的监听事件
connection.listener(new Listener() {
public void onDisconnected() {
}
public void onConnected() {
}
public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
// You can now process a received message from a topic.
// Once process execute the ack runnable.
ack.run();
System.out.println("topic"+topic.toString()+"="+new String(payload.getData()));
}
public void onFailure(Throwable value) {
}
});
//添加连接事件
connection.connect(new Callback<Void>() {
/**
* 连接失败的操作
*/
public void onFailure(Throwable value) {
// If we could not connect to the server.
System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"连接失败......"+value.getMessage());
value.printStackTrace();
}
/**
* 连接成功的操作
* @param v
*/
public void onSuccess(Void v) {
int count=1;
// 用于发布消息,目前手机段不需要向服务端发送消息
//主题的内容
final String message="hello "+count+"chinese people !";
final String topic = "tokudu/aedb222fdf736c94";
System.out.println("MQTTCallbackServer publish topic="+topic+" message :"+message);
connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
System.out.println("OK");
// the pubish operation completed successfully.
}
public void onFailure(Throwable value) {
value.printStackTrace();
}
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
}
}
}
还没有评论,来说两句吧...