MQTT 协议是当今世界上最受欢迎的物联网协议。MQTT 协议为设备提供了稳定、可靠、简单易用的通信基础,截至目前通过 MQTT 协议连接的设备已经过亿,广泛应用于 IoT、M2M 等领域。本篇将从最基础的知识开始,向您讲解 MQTT 协议的原理与应用。

        目前 MQTT 主流版本有 MQTT3.1.1 和 MQTT5。MQTT5 完全兼容 MQTT3.1.1,是在 MQTT3.1.1 的基础上进行完善补充。目前 MQTT3.1.1 的使用人数还是更多,所以本文用 MQTT3.1.1 来讲解。


MQTT的基本介绍

        MQTT(Message Queing Telemetry Transport,消息队列遥测传输)协议是基于 TCP / IP协议栈构建的 异步通信消息协议,是一种 轻量级 的、客户端服务端 架构的、发布/订阅模式的消息传输协议。

疑问:TCP和IP协议?

        关于MQTT协议的常识:MQTT协议最初是为了利用卫星通讯监测输油管道开发的协议,由此可见,MQTT就是专门为低带宽、高延迟或不可靠的网络设计的。

MQTT协议特点

简单易用,方便集成;

安全可靠,支持TLS/SSL加密和认证机制;

轻量级,占用带宽小,支持多种消息传输模式;

灵活性,可知设备连接状态,可控数据传输质量。

 MQTT原理

        在 MQTT 协议通讯中,最重要的两个角色是服务端和客户端。客户端向一「主题」「发布」消息,服务端处理并推送给「订阅」了该「主题」的其他客户端。

为方便理解通过案例来进行介绍:

  •  假如你是张三,一名普通的抖音用户,你关注了李四的抖音账号。在这里,张三跟李四不会直接产生关系,而是会通过抖音服务器。抖音服务器就是「服务端」,所有抖音用户就是「客户端」,你关注李四的这个动作,就叫作「订阅」。
  • 李四如果「发布」了一条视频,那么张三、王五、老六,等等所有关注了李四的粉丝都会收到这个视频推送。这是因为抖音里没有主题的概念,只要李四有发视频,粉丝都会收到推送。

        假如抖音也有主题的概念,发布的视频都带有主题的属性。那么,许有杨发布了编程、副业、职场、吃喝拉撒相关主题的视频,而张三只订阅了吃喝拉撒这个「主题」,那么只有当许有杨发布了吃喝拉撒这个主题的视频,张三才会收到这个视频。而如果发布了编程、副业相关的视频,张三不会收到任何通知。 

服务端

MQTT服务端通常是一台服务器,它充当着MQTT信息传输的中心节点。其主要功能:

  • 接受来自MQTT客户端的消息并将其 传递 给其他MQTT客户端;
  • MQTT服务端还负责 管理客户端,确保客户端之间的通讯畅通无阻;
  • 保证MQTT消息被正确接收和准确投递。

        服务端一般就是云平台,OneNET、阿里云、腾讯云等;也可以用 EMQ 或 Mosquitto 自己搭建服务端。 

疑问:如何通过EMQ或Mosquitto自己搭建服务器?

客户端

        MQTT 客户端可以向服务端通过「发布」发送信息,也可以从服务端「订阅」来收取信息。

        客户端一般就是我们的单片机,STM32、C51、树莓派等。

主题

        在 MQTT 通讯中,客户端订阅的是一个个「主题」。MQTT 服务端在管理信息通讯时,使用「主题」来控制。

发布与订阅的特点

  • 相互独立:客户端相互独立,彼此没有直接联系,不用知道对方的任何状态、情况。
  • 空间分离:客户端只要连接同一个 MQTT 通讯网络,无论是互联网或者局域网都可以通讯。
  • 时间异步:客户端的发布与订阅无需同步。若有客户端断连,服务端保存信息,待客户端上线后推送。

MQTT报文

一个MQTT报文由3部分组成:固定报头、可变报头和有效载荷(消息体)三部分组成;

  • 固定报头:所有的MQTT报文都存在,表示 报文类型 及 报文的分组 类标识
  • 可变报头(由固定报头的报文类型决定是否存在及具体内容):部分MQTT报文有,报文类型 决定了 可变头是否存及其具体内容;
  • 有效载荷/消息体:部分MQTT报文有,存放报文的具体内容。

示意图如下所示:

固定报头(Fixed header) 

  • 消息类型(报文类型) :

位于 byte 1 的第 7~4 位,表示 MQTT 报文类型:

名称 报文流动方向 描述
Reserved 0 禁止 保留位
CONNECT 1 客户端到服务器 客户端请求连接到服务器
CONNACK 2 服务器到客户端 连接确认
PUBLISH 3 双向 发布消息
PUBACK 4 双向 发布确认
PUBREC 5 双向 发布收到(保证第1部分到达)
PUBREL 6 双向 发布释放(保证第2部分到达)
PUBCOMP 7 双向 发布完成(保证第3部分到达)
SUBSCRIBE 8 客户端到服务器 客户端请求订阅
SUBACK 9 服务器到客户端 订阅确认
UNSUBSCRIBE 10 客户端到服务器 请求取消订阅
UNSUBACK 11 服务器到客户端 取消订阅确认
PINGREQ 12 客户端到服务器 心跳请求
PINGRESP 13 服务器到客户端 心跳响应
DISCONNECT 14 客户端到服务器 中断连接
Reserved 15 禁止 保留位
  • 标志位(DUP、QoS Level、RET)

位于 byte 1 的第 3~0 位,表示 MQTT 报文的分组 类标识

DUP 发布消息的 副本
QoS

发布消息的 服务质量

RETAIN 发布 保留标识
控制报文 固定报头标志 Bit 3 Bit 2 Bit 1 Bit 0
CONNECT 保留位 0 0 0 0
CONNACK 保留位 0 0 0 0
PUBLISH MQTT 3.1.1使用 DUP1 QoS2 QoS2 RETAIN3
PUBACK 保留位 0 0 0 0
PUBREC 保留位 0 0 0 0
PUBREL 保留位 0 0 1 0
PUBCOMP 保留位 0 0 0 0
SUBSCRIBE 保留位 0 0 1 0
SUBACK 保留位 0 0 0 0
UNSUBSCRIBE 保留位 0 0 1 0
UNSUBACK 保留位 0 0 0 0
PINGREQ 保留位 0 0 0 0
PINGRESP 保留位 0 0 0 0
DISCONNECT 保留位 0 0 0 0
  •  剩余长度(Remaining Length)

        位于 byte 2 的第 3~0 位,表示当前剩余字节数,包括 可变报头 和 负载的数据。剩余长度不包括用于编码剩余长度字段本身的字节数。

