Hi3863 MQTT 客户端
下一步就是数据的格式化,因为现实中往往发送各种类型的数据,不能每做个应用就要针对专门的数据写个转译函数吧,通过cjson帮助实现复杂数据的传输,而且cjson也已经被系统移植好了,真是太方便了。hi3863配套的源码版本非常好,系统已经内置了paho.mqtt.c,不需要再做移植的工作,直接引入函数就行,真是太贴心了。MQTT 是异步通讯,3种主要状态,发送信息结果,接收信息结果,发送中断连接,当
hi3863配套的源码版本非常好,系统已经内置了paho.mqtt.c,不需要再做移植的工作,直接引入函数就行,真是太贴心了。
MQTT 协议,它是一种规则,并不是一个独立的技术,是基于TCP/IP协议之上。具体协议看站内的各种教程贴吧,我说也说不好。
我简单归纳成3个主要功能
// 订阅 topic主题
uint8_t Mqtt_Subscribe_Topic(char *topic);
// 发布 topic主题 buff内容
uint8_t Mqtt_Publishing_Content(char *topic, char *buff);
// 连接 server_url服务端地址 client_id客户端id username访问用户名 password访问密码
uint8_t Mqtt_Connect(char *server_url, char *client_id, char *username, char *password);
第一个部分,MQTT 客户端连接 服务端。
输入 服务端 地址,ID,用户名,密码, 连接到服务端,这里设置3个回调。
// 交付 完成
static void MQTTClient_Dlivery_Complete(void *context, MQTTClient_deliveryToken dt);
// 消息 已到达
static int MQTTClient_Message_Arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message);
// 连接 中断
static void MQTTClient_Connection_Lost(void *context, char *cause);
MQTT 是异步通讯,3种主要状态,发送信息结果,接收信息结果,发送中断连接,当这些状态发生变化时会通过回调函数反馈回来。
主要重点关注消息到达的回调,当接收到订阅消息会通过信息到达回调函数返回出结果。
第二部分订阅主题,发送个主题。
第三部分发布消息,一个主题参数,带发送的信息,就可以了。
这么归纳一下使用起来就很简单了,3步,先连接,然后订阅,然后发送。
做软件一直被叮嘱解耦,就是让函数相对独立,方便以后的移植重用。所以MQTT函数的所有参数都是外部输入,接收到的信息通过内部一个读取函数调取。
// 获取 内容
uint8_t Mqtt_Get_Payload(char *buff);
最后再做个清场退出。顺便做个错误信息打印。现在有种风格就是减少提示信息,我也挺喜欢的,如果正常就不提示,如果出错,选择是否打印错误信息。
// 关闭
void Mqtt_Close(void);
// 打印 错误信息
void Mqtt_Print_Error(uint8_t error);
上一篇写过WiFi-STA模式连接WiFi,那个文件也要拿过来,要先连网,然后mqtt初始化,这样mqtt就是拥有了系统的网络能力。这个系统这里做的太好了,再不用自己瞎搞了。
整体目录结构

