hi3863配套的源码版本非常好,系统已经内置了paho.mqtt.c,不需要再做移植的工作,直接引入函数就行,真是太贴心了。

MQTT 协议,它是一种规则,并不是一个独立的技术,是基于TCP/IP协议之上。具体协议看站内的各种教程贴吧,我说也说不好。

我简单归纳成3个主要功能

// 订阅 topic主题
uint8_t Mqtt_Subscribe_Topic(char *topic);

// 发布 topic主题 buff内容
uint8_t Mqtt_Publishing_Content(char *topic, char *buff);

// 连接  server_url服务端地址 client_id客户端id username访问用户名 password访问密码
uint8_t Mqtt_Connect(char *server_url, char *client_id, char *username, char *password);

第一个部分,MQTT 客户端连接 服务端。

输入 服务端 地址,ID,用户名,密码, 连接到服务端,这里设置3个回调。

// 交付 完成
static void MQTTClient_Dlivery_Complete(void *context, MQTTClient_deliveryToken dt);

// 消息 已到达
static int MQTTClient_Message_Arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message);

// 连接 中断
static void MQTTClient_Connection_Lost(void *context, char *cause);

MQTT 是异步通讯,3种主要状态,发送信息结果,接收信息结果,发送中断连接,当这些状态发生变化时会通过回调函数反馈回来。

主要重点关注消息到达的回调,当接收到订阅消息会通过信息到达回调函数返回出结果。

第二部分订阅主题,发送个主题。

第三部分发布消息,一个主题参数,带发送的信息,就可以了。

这么归纳一下使用起来就很简单了,3步,先连接,然后订阅,然后发送。

做软件一直被叮嘱解耦,就是让函数相对独立,方便以后的移植重用。所以MQTT函数的所有参数都是外部输入,接收到的信息通过内部一个读取函数调取。

// 获取 内容
uint8_t Mqtt_Get_Payload(char *buff);

最后再做个清场退出。顺便做个错误信息打印。现在有种风格就是减少提示信息,我也挺喜欢的,如果正常就不提示,如果出错,选择是否打印错误信息。

// 关闭
void Mqtt_Close(void);


// 打印 错误信息
void Mqtt_Print_Error(uint8_t error);

上一篇写过WiFi-STA模式连接WiFi,那个文件也要拿过来,要先连网,然后mqtt初始化,这样mqtt就是拥有了系统的网络能力。这个系统这里做的太好了,再不用自己瞎搞了。

整体目录结构

编译配置文件 \application\samples\hi3863\mqtt_test\mqtt_client_t2\CMakeLists.txt 的内容

set(SOURCES_LIST
    ${CMAKE_CURRENT_SOURCE_DIR}/entry.c
    ${CMAKE_CURRENT_SOURCE_DIR}/wifi_sta.c
    ${CMAKE_CURRENT_SOURCE_DIR}/mqtt_client.c
)

set(PUBLIC_HEADER_LIST
    ${CMAKE_CURRENT_SOURCE_DIR}
)

set(SOURCES "${SOURCES_LIST}" PARENT_SCOPE)
set(PUBLIC_HEADER "${PUBLIC_HEADER_LIST}" PARENT_SCOPE)

mqtt_client.c 完整代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "common_def.h"
#include "soc_osal.h"
#include "app_init.h"
#include "osal_addr.h"
#include "cmsis_os2.h"

#include "cJSON.h"
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"

#include "mqtt_client.h"


#define QOS         1               // 至少交付一次,保证消息到达
#define TIMEOUT     1000L          // 超时时间

// 初始化 能够得到系统的网络能力 
extern int MQTTClient_init(void);

// 客户端
static MQTTClient mqttClient;

// 传递令牌 通过这个对象,来确认消息是否成功发送,或者处理发送失败的情况‌
static MQTTClient_deliveryToken deliveryToken;

// 接收 标志
static uint8_t rec_flag = 0;

// 接收到的数据
static char payload_buff[32] = {0};

// 交付 完成 回调
static void MQTTClient_Dlivery_Complete(void *context, MQTTClient_deliveryToken dt)
{
    (void)context;
    // 令牌值为 %d 的消息已确认送达
    // osal_printk(" :: [MQTTClient_Dlivery_Complete] dt = %d \n", dt);

    deliveryToken = dt;
}

