MQTT - 订阅发布demo

àì夳堔傛蜴生んèń 2022-12-25 02:56 152阅读 0赞

本文中 MQTTBroker 基于 moquette 实现,本地安装 moquette 参见 MQTT - broker - 本地安装启动moquette

demo基于 MQTT Cllient 实现, mqtt-client项目代码参见: https://github.com/fusesource/mqtt-client

一 MQTT Server

自定义MQTTServer实现类,提供消息发布服务,即生产消息

  1. package org.fusesource.mqtt.client;
  2. /**
  3. * MQTT moquette 的Server 用于发布消息至指定topic
  4. */
  5. public class MQTTServer {
  6. public static void main(String[] args) {
  7. MQTT mqtt = new MQTT();
  8. BlockingConnection connection = null;
  9. try {
  10. //设置服务端的ip
  11. mqtt.setHost("tcp://localhost:1883");
  12. // mqtt.setHost("tcp://192.168.8.10:1883");
  13. // mqtt.setUserName("test");
  14. // mqtt.setPassword("test");
  15. //连接前清空会话信息
  16. mqtt.setCleanSession(true);
  17. //设置重新连接的次数
  18. mqtt.setReconnectAttemptsMax(6);
  19. //设置重连的间隔时间
  20. mqtt.setReconnectDelay(2000);
  21. //设置心跳时间 低耗网络,但是又需要及时获取数据,心跳30s
  22. mqtt.setKeepAlive((short)30);
  23. //设置缓冲的大小 最大2M
  24. mqtt.setSendBufferSize(2 * 1024 * 1024);
  25. //创建阻塞式连接
  26. connection = mqtt.blockingConnection();
  27. connection.connect();
  28. try {
  29. int count = 0;
  30. while (true) {
  31. count++;
  32. // 指定topic
  33. String topic = "test/test1";
  34. // 设置消息内容
  35. String message = "hello " + count;
  36. connection.publish(topic, message.getBytes(), QoS.EXACTLY_ONCE, false);
  37. System.out.println("MQTTServer receive message: Topic: " + topic + " Content :" + message);
  38. Thread.sleep(2000);
  39. }
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. } finally {
  46. try {
  47. connection.disconnect();
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }
  53. }

二 MQTT Client

自定义 MQTTCllient类,实现消息订阅服务,即消费消息

  1. package org.fusesource.mqtt.client;
  2. /**
  3. * MQTT moquette 的Client 用于消费指定topic的消息
  4. */
  5. public class MQTTClient {
  6. public static void main(String[] args) {
  7. //创建MQTT对象
  8. MQTT mqtt = new MQTT();
  9. BlockingConnection connection = null;
  10. try {
  11. //设置mqtt broker的ip和端口
  12. mqtt.setHost("tcp://localhost:1883");
  13. // 设置用户名密码类型连接
  14. // mqtt.setHost("tcp://192.168.8.10:1883");
  15. // mqtt.setUserName("test");
  16. // mqtt.setPassword("test");
  17. // mqtt.setClientId("client_123");
  18. //连接前清空会话信息
  19. mqtt.setCleanSession(true);
  20. //设置重新连接的次数
  21. mqtt.setReconnectAttemptsMax(6);
  22. //设置重连的间隔时间 单位毫秒
  23. mqtt.setReconnectDelay(2000);
  24. //设置心跳时间 低耗网络,但是又需要及时获取数据,心跳30s
  25. mqtt.setKeepAlive((short)30);
  26. //设置缓冲的大小 最大2M
  27. mqtt.setSendBufferSize(2 * 1024 * 1024);
  28. //获取mqtt的连接对象BlockingConnection
  29. connection = mqtt.blockingConnection();
  30. //创建连接
  31. connection.connect();
  32. //创建相关的MQTT 的主题列表
  33. Topic[] topics = {new Topic("test/test1", QoS.AT_LEAST_ONCE)};
  34. //订阅相关的主题信息
  35. byte[] qoses = connection.subscribe(topics);
  36. //
  37. while (true) {
  38. System.out.println("waiting...");
  39. //接收订阅的消息内容
  40. Message message = connection.receive();
  41. System.out.println("received...");
  42. //获取订阅的消息内容
  43. byte[] payload = message.getPayload();
  44. //
  45. System.out.println("MQTTClient Message Topic:" + message.getTopic() + " Content :" + new String(payload));
  46. //确认消息回执
  47. message.ack();
  48. Thread.sleep(2000);
  49. }
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. } finally {
  53. try {
  54. connection.disconnect();
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  60. }

三 本地运行

本地运行 MQTTServer.java,生产消息:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3N4ZzAyMDU_size_16_color_FFFFFF_t_70

本地运行MQTTClient.java,消费消息:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3N4ZzAyMDU_size_16_color_FFFFFF_t_70 1

发表评论

表情:
评论列表 (有 0 条评论,152人围观)

还没有评论,来说两句吧...

相关阅读