可变报头(Variable header)

        某些 MQTT 报文有可变头。它在固定头和有效载荷之间。可 变头的内容根据报文类型的不同而不同。可变头的报文标识符字段存在于在多个类型的报文里。

可变报头在后续的报文案例中会详细介绍。

有效载荷(Payload)

        有效载荷就是 应用消息,但并不是所有的报文都有有效载荷,只有部分 MQTT 报文有有效载荷,具体如下:

控制报文 有效载荷 控制报文 有效载荷
CONNECT 需要 SUBSCRIBE 需要
CONNACK 不需要 SUBACK 需要
PUBLISH 可选 UNSUBSCRIBE 需要
PUBACK 不需要 UNSUBACK 不需要
PUBREC 不需要 PINGREQ 不需要
PUBREL 不需要 PINGRESP 不需要
PUBCOMP 不需要 DISCONNECT 不需要

 QoS,服务质量 

简介:QoS(Quality of Service,服务质量)。在数据通信的过程中,有的消息很重要,不可以丢失;有的消息不重要,丢了也没关系。所以在 MQTT 中可以配置 QoS,给不同重要的消息不同的服务质量。

MQTT 协议有三种服务质量级别:

QoS = 0:最多发一次

QoS = 1:最少发一次

QoS = 2:保证收一次

对于不同重要级的消息选择不同的 QoS,较为重要消息的使用 QoS = 1 和 QoS = 2。

QoS = 0:最多发一次

        这种服务质量消息最多只发送一次。接收者不会发送响应,发送者也不会重试。消息可能送达一次也可能根本没送达。

        想象你是一个快递员,QoS = 0(最多发一次)相当于你将包裹送给收件人后,没有任何确认回执。你只是简单地把包裹放在门口,然后离开。在这种情况下,你无法确定包裹是否成功被收件人接收,也无法知道是否有其他人偷了这个包裹。

 QoS = 1:最少发一次 

        服务质量确保消息至少送达一次。QoS = 1 的 PUBLISH 报文的可变报头中包含一个报文标识符,需要 PUBACK 报文确认

        QoS = 1(最少发一次)相当于你在送货后要求收件人给你一个回执确认。你将包裹送给收件人,然后等待他给你一个回执,告诉你已经收到包裹。如果你没有收到回执,你会重新尝试送货,直到收到回执为止。这样,你可以确保包裹被收件人接收,但可能会增加一些延迟和工作量。

QoS = 2:保证收一次 

        这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。QoS = 2 的消息可变报头中有报文标识符。QoS = 2 的 PUBLISH 报文的接收者使用一个两步确认过程来确认收到。

        QoS = 2(保证收一次)相对于你要确认对方可以收货再发货。你在送货前给收件人发消息问他在不在家,收件人告诉你他在家,你把将包裹送给收件人,然后等待他给你一个回执,告诉你已经收到包裹。如果他没回消息,不在家,就继续发消息直到收件人回消息,告诉你他在家,再送包裹。

MQTT心跳机制 

为了维持正常的运行状态和连接的稳定性。

基本的原理如下:

        这种机制就像是我们的心脏,MQTT客户端定期发送心跳包,定时的向服务器发送信号,表明自己的存在和健康状况。如果服务器在一定的时间内没有接收到心跳包,服务端就会认为客户端出现异常或者离线。

        客户端定时向服务端发送心跳请求(PINGREQ),告诉服务端,我和你还在连接着;服务端收到心跳请求后,会回复一条心跳响应(PINGRESP),告诉客户端,我知道你和我存在连接。示意图如下所示:

作用:通过心跳机制,服务端可以实时的检测客户端的连接状态,及时发现和处理异常情况,确保通信的可靠性和稳定性。 MQTT的心跳机制也是保障通信链路顺畅运行的重要机制之一。

MQTT遗嘱 

        可以和生活中的人的行为进行对比,当客户端不断的发送心跳包时,证明客户端还连接着服务端;当客户端不发送心跳包后,表明客户端和服务端断开连接,这时客户端[活着]之前向服务端设置发送的遗嘱消息,有服务端进行执行。

        MQTT遗嘱是一种机制,允许客户端在「活着」的时候设置并发送遗嘱消息,以便在客户端意外断线时由服务端公布。

        意外断线指的是当客户端在没有发送 DISCONNECT 报文的情况下失去了心跳信号,这通常发生在网络故障或电池耗尽等情况下。此时,服务端会察觉到客户端的异常断开,并将客户端的遗嘱消息发布出来。然而,如果客户端正常断开连接并发送了 DISCONNECT 报文,遗嘱则不会启动,服务端也不会发布客户端的遗嘱消息。

        通过合理设置和使用 MQTT 遗嘱机制,可以增强客户端在服务端管理中的作用,并提供实时的设备状态信息。

服务端搭建(OneNET配置)

点开 OneNET 官方网址:中移坤灵 - 中国移动物联网开放平台 (10086.cn)

第一步:登录账号(若没有进行创建)

第二步:创建一个产品,具体步骤如下所示:

配置这个产品的属性(这个产品实现的功能) 

 第三步:创建该产品下的具体设备

 第四步:查看创建的产品和设备的详细信息

 最后:保存好【MQTT三元组】

  • 设备 ID : vibrate01
  • 产品 ID :iD3cXFKFPz
  • 设备密钥:eW1IckdEeUZjbW9aY05QNUJUdmZWSVEyNWE4eDRQWk0=

连接服务端报文和响应

CONNECT报文 

PS:我们都知道,在计算机内部计算、通信只有 0 和 1 ,但是二进制对于程序员表达来说并不方便,常用的是十六进制,因为一个两位的十六进制数刚好可以表达八位二进制数也就是一比特的值,所以我们的例子会以十六进制来表示二进制报文。

在此附上一张 ASCII 码表,里面包含了常用的字符、十六进制、二进制转换。

CONNECT报文的作用:用于客户端请求连接到服务端。ba00

CONNECT报文的组成:固定报头、可变报头、有效载荷三部分的十六进制数拼接起来就是CONNECT报文。示意如下:

