MQTT 协议简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 在1999 年发布,目前最新版本为 v3.1.1。MQTT 最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用,MQTT 协议属于应用层

MQTT 是一个基于客户端与服务器的消息发布/订阅传输协议。MQTT 协议是轻量、简单开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、医疗设备、智能家居、及一些小型化设备中已广泛使用。

实现 MQTT 协议需要:客户端和服务器端 MQTT 协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者,如下图所示。

在这里插入图片描述

MQTT 传输的消息分为:主题(Topic)和消息的内容(payload)两部分。
Topic:可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)。
Payload:可以理解为消息的内容,是指订阅者具体要使用的内容。

MQTT 协议实现原理

要在客户端与代理服务端建立一个TCP连接,建立连接的过程是由客户端主动发起的,代理服务一直是处于指定端口的监听状态,当监听到有客户端要接入的时候,就会立刻去处理。客户端在发起连接请求时,携带客户端 ID、账号、密码(无账号密码使用除外,正式项目不会允许这样)、心跳间隔时间等数据。代理服务收到后检查自己的连接权限配置中是否允许该账号密码连接,如果允许则建立会话标识并保存,绑定客户端 ID 与会话,并记录心跳间隔时间(判断是否掉线和启动遗嘱时用)和遗嘱消息等,然后回发连接成功确认消息给客户端,客户端收到连接成功的确认消息后,进入下一步(通常是开始订阅主题,如果不需要订阅则跳过)。如下图所示:
在这里插入图片描述
客户端将需要订阅的主题经过 SUBSCRIBE 报文发送给代理服务,代理服务则将这个主题记录到该客户端 ID 下(以后有这个主题发布就会发送给该客户端),然后回复确认消息SUBACK报文,客户端接到 SUBACK报文后知道已经订阅成功,则处于等待监听代理服务推送的消息,也可以继续订阅其他主题或发布主题,如下图所示:
在这里插入图片描述当某一客户端发布一个主题到代理服务后,代理服务先回复该客户端收到主题的确认消息,该客户端收到确认后就可以继续自己的逻辑了。但这时主题消息还没有发给订阅了这个主题的客户端,代理要根据质量级别(QoS)来决定怎样处理这个主题。所以这里充分体现了是MQTT 协议是异步通信模式,不是立即端到端反应的,如下图所示:
在这里插入图片描述
如果发布和订阅时的质量级别 QoS 都是至多一次,那代理服务则检查当前订阅这个主题的客户端是否在线,在线则转发一次,收到与否不再做任何处理。这种质量对系统压力最小。 如果发布和订阅时的质量级别 QoS 都是至少一次,那要保证代理服务和订阅的客户端都有成功收到才可以,否则会尝试补充发送(具体机制后面讨论)。这也可能会出现同一主题多次重复发送的情况。这种质量对系统压力较大。
如果发布和订阅时的质量级别 QoS 都是只有一次,那要保证代理服务和订阅的客户端都有成功收到,并只收到一次不会重复发送(具体机制后面讨论)。这种质量对系统压力最大。

Mqttx

mqttx app下载:https://mqttx.app/zh
在这里插入图片描述
进入到创建页面后,在配置界面设置Name,Client ID , Host, Port, Username, Password等基础配置信息,然后点击MQTT X右上角的“Connect”按钮,完成MQTT客户端和MQTT服务端的连接。

在这里插入图片描述
在MQTTX客户端和MQTT服务器连接建立后,可以在连接主页面的下方的输入框内,输入 主题(Topic) 和 消息体(Payload) 后,点击右下角发送图标按钮,就可以向MQTT服务器发送测试消息了。
在这里插入图片描述
在MQTT X主程序页面点击“New Subscription”按钮
在这里插入图片描述
main.c

#include <stdio.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/event_groups.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "mqtt_client.h"
#include "simple_wifi_sta.h"

static const char* TAG = "main";

#define MQTT_ADDRESS    "mqtt://broker-cn.emqx.io"     //MQTT连接地址
#define MQTT_PORT       1883                        //MQTT连接端口号
#define MQTT_CLIENT     "mqttx_d11213"              //Client ID(设备唯一,大家最好自行改一下)
#define MQTT_USERNAME   "hello1"                     //MQTT用户名
#define MQTT_PASSWORD   "12345678"                  //MQTT密码

#define MQTT_PUBLIC_TOPIC      "/test/topic1"       //测试用的,推送消息主题
#define MQTT_SUBSCRIBE_TOPIC    "/test/topic2"      //测试用的,需要订阅的主题