编译配置文件 \application\samples\hi3863\mqtt_test\mqtt_client_t2\CMakeLists.txt 的内容
set(SOURCES_LIST
${CMAKE_CURRENT_SOURCE_DIR}/entry.c
${CMAKE_CURRENT_SOURCE_DIR}/wifi_sta.c
${CMAKE_CURRENT_SOURCE_DIR}/mqtt_client.c
)
set(PUBLIC_HEADER_LIST
${CMAKE_CURRENT_SOURCE_DIR}
)
set(SOURCES "${SOURCES_LIST}" PARENT_SCOPE)
set(PUBLIC_HEADER "${PUBLIC_HEADER_LIST}" PARENT_SCOPE)
mqtt_client.c 完整代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "common_def.h"
#include "soc_osal.h"
#include "app_init.h"
#include "osal_addr.h"
#include "cmsis_os2.h"
#include "cJSON.h"
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include "mqtt_client.h"
#define QOS 1 // 至少交付一次,保证消息到达
#define TIMEOUT 1000L // 超时时间
// 初始化 能够得到系统的网络能力
extern int MQTTClient_init(void);
// 客户端
static MQTTClient mqttClient;
// 传递令牌 通过这个对象,来确认消息是否成功发送,或者处理发送失败的情况
static MQTTClient_deliveryToken deliveryToken;
// 接收 标志
static uint8_t rec_flag = 0;
// 接收到的数据
static char payload_buff[32] = {0};
// 交付 完成 回调
static void MQTTClient_Dlivery_Complete(void *context, MQTTClient_deliveryToken dt)
{
(void)context;
// 令牌值为 %d 的消息已确认送达
// osal_printk(" :: [MQTTClient_Dlivery_Complete] dt = %d \n", dt);
deliveryToken = dt;
}
// 消息 已到达 回调
static int MQTTClient_Message_Arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
(void)context;
(void)topicLen;
// osal_printk(" :: [MQTTClient_Message_Arrived] \n");
osal_printk(" :: topicName = %s \n", topicName);
osal_printk(" :: payloadlen = %d \n", message->payloadlen);
memset(payload_buff, 0, 32);
char *payload_str = message->payload;
for(int i=0; i<message->payloadlen; i++)
{
payload_buff[i] = payload_str[i];
}
// 标志 为1 表明收到
rec_flag = 1;
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
// 连接 中断 回调
static void MQTTClient_Connection_Lost(void *context, char *cause)
{
(void)context;
// osal_printk(" :: [MQTTClient_Message_Arrived] \n");
osal_printk(" :: cause = %s \n", cause);
}
// 订阅 topic主题
uint8_t Mqtt_Subscribe_Topic(char *topic)
{
// 订阅主题
if(MQTTClient_subscribe(mqttClient, topic, QOS) != MQTTCLIENT_SUCCESS)
{
return 4;
}
return 0;
}
// 发布 topic主题 buff内容
uint8_t Mqtt_Publishing_Content(char *topic, char *buff)
{
MQTTClient_message message = MQTTClient_message_initializer;
message.payload = buff;
message.payloadlen = (int)strlen(buff);
message.qos = QOS;
message.retained = 0;
deliveryToken = 0;
// 发布 主题 内容
if(MQTTClient_publishMessage(mqttClient, topic, &message, &deliveryToken) != MQTTCLIENT_SUCCESS)
{
return 5;
}
// 等待 发布 完毕
if(MQTTClient_waitForCompletion(mqttClient, deliveryToken, TIMEOUT) != MQTTCLIENT_SUCCESS)
{
return 6;
}
return 0;
}
// 连接 server_url服务端地址 client_id客户端id username访问用户名 password访问密码
uint8_t Mqtt_Connect(char *server_url, char *client_id, char *username, char *password)
{
MQTTClient_init();
// 创建客户端
if(MQTTClient_create(&mqttClient, server_url, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL) != MQTTCLIENT_SUCCESS)
{
return 1;
}
// 设置回调函数
if(MQTTClient_setCallbacks(mqttClient, NULL, MQTTClient_Connection_Lost, MQTTClient_Message_Arrived, MQTTClient_Dlivery_Complete) != MQTTCLIENT_SUCCESS)
{
return 2;
}
// 连接选项 先简单初始化
static MQTTClient_connectOptions connectOptions = MQTTClient_connectOptions_initializer;
// 以秒为单位的“保持活动”间隔定义了客户端和服务器之间没有通信时应经过的最长时间。
connectOptions.keepAliveInterval = 80;
// 这是一个布尔值。cleanssession设置控制客户端和服务器在连接和断开连接时的行为。
connectOptions.cleansession = 1;
connectOptions.username = username;
connectOptions.password = password;
// 连接到服务器
if(MQTTClient_connect(mqttClient, &connectOptions) != MQTTCLIENT_SUCCESS)
{
return 3;
}
return 0;
}
// 关闭
void Mqtt_Close(void)
{
// 断开服务器连接
MQTTClient_disconnect(mqttClient, TIMEOUT);
// 释放内存
MQTTClient_destroy(&mqttClient);
}
// 获取 内容
uint8_t Mqtt_Get_Payload(char *buff)
{
if(rec_flag == 1)
{
strcpy(buff, payload_buff);
rec_flag = 0;
return 1;
}
return 0;
}
// 打印 错误信息
void Mqtt_Print_Error(uint8_t error)
{
switch (error)
{
case 1:
osal_printk(" [MQTTClient_create] FAILURE! \n");
break;
case 2:
osal_printk(" [MQTTClient_setCallbacks] FAILURE! \n");
break;
case 3:
osal_printk(" [MQTTClient_connect] FAILURE! \n");
break;
case 4:
osal_printk(" [MQTTClient_subscribe] FAILURE! \n");
break;
case 5:
osal_printk(" [MQTTClient_publishMessage] FAILURE! \n");
break;
case 6:
osal_printk(" [MQTTClient_waitForCompletion] FAILURE! \n");
break;
default:
break;
}
}
entry.c 主函数 完整代码,调用执行的的情况。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "common_def.h"
#include "soc_osal.h"
#include "app_init.h"
#include "wifi_sta.h"
#include "mqtt_client.h"
#define TASK_PRIO 6 // 任务 优先级 OSAL_TASK_PRIORITY_MIDDLE
#define TASK_SIZE 0x2000 // 任务 大小
#define AP_NAME "WiFi名" // wifi ap 名
#define AP_PASS "WiFi密码" // wifi ap 连接密码
static char ip_str[16] = {0}; // IP 地址
#define SERVER_URL "tcp://192.168.3.23:1883"
#define CLIENT_ID "hi3863"
#define USERNAME "abc"
#define PASSWORD "123"
static char topic[10] = {0};
static char send_buff[32] = {0};
static char rece_buff[32] = {0};
// 整型 转 字符串
static char *Int_To_String(int num)
{
int int_length = sizeof(num);
char *result = (char *)malloc((int_length + 1) * sizeof(char));
if(result == NULL)
{
return NULL;
}
sprintf(result, "%d", num);
return result;
}
// 连接 字符串
static char *Connection_String(char *str1, char *str2)
{
size_t len1 = strlen(str1);
size_t len2 = strlen(str2);
char *result = (char *)malloc((len1 + len2 + 1) * sizeof(char));
if(result == NULL)
{
return NULL;
}
strcpy(result, str1);
strcpy(result + len1, str2);
return result;
}
// 任务
static void *wifi_sta_task(const char *arg)
{
unused(arg);
// 开始
osal_printk(" [wifi sta t1] start \n");
uint8_t ret;
// 连接 热点
ret = WiFi_STA_Conn_AP(AP_NAME, AP_PASS, ip_str);
if(ret == 0)
{
osal_printk(" [WiFi_STA_Conn_AP] SUCCESS. \n");
osal_printk(" ip addr : %s \n", ip_str);
}
else
{
Print_WiFi_STA_Error(ret);
}
// 连接 mqtt 服务器
if(ret == 0)
{
// 连接
ret = Mqtt_Connect(SERVER_URL, CLIENT_ID, USERNAME, PASSWORD);
if(ret == 0)
{
osal_printk(" [Mqtt_Connect] success. \n");
}
else
{
Mqtt_Print_Error(ret);
}
}
// 订阅 主题
if(ret == 0)
{
// 拷贝主题 方便后门更改主题
strcpy(topic, "test");
// 订阅
ret = Mqtt_Subscribe_Topic(topic);
if(ret == 0)
{
osal_printk(" [Mqtt_Subscribe_Topic] success. \n");
}
else
{
Mqtt_Print_Error(ret);
}
}
int n = 0;
while(n < 10)
{
// 生成 1 条 字符串
char *send_str = Connection_String("send : ", Int_To_String(n));
osal_printk(" send_str = %s \n", send_str);
// 复制到 发送数组
memset(send_buff, 0, 32);
strcpy(send_buff, send_str);
// 发送
ret = Mqtt_Publishing_Content(topic, send_buff);
if(ret != 0)
{
Mqtt_Print_Error(ret);
break;
}
// 等 2 秒
osal_msleep(2000);
// 查收 有没有收到数据
ret = Mqtt_Get_Payload(rece_buff);
// 如果 收到
if(ret == 1)
{
// 打印 收到信息
osal_printk(" rece [ %s ] \n", rece_buff);
memset(rece_buff, 0, 32);
}
// 等 2 秒
osal_msleep(2000);
n++;
}
// 断开 mqtt 服务器
Mqtt_Close();
// 关闭 sta
WiFi_STA_Close();
// 结束 退出
osal_printk(" [wifi sta t1] end \n");
return NULL;
}
// 线程
static void wifi_sta_entry(void)
{
osal_task *task_handle = NULL;
// 锁定任务
osal_kthread_lock();
// 创建线程
task_handle = osal_kthread_create((osal_kthread_handler)wifi_sta_task, 0, "APPTask", TASK_SIZE);
// 创建成功
if (task_handle != NULL)
{
// 优先级
osal_kthread_set_priority(task_handle, TASK_PRIO);
// 释放
osal_kfree(task_handle);
}
// 解锁任务
osal_kthread_unlock();
}
// 加载
app_run(wifi_sta_entry);
mqtt简单实现就完成了。
下一步就是数据的格式化,因为现实中往往发送各种类型的数据,不能每做个应用就要针对专门的数据写个转译函数吧,通过cjson帮助实现复杂数据的传输,而且cjson也已经被系统移植好了,真是太方便了。
更多推荐



所有评论(0)