diff --git a/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.c b/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.c index 4d5f38148..735e35dfa 100644 --- a/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.c +++ b/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.c @@ -21,7 +21,6 @@ #include #include -#include #include #include "shell.h" #include "xsconfig.h" @@ -80,6 +79,31 @@ out: } + +/******************************************************************************* +* 函 数 名: MQTT_Send +* 功能描述: MQTT client数据发送函数 +* 形 参: buf:要发送的数据,buflen:要发送的数据长度 +* 返 回 值: 发送成功为0,发送失败为-1 +*******************************************************************************/ +int MQTT_Send(const uint8_t* buf, int buflen) +{ + return AdapterDeviceSend(adapter, buf, buflen) ; +} + + +/******************************************************************************* +* 函 数 名: MQTT_Recv +* 功能描述: MQTT client数据接收函数 +* 形 参: buf:数据缓冲区,buflen:期望接收的数据长度 +* 返 回 值: 实际接收到的数据长度,接收失败为-1 +*******************************************************************************/ +int MQTT_Recv(uint8_t* buf, int buflen) +{ + return AdapterDeviceRecv(adapter, buf, buflen) ; +} + + /******************************************************************************* * 函 数 名: MQTT_Connect * 功能描述: 登录MQTT服务器 @@ -138,9 +162,9 @@ int MQTT_Connect(void) while(TryConnect_time > 0) { memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); - AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); - PrivTaskDelay(50); - AdapterDeviceRecv(adapter, mqtt_rxbuf, 4); + MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); + MdelayKTask(50); + MQTT_Recv(mqtt_rxbuf, 4); if(mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功 { return 0; @@ -159,7 +183,7 @@ int MQTT_Connect(void) *******************************************************************************/ void MQTT_Disconnect(void) { - while(AdapterDeviceSend(adapter,parket_disconnet,sizeof(parket_disconnet)) < 0); + while(MQTT_Send(parket_disconnet,sizeof(parket_disconnet)) < 0); } @@ -207,9 +231,9 @@ int MQTT_SubscribeTopic(uint8_t *topic_name) while(TrySub_time > 0) { memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); - AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); - PrivTaskDelay(50); - AdapterDeviceRecv(adapter, mqtt_rxbuf, 5); + MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); + MdelayKTask(50); + MQTT_Recv(mqtt_rxbuf, 5); if(mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //订阅成功 { return 0; @@ -230,8 +254,8 @@ int MQTT_UnSubscribeTopic(uint8_t *topic_name) { uint8_t TryUnSub_time = 10; //尝试取消订阅次数 - Aliyun_mqtt.Fixed_len = 1; //UNSUBSCRIBE报文,固定报头长度暂定为1 - Aliyun_mqtt.Variable_len = 2;//UNSUBSCRIBE报文,可变报头长度=2,2为字节报文标识符 + Aliyun_mqtt.Fixed_len = 1; //UNSUBSCRIBE报文,固定报头长度暂定为1 + Aliyun_mqtt.Variable_len = 2; //UNSUBSCRIBE报文,可变报头长度=2,2为字节报文标识符 Aliyun_mqtt.Payload_len = strlen(topic_name) + 2; //每个需要取消的订阅topic除了本身的字符串长度,还包含表示topic字符串长度的2字节 Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度 memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff)); @@ -261,10 +285,10 @@ int MQTT_UnSubscribeTopic(uint8_t *topic_name) while(TryUnSub_time > 0) { memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); - AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); - PrivTaskDelay(50); - AdapterDeviceRecv(adapter, mqtt_rxbuf, 4); - if(mqtt_rxbuf[0] == parket_unsubAck[0] && mqtt_rxbuf[1] == parket_unsubAck[1]) //取消订阅成功 + MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); + MdelayKTask(50); + MQTT_Recv(mqtt_rxbuf, 4); + if(mqtt_rxbuf[0] == parket_unsubAck[0] && mqtt_rxbuf[1] == parket_unsubAck[1]) //取消订阅成功 { return 0; } @@ -275,17 +299,17 @@ int MQTT_UnSubscribeTopic(uint8_t *topic_name) /******************************************************************************* -* 函 数 名: MQTT_PublishQs0 -* 功能描述: MQTT发布QS0的主题 +* 函 数 名: MQTT_PublishDataQs0 +* 功能描述: 向服务器发送等级0的Publish报文 * 形 参: topic_name:主题名称 - data:数据buffer + data:数据缓存 data_len:数据长度 * 返 回 值: 发布Qs=0的消息服务器不返回确认消息 *******************************************************************************/ -void MQTT_PublishQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len) +void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len) { Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级0报文固定报头长度暂定为1 - Aliyun_mqtt.Variable_len = 2 + strlen(topic_name); //PUBLISH等级0报文,可变报头长度=2字节(topic长度)标识字节+topic字符串的长度 + Aliyun_mqtt.Variable_len = 2 + strlen(topic_name); //PUBLISH等级0报文,可变报头长度=2字节topic长度标识字节+topic字符串的长度 Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级0报文,负载数据长度=data_len Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度 memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff)); @@ -309,7 +333,50 @@ void MQTT_PublishQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len) memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串 memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)],data,data_len); //复制data数据 - AdapterDeviceSend(adapter,Aliyun_mqtt.Pack_buff, Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);//整个报文数据直到发送成功 + MQTT_Send(Aliyun_mqtt.Pack_buff, Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); +} + + +/******************************************************************************* +* 函 数 名: MQTT_PublishDataQs1 +* 功能描述: 向服务器发送等级1的Publish报文 +* 形 参: topic_name:主题名称 + data:数据缓存 + data_len:数据长度 +* 返 回 值: 无 +*******************************************************************************/ +void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len) +{ + Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级1报文固定报头长度暂定为1 + Aliyun_mqtt.Variable_len = 2 + 2 + strlen(topic_name); //PUBLISH等级1报文,可变报头长度=2字节消息标识符+2字节topic长度标识字节+topic字符串的长度 + Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级1报文,负载数据长度=data_len + Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度 + + Aliyun_mqtt.Pack_buff[0] = 0x32; //等级1的Publish报文固定报头第1个字节,0x32 + do{ + if(Aliyun_mqtt.Remaining_len/128 == 0) + { + Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len; + } + else + { + Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80; + } + Aliyun_mqtt.Fixed_len++; + Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128; + }while(Aliyun_mqtt.Remaining_len); + + Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0] = strlen(topic_name)/256; //主题长度高字节 + Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1] = strlen(topic_name)%256; //主题长度低字节 + memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串 + + Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)] = Aliyun_mqtt.MessageID/256; //报文标识符高字节 + Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+3+strlen(topic_name)] = Aliyun_mqtt.MessageID%256; //报文标识符低字节 + Aliyun_mqtt.MessageID++; //每用一次MessageID加1 + + memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4+strlen(topic_name)],data,strlen(data)); //复制data数据 + + MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len); } @@ -325,10 +392,10 @@ int MQTT_SendHeart(void) while(TrySentHeart_time > 0) { memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); - AdapterDeviceSend(adapter,parket_heart,sizeof(parket_heart)); - PrivTaskDelay(50); - AdapterDeviceRecv(adapter, mqtt_rxbuf, 2); - if(mqtt_rxbuf[0] == 0xD0 && mqtt_rxbuf[1] == 0x00) + MQTT_Send(parket_heart,sizeof(parket_heart)); + MdelayKTask(50); + MQTT_Recv(mqtt_rxbuf, 2); + if(mqtt_rxbuf[0] == 0xD0 && mqtt_rxbuf[1] == 0x00) { return 0; } @@ -380,7 +447,7 @@ void testmqtt(void) { KPrintf("Log in to aliyun mqtt successfully.\n"); } - PrivTaskDelay(2000); + MdelayKTask(2000); ret = MQTT_SubscribeTopic(TOPIC); if(ret == 0) { @@ -389,14 +456,14 @@ void testmqtt(void) while(1) { memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf)); - len = AdapterDeviceRecv(adapter, mqtt_rxbuf, 256); + len = MQTT_Recv(mqtt_rxbuf, 256); if(len > 0 && (mqtt_rxbuf[0] == 0x30)) { MQTT_DealPublishData(mqtt_rxbuf, len); KPrintf("%s",Aliyun_mqtt.cmdbuff); KPrintf("\r\n"); } - PrivTaskDelay(200); + MdelayKTask(200); MQTT_SendHeart(); } } diff --git a/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.h b/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.h index b04fb57a5..9a03b7448 100644 --- a/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.h +++ b/Ubiquitous/XiZi_IIoT/tool/mqtt/aliyun_mqtt.h @@ -45,11 +45,14 @@ typedef struct{ }MQTT_TCB; int AdapterNetActive(void); +int MQTT_Send(const uint8_t* buf, int buflen); +int MQTT_Recv(uint8_t* buf, int buflen); int MQTT_Connect(void); void MQTT_Disconnect(void); int MQTT_SubscribeTopic(uint8_t *topic_name); int MQTT_UnSubscribeTopic(uint8_t *topic_name); -void MQTT_PublishQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len); +void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len); +void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len); int MQTT_SendHeart(void); void MQTT_DealPublishData(uint8_t *data, uint16_t data_len); #endif \ No newline at end of file