CONNECT 报文 = 固定报头 + 可变报头 + 有效载荷 的十六进制数

        是不是有点像火车?一列火车拉了三个车厢,每个车厢拉的是固定报头、可变报头、有效载荷的十六进制数。

固定报头

固定报头由2个字节组成,结构如下图: 

        接着以 CONNECT 报文为例,结构如下图,固定报头中 消息类型是1,标志位是 0000,所以第一字节是 0001 0000,转成十六进制是 10剩余长度未知,所以 CONNECT 报文固定报头的十六进制是10 XX XX。

可变报头

可变报头由 协议名、协议级别、连接标志、保持连接 四个部分组成。基本是 10个字节 

以 CONNECT 报文为例:

  • 协议名:(6byte)

        byte 1 和 byte 2 表示协议名后面部分的数据长度,这个长度是固定的 4 位。所以 byte 1 保存 00 ,byte 4 保存 04 (16进制)。

        而 byte 3~6 则固定为 M Q T T 这四个字符,每个字节保存成一个字符,对应就是 4D 51 54 54

所以协议名转成十六进制是 00 04 4D 51 54 54。

  • 协议级别(1 byte)

        只由1个字节组成,对于协议 MQTT3.1.1 协议级别的值是  4 ,结构如下图,

转成十六进制是  04

  • 连接标志(1byte)

        也只由1个字节组成,包含一些用于指定 MQTT 连接行为的参数,还指出有效载荷中的字段是否存在。结构如下图,×  是表示不固定,可能是 0,也可能是 1,但是保留位(Reserved)必须是0。

从 0 到 7 位分别是 保留位(Reserved)清理会话(Clean Session)遗嘱标志(Will Flag)遗嘱 QoS(Will QoS)遗嘱保留(Will Retain)密码标志(Password Flag)用户名标志(User Name Flag)。对应位填 1 则表示有填 0 则无。 

举例:这里我们假设连接标志有用户名标志(User Name Flag)、密码标志(Password Flag)和清理会话(Clean Session):

那么连接标志的二进制表示是 1100 0010转成十六进制是  C2

  • 保持连接:(2byte)

        由 2 个字节组成,是一个 以秒为单位的时间间隔,表示为一个 16 位的字,它是指在客户端传输完成一个控制报文的时刻到发送下一个报文的时刻,两者之间允许空闲的最大时间间隔。

假设 空闲的最大时间间隔是 100 秒(十进制),则结构如下图,转成十六进制 00 64。 

 综上所述:

        我们刚刚已经以 CONNECT 报文的可变头为例,将协议名、协议级别、连接标志、保持连接都给出了例子,写出了十六进制。我们把这些十六进制数按顺序组合起来,CONNECT 报文的可变头的十六进制是:00 04 4D 51 54 54 04 C2 00 64

有效载荷

        以 CONNECT 报文为例:有效载荷 = 设备 ID + 产品 ID + token。设备 ID、产品 ID、token 是在 MQTT 协议中用于服务端与客户端对接的参数,缺一不可。

保存好【MQTT三元组】

  • 设备 ID : vibrate01
  • 产品 ID :iD3cXFKFPz
  • 设备密钥:eW1IckdEeUZjbW9aY05QNUJUdmZWSVEyNWE4eDRQWk0=
  • 设备ID :

  • 产品ID: 

  • token: 

设备密钥需要经过加密,加密需要用 OneNET 官方的 token 生成工具

官网下载地址:OneNET - 中国移动物联网开放平台 (10086.cn)

软件的各个参数如下: 

名称 类型 参数说明 参数示例
res string

访问资源 resource 格式为:

products/{产品id}/devices/{设备名}

products/iD3cXFKFPz/devices/vibrate01
et int 访问过期时间,单位秒,unix 时间。当一次访问参数中的 et 时间小于当前时间时,平台会认为访问参数过期从而拒绝该访问 2017881776 表示:北京时间 2033-12-11 10:42:56
key string MQTT 三元组的设备密钥 eW1IckdEeUZjbW9aY05QNUJUdmZWSVEyNWE4eDRQWk0=
method string 加密方式,支持 hmacmd5、hmacsha1、hmacsha256 md5(代表使用hmacmd5算法)
sha1(代表使用hmacsha1算法)用这个
sha256(代表使用hmacsha256 算法)
version string 参数组版本号,日期格式,目前仅支持"2018-10-31" 2018-10-31

 完成后,如下所示:

最后 有效载荷 的十六进制数为: 00 09 76 69 62 72 61 74 65 30 31 00 0A 69 44 33 63 58 46 4B 46 50 7A  00 80 76 65 72 73 69 6F 6E 3D 32 30 31 38 2D 31 30 2D 33 31 26 72 65 73 3D 70 72 6F 64 75 63 74 73 25 32 46 69 44 33 63 58 46 4B 46 50 7A 25 32 46 64 65 76 69 63 65 73 25 32 46 76 69 62 72 61 74 65 30 31 26 65 74 3D 32 30 31 37 38 38 31 37 37 36 26 6D 65 74 68 6F 64 3D 73 68 61 31 26 73 69 67 6E 3D 46 4B 4A 6F 51 6A 37 31 77 6E 68 77 55 61 4B 4C 43 66 44 74 75 47 46 31 53 37 34 25 33 44

 剩余长度 和 组装CONNECT报文

  • 剩余长度:

        XX 是剩余长度,表示当前剩余字节数,包括 可变报头 和 负载的数据的长度。剩余长度不包括用于编码剩余长度字段本身的字节数。简单来说,XX 后面有多少个字符,XX 就是多少,剩余长度就是多少。

如何将159转化成 所要求的表达形式?

        这是因为剩余长度是使用变长度编码方案,低 7 位(第 0 到 6 位)用于编码数据,最高有效位(第 7 位)用于表示是否有更多字节,最大可以使用 4 个字节

举例:

  • byte1第7位为0时:

 取值范围:1~128

  • byte1 第7位为1时:

取值范围128~16383 

  • byte2和byte3第7位为1时:

byte3时,取值范围:16383~2097151 ;byte4时,取值范围:16383~2097152

由上述可知剩余长度163的表达方式如下:

由上述可知,可变报头和有效载体的的字节数量是163,故剩余长度转化成二进制为:

1010 0011 ----> A3

0000 0001 ----> 01

  • 组装 CONNECT报文

