Android 消息队列之MQTT的使用(二):会话+消息过期机制,设备远程控制,批量控制实现
目录
一、实际应用场景
- 室内温湿度数据上传
- 设备远程控制
- 批量控制实现
二、会话管理、消息过期设置
4.1 会话管理
- Clean Session参数
- 新旧会话模式对比
- 典型应用场景
4.2 消息过期设置
- MQTT 5.0消息过期机制
- Message Expiry Interval属性
- QoS级别影响
三、实际应用场景
3.1 室内温湿度数据上传
应该如何设计呢?比如我现在有十台设备,数据都要上传到后台服务器:
- 主题:比如你温湿度数据发送到哪个主题,后台服务器这边就会订阅这个主题,
- 数据:数据里面需要保护设备的唯一号,这样服务器才知道是哪台设备发上来的。
# 设备端:每隔30秒发布数据
client.publish("envmon/D001/sensor", json.dumps(data), qos=1)# 服务端:订阅所有设备的传感器主题,使用通配符
mosquitto_sub -t "envmon/+/sensor" -v
3.2 后台服务器重启温湿度器
比如有时候,想远程在移动端或web端,关掉温湿度器,那么应该如何设计呢?
- 主题:客户端这边订阅指定的主题,一般是以设备唯一号作为主题,后台会往这个主题里面发送消息。这样就能精准的发送到指定的设备上去。
- 数据:数据格式,{功能码:开关码}。因为可能会有很多远程功能,所以我们需要设计功能码。
服务端:
client.publish("envmon/D001/control", xxx, qos=2)客户端:
client.subscribe("envmon/D001/control", qos=2)
3.2 如果我想实现批量重启呢?或者批量关闭温湿度器呢?
客户端可以订阅多个主题
,通过多次调用 subscribe()
方法实现多主题订阅。
// 订阅温湿度数据主题
subscribe("envmon/D001/sensor", 1);
subscribe("envmon/D002/sensor", 1);// 订阅设备控制主题
subscribe("envmon/D001/control", 2);
subscribe("envmon/D002/control", 2);
这里,我们要实现的批量,可以使用通配符
:
- 客户端
// 订阅所有设备的传感器数据(envmon/任何设备ID/sensor)
subscribe("envmon/+/sensor", 1);
- 服务端:发送所有设备就都能收到。
envmon/D001/sensor
如果说,要分组,或者勾选,也可以订阅单条的来进行发送。
四、会话、消息过期设置
4.1 会话
会话,指的是MQTT客户端与MQTT服务端之间的连接。
会话一般可以设置两个参数
- 每次打开是否是一个全新的会话,还是延续上一次的会话。
- 会话的状态会维持多久。
参数一:是否延续会话(Clean Session)
两种模式对比
场景 | Clean Session=true(新会话) | Clean Session=false(延续会话) |
---|---|---|
设备断网后重连 | 丢失所有订阅和未读消息 | 自动恢复订阅列表和离线期间消息 |
适用设备 | 临时设备(如一次性传感器) | 重要设备(如智能门锁) |
服务器资源消耗 | 低 | 高(需存储消息) |
典型代码场景 | setCleanSession(true) | setCleanSession(false) |
实际案例:
智能门锁(Clean Session=false):
- 门锁断网时,用户通过APP发送开锁指令
- 服务器将指令暂存在「会话快递柜」
- 门锁网络恢复后,立即收到开锁指令
参数二:会话有效期(Session Expiry)
设置方式
// 设置会话保留时间为1小时(单位:秒)
options.setSessionExpiryInterval(3600);
关键行为规则
有效期设置 | 服务器行为 |
---|---|
0 | 关闭连接立即销毁会话(等同 Clean Session=true) |
3600(1小时) | 设备离线后,会话保留1小时,超时后自动清除 |
0xFFFFFFFF | 永久保留会话(慎用!会导致服务器内存泄漏) |
也就是,即使Clean Session=false(延续会话),会话有效期设置为60秒,如果离线超过60秒,那么也会清空之前的消息。
4.2 消息过期设置
MQTT 5.0 协议新增了 Message Expiry Interval
属性,可直接设置消息过期时间。
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;// 创建消息并设置过期时间为60秒
MqttMessage message = new MqttMessage();
message.setPayload("Hello".getBytes());
message.setQos(1);// MQTT 5.0 属性设置
MqttProperties properties = new MqttProperties();
properties.setMessageExpiryInterval(60L); // 单位:秒
message.setProperties(properties);// 发布消息
client.publish("topic/commands", message);
2. 服务器行为(需支持MQTT5.0)
- 消息在服务器存储超过设定时间后自动删除
- 过期消息不会转发给订阅者
- QoS 2 消息在过期前会完成确认流程