1、Optimize mqtt transfer speed 3k/s

2、add json file to sd card by mqtt
This commit is contained in:
wgzAIIT 2023-09-19 16:07:24 +08:00
parent 60a7835ef8
commit 9fd61d3d2a
8 changed files with 104 additions and 36 deletions

View File

@ -29,7 +29,7 @@
#define REPLY_TIME_OUT 10
#ifdef TOOL_USING_OTA
#define ENTM_RECV_MAX OTA_RX_BUFFERSIZE
#define ENTM_RECV_MAX (OTA_FRAME_SIZE + 512)
#else
#define ENTM_RECV_MAX 256
#endif

View File

@ -41,6 +41,13 @@ menu "lib using MQTT"
string "xiuos platform server port."
default "1883"
endmenu
menuconfig USING_DOWNLOAD_JSON
bool "Enable support download json file function"
default n
select BSP_USING_SDIO
select MOUNT_SDCARD_FS
select LIB_USING_CJSON
endif
if ALIBABA_PLATFORM

View File

@ -422,9 +422,9 @@ bool MQTT_SendHeart(void)
* : MQTT_DealPublishData
* : 0,topic信息
* : redata:,data_len:
* :
* : +
*******************************************************************************/
void MQTT_DealPublishData(uint8_t *data, uint16_t data_len)
uint16_t MQTT_DealPublishData(uint8_t *data, uint16_t data_len)
{
uint8_t i;
uint16_t cmdpos,cmdlen;
@ -438,7 +438,7 @@ void MQTT_DealPublishData(uint8_t *data, uint16_t data_len)
//1代表固定报头占一个字节,i代表可变报头长度字段所占用字节数,2代表主题长度字段占2字节,cmdpos代表报文里主题名称起始位置
cmdpos = 1+i+2;
//data_len减去1+i+2就是报文中有效载荷的长度
//data_len减去1+i+2就是报文中主题部分+实际负载的长度
cmdlen = data_len-(1+i+2);
if(data_len <= CMD_SIZE)
@ -446,4 +446,6 @@ void MQTT_DealPublishData(uint8_t *data, uint16_t data_len)
memset(Platform_mqtt.cmdbuff, 0, CMD_SIZE);
memcpy(Platform_mqtt.cmdbuff, &data[cmdpos], cmdlen);
}
return cmdlen;
}

View File

@ -25,13 +25,14 @@
#include <stdint.h>
#include <stdbool.h>
#define KEEPALIVE_TIME 300 //保活时间(单位s),300s
#define HEART_TIME 200000 //空闲时发送心跳包的时间间隔(单位ms),200s
#define PACK_SIZE 512 //存放报文数据缓冲区大小
#define CMD_SIZE 3072 //保存推送的PUBLISH报文中的数据缓冲区大小
#define CLIENTID_SIZE 64 //存放客户端ID的缓冲区大小
#define USERNAME_SIZE 64 //存放用户名的缓冲区大小
#define PASSWARD_SIZE 64 //存放密码的缓冲区大小
#define KEEPALIVE_TIME 300 //保活时间(单位s),300s
#define HEART_TIME 200000 //空闲时发送心跳包的时间间隔(单位ms),200s
#define PACK_SIZE 512 //存放报文数据缓冲区大小
#define MQTT_FRAME_SIZE 3072 //保存推送的PUBLISH报文中的数据负载大小,最大3k
#define CMD_SIZE (MQTT_FRAME_SIZE + 512) //保存推送的PUBLISH报文中的数据缓冲区大小
#define CLIENTID_SIZE 64 //存放客户端ID的缓冲区大小
#define USERNAME_SIZE 64 //存放用户名的缓冲区大小
#define PASSWARD_SIZE 64 //存放密码的缓冲区大小
typedef struct{
uint8_t ClientID[CLIENTID_SIZE]; //存放客户端ID的缓冲区
@ -58,5 +59,5 @@ bool MQTT_UnSubscribeTopic(uint8_t *topic_name);
void MQTT_PublishDataQs0(uint8_t *topic_name,uint8_t *data, uint16_t data_len);
void MQTT_PublishDataQs1(uint8_t *topic_name,uint8_t *data, uint16_t data_len);
bool MQTT_SendHeart(void);
void MQTT_DealPublishData(uint8_t *data, uint16_t data_len);
uint16_t MQTT_DealPublishData(uint8_t *data, uint16_t data_len);
#endif

View File

@ -350,10 +350,9 @@ static uint8 SdCardReadCd(void)
return BusDevReadData(pin->owner_haldev, &read_param);
}
static int sd_card_status = 0;
static void SdCardTask(void* parameter)
{
static int sd_card_status = 0;
while (1) {
if (!SdCardReadCd()) {
if (!sd_card_status) {
@ -369,6 +368,11 @@ static void SdCardTask(void* parameter)
}
}
int GetSdCardStatus(void)
{
return sd_card_status;
}
#ifdef MOUNT_SDCARD
int MountSDCard()
{

View File

@ -65,10 +65,10 @@ menu "OTA function"
default 10000 if OTA_BY_TCPSERVER
default 10000 if MCUBOOT_BOOTLOADER
config OTA_RX_BUFFERSIZE
int "OTA receive data buffer size."
config OTA_FRAME_SIZE
int "OTA receive data frame size."
default 3072 if OTA_BY_PLATFORM
default 2048 if OTA_BY_TCPSERVER
default 1024 if OTA_BY_TCPSERVER
default 256 if MCUBOOT_BOOTLOADER
endif

View File

@ -19,10 +19,10 @@
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <transform.h>
#include "shell.h"
#include "xsconfig.h"
#include "mcuboot.h"
#include "ymodem.h"
#include "ota.h"
@ -35,6 +35,10 @@
#include "platform_mqtt.h"
#endif
#ifdef USING_DOWNLOAD_JSON
#include "cJSON.h"
#endif
/****************************************************************************
* Private Function Prototypes
****************************************************************************/
@ -790,10 +794,14 @@ static void app_ota_by_4g(void)
SHELL_EXPORT_CMD(SHELL_CMD_PERMISSION(0)|SHELL_CMD_TYPE(SHELL_TYPE_CMD_FUNC)|SHELL_CMD_PARAM_NUM(0),ota, app_ota_by_4g, ota by 4g function);
#endif
#ifdef OTA_BY_PLATFORM
#define FRAME_LEN 2048 //每帧数据的数据包长度
static uint8_t MqttRxbuf[3072];
#if (OTA_FRAME_SIZE <= MQTT_FRAME_SIZE)
#define FRAME_LEN OTA_FRAME_SIZE
#else
#error "The value of FRAME_LEN should not be greater than MQTT_FRAME_SIZE!!"
#endif
static uint8_t MqttRxbuf[FRAME_LEN + 512];
static uint8_t FrameBuf[FRAME_LEN];
static OTA_TCB platform_ota;
@ -837,12 +845,12 @@ static void OTA_Download(int size, int offset)
}
/*******************************************************************************
* : app_ota_by_platform
* : MQTT进行升级
* : mqttCloudInteraction
* : MQTT协议与云平台交互
* :
* :
*******************************************************************************/
static void app_ota_by_platform(void* parameter)
static void mqttCloudInteraction(void* parameter)
{
int datalen;
int ret = 0;
@ -850,8 +858,9 @@ static void app_ota_by_platform(void* parameter)
ota_info_t ota_info;
uint32_t heart_time = 0;
uint32_t flashdestination = DOWN_FLAH_ADDRESS;
uint8_t topicdatabuff[2][64];
uint8_t topicdatabuff[2][32];
char *ptr1, *ptr2;
uint16_t cmdlen;
mcuboot.flash_init();
memset(&ota_info, 0, sizeof(ota_info_t));
@ -865,7 +874,7 @@ static void app_ota_by_platform(void* parameter)
reconnect:
if((AdapterNetActive() == 0) && MQTT_Connect() && MQTT_SubscribeTopic(topicdatabuff[0]) && MQTT_SubscribeTopic(topicdatabuff[1]))
{
KPrintf("Log in to the cloud platform and subscribe to the topic successfully.\n");
KPrintf("Log in to the cloud platform and subscribe to the ota topic successfully.\n");
PropertyVersion();
}
else
@ -874,6 +883,17 @@ reconnect:
goto reconnect;
}
#ifdef USING_DOWNLOAD_JSON
uint8_t jsontopicdatabuff[32];
uint8_t jsonfilename[32];
memset(jsontopicdatabuff,0,sizeof(jsontopicdatabuff));
sprintf(jsontopicdatabuff,"protocol/%s/files",CLIENTID);
if(MQTT_SubscribeTopic(jsontopicdatabuff))
{
KPrintf("subscribe to the json download topic successfully.\n");
}
#endif
while(1)
{
memset(MqttRxbuf,0,sizeof(MqttRxbuf));
@ -896,7 +916,7 @@ reconnect:
else if(MqttRxbuf[0] == 0x30)
{
freecnt = 0;
MQTT_DealPublishData(MqttRxbuf, datalen);
cmdlen = MQTT_DealPublishData(MqttRxbuf, datalen);
// 1.获取新版本固件大小及版本信息
ptr1 = strstr((char *)Platform_mqtt.cmdbuff,topicdatabuff[0]);
@ -993,7 +1013,40 @@ reconnect:
break;
}
}
#ifdef USING_DOWNLOAD_JSON
// 3.下载json文件,SD卡要确保已经插入
extern int GetSdCardStatus(void);
if(strstr((char *)Platform_mqtt.cmdbuff,jsontopicdatabuff) && GetSdCardStatus())
{
KPrintf("------Start download joson file !------\r\n");
memset(jsonfilename,0,sizeof(jsonfilename));
memset(FrameBuf,0,sizeof(FrameBuf));
memcpy(FrameBuf, &Platform_mqtt.cmdbuff[strlen(jsontopicdatabuff)],cmdlen-strlen(jsontopicdatabuff));
cJSON *json_obj = cJSON_Parse(FrameBuf);
char* product_name = cJSON_GetObjectItem(json_obj, "productName")->valuestring;
sprintf(jsonfilename,"%s",product_name);
strcat(jsonfilename,".json");
FILE *fp = fopen(jsonfilename, "w");
if(fp == NULL)
{
KPrintf("%s file create failed,please check!\r\n",jsonfilename);
cJSON_Delete(json_obj);
continue;
}
else
{
KPrintf("%s file create success!\r\n",jsonfilename);
char *json_print_str = cJSON_Print(json_obj);
fprintf(fp, "%s", json_print_str);
fclose(fp);
cJSON_free(json_print_str);
cJSON_Delete(json_obj);
}
KPrintf("download %s file done!------\r\n",jsonfilename);
}
#endif
}
else
{
@ -1083,12 +1136,12 @@ static void OTA_Download(int size, int offset)
}
/*******************************************************************************
* : app_ota_by_platform
* : MQTT进行升级
* : mqttCloudInteraction
* : MQTT协议与云平台交互
* :
* :
*******************************************************************************/
static void app_ota_by_platform(void* parameter)
static void mqttCloudInteraction(void* parameter)
{
int datalen;
int ret = 0;
@ -1277,9 +1330,9 @@ reconnect:
int OtaTask(void)
{
int32 ota_task = 0;
ota_task = KTaskCreate("ota_platform", app_ota_by_platform, NULL,8192, 10);
ota_task = KTaskCreate("mqtt_platform", mqttCloudInteraction, NULL,10240, 10);
if(ota_task < 0) {
KPrintf("ota_task create failed ...%s %d.\n", __FUNCTION__,__LINE__);
KPrintf("matt platform task create failed ...%s %d.\n", __FUNCTION__,__LINE__);
return ERROR;
}

View File

@ -22,6 +22,7 @@
#define __OTA_DEF_H__
#include "flash_ops.h"
#include "xsconfig.h"
#define JUMP_FAILED_FLAG 0XABABABAB
#define JUMP_SUCCESS_FLAG 0XCDCDCDCD
@ -58,10 +59,10 @@ typedef struct {
#ifdef OTA_BY_TCPSERVER
#define STARTFLAG 0x1A2B //数据帧开始信号标记
#define DATAFLAG 0x3C4D //数据帧数据信号标记
#define ENDTFLAG 0x5E6F //数据帧结束信号标记
#define LENGTH 1024 //每帧数据的数据包长度
#define STARTFLAG 0x1A2B //数据帧开始信号标记
#define DATAFLAG 0x3C4D //数据帧数据信号标记
#define ENDTFLAG 0x5E6F //数据帧结束信号标记
#define LENGTH OTA_FRAME_SIZE //每帧数据的数据包长度
/*bin包传输过程中的数据帧相关的结构体*/
typedef struct
{