由上面的讲解我们可知:

  • 一条 CONNECT 报文是以 固定报头 + 可变报头 + 有效载荷 三部分组成。
  • CONNECT 报文固定报头的十六进制是:10 A3 01
  • CONNECT 报文的可变头的十六进制是:00 04 4D 51 54 54 04 C2 00 64
  • CONNECT 报文的有效载荷的十六进制是:00 09 76 69 62 72 61 74 65 30 31 00 0A 69 44 33 63 58 46 4B 46 50 7A  00 80 76 65 72 73 69 6F 6E 3D 32 30 31 38 2D 31 30 2D 33 31 26 72 65 73 3D 70 72 6F 64 75 63 74 73 25 32 46 69 44 33 63 58 46 4B 46 50 7A 25 32 46 64 65 76 69 63 65 73 25 32 46 76 69 62 72 61 74 65 30 31 26 65 74 3D 32 30 31 37 38 38 31 37 37 36 26 6D 65 74 68 6F 64 3D 73 68 61 31 26 73 69 67 6E 3D 46 4B 4A 6F 51 6A 37 31 77 6E 68 77 55 61 4B 4C 43 66 44 74 75 47 46 31 53 37 34 25 33 44

求 CONNECT 报文?

答:CONNECT 报文:10 A3 01 00 04 4D 51 54 54 04 C2 00 64 00 09 76 69 62 72 61 74 65 30 31 00 0A 69 44 33 63 58 46 4B 46 50 7A  00 80 76 65 72 73 69 6F 6E 3D 32 30 31 38 2D 31 30 2D 33 31 26 72 65 73 3D 70 72 6F 64 75 63 74 73 25 32 46 69 44 33 63 58 46 4B 46 50 7A 25 32 46 64 65 76 69 63 65 73 25 32 46 76 69 62 72 61 74 65 30 31 26 65 74 3D 32 30 31 37 38 38 31 37 37 36 26 6D 65 74 68 6F 64 3D 73 68 61 31 26 73 69 67 6E 3D 46 4B 4A 6F 51 6A 37 31 77 6E 68 77 55 61 4B 4C 43 66 44 74 75 47 46 31 53 37 34 25 33 44

CONNECT报文发送

        我们还没有写单片机的程序,所以使用网络调试助手当作客户端,展示一下 CONNECT 报文效果。我们首先要知道 OneNET 服务器地址是 mqtts.heclouds.com : 1883,地址是从 OneNET 文档中心得到的。

  • 网络调试助手 配置

问题:自己创建的产品和设备,与服务端连接失败? 

原因:剩余长度计算错误,有效载荷中的产品ID转化成16进制出现错误,以及有效载荷中的token的长度计算错误。

成功连接后,会收到20 02 00 00的报文: 

此时创建的设备处于在线的状态: 

CONNACK报文

服务端向客户端发送的响应反馈,只有固定报头和可变报头。 

  • 固定报头

        固定报头中 消息类型是 2,标志位是 0000,所以 1byte 是 00100000,转成十六进制是 20

        剩余长度为2(后面只有两字节的可变报头),所以 CONNACK 报文固定报头的十六进制是:20 02

  • 可变报头 

        CONNACK 报文的可变报头比 CONNECT 报文简单很多,只由 连接确认标志  连接返回码构成,结构如下图,× 是表示不固定,可能是0,也可能是1:

连接确认标志: 

        位 7-1 是保留位且必须设置为 0。第 0 (SP)位是 当前会话(Session Present)标志,如果 没有保存的会话状态 或 收到清理会话命令 则为 0 ,如果 收到保存会话 则为 1

连接返回码:

 可变报头的十六进制都是未知,所以是:XX XX。


订阅报文和响应

SUBSCRIBLE报文

        SUBSCRIBE 报文是用于客户端请求订阅主题。订阅某主题之后,所有发布的该主题的消息我们都可以收到。

         一条 SUBSCRIBE 报文是以 固定报头、可变报头、有效载荷 三部分组成。

  • 固定报头

        固定报头中订阅消息类型是 8保留位必须是 0010,所以第一字节是 10000010,转成十六进制是 82,剩余长度未知,所以 SUBSCRIBE 报文固定报头的十六进制是:82 XX。 

  • 可变报头: 

        设置了报文标识符,相对于给报文起个名字。下图来自 MQTT 官方文档,它的示例是报文标识符是 10,大家可以设成自己喜欢的,也可以按照示例,我按照示例来讲解,于是

转成十六进制是 00 0A。 

  • 有效载荷:

有效载荷由 主题过滤器 和 服务质量 要求两部分组成,结构如下图。

如何查找主题过滤器?

 主题过滤器:24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 2F 72 65 70 6C 79

主题过滤器的长度:51 转化成16进制为 00 33

服务质量要求?

答:要求为0,则二进制是 00000000,转成十六进制是:00

总结:有效荷载(54) = 长度(2) + 主题过滤器(51) + 服务质量要求(1)

00 33 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 2F 72 65 70 6C 79 00


由上述可知,报文的剩余长度为 ( 可变报文  2 + 有效载荷 54) 56 ,转化成16进制 38

因此,SUBSCRIBLE报文的16进制为:82 38 00 0A 00 33 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 2F 72 65 70 6C 79 00

订阅成功后:


 SUBACK报文

        SUBACK,订阅确认,报文是用于服务端回复客户端表示自己收到了 SUBSCRIBE 报文。和 CONNACK 报文一样,类似于外卖送到了确认收货的动作。

一条 SUBACK 报文由固定报头、可变报头、有效载荷三部分组成。

  • 固定报头

        固定报头中消息类型是 9,标志位是 0000,所以第一字节是 10010000,转成十六进制是 90,剩余长度为3(后面固定有三字节)

        所以 CONNACK 报文固定报头的十六进制是:90 03。 

  • 可变报头

是这个名字的报文被我确认了。我们刚刚的例子报文标识符是 00 0A,所以这里也是 00 0A

  • 有效载荷 

有效载荷只有一字节的返回码,我们发送的服务质量等级是多少,返回码就是多少。 

  • 0x00 - 最大 QoS 0

  • 0x01 - 成功 – 最大 QoS 1

  • 0x02 - 成功 – 最大 QoS 2

  • 0x80 - Failure 失败

90 03 00 0A XX。


