当前位置: 首页 > news >正文

Android 消息队列之MQTT的使用(二):会话+消息过期机制,设备远程控制,批量控制实现

目录

一、实际应用场景

  1. 室内温湿度数据上传
  2. 设备远程控制
  3. 批量控制实现

二、会话管理、消息过期设置

4.1 会话管理

  1. Clean Session参数
    • 新旧会话模式对比
    • 典型应用场景

4.2 消息过期设置

  1. MQTT 5.0消息过期机制
    • Message Expiry Interval属性
    • QoS级别影响

三、实际应用场景

3.1 室内温湿度数据上传

应该如何设计呢?比如我现在有十台设备,数据都要上传到后台服务器:

  1. 主题:比如你温湿度数据发送到哪个主题,后台服务器这边就会订阅这个主题,
  2. 数据:数据里面需要保护设备的唯一号,这样服务器才知道是哪台设备发上来的。
# 设备端:每隔30秒发布数据
client.publish("envmon/D001/sensor", json.dumps(data), qos=1)# 服务端:订阅所有设备的传感器主题,使用通配符
mosquitto_sub -t "envmon/+/sensor" -v

3.2 后台服务器重启温湿度器

比如有时候,想远程在移动端或web端,关掉温湿度器,那么应该如何设计呢?

  1. 主题:客户端这边订阅指定的主题,一般是以设备唯一号作为主题,后台会往这个主题里面发送消息。这样就能精准的发送到指定的设备上去。
  2. 数据:数据格式,{功能码:开关码}。因为可能会有很多远程功能,所以我们需要设计功能码。
服务端:
client.publish("envmon/D001/control", xxx, qos=2)客户端:
client.subscribe("envmon/D001/control", qos=2)

3.2 如果我想实现批量重启呢?或者批量关闭温湿度器呢?

客户端可以订阅多个主题,通过多次调用 subscribe() 方法实现多主题订阅。

// 订阅温湿度数据主题
subscribe("envmon/D001/sensor", 1);
subscribe("envmon/D002/sensor", 1);// 订阅设备控制主题
subscribe("envmon/D001/control", 2);
subscribe("envmon/D002/control", 2);

这里,我们要实现的批量,可以使用通配符

  1. 客户端
// 订阅所有设备的传感器数据(envmon/任何设备ID/sensor)
subscribe("envmon/+/sensor", 1); 
  1. 服务端:发送所有设备就都能收到。
envmon/D001/sensor

如果说,要分组,或者勾选,也可以订阅单条的来进行发送。

四、会话、消息过期设置

4.1 会话

会话,指的是MQTT客户端与MQTT服务端之间的连接。

会话一般可以设置两个参数

  1. 每次打开是否是一个全新的会话,还是延续上一次的会话。
  2. 会话的状态会维持多久。

参数一:是否延续会话(Clean Session)​

​两种模式对比​
场景Clean Session=true(新会话)Clean Session=false(延续会话)
设备断网后重连丢失所有订阅和未读消息自动恢复订阅列表和离线期间消息
适用设备临时设备(如一次性传感器)重要设备(如智能门锁)
服务器资源消耗高(需存储消息)
典型代码场景setCleanSession(true)setCleanSession(false)

实际案例:​
智能门锁(Clean Session=false):

  1. 门锁断网时,用户通过APP发送开锁指令
  2. 服务器将指令暂存在「会话快递柜」
  3. 门锁网络恢复后,立即收到开锁指令

​参数二:会话有效期(Session Expiry)​

​设置方式​
// 设置会话保留时间为1小时(单位:秒)
options.setSessionExpiryInterval(3600); 
​关键行为规则​
有效期设置服务器行为
0关闭连接立即销毁会话(等同 Clean Session=true)
3600(1小时)设备离线后,会话保留1小时,超时后自动清除
0xFFFFFFFF永久保留会话(慎用!会导致服务器内存泄漏)

也就是,即使Clean Session=false(延续会话),会话有效期设置为60秒,如果离线超过60秒,那么也会清空之前的消息。

4.2 消息过期设置

MQTT 5.0 协议新增了 ​Message Expiry Interval​ 属性,可直接设置消息过期时间。

import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;// 创建消息并设置过期时间为60秒
MqttMessage message = new MqttMessage();
message.setPayload("Hello".getBytes());
message.setQos(1);// MQTT 5.0 属性设置
MqttProperties properties = new MqttProperties();
properties.setMessageExpiryInterval(60L); // 单位:秒
message.setProperties(properties);// 发布消息
client.publish("topic/commands", message);
2. 服务器行为(需支持MQTT5.0)
  • 消息在服务器存储超过设定时间后自动删除
  • 过期消息不会转发给订阅者
  • QoS 2 消息在过期前会完成确认流程

相关文章:

  • JavaScript高级进阶(四)
  • Crusader Kings III 王国风云 3(十字军之王 3) [DLC 解锁] [Steam] [Windows SteamOS macOS]
  • Python(14)推导式
  • PCI/PXI 总线的可编程电阻卡
  • JVM模型、GC、OOM定位
  • leetcode 876. 链表的中间结点
  • 云上玩转DeepSeek系列之六:DeepSeek云端加速版发布,具备超高推理性能
  • SpringBoot实现接口防刷的5种高效方案详解
  • 安装qt4.8.7
  • cuDNN 安装、版本查看及指定版本删除操作指南
  • 社交电商和泛娱乐平台出海南美市场支付方式与策略
  • 人工智能搜索时代:如何优化SEO以保持领先
  • Context7 MCP:提供实时、版本特定的文档以解决AI幻觉问题
  • 【爬虫】一文掌握 adb 的各种指令(adb备忘清单)
  • 普发ASM392EUV检漏仪维修说明手测内容可目录
  • 厚铜pcb生产厂家哪家好?
  • 媒资管理之视频管理
  • 【Python数据驱动决策】数据分析与可视化全流程实战指南
  • django admin.E035 处理办法
  • 数据结构每日一题day12(链表)★★★★★
  • 西班牙葡萄牙遭遇史上最严重停电:交通瘫了,通信崩了,民众疯抢物资
  • 上海出台灵活就业人员公积金新政:不限户籍、提取自由,6月起施行
  • 我的科学观|张峥:AI快速迭代,我们更需学会如何与科技共处
  • 因高颜值走红的女通缉犯出狱后当主播自称“改邪归正”,账号已被封
  • 商务部:入境消费增长潜力巨大,离境退税有助降低境外旅客购物成本
  • 同款瑞幸咖啡竟差了6元,开了会员仍比别人贵!客服回应