1、improving MQTT transmission speed 2K Bytes per package

2、rename mqtt file name
This commit is contained in:
wgzAIIT 2023-06-21 18:59:04 +08:00
parent 6f3d3b8862
commit 5900b504e3
10 changed files with 520 additions and 473 deletions

View File

@ -18,12 +18,21 @@
extern int FrameworkInit(); extern int FrameworkInit();
extern void ApplicationOtaTaskInit(void); extern void ApplicationOtaTaskInit(void);
#ifdef OTA_BY_PLATFORM
extern int OtaTask(void);
#endif
int main(void) int main(void)
{ {
printf("Hello, world! \n"); printf("Hello, world! \n");
FrameworkInit(); FrameworkInit();
#ifdef APPLICATION_OTA #ifdef APPLICATION_OTA
ApplicationOtaTaskInit(); ApplicationOtaTaskInit();
#endif
#ifdef OTA_BY_PLATFORM
OtaTask();
#endif #endif
return 0; return 0;
} }

View File

@ -163,8 +163,12 @@ static int Ec200tIoctl(struct Adapter *adapter, int cmd, void *args)
serial_cfg.serial_parity_mode = PARITY_NONE; serial_cfg.serial_parity_mode = PARITY_NONE;
serial_cfg.serial_bit_order = STOP_BITS_1; serial_cfg.serial_bit_order = STOP_BITS_1;
serial_cfg.serial_invert_mode = NRZ_NORMAL; serial_cfg.serial_invert_mode = NRZ_NORMAL;
//serial receive timeout 0.6s #ifdef TOOL_USING_OTA
serial_cfg.serial_timeout = 600; serial_cfg.serial_timeout = OTA_RX_TIMEOUT;
#else
//serial receive timeout 10s
serial_cfg.serial_timeout = 100000;
#endif
serial_cfg.is_ext_uart = 0; serial_cfg.is_ext_uart = 0;
#ifdef ADAPTER_EC200T_DRIVER_EXT_PORT #ifdef ADAPTER_EC200T_DRIVER_EXT_PORT
serial_cfg.is_ext_uart = 1; serial_cfg.is_ext_uart = 1;

View File