取消订阅报文和响应

 UNSUBSCRIBE报文

        UNSUBSCRIBE 报文,客户端向服务端发送,用于取消订阅主题,取消订阅某主题后,就不会收到该主题的新消息了。

        一条 UNSUBSCRIBE 报文是以 固定报头、可变报头、有效载荷 三部分组成。

  • 固定报头:

        固定报头中 消息类型是 10,标志位是 0010,所以第一字节是 10100010,转成十六进制是 A2,剩余长度未知,所以 UNSUBSCRIBE报文固定报头的十六进制是:A2 XX

  • 可变报头:

大家可以随意取名字,这里我们就使用:00 0B

  • 有效载荷:

        那就是你 想要取消订阅的主题,以长度为前缀。这里和 SUBSCRIBE 报文不一样的是没有了服务质量要求

示例:

我们取消订阅刚刚订阅的 $sys/iD3cXFKFPz/vibrate01/thing/property/post/reply 。

转成十六进制,加上长度前缀是:00 33 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 2F 72 65 70 6C 79。


由上述可知,报文的剩余长度为 ( 可变报头  2 + 有效载荷 53) 55 ,转化成16进制 37

因此,SUBSCRIBLE报文的16进制为:A2 37 00 0B 00 33 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 2F 72 65 70 6C 79


UNSUBACK报文

 一条 UNSUBACK 报文由固定报头、可变报头两部分组成。

  • 固定报头

        固定报头中 消息类型是 11,标志位是 0000,所以第一字节是 10110000,转成十六进制是 B0,剩余长度为 2(后面固定有两字节)

        所以 UNSUBACK 报文固定报头的十六进制是B0 02。 

  • 可变报头

        到 UNSUBACK 报文服务端就需要返回一模一样的报文标识符,告诉客户端,是这个名字的报文被我确认了。我们刚刚的例子是 00 0B,所以这里也是 00 0B。


PUBLISH报文

         PUBLISH 报文,发布消息,是双向的,既可以客户端到服务端,也可以服务端到客户端。但是需要注意的是发布消息只能在同一产品 ID下进行,不能进行跨产品消息推送

  • 固定报头:(执行发布的命令)

结构如下图,固定报头中 消息类型是 3 ,标志位是未知,剩余长度未知

标志位填写规则如下: 

  • DUP:消息第一次发送为 0 ,如果重发为 1 。若 QoS = 0 则 DUP 必须为 0,毕竟 QoS = 0 没有请求重发。
  • QoS 等级:QoS = 0 则 为 00;如果是 01,则 QoS = 1;如果是 10,则 QoS = 2。
  • RETAIN:

        服务端发送 PUBLISH 报文给客户端时,如果消息是作为客户端一个新订阅的结果发送,报文的保留标志设为 1。当一个 PUBLISH 报文发送给客户端是因为匹配一个已建立的订阅时,服务端必须将保留标志设为 0,不管它收到的这个消息中保留标志的值是多少。

        如果客户端发给服务端的 PUBLISH 报文的保留标志位 0,服务端不能存储这个消息也不能移除或替换任何现存的保留消息。是不是听不懂,没关系,我直接告诉大家,这里设成 0 就行了

 作为例子,我让 QoS = 0,于是 PUBLISH 报文固定报头的十六进制是:30 XX

  • 可变报头:(向哪个主题发布)

可变报头由主题名报文标识符两部分组成

        主题名(即主题)用于识别有效载荷应该被发布到哪一个信息通道,必须是 UTF-8 编码的格式,也是需要以长度为前缀。

        只有当 QoS = 1 或 QoS = 2 时,报文标识符才能出现在 PUBLISH 报文中。因为 QoS0 不需要接收端返回报文,所以也就不用指明是哪条报文,也就不用给报文取名字了。

        加上长度(45)前缀 00 2D,可变报头为:00 2D 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74。

  • 有效载荷:(发布的内容)

我们要发布的内容就是有效载荷,其格式必须是 JSON 格式,也要转为十六进制。

注意:这里不需要以长度为前缀了,因为是JSON格式,不是UTF-8。

JSON格式规则如下:

  • 数组用方括号 [ ] 表示。
  • 对象用大括号 { } 表示。
  • 名称 / 值对组合成数组和对象(键值对)。
  • 名称置于双引号中,值有字符串、数值、布尔值、null、对象和数组。
  • 并列的数据之间用逗号 , 分隔

示例如下:

{
    "id": "123",
    "version": "1.0",
    "params": {
        "Power": {
            "value": "12345",
            "time": 1599534283111
        },
        "temp": {
            "value": 23.6,
            "time": 1599534283111
        }   
    }
}

 将创建的设备vibrate01的属性值vibrate_value进行更改:

{
	"id":"1386772172",
	"version":"1.0",
	"params":{
		"vibrate_vlaue":{
			"value":"123"
		}
	}
}

 注意:

  • 转换十六进制时要把换行符号和 tab 去掉,于是{"id":"1386772172","version":"1.0","params":{"vibrate_vlaue":{"value":"123"}}}

转成十六进制是:

7B 22 69 64 22 3A 22 31 33 38 36 37 37 32 31 37 32 22 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 76 69 62 72 61 74 65 5F 76 6C 61 75 65 22 3A 7B 22 76 61 6C 75 65 22 3A 22 31 32 33 22 7D 7D 7D 。

  • 这里不需要以长度为前缀了,因为是 JSON 格式,不是 UTF-8。

剩余长度 = 可变报头(47) + 有效载荷(78)= 125 转化成 16 进制   7D

固定报头:30 7D 

可变报头:00 2D 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74

有效载荷:7B 22 69 64 22 3A 22 31 33 38 36 37 37 32 31 37 32 22 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 76 69 62 72 61 74 65 5F 76 6C 61 75 65 22 3A 7B 22 76 61 6C 75 65 22 3A 22 31 32 33 22 7D 7D 7D

因此,该报文为:30 7D 00 2D 24 73 79 73 2F 69 44 33 63 58 46 4B 46 50 7A 2F 76 69 62 72 61 74 65 30 31 2F 74 68 69 6E 67 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 69 64 22 3A 22 31 33 38 36 37 37 32 31 37 32 22 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 76 69 62 72 61 74 65 5F 76 6C 61 75 65 22 3A 7B 22 76 61 6C 75 65 22 3A 22 31 32 33 22 7D 7D 7D


