1. 概述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。
基本原理如下图所示
MQTT协议已经成为物联网数据传输的标准,具备以下优势:
轻量、高效:IoT设备上的MQTT实施需要最少的资源,最小的MQTT控制消息可以少至2个数据字节,MQTT消息的头也很小,可以优化网络带宽。
可扩展:MQTT实施最要最少的代码:在操作中消耗的功率非常少。该协议还具有支持与大量物联网设备通信的内置功能。
可靠:许多IoT设备通过低带宽、高延迟的不可靠网络,MQTT具有内置功能,可减少IoT设备重新连接云所需时间,还定义了三种不同服务级别,确保IoT用例的可靠性。
安全:可以轻松使用现代身份验证协议(OAuth、TLS1.3)加密消息,并对设备和用户进行身份认证。
得到良好的支持:多种语言对MQTT协议的实施,提供广泛的支持。
2. 协议格式
MQTT报文由三部分组成:
Fiexd header固定报头,所有控制报文都包含
Variable header可变报头,部分控制报文包含
Payload有效载荷,部分控制报文包含
在控制报文传输中,每一个字符串都有一个两字节的长度(用了大端序序,高字节在低字节前面。高字节用MSB表示,低字节用LSB表示)字段作为前缀,这也意味着,控制报文中的字符串(注意不是有效载荷里的内容),长度最大只能到65535字节。
固定报头
第一个字节内容:控制报文类型、控制报文类型标志位
第二个字节内容:剩余长度,表示当前报文剩余部分的字节数,包括可变报头和负载的数据。但是一个字节只能表示255的长度,所以这里设计的是可变字节长度,用来表示不同长度。最大可以传输268435422个字节内容
字节数 | 最小值 | 最大值 |
---|---|---|
1 | 0(00000000) | 127(011111111) |
2 | 128(10000000,00000001) | 16383(11111111,01111111) |
3 | 16384(10000000,10000000,00000001) | 2097151(11111111,11111111,01111111) |
4 | 2097152(10000000,10000000,10000000,00000001) | 268435422(11111111,11111111,11111111,01111111) |
3. 控制报文
3.1 CONNECT
客户端到服务端的网络建立之后,客户端发送给服务端的第一个报文必须是Connect报文。在同一个TCP连接上,客户端只能发送一次Connect报文。
3.1.1 固定报文
第一个字节:报文类型为1,无标志位
3.1.2 可变报文
可变报文由四个字段,共10字节组成,按字节顺序:协议名(6字节)、协议级别(1字节)、连接标志(1字节)、保持连接(2字节)
协议名:前两个字节为协议名称长度,值为4,后四个字节为“MQTT”字符串。
协议级别:对于MQTT 3.1.1协议,值为4,如果是MQTT5.0协议,值为5。
3.1.2.1 连接标志
连接标志位比较多,具体如下所示。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
User Name Flag | Password Flag | Will Retain | Will QoS | Will QoS | Will Flag | Clean Sesson | Reserved | |
byte 8 | X | X | X | X | X | X | X | 0 |
清理会话标志
Clean Session设置为1,客户端和服务端必须丢弃之前任何会话,并且开始一个新的会话。
Clean Session设置为0,服务端必须基于当前会话(使用客户端标识符识别)的状态,恢复与客户端通讯。如果没有历史会话,则创建一个新的会话。在连接断开后,客户端和服务端必须保存会话信息,服务端还要将之后的QoS1和QoS2级别的消息,保存为会话状态的一部分(如果这些消息匹配断开连接客户端的订阅Topic)。
遗嘱标志
Will Flag被设置为1,表示如果连接请求被接受了,遗嘱消息(Will Message)必须被存储在服务端,并且与这个客户端网络关联。如果这个客户端网络连接关闭时,服务端必须发布这个遗嘱消息,除非客户端在DISCONNECT报文时删除了遗嘱消息。
Will Flag被设置为1,Will QoS和Will Retain字段会用到,同时有效载荷中必须包含Will Topic和Will Message。
遗嘱QoS标志
用于指定发布遗嘱消息时使用的服务质量等级,如果Will Flag为0,Will QoS也必须为0
遗嘱保留标志
Retain消息=保留消息
首先理解保留消息:如果客户端发送的消息中设置了保留标志,那么服务端必须存储这个消息和对应的服务等级,以便它可以发送给后来订阅的客户端。
一个topic只能有1条保留消息,新的保留消息会覆盖旧的。当发送保留消息是,载荷内容为空,会删除保留消息。
如果遗嘱保留标志设置为1,服务端必须将遗嘱消息当做保留消息发布。也就是说,如果这个客户端断开时,会发送遗嘱消息,同时消息也会保留起来,让后续新的客户端定于这个遗嘱topic时,也能收到这个遗嘱消息。
用户名标志
如果用户名标志被设置为1,那么有效载荷必须包含用户名字段
密码标志
如果密码标志被设置为1,那么有效载荷必须包含密码字段。如果用户标志被设置为0,密码标志也必须设置为0
3.1.2.2 保持连接
连接保持:以秒为单位的时间间隔,必须要在规定时间内发送控制报文,如果没有可以发送PINGREQ,同时服务端在1.5倍保持连接时间内没有收到客户端的控制报文,需要断开客户端的网络连接。因为保持连接的只有2字节存储,所以理论上最大的就是65535秒,也就是18小时12分。(不过实际应该不会设置这么长的时间,因为中间路由可能有过期时间)
3.1.3有效载荷
可变报文中的标志,决定是否包含这些字段,如果包含,必须按照这个顺序出现:客户端标识符、遗嘱主题,遗嘱消息,用户名,密码。
客户端标识符(ClientId)必须存在,而且必须是Coneect报文的第一个字段。不过,可以允许客户端提供一个0字节的ClientId,服务端需要分配唯一的客户端标识符给客户端。如果客户端提供了0字节的clientId,必须同时建清理会话标志设置为1。(因为会话恢复时根据clientId的)
3.2 PUBLISH
客户端向服务端,或者服务端向客户端发送的一个消息
3.2.1 固定报头
第一个字节:报文类型为3,有三个标志位:DUP、QoS等级、RETAIN。
重发标志
DUP设置为0,表示这是客户端或者服务端第一次发送这个报文。DUP被设置为1,表示这可能是一个之前发送过的报文。对于QoS为0的消息,DUP标志必须设置为0。
服务质量等级
QoS为0,最多分发一次。
QoS为1,至少分发一次。
QoS为2,只分发一次
保留标志
如果客户端发送的消息中设置了保留标志,那么服务端必须存储这个消息和对应的服务等级,以便它可以发送给后来订阅的客户端。
一个topic只能有1条保留消息,新的保留消息会覆盖旧的。当发送保留消息时,载荷内容为空,表示删除保留消息。
3.2.2可变报头
主题名:主题名必须是PUBLISH报文中可变报头的第一个字段。
报文标识符:当QoS等级是1或2时,报文标识符字段才能出现在PUBLISH报文中。
4. 会话
在物联网场景中,设备可能因为网络问题或者电源问题频繁断开连接。如果客户端和服务端总是以全新的上下文建立连接,那么会有以下问题:
客户端在重连后必须重新订阅主题才能继续接收消息,这会给服务器带来额外的开销。
客户端将会错过离线期间的消息。
QoS1和QoS2的服务质量将无法得到保证。
为此,MQTT协议设计了会话机制。MQTT会话本质上就是,一组需要服务端和客户端额外存储的上下文数据,这些数据可以仅持续与网络连接一样长的时间,也可以跨越多个连续的网络连接存在。
服务端使用ClientID来唯一标识每个会话,如果客户端想要在连接时复用之前的会话,那么必须使用与此前一致的ClientID。
在MQTT3.1.1版本中,有Clean Session的会话机制。而在MQTT5.0则升级为Clean Start和Session Expiry Interval更多灵活的机制。
5.QoS
QoS关注的是单个发送者到单个接受者的应用消息。
5.1 QoS0:最多分发一次
问题:消息可能会丢失,因为消息的可靠性完全依赖底层的TCP协议,而TCP连接出现关闭、重置,就会出现消息丢失。
消息的分发依赖底层网络的能力。接受者不会发送消息响应,发送者也不会重试。消息可能送达一次,也可能没发送达。
5.2 QoS1:至少分发一次
问题:消息不会丢失,但是可能会重复。
重复的原因:
对于发送方来说:
PUBLISH未到达接收方
PUBLISH已到达接收方,接受方的PUBACK报文未到达发送方。
虽然重传是PUBLISH报文中的DUP标志被设置为1,用于表示是一个重传报文,但是接收方无法确认,主要接收方有两种情况:
发送方确实因为没有收到PUBACK而重传PUBLISH报文,接收方收到相同PacketID,并且第二个PUBLISH报文DUP为1,确实是重复消息。
第一个PUBLISH消息完成投递,PacketID可用。发送方使用这个PacketID发送全新的PUBLISH报文,但是第一次没有发送到对端,又重新发了PUBLISH报文,此时是相同的PacketID,并且DUP为1,确实是个全新的消息。
接收方无法区分这两种情况,只能将PUBLISH报文都当作全新的报文处理,因为使用QoS1时,消息的重复在协议层面无法避免。
QoS1确保消息至少发送一次。QoS1的PUBLISH报文的可变报头中包含一个未使用的报文标识符(msgId),需要PUBACK确认。
没有要求要在发送PUBACK之前,把消息分发完。当发送者收到PUBACK时,报文标识符可以复用。
5.3 QoS2:仅分发一次
消息丢失和重复都不可接受。那么是如何保证不会重复呢?关键在于通信双发如何正确地同步释放PacketID。不论是重传消息还是发布新消息,一定是和对端达成了共识。
根据QoS2规定:一旦发送了对应的PUBREL报文后,就不能重发这个PUBLISH报文。收到PUBCOM报文后,这个报文标识符就可以重用。
因此,对于接收方来说,能够以PUBREL报文为界限,凡是在PUBREL报文之前到达的PUBLISH报文,都是重复的消息。凡是在PUBREL之后达到的PUBLISH报文,都是全新的消息。
QoS2可能的流程一:
QoS2可能的流程二:
6. Q&A
结合moqueue的实现,进一步理解MQTT协议的内容
保留消息处理逻辑?
Broker会将消息进行存储起来,以topic的维度。正常不会清理,除非发送者发送了清理保留消息。
遗嘱消息处理逻辑
在ChannelInactive(Netty监听客户端断开),如果有遗嘱消息的话,会发送遗嘱消息逻辑。也就是把遗嘱消息发送给对应的订阅者。
遗嘱保留消息处理逻辑?
没有看到处理,只有遗嘱消息的处理,在处理的时候没有再判断保留的情况。
哪些数据需要持久化?会话的信息?服务器宕机的话,怎么处理持久化信息?
保留消息、订阅消息。在moquette的实现中,还可能会创建队列,用来保存暂时无法写入的消息(缓冲区满了,窗口限流等),会在一些场景(缓冲区可写,Session会话重开的是)下重新执行。会话是基于内存存储的,服务器宕机需要重新创建。
MQTT服务端怎么做负载均衡?
协议上不支持,需要在MQTT集群前引入一个负载均衡器,可以是HAProxy 2.4 和 Nginx Plus(Nginx的付费版本)支持。不然只能当成4层转发进行处理。
QoS1/QoS2如何保障?是收到publish消息就发送其他客户端,还是怎么样?
QoS1,在broker的实现,会先pulish消息,然后在回复ACK。
在QoS2的实现上,会先回复pubrec,然后会publish消息,偏向于流程二。
实际QoS生效,不仅仅取决于发送端,还取决于订阅端。最终到订阅的QoS为min(发送端,订阅端)。实际消息发送到订阅端,就是broker发送Publish消息到订阅段,此时broker是发送端,订阅端是接收方。
7. 参考链接
MQTT3.1.1协议
MQTT5.0 : github.com/hui6075/mqt…
MQTT中文网:www.mqtt.cn/822.html