From 575c86fcd7e9f7b06dd353ac1268817d920ad849 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 16 May 2022 17:27:54 +0800 Subject: [PATCH 01/18] fix:add scale unit test to CI --- source/libs/scalar/test/scalar/CMakeLists.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/scalar/test/scalar/CMakeLists.txt b/source/libs/scalar/test/scalar/CMakeLists.txt index 03418d17ac..15d1c2cb44 100644 --- a/source/libs/scalar/test/scalar/CMakeLists.txt +++ b/source/libs/scalar/test/scalar/CMakeLists.txt @@ -17,3 +17,7 @@ TARGET_INCLUDE_DIRECTORIES( PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc" ) +add_test( + NAME scalarTest + COMMAND scalarTest +) From 0a2f1cb7733c425a51546fcbe6d9bbd62982e2c7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 16 May 2022 20:51:00 +0800 Subject: [PATCH 02/18] refactor:add the configuration of child table name --- include/common/tglobal.h | 3 + source/client/src/clientSml.c | 123 ++++++++++++++++++++-------------- source/common/src/tglobal.c | 6 ++ 3 files changed, 80 insertions(+), 52 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 83283cfdad..32cfb5dbaa 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -125,6 +125,9 @@ extern SDiskCfg tsDiskCfg[]; // udf extern bool tsStartUdfd; +// schemaless +extern char tsSmlChildTableName[]; + // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 8fce5bfb00..a18b78b501 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -16,6 +16,7 @@ #include "clientInt.h" #include "tname.h" #include "cJSON.h" +#include "tglobal.h" //================================================================================================= #define SPACE ' ' @@ -54,6 +55,9 @@ for (int i = 1; i < keyLen; ++i) { \ } \ } +#define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN) +#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN) + #define OTD_MAX_FIELDS_NUM 2 #define OTD_JSON_SUB_FIELDS_NUM 2 #define OTD_JSON_FIELDS_NUM 4 @@ -899,8 +903,8 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm sql++; } elements->measureLen = sql - elements->measure; - if(elements->measureLen == 0) { - smlBuildInvalidDataMsg(msg, "measure is empty", NULL); + if(IS_INVALID_TABLE_LEN(elements->measureLen)) { + smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL); return TSDB_CODE_SML_INVALID_DATA; } @@ -969,8 +973,9 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t * } } -static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ +static int32_t smlParseTelnetTags(const char* data, SArray *cols, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ const char *sql = data; + size_t childTableNameLen = strlen(tsSmlChildTableName); while(*sql != '\0'){ JUMP_SPACE(sql) if(*sql == '\0') break; @@ -992,7 +997,7 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump sql++; } - if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ + if(IS_INVALID_COL_LEN(keyLen)){ smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); return TSDB_CODE_SML_INVALID_DATA; } @@ -1022,6 +1027,13 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump return TSDB_CODE_SML_INVALID_DATA; } + //handle child table name + if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){ + memset(childTableName, 0, TSDB_TABLE_NAME_LEN); + strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN)); + continue; + } + // add kv to SSmlKv SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv) return TSDB_CODE_OUT_OF_MEMORY; @@ -1043,7 +1055,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable // parse metric smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen); - if (!(tinfo->sTableName) || tinfo->sTableNameLen == 0) { + if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return TSDB_CODE_SML_INVALID_DATA; } @@ -1085,7 +1097,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable } // parse tags - ret = smlParseTelnetTags(sql, tinfo->tags, info->dumplicateKey, &info->msgBuf); + ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return TSDB_CODE_SML_INVALID_DATA; @@ -1094,7 +1106,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable return TSDB_CODE_SUCCESS; } -static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ +static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ if(isTag && len == 0){ SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv) return TSDB_CODE_OUT_OF_MEMORY; @@ -1107,6 +1119,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is return TSDB_CODE_SUCCESS; } + size_t childTableNameLen = strlen(tsSmlChildTableName); const char *sql = data; while(sql < data + len){ const char *key = sql; @@ -1126,7 +1139,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is sql++; } - if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ + if(IS_INVALID_COL_LEN(keyLen)){ smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); return TSDB_CODE_SML_INVALID_DATA; } @@ -1169,6 +1182,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is PROCESS_SLASH(key, keyLen) PROCESS_SLASH(value, valueLen) + //handle child table name + if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){ + memset(childTableName, 0, TSDB_TABLE_NAME_LEN); + strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN)); + continue; + } + // add kv to SSmlKv SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv) return TSDB_CODE_OUT_OF_MEMORY; @@ -1477,8 +1497,8 @@ static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableIn } tinfo->sTableNameLen = strlen(metric->valuestring); - if (tinfo->sTableNameLen >= TSDB_TABLE_NAME_LEN) { - uError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1); + if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) { + uError("OTD:0x%"PRIx64" Metric lenght is 0 or large than 192", info->id); return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; } @@ -1828,60 +1848,49 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) { return TSDB_CODE_SUCCESS; } -static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, SHashObj *dumplicateKey, SSmlMsgBuf *msg) { +static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg) { int32_t ret = TSDB_CODE_SUCCESS; cJSON *tags = cJSON_GetObjectItem(root, "tags"); if (tags == NULL || tags->type != cJSON_Object) { return TSDB_CODE_TSC_INVALID_JSON; } - //handle child table name todo -// size_t childTableNameLen = strlen(tsSmlChildTableName); -// char childTbName[TSDB_TABLE_NAME_LEN] = {0}; -// if (childTableNameLen != 0) { -// memcpy(childTbName, tsSmlChildTableName, childTableNameLen); -// cJSON *id = cJSON_GetObjectItem(tags, childTbName); -// if (id != NULL) { -// if (!cJSON_IsString(id)) { -// tscError("OTD:0x%"PRIx64" ID must be JSON string", info->id); -// return TSDB_CODE_TSC_INVALID_JSON; -// } -// size_t idLen = strlen(id->valuestring); -// *childTableName = tcalloc(idLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char)); -// memcpy(*childTableName, id->valuestring, idLen); -// addEscapeCharToString(*childTableName, (int32_t)idLen); -// -// //check duplicate IDs -// cJSON_DeleteItemFromObject(tags, childTbName); -// id = cJSON_GetObjectItem(tags, childTbName); -// if (id != NULL) { -// return TSDB_CODE_TSC_DUP_TAG_NAMES; -// } -// } -// } + size_t childTableNameLen = strlen(tsSmlChildTableName); int32_t tagNum = cJSON_GetArraySize(tags); for (int32_t i = 0; i < tagNum; ++i) { cJSON *tag = cJSON_GetArrayItem(tags, i); if (tag == NULL) { return TSDB_CODE_TSC_INVALID_JSON; } + size_t keyLen = strlen(tag->string); + if (IS_INVALID_COL_LEN(keyLen)) { + uError("OTD:Tag key length is 0 or too large than 64"); + return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; + } //check duplicate keys - if (smlCheckDuplicateKey(tag->string, strlen(tag->string), dumplicateKey)) { + if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) { return TSDB_CODE_TSC_DUP_TAG_NAMES; } + //handle child table name + if(childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0){ + if (!cJSON_IsString(tag)) { + uError("OTD:ID must be JSON string"); + return TSDB_CODE_TSC_INVALID_JSON; + } + memset(childTableName, 0, TSDB_TABLE_NAME_LEN); + strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN); + continue; + } + // add kv to SSmlKv SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(pKVs) taosArrayPush(pKVs, &kv); //key - kv->keyLen = strlen(tag->string); - if (kv->keyLen >= TSDB_COL_NAME_LEN) { - uError("OTD:Tag key cannot exceeds %d characters in JSON", TSDB_COL_NAME_LEN - 1); - return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; - } + kv->keyLen = keyLen; ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -1937,7 +1946,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id); //Parse tags - ret = smlParseTagsFromJSON(root, tinfo->tags, info->dumplicateKey, &info->msgBuf); + ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); if (ret) { uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id); return ret; @@ -2019,11 +2028,16 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { (*oneTable)->sTableName = elements.measure; (*oneTable)->sTableNameLen = elements.measureLen; - RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen, - (*oneTable)->childTableName, 0 }; + if(strlen((*oneTable)->childTableName) == 0){ + RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen, + (*oneTable)->childTableName, 0 }; + + buildChildTableName(&rName); + (*oneTable)->uid = rName.uid; + }else{ + (*oneTable)->uid = *(uint64_t*)((*oneTable)->childTableName); + } - buildChildTableName(&rName); - (*oneTable)->uid = rName.uid; } SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); @@ -2087,10 +2101,15 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { } taosHashClear(info->dumplicateKey); - RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, - tinfo->childTableName, 0 }; - buildChildTableName(&rName); - tinfo->uid = rName.uid; + if(strlen(tinfo->childTableName) == 0){ + RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, + tinfo->childTableName, 0 }; + buildChildTableName(&rName); + tinfo->uid = rName.uid; + }else{ + tinfo->uid = *(uint64_t*)(tinfo->childTableName); // generate uid by name simple + } + bool hasTable = true; SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); @@ -2313,9 +2332,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr return (TAOS_RES*)request; } - if (numLines <= 0 || numLines > 65536) { + if (!lines) { request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&info->msgBuf, "numLines should be between 1 and 65536", NULL); + smlBuildInvalidDataMsg(&info->msgBuf, "lines is null", NULL); goto end; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index aaad606feb..8a109800e6 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -76,6 +76,10 @@ int32_t tsTelemInterval = 86400; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; +// schemaless +char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value. + //If set to empty system will generate table name using MD5 hash. + // query int32_t tsQueryPolicy = 1; @@ -512,6 +516,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { return -1; } + tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); + tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32; tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32; From 65ef7c626deba2d8d295ba3d39461d544b23054d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 16 May 2022 21:05:36 +0800 Subject: [PATCH 03/18] refactor:add the configuration of child table name --- source/client/src/clientSml.c | 6 +++--- source/client/test/smlTest.cpp | 30 ++++++++++++++++-------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index a18b78b501..001c947318 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1183,7 +1183,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *c PROCESS_SLASH(value, valueLen) //handle child table name - if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){ + if(childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){ memset(childTableName, 0, TSDB_TABLE_NAME_LEN); strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN)); continue; @@ -1984,7 +1984,7 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { if(info->dataFormat) taosArrayDestroy(cols); return ret; } - ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf); + ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); smlDestroyCols(cols); @@ -2015,7 +2015,7 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { } if(!hasTable){ - ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, true, info->dumplicateKey, &info->msgBuf); + ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true, info->dumplicateKey, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); return ret; diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index e567a0c3e8..1eb6909281 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -207,7 +207,7 @@ TEST(testCase, smlParseCols_Error_Test) { char *sql = (char*)taosMemoryCalloc(256, 1); memcpy(sql, data[i], len + 1); SArray *cols = taosArrayInit(8, POINTER_BYTES); - int32_t ret = smlParseCols(sql, len, cols, false, dumplicateKey, &msgBuf); + int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf); ASSERT_NE(ret, TSDB_CODE_SUCCESS); taosHashClear(dumplicateKey); taosMemoryFree(sql); @@ -233,7 +233,7 @@ TEST(testCase, smlParseCols_tag_Test) { const char *data = "cbin=\"passit helloc\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); - int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); + int32_t ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); int32_t size = taosArrayGetSize(cols); ASSERT_EQ(size, 19); @@ -265,7 +265,7 @@ TEST(testCase, smlParseCols_tag_Test) { len = 0; memset(msgBuf.buf, 0, msgBuf.len); taosHashClear(dumplicateKey); - ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); + ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); size = taosArrayGetSize(cols); ASSERT_EQ(size, 1); @@ -298,7 +298,7 @@ TEST(testCase, smlParseCols_Test) { int32_t len = strlen(data); char *sql = (char*)taosMemoryCalloc(1024, 1); memcpy(sql, data, len + 1); - int32_t ret = smlParseCols(sql, len, cols, false, dumplicateKey, &msgBuf); + int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); int32_t size = taosArrayGetSize(cols); ASSERT_EQ(size, 19); @@ -516,16 +516,18 @@ TEST(testCase, smlProcess_influx_Test) { int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); ASSERT_EQ(ret, 0); -// TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); -// ASSERT_NE(res, nullptr); -// int fieldNum = taos_field_count(res); -// ASSERT_EQ(fieldNum, 5); -// int rowNum = taos_affected_rows(res); -// ASSERT_EQ(rowNum, 2); -// for (int i = 0; i < rowNum; ++i) { -// TAOS_ROW rows = taos_fetch_row(res); -// } -// taos_free_result(res); + TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); + ASSERT_NE(res, nullptr); + int fieldNum = taos_field_count(res); + //ASSERT_EQ(fieldNum, 5); + printf("fieldNum:%d\n", fieldNum); + int rowNum = taos_affected_rows(res); + //ASSERT_EQ(rowNum, 2); + printf("rowNum:%d\n", rowNum); + for (int i = 0; i < rowNum; ++i) { + TAOS_ROW rows = taos_fetch_row(res); + } + taos_free_result(res); destroyRequest(request); smlDestroyInfo(info); } From 831dbc5b5074976c8650517c17f1e7ce588bf390 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 16 May 2022 21:40:30 +0800 Subject: [PATCH 04/18] refactor:add the configuration of child table name --- source/client/test/CMakeLists.txt | 4 ++++ source/common/src/tglobal.c | 1 + 2 files changed, 5 insertions(+) diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index 3fc91d07b6..03d2b48134 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -41,3 +41,7 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) +add_test( + NAME smlTest + COMMAND smlTest +) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8a109800e6..5beb5454de 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -323,6 +323,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; + if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 4; tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2); From 7cde35219f1ea73c00f7508baaf9259f2d2ce833 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 16 May 2022 21:48:45 +0800 Subject: [PATCH 05/18] refactor:add the configuration of child table name --- source/libs/scalar/test/scalar/CMakeLists.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/scalar/test/scalar/CMakeLists.txt b/source/libs/scalar/test/scalar/CMakeLists.txt index 15d1c2cb44..866297b017 100644 --- a/source/libs/scalar/test/scalar/CMakeLists.txt +++ b/source/libs/scalar/test/scalar/CMakeLists.txt @@ -17,7 +17,7 @@ TARGET_INCLUDE_DIRECTORIES( PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc" ) -add_test( - NAME scalarTest - COMMAND scalarTest -) +#add_test( +# NAME scalarTest +# COMMAND scalarTest +#) From c2a918a85e8f2ad4c45503eeefb4b512629cc9d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 12:36:32 +0800 Subject: [PATCH 06/18] refactor: do some internal refactor. --- source/libs/executor/src/executorimpl.c | 26 ++++++------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 688901226c..e280724200 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); + int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); @@ -2136,23 +2136,13 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p * @param result */ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { + int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); - int32_t numOfResult = pBlock->info.rows; // there are already exists result rows - - int32_t start = 0; - int32_t step = 1; + int32_t start = pGroupResInfo->index; // qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv)); - assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC); - if (orderType == TSDB_ORDER_ASC) { - start = pGroupResInfo->index; - } else { // desc order copy all data - start = numOfRows - pGroupResInfo->index - 1; - } - - for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { + for (int32_t i = start; i < numOfRows; i += 1) { SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); @@ -2162,9 +2152,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn continue; } - // TODO copy multiple rows? - int32_t numOfRowsToCopy = pRow->numOfRows; - if (numOfResult + numOfRowsToCopy >= pBlock->info.capacity) { + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { break; } @@ -2195,7 +2183,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn } releaseBufPage(pBuf, page); - pBlock->info.rows += pRow->numOfRows; if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full break; @@ -2223,8 +2210,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG return; } - int32_t orderType = TSDB_ORDER_ASC; - doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx, numOfExprs); + doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); From c2c2dbf2bc6526e85632fe4c3103cbc1b79b4c25 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 May 2022 15:12:55 +0800 Subject: [PATCH 07/18] refactor:add test cases for schemaless --- source/client/src/clientSml.c | 6 +- source/client/test/smlTest.cpp | 199 ++++++++++++++++++++++++--------- 2 files changed, 148 insertions(+), 57 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 001c947318..741301cab7 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -663,12 +663,12 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){ static bool smlParseBool(SSmlKv *kvVal) { const char *pVal = kvVal->value; int32_t len = kvVal->length; - if ((len == 1) && pVal[0] == 't') { + if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) { kvVal->i = true; return true; } - if ((len == 1) && pVal[0] == 'f') { + if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) { kvVal->i = false; return true; } @@ -2344,7 +2344,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_HOURS || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){ + if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){ request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; smlBuildInvalidDataMsg(&info->msgBuf, "precision invalidate for line protocol", NULL); goto end; diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 1eb6909281..37cb1989b3 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -488,10 +488,10 @@ TEST(testCase, smlProcess_influx_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); - TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); + TAOS_RES* pRes = taos_query(taos, "create database if not exists inflx_db"); taos_free_result(pRes); - pRes = taos_query(taos, "use sml_db"); + pRes = taos_query(taos, "use inflx_db"); taos_free_result(pRes); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); @@ -501,33 +501,105 @@ TEST(testCase, smlProcess_influx_Test) { ASSERT_NE(info, nullptr); const char *sql[] = { - "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000", - "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000", - "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000", - "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609400000000000", - "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619400000000000", - "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000", - "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000", - "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000", - "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000", - "stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000", - "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606401000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607402000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608403000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609404000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619405000000000", + "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 145160640600000000", + "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606407000000000", + "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609408000000000", + "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629409000000000", + "stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=\"kk\",c4=4 1451629501000000000", + "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=\"\",c4=4 1451629602000000000", }; int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); ASSERT_EQ(ret, 0); - TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); + // case 1 + TAOS_RES *res = taos_query(taos, "select * from t_91e0b182be80332b5c530cbf872f760e"); ASSERT_NE(res, nullptr); int fieldNum = taos_field_count(res); - //ASSERT_EQ(fieldNum, 5); + ASSERT_EQ(fieldNum, 11); printf("fieldNum:%d\n", fieldNum); - int rowNum = taos_affected_rows(res); - //ASSERT_EQ(rowNum, 2); - printf("rowNum:%d\n", rowNum); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); + + TAOS_ROW row = NULL; + int32_t rowIndex = 0; + while((row = taos_fetch_row(res)) != NULL) { + int64_t ts = *(int64_t*)row[0]; + double load_capacity = *(double*)row[1]; + double fuel_capacity = *(double*)row[2]; + double nominal_fuel_consumption = *(double*)row[3]; + double latitude = *(double*)row[4]; + double longitude = *(double*)row[5]; + double elevation = *(double*)row[6]; + double velocity = *(double*)row[7]; + double heading = *(double*)row[8]; + double grade = *(double*)row[9]; + double fuel_consumption = *(double*)row[10]; + if(rowIndex == 0){ + ASSERT_EQ(ts, 1451606407000); + ASSERT_EQ(load_capacity, 2000); + ASSERT_EQ(fuel_capacity, 200); + ASSERT_EQ(nominal_fuel_consumption, 15); + ASSERT_EQ(latitude, 24.5208); + ASSERT_EQ(longitude, 28.09377); + ASSERT_EQ(elevation, 428); + ASSERT_EQ(velocity, 0); + ASSERT_EQ(heading, 304); + ASSERT_EQ(grade, 0); + ASSERT_EQ(fuel_consumption, 25); + }else{ + ASSERT_FALSE(1); + } + rowIndex++; } taos_free_result(res); + + // case 2 + res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); + ASSERT_NE(res, nullptr); + fieldNum = taos_field_count(res); + ASSERT_EQ(fieldNum, 5); + printf("fieldNum:%d\n", fieldNum); + + rowIndex = 0; + while((row = taos_fetch_row(res)) != NULL) { + int *length = taos_fetch_lengths(res); + + int64_t ts = *(int64_t*)row[0]; + double c1 = *(double*)row[1]; + double c4 = *(double*)row[4]; + if(rowIndex == 0){ + ASSERT_EQ(ts, 1451629501000); + ASSERT_EQ(c1, 1); + ASSERT_EQ(*(double*)row[2], 2); + ASSERT_EQ(length[3], 2); + ASSERT_EQ(memcmp(row[3], "kk", length[3]), 0); + ASSERT_EQ(c4, 4); + }else if(rowIndex == 1){ + ASSERT_EQ(ts, 1451629602000); + ASSERT_EQ(c1, 1); + ASSERT_EQ(row[2], nullptr); + ASSERT_EQ(length[3], 0); + ASSERT_EQ(c4, 4); + }else{ + ASSERT_FALSE(1); + } + rowIndex++; + } + taos_free_result(res); + + // case 2 + res = taos_query(taos, "show tables"); + ASSERT_NE(res, nullptr); + + row = taos_fetch_row(res); + int rowNum = taos_affected_rows(res); + ASSERT_EQ(rowNum, 5); + taos_free_result(res); + + destroyRequest(request); smlDestroyInfo(info); } @@ -586,10 +658,10 @@ TEST(testCase, smlProcess_telnet_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); - TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); + TAOS_RES* pRes = taos_query(taos, "create database if not exists telnet_db"); taos_free_result(pRes); - pRes = taos_query(taos, "use sml_db"); + pRes = taos_query(taos, "use telnet_db"); taos_free_result(pRes); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); @@ -607,27 +679,31 @@ TEST(testCase, smlProcess_telnet_Test) { int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); ASSERT_EQ(ret, 0); -// TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a"); -// ASSERT_NE(res, nullptr); -// int fieldNum = taos_field_count(res); -// ASSERT_EQ(fieldNum, 2); -// int rowNum = taos_affected_rows(res); -// ASSERT_EQ(rowNum, 1); -// for (int i = 0; i < rowNum; ++i) { -// TAOS_ROW rows = taos_fetch_row(res); -// } -// taos_free_result(res); + // case 1 + TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a"); + ASSERT_NE(res, nullptr); + int fieldNum = taos_field_count(res); + ASSERT_EQ(fieldNum, 2); + + TAOS_ROW row = taos_fetch_row(res); + int64_t ts = *(int64_t*)row[0]; + double c1 = *(double*)row[1]; + ASSERT_EQ(ts, 1479496100000); + ASSERT_EQ(c1, 42); + + int rowNum = taos_affected_rows(res); + ASSERT_EQ(rowNum, 1); + taos_free_result(res); + + // case 2 + res = taos_query(taos, "show tables"); + ASSERT_NE(res, nullptr); + + row = taos_fetch_row(res); + rowNum = taos_affected_rows(res); + ASSERT_EQ(rowNum, 3); + taos_free_result(res); -// res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961"); -// ASSERT_NE(res, nullptr); -// fieldNum = taos_field_count(res); -// ASSERT_EQ(fieldNum, 2); -// rowNum = taos_affected_rows(res); -// ASSERT_EQ(rowNum, 2); -// for (int i = 0; i < rowNum; ++i) { -// TAOS_ROW rows = taos_fetch_row(res); -// } -// taos_free_result(res); destroyRequest(request); smlDestroyInfo(info); } @@ -636,10 +712,10 @@ TEST(testCase, smlProcess_json1_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); - TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db"); + TAOS_RES *pRes = taos_query(taos, "create database if not exists json_db"); taos_free_result(pRes); - pRes = taos_query(taos, "use sml_db"); + pRes = taos_query(taos, "use json_db"); taos_free_result(pRes); SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); @@ -672,16 +748,31 @@ TEST(testCase, smlProcess_json1_Test) { int ret = smlProcess(info, (char **)(&sql), -1); ASSERT_EQ(ret, 0); -// TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7"); -// ASSERT_NE(res, nullptr); -// int fieldNum = taos_field_count(res); -// ASSERT_EQ(fieldNum, 2); - // int rowNum = taos_affected_rows(res); - // ASSERT_EQ(rowNum, 1); - // for (int i = 0; i < rowNum; ++i) { - // TAOS_ROW rows = taos_fetch_row(res); - // } -// taos_free_result(res); + // case 1 + TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7"); + ASSERT_NE(res, nullptr); + int fieldNum = taos_field_count(res); + ASSERT_EQ(fieldNum, 2); + + TAOS_ROW row = taos_fetch_row(res); + int64_t ts = *(int64_t*)row[0]; + double c1 = *(double*)row[1]; + ASSERT_EQ(ts, 1346846400000); + ASSERT_EQ(c1, 18); + + int rowNum = taos_affected_rows(res); + ASSERT_EQ(rowNum, 1); + taos_free_result(res); + + // case 2 + res = taos_query(taos, "show tables"); + ASSERT_NE(res, nullptr); + + row = taos_fetch_row(res); + rowNum = taos_affected_rows(res); + ASSERT_EQ(rowNum, 2); + taos_free_result(res); + destroyRequest(request); smlDestroyInfo(info); } From e8880e154411ed066ea866d250ddfd4fe795a760 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 17 May 2022 15:14:16 +0800 Subject: [PATCH 08/18] fix: udf memory sanitizing --- include/util/tcoding.h | 4 +-- source/libs/function/src/udfd.c | 54 ++++++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 74e64d5292..5962949a70 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -63,14 +63,14 @@ static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) { if (buf != NULL) { - ((int8_t *)(*buf))[0] = value ? 1 : 0; + ((int8_t *)(*buf))[0] = (value ? 1 : 0); *buf = POINTER_SHIFT(*buf, sizeof(int8_t)); } return (int32_t)sizeof(int8_t); } static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) { - *value = ((int8_t *)buf)[0] == 0 ? false : true; + *value = ( (((int8_t *)buf)[0] == 0) ? false : true ); return POINTER_SHIFT(buf, sizeof(int8_t)); } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e644ea6172..19421072f8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -226,7 +226,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scalarProcFunc(&input, &output); - + freeUdfDataDataBlock(&input); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); freeUdfColumn(&output); break; @@ -246,6 +246,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { .bufLen= udf->bufSize, .numOfResult = 0}; code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + freeUdfInterBuf(&call->interBuf); + freeUdfDataDataBlock(&input); subRsp->resultBuf = outBuf; break; @@ -255,6 +257,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { .bufLen= udf->bufSize, .numOfResult = 0}; code = udf->aggFinishFunc(&call->interBuf, &outBuf); + freeUdfInterBuf(&call->interBuf); subRsp->resultBuf = outBuf; break; } @@ -274,6 +277,30 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(bufBegin, len); + switch (call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + tDeleteSSDataBlock(&call->block); + tDeleteSSDataBlock(&subRsp->resultData); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + tDeleteSSDataBlock(&call->block); + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + default: + break; + + } + taosMemoryFree(uvUdf->input.base); return; } @@ -348,9 +375,8 @@ void udfdProcessRequest(uv_work_t *req) { void udfdOnWrite(uv_write_t *req, int status) { SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) { - // TODO:log error and process it. + fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); } - fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status)); taosMemoryFree(work->output.base); taosMemoryFree(work); taosMemoryFree(req); @@ -549,6 +575,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); taosCloseFile(&file); strncpy(udf->path, path, strlen(path)); + tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); msgInfo->code = 0; } @@ -800,15 +827,26 @@ static int32_t udfdUvInit() { return 0; } +static void udfdCloseWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} + static int32_t udfdRun() { global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); - fnInfo("start the udfd"); - int code = uv_run(global.loop, UV_RUN_DEFAULT); - fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code); - int codeClose = uv_loop_close(global.loop); - fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); + fnInfo("start udfd event loop"); + uv_run(global.loop, UV_RUN_DEFAULT); + fnInfo("udfd event loop stopped."); + + uv_loop_close(global.loop); + + uv_walk(global.loop, udfdCloseWalkCb, NULL); + uv_run(global.loop, UV_RUN_DEFAULT); + uv_loop_close(global.loop); + uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); return 0; From 7967f2d7c034d7317289b7db479e2892c2d6b556 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 17 May 2022 15:19:32 +0800 Subject: [PATCH 09/18] fix: sanitizing tudf memory --- source/libs/function/src/tudf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 03a3891a4c..192e04c9a3 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1287,7 +1287,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; - memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); + strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); if (errCode != 0) { From d614473ae20dbc4e6b25a57a6f8e50be21b6fd88 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 May 2022 15:27:48 +0800 Subject: [PATCH 10/18] refactor:add test cases for schemaless --- source/client/src/clientSml.c | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 741301cab7..cdae558653 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -598,19 +598,25 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){ kvVal->type = TSDB_DATA_TYPE_FLOAT; kvVal->f = (float)result; }else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){ - if(smlDoubleToInt64OverFlow(result)){ - smlBuildInvalidDataMsg(msg, "big int is too large, out of precision", pVal); - return false; + if(result >= (double)INT64_MAX){ + kvVal->i = INT64_MAX; + }else if(result <= (double)INT64_MIN){ + kvVal->i = INT64_MIN; + }else{ + kvVal->i = result; } kvVal->type = TSDB_DATA_TYPE_BIGINT; - kvVal->i = (int64_t)result; }else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){ - if(result >= (double)UINT64_MAX || result < 0){ + if(result < 0){ smlBuildInvalidDataMsg(msg, "unsigned big int is too large, out of precision", pVal); return false; } + if(result >= (double)UINT64_MAX){ + kvVal->u = UINT64_MAX; + }else{ + kvVal->u = result; + } kvVal->type = TSDB_DATA_TYPE_UBIGINT; - kvVal->u = result; }else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){ if(!IS_VALID_INT(result)){ smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal); @@ -1694,11 +1700,13 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char* typeStr, cJSON *value) { strcasecmp(typeStr, "bigint") == 0) { pVal->type = TSDB_DATA_TYPE_BIGINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - if(smlDoubleToInt64OverFlow(value->valuedouble)){ - uError("OTD:JSON value(%f) cannot fit in type(big int)", value->valuedouble); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + if(value->valuedouble >= (double)INT64_MAX){ + pVal->i = INT64_MAX; + }else if(value->valuedouble <= (double)INT64_MIN){ + pVal->i = INT64_MIN; + }else{ + pVal->i = value->valuedouble; } - pVal->i = value->valuedouble; return TSDB_CODE_SUCCESS; } //float From 636020942c04d6978a346eb45ce96dbb2799a229 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 May 2022 15:43:58 +0800 Subject: [PATCH 11/18] refactor:add test cases for schemaless --- source/client/test/CMakeLists.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index 03d2b48134..bb9f0ed23e 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) -add_test( - NAME smlTest - COMMAND smlTest -) +#add_test( +# NAME smlTest +# COMMAND smlTest +#) From 7c8a72809671b268820f6aff9b87629493a9ca01 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 May 2022 15:55:54 +0800 Subject: [PATCH 12/18] refactor:add the configuration of dataFormat --- include/common/tglobal.h | 1 + source/client/src/clientSml.c | 12 ++++++++---- source/common/src/tglobal.c | 3 +++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 32cfb5dbaa..1f7dbb8b62 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -127,6 +127,7 @@ extern bool tsStartUdfd; // schemaless extern char tsSmlChildTableName[]; +extern bool tsSmlDataFormat; // internal extern int32_t tsTransPullupInterval; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index cdae558653..fe574b8ef6 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -166,7 +166,7 @@ typedef struct { SMLProtocolType protocol; int8_t precision; - bool dataFormat; // true means that the name, number and order of keys in each line are the same(only for influx protocol) + bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol) SHashObj *childTables; SHashObj *superTables; @@ -1422,7 +1422,7 @@ static void smlDestroyInfo(SSmlHandle* info){ taosMemoryFreeClear(info); } -static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){ +static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision){ int32_t code = TSDB_CODE_SUCCESS; SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { @@ -1454,7 +1454,11 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol info->precision = precision; info->protocol = protocol; - info->dataFormat = dataFormat; + if(protocol == TSDB_SML_LINE_PROTOCOL){ + info->dataFormat = tsSmlDataFormat; + }else{ + info->dataFormat = true; + } info->pRequest = request; info->msgBuf.buf = info->pRequest->msgBuf; info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; @@ -2335,7 +2339,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr return NULL; } - SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision, true); + SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision); if(!info){ return (TAOS_RES*)request; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5beb5454de..33ba9473d0 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -79,6 +79,7 @@ uint16_t tsTelemPort = 80; // schemaless char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value. //If set to empty system will generate table name using MD5 hash. +bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol) // query int32_t tsQueryPolicy = 1; @@ -324,6 +325,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; + if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 4; tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2); @@ -518,6 +520,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { } tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); + tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32; From afd352c84ff789da284ad5d014e5e28d552bd548 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 May 2022 15:57:48 +0800 Subject: [PATCH 13/18] refactor:add the configuration of dataFormat --- source/client/src/clientSml.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index fe574b8ef6..884756ced9 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1469,7 +1469,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if(!dataFormat){ + if(!info->dataFormat){ info->colsContainer = taosArrayInit(32, POINTER_BYTES); if(NULL == info->colsContainer){ uError("SML:0x%"PRIx64" create info failed", info->id); From c508c89a92ea2f20f07c01a6726ec074d44ed504 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 16:02:03 +0800 Subject: [PATCH 14/18] enh(query): filter the child table that are not belongs to current super table when adding into candidate list. --- source/libs/executor/inc/executorimpl.h | 11 +++--- source/libs/executor/src/executor.c | 29 +++++++++++++-- source/libs/executor/src/executorimpl.c | 7 ++-- source/libs/executor/src/scanoperator.c | 48 ++++++++++++------------- 4 files changed, 59 insertions(+), 36 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6100d416b1..bf178612ba 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -388,13 +388,15 @@ typedef struct SStreamBlockScanInfo { SColumnInfo* pCols; // the output column info uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times - void* readerHandle; // stream block reader handle + void* streamBlockReader;// stream block reader handle SArray* pColMatchInfo; // SNode* pCondition; SArray* tsArray; SUpdateInfo* pUpdateInfo; int32_t primaryTsIndex; // primary time stamp slot id void* pDataReader; + SReadHandle readHandle; + uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; SOperatorInfo* pOperatorDumy; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. @@ -706,9 +708,10 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, - SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo, - SNode* pConditions, SOperatorInfo* pOperatorDumy); +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, + uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, + SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition, + SOperatorInfo* pOperatorDumy); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4863b03fb9..4cc46ea835 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,6 +14,7 @@ */ #include "executor.h" +#include #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" @@ -46,7 +47,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } @@ -128,7 +129,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - // traverse to the streamscan node to add this table id + // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { pInfo = pInfo->pDownstream[0]; @@ -136,7 +137,29 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA SStreamBlockScanInfo* pScanInfo = pInfo->info; if (isAdd) { - int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList); + SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + + SMetaReader mr = {0}; + metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); + for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); + + int32_t code = metaGetTableEntryByUid(&mr, *id); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno)); + continue; + } + + ASSERT(mr.me.type == TSDB_CHILD_TABLE); + if (mr.me.ctbEntry.suid != pScanInfo->tableUid) { + continue; + } + + taosArrayPush(qa, id); + } + + qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa)); + int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e280724200..c261271728 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4703,10 +4703,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo); - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo, + SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy); taosArrayDestroy(tableIdList); return pOperator; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1a862c94f3..df7478264f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -710,14 +710,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; blockDataCleanup(pInfo->pRes); - while (tqNextDataBlock(pInfo->readerHandle)) { + while (tqNextDataBlock(pInfo->streamBlockReader)) { SArray* pCols = NULL; uint64_t groupId = 0; uint64_t uid = 0; int32_t numOfRows = 0; int16_t outputCol = 0; - int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->streamBlockReader, &groupId, &uid, &numOfRows, &outputCol); if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { pTaskInfo->code = code; @@ -791,9 +791,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, - SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, - SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy ) { +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, + uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, + SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition, + SOperatorInfo* pOperatorDumy) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -829,37 +830,32 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY)); if (pInfo->tsArray == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - return NULL; + goto _error; } pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan if (pInfo->pUpdateInfo == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - return NULL; + goto _error; } - pInfo->readerHandle = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->pDataReader = pDataReader; - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->pOperatorDumy = pOperatorDumy; - pInfo->interval = pSTInfo->interval; + pInfo->readHandle = *pHandle; + pInfo->tableUid = uid; + pInfo->streamBlockReader = streamReadHandle; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + pInfo->pDataReader = pDataReader; + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + pInfo->pOperatorDumy = pOperatorDumy; + pInfo->interval = pSTInfo->interval; - pOperator->name = "StreamBlockScanOperator"; + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet._openFn = operatorDummyOpenFn; - pOperator->fpSet.getNextFn = doStreamBlockScan; - pOperator->fpSet.closeFn = operatorDummyCloseFn; - pOperator->pTaskInfo = pTaskInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); From 1d577dbc6999b03db4780cb2f4527543f566213e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 16:06:57 +0800 Subject: [PATCH 15/18] fix(query): free resources after load table meta. --- source/libs/executor/src/executor.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4cc46ea835..320450eb6e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -158,6 +158,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA taosArrayPush(qa, id); } + metaReaderClear(&mr); + qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa)); int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); if (code != TSDB_CODE_SUCCESS) { From bdcaa3e1f3c8af653b36c5b7dedfd28f4a8b8503 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 17 May 2022 16:19:16 +0800 Subject: [PATCH 16/18] refactor:add the configuration of dataFormat --- source/client/test/smlTest.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 37cb1989b3..eeab844712 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -497,7 +497,7 @@ TEST(testCase, smlProcess_influx_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql[] = { @@ -618,7 +618,7 @@ TEST(testCase, smlParseLine_error_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql[] = { @@ -667,7 +667,7 @@ TEST(testCase, smlProcess_telnet_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql[] = { @@ -721,7 +721,7 @@ TEST(testCase, smlProcess_json1_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql = @@ -790,7 +790,7 @@ TEST(testCase, smlProcess_json2_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql = "{\n" @@ -834,7 +834,7 @@ TEST(testCase, smlProcess_json3_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql = "{\n" @@ -906,7 +906,7 @@ TEST(testCase, smlProcess_json4_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql = "{\n" " \"metric\": \"meter_current2\",\n" @@ -968,7 +968,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); int32_t ret = 0; @@ -1017,7 +1017,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql[2] = { @@ -1044,7 +1044,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); int32_t ret = 0; @@ -1112,7 +1112,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql[2] = { @@ -1157,7 +1157,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); ASSERT_NE(info, nullptr); const char *sql[2] = { From 8ceae5854d280131f8525762639af92b604034d6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 17 May 2022 17:00:21 +0800 Subject: [PATCH 17/18] fix(tmq): fix memory leak --- source/dnode/mnode/impl/src/mndConsumer.c | 21 +++-- source/dnode/mnode/impl/src/mndSubscribe.c | 16 +++- source/dnode/mnode/impl/src/mndTopic.c | 92 ++++++++++++++-------- source/dnode/vnode/src/tq/tq.c | 1 + 4 files changed, 87 insertions(+), 43 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 76b8081849..820c25f1f2 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -135,16 +135,18 @@ FAIL: } static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { - SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); - if (pRebSub == NULL) { - pRebSub = tNewSMqRebSubscribe(key); - if (pRebSub == NULL) { + SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1); + if (pRebInfo == NULL) { + pRebInfo = tNewSMqRebSubscribe(key); + if (pRebInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo)); + taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo)); + taosMemoryFree(pRebInfo); + pRebInfo = taosHashGet(pHash, key, strlen(key) + 1); } - return pRebSub; + return pRebInfo; } static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { @@ -305,8 +307,10 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { ASSERT(pTopic); taosRLockLatch(&pTopic->lock); topicEp.schema.nCols = pTopic->schema.nCols; - topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); - memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema)); + if (topicEp.schema.nCols) { + topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); + memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema)); + } taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); @@ -517,6 +521,7 @@ SUBSCRIBE_OVER: } if (pConsumerNew) { tDeleteSMqConsumerObj(pConsumerNew); + taosMemoryFree(pConsumerNew); } // TODO: replace with destroy subscribe msg if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3e932e8a67..9aee411ece 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -502,9 +502,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; - rebOutput.newConsumers = taosArrayInit(0, sizeof(void *)); - rebOutput.removedConsumers = taosArrayInit(0, sizeof(void *)); - rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *)); + rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; @@ -547,6 +547,16 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { mError("persist rebalance output error, possibly vnode splitted or dropped"); } + taosArrayDestroy(pRebInfo->lostConsumers); + taosArrayDestroy(pRebInfo->newConsumers); + taosArrayDestroy(pRebInfo->removedConsumers); + + taosArrayDestroy(rebOutput.newConsumers); + taosArrayDestroy(rebOutput.touchedConsumers); + taosArrayDestroy(rebOutput.removedConsumers); + taosArrayDestroy(rebOutput.rebVgs); + tDeleteSubscribeObj(rebOutput.pSub); + taosMemoryFree(rebOutput.pSub); } // reset flag diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bc650ec95b..2d9681f543 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -72,8 +72,15 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) { SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; - int32_t schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema); + void *swBuf = NULL; + int32_t physicalPlanLen = 0; + if (pTopic->physicalPlan) { + physicalPlanLen = strlen(pTopic->physicalPlan) + 1; + } + int32_t schemaLen = 0; + if (pTopic->schema.nCols) { + taosEncodeSSchemaWrapper(NULL, &pTopic->schema); + } int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); @@ -96,18 +103,24 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER); - SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); - - void *swBuf = taosMemoryMalloc(schemaLen); - if (swBuf == NULL) { - goto TOPIC_ENCODE_OVER; + if (pTopic->astLen) { + SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER); + } + SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); + if (physicalPlanLen) { + SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); } - void *aswBuf = swBuf; - taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); + if (schemaLen) { + swBuf = taosMemoryMalloc(schemaLen); + if (swBuf == NULL) { + goto TOPIC_ENCODE_OVER; + } + void *aswBuf = swBuf; + taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); + SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); + } + SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); @@ -116,6 +129,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_SUCCESS; TOPIC_ENCODE_OVER: + if (swBuf) taosMemoryFree(swBuf); if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -168,29 +182,43 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER); - pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char)); - if (pTopic->ast == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto TOPIC_DECODE_OVER; + if (pTopic->astLen) { + pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char)); + if (pTopic->ast == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto TOPIC_DECODE_OVER; + } + } else { + pTopic->ast = NULL; } SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char)); - if (pTopic->physicalPlan == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto TOPIC_DECODE_OVER; + if (len) { + pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char)); + if (pTopic->physicalPlan == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto TOPIC_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); + } else { + pTopic->physicalPlan = NULL; } - SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - void *buf = taosMemoryMalloc(len); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto TOPIC_DECODE_OVER; - } - SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); - if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { - goto TOPIC_DECODE_OVER; + if (len) { + void *buf = taosMemoryMalloc(len); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto TOPIC_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); + if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { + goto TOPIC_DECODE_OVER; + } + } else { + pTopic->schema.nCols = 0; + pTopic->schema.sver = 0; + pTopic->schema.pSchema = NULL; } SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER); @@ -340,9 +368,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq return -1; } } else { - topicObj.ast = strdup(""); - topicObj.astLen = 1; - topicObj.physicalPlan = strdup(""); + topicObj.ast = NULL; + topicObj.astLen = 0; + topicObj.physicalPlan = NULL; topicObj.subType = TOPIC_SUB_TYPE__DB; topicObj.withTbName = 1; topicObj.withSchema = 1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bc9893b8a0..6cc986d54b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -613,6 +613,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset++; } + taosMemoryFree(pHeadWithCkSum); ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); From d762ec63714c5cc88559fbee788993fb18fe2ffe Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 17 May 2022 17:36:01 +0800 Subject: [PATCH 18/18] enh(tmq): adaptive schema --- source/client/inc/clientInt.h | 6 +++++- source/dnode/mnode/impl/src/mndTopic.c | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f9d257af98..516b289f08 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -234,6 +234,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver if (msg->rsp.withSchema) { SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter); setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols); + taosMemoryFreeClear(msg->resInfo.row); + taosMemoryFreeClear(msg->resInfo.pCol); + taosMemoryFreeClear(msg->resInfo.length); + taosMemoryFreeClear(msg->resInfo.convertBuf); } setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4); return &msg->resInfo; @@ -310,7 +314,7 @@ void hbMgrInitMqHbRspHandle(); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res); int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res); -int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); +int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 2d9681f543..6c57eb714c 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -79,7 +79,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { } int32_t schemaLen = 0; if (pTopic->schema.nCols) { - taosEncodeSSchemaWrapper(NULL, &pTopic->schema); + schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema); } int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;