用C语言实现MQTT协议,可在嵌入式系统中应用。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <stdio.h>
#include <winsock2.h>


//---------------------------------------MQTT协议相关的子函数声明-------------------------------------------------------
//发布主题
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
//订阅或者取消订阅主题
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
//登录MQTT服务器
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
//MQTT协议缓冲区初始化
void MQTT_Init(void);
//调用底层接口发送数据包
void MQTT_SendBuf(unsigned char* buf, unsigned short len);
//MQTT协议里最底层的接口,最底层的如果要移植协议到其他地方运行,那么改这里就行了。其他地方不用改的。
int Client_SendData(unsigned char* buff, unsigned int len);//发送数据到服务器
int Client_GetData(unsigned char* buff);//从服务器获取数据


//---------------------------------------全局变量定义--------------------------------------------------------------------
#define BYTE0(dwTemp)       (*( char *)(&dwTemp))
#define BYTE1(dwTemp)       (*((char *)(&dwTemp) + 1))
#define BYTE2(dwTemp)       (*((char *)(&dwTemp) + 2))
#define BYTE3(dwTemp)       (*((char *)(&dwTemp) + 3))


unsigned char mqtt_rxbuf[5 * 1024];//发送数据缓存区
unsigned char mqtt_txbuf[256];//接收数据缓存区
unsigned int  mqtt_rxlen;
unsigned int  mqtt_txlen;


typedef enum
{
	//名字 	     值 	报文流动方向 	描述
	M_RESERVED1 = 0,//	禁止	保留
	M_CONNECT,	//	客户端到服务端	客户端请求连接服务端
	M_CONNACK,	//	服务端到客户端	连接报文确认
	M_PUBLISH,	//	两个方向都允许	发布消息
	M_PUBACK,	//	两个方向都允许	QoS 1消息发布收到确认
	M_PUBREC,	//	两个方向都允许	发布收到(保证交付第一步)
	M_PUBREL,	//	两个方向都允许	发布释放(保证交付第二步)
	M_PUBCOMP,	//	两个方向都允许	QoS 2消息发布完成(保证交互第三步)
	M_SUBSCRIBE,//	客户端到服务端	客户端订阅请求
	M_SUBACK,	//	服务端到客户端	订阅请求报文确认
	M_UNSUBSCRIBE,//客户端到服务端	客户端取消订阅请求
	M_UNSUBACK,	//	服务端到客户端	取消订阅报文确认
	M_PINGREQ,	//	客户端到服务端	心跳请求
	M_PINGRESP,	//	服务端到客户端	心跳响应
	M_DISCONNECT,//客户端到服务端	客户端断开连接
	M_RESERVED2,//	禁止	       保留
}_typdef_mqtt_message;

//连接成功服务器回应 20 02 00 00
//客户端主动断开连接 e0 00
const unsigned char parket_connetAck[] = { 0x20,0x02,0x00,0x00 };
const unsigned char parket_disconnet[] = { 0xe0,0x00 };
const unsigned char parket_heart[] = { 0xc0,0x00 };
const unsigned char parket_heart_reply[] = { 0xc0,0x00 };
const unsigned char parket_subAck[] = { 0x90,0x03 };