PINGREQ报文和PINGRESP报文

        PINGREQ 报文和 PINGRESP 报文是心跳请求和心跳响应,是 MQTT 心跳机制的两个报文。客户端定时向服务端发送心跳请求(PINGREQ),告诉服务端,我还和你连接着哦。服务端收到心跳请求后,会回复一条心跳响应(PINGRESP),告诉客户端,我知道你还连着我啦。

 作用:

        MQTT 可以实时监测客户端的连接状态,及时发现和处理异常情况,确保通信的可靠性和稳定性。就像我们依赖心脏维持身体的正常运转一样,MQTT的心跳机制也是保障通信链路顺畅运行的重要机制之一。

只有一个固定报头

  • PINGREQ报文:

二进制:1100 0000 0000 0000,十六进制:C0 00

  • PINGRESP 报文

二进制:1100 0000 0000 0000,十六进制:D0 00

 MQTT编程实战

        利用代码的形式,连接onenet服务器,实现CONNECT、SUBSCRIBLE、UNSUBSCRIBLE、PUBLISH报文的代码编写。

文件代码:

  • onenet.c文件代码:
#include "onenet.h"
#include "esp8266.h"

char MQTT_ClientID[100]; //MQTT_客户端ID
char MQTT_UserName[100]; //MQTT_用户名
char MQTT_PassWord[200]; //MQTT_密码

uint8_t *mqtt_rxbuf;
uint8_t *mqtt_txbuf;
uint16_t mqtt_rxlen;
uint16_t mqtt_txlen;
uint8_t _mqtt_txbuf[512];//发送数据缓存区
uint8_t _mqtt_rxbuf[512];//接收数据缓存区

typedef enum
{
    //名字         值             报文流动方向     描述
    M_RESERVED1    =0       ,    //    禁止    保留
    M_CONNECT               ,    //    客户端到服务端    客户端请求连接服务端
    M_CONNACK               ,    //    服务端到客户端    连接报文确认
    M_PUBLISH               ,    //    两个方向都允许    发布消息
    M_PUBACK                ,    //    两个方向都允许    QoS 1消息发布收到确认
    M_PUBREC                ,    //    两个方向都允许    发布收到(保证交付第一步)
    M_PUBREL                ,    //    两个方向都允许    发布释放(保证交付第二步)
    M_PUBCOMP               ,    //    两个方向都允许    QoS 2消息发布完成(保证交互第三步)
    M_SUBSCRIBE             ,    //    客户端到服务端    客户端订阅请求
    M_SUBACK                ,    //    服务端到客户端    订阅请求报文确认
    M_UNSUBSCRIBE           ,    //    客户端到服务端    客户端取消订阅请求
    M_UNSUBACK              ,    //    服务端到客户端    取消订阅报文确认
    M_PINGREQ               ,    //    客户端到服务端    心跳请求
    M_PINGRESP              ,    //    服务端到客户端    心跳响应
    M_DISCONNECT            ,    //    客户端到服务端    客户端断开连接
    M_RESERVED2             ,    //    禁止    保留
}_typdef_mqtt_message;

//连接成功服务器回应 20 02 00 00
//客户端主动断开连接 e0 00
const uint8_t parket_connetAck[] = {0x20,0x02,0x00,0x00};
const uint8_t parket_disconnet[] = {0xe0,0x00};
const uint8_t parket_heart[] = {0xc0,0x00};
const uint8_t parket_heart_reply[] = {0xc0,0x00};
const uint8_t parket_subAck[] = {0x90,0x03};

/*
函数功能: 初始化阿里云物联网服务器的登录参数
*/

//密码
//加密之前的数据格式:  clientId*deviceName*productKey#
// *替换为DeviceName  #替换为ProductKey  加密密钥是DeviceSecret  加密方式是HmacSHA1  
//PassWord明文=  clientIdiot_devicedeviceNameiot_deviceproductKeya1VMIfYeEEE
//hmacsha1加密网站:http://encode.chahuo.com/
//加密的密钥:DeviceSecret

void mqtt_login_init(char *ProductKey,char *DeviceName,char *DeviceSecret)
{
//    sprintf(MQTT_ClientID,"%s.%s|securemode=2,signmethod=hmacsha256,timestamp=1695871022945|",ProductKey,DeviceName);
//    sprintf(MQTT_UserName,"%s&%s",DeviceName,ProductKey);
//    sprintf(MQTT_PassWord,"%s","a8921500839307ec3fedbbcd8c0cbc19f133f68c831dcad41fe13d92dc90b89d");
    sprintf(MQTT_ClientID,"%s", DeviceName);
    sprintf(MQTT_UserName,"%s", ProductKey);
    sprintf(MQTT_PassWord,"version=2018-10-31&res=products%%2F%s%%2Fdevices%%2F%s&et=2017881776&method=sha1&sign=%s",ProductKey,DeviceName,DEVICE_SECRET);
}

void mqtt_init(void)
{
    mqtt_login_init(PRODUCT_KEY,DEVICE_NAME,DEVICE_SECRET);
    //缓冲区赋值
    mqtt_rxbuf = _mqtt_rxbuf;
    mqtt_rxlen = sizeof(_mqtt_rxbuf);
    mqtt_txbuf = _mqtt_txbuf;
    mqtt_txlen = sizeof(_mqtt_txbuf);
    memset(mqtt_rxbuf,0,mqtt_rxlen);
    memset(mqtt_txbuf,0,mqtt_txlen);     //memset函数:将数字0复制到数组mqqtt_txbuf中前mqtt_txleng个元素。
    
    //无条件先主动断开
    mqtt_disconnect();
    delay_ms(100);
    mqtt_disconnect();
    delay_ms(100);
}