//定义一个事件组,用于通知main函数WIFI连接成功
#define WIFI_CONNECT_BIT     BIT0
static EventGroupHandle_t   s_wifi_ev = NULL;

//MQTT客户端操作句柄
static esp_mqtt_client_handle_t     s_mqtt_client = NULL;

//MQTT连接标志
static bool   s_is_mqtt_connected = false;

/**
 * mqtt连接事件处理函数
 * @param event 事件参数
 * @return 无
 */
static void aliot_mqtt_event_handler(void* event_handler_arg,
                                        esp_event_base_t event_base,
                                        int32_t event_id,
                                        void* event_data)
{
    esp_mqtt_event_handle_t event = event_data;
    esp_mqtt_client_handle_t client = event->client;

    // your_context_t *context = event->context;
    switch ((esp_mqtt_event_id_t)event_id) {
        case MQTT_EVENT_CONNECTED:  //连接成功
            ESP_LOGI(TAG, "mqtt connected");
            s_is_mqtt_connected = true;
            //连接成功后,订阅测试主题
            esp_mqtt_client_subscribe_single(s_mqtt_client,MQTT_SUBSCRIBE_TOPIC,1);
            break;
        case MQTT_EVENT_DISCONNECTED:   //连接断开
            ESP_LOGI(TAG, "mqtt disconnected");
            s_is_mqtt_connected = false;
            break;
        case MQTT_EVENT_SUBSCRIBED:     //收到订阅消息ACK
            ESP_LOGI(TAG, " mqtt subscribed ack, msg_id=%d", event->msg_id);
            break;
        case MQTT_EVENT_UNSUBSCRIBED:   //收到解订阅消息ACK
            break;
        case MQTT_EVENT_PUBLISHED:      //收到发布消息ACK
            ESP_LOGI(TAG, "mqtt publish ack, msg_id=%d", event->msg_id);
            break;
        case MQTT_EVENT_DATA:
            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);       //收到Pub消息直接打印出来
            printf("DATA=%.*s\r\n", event->data_len, event->data);
            break;
        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;
        default:
            break;
    }
}


/** 启动mqtt连接
 * @param 无
 * @return 无
*/
void mqtt_start(void)
{
    esp_mqtt_client_config_t mqtt_cfg = {0};
    mqtt_cfg.broker.address.uri = MQTT_ADDRESS;
    mqtt_cfg.broker.address.port = MQTT_PORT;
    //Client ID
    mqtt_cfg.credentials.client_id = MQTT_CLIENT;
    //用户名
    mqtt_cfg.credentials.username = MQTT_USERNAME;
    //密码
    mqtt_cfg.credentials.authentication.password = MQTT_PASSWORD;
    ESP_LOGI(TAG,"mqtt connect->clientId:%s,username:%s,password:%s",mqtt_cfg.credentials.client_id,
    mqtt_cfg.credentials.username,mqtt_cfg.credentials.authentication.password);
    //设置mqtt配置,返回mqtt操作句柄
    s_mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
    //注册mqtt事件回调函数
    esp_mqtt_client_register_event(s_mqtt_client, ESP_EVENT_ANY_ID, aliot_mqtt_event_handler, s_mqtt_client);
    //启动mqtt连接
    esp_mqtt_client_start(s_mqtt_client);
}

/** wifi事件通知
 * @param 无
 * @return 无
*/
void wifi_event_handler(WIFI_EV_e ev)
{
    if(ev == WIFI_CONNECTED)
    {
        xEventGroupSetBits(s_wifi_ev,WIFI_CONNECT_BIT);
    }
}

void app_main(void)
{
    esp_err_t ret = nvs_flash_init();
    if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) {
        //NVS出现错误,执行擦除
        ESP_ERROR_CHECK(nvs_flash_erase());
        //重新尝试初始化
        ESP_ERROR_CHECK(nvs_flash_init());
    }
    s_wifi_ev = xEventGroupCreate();
    EventBits_t ev = 0;

    //初始化WIFI,传入回调函数,用于通知连接成功事件
    wifi_sta_init(wifi_event_handler);

    //一直监听WIFI连接事件,直到WiFi连接成功后,才启动MQTT连接
    ev = xEventGroupWaitBits(s_wifi_ev,WIFI_CONNECT_BIT,pdTRUE,pdFALSE,portMAX_DELAY);
    if(ev & WIFI_CONNECT_BIT)
    {
        mqtt_start();
    }
    static char mqtt_pub_buff[64];
    while(1)
    {
        int count = 0;
        //延时2秒发布一条消息到/test/topic1主题
        if(s_is_mqtt_connected)
        {
            snprintf(mqtt_pub_buff,64,"{\"count\":\"%d\"}",count);
            esp_mqtt_client_publish(s_mqtt_client, MQTT_PUBLIC_TOPIC,
                            mqtt_pub_buff, strlen(mqtt_pub_buff),1, 0);
            count++;
        }

        vTaskDelay(pdMS_TO_TICKS(2000));
    }
    return;
}