// 消息 已到达 回调
static int MQTTClient_Message_Arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    (void)context;
    (void)topicLen;
    // osal_printk(" :: [MQTTClient_Message_Arrived] \n");

    osal_printk(" :: topicName = %s \n", topicName);
    osal_printk(" :: payloadlen = %d \n", message->payloadlen);

    memset(payload_buff, 0, 32);

    char *payload_str = message->payload;

    for(int i=0; i<message->payloadlen; i++)
    {
        payload_buff[i] = payload_str[i];
    }

    // 标志 为1 表明收到
    rec_flag = 1;

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);

    return 1;
}

// 连接 中断 回调
static void MQTTClient_Connection_Lost(void *context, char *cause)
{
    (void)context;
    // osal_printk(" :: [MQTTClient_Message_Arrived] \n");
    osal_printk(" :: cause = %s \n", cause);
}

// 订阅 topic主题
uint8_t Mqtt_Subscribe_Topic(char *topic)
{
    // 订阅主题
    if(MQTTClient_subscribe(mqttClient, topic, QOS) != MQTTCLIENT_SUCCESS)
    {
        return 4;
    }

    return 0;
}

// 发布 topic主题 buff内容
uint8_t Mqtt_Publishing_Content(char *topic, char *buff)
{
    MQTTClient_message message = MQTTClient_message_initializer;

    message.payload = buff;
    message.payloadlen = (int)strlen(buff);
    message.qos = QOS;
    message.retained = 0;
    deliveryToken = 0;    

    // 发布 主题 内容
    if(MQTTClient_publishMessage(mqttClient, topic, &message, &deliveryToken) != MQTTCLIENT_SUCCESS)
    {
        return 5;
    }

    // 等待 发布 完毕
    if(MQTTClient_waitForCompletion(mqttClient, deliveryToken, TIMEOUT) != MQTTCLIENT_SUCCESS)
    {
        return 6;
    }

    return 0;
}

// 连接  server_url服务端地址 client_id客户端id username访问用户名 password访问密码
uint8_t Mqtt_Connect(char *server_url, char *client_id, char *username, char *password)
{
    MQTTClient_init();

    // 创建客户端
    if(MQTTClient_create(&mqttClient, server_url, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL) != MQTTCLIENT_SUCCESS)
    {
        return 1;
    }

    // 设置回调函数
    if(MQTTClient_setCallbacks(mqttClient, NULL, MQTTClient_Connection_Lost, MQTTClient_Message_Arrived, MQTTClient_Dlivery_Complete) != MQTTCLIENT_SUCCESS)
    {
        return 2;
    }

    // 连接选项 先简单初始化
    static MQTTClient_connectOptions connectOptions = MQTTClient_connectOptions_initializer;

    // 以秒为单位的“保持活动”间隔定义了客户端和服务器之间没有通信时应经过的最长时间。
    connectOptions.keepAliveInterval = 80;

    // 这是一个布尔值。cleanssession设置控制客户端和服务器在连接和断开连接时的行为。
    connectOptions.cleansession = 1;

    connectOptions.username = username;
    connectOptions.password = password;

    // 连接到服务器
    if(MQTTClient_connect(mqttClient, &connectOptions) != MQTTCLIENT_SUCCESS)
    {
        return 3;
    }

    return 0;
}

// 关闭
void Mqtt_Close(void)
{
    // 断开服务器连接 
    MQTTClient_disconnect(mqttClient, TIMEOUT);

    // 释放内存 
    MQTTClient_destroy(&mqttClient);
}

// 获取 内容
uint8_t Mqtt_Get_Payload(char *buff)
{
    if(rec_flag == 1)
    {
        strcpy(buff, payload_buff);
        rec_flag = 0;
        return 1;
    }

    return 0;
}

// 打印 错误信息
void Mqtt_Print_Error(uint8_t error)
{
    switch (error)
    {
        case 1:
            osal_printk(" [MQTTClient_create] FAILURE! \n");        
            break;
        case 2:
            osal_printk(" [MQTTClient_setCallbacks] FAILURE! \n");   
            break;
        case 3:
            osal_printk(" [MQTTClient_connect] FAILURE! \n");       
            break;
        case 4:
            osal_printk(" [MQTTClient_subscribe] FAILURE! \n");    
            break;
        case 5:
            osal_printk(" [MQTTClient_publishMessage] FAILURE! \n");         
            break;
        case 6:
            osal_printk(" [MQTTClient_waitForCompletion] FAILURE! \n");       
            break;       
        default:
            break;
    }
}

entry.c 主函数 完整代码,调用执行的的情况。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "common_def.h"
#include "soc_osal.h"
#include "app_init.h"

#include "wifi_sta.h"
#include "mqtt_client.h"

#define TASK_PRIO   6               // 任务 优先级 OSAL_TASK_PRIORITY_MIDDLE
#define TASK_SIZE   0x2000          // 任务 大小