/*
函数功能: 登录服务器
函数返回值: 0表示成功 1表示失败
*/
uint8_t mqtt_connect(char *ClientID,char *Username,char *Password)
{
//    uint8_t i;
    uint8_t j;
    int ClientIDLen = strlen(ClientID);  //设备名长度
    int UsernameLen = strlen(Username);  //产品ID长度
    int PasswordLen = strlen(Password);  // token长度
    int DataLen;                         //剩余长度的128进制表示
    mqtt_txlen=0;
    //剩余长度(DataLen) = 可变报头+Payload  每个字段包含两个字节的长度标识
    DataLen = 10 + (ClientIDLen+2) + (UsernameLen+2) + (PasswordLen+2);
    
    //固定报头
    //控制报文类型
    mqtt_txbuf[mqtt_txlen++] = 0x10;        //MQTT Message Type CONNECT
    
    //剩余长度(不包括固定头部) ---- 循环求余法,将128进制转化成2进制
    do
    {
        uint8_t encodedByte = DataLen % 128;
        DataLen = DataLen / 128;
        // if there are more data to encode, set the top bit of this byte
        if ( DataLen > 0 )
            encodedByte = encodedByte | 128;
        mqtt_txbuf[mqtt_txlen++] = encodedByte;
    }while ( DataLen > 0 );
        
    //可变报头
    //协议名
    mqtt_txbuf[mqtt_txlen++] = 0;            // Protocol Name Length MSB    
    mqtt_txbuf[mqtt_txlen++] = 4;           // Protocol Name Length LSB    
    mqtt_txbuf[mqtt_txlen++] = 'M';            // ASCII Code for M    
    mqtt_txbuf[mqtt_txlen++] = 'Q';            // ASCII Code for Q    
    mqtt_txbuf[mqtt_txlen++] = 'T';            // ASCII Code for T    
    mqtt_txbuf[mqtt_txlen++] = 'T';            // ASCII Code for T    
    //协议级别
    mqtt_txbuf[mqtt_txlen++] = 4;                // MQTT Protocol version = 4    
    //连接标志
    mqtt_txbuf[mqtt_txlen++] = 0xc2;            // conn flags 
    mqtt_txbuf[mqtt_txlen++] = 0;                // Keep-alive Time Length MSB    
    mqtt_txbuf[mqtt_txlen++] = 100;            // Keep-alive Time Length LSB  100S心跳包  
    //有效载荷(设备ID、产品ID、token)
    mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB    
    mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB      
    memcpy(&mqtt_txbuf[mqtt_txlen],ClientID,ClientIDLen); //将长度为ClientDLen的ClientID的数据复制到mqtt_txbuf的数组中
    mqtt_txlen += ClientIDLen; //进行递增
    
    if(UsernameLen > 0)
    {   
        mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen);        //username length MSB    
        mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen);        //username length LSB    
        memcpy(&mqtt_txbuf[mqtt_txlen],Username,UsernameLen);
        mqtt_txlen += UsernameLen;
    }
    
    if(PasswordLen > 0)
    {    
        mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen);        //password length MSB    
        mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen);        //password length LSB  
        memcpy(&mqtt_txbuf[mqtt_txlen],Password,PasswordLen);
        mqtt_txlen += PasswordLen; 
    }    
    
    
    
//    for(i=0;i<10;i++)
//    {
        memset(mqtt_rxbuf,0,mqtt_rxlen);        //初始化接收缓冲区
        mqtt_send_data(mqtt_txbuf,mqtt_txlen);  //将connect报文发送出去,利用的ESP8266
//        for(j=0;j<10;j++)
//            printf("%c",mqtt_txbuf[j]);
    
        //等待返回值:是否是所期待的数值,等待10每次50ms
        for(j=0;j<10;j++)
        {
            delay_ms(50);
            if (esp8266_wait_receive() == ESP8266_EOK)
                esp8266_copy_rxdata((char *)mqtt_rxbuf);//若返回值,将返回值保存到mqtt_rxbuf的数值中

            //CONNECT
            if(mqtt_rxbuf[0]==parket_connetAck[0] && mqtt_rxbuf[1]==parket_connetAck[1] && mqtt_rxbuf[2]==parket_connetAck[2]) //连接成功
            {
                return 0;//连接成功
            }
        }
//    }
    return 1;
}