@ -355,12 +355,25 @@ static int GetCompleteATReply(ATAgentType agent)
PrivMutexObtain(&agent->lock); PrivMutexObtain(&agent->lock);
if (agent->receive_mode == ENTM_MODE) { if (agent->receive_mode == ENTM_MODE) {
if (agent->entm_recv_len < ENTM_RECV_MAX) { if (agent->entm_recv_len < ENTM_RECV_MAX) {
if((res == 1) && (agent->entm_recv_len < agent->read_len)) { #ifdef TOOL_USING_MQTT
if((res == 1) && (agent->entm_recv_len < agent->read_len))
{
agent->entm_recv_buf[agent->entm_recv_len] = ch; agent->entm_recv_buf[agent->entm_recv_len] = ch;
agent->entm_recv_len++; agent->entm_recv_len++;
PrivMutexAbandon(&agent->lock); PrivMutexAbandon(&agent->lock);
continue; continue;
} else { }
#else
agent->entm_recv_buf[agent->entm_recv_len] = ch;
agent->entm_recv_len++;
if(agent->entm_recv_len < agent->read_len)
{
PrivMutexAbandon(&agent->lock);
continue;
}
#endif
else
{
#ifdef CONNECTION_FRAMEWORK_DEBUG #ifdef CONNECTION_FRAMEWORK_DEBUG
printf("ENTM_MODE recv %d Bytes done.\n",agent->entm_recv_len); printf("ENTM_MODE recv %d Bytes done.\n",agent->entm_recv_len);
#endif #endif

View File

@ -28,6 +28,12 @@
#define REPLY_TIME_OUT 10 #define REPLY_TIME_OUT 10
#ifdef TOOL_USING_OTA
#define ENTM_RECV_MAX OTA_RX_BUFFERSIZE
#else
#define ENTM_RECV_MAX 256
#endif
enum ReceiveMode enum ReceiveMode
{ {
DEFAULT_MODE = 0, DEFAULT_MODE = 0,
@ -70,7 +76,6 @@ struct ATAgent
#endif #endif
pthread_t at_handler; pthread_t at_handler;
#define ENTM_RECV_MAX 2048
char entm_recv_buf[ENTM_RECV_MAX]; char entm_recv_buf[ENTM_RECV_MAX];
uint32 entm_recv_len; uint32 entm_recv_len;
enum ReceiveMode receive_mode; enum ReceiveMode receive_mode;

View File

@ -17,23 +17,19 @@ menu "OTA function"
endchoice endchoice
if MCUBOOT_APPLICATION if MCUBOOT_APPLICATION
menu "The way of OTA firmware upgrade." choice
config OTA_BY_IAP prompt "The way of OTA firmware upgrade."
bool "Through serial port IAP." default OTA_BY_PLATFORM
default y
config OTA_BY_TCPSERVER
bool "Through the public network TCP server."
default n
select SUPPORT_CONNECTION_FRAMEWORK
select CONNECTION_ADAPTER_4G
config OTA_BY_PLATFORM config OTA_BY_PLATFORM
bool "Through IoT management platform." bool "Through IoT management platform."
default n select TOOL_USING_MQTT
config OTA_BY_TCPSERVER
bool "Through the public network TCP server."
select SUPPORT_CONNECTION_FRAMEWORK select SUPPORT_CONNECTION_FRAMEWORK
select CONNECTION_ADAPTER_4G select CONNECTION_ADAPTER_4G
endmenu endchoice
endif endif
@ -62,7 +58,18 @@ menu "OTA function"
hex "Application package size,the default size is limited to 1M." hex "Application package size,the default size is limited to 1M."
default 0x00100000 default 0x00100000
endmenu endmenu
config OTA_RX_TIMEOUT
int "OTA receive data timeout(ms)."
default 600 if OTA_BY_PLATFORM
default 10000 if OTA_BY_TCPSERVER
default 10000 if MCUBOOT_BOOTLOADER
config OTA_RX_BUFFERSIZE
int "OTA receive data buffer size."
default 3072 if OTA_BY_PLATFORM
default 2048 if OTA_BY_TCPSERVER
default 256 if MCUBOOT_BOOTLOADER
endif endif
endmenu endmenu

View File

@ -31,7 +31,7 @@
#endif #endif
#ifdef OTA_BY_PLATFORM #ifdef OTA_BY_PLATFORM
#include "aliyun_mqtt.h" #include "platform_mqtt.h"
#endif #endif
/**************************************************************************** /****************************************************************************
@ -159,7 +159,7 @@ static int create_version(uint8_t* cur_version, uint8_t* new_version)
//更新版本号 //更新版本号
sprintf(new_version, "%03d.%03d.%03d", major, minor, patch); sprintf(new_version, "%03d.%03d.%03d", major, minor, patch);
return 0; return 0;
} }
@ -419,7 +419,6 @@ static void BootLoaderJumpApp(void)
} }
#ifdef OTA_BY_IAP
/********************************************************************************* /*********************************************************************************
* : app_ota_by_iap * : app_ota_by_iap
* : ota升级,,iap方式传输bin文件 * : ota升级,,iap方式传输bin文件
@ -470,7 +469,6 @@ static void app_ota_by_iap(void)
mcuboot.op_reset(); mcuboot.op_reset();
} }
SHELL_EXPORT_CMD(SHELL_CMD_PERMISSION(0)|SHELL_CMD_TYPE(SHELL_TYPE_CMD_FUNC)|SHELL_CMD_PARAM_NUM(0),iap, app_ota_by_iap, ota by iap function); SHELL_EXPORT_CMD(SHELL_CMD_PERMISSION(0)|SHELL_CMD_TYPE(SHELL_TYPE_CMD_FUNC)|SHELL_CMD_PARAM_NUM(0),iap, app_ota_by_iap, ota by iap function);
#endif
#ifdef OTA_BY_TCPSERVER #ifdef OTA_BY_TCPSERVER
@ -738,8 +736,8 @@ SHELL_EXPORT_CMD(SHELL_CMD_PERMISSION(0)|SHELL_CMD_TYPE(SHELL_TYPE_CMD_FUNC)|SHE
#ifdef OTA_BY_PLATFORM #ifdef OTA_BY_PLATFORM
#define FRAME_LEN 1024 //每帧数据的数据包长度 #define FRAME_LEN 2048 //每帧数据的数据包长度
static uint8_t MqttRxbuf[2048]; static uint8_t MqttRxbuf[3072];
static uint8_t FrameBuf[FRAME_LEN]; static uint8_t FrameBuf[FRAME_LEN];
static OTA_TCB AliOTA; static OTA_TCB AliOTA;
/******************************************************************************* /*******************************************************************************
@ -783,7 +781,7 @@ void OTA_Download(int size, int offset)
* : * :
* : * :
*******************************************************************************/ *******************************************************************************/
static void app_ota_by_platform(void) static void app_ota_by_platform(void* parameter)
{ {
int datalen; int datalen;
int ret = 0; int ret = 0;
@ -810,7 +808,7 @@ static void app_ota_by_platform(void)
if(datalen > 0 && (MqttRxbuf[0] == 0x30)) if(datalen > 0 && (MqttRxbuf[0] == 0x30))
{ {
MQTT_DealPublishData(MqttRxbuf, datalen); MQTT_DealPublishData(MqttRxbuf, datalen);
if(sscanf((char *)Aliyun_mqtt.cmdbuff,"/ota/device/upgrade/iv74JbFgzhv/D001{\"code\":\"1000\",\"data\":{\"size\":%d,\"streamId\":%d,\"sign\":\"%*32s\",\"dProtocol\":\"mqtt\",\"version\":\"%11s\"",&AliOTA.size,&AliOTA.streamId,AliOTA.version)==3) if(sscanf((char *)Platform_mqtt.cmdbuff,"/ota/device/upgrade/iv74JbFgzhv/D001{\"code\":\"1000\",\"data\":{\"size\":%d,\"streamId\":%d,\"sign\":\"%*32s\",\"dProtocol\":\"mqtt\",\"version\":\"%11s\"",&AliOTA.size,&AliOTA.streamId,AliOTA.version)==3)
{ {
KPrintf("ota file size:%d\r\n",AliOTA.size); KPrintf("ota file size:%d\r\n",AliOTA.size);
KPrintf("ota file id:%d\r\n",AliOTA.streamId); KPrintf("ota file id:%d\r\n",AliOTA.streamId);
@ -822,12 +820,12 @@ static void app_ota_by_platform(void)
break; break;
} }
AliOTA.counter = (AliOTA.size%FRAME_LEN != 0)? (AliOTA.size/FRAME_LEN + 1):(AliOTA.size/FRAME_LEN); AliOTA.counter = (AliOTA.size%FRAME_LEN != 0)? (AliOTA.size/FRAME_LEN + 1):(AliOTA.size/FRAME_LEN);
AliOTA.num = 1; //下载次数,初始值为1 AliOTA.num = 1; //下载次数,初始值为1
AliOTA.downlen = FRAME_LEN; //记录本次下载量 AliOTA.downlen = FRAME_LEN; //记录本次下载量
OTA_Download(AliOTA.downlen,(AliOTA.num - 1)*FRAME_LEN); //发送要下载的数据信息给服务器 OTA_Download(AliOTA.downlen,(AliOTA.num - 1)*FRAME_LEN); //发送要下载的数据信息给服务器
} }
if(strstr((char *)Aliyun_mqtt.cmdbuff,"download_reply")) if(strstr((char *)Platform_mqtt.cmdbuff,"download_reply"))
{ {
memset(FrameBuf,0,sizeof(FrameBuf)); memset(FrameBuf,0,sizeof(FrameBuf));
memcpy(FrameBuf, &MqttRxbuf[datalen-AliOTA.downlen-2], AliOTA.downlen); memcpy(FrameBuf, &MqttRxbuf[datalen-AliOTA.downlen-2], AliOTA.downlen);
@ -906,7 +904,18 @@ static void app_ota_by_platform(void)
mcuboot.op_reset(); mcuboot.op_reset();
} }
SHELL_EXPORT_CMD(SHELL_CMD_PERMISSION(0)|SHELL_CMD_TYPE(SHELL_TYPE_CMD_FUNC)|SHELL_CMD_PARAM_NUM(0),aliyun, app_ota_by_platform, ota by 4g function); int OtaTask(void)
{
int32 ota_task = 0;
ota_task = KTaskCreate("ota_platform", app_ota_by_platform, NULL,8192, 10);
if(ota_task < 0) {
KPrintf("ota_task create failed ...%s %d.\n", __FUNCTION__,__LINE__);
return ERROR;
}
StartupKTask(ota_task);
return 0;
}
#endif #endif

View File

@ -1,3 +1,3 @@
SRC_FILES := aliyun_mqtt.c SRC_FILES := platform_mqtt.c
include $(KERNEL_ROOT)/compiler.mk include $(KERNEL_ROOT)/compiler.mk

View File

@ -1,432 +0,0 @@
/*
* Copyright (c) 2020 AIIT XUOS Lab
* XiUOS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
/**
* @file: aliyun_mqtt.c
* @brief: connect aliyun mqtt
* @version: 1.0
* @author: AIIT XUOS Lab
* @date: 2023/6/14
*
*/
#include <string.h>
#include <stdint.h>
#include <adapter.h>
#include "shell.h"
#include "xsconfig.h"
#include <adapter.h>
#include "aliyun_mqtt.h"
MQTT_TCB Aliyun_mqtt; //创建一个用于连接阿里云mqtt的结构体
static struct Adapter *adapter;
static const uint8_t parket_connetAck[] = {0x20,0x02,0x00,0x00}; //连接成功服务器回应报文
static const uint8_t parket_disconnet[] = {0xe0,0x00}; //客户端主动断开连接发送报文
static const uint8_t parket_heart[] = {0xc0,0x00}; //客户端发送保活心跳包
static const uint8_t parket_subAck[] = {0x90,0x03,0x00,0x0A,0x01}; //订阅成功服务器回应报文
static const uint8_t parket_unsubAck[] = {0xB0,0x02,0x00,0x0A}; //取消订阅成功服务器回应报文
static uint8_t mqtt_rxbuf[16];
/*******************************************************************************
* : AdapterNetActive
* : 使,TCP服务器并进入透传模式使4G方式
* :
* : 0,
*******************************************************************************/
int AdapterNetActive(void)
{
int ret = 0;
uint32_t baud_rate = BAUD_RATE_115200;
adapter = AdapterDeviceFindByName(ADAPTER_4G_NAME);
adapter->socket.socket_id = 0;
ret = AdapterDeviceOpen(adapter);
if (ret < 0)
{
goto out;
}
ret = AdapterDeviceControl(adapter, OPE_INT, &baud_rate);
if (ret < 0)
{
goto out;
}
ret = AdapterDeviceConnect(adapter, CLIENT, SERVERIP, SERVERPORT, IPV4);
if (ret < 0)
{
goto out;
}
out:
if (ret < 0)
{
AdapterDeviceClose(adapter);
}
return ret;
}
/*******************************************************************************
* : MQTT_Send
* : MQTT client数据发送函数
* : buf:,buflen:
* : 0,-1
*******************************************************************************/
int MQTT_Send(const uint8_t* buf, int buflen)
{
return AdapterDeviceSend(adapter, buf, buflen) ;
}
/*******************************************************************************
* : MQTT_Recv
* : MQTT client数据接收函数
* : buf:,buflen:
* : ,-1
*******************************************************************************/
int MQTT_Recv(uint8_t* buf, int buflen)
{
return AdapterDeviceRecv(adapter, buf, buflen) ;
}
/*******************************************************************************
* : MQTT_Connect
* : MQTT服务器
* :
* : 0,1
*******************************************************************************/
int MQTT_Connect(void)
{
uint8_t TryConnect_time = 10; //尝试登录次数
Aliyun_mqtt.MessageID = 0; //报文标识符清零,CONNECT报文虽然不需要添加报文标识符,但是CONNECT报文是第一个发送的报文,在此清零报文标识符为后续报文做准备
Aliyun_mqtt.Fixed_len = 1; //CONNECT报文固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 10; //CONNECT报文可变报头长度为10
Aliyun_mqtt.Payload_len = (2+strlen(CLIENTID)) + (2+strlen(USERNAME)) + (2+strlen(PASSWORD)); //CONNECT报文中负载长度
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //剩余长度=可变报头长度+负载长度
memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff));
Aliyun_mqtt.Pack_buff[0] = 0x10; //CONNECT报文 固定报头第1个字节0x10
do{
if((Aliyun_mqtt.Remaining_len/128) == 0)
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len;
}
else
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80;
}
Aliyun_mqtt.Fixed_len++;
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128;
}while(Aliyun_mqtt.Remaining_len);
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0] = 0x00; //CONNECT报文,可变报头第1个字节:固定0x00
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1] = 0x04; //CONNECT报文,可变报头第2个字节:固定0x04
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2] = 0x4D; //CONNECT报文,可变报头第3个字节:固定0x4D,大写字母M
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+3] = 0x51; //CONNECT报文,可变报头第4个字节:固定0x51,大写字母Q
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4] = 0x54; //CONNECT报文,可变报头第5个字节:固定0x54,大写字母T
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+5] = 0x54; //CONNECT报文,可变报头第6个字节:固定0x54,大写字母T
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+6] = 0x04; //CONNECT报文,可变报头第7个字节:固定0x04
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+7] = 0xC2; //CONNECT报文,可变报头第8个字节:使能用户名和密码校验,不使用遗嘱功能,不保留会话功能
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+8] = 0x00; //CONNECT报文,可变报头第9个字节:保活时间高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+9] = 0x64; //CONNECT报文,可变报头第10个字节:保活时间高字节
/* CLIENT_ID */
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+10] = strlen(CLIENTID)/256; //客户端ID长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+11] = strlen(CLIENTID)%256; //客户端ID长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+12],CLIENTID,strlen(CLIENTID)); //复制过来客户端ID字串
/* USER_NAME */
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+12+strlen(CLIENTID)] = strlen(USERNAME)/256; //用户名长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+13+strlen(CLIENTID)] = strlen(USERNAME)%256; //用户名长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+14+strlen(CLIENTID)],USERNAME,strlen(USERNAME)); //复制过来用户名字串
/* PASSWARD */
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+14+strlen(CLIENTID)+strlen(USERNAME)] = strlen(PASSWORD)/256; //密码长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+15+strlen(CLIENTID)+strlen(USERNAME)] = strlen(PASSWORD)%256; //密码长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+16+strlen(CLIENTID)+strlen(USERNAME)],PASSWORD,strlen(PASSWORD)); //复制过来密码字串
while(TryConnect_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 4);
if(mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功
{
return 0;
}
TryConnect_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_Disconnect
* : MQTT服务器的连接
* :
* :
*******************************************************************************/
void MQTT_Disconnect(void)
{
while(MQTT_Send(parket_disconnet,sizeof(parket_disconnet)) < 0);
}
/*******************************************************************************
* : MQTT_SubscribeTopic
* : MQTT订阅单个主题
* : topic_name:
* : 0,1
*******************************************************************************/
int MQTT_SubscribeTopic(uint8_t *topic_name)
{
uint8_t TrySub_time = 10; //尝试订阅次数
Aliyun_mqtt.Fixed_len = 1; //SUBSCRIBE报文,固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2;//SUBSCRIBE报文,可变报头长度=2,2为字节报文标识符
Aliyun_mqtt.Payload_len = 0; //SUBSCRIBE报文,负载数据长度暂定为0
Aliyun_mqtt.Payload_len = strlen(topic_name) + 2 + 1; //每个需要订阅的topic除了本身的字符串长度,还包含表示topic字符串长度的2字节,以及订阅等级1字节
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff));
Aliyun_mqtt.Pack_buff[0]=0x82; //SUBSCRIBE报文,固定报头第1个字节0x82
do{
if((Aliyun_mqtt.Remaining_len/128) == 0)
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len;
}
else
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80;
}
Aliyun_mqtt.Fixed_len++;
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128;
}while(Aliyun_mqtt.Remaining_len);
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0] = Aliyun_mqtt.MessageID/256; //报文标识符高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1] = Aliyun_mqtt.MessageID%256; //报文标识符低字节
Aliyun_mqtt.MessageID++; //每用一次MessageID加1
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2] = strlen(topic_name)/256; //主题长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+3] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4],topic_name,strlen(topic_name)); //复制主题字串
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4+strlen(topic_name)] = 0; //QOS等级设置为0
while(TrySub_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 5);
if(mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //订阅成功
{
return 0;
}
TrySub_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_UnSubscribeTopic
* : MQTT取消订阅单个主题
* : topic_name:
* : 0,1
*******************************************************************************/
int MQTT_UnSubscribeTopic(uint8_t *topic_name)
{
uint8_t TryUnSub_time = 10; //尝试取消订阅次数
Aliyun_mqtt.Fixed_len = 1; //UNSUBSCRIBE报文,固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2; //UNSUBSCRIBE报文,可变报头长度=2,2为字节报文标识符
Aliyun_mqtt.Payload_len = strlen(topic_name) + 2; //每个需要取消的订阅topic除了本身的字符串长度,还包含表示topic字符串长度的2字节
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff));
Aliyun_mqtt.Pack_buff[0]=0xA0; //UNSUBSCRIBE报文,固定报头第1个字节0xA0
do{
if((Aliyun_mqtt.Remaining_len/128) == 0)
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len;
}
else
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80;
}
Aliyun_mqtt.Fixed_len++;
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128;
}while(Aliyun_mqtt.Remaining_len);
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0] = Aliyun_mqtt.MessageID/256; //报文标识符高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1] = Aliyun_mqtt.MessageID%256; //报文标识符低字节
Aliyun_mqtt.MessageID++; //每用一次MessageID加1
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2] = strlen(topic_name)/256; //主题长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+3] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4],topic_name,strlen(topic_name)); //复制主题字串
while(TryUnSub_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 4);
if(mqtt_rxbuf[0] == parket_unsubAck[0] && mqtt_rxbuf[1] == parket_unsubAck[1]) //取消订阅成功
{
return 0;
}
TryUnSub_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_PublishDataQs0
* : 0Publish报文
* : topic_name:
data:
data_len:
* : Qs=0
*******************************************************************************/
void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
{
Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级0报文固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2 + strlen(topic_name); //PUBLISH等级0报文,可变报头长度=2字节topic长度标识字节+topic字符串的长度
Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级0报文,负载数据长度=data_len
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Aliyun_mqtt.Pack_buff,0,sizeof(Aliyun_mqtt.Pack_buff));
Aliyun_mqtt.Pack_buff[0]=0x30; //PUBLISH等级0报文固定报头第1个字节0x30
do{
if((Aliyun_mqtt.Remaining_len/128) == 0)
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len;
}
else
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80;
}
Aliyun_mqtt.Fixed_len++;
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128;
}while(Aliyun_mqtt.Remaining_len);
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0]=strlen(topic_name)/256; //主题长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1]=strlen(topic_name)%256; //主题长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)],data,data_len); //复制data数据
MQTT_Send(Aliyun_mqtt.Pack_buff, Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
}
/*******************************************************************************
* : MQTT_PublishDataQs1
* : 1Publish报文
* : topic_name:
data:
data_len:
* :
*******************************************************************************/
void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
{
Aliyun_mqtt.Fixed_len = 1; //PUBLISH等级1报文固定报头长度暂定为1
Aliyun_mqtt.Variable_len = 2 + 2 + strlen(topic_name); //PUBLISH等级1报文,可变报头长度=2字节消息标识符+2字节topic长度标识字节+topic字符串的长度
Aliyun_mqtt.Payload_len = data_len; //PUBLISH等级1报文,负载数据长度=data_len
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
Aliyun_mqtt.Pack_buff[0] = 0x32; //等级1的Publish报文固定报头第1个字节0x32
do{
if(Aliyun_mqtt.Remaining_len/128 == 0)
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = Aliyun_mqtt.Remaining_len;
}
else
{
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len] = (Aliyun_mqtt.Remaining_len%128)|0x80;
}
Aliyun_mqtt.Fixed_len++;
Aliyun_mqtt.Remaining_len = Aliyun_mqtt.Remaining_len/128;
}while(Aliyun_mqtt.Remaining_len);
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+0] = strlen(topic_name)/256; //主题长度高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+1] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+2+strlen(topic_name)] = Aliyun_mqtt.MessageID/256; //报文标识符高字节
Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+3+strlen(topic_name)] = Aliyun_mqtt.MessageID%256; //报文标识符低字节
Aliyun_mqtt.MessageID++; //每用一次MessageID加1
memcpy(&Aliyun_mqtt.Pack_buff[Aliyun_mqtt.Fixed_len+4+strlen(topic_name)],data,strlen(data)); //复制data数据
MQTT_Send(Aliyun_mqtt.Pack_buff,Aliyun_mqtt.Fixed_len + Aliyun_mqtt.Variable_len + Aliyun_mqtt.Payload_len);
}
/*******************************************************************************
* : MQTT_SendHeart
* :
* :
* : 0,
*******************************************************************************/
int MQTT_SendHeart(void)
{
uint8_t TrySentHeart_time = 10; //尝试发送心跳保活次数
while(TrySentHeart_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(parket_heart,sizeof(parket_heart));
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 2);
if(mqtt_rxbuf[0] == 0xD0 && mqtt_rxbuf[1] == 0x00)
{
return 0;
}
TrySentHeart_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_DealPublishData
* : 0,topic信息
* : redata:,data_len:
* :
*******************************************************************************/
void MQTT_DealPublishData(uint8_t *data, uint16_t data_len)
{
uint8_t i;
uint16_t cmdpos,cmdlen;
for(i = 1;i < 5;i++)
{
if((data[i] & 0x80) == 0)
break;
}
cmdpos = 1+i+2;
cmdlen = data_len-(1+i+2);
if(data_len <= CMD_SIZE)
{
memset(Aliyun_mqtt.cmdbuff, 0, CMD_SIZE);
memcpy(Aliyun_mqtt.cmdbuff, &data[cmdpos], cmdlen);
}
}

View File

@ -0,0 +1,432 @@
/*
* Copyright (c) 2020 AIIT XUOS Lab
* XiUOS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
/**
* @file: platform_mqtt.c
* @brief: platform_mqtt.c file
* @version: 1.0
* @author: AIIT XUOS Lab
* @date: 2023/6/14
*
*/
#include <string.h>
#include <stdint.h>
#include <adapter.h>
#include "shell.h"
#include "xsconfig.h"
#include <adapter.h>
#include "platform_mqtt.h"
MQTT_TCB Platform_mqtt; //创建一个用于连接云平台mqtt的结构体
static struct Adapter *adapter;
static const uint8_t parket_connetAck[] = {0x20,0x02,0x00,0x00}; //连接成功服务器回应报文
static const uint8_t parket_disconnet[] = {0xe0,0x00}; //客户端主动断开连接发送报文
static const uint8_t parket_heart[] = {0xc0,0x00}; //客户端发送保活心跳包
static const uint8_t parket_subAck[] = {0x90,0x03,0x00,0x0A,0x01}; //订阅成功服务器回应报文
static const uint8_t parket_unsubAck[] = {0xB0,0x02,0x00,0x0A}; //取消订阅成功服务器回应报文
static uint8_t mqtt_rxbuf[16];
/*******************************************************************************
* : AdapterNetActive
* : 使,TCP服务器并进入透传模式使4G方式
* :
* : 0,
*******************************************************************************/
int AdapterNetActive(void)
{
int ret = 0;
uint32_t baud_rate = BAUD_RATE_115200;
adapter = AdapterDeviceFindByName(ADAPTER_4G_NAME);
adapter->socket.socket_id = 0;
ret = AdapterDeviceOpen(adapter);
if (ret < 0)
{
goto out;
}
ret = AdapterDeviceControl(adapter, OPE_INT, &baud_rate);
if (ret < 0)
{
goto out;
}
ret = AdapterDeviceConnect(adapter, CLIENT, SERVERIP, SERVERPORT, IPV4);
if (ret < 0)
{
goto out;
}
out:
if (ret < 0)
{
AdapterDeviceClose(adapter);
}
return ret;
}
/*******************************************************************************
* : MQTT_Send
* : MQTT client数据发送函数
* : buf:,buflen:
* : 0,-1
*******************************************************************************/
int MQTT_Send(const uint8_t* buf, int buflen)
{
return AdapterDeviceSend(adapter, buf, buflen) ;
}
/*******************************************************************************
* : MQTT_Recv
* : MQTT client数据接收函数
* : buf:,buflen:
* : ,-1
*******************************************************************************/
int MQTT_Recv(uint8_t* buf, int buflen)
{
return AdapterDeviceRecv(adapter, buf, buflen) ;
}
/*******************************************************************************
* : MQTT_Connect
* : MQTT服务器
* :
* : 0,1
*******************************************************************************/
int MQTT_Connect(void)
{
uint8_t TryConnect_time = 10; //尝试登录次数
Platform_mqtt.MessageID = 0; //报文标识符清零,CONNECT报文虽然不需要添加报文标识符,但是CONNECT报文是第一个发送的报文,在此清零报文标识符为后续报文做准备
Platform_mqtt.Fixed_len = 1; //CONNECT报文固定报头长度暂定为1
Platform_mqtt.Variable_len = 10; //CONNECT报文可变报头长度为10
Platform_mqtt.Payload_len = (2+strlen(CLIENTID)) + (2+strlen(USERNAME)) + (2+strlen(PASSWORD)); //CONNECT报文中负载长度
Platform_mqtt.Remaining_len = Platform_mqtt.Variable_len + Platform_mqtt.Payload_len; //剩余长度=可变报头长度+负载长度
memset(Platform_mqtt.Pack_buff,0,sizeof(Platform_mqtt.Pack_buff));
Platform_mqtt.Pack_buff[0] = 0x10; //CONNECT报文 固定报头第1个字节0x10
do{
if((Platform_mqtt.Remaining_len/128) == 0)
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = Platform_mqtt.Remaining_len;
}
else
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = (Platform_mqtt.Remaining_len%128)|0x80;
}
Platform_mqtt.Fixed_len++;
Platform_mqtt.Remaining_len = Platform_mqtt.Remaining_len/128;
}while(Platform_mqtt.Remaining_len);
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+0] = 0x00; //CONNECT报文,可变报头第1个字节:固定0x00
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+1] = 0x04; //CONNECT报文,可变报头第2个字节:固定0x04
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2] = 0x4D; //CONNECT报文,可变报头第3个字节:固定0x4D,大写字母M
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+3] = 0x51; //CONNECT报文,可变报头第4个字节:固定0x51,大写字母Q
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+4] = 0x54; //CONNECT报文,可变报头第5个字节:固定0x54,大写字母T
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+5] = 0x54; //CONNECT报文,可变报头第6个字节:固定0x54,大写字母T
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+6] = 0x04; //CONNECT报文,可变报头第7个字节:固定0x04
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+7] = 0xC2; //CONNECT报文,可变报头第8个字节:使能用户名和密码校验,不使用遗嘱功能,不保留会话功能
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+8] = 0x00; //CONNECT报文,可变报头第9个字节:保活时间高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+9] = 0x64; //CONNECT报文,可变报头第10个字节:保活时间高字节
/* CLIENT_ID */
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+10] = strlen(CLIENTID)/256; //客户端ID长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+11] = strlen(CLIENTID)%256; //客户端ID长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+12],CLIENTID,strlen(CLIENTID)); //复制过来客户端ID字串
/* USER_NAME */
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+12+strlen(CLIENTID)] = strlen(USERNAME)/256; //用户名长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+13+strlen(CLIENTID)] = strlen(USERNAME)%256; //用户名长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+14+strlen(CLIENTID)],USERNAME,strlen(USERNAME)); //复制过来用户名字串
/* PASSWARD */
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+14+strlen(CLIENTID)+strlen(USERNAME)] = strlen(PASSWORD)/256; //密码长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+15+strlen(CLIENTID)+strlen(USERNAME)] = strlen(PASSWORD)%256; //密码长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+16+strlen(CLIENTID)+strlen(USERNAME)],PASSWORD,strlen(PASSWORD)); //复制过来密码字串
while(TryConnect_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(Platform_mqtt.Pack_buff,Platform_mqtt.Fixed_len + Platform_mqtt.Variable_len + Platform_mqtt.Payload_len);
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 4);
if(mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功
{
return 0;
}
TryConnect_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_Disconnect
* : MQTT服务器的连接
* :
* :
*******************************************************************************/
void MQTT_Disconnect(void)
{
while(MQTT_Send(parket_disconnet,sizeof(parket_disconnet)) < 0);
}
/*******************************************************************************
* : MQTT_SubscribeTopic
* : MQTT订阅单个主题
* : topic_name:
* : 0,1
*******************************************************************************/
int MQTT_SubscribeTopic(uint8_t *topic_name)
{
uint8_t TrySub_time = 10; //尝试订阅次数
Platform_mqtt.Fixed_len = 1; //SUBSCRIBE报文,固定报头长度暂定为1
Platform_mqtt.Variable_len = 2;//SUBSCRIBE报文,可变报头长度=2,2为字节报文标识符
Platform_mqtt.Payload_len = 0; //SUBSCRIBE报文,负载数据长度暂定为0
Platform_mqtt.Payload_len = strlen(topic_name) + 2 + 1; //每个需要订阅的topic除了本身的字符串长度,还包含表示topic字符串长度的2字节,以及订阅等级1字节
Platform_mqtt.Remaining_len = Platform_mqtt.Variable_len + Platform_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Platform_mqtt.Pack_buff,0,sizeof(Platform_mqtt.Pack_buff));
Platform_mqtt.Pack_buff[0]=0x82; //SUBSCRIBE报文,固定报头第1个字节0x82
do{
if((Platform_mqtt.Remaining_len/128) == 0)
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = Platform_mqtt.Remaining_len;
}
else
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = (Platform_mqtt.Remaining_len%128)|0x80;
}
Platform_mqtt.Fixed_len++;
Platform_mqtt.Remaining_len = Platform_mqtt.Remaining_len/128;
}while(Platform_mqtt.Remaining_len);
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+0] = Platform_mqtt.MessageID/256; //报文标识符高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+1] = Platform_mqtt.MessageID%256; //报文标识符低字节
Platform_mqtt.MessageID++; //每用一次MessageID加1
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2] = strlen(topic_name)/256; //主题长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+3] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+4],topic_name,strlen(topic_name)); //复制主题字串
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+4+strlen(topic_name)] = 0; //QOS等级设置为0
while(TrySub_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(Platform_mqtt.Pack_buff,Platform_mqtt.Fixed_len + Platform_mqtt.Variable_len + Platform_mqtt.Payload_len);
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 5);
if(mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //订阅成功
{
return 0;
}
TrySub_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_UnSubscribeTopic
* : MQTT取消订阅单个主题
* : topic_name:
* : 0,1
*******************************************************************************/
int MQTT_UnSubscribeTopic(uint8_t *topic_name)
{
uint8_t TryUnSub_time = 10; //尝试取消订阅次数
Platform_mqtt.Fixed_len = 1; //UNSUBSCRIBE报文,固定报头长度暂定为1
Platform_mqtt.Variable_len = 2; //UNSUBSCRIBE报文,可变报头长度=2,2为字节报文标识符
Platform_mqtt.Payload_len = strlen(topic_name) + 2; //每个需要取消的订阅topic除了本身的字符串长度,还包含表示topic字符串长度的2字节
Platform_mqtt.Remaining_len = Platform_mqtt.Variable_len + Platform_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Platform_mqtt.Pack_buff,0,sizeof(Platform_mqtt.Pack_buff));
Platform_mqtt.Pack_buff[0]=0xA0; //UNSUBSCRIBE报文,固定报头第1个字节0xA0
do{
if((Platform_mqtt.Remaining_len/128) == 0)
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = Platform_mqtt.Remaining_len;
}
else
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = (Platform_mqtt.Remaining_len%128)|0x80;
}
Platform_mqtt.Fixed_len++;
Platform_mqtt.Remaining_len = Platform_mqtt.Remaining_len/128;
}while(Platform_mqtt.Remaining_len);
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+0] = Platform_mqtt.MessageID/256; //报文标识符高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+1] = Platform_mqtt.MessageID%256; //报文标识符低字节
Platform_mqtt.MessageID++; //每用一次MessageID加1
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2] = strlen(topic_name)/256; //主题长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+3] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+4],topic_name,strlen(topic_name)); //复制主题字串
while(TryUnSub_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(Platform_mqtt.Pack_buff,Platform_mqtt.Fixed_len + Platform_mqtt.Variable_len + Platform_mqtt.Payload_len);
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 4);
if(mqtt_rxbuf[0] == parket_unsubAck[0] && mqtt_rxbuf[1] == parket_unsubAck[1]) //取消订阅成功
{
return 0;
}
TryUnSub_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_PublishDataQs0
* : 0Publish报文
* : topic_name:
data:
data_len:
* : Qs=0
*******************************************************************************/
void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
{
Platform_mqtt.Fixed_len = 1; //PUBLISH等级0报文固定报头长度暂定为1
Platform_mqtt.Variable_len = 2 + strlen(topic_name); //PUBLISH等级0报文,可变报头长度=2字节topic长度标识字节+topic字符串的长度
Platform_mqtt.Payload_len = data_len; //PUBLISH等级0报文,负载数据长度=data_len
Platform_mqtt.Remaining_len = Platform_mqtt.Variable_len + Platform_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
memset(Platform_mqtt.Pack_buff,0,sizeof(Platform_mqtt.Pack_buff));
Platform_mqtt.Pack_buff[0]=0x30; //PUBLISH等级0报文固定报头第1个字节0x30
do{
if((Platform_mqtt.Remaining_len/128) == 0)
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = Platform_mqtt.Remaining_len;
}
else
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = (Platform_mqtt.Remaining_len%128)|0x80;
}
Platform_mqtt.Fixed_len++;
Platform_mqtt.Remaining_len = Platform_mqtt.Remaining_len/128;
}while(Platform_mqtt.Remaining_len);
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+0]=strlen(topic_name)/256; //主题长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+1]=strlen(topic_name)%256; //主题长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2+strlen(topic_name)],data,data_len); //复制data数据
MQTT_Send(Platform_mqtt.Pack_buff, Platform_mqtt.Fixed_len + Platform_mqtt.Variable_len + Platform_mqtt.Payload_len);
}
/*******************************************************************************
* : MQTT_PublishDataQs1
* : 1Publish报文
* : topic_name:
data:
data_len:
* :
*******************************************************************************/
void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len)
{
Platform_mqtt.Fixed_len = 1; //PUBLISH等级1报文固定报头长度暂定为1
Platform_mqtt.Variable_len = 2 + 2 + strlen(topic_name); //PUBLISH等级1报文,可变报头长度=2字节消息标识符+2字节topic长度标识字节+topic字符串的长度
Platform_mqtt.Payload_len = data_len; //PUBLISH等级1报文,负载数据长度=data_len
Platform_mqtt.Remaining_len = Platform_mqtt.Variable_len + Platform_mqtt.Payload_len; //计算剩余长度=可变报头长度+负载长度
Platform_mqtt.Pack_buff[0] = 0x32; //等级1的Publish报文固定报头第1个字节0x32
do{
if(Platform_mqtt.Remaining_len/128 == 0)
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = Platform_mqtt.Remaining_len;
}
else
{
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len] = (Platform_mqtt.Remaining_len%128)|0x80;
}
Platform_mqtt.Fixed_len++;
Platform_mqtt.Remaining_len = Platform_mqtt.Remaining_len/128;
}while(Platform_mqtt.Remaining_len);
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+0] = strlen(topic_name)/256; //主题长度高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+1] = strlen(topic_name)%256; //主题长度低字节
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2],topic_name,strlen(topic_name)); //复制主题字串
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+2+strlen(topic_name)] = Platform_mqtt.MessageID/256; //报文标识符高字节
Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+3+strlen(topic_name)] = Platform_mqtt.MessageID%256; //报文标识符低字节
Platform_mqtt.MessageID++; //每用一次MessageID加1
memcpy(&Platform_mqtt.Pack_buff[Platform_mqtt.Fixed_len+4+strlen(topic_name)],data,strlen(data)); //复制data数据
MQTT_Send(Platform_mqtt.Pack_buff,Platform_mqtt.Fixed_len + Platform_mqtt.Variable_len + Platform_mqtt.Payload_len);
}
/*******************************************************************************
* : MQTT_SendHeart
* :
* :
* : 0,
*******************************************************************************/
int MQTT_SendHeart(void)
{
uint8_t TrySentHeart_time = 10; //尝试发送心跳保活次数
while(TrySentHeart_time > 0)
{
memset(mqtt_rxbuf,0,sizeof(mqtt_rxbuf));
MQTT_Send(parket_heart,sizeof(parket_heart));
MdelayKTask(50);
MQTT_Recv(mqtt_rxbuf, 2);
if(mqtt_rxbuf[0] == 0xD0 && mqtt_rxbuf[1] == 0x00)
{
return 0;
}
TrySentHeart_time--;
}
return 1;
}
/*******************************************************************************
* : MQTT_DealPublishData
* : 0,topic信息
* : redata:,data_len:
* :
*******************************************************************************/
void MQTT_DealPublishData(uint8_t *data, uint16_t data_len)
{
uint8_t i;
uint16_t cmdpos,cmdlen;
for(i = 1;i < 5;i++)
{
if((data[i] & 0x80) == 0)
break;
}
cmdpos = 1+i+2;
cmdlen = data_len-(1+i+2);
if(data_len <= CMD_SIZE)
{
memset(Platform_mqtt.cmdbuff, 0, CMD_SIZE);
memcpy(Platform_mqtt.cmdbuff, &data[cmdpos], cmdlen);
}
}