#define AP_NAME     "WiFi名"        // wifi ap 名
#define AP_PASS     "WiFi密码"      // wifi ap 连接密码

static char ip_str[16] = {0};       // IP 地址

#define SERVER_URL      "tcp://192.168.3.23:1883"
#define CLIENT_ID       "hi3863"
#define USERNAME        "abc"
#define PASSWORD        "123"

static char topic[10] = {0};
static char send_buff[32] = {0};
static char rece_buff[32] = {0};

// 整型 转 字符串
static char *Int_To_String(int num)
{
    int int_length = sizeof(num);

    char *result = (char *)malloc((int_length + 1) * sizeof(char));

    if(result == NULL)
    {
        return NULL;
    }

    sprintf(result, "%d", num);

    return result;
}

// 连接 字符串
static char *Connection_String(char *str1, char *str2)
{
    size_t len1 = strlen(str1);
    size_t len2 = strlen(str2);

    char *result = (char *)malloc((len1 + len2 + 1) * sizeof(char));

    if(result == NULL)
    {
        return NULL;
    }

    strcpy(result, str1);
    strcpy(result + len1, str2);

    return result;
}

// 任务
static void *wifi_sta_task(const char *arg)
{
    unused(arg);

    // 开始
    osal_printk(" [wifi sta t1] start \n");

    uint8_t ret;

    // 连接 热点
    ret = WiFi_STA_Conn_AP(AP_NAME, AP_PASS, ip_str);

    if(ret == 0)
    {
        osal_printk(" [WiFi_STA_Conn_AP] SUCCESS. \n");
        osal_printk(" ip addr : %s \n", ip_str);
    }
    else
    {
        Print_WiFi_STA_Error(ret);
    }

    // 连接 mqtt 服务器 
    if(ret == 0)
    {
        // 连接
        ret = Mqtt_Connect(SERVER_URL, CLIENT_ID, USERNAME, PASSWORD);

        if(ret == 0)
        {
            osal_printk(" [Mqtt_Connect] success. \n");
        }
        else
        {
            Mqtt_Print_Error(ret);
        }
    }

    // 订阅 主题
    if(ret == 0)
    {
        // 拷贝主题 方便后门更改主题
        strcpy(topic, "test");

        // 订阅
        ret = Mqtt_Subscribe_Topic(topic);

        if(ret == 0)
        {
            osal_printk(" [Mqtt_Subscribe_Topic] success. \n");
        }
        else
        {
            Mqtt_Print_Error(ret);
        }
    }

    int n = 0;

    while(n < 10)
    {
        // 生成 1 条 字符串
        char *send_str = Connection_String("send : ", Int_To_String(n));
        osal_printk(" send_str = %s \n", send_str);

        // 复制到 发送数组
        memset(send_buff, 0, 32);
        strcpy(send_buff, send_str);

        // 发送
        ret = Mqtt_Publishing_Content(topic, send_buff);

        if(ret != 0)
        {
            Mqtt_Print_Error(ret);
            break;
        }

        // 等 2 秒
        osal_msleep(2000);

        // 查收 有没有收到数据
        ret = Mqtt_Get_Payload(rece_buff);

        // 如果 收到
        if(ret == 1)
        {
            // 打印 收到信息
            osal_printk(" rece [ %s ] \n", rece_buff);
            memset(rece_buff, 0, 32);
        }
        
        // 等 2 秒
        osal_msleep(2000);

        n++;
    }

    // 断开 mqtt 服务器
    Mqtt_Close();

    // 关闭 sta
    WiFi_STA_Close();

    // 结束 退出
    osal_printk(" [wifi sta t1] end \n");

    return NULL;
}

// 线程
static void wifi_sta_entry(void)
{
    osal_task *task_handle = NULL;

    // 锁定任务
    osal_kthread_lock();
    
    // 创建线程
    task_handle = osal_kthread_create((osal_kthread_handler)wifi_sta_task, 0, "APPTask", TASK_SIZE);

    // 创建成功
    if (task_handle != NULL)
    {
        // 优先级
        osal_kthread_set_priority(task_handle, TASK_PRIO);

        // 释放
        osal_kfree(task_handle);
    }

    // 解锁任务
    osal_kthread_unlock();
}

// 加载
app_run(wifi_sta_entry);

mqtt简单实现就完成了。

下一步就是数据的格式化,因为现实中往往发送各种类型的数据,不能每做个应用就要针对专门的数据写个转译函数吧,通过cjson帮助实现复杂数据的传输,而且cjson也已经被系统移植好了,真是太方便了。

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