/*
函数功能: MQTT订阅/取消订阅数据打包函数
函数参数:
    topic       主题   
    qos         消息等级 0:最多分发一次  1: 至少分发一次  2: 仅分发一次
    whether     订阅/取消订阅请求包 (1表示订阅,0表示取消订阅)
返回值: 0表示成功 1表示失败
*/
uint8_t mqtt_subscribe_topic(char *topic,uint8_t qos,uint8_t whether)
{    
//    uint8_t i;
    uint8_t j;
    mqtt_txlen=0;
    int topiclen = strlen(topic);
    
    //剩余长度(128进制)
    int DataLen = 2 + (topiclen+2) + (whether?1:0);//可变报头的长度(2字节)加上有效载荷的长度
    
    //固定报头
    //控制报文类型
    if(whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅
    else    mqtt_txbuf[mqtt_txlen++] = 0xA2;    //取消订阅

    //剩余长度
    do
    {
        uint8_t encodedByte = DataLen % 128;
        DataLen = DataLen / 128;
        // if there are more data to encode, set the top bit of this byte
        if ( DataLen > 0 )
            encodedByte = encodedByte | 128;
        mqtt_txbuf[mqtt_txlen++] = encodedByte;
    }while ( DataLen > 0 );    
    
    //可变报头
    mqtt_txbuf[mqtt_txlen++] = 0;            //消息标识符 MSB
    mqtt_txbuf[mqtt_txlen++] = 0x01;        //消息标识符 LSB
    //有效载荷
    mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB
    mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB   
    memcpy(&mqtt_txbuf[mqtt_txlen],topic,topiclen);
    mqtt_txlen += topiclen;
    
    if(whether)
    {
       mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别
    }
    
//    for(i=0;i<10;i++)
//    {
        memset(mqtt_rxbuf,0,mqtt_rxlen);
        mqtt_send_data(mqtt_txbuf,mqtt_txlen);

        for(j=0;j<10;j++)
        {
            delay_ms(50);
            if (esp8266_wait_receive() == ESP8266_EOK)
                esp8266_copy_rxdata((char *)mqtt_rxbuf);

            if(mqtt_rxbuf[0]==parket_subAck[0] && mqtt_rxbuf[1]==parket_subAck[1]) //订阅成功               
            {
                return 0;//订阅成功
            }
        }
//    }
    return 1; //失败
}

//MQTT发布数据打包函数
//topic   主题 
//message 消息
//qos     消息等级 
uint8_t mqtt_publish_data(char *topic, char *message, uint8_t qos)
{  
    int topicLength = strlen(topic);    
    int messageLength = strlen(message);     
    static uint16_t id=0;
    int DataLen;
    mqtt_txlen=0;
    //有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度
    //QOS为0时没有标识符
    //数据长度             主题名   报文标识符   有效载荷
    if(qos)    DataLen = (2+topicLength) + 2 + messageLength;       
    else    DataLen = (2+topicLength) + messageLength;   

    //固定报头
    //控制报文类型
    mqtt_txbuf[mqtt_txlen++] = 0x30;    // MQTT Message Type PUBLISH  

    //剩余长度
    do
    {
        uint8_t encodedByte = DataLen % 128;
        DataLen = DataLen / 128;
        // if there are more data to encode, set the top bit of this byte
        if ( DataLen > 0 )
            encodedByte = encodedByte | 128;
        mqtt_txbuf[mqtt_txlen++] = encodedByte;
    }while ( DataLen > 0 );    
    
    mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB
    mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB 
    memcpy(&mqtt_txbuf[mqtt_txlen],topic,topicLength);//拷贝主题
    mqtt_txlen += topicLength;
        
    //报文标识符
    if(qos)
    {
        mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
        mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
        id++;
    }
    memcpy(&mqtt_txbuf[mqtt_txlen],message,messageLength);
    mqtt_txlen += messageLength;

//    int i = 0;
//    for(i=0;i<mqtt_txlen;i++)
//        printf("%02X ", mqtt_txbuf[i]);
//    printf("\r\n");
    mqtt_send_data(mqtt_txbuf,mqtt_txlen);
    return mqtt_txlen;
}

//接收的回调函数(服务器向客户端发送的消息)
uint8_t mqtt_receive_handle(uint8_t *data_received, Mqtt_RxData_Type *rx_data)
{
    uint8_t *p;
    uint8_t encodeByte = 0;
    uint32_t multiplier = 1, Remaining_len = 0;
    uint8_t QS_level = 0;
    
    p = data_received;
    memset(rx_data, 0, sizeof(Mqtt_RxData_Type));
    
    //解析接收数据
    if((*p != 0x30)&&(*p != 0x32)&&(*p != 0x34))   //不是发布报文头
        return 1;
    
    if(*p != 0x30) QS_level = 1;    //标记qs等级不为0
    
    p++;
    //提取剩余数据长度(将2进制转化成128进制)
    do{
        encodeByte = *p++;
        Remaining_len += (encodeByte & 0x7F) * multiplier;
        multiplier *= 128;
        
        if(multiplier > 128*128*128) //超出剩余长度最大4个字节的要求,错误
            return 2;
    }while((encodeByte & 0x80) != 0);
    
    //提取主题数据长度
    rx_data->topic_len = *p++;
    rx_data->topic_len = rx_data->topic_len * 256 + *p++;
    //提取主题
    memcpy(rx_data->topic,p,rx_data->topic_len);
    p += rx_data->topic_len;
    
    if(QS_level != 0)  //跳过报文标识符
        p += 2;
    
    //提取payload(有效载荷)
    rx_data->payload_len = Remaining_len - rx_data->topic_len - 2;
    memcpy(rx_data->payload, p, rx_data->payload_len);
    
//    printf("topic: %s\r\n", rx_data->topic);
//    printf("topic_len: %d\r\n", rx_data->topic_len);
//    printf("payload: %s\r\n", rx_data->payload);
//    printf("payload_len: %d\r\n", rx_data->payload_len);

    return 0;
}

//测试的函数
void mqtt_send_response(uint8_t *id)
{
    char buf[128] = {0};
    sprintf(buf,"{\"id\":\"%s\",\"code\":200,\"msg\":\"success\"}",id);
    
    mqtt_publish_data(RELY_PUBLISH_TOPIC,(char *)buf,0);
    
    printf("\r\n发布数据:\r\n");
    printf((const char *)buf);    //发布的数据打印出来
    printf("\r\n");
}

//发送心跳包
void mqtt_send_heart(void)
{
    mqtt_send_data((uint8_t *)parket_heart,sizeof(parket_heart));
}
//取消连接
void mqtt_disconnect(void)
{
    mqtt_send_data((uint8_t *)parket_disconnet,sizeof(parket_disconnet));
}
//发送数据
void mqtt_send_data(uint8_t *buf,uint16_t len)
{
    esp8266_send_data((char *)buf, len);
}
  •  onenet.h文件代码
#ifndef _ONENET_H_
#define _ONENET_H_

#include "string.h"
#include "stdio.h"
#include "stdlib.h"
#include "stdarg.h"
#include "delay.h"

#define BYTE0(dwTemp)       (*( char *)(&dwTemp))
#define BYTE1(dwTemp)       (*((char *)(&dwTemp) + 1))
#define BYTE2(dwTemp)       (*((char *)(&dwTemp) + 2))
#define BYTE3(dwTemp)       (*((char *)(&dwTemp) + 3))
    
extern char MQTT_ClientID[100]; //MQTT_客户端ID
extern char MQTT_UserName[100]; //MQTT_用户名
extern char MQTT_PassWord[200]; //MQTT_密码

typedef struct
{
    uint8_t topic[512];
    uint16_t topic_len;
    uint8_t payload[512];
    uint16_t payload_len;
} Mqtt_RxData_Type;

//云服务器的设备证书
#define PRODUCT_KEY "pC0uTV161W"
#define DEVICE_NAME "dht11_01"
#define DEVICE_SECRET "75AKO7FD5KBEuSJ6BTDLPFC227w%3D"

//订阅与发布的主题
#define RELY_PUBLISH_TOPIC  "$sys/pC0uTV161W/dht11_01/thing/property/set_reply"  //属性设置应答订阅主题,onenet studio定义好的
#define SET_TOPIC  "$sys/pC0uTV161W/dht11_01/thing/property/set"
#define POST_TOPIC "$sys/pC0uTV161W/dht11_01/thing/property/post"

//事件上报主题
#define EVENT_PUBLISH_TOPIC   "$sys/pC0uTV161W/dht11_01/thing/event/post"  //发布主题,onenet studio定义好的

//阿里云用户名初始化  --- MQTT三元组保存到数组当中
void mqtt_login_init(char *ProductKey,char *DeviceName,char *DeviceSecret);

//MQTT协议相关函数声明
uint8_t mqtt_publish_data(char *topic, char *message, uint8_t qos);
uint8_t mqtt_subscribe_topic(char *topic,uint8_t qos,uint8_t whether);
void mqtt_init(void);
uint8_t mqtt_connect(char *ClientID,char *Username,char *Password);
void mqtt_send_heart(void);
void mqtt_disconnect(void);
void mqtt_send_data(uint8_t *buf,uint16_t len);
void mqtt_send_response(uint8_t *id);
uint8_t mqtt_receive_handle(uint8_t *data_received, Mqtt_RxData_Type *rx_data);
#endif

 总结:

  • 关于Sprintf函数、memset函数、memcpy函数的使用方法和作用;
  • 关于进制转化的代码实现;
  • 对于所定义的数组进行初始化;
Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