void MQTT_Init(void)
{
	//缓冲区赋值
	mqtt_rxlen = sizeof(mqtt_rxbuf);
	mqtt_txlen = sizeof(mqtt_txbuf);
	memset(mqtt_rxbuf, 0, mqtt_rxlen);
	memset(mqtt_txbuf, 0, mqtt_txlen);

}
/*
函数功能: 登录服务器
函数返回值: 0表示成功 1表示失败
*/
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password)
{
	unsigned short i, j;
	int ClientIDLen = (int)strlen(ClientID);
	int UsernameLen = (int)strlen(Username);
	int PasswordLen = (int)strlen(Password);
	unsigned int DataLen;
	mqtt_txlen = 0;
	unsigned int size = 0;
	unsigned char buff[256];
	
	//可变报头+Payload  每个字段包含两个字节的长度标识
	DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2);

	//固定报头
	//控制报文类型
	mqtt_txbuf[mqtt_txlen++] = 0x10;		//MQTT Message Type CONNECT
	//剩余长度(不包括固定头部)
	do
	{
		unsigned char encodedByte = DataLen%0x80;
		DataLen = DataLen/0x80;
		// if there are more data to encode, set the top bit of this byte
		if (DataLen > 0)
			encodedByte = encodedByte|0x80;
		mqtt_txbuf[mqtt_txlen++] = encodedByte;
	} while (DataLen > 0);

	//可变报头
	//协议名
	mqtt_txbuf[mqtt_txlen++] = 0;        	// Protocol Name Length MSB    
	mqtt_txbuf[mqtt_txlen++] = 4;           // Protocol Name Length LSB    
	mqtt_txbuf[mqtt_txlen++] = 'M';        	// ASCII Code for M    
	mqtt_txbuf[mqtt_txlen++] = 'Q';        	// ASCII Code for Q    
	mqtt_txbuf[mqtt_txlen++] = 'T';        	// ASCII Code for T    
	mqtt_txbuf[mqtt_txlen++] = 'T';        	// ASCII Code for T    
	//协议级别
	mqtt_txbuf[mqtt_txlen++] = 4;        		// MQTT Protocol version = 4    
	//连接标志
	mqtt_txbuf[mqtt_txlen++] = 0xc2;        	// conn flags 
	mqtt_txbuf[mqtt_txlen++] = 0;        		// Keep-alive Time Length MSB    
	mqtt_txbuf[mqtt_txlen++] = 100;        	// Keep-alive Time Length LSB  100S心跳包  

	mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB    
	mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB  	
	memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);
	mqtt_txlen += ClientIDLen;
	if (UsernameLen > 0)
	{
		mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen);		//username length MSB    
		mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen);    	//username length LSB    
		memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);
		mqtt_txlen += UsernameLen;
	}

	if (PasswordLen > 0)
	{
		mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen);		//password length MSB    
		mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen);    	//password length LSB  
		memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);
		mqtt_txlen += PasswordLen;
	}
	for (i = 0; i < 5; i++)
	{
		memset(mqtt_rxbuf, 0, mqtt_rxlen);
		MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
		size = Client_GetData(buff);//从服务器获取数据
		if (size <= 0)continue;
		memcpy(mqtt_rxbuf, buff, size);

		printf("connect response:\r\n");
		for (j = 0; j < size; j++)
		{
			printf("0x%0X ", buff[j]);
		}
		printf("\r\n");

		if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功			   
		{
			return 0;//连接成功
		}
	}
	return 1;
}

/*
函数功能: MQTT订阅/取消订阅数据打包函数
函数参数:
	topic       主题
	qos         消息等级 0:最多分发一次  1: 至少分发一次  2: 仅分发一次
	whether     订阅/取消订阅请求包 (1表示订阅,0表示取消订阅)
返回值: 0表示成功 1表示失败
*/
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether)
{
	unsigned char i, j;
	mqtt_txlen = 0;
	unsigned int size = 0;
	unsigned char buff[256];
	unsigned int topiclen = (int)strlen(topic);
	unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度
	//固定报头
	//控制报文类型
	if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅
	else	mqtt_txbuf[mqtt_txlen++] = 0xA2;    //取消订阅
	//剩余长度
	do
	{
		unsigned char encodedByte = DataLen % 128;
		DataLen = DataLen / 128;
		// if there are more data to encode, set the top bit of this byte
		if (DataLen > 0)
			encodedByte = encodedByte | 128;
		mqtt_txbuf[mqtt_txlen++] = encodedByte;
	} while (DataLen > 0);
	//可变报头
	mqtt_txbuf[mqtt_txlen++] = 0;			//消息标识符 MSB
	mqtt_txbuf[mqtt_txlen++] = 0x0A;        //消息标识符 LSB
	//有效载荷
	mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB
	mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB   
	memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);
	mqtt_txlen += topiclen;
	if (whether)
	{
		mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别
	}
	for (i = 0; i < 100; i++)
	{
		memset(mqtt_rxbuf, 0, mqtt_rxlen);
		MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
		//printf("订阅消息发布成功\n");
		size = Client_GetData(buff);//从服务器获取数据
		if (size <= 0)
		{
			continue;
		}
		memcpy(mqtt_rxbuf, buff, size);

		printf("subscriber response:\r\n");
		for (j = 0; j < size; j++)
		{
			printf("0x%0X ", buff[j]);
		}
		printf("\r\n");

		if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //连接成功			   
		{
			return 0;//连接成功
		}
		Sleep(1000);
	}
	return 1; //失败
}



//MQTT发布数据打包函数
//topic   主题 
//message 消息
//qos     消息等级 
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos)
{
	unsigned int topicLength = (int)strlen(topic);
	unsigned int messageLength = (int)strlen(message);
	unsigned short id = 0;
	unsigned int DataLen;
	mqtt_txlen = 0;

	//printf(" report JSON len:%d\r\n", messageLength);
	//printf("message=%s\r\n", message);
	//有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度
	//QOS为0时没有标识符
	//数据长度             主题名   报文标识符   有效载荷
	if (qos)	DataLen = (2 + topicLength) + 2 + messageLength;
	else	DataLen = (2 + topicLength) + messageLength;

	//固定报头
	//控制报文类型
	mqtt_txbuf[mqtt_txlen++] = 0x30;    // MQTT Message Type PUBLISH  

	//剩余长度
	do
	{
		unsigned char encodedByte = DataLen % 128;
		DataLen = DataLen / 128;
		// if there are more data to encode, set the top bit of this byte
		if (DataLen > 0)
			encodedByte = encodedByte | 128;
		mqtt_txbuf[mqtt_txlen++] = encodedByte;
	} while (DataLen > 0);
	mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB
	mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB 
	memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题
	mqtt_txlen += topicLength;

	//报文标识符
	if (qos)
	{
		mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
		mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
		id++;
	}
	memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
	mqtt_txlen += messageLength;

	MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
	return mqtt_txlen;
}