View File

@ -11,16 +11,16 @@
*/ */
/** /**
* @file: aliyun_mqtt.h * @file: platform_mqtt.h
* @brief: connect aliyun mqtt * @brief: platform_mqtt.h file
* @version: 1.0 * @version: 1.0
* @author: AIIT XUOS Lab * @author: AIIT XUOS Lab
* @date: 2023/6/14 * @date: 2023/6/14
* *
*/ */
#ifndef _ALIYUN_MQTT_H_ #ifndef _PLATFORM_MQTT_H_
#define _ALIYUN_MQTT_H_ #define _PLATFORM_MQTT_H_
#include <stdint.h> #include <stdint.h>
@ -31,7 +31,7 @@
#define SERVERPORT "1883" #define SERVERPORT "1883"
#define PACK_SIZE 512 //存放报文数据缓冲区大小 #define PACK_SIZE 512 //存放报文数据缓冲区大小
#define CMD_SIZE 2048 //保存推送的PUBLISH报文中的数据缓冲区大小 #define CMD_SIZE 3072 //保存推送的PUBLISH报文中的数据缓冲区大小
typedef struct{ typedef struct{
uint8_t Pack_buff[PACK_SIZE]; //存放发送报文数据缓冲区 uint8_t Pack_buff[PACK_SIZE]; //存放发送报文数据缓冲区
@ -43,7 +43,7 @@ typedef struct{
uint8_t cmdbuff[CMD_SIZE]; //保存推送的PUBLISH报文中的数据缓冲区 uint8_t cmdbuff[CMD_SIZE]; //保存推送的PUBLISH报文中的数据缓冲区
}MQTT_TCB; }MQTT_TCB;
extern MQTT_TCB Aliyun_mqtt; //外部变量声明 extern MQTT_TCB Platform_mqtt; //外部变量声明
int AdapterNetActive(void); int AdapterNetActive(void);
int MQTT_Send(const uint8_t* buf, int buflen); int MQTT_Send(const uint8_t* buf, int buflen);