Optimize mqtt function add MQTT_PublishDataQs1 function

This commit is contained in:
wgzAIIT 2023-06-15 18:32:12 +08:00
parent 9995980a1a
commit 555b014bc9
2 changed files with 98 additions and 28 deletions

View File

@ -21,7 +21,6 @@
#include <string.h> #include <string.h>
#include <stdint.h> #include <stdint.h>
#include <transform.h>
#include <adapter.h> #include <adapter.h>
#include "shell.h" #include "shell.h"
#include "xsconfig.h" #include "xsconfig.h"
@ -80,6 +79,31 @@ out:
} }
/*******************************************************************************
* : MQTT_Send
* : MQTT client数据发送函数
* : buf:,buflen:
* : 0,-1
*******************************************************************************/
int MQTT_Send(const uint8_t* buf, int buflen)
{
return AdapterDeviceSend(adapter, buf, buflen) ;
}
/*******************************************************************************
* : MQTT_Recv
* : MQTT client数据接收函数
* : buf:,buflen:
* : ,-1
*******************************************************************************/
int MQTT_Recv(uint8_t* buf, int buflen)
{
return AdapterDeviceRecv(adapter, buf, buflen) ;
}
/******************************************************************************* /*******************************************************************************
* : MQTT_Connect * : MQTT_Connect
* : MQTT服务器 * : MQTT服务器
@ -138,9 +162,9 @@ int MQTT_Connect(void)
while(TryConnect_time > 0) while(TryConnect_time > 0)
{ {
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
PrivTaskDelay(50); MdelayKTask(50);
AdapterDeviceRecv(adapter, mqtt_rxbuf, 4); MQTT_Recv(mqtt_rxbuf, 4);
if(mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功 if(mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功
{ {
return 0; return 0;
@ -159,7 +183,7 @@ int MQTT_Connect(void)
*******************************************************************************/ *******************************************************************************/
void MQTT_Disconnect(void) void MQTT_Disconnect(void)
{ {
while(AdapterDeviceSend(adapter,parket_disconnet,sizeof(parket_disconnet)) < 0); while(MQTT_Send(parket_disconnet,sizeof(parket_disconnet)) < 0);
} }
@ -207,9 +231,9 @@ int MQTT_SubscribeTopic(uint8_t *topic_name)
while(TrySub_time > 0) while(TrySub_time > 0)
{ {
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
PrivTaskDelay(50); MdelayKTask(50);
AdapterDeviceRecv(adapter, mqtt_rxbuf, 5); MQTT_Recv(mqtt_rxbuf, 5);
if(mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //订阅成功 if(mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //订阅成功
{ {
return 0; return 0;
@ -230,8 +254,8 @@ int MQTT_UnSubscribeTopic(uint8_t *topic_name)
{ {
uint8_t TryUnSub_time = 10; //尝试取消订阅次数 uint8_t TryUnSub_time = 10; //尝试取消订阅次数
Aliyun_mqtt.Fixed_len = 1; //UNSUBSCRIBE报文,固定报头长度暂定为1 Aliyun_mqtt.Fixed_len = 1; //UNSUBSCRIBE报文,固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2;//UNSUBSCRIBE报文,可变报头长度=2,2为字节报文标识符 Aliyun_mqtt.Variable_len = 2; //UNSUBSCRIBE报文,可变报头长度=2,2为字节报文标识符
Aliyun_mqtt.Payload_len = strlen(topic_name) + 2; //每个需要取消的订阅topic除了本身的字符串长度,还包含表示topic字符串长度的2字节 Aliyun_mqtt.Payload_len = strlen(topic_name) + 2; //每个需要取消的订阅topic除了本身的字符串长度,还包含表示topic字符串长度的2字节
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度 Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff)); memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff));
@ -261,10 +285,10 @@ int MQTT_UnSubscribeTopic(uint8_t *topic_name)
while(TryUnSub_time > 0) while(TryUnSub_time > 0)
{ {
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
PrivTaskDelay(50); MdelayKTask(50);
AdapterDeviceRecv(adapter, mqtt_rxbuf, 4); MQTT_Recv(mqtt_rxbuf, 4);
if(mqtt_rxbuf[0] == parket_unsubAck[0] && mqtt_rxbuf[1] == parket_unsubAck[1]) //取消订阅成功 if(mqtt_rxbuf[0] == parket_unsubAck[0] && mqtt_rxbuf[1] == parket_unsubAck[1]) //取消订阅成功
{ {
return 0; return 0;
} }
@ -275,17 +299,17 @@ int MQTT_UnSubscribeTopic(uint8_t *topic_name)
/******************************************************************************* /*******************************************************************************
* : MQTT_PublishQs0 * : MQTT_PublishDataQs0
* : MQTT发布QS0的主题 * : 0Publish报文
* : topic_name: * : topic_name:
data:buffer data:
data_len: data_len:
* : Qs=0 * : Qs=0
*******************************************************************************/ *******************************************************************************/
void MQTT_PublishQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len) void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
{ {
Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级0报文固定报头长度暂定为1 Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级0报文固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2 + strlen(topic_name); //PUBLISH等级0报文,可变报头长度=2字节(topic长度)标识字节+topic字符串的长度 Aliyun_mqtt.Variable_len = 2 + strlen(topic_name); //PUBLISH等级0报文,可变报头长度=2字节topic长度标识字节+topic字符串的长度
Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级0报文,负载数据长度=data_len Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级0报文,负载数据长度=data_len
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度 Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff)); memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff));
@ -309,7 +333,50 @@ void MQTT_PublishQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串 memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)],data,data_len); //复制data数据 memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)],data,data_len); //复制data数据
AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff, Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);//整个报文数据直到发送成功 MQTT_Send(Aliyun_mqtt.Pack_buff, Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
}
/*******************************************************************************
* : MQTT_PublishDataQs1
* : 1Publish报文
* : topic_name:
data:
data_len:
* :
*******************************************************************************/
void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
{
Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级1报文固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2 + 2 + strlen(topic_name); //PUBLISH等级1报文,可变报头长度=2字节消息标识符+2字节topic长度标识字节+topic字符串的长度
Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级1报文,负载数据长度=data_len
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
Aliyun_mqtt.Pack_buff[0] = 0x32; //等级1的Publish报文固定报头第1个字节0x32
do{
if(Aliyun_mqtt.Remaining_len/128 == 0)
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len;
}
else
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80;
}
Aliyun_mqtt.Fixed_len++;
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128;
}while(Aliyun_mqtt.Remaining_len);
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0] = strlen(topic_name)/256; //主题长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)] = Aliyun_mqtt.MessageID/256; //报文标识符高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+3+strlen(topic_name)] = Aliyun_mqtt.MessageID%256; //报文标识符低字节
Aliyun_mqtt.MessageID++; //每用一次MessageID加1
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4+strlen(topic_name)],data,strlen(data)); //复制data数据
MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
} }
@ -325,10 +392,10 @@ int MQTT_SendHeart(void)
while(TrySentHeart_time > 0) while(TrySentHeart_time > 0)
{ {
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
AdapterDeviceSend(adapter,parket_heart,sizeof(parket_heart)); MQTT_Send(parket_heart,sizeof(parket_heart));
PrivTaskDelay(50); MdelayKTask(50);
AdapterDeviceRecv(adapter, mqtt_rxbuf, 2); MQTT_Recv(mqtt_rxbuf, 2);
if(mqtt_rxbuf[0] == 0xD0 && mqtt_rxbuf[1] == 0x00) if(mqtt_rxbuf[0] == 0xD0 && mqtt_rxbuf[1] == 0x00)
{ {
return 0; return 0;
} }
@ -380,7 +447,7 @@ void testmqtt(void)
{ {
KPrintf("Log in to aliyun mqtt successfully.\n"); KPrintf("Log in to aliyun mqtt successfully.\n");
} }
PrivTaskDelay(2000); MdelayKTask(2000);
ret = MQTT_SubscribeTopic(TOPIC); ret = MQTT_SubscribeTopic(TOPIC);
if(ret == 0) if(ret == 0)
{ {
@ -389,14 +456,14 @@ void testmqtt(void)
while(1) while(1)
{ {
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
len = AdapterDeviceRecv(adapter, mqtt_rxbuf, 256); len = MQTT_Recv(mqtt_rxbuf, 256);
if(len > 0 && (mqtt_rxbuf[0] == 0x30)) if(len > 0 && (mqtt_rxbuf[0] == 0x30))
{ {
MQTT_DealPublishData(mqtt_rxbuf, len); MQTT_DealPublishData(mqtt_rxbuf, len);
KPrintf("%s",Aliyun_mqtt.cmdbuff); KPrintf("%s",Aliyun_mqtt.cmdbuff);
KPrintf("\r\n"); KPrintf("\r\n");
} }
PrivTaskDelay(200); MdelayKTask(200);
MQTT_SendHeart(); MQTT_SendHeart();
} }
} }

View File

@ -45,11 +45,14 @@ typedef struct{
}MQTT_TCB; }MQTT_TCB;
int AdapterNetActive(void); int AdapterNetActive(void);
int MQTT_Send(const uint8_t* buf, int buflen);
int MQTT_Recv(uint8_t* buf, int buflen);
int MQTT_Connect(void); int MQTT_Connect(void);
void MQTT_Disconnect(void); void MQTT_Disconnect(void);
int MQTT_SubscribeTopic(uint8_t *topic_name); int MQTT_SubscribeTopic(uint8_t *topic_name);
int MQTT_UnSubscribeTopic(uint8_t *topic_name); int MQTT_UnSubscribeTopic(uint8_t *topic_name);
void MQTT_PublishQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len); void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len);
void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len);
int MQTT_SendHeart(void); int MQTT_SendHeart(void);
void MQTT_DealPublishData(uint8_t *data, uint16_t data_len); void MQTT_DealPublishData(uint8_t *data, uint16_t data_len);
#endif #endif