void MQTT_SendBuf(unsigned char* buf, unsigned short len)
{
	Client_SendData(buf, len);//发送数据到服务器
}





//-----------------------------------------MQTT服务器的参数------------------------------------------------------------
//服务器IP
#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 1883 //端口号

//MQTT三元组
#define ClientID "stm32_0"
#define Username "stm32"
#define Password "123456"//密文 

//订阅主题:
#define SET_TOPIC  "test/#"//订阅
//发布主题:
#define POST_TOPIC "test/2"//发布




//-----------------------------------------主函数------------------------------------------------------------

char mqtt_message[1024];//数据缓存区


SOCKET connectSocket; //网络套接字
WSADATA wsaData; //创建一个结构体变量,用于存储关于Winsock库的信息

double TEMP = 10.0;


int main()
{
	int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock库,指定版本号2.2,检查返回值
	if (result != 0)
	{
		printf("WSAStartup failed: %d\r\n", result);//输出错误信息并退出程序
		return 1;
	}

	 connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //创建一个TCP套接字,检查返回值
	if (connectSocket == INVALID_SOCKET)
	{
		printf("socket failed with error: %d", WSAGetLastError());//输出错误信息并退出程序
		WSACleanup(); //清除Winsock库
		return 1;
	}


	struct sockaddr_in service; //创建一个结构体变量,用于存储服务器地址信息
	service.sin_family = AF_INET; //指定地址族为IPv4	
	service.sin_port = htons(SERVER_PORT); //将端口号从主机字节序转换为网络字节序,并存储在结构体中
    service.sin_addr.S_un.S_addr = inet_addr(SERVER_IP);
	result = connect(connectSocket, (SOCKADDR*)&service, sizeof(SOCKADDR)); //连接到服务器,检查返回值
	if (result == SOCKET_ERROR)
	{
		printf("connect fail\r\n"); //输出错误信息并退出程序
		closesocket(connectSocket); //关闭套接字
		WSACleanup(); //清除Winsock库
		return 1;
	}

	printf("Connected to server ok\r\n" ); //连接成功,输出消息

	MQTT_Init();

	while (1)
	{
		/*登录服务器*/
		if (MQTT_Connect((char*)ClientID, (char*)Username, (char*)Password) == 0)
		{
			break;
		}
		// 延时1000毫秒,即1秒
		Sleep(1000);
		printf("MQTT connect....\n");
	}


	printf("MQTT connect\r\n");
	//订阅物联网平台数据
	int stat = MQTT_SubscribeTopic((char*)SET_TOPIC, 1, 1);
	if (stat)
	{
		printf("subscribertopic fail\r\n");
		closesocket(connectSocket); //关闭套接字
		WSACleanup(); //清除Winsock库
		return 1;
	}
	printf("subscribertopic ok\r\n");


	/*创建线程*/

	while (1)
	{
		sprintf(mqtt_message, "{\"services\": [{\"service_id\": \"stm32\",\"properties\":{\"TEMP\":%.1f}}]}", (double)(TEMP+=0.2));//温度
		
		//发布主题
		MQTT_PublishData((char*)POST_TOPIC, mqtt_message, 0);
		char recbuf[4096]={0};
		int ret = Client_GetData(recbuf);
		if(ret)
		{
           printf("%s\r\n",recbuf+6);
		}
		
		Sleep(100);
	}
}



/*发送数据到服务器*/
int Client_SendData(unsigned char* buff, unsigned int len)
{
	int result = send(connectSocket,(const char*)buff, len, 0); //向服务器发送数据,检查返回值
	if (result == SOCKET_ERROR)
	{
		printf("send failed with error\r\n"); //输出错误信息并退出程序
		return -1;
	}
	return 0;
}



/*获取服务器下发数据*/
int Client_GetData(unsigned char* buff)
{
	int result = recv(connectSocket, (char*)buff,200, 0); //从服务器接收数据,检查返回值
	if (result == SOCKET_ERROR)
	{
		printf("recv failed with error\r\n") ; //输出错误信息并退出程序
		return -1;
	}
	return result;
}

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