simple_wifi_sta.c

#include "simple_wifi_sta.h"
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_system.h"
#include "nvs_flash.h"
#include "nvs.h"

#include "freertos/event_groups.h"
#include "esp_wifi.h"
#include "esp_event.h"
#include "esp_log.h"

//需要把这两个修改成你家WIFI,测试是否连接成功
#define DEFAULT_WIFI_SSID           "HUAWEI-404"
#define DEFAULT_WIFI_PASSWORD       "zhuchengcheng123"

static const char *TAG = "wifi";

//事件通知回调函数
static wifi_event_cb    wifi_cb = NULL;

/** 事件回调函数
 * @param arg   用户传递的参数
 * @param event_base    事件类别
 * @param event_id      事件ID
 * @param event_data    事件携带的数据
 * @return 无
*/
static void event_handler(void* arg, esp_event_base_t event_base,int32_t event_id, void* event_data)
{   
    if(event_base == WIFI_EVENT)
    {
        switch (event_id)
        {
        case WIFI_EVENT_STA_START:      //WIFI以STA模式启动后触发此事件
            esp_wifi_connect();         //启动WIFI连接
            break;
        case WIFI_EVENT_STA_CONNECTED:  //WIFI连上路由器后,触发此事件
            ESP_LOGI(TAG, "connected to AP");
            break;
        case WIFI_EVENT_STA_DISCONNECTED:   //WIFI从路由器断开连接后触发此事件
            esp_wifi_connect();             //继续重连
            ESP_LOGI(TAG,"connect to the AP fail,retry now");
            break;
        default:
            break;
        }
    }
    if(event_base == IP_EVENT)                  //IP相关事件
    {
        switch(event_id)
        {
            case IP_EVENT_STA_GOT_IP:           //只有获取到路由器分配的IP,才认为是连上了路由器
                if(wifi_cb)
                    wifi_cb(WIFI_CONNECTED);
                ESP_LOGI(TAG,"get ip address");
                break;
        }
    }
}


//WIFI STA初始化
esp_err_t wifi_sta_init(wifi_event_cb f)
{   
    ESP_ERROR_CHECK(esp_netif_init());  //用于初始化tcpip协议栈
    ESP_ERROR_CHECK(esp_event_loop_create_default());       //创建一个默认系统事件调度循环,之后可以注册回调函数来处理系统的一些事件
    esp_netif_create_default_wifi_sta();    //使用默认配置创建STA对象

    //初始化WIFI
    wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
    ESP_ERROR_CHECK(esp_wifi_init(&cfg));
    
    //注册事件
    ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT,ESP_EVENT_ANY_ID,&event_handler,NULL));
    ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT,IP_EVENT_STA_GOT_IP,&event_handler,NULL));

    //WIFI配置
    wifi_config_t wifi_config = 
    { 
        .sta = 
        { 
            .ssid = DEFAULT_WIFI_SSID,              //WIFI的SSID
            .password = DEFAULT_WIFI_PASSWORD,      //WIFI密码
	        .threshold.authmode = WIFI_AUTH_WPA2_PSK,   //加密方式
            
            .pmf_cfg = 
            {
                .capable = true,
                .required = false
            },
        },
    };
    wifi_cb = f;
    //启动WIFI
    ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA) );         //设置工作模式为STA
    ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config) );   //设置wifi配置
    ESP_ERROR_CHECK(esp_wifi_start() );                         //启动WIFI
    
    ESP_LOGI(TAG, "wifi_init_sta finished.");
    return ESP_OK;
}

simple_wifi_sta.h

#ifndef _WIFI_MANAGER_H_
#define _WIFI_MANAGER_H_
#include "esp_err.h"

typedef enum
{
    WIFI_DISCONNECTED,      //wifi断开
    WIFI_CONNECTED,         //wifi已连接
}WIFI_EV_e;

typedef void(*wifi_event_cb)(WIFI_EV_e);

//WIFI STA初始化
esp_err_t wifi_sta_init(wifi_event_cb f);




#endif

配置远程服务器

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