C语言实现MQTT协议
用C语言实现MQTT协议,可在嵌入式系统中应用。
·
用C语言实现MQTT协议,可在嵌入式系统中应用。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <stdio.h>
#include <winsock2.h>
//---------------------------------------MQTT协议相关的子函数声明-------------------------------------------------------
//发布主题
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
//订阅或者取消订阅主题
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
//登录MQTT服务器
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
//MQTT协议缓冲区初始化
void MQTT_Init(void);
//调用底层接口发送数据包
void MQTT_SendBuf(unsigned char* buf, unsigned short len);
//MQTT协议里最底层的接口,最底层的如果要移植协议到其他地方运行,那么改这里就行了。其他地方不用改的。
int Client_SendData(unsigned char* buff, unsigned int len);//发送数据到服务器
int Client_GetData(unsigned char* buff);//从服务器获取数据
//---------------------------------------全局变量定义--------------------------------------------------------------------
#define BYTE0(dwTemp) (*( char *)(&dwTemp))
#define BYTE1(dwTemp) (*((char *)(&dwTemp) + 1))
#define BYTE2(dwTemp) (*((char *)(&dwTemp) + 2))
#define BYTE3(dwTemp) (*((char *)(&dwTemp) + 3))
unsigned char mqtt_rxbuf[5 * 1024];//发送数据缓存区
unsigned char mqtt_txbuf[256];//接收数据缓存区
unsigned int mqtt_rxlen;
unsigned int mqtt_txlen;
typedef enum
{
//名字 值 报文流动方向 描述
M_RESERVED1 = 0,// 禁止 保留
M_CONNECT, // 客户端到服务端 客户端请求连接服务端
M_CONNACK, // 服务端到客户端 连接报文确认
M_PUBLISH, // 两个方向都允许 发布消息
M_PUBACK, // 两个方向都允许 QoS 1消息发布收到确认
M_PUBREC, // 两个方向都允许 发布收到(保证交付第一步)
M_PUBREL, // 两个方向都允许 发布释放(保证交付第二步)
M_PUBCOMP, // 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
M_SUBSCRIBE,// 客户端到服务端 客户端订阅请求
M_SUBACK, // 服务端到客户端 订阅请求报文确认
M_UNSUBSCRIBE,//客户端到服务端 客户端取消订阅请求
M_UNSUBACK, // 服务端到客户端 取消订阅报文确认
M_PINGREQ, // 客户端到服务端 心跳请求
M_PINGRESP, // 服务端到客户端 心跳响应
M_DISCONNECT,//客户端到服务端 客户端断开连接
M_RESERVED2,// 禁止 保留
}_typdef_mqtt_message;
//连接成功服务器回应 20 02 00 00
//客户端主动断开连接 e0 00
const unsigned char parket_connetAck[] = { 0x20,0x02,0x00,0x00 };
const unsigned char parket_disconnet[] = { 0xe0,0x00 };
const unsigned char parket_heart[] = { 0xc0,0x00 };
const unsigned char parket_heart_reply[] = { 0xc0,0x00 };
const unsigned char parket_subAck[] = { 0x90,0x03 };
void MQTT_Init(void)
{
//缓冲区赋值
mqtt_rxlen = sizeof(mqtt_rxbuf);
mqtt_txlen = sizeof(mqtt_txbuf);
memset(mqtt_rxbuf, 0, mqtt_rxlen);
memset(mqtt_txbuf, 0, mqtt_txlen);
}
/*
函数功能: 登录服务器
函数返回值: 0表示成功 1表示失败
*/
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password)
{
unsigned short i, j;
int ClientIDLen = (int)strlen(ClientID);
int UsernameLen = (int)strlen(Username);
int PasswordLen = (int)strlen(Password);
unsigned int DataLen;
mqtt_txlen = 0;
unsigned int size = 0;
unsigned char buff[256];
//可变报头+Payload 每个字段包含两个字节的长度标识
DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2);
//固定报头
//控制报文类型
mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT
//剩余长度(不包括固定头部)
do
{
unsigned char encodedByte = DataLen%0x80;
DataLen = DataLen/0x80;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte|0x80;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
//可变报头
//协议名
mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB
mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB
mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M
mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
//协议级别
mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4
//连接标志
mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags
mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB
mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包
mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);
mqtt_txlen += ClientIDLen;
if (UsernameLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);
mqtt_txlen += UsernameLen;
}
if (PasswordLen > 0)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB
memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);
mqtt_txlen += PasswordLen;
}
for (i = 0; i < 5; i++)
{
memset(mqtt_rxbuf, 0, mqtt_rxlen);
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
size = Client_GetData(buff);//从服务器获取数据
if (size <= 0)continue;
memcpy(mqtt_rxbuf, buff, size);
printf("connect response:\r\n");
for (j = 0; j < size; j++)
{
printf("0x%0X ", buff[j]);
}
printf("\r\n");
if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功
{
return 0;//连接成功
}
}
return 1;
}
/*
函数功能: MQTT订阅/取消订阅数据打包函数
函数参数:
topic 主题
qos 消息等级 0:最多分发一次 1: 至少分发一次 2: 仅分发一次
whether 订阅/取消订阅请求包 (1表示订阅,0表示取消订阅)
返回值: 0表示成功 1表示失败
*/
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether)
{
unsigned char i, j;
mqtt_txlen = 0;
unsigned int size = 0;
unsigned char buff[256];
unsigned int topiclen = (int)strlen(topic);
unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度
//固定报头
//控制报文类型
if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅
else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消订阅
//剩余长度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
//可变报头
mqtt_txbuf[mqtt_txlen++] = 0; //消息标识符 MSB
mqtt_txbuf[mqtt_txlen++] = 0x0A; //消息标识符 LSB
//有效载荷
mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);
mqtt_txlen += topiclen;
if (whether)
{
mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别
}
for (i = 0; i < 100; i++)
{
memset(mqtt_rxbuf, 0, mqtt_rxlen);
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
//printf("订阅消息发布成功\n");
size = Client_GetData(buff);//从服务器获取数据
if (size <= 0)
{
continue;
}
memcpy(mqtt_rxbuf, buff, size);
printf("subscriber response:\r\n");
for (j = 0; j < size; j++)
{
printf("0x%0X ", buff[j]);
}
printf("\r\n");
if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //连接成功
{
return 0;//连接成功
}
Sleep(1000);
}
return 1; //失败
}
//MQTT发布数据打包函数
//topic 主题
//message 消息
//qos 消息等级
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos)
{
unsigned int topicLength = (int)strlen(topic);
unsigned int messageLength = (int)strlen(message);
unsigned short id = 0;
unsigned int DataLen;
mqtt_txlen = 0;
//printf(" report JSON len:%d\r\n", messageLength);
//printf("message=%s\r\n", message);
//有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度
//QOS为0时没有标识符
//数据长度 主题名 报文标识符 有效载荷
if (qos) DataLen = (2 + topicLength) + 2 + messageLength;
else DataLen = (2 + topicLength) + messageLength;
//固定报头
//控制报文类型
mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH
//剩余长度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题
mqtt_txlen += topicLength;
//报文标识符
if (qos)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
id++;
}
memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
mqtt_txlen += messageLength;
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
return mqtt_txlen;
}
void MQTT_SendBuf(unsigned char* buf, unsigned short len)
{
Client_SendData(buf, len);//发送数据到服务器
}
//-----------------------------------------MQTT服务器的参数------------------------------------------------------------
//服务器IP
#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 1883 //端口号
//MQTT三元组
#define ClientID "stm32_0"
#define Username "stm32"
#define Password "123456"//密文
//订阅主题:
#define SET_TOPIC "test/#"//订阅
//发布主题:
#define POST_TOPIC "test/2"//发布
//-----------------------------------------主函数------------------------------------------------------------
char mqtt_message[1024];//数据缓存区
SOCKET connectSocket; //网络套接字
WSADATA wsaData; //创建一个结构体变量,用于存储关于Winsock库的信息
double TEMP = 10.0;
int main()
{
int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock库,指定版本号2.2,检查返回值
if (result != 0)
{
printf("WSAStartup failed: %d\r\n", result);//输出错误信息并退出程序
return 1;
}
connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //创建一个TCP套接字,检查返回值
if (connectSocket == INVALID_SOCKET)
{
printf("socket failed with error: %d", WSAGetLastError());//输出错误信息并退出程序
WSACleanup(); //清除Winsock库
return 1;
}
struct sockaddr_in service; //创建一个结构体变量,用于存储服务器地址信息
service.sin_family = AF_INET; //指定地址族为IPv4
service.sin_port = htons(SERVER_PORT); //将端口号从主机字节序转换为网络字节序,并存储在结构体中
service.sin_addr.S_un.S_addr = inet_addr(SERVER_IP);
result = connect(connectSocket, (SOCKADDR*)&service, sizeof(SOCKADDR)); //连接到服务器,检查返回值
if (result == SOCKET_ERROR)
{
printf("connect fail\r\n"); //输出错误信息并退出程序
closesocket(connectSocket); //关闭套接字
WSACleanup(); //清除Winsock库
return 1;
}
printf("Connected to server ok\r\n" ); //连接成功,输出消息
MQTT_Init();
while (1)
{
/*登录服务器*/
if (MQTT_Connect((char*)ClientID, (char*)Username, (char*)Password) == 0)
{
break;
}
// 延时1000毫秒,即1秒
Sleep(1000);
printf("MQTT connect....\n");
}
printf("MQTT connect\r\n");
//订阅物联网平台数据
int stat = MQTT_SubscribeTopic((char*)SET_TOPIC, 1, 1);
if (stat)
{
printf("subscribertopic fail\r\n");
closesocket(connectSocket); //关闭套接字
WSACleanup(); //清除Winsock库
return 1;
}
printf("subscribertopic ok\r\n");
/*创建线程*/
while (1)
{
sprintf(mqtt_message, "{\"services\": [{\"service_id\": \"stm32\",\"properties\":{\"TEMP\":%.1f}}]}", (double)(TEMP+=0.2));//温度
//发布主题
MQTT_PublishData((char*)POST_TOPIC, mqtt_message, 0);
char recbuf[4096]={0};
int ret = Client_GetData(recbuf);
if(ret)
{
printf("%s\r\n",recbuf+6);
}
Sleep(100);
}
}
/*发送数据到服务器*/
int Client_SendData(unsigned char* buff, unsigned int len)
{
int result = send(connectSocket,(const char*)buff, len, 0); //向服务器发送数据,检查返回值
if (result == SOCKET_ERROR)
{
printf("send failed with error\r\n"); //输出错误信息并退出程序
return -1;
}
return 0;
}
/*获取服务器下发数据*/
int Client_GetData(unsigned char* buff)
{
int result = recv(connectSocket, (char*)buff,200, 0); //从服务器接收数据,检查返回值
if (result == SOCKET_ERROR)
{
printf("recv failed with error\r\n") ; //输出错误信息并退出程序
return -1;
}
return result;
}
更多推荐



所有评论(0)