From 3a3c0867b3cdd77c938a87b4894e78d114291e4c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 14 May 2022 18:59:53 +0800 Subject: [PATCH 01/11] refactor:add escape for schemaless --- source/client/src/clientSml.c | 271 +++++++++++++++++---------------- source/client/test/smlTest.cpp | 94 +++++++----- 2 files changed, 193 insertions(+), 172 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index fc909cb0a3..f388afec1d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -9,7 +9,6 @@ #include "tdef.h" #include "tlog.h" #include "tmsg.h" -#include "tstrbuild.h" #include "ttime.h" #include "ttypes.h" #include "tcommon.h" @@ -26,6 +25,35 @@ #define SLASH '\\' #define tsMaxSQLStringLen (1024*1024) +#define JUMP_SPACE(sql) while (*sql != '\0'){if(*sql == SPACE) sql++;else break;} +// comma , +#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql) - 1) == SLASH) +#define IS_COMMA(sql) (*(sql) == COMMA && *((sql) - 1) != SLASH) +// space +#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql) - 1) == SLASH) +#define IS_SPACE(sql) (*(sql) == SPACE && *((sql) - 1) != SLASH) +// equal = +#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql) - 1) == SLASH) +#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql) - 1) != SLASH) +// quote " +#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql) - 1) == SLASH) +#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql) - 1) != SLASH) +// SLASH +#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql) - 1) == SLASH) + +#define IS_SLASH_LETTER(sql) (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql)) + +#define MOVE_FORWARD_ONE(sql,len) (memmove((void*)((sql) - 1), (sql), len)) + +#define PROCESS_SLASH(key,keyLen) \ +for (int i = 1; i < keyLen; ++i) { \ + if(IS_SLASH_LETTER(key+i)){ \ + MOVE_FORWARD_ONE(key+i, keyLen-i); \ + i--; \ + keyLen--; \ + } \ +} + #define OTD_MAX_FIELDS_NUM 2 #define OTD_JSON_SUB_FIELDS_NUM 2 #define OTD_JSON_FIELDS_NUM 4 @@ -760,7 +788,7 @@ static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t le smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL); return -1; } - if(!data){ + if(len == 0){ return smlGetTimeNow(tsType); } @@ -850,66 +878,56 @@ static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){ if(!sql) return TSDB_CODE_SML_INVALID_DATA; - while (*sql != '\0') { // jump the space at the begining - if(*sql != SPACE) { - elements->measure = sql; - break; - } - sql++; - } - if (!elements->measure || *sql == COMMA) { - smlBuildInvalidDataMsg(msg, "invalid data", sql); - return TSDB_CODE_SML_INVALID_DATA; - } + JUMP_SPACE(sql) + if(*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA; + elements->measure = sql; - // parse measure and tag + // parse measure while (*sql != '\0') { - if (elements->measureLen == 0 && *sql == COMMA && *(sql - 1) != SLASH) { // find the first comma - elements->measureLen = sql - elements->measure; - sql++; - elements->tags = sql; + if(IS_SLASH_LETTER(sql)){ + MOVE_FORWARD_ONE(sql,strlen(sql) + 1); continue; } - - if (*sql == SPACE && *(sql - 1) != SLASH) { // find the first space - if (elements->measureLen == 0) { - elements->measureLen = sql - elements->measure; - elements->tags = sql; - } - elements->tagsLen = sql - elements->tags; - elements->measureTagsLen = sql - elements->measure; + if(IS_COMMA(sql)){ break; } + if(IS_SPACE(sql)){ + break; + } sql++; } - if(elements->tagsLen == 0){ // measure, cols1=a measure cols1=a - elements->measureTagsLen = elements->measureLen; - } + elements->measureLen = sql - elements->measure; if(elements->measureLen == 0) { - smlBuildInvalidDataMsg(msg, "invalid measure", elements->measure); + smlBuildInvalidDataMsg(msg, "measure is empty", NULL); return TSDB_CODE_SML_INVALID_DATA; } + // parse tag + if(*sql == SPACE){ + elements->tagsLen = 0; + }else{ + if(*sql == COMMA) sql++; + elements->tags = sql; + while (*sql != '\0') { + if(IS_SPACE(sql)){ + break; + } + sql++; + } + elements->tagsLen = sql - elements->tags; + } + elements->measureTagsLen = sql - elements->measure; + // parse cols - while (*sql != '\0') { - if(*sql != SPACE) { - elements->cols = sql; - break; - } - sql++; - } - if(!elements->cols) { - smlBuildInvalidDataMsg(msg, "invalid columns", elements->cols); - return TSDB_CODE_SML_INVALID_DATA; - } - + JUMP_SPACE(sql) + elements->cols = sql; bool isInQuote = false; while (*sql != '\0') { - if(*sql == QUOTE && *(sql - 1) != SLASH){ + if(IS_QUOTE(sql)){ isInQuote = !isInQuote; } - if(!isInQuote && *sql == SPACE && *(sql - 1) != SLASH) { + if(!isInQuote && IS_SPACE(sql)){ break; } sql++; @@ -919,20 +937,21 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm return TSDB_CODE_SML_INVALID_DATA; } elements->colsLen = sql - elements->cols; + if(elements->colsLen == 0) { + smlBuildInvalidDataMsg(msg, "cols is empty", NULL); + return TSDB_CODE_SML_INVALID_DATA; + } - // parse ts,ts can be empty + // parse timestamp + JUMP_SPACE(sql) + elements->timestamp = sql; while (*sql != '\0') { - if(*sql != SPACE && elements->timestamp == NULL) { - elements->timestamp = sql; - } - if(*sql == SPACE && elements->timestamp != NULL){ + if(*sql == SPACE){ break; } sql++; } - if(elements->timestamp){ - elements->timestampLen = sql - elements->timestamp; - } + elements->timestampLen = sql - elements->timestamp; return TSDB_CODE_SUCCESS; } @@ -949,43 +968,58 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t * } } -static int32_t smlParseTelnetTags(const char* data, int32_t len, SArray *cols, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ - for(int i = 0; i < len; i++){ - // parse key - const char *key = data + i; +static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ + const char *sql = data; + while(*sql != '\0'){ + JUMP_SPACE(sql) + const char *key = sql; int32_t keyLen = 0; - while(i < len){ - if(data[i] == EQUAL){ - keyLen = data + i - key; + + // parse key + while(*sql != '\0'){ + if(*sql == SPACE) { + smlBuildInvalidDataMsg(msg, "invalid data", sql); + return TSDB_CODE_SML_INVALID_DATA; + } + if(*sql == EQUAL) { + keyLen = sql - key; + sql++; break; } - i++; + sql++; } + if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); return TSDB_CODE_SML_INVALID_DATA; } - if(smlCheckDuplicateKey(key, keyLen, dumplicateKey)){ smlBuildInvalidDataMsg(msg, "dumplicate key", key); return TSDB_CODE_TSC_DUP_TAG_NAMES; } // parse value - i++; - const char *value = data + i; - while(i < len){ - if(data[i] == SPACE){ + const char *value = sql; + int32_t valueLen = 0; + while(*sql != '\0') { + // parse value + if (*sql == SPACE) { break; } - i++; + if (*sql == EQUAL) { + smlBuildInvalidDataMsg(msg, "invalid data", sql); + return TSDB_CODE_SML_INVALID_DATA; + } + sql++; } - int32_t valueLen = data + i - value; + valueLen = sql - value; + sql++; + JUMP_SPACE(sql) + if(valueLen == 0){ smlBuildInvalidDataMsg(msg, "invalid value", value); return TSDB_CODE_SML_INVALID_DATA; } - // add kv to SSmlKv SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv) return TSDB_CODE_OUT_OF_MEMORY; @@ -993,13 +1027,14 @@ static int32_t smlParseTelnetTags(const char* data, int32_t len, SArray *cols, S kv->keyLen = keyLen; kv->value = value; kv->length = valueLen; - kv->type = TSDB_DATA_TYPE_NCHAR; //todo + kv->type = TSDB_DATA_TYPE_NCHAR; if(cols) taosArrayPush(cols, &kv); } return TSDB_CODE_SUCCESS; } + // format: =[ =] static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTableInfo *tinfo, SArray *cols){ if(!sql) return TSDB_CODE_SML_INVALID_DATA; @@ -1048,10 +1083,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable } // parse tags - while(*sql == SPACE){ - sql++; - } - ret = smlParseTelnetTags(sql, strlen(sql), tinfo->tags, info->dumplicateKey, &info->msgBuf); + ret = smlParseTelnetTags(sql, tinfo->tags, info->dumplicateKey, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return TSDB_CODE_SML_INVALID_DATA; @@ -1073,49 +1105,67 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is return TSDB_CODE_SUCCESS; } - for(int i = 0; i < len; i++){ - // parse key - const char *key = data + i; + const char *sql = data; + while(sql < data + len){ + const char *key = sql; int32_t keyLen = 0; - while(i < len){ - if(data[i] == EQUAL && i > 0 && data[i-1] != SLASH){ - keyLen = data + i - key; + + while(sql < data + len){ + // parse key + if(IS_COMMA(sql)) { + smlBuildInvalidDataMsg(msg, "invalid data", sql); + return TSDB_CODE_SML_INVALID_DATA; + } + if(IS_EQUAL(sql)) { + keyLen = sql - key; + sql++; break; } - i++; + sql++; } + if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); return TSDB_CODE_SML_INVALID_DATA; } - if(smlCheckDuplicateKey(key, keyLen, dumplicateKey)){ smlBuildInvalidDataMsg(msg, "dumplicate key", key); return TSDB_CODE_TSC_DUP_TAG_NAMES; } // parse value - i++; - const char *value = data + i; + const char *value = sql; + int32_t valueLen = 0; bool isInQuote = false; - while(i < len){ - if(!isTag && data[i] == QUOTE && data[i-1] != SLASH){ + while(sql < data + len) { + // parse value + if(!isTag && IS_QUOTE(sql)){ isInQuote = !isInQuote; + sql++; + continue; } - if(!isInQuote && data[i] == COMMA && i > 0 && data[i-1] != SLASH){ + if (!isInQuote && IS_COMMA(sql)) { break; } - i++; + if (!isInQuote && IS_EQUAL(sql)) { + smlBuildInvalidDataMsg(msg, "invalid data", sql); + return TSDB_CODE_SML_INVALID_DATA; + } + sql++; } - if(!isTag && isInQuote){ + valueLen = sql - value; + sql++; + + if(isInQuote){ smlBuildInvalidDataMsg(msg, "only one quote", value); return TSDB_CODE_SML_INVALID_DATA; } - int32_t valueLen = data + i - value; if(valueLen == 0){ smlBuildInvalidDataMsg(msg, "invalid value", value); return TSDB_CODE_SML_INVALID_DATA; } + PROCESS_SLASH(key, keyLen) + PROCESS_SLASH(value, valueLen) // add kv to SSmlKv SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); @@ -1138,49 +1188,6 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is return TSDB_CODE_SUCCESS; } -//static int32_t parseSmlCols(const char* data, SArray *cols){ -// while(*data != '\0'){ -// if(*data == EQUAL) return TSDB_CODE_SML_INVALID_DATA; -// const char *key = data; -// int32_t keyLen = 0; -// while(*data != '\0'){ -// if(*data == EQUAL && *(data-1) != SLASH){ -// keyLen = data - key; -// data ++; -// break; -// } -// data++; -// } -// if(keyLen == 0){ -// return TSDB_CODE_SML_INVALID_DATA; -// } -// -// if(*data == COMMA) return TSDB_CODE_SML_INVALID_DATA; -// const char *value = data; -// int32_t valueLen = 0; -// while(*data != '\0'){ -// if(*data == COMMA && *(data-1) != SLASH){ -// valueLen = data - value; -// data ++; -// break; -// } -// data++; -// } -// if(valueLen == 0){ -// return TSDB_CODE_SML_INVALID_DATA; -// } -// -// TAOS_SML_KV *kv = taosMemoryCalloc(sizeof(TAOS_SML_KV), 1); -// kv->key = key; -// kv->keyLen = keyLen; -// kv->value = value; -// kv->valueLen = valueLen; -// kv->type = TSDB_DATA_TYPE_NCHAR; -// if(cols) taosArrayPush(cols, &kv); -// } -// return TSDB_CODE_SUCCESS; -//} - static bool smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg){ for (int i = 0; i < taosArrayGetSize(cols); ++i) { //jump timestamp SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index b9870633db..3565815b6c 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -41,12 +41,14 @@ TEST(testCase, smlParseInfluxString_Test) { SSmlLineInfo elements = {0}; // case 1 - char *sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3"; + char *tmp = "\\,st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3"; + char *sql = (char*)taosMemoryCalloc(256, 1); + memcpy(sql, tmp, strlen(tmp) + 1); int ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_EQ(ret, 0); ASSERT_EQ(elements.measure, sql); - ASSERT_EQ(elements.measureLen, strlen("st")); - ASSERT_EQ(elements.measureTagsLen, strlen("st,t1=3,t2=4,t3=t3")); + ASSERT_EQ(elements.measureLen, strlen(",st")); + ASSERT_EQ(elements.measureTagsLen, strlen(",st,t1=3,t2=4,t3=t3")); ASSERT_EQ(elements.tags, sql + elements.measureLen + 1); ASSERT_EQ(elements.tagsLen, strlen("t1=3,t2=4,t3=t3")); @@ -58,21 +60,24 @@ TEST(testCase, smlParseInfluxString_Test) { ASSERT_EQ(elements.timestampLen, strlen("1626006833639000000")); // case 2 false - sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; + tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; + memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_NE(ret, 0); // case 3 false - sql = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; + tmp = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; + memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_EQ(ret, 0); - ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 2); + ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1); ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3")); // case 4 tag is null - sql = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000"; + tmp = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000"; + memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_EQ(ret, 0); @@ -90,44 +95,45 @@ TEST(testCase, smlParseInfluxString_Test) { ASSERT_EQ(elements.timestampLen, strlen("1626006833639000000")); // case 5 tag is null - sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 "; + tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 "; + memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); - sql++; ASSERT_EQ(ret, 0); - ASSERT_EQ(elements.measure, sql); + ASSERT_EQ(elements.measure, sql+1); ASSERT_EQ(elements.measureLen, strlen("st")); ASSERT_EQ(elements.measureTagsLen, strlen("st")); - ASSERT_EQ(elements.tags, sql + elements.measureLen); + ASSERT_EQ(elements.tags, sql + 1 + elements.measureLen); ASSERT_EQ(elements.tagsLen, 0); - ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 3); + ASSERT_EQ(elements.cols, sql + 1 + elements.measureTagsLen + 3); ASSERT_EQ(elements.colsLen, strlen("c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64")); - ASSERT_EQ(elements.timestamp, sql + elements.measureTagsLen + 3 + elements.colsLen + 2); + ASSERT_EQ(elements.timestamp, sql + 1 + elements.measureTagsLen + 3 + elements.colsLen + 2); ASSERT_EQ(elements.timestampLen, strlen("1626006833639000000")); // case 6 - sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 "; + tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 "; + memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_EQ(ret, 0); // case 7 - sql = " st , "; - memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); - sql++; - ASSERT_EQ(ret, 0); - ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 3); - ASSERT_EQ(elements.colsLen, strlen(",")); - - // case 8 false - sql = ", st , "; + tmp = " st , "; + memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_NE(ret, 0); + + // case 8 false + tmp = ", st , "; + memcpy(sql, tmp, strlen(tmp) + 1); + memset(&elements, 0, sizeof(SSmlLineInfo)); + ret = smlParseInfluxString(sql, &elements, &msgBuf); + ASSERT_NE(ret, 0); + taosMemoryFree(sql); } TEST(testCase, smlParseCols_Error_Test) { @@ -188,7 +194,8 @@ TEST(testCase, smlParseCols_Error_Test) { "c=-3.402823466e+39u64", "c=-339u64", "c=18446744073709551616u64", - "c=1,c=2" + "c=1,c=2", + "c=1=2" }; SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -198,9 +205,12 @@ TEST(testCase, smlParseCols_Error_Test) { msgBuf.buf = msg; msgBuf.len = 256; int32_t len = strlen(data[i]); - int32_t ret = smlParseCols(data[i], len, NULL, false, dumplicateKey, &msgBuf); + char *sql = (char*)taosMemoryCalloc(256, 1); + memcpy(sql, data[i], len + 1); + int32_t ret = smlParseCols(sql, len, NULL, false, dumplicateKey, &msgBuf); ASSERT_NE(ret, TSDB_CODE_SUCCESS); taosHashClear(dumplicateKey); + taosMemoryFree(sql); } taosHashCleanup(dumplicateKey); } @@ -216,7 +226,7 @@ TEST(testCase, smlParseCols_tag_Test) { SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); const char *data = - "cbin=\"passit helloc=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; + "cbin=\"passit helloc\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); @@ -278,20 +288,22 @@ TEST(testCase, smlParseCols_Test) { SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; + const char *data = "cb\\=in=\"pass\\,it hello,c=2\",cnch=L\"ii\\=sdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); - int32_t ret = smlParseCols(data, len, cols, false, dumplicateKey, &msgBuf); + char *sql = (char*)taosMemoryCalloc(1024, 1); + memcpy(sql, data, len + 1); + int32_t ret = smlParseCols(sql, len, cols, false, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); int32_t size = taosArrayGetSize(cols); ASSERT_EQ(size, 19); // binary SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, 0); - ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0); + ASSERT_EQ(strncasecmp(kv->key, "cb=in", 5), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BINARY); ASSERT_EQ(kv->length, 16); - ASSERT_EQ(strncasecmp(kv->value, "passit", 6), 0); + ASSERT_EQ(strncasecmp(kv->value, "pass,it ", 8), 0); taosMemoryFree(kv); // nchar @@ -300,7 +312,7 @@ TEST(testCase, smlParseCols_Test) { ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); ASSERT_EQ(kv->length, 7); - ASSERT_EQ(strncasecmp(kv->value, "iisd", 4), 0); + ASSERT_EQ(strncasecmp(kv->value, "ii=sd", 5), 0); taosMemoryFree(kv); // bool @@ -463,6 +475,7 @@ TEST(testCase, smlParseCols_Test) { taosArrayDestroy(cols); taosHashCleanup(dumplicateKey); + taosMemoryFree(sql); } TEST(testCase, smlProcess_influx_Test) { @@ -492,7 +505,7 @@ TEST(testCase, smlProcess_influx_Test) { "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000", "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000", "stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000", - "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000" + "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000", }; smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); @@ -575,12 +588,11 @@ TEST(testCase, smlProcess_telnet_Test) { SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, nullptr); - const char *sql[5] = { - "sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0", - "sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01", - "sys.if.bytes.out 1479496102 1.3E3 network=tcp", - "sys.procs.running 1479496100 42 host=web01 ", - " sys.procs.running 1479496200 42 host=web01=4" + const char *sql[4] = { + "sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0", + "sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ", + "sys.if.bytes.out 1479496102 1.3E3 network=tcp", + " sys.procs.running 1479496100 42 host=web01 " }; int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); ASSERT_EQ(ret, 0); @@ -857,7 +869,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) { ASSERT_NE(info, nullptr); int32_t ret = 0; - const char *sql[19] = { + const char *sql[] = { "sys.procs.running 14794961040 42 host=web01", "sys.procs.running 14791040 42 host=web01", "sys.procs.running erere 42 host=web01", @@ -877,6 +889,8 @@ TEST(testCase, smlParseTelnetLine_error_Test) { "sys.procs.running 1479496100 42 host=web01 cpu= ", "sys.procs.running 1479496100 42 host=web01 host=w2", "sys.procs.running 1479496100 42 host=web01 host", + "sys.procs.running 1479496100 42 host=web01=er", + "sys.procs.running 1479496100 42 host= web01", }; for(int i = 0; i < sizeof(sql)/sizeof(sql[0]); i++){ ret = smlParseTelnetLine(info, (void*)sql[i]); From 60e8bc24cd768131023f1cc7670520f4c78b203a Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 14 May 2022 19:17:53 +0800 Subject: [PATCH 02/11] fix: some problems of parser and planner --- include/common/ttime.h | 15 +-- include/libs/qcom/query.h | 119 +++++++++++------- include/util/taoserror.h | 2 + source/libs/nodes/src/nodesCodeFuncs.c | 98 +++++++++++++-- source/libs/parser/src/parAstCreater.c | 17 +++ source/libs/parser/src/parAuthenticator.c | 13 +- source/libs/parser/src/parCalcConst.c | 4 +- source/libs/parser/src/parTranslater.c | 35 +++++- source/libs/parser/src/parUtil.c | 4 + source/libs/parser/test/parSelectTest.cpp | 6 +- source/libs/planner/src/planOptimizer.c | 10 +- source/libs/planner/src/planPhysiCreater.c | 18 +++ source/libs/planner/test/planJoinTest.cpp | 14 ++- source/libs/planner/test/planSubqueryTest.cpp | 8 +- source/libs/planner/test/planTestMain.cpp | 32 ++++- source/libs/planner/test/planTestUtil.cpp | 1 + source/libs/planner/test/planTestUtil.h | 2 + 17 files changed, 315 insertions(+), 83 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index 3de0b98d85..cd704bb1f7 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -59,10 +59,11 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { * precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond. */ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { - int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000 : - (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : 1000000000; - time_t t = taosTime(NULL); - struct tm * tm= taosLocalTime(&t, NULL); + int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000 + : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 + : 1000000000; + time_t t = taosTime(NULL); + struct tm* tm = taosLocalTime(&t, NULL); tm->tm_hour = 0; tm->tm_min = 0; tm->tm_sec = 0; @@ -79,13 +80,13 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); -char getPrecisionUnit(int32_t precision); +char getPrecisionUnit(int32_t precision); int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); -int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal); +int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal); -void taosFormatUtcTime(char *buf, int32_t bufLen, int64_t time, int32_t precision); +void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision); #ifdef __cplusplus } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index c390f67153..711db65e97 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -51,14 +51,12 @@ typedef struct STableComInfo { } STableComInfo; typedef struct SIndexMeta { - #ifdef WINDOWS size_t avoidCompilationErrors; #endif } SIndexMeta; - /* * ASSERT(sizeof(SCTableMeta) == 24) * ASSERT(tableType == TSDB_CHILD_TABLE) @@ -95,7 +93,7 @@ typedef struct SDBVgInfo { int32_t vgVersion; int8_t hashMethod; int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT - SHashObj *vgHash; //key:vgId, value:SVgroupInfo + SHashObj* vgHash; // key:vgId, value:SVgroupInfo } SDBVgInfo; typedef struct SUseDbOutput { @@ -135,7 +133,7 @@ typedef struct SMsgSendInfo { } SMsgSendInfo; typedef struct SQueryNodeStat { - int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT + int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; int32_t initTaskQueue(); @@ -172,7 +170,7 @@ const SSchema* tGetTbnameColumnSchema(); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); -char *jobTaskStatusStr(int32_t status); +char* jobTaskStatusStr(int32_t status); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); @@ -184,62 +182,87 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE -#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) -#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) +#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \ + ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) +#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \ + ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) -#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) +#define NEED_CLIENT_HANDLE_ERROR(_code) \ + (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ + NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) +#define NEED_SCHEDULER_RETRY_ERROR(_code) \ + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define REQUEST_MAX_TRY_TIMES 5 -#define qFatal(...) \ - do { \ - if (qDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ - } \ +#define qFatal(...) \ + do { \ + if (qDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qError(...) \ - do { \ - if (qDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ - } \ +#define qError(...) \ + do { \ + if (qDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qWarn(...) \ - do { \ - if (qDebugFlag & DEBUG_WARN) { \ - taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ - } \ +#define qWarn(...) \ + do { \ + if (qDebugFlag & DEBUG_WARN) { \ + taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qInfo(...) \ - do { \ - if (qDebugFlag & DEBUG_INFO) { \ - taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ - } \ +#define qInfo(...) \ + do { \ + if (qDebugFlag & DEBUG_INFO) { \ + taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qDebug(...) \ - do { \ - if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ - } \ +#define qDebug(...) \ + do { \ + if (qDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qTrace(...) \ - do { \ - if (qDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \ - } \ +#define qTrace(...) \ + do { \ + if (qDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qDebugL(...) \ - do { \ - if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ - } \ +#define qDebugL(...) \ + do { \ + if (qDebugFlag & DEBUG_DEBUG) { \ + taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) -#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) - +#define QRY_ERR_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) +#define QRY_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + } \ + return _code; \ + } while (0) +#define QRY_ERR_JRET(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index aa2b32daab..ae083780e4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -635,6 +635,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644) #define TSDB_CODE_PAR_INVALID_STREAM_QUERY TAOS_DEF_ERROR_CODE(0, 0x2645) #define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646) +#define TSDB_CODE_PAR_INVALID_TIMELINE_FUNC TAOS_DEF_ERROR_CODE(0, 0x2647) +#define TSDB_CODE_PAR_INVALID_PASSWD TAOS_DEF_ERROR_CODE(0, 0x2648) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 71b0774ca6..dba202a45f 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1142,9 +1142,13 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } -static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); } +static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { + return physiTableScanNodeToJson(pObj, pJson); +} -static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiTableScanNode(pJson, pObj); } +static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { + return jsonToPhysiTableScanNode(pJson, pObj); +} static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite"; @@ -2347,6 +2351,30 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkTempTableSubquery = "Subquery"; + +static int32_t tempTableNodeToJson(const void* pObj, SJson* pJson) { + const STempTableNode* pNode = (const STempTableNode*)pObj; + + int32_t code = tableNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkTempTableSubquery, nodeToJson, pNode->pSubquery); + } + + return code; +} + +static int32_t jsonToTempTableNode(const SJson* pJson, void* pObj) { + STempTableNode* pNode = (STempTableNode*)pObj; + + int32_t code = jsonToTableNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkTempTableSubquery, &pNode->pSubquery); + } + + return code; +} + static const char* jkGroupingSetType = "GroupingSetType"; static const char* jkGroupingSetParameter = "Parameters"; @@ -2659,6 +2687,59 @@ static int32_t jsonToDataBlockDescNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkSetOperatorOpType = "OpType"; +static const char* jkSetOperatorProjections = "Projections"; +static const char* jkSetOperatorLeft = "Left"; +static const char* jkSetOperatorRight = "Right"; +static const char* jkSetOperatorOrderByList = "OrderByList"; +static const char* jkSetOperatorLimit = "Limit"; + +static int32_t setOperatorToJson(const void* pObj, SJson* pJson) { + const SSetOperator* pNode = (const SSetOperator*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkSetOperatorOpType, pNode->opType); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkSetOperatorProjections, pNode->pProjectionList); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSetOperatorLeft, nodeToJson, pNode->pLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSetOperatorRight, nodeToJson, pNode->pRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkSetOperatorOrderByList, pNode->pOrderByList); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSetOperatorLimit, nodeToJson, pNode->pLimit); + } + + return code; +} + +static int32_t jsonToSetOperator(const SJson* pJson, void* pObj) { + SSetOperator* pNode = (SSetOperator*)pObj; + + int32_t code = tjsonGetNumberValue(pJson, jkSetOperatorOpType, pNode->opType); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSetOperatorProjections, &pNode->pProjectionList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSetOperatorLeft, &pNode->pLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSetOperatorRight, &pNode->pRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSetOperatorOrderByList, &pNode->pOrderByList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSetOperatorLimit, &pNode->pLimit); + } + + return code; +} + static const char* jkSelectStmtDistinct = "Distinct"; static const char* jkSelectStmtProjections = "Projections"; static const char* jkSelectStmtFrom = "From"; @@ -2673,7 +2754,7 @@ static const char* jkSelectStmtSlimit = "Slimit"; static const char* jkSelectStmtStmtName = "StmtName"; static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs"; -static int32_t selectStmtTojson(const void* pObj, SJson* pJson) { +static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { const SSelectStmt* pNode = (const SSelectStmt*)pObj; int32_t code = tjsonAddBoolToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct); @@ -2815,6 +2896,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_REAL_TABLE: return realTableNodeToJson(pObj, pJson); case QUERY_NODE_TEMP_TABLE: + return tempTableNodeToJson(pObj, pJson); case QUERY_NODE_JOIN_TABLE: break; case QUERY_NODE_GROUPING_SET: @@ -2844,9 +2926,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_DOWNSTREAM_SOURCE: return downstreamSourceNodeToJson(pObj, pJson); case QUERY_NODE_SET_OPERATOR: - break; + return setOperatorToJson(pObj, pJson); case QUERY_NODE_SELECT_STMT: - return selectStmtTojson(pObj, pJson); + return selectStmtToJson(pObj, pJson); case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_TABLE_STMT: @@ -2914,7 +2996,6 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN: return planToJson(pObj, pJson); default: - // assert(0); break; } nodesWarn("specificNodeToJson unknown node = %s", nodesNodeName(nodeType(pObj))); @@ -2935,6 +3016,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToFunctionNode(pJson, pObj); case QUERY_NODE_REAL_TABLE: return jsonToRealTableNode(pJson, pObj); + case QUERY_NODE_TEMP_TABLE: + return jsonToTempTableNode(pJson, pObj); case QUERY_NODE_ORDER_BY_EXPR: return jsonToOrderByExprNode(pJson, pObj); case QUERY_NODE_INTERVAL_WINDOW: @@ -2951,6 +3034,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToSlotDescNode(pJson, pObj); case QUERY_NODE_DOWNSTREAM_SOURCE: return jsonToDownstreamSourceNode(pJson, pObj); + case QUERY_NODE_SET_OPERATOR: + return jsonToSetOperator(pJson, pObj); case QUERY_NODE_SELECT_STMT: return jsonToSelectStmt(pJson, pObj); case QUERY_NODE_CREATE_TOPIC_STMT: @@ -3003,7 +3088,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN: return jsonToPlan(pJson, pObj); default: - assert(0); break; } nodesWarn("jsonToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj))); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 639da98f48..7dc2978ec7 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -14,6 +14,8 @@ * along with this program. If not, see . */ +#include + #include "parAst.h" #include "parUtil.h" #include "ttime.h" @@ -76,6 +78,19 @@ static bool checkUserName(SAstCreateContext* pCxt, SToken* pUserName) { return TSDB_CODE_SUCCESS == pCxt->errCode; } +static bool invalidPassword(const char* pPassword) { + regex_t regex; + + if (regcomp(®ex, "[ '\"`\\]", REG_EXTENDED | REG_ICASE) != 0) { + return false; + } + + /* Execute regular expression */ + int32_t res = regexec(®ex, pPassword, 0, NULL, 0); + regfree(®ex); + return 0 == res; +} + static bool checkPassword(SAstCreateContext* pCxt, const SToken* pPasswordToken, char* pPassword) { if (NULL == pPasswordToken) { pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR; @@ -86,6 +101,8 @@ static bool checkPassword(SAstCreateContext* pCxt, const SToken* pPasswordToken, strdequote(pPassword); if (strtrim(pPassword) <= 0) { pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_PASSWD_EMPTY); + } else if (invalidPassword(pPassword)) { + pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PASSWD); } } return TSDB_CODE_SUCCESS == pCxt->errCode; diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index 8f686cefce..ad1dca6857 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -65,13 +65,19 @@ static int32_t authSetOperator(SAuthCxt* pCxt, SSetOperator* pSetOper) { return code; } +static int32_t authDropUser(SAuthCxt* pCxt, SDropUserReq* pStmt) { + if (!pCxt->pParseCxt->isSuperUser || 0 == strcmp(pStmt->user, TSDB_DEFAULT_USER)) { + return TSDB_CODE_PAR_PERMISSION_DENIED; + } + return TSDB_CODE_SUCCESS; +} + static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { switch (nodeType(pStmt)) { case QUERY_NODE_SET_OPERATOR: return authSetOperator(pCxt, (SSetOperator*)pStmt); case QUERY_NODE_SELECT_STMT: return authSelect(pCxt, (SSelectStmt*)pStmt); - case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_DROP_DATABASE_STMT: case QUERY_NODE_ALTER_DATABASE_STMT: @@ -84,7 +90,10 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { case QUERY_NODE_ALTER_TABLE_STMT: case QUERY_NODE_CREATE_USER_STMT: case QUERY_NODE_ALTER_USER_STMT: - case QUERY_NODE_DROP_USER_STMT: + break; + case QUERY_NODE_DROP_USER_STMT: { + return authDropUser(pCxt, (SDropUserReq*)pStmt); + } case QUERY_NODE_USE_DATABASE_STMT: case QUERY_NODE_CREATE_DNODE_STMT: case QUERY_NODE_DROP_DNODE_STMT: diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index 9c2bd10686..646ef4cf62 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -262,9 +262,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque break; case QUERY_NODE_SET_OPERATOR: { SSetOperator* pSetOp = (SSetOperator*)pStmt; - code = calcConstQuery(pCxt, pSetOp->pLeft, subquery); + code = calcConstQuery(pCxt, pSetOp->pLeft, false); if (TSDB_CODE_SUCCESS == code) { - code = calcConstQuery(pCxt, pSetOp->pRight, subquery); + code = calcConstQuery(pCxt, pSetOp->pRight, false); } break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c7f0c13de0..bc4fa9169c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -480,6 +480,31 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) { return res; } +static int32_t parseTimeFromValueNode(SValueNode* pVal) { + if (IS_SIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { + return TSDB_CODE_SUCCESS; + } else if (IS_UNSIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { + pVal->datum.i = pVal->datum.u; + return TSDB_CODE_SUCCESS; + } else if (IS_FLOAT_TYPE(pVal->node.resType.type)) { + pVal->datum.i = pVal->datum.d; + return TSDB_CODE_SUCCESS; + } else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { + pVal->datum.i = pVal->datum.b; + return TSDB_CODE_SUCCESS; + } else if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) { + if (TSDB_CODE_SUCCESS == taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes, + pVal->node.resType.precision, tsDaylight)) { + return TSDB_CODE_SUCCESS; + } + char* pEnd = NULL; + pVal->datum.i = strtoll(pVal->literal, &pEnd, 10); + return (NULL != pEnd && '\0' == *pEnd) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; + } else { + return TSDB_CODE_FAILED; + } +} + static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SDataType targetDt) { uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : targetDt.precision); pVal->node.resType.precision = precision; @@ -571,7 +596,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD break; } case TSDB_DATA_TYPE_TIMESTAMP: { - if (taosParseTime(pVal->literal, &pVal->datum.i, targetDt.bytes, precision, tsDaylight) != TSDB_CODE_SUCCESS) { + if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pVal)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); } *(int64_t*)&pVal->typeData = pVal->datum.i; @@ -1658,10 +1683,10 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p if (NULL == pCol) { return TSDB_CODE_OUT_OF_MEMORY; } - if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { - setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, false, pCol); - } else { - // todo + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); + if (!findAndSetColumn(pCol, pTable)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_FUNC); } *pPrimaryKey = (SNode*)pCol; return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 43aea8de7c..676fe5dbfd 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -148,6 +148,10 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Invalid number of tag columns"; case TSDB_CODE_PAR_INVALID_INTERNAL_PK: return "Invalid _c0 or _rowts expression"; + case TSDB_CODE_PAR_INVALID_TIMELINE_FUNC: + return "Invalid timeline function"; + case TSDB_CODE_PAR_INVALID_PASSWD: + return "Invalid password"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 0ba062ebe4..23b82d54bd 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -235,9 +235,11 @@ TEST_F(ParserSelectTest, semanticError) { TEST_F(ParserSelectTest, setOperator) { useDb("root", "test"); - run("SELECT * FROM t1 UNION ALL SELECT * FROM t1"); + // run("SELECT * FROM t1 UNION ALL SELECT * FROM t1"); - run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)"); + // run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)"); + + run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); } } // namespace ParserTest diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 9968f63c5d..edac2a879f 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -582,7 +582,7 @@ static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { return false; } - SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions; + SOperatorNode* pOper = (SOperatorNode*)pCond; if (OP_TYPE_EQUAL != pOper->opType) { return false; } @@ -608,12 +608,16 @@ static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, if (LOGIC_COND_TYPE_AND != pOnCond->condType) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); } + bool hasPrimaryKeyEqualCond = false; SNode* pCond = NULL; FOREACH(pCond, pOnCond->pParameterList) { - if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) { - return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); + if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) { + hasPrimaryKeyEqualCond = true; } } + if (!hasPrimaryKeyEqualCond) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index edf44424e3..affe9ef2f6 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -261,6 +261,22 @@ typedef struct SSetSlotIdCxt { SHashObj* pRightHash; } SSetSlotIdCxt; +static void dumpSlots(const char* pName, SHashObj* pHash) { + if (NULL == pHash) { + return; + } + planDebug("%s", pName); + void* pIt = taosHashIterate(pHash, NULL); + while (NULL != pIt) { + size_t len = 0; + char* pKey = taosHashGetKey(pIt, &len); + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0}; + strncpy(name, pKey, len); + planDebug("\tslot name = %s", name); + pIt = taosHashIterate(pHash, pIt); + } +} + static EDealRes doSetSlotId(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) { SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext; @@ -273,6 +289,8 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { // pIndex is definitely not NULL, otherwise it is a bug if (NULL == pIndex) { planError("doSetSlotId failed, invalid slot name %s", name); + dumpSlots("left datablock desc", pCxt->pLeftHash); + dumpSlots("right datablock desc", pCxt->pRightHash); pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR; return DEAL_RES_ERROR; } diff --git a/source/libs/planner/test/planJoinTest.cpp b/source/libs/planner/test/planJoinTest.cpp index 4098d383f8..714900c4e5 100644 --- a/source/libs/planner/test/planJoinTest.cpp +++ b/source/libs/planner/test/planJoinTest.cpp @@ -23,10 +23,16 @@ class PlanJoinTest : public PlannerTestBase {}; TEST_F(PlanJoinTest, basic) { useDb("root", "test"); - run("select t1.c1, t2.c2 from st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); + run("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 WHERE t1.ts = t2.ts"); - run("select t1.*, t2.* from st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); + run("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 WHERE t1.ts = t2.ts"); - // run("select t1.c1, t2.c1 from st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and " - // "t2.c2 = 'qwe'"); + run("SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts"); +} + +TEST_F(PlanJoinTest, withWhere) { + useDb("root", "test"); + + run("SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts " + "WHERE t1.c1 > t2.c1 AND t1.c2 = 'abc' AND t2.c2 = 'qwe'"); } diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index f45cbc6f8f..11e5e98052 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -23,9 +23,13 @@ class PlanSubqeuryTest : public PlannerTestBase {}; TEST_F(PlanSubqeuryTest, basic) { useDb("root", "test"); - run("SELECT * FROM (SELECT * FROM t1)"); + if (0 == g_skipSql) { + run("SELECT * FROM (SELECT * FROM t1)"); - // run("SELECT LAST(c1) FROM ( SELECT * FROM t1)"); + run("SELECT LAST(c1) FROM (SELECT * FROM t1)"); + } + + run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); } TEST_F(PlanSubqeuryTest, doubleGroupBy) { diff --git a/source/libs/planner/test/planTestMain.cpp b/source/libs/planner/test/planTestMain.cpp index 464c636b66..36f66ddff6 100644 --- a/source/libs/planner/test/planTestMain.cpp +++ b/source/libs/planner/test/planTestMain.cpp @@ -25,23 +25,53 @@ class PlannerEnv : public testing::Environment { virtual void SetUp() { initMetaDataEnv(); generateMetaData(); + initLog("/tmp/td"); } virtual void TearDown() { destroyMetaDataEnv(); } PlannerEnv() {} virtual ~PlannerEnv() {} + + private: + void initLog(const char* path) { + dDebugFlag = 143; + vDebugFlag = 0; + mDebugFlag = 143; + cDebugFlag = 0; + jniDebugFlag = 0; + tmrDebugFlag = 135; + uDebugFlag = 135; + rpcDebugFlag = 143; + qDebugFlag = 143; + wDebugFlag = 0; + sDebugFlag = 0; + tsdbDebugFlag = 0; + tsLogEmbedded = 1; + tsAsyncLog = 0; + + taosRemoveDir(path); + taosMkDir(path); + tstrncpy(tsLogDir, path, PATH_MAX); + if (taosInitLog("taoslog", 1) != 0) { + std::cout << "failed to init log file" << std::endl; + } + } }; static void parseArg(int argc, char* argv[]) { int opt = 0; const char* optstring = ""; - static struct option long_options[] = {{"dump", optional_argument, NULL, 'd'}, {0, 0, 0, 0}}; + static struct option long_options[] = { + {"dump", optional_argument, NULL, 'd'}, {"skipSql", optional_argument, NULL, 's'}, {0, 0, 0, 0}}; while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) { switch (opt) { case 'd': setDumpModule(optarg); break; + case 's': + g_skipSql = 1; + break; default: break; } diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index b2c590667e..6b038ae8ea 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -47,6 +47,7 @@ enum DumpModule { }; DumpModule g_dumpModule = DUMP_MODULE_NOTHING; +int32_t g_skipSql = 0; void setDumpModule(const char* pModule) { if (NULL == pModule) { diff --git a/source/libs/planner/test/planTestUtil.h b/source/libs/planner/test/planTestUtil.h index 7913ef531f..a63bba1a97 100644 --- a/source/libs/planner/test/planTestUtil.h +++ b/source/libs/planner/test/planTestUtil.h @@ -32,6 +32,8 @@ class PlannerTestBase : public testing::Test { std::unique_ptr impl_; }; +extern int32_t g_skipSql; + extern void setDumpModule(const char* pModule); #endif // PLAN_TEST_UTIL_H From 2d456be89e65bc294ce3ea78eca19c08e4f5bf89 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 14 May 2022 19:31:53 +0800 Subject: [PATCH 03/11] fix: some problems of parser and planner --- source/libs/nodes/src/nodesCodeFuncs.c | 108 ++++++++++++++++--------- 1 file changed, 72 insertions(+), 36 deletions(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6a80c29303..57dfcaeddd 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -318,15 +318,19 @@ static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) { STableComInfo* pNode = (STableComInfo*)pObj; int32_t code; - tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code);; + tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code); + ; if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code);; + tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code);; + tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code);; + tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code); + ; } return code; @@ -358,12 +362,15 @@ static int32_t jsonToSchema(const SJson* pJson, void* pObj) { SSchema* pNode = (SSchema*)pObj; int32_t code; - tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code);; + tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code); + ; if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code);; + tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code);; + tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name); @@ -415,21 +422,27 @@ static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { STableMeta* pNode = (STableMeta*)pObj; int32_t code; - tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code);; + tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code); + ; if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code);; + tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code);; + tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code);; + tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code);; + tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code);; + tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo); @@ -605,7 +618,8 @@ static int32_t jsonToLogicFillNode(const SJson* pJson, void* pObj) { int32_t code = jsonToLogicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code);; + tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillLogicPlanWStartTs, &pNode->pWStartTs); @@ -881,7 +895,8 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkLogicSubplanRootNode, (SNode**)&pNode->pNode); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);; + tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code); + ; } int32_t objSize = 0; if (TSDB_CODE_SUCCESS == code) { @@ -1121,25 +1136,31 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanRatio, &pNode->ratio); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);; + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);; + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);; + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);; + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);; + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);; + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code); + ; } return code; @@ -1185,7 +1206,8 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkSysTableScanPhysiPlanShowRewrite, &pNode->showRewrite); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId, code);; + tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId, code); + ; } return code; @@ -1269,7 +1291,8 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);; + tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); @@ -1431,10 +1454,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);; + tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);; + tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code); + ; } return code; @@ -1530,7 +1555,8 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode, code);; + tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillPhysiPlanWStartTs, &pNode->pWStartTs); @@ -1569,7 +1595,8 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysiWindowNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code);; + tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code); + ; } return code; @@ -1731,7 +1758,8 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code);; + tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType); @@ -1921,7 +1949,8 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) { code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code);; + tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkColumnDbName, pNode->dbName); @@ -2175,7 +2204,8 @@ static int32_t jsonToOperatorNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code);; + tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkOperatorLeft, &pNode->pLeft); @@ -2209,7 +2239,8 @@ static int32_t jsonToLogicConditionNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code);; + tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code); + ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkLogicCondParameters, &pNode->pParameterList); @@ -2415,10 +2446,12 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) { int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr); if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code);; + tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code); + ; } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code);; + tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code); + ; } return code; @@ -2525,7 +2558,8 @@ static int32_t jsonToFillNode(const SJson* pJson, void* pObj) { SFillNode* pNode = (SFillNode*)pObj; int32_t code; - tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code);; + tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code); + ; if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillValues, &pNode->pValues); } @@ -2724,7 +2758,8 @@ static int32_t setOperatorToJson(const void* pObj, SJson* pJson) { static int32_t jsonToSetOperator(const SJson* pJson, void* pObj) { SSetOperator* pNode = (SSetOperator*)pObj; - int32_t code = tjsonGetNumberValue(pJson, jkSetOperatorOpType, pNode->opType); + int32_t code = TSDB_CODE_SUCCESS; + tjsonGetNumberValue(pJson, jkSetOperatorOpType, pNode->opType, code); if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkSetOperatorProjections, &pNode->pProjectionList); } @@ -3122,7 +3157,8 @@ static int32_t jsonToNode(const SJson* pJson, void* pObj) { SNode* pNode = (SNode*)pObj; int32_t code; - tjsonGetNumberValue(pJson, jkNodeType, pNode->type, code);; + tjsonGetNumberValue(pJson, jkNodeType, pNode->type, code); + ; if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, nodesNodeName(pNode->type), jsonToSpecificNode, pNode); if (TSDB_CODE_SUCCESS != code) { From 24aba2479146f679091eb51c873586827a9fea31 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 14 May 2022 20:20:13 +0800 Subject: [PATCH 04/11] fix: some problems of parser and planner --- source/client/src/clientImpl.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f879838d63..090638a4d1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -172,7 +172,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .pTransporter = pTscObj->pAppInfo->pTransporter, .pStmtCb = pStmtCb, - .pUser = pTscObj->user}; + .pUser = pTscObj->user, + .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))}; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); From cc021699a84bf9b109731e9bde2e426f054f738a Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Sat, 14 May 2022 20:34:59 +0800 Subject: [PATCH 05/11] fix(os): make tdb lib to static --- packaging/release.sh | 1 - packaging/tools/install.sh | 6 ------ source/libs/tdb/CMakeLists.txt | 2 +- 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/packaging/release.sh b/packaging/release.sh index ef3018a913..9230cafa85 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -67,7 +67,6 @@ bin_files="${compile_dir}/build/bin/taosd ${compile_dir}/build/bin/taos ${compi cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : cp ${compile_dir}/build/lib/libtaos.so ${install_dir}/lib/ -cp ${compile_dir}/build/lib/libtdb.so ${install_dir}/lib/ cp ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries" cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory" diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 740d356f80..d2d52af955 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -215,15 +215,9 @@ function install_lib() { ${csudo} ln -s ${install_main_dir}/lib/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo} ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so - ${csudo} ln -s ${install_main_dir}/lib/libtdb.* ${lib_link_dir}/libtdb.so.1 - ${csudo} ln -s ${lib_link_dir}/libtdb.so.1 ${lib_link_dir}/libtdb.so - if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then ${csudo} ln -s ${install_main_dir}/lib/libtaos.* ${lib64_link_dir}/libtaos.so.1 || : ${csudo} ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || : - - ${csudo} ln -s ${install_main_dir}/lib/libtdb.* ${lib64_link_dir}/libtdb.so.1 || : - ${csudo} ln -s ${lib64_link_dir}/libtdb.so.1 ${lib64_link_dir}/libtdb.so || : fi ${csudo} ldconfig diff --git a/source/libs/tdb/CMakeLists.txt b/source/libs/tdb/CMakeLists.txt index 722f6bddef..01490030f2 100644 --- a/source/libs/tdb/CMakeLists.txt +++ b/source/libs/tdb/CMakeLists.txt @@ -1,5 +1,5 @@ # tdb -add_library(tdb SHARED "") +add_library(tdb STATIC "") target_sources(tdb PRIVATE "src/db/tdbPCache.c" From 71c63a9bb39110a5184682f32a06fc85f5b56b9e Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 14 May 2022 20:41:05 +0800 Subject: [PATCH 06/11] fix: some problems of parser and planner --- source/libs/parser/src/parTranslater.c | 2 +- source/libs/parser/test/parSelectTest.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6f9faaa6f0..6427e13ae9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -492,7 +492,7 @@ static int32_t parseTimeFromValueNode(SValueNode* pVal) { } else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { pVal->datum.i = pVal->datum.b; return TSDB_CODE_SUCCESS; - } else if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) { + } else if (IS_VAR_DATA_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_TIMESTAMP == pVal->node.resType.type) { if (TSDB_CODE_SUCCESS == taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes, pVal->node.resType.precision, tsDaylight)) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 23b82d54bd..821f480b20 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -187,7 +187,7 @@ TEST_F(ParserSelectTest, semanticError) { run("SELECT c2 FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1", TSDB_CODE_PAR_AMBIGUOUS_COLUMN, PARSER_STAGE_TRANSLATE); // TSDB_CODE_PAR_WRONG_VALUE_TYPE - run("SELECT timestamp '2010' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE); + run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE); // TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION run("SELECT c2 FROM t1 tt1 join t1 tt2 on COUNT(*) > 0", TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION, From 70171a66e018cea12d1f9f46a3b059113d91e6bb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 14 May 2022 20:52:46 +0800 Subject: [PATCH 07/11] enh(index): fix sanitizer error --- source/libs/executor/src/indexoperator.c | 4 + .../executor/test/index_executor_tests.cpp | 4 +- source/libs/index/inc/indexComm.h | 2 + source/libs/index/inc/indexInt.h | 9 +- source/libs/index/src/index.c | 92 +++++++++++--- source/libs/index/src/indexCache.c | 3 + source/libs/index/src/indexComm.c | 114 +++++++++++++++++- source/libs/index/src/indexFstUtil.c | 5 +- source/libs/index/src/indexTfile.c | 13 +- source/libs/index/test/jsonUT.cc | 20 +++ 10 files changed, 233 insertions(+), 33 deletions(-) diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index c17fcacf1f..2c204e9356 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -398,6 +398,10 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { output->status = SFLT_ACCURATE_INDEX; } + if (ctx->noExec) { + SIF_RET(code); + } + return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output); _return: taosMemoryFree(params); diff --git a/source/libs/executor/test/index_executor_tests.cpp b/source/libs/executor/test/index_executor_tests.cpp index 5b03da034e..2449bd1da1 100644 --- a/source/libs/executor/test/index_executor_tests.cpp +++ b/source/libs/executor/test/index_executor_tests.cpp @@ -249,7 +249,7 @@ TEST(testCase, index_filter_varify) { sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); SIdxFltStatus st = idxGetFltStatus(opNode); - EXPECT_EQ(st, SFLT_COARSE_INDEX); + EXPECT_EQ(st, SFLT_ACCURATE_INDEX); nodesDestroyNode(res); } { @@ -269,7 +269,7 @@ TEST(testCase, index_filter_varify) { sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); SIdxFltStatus st = idxGetFltStatus(opNode); - EXPECT_EQ(st, SFLT_COARSE_INDEX); + EXPECT_EQ(st, SFLT_ACCURATE_INDEX); nodesDestroyNode(res); } } diff --git a/source/libs/index/inc/indexComm.h b/source/libs/index/inc/indexComm.h index 4cab71f92c..043404f48f 100644 --- a/source/libs/index/inc/indexComm.h +++ b/source/libs/index/inc/indexComm.h @@ -37,6 +37,8 @@ TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b); _cache_range_compare indexGetCompare(RangeType ty); +int32_t indexConvertData(void* src, int8_t type, void** dst); + #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 7b7050d80e..27c380beaf 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -46,9 +46,7 @@ typedef struct SIndexStat { } SIndexStat; struct SIndex { -#ifdef USE_LUCENE - index_t* index; -#endif + int64_t refId; void* cache; void* tindex; SHashObj* colObj; // < field name, field id> @@ -124,6 +122,11 @@ typedef struct TFileCacheKey { int indexFlushCacheToTFile(SIndex* sIdx, void*); +int64_t indexAddRef(void* p); +int32_t indexRemoveRef(int64_t ref); +void indexAcquireRef(int64_t ref); +void indexReleaseRef(int64_t ref); + int32_t indexSerialCacheKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d56413f840..46f2f7a93b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -19,7 +19,10 @@ #include "indexInt.h" #include "indexTfile.h" #include "indexUtil.h" +#include "tcoding.h" +#include "tdataformat.h" #include "tdef.h" +#include "tref.h" #include "tsched.h" #ifdef USE_LUCENE @@ -27,36 +30,40 @@ #endif #define INDEX_NUM_OF_THREADS 4 -#define INDEX_QUEUE_SIZE 200 +#define INDEX_QUEUE_SIZE 200 -void* indexQhandle = NULL; - -#define INDEX_DATA_BOOL_NULL 0x02 -#define INDEX_DATA_TINYINT_NULL 0x80 -#define INDEX_DATA_SMALLINT_NULL 0x8000 -#define INDEX_DATA_INT_NULL 0x80000000L -#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L +#define INDEX_DATA_BOOL_NULL 0x02 +#define INDEX_DATA_TINYINT_NULL 0x80 +#define INDEX_DATA_SMALLINT_NULL 0x8000 +#define INDEX_DATA_INT_NULL 0x80000000L +#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L #define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL -#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN -#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN -#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF -#define INDEX_DATA_BINARY_NULL 0xFF -#define INDEX_DATA_JSON_NULL 0xFFFFFFFF -#define INDEX_DATA_JSON_null 0xFFFFFFFE +#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN +#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN +#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF +#define INDEX_DATA_BINARY_NULL 0xFF +#define INDEX_DATA_JSON_NULL 0xFFFFFFFF +#define INDEX_DATA_JSON_null 0xFFFFFFFE #define INDEX_DATA_JSON_NOT_NULL 0x01 -#define INDEX_DATA_UTINYINT_NULL 0xFF +#define INDEX_DATA_UTINYINT_NULL 0xFF #define INDEX_DATA_USMALLINT_NULL 0xFFFF -#define INDEX_DATA_UINT_NULL 0xFFFFFFFF -#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL +#define INDEX_DATA_UINT_NULL 0xFFFFFFFF +#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL -#define INDEX_DATA_NULL_STR "NULL" +#define INDEX_DATA_NULL_STR "NULL" #define INDEX_DATA_NULL_STR_L "null" +void* indexQhandle = NULL; +int32_t indexRefMgt; + +static void indexDestroy(void* sIdx); + void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); + indexRefMgt = taosOpenRef(10, indexDestroy); } void indexCleanUp() { // refacto later @@ -100,7 +107,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { sIdx->cVersion = 1; sIdx->path = tstrdup(path); taosThreadMutexInit(&sIdx->mtx, NULL); + + sIdx->refId = indexAddRef(sIdx); + taosAcquireRef(indexRefMgt, sIdx->refId); + *index = sIdx; + return 0; END: @@ -112,8 +124,9 @@ END: return -1; } -void indexClose(SIndex* sIdx) { - void* iter = taosHashIterate(sIdx->colObj, NULL); +void indexDestroy(void* handle) { + SIndex* sIdx = handle; + void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; if (*pCache) { @@ -128,6 +141,27 @@ void indexClose(SIndex* sIdx) { taosMemoryFree(sIdx); return; } +void indexClose(SIndex* sIdx) { + indexReleaseRef(sIdx->refId); + indexRemoveRef(sIdx->refId); +} +int64_t indexAddRef(void* p) { + // impl + return taosAddRef(indexRefMgt, p); +} +int32_t indexRemoveRef(int64_t ref) { + // impl later + return taosRemoveRef(indexRefMgt, ref); +} + +void indexAcquireRef(int64_t ref) { + // impl + taosAcquireRef(indexRefMgt, ref); +} +void indexReleaseRef(int64_t ref) { + // impl + taosReleaseRef(indexRefMgt, ref); +} int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { // TODO(yihao): reduce the lock range @@ -222,6 +256,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy tm->operType = oper; tm->colType = colType; +#if 0 tm->colName = (char*)taosMemoryCalloc(1, nColName + 1); memcpy(tm->colName, colName, nColName); tm->nColName = nColName; @@ -229,6 +264,22 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); memcpy(tm->colVal, colVal, nColVal); tm->nColVal = nColVal; +#endif + +#if 1 + + tm->colName = (char*)taosMemoryCalloc(1, nColName + 1); + memcpy(tm->colName, colName, nColName); + tm->nColName = nColName; + + char* buf = NULL; + int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf); + assert(len != -1); + + tm->colVal = buf; + tm->nColVal = len; + +#endif return tm; } @@ -457,6 +508,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { } else { indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); } + indexReleaseRef(sIdx->refId); return ret; } void iterateValueDestroy(IterateValue* value, bool destroy) { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 5294ac8c19..d4231619ec 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -460,8 +460,11 @@ int indexCacheSchedToMerge(IndexCache* pCache) { schedMsg.fp = doMergeWork; schedMsg.ahandle = pCache; schedMsg.thandle = NULL; + // schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t)); + // memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t)); schedMsg.msg = NULL; + indexAcquireRef(pCache->index->refId); taosScheduleTask(indexQhandle, &schedMsg); return 0; diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 9e85a6680a..ac26ed1fab 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -16,25 +16,33 @@ #include "indexComm.h" #include "index.h" #include "indexInt.h" +#include "tcoding.h" #include "tcompare.h" +#include "tdataformat.h" char JSON_COLUMN[] = "JSON"; char JSON_VALUE_DELIM = '&'; +static __compar_fn_t indexGetCompar(int8_t type) { + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + return (__compar_fn_t)strcmp; + } + return getComparFunc(type, 0); +} static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_LESS_THAN, a, b); } static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); } static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_GREATER_THAN, a, b); } static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); } @@ -120,3 +128,101 @@ char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) { return buf; } + +int32_t indexConvertData(void* src, int8_t type, void** dst) { + int tlen = -1; + switch (type) { + case TSDB_DATA_TYPE_TIMESTAMP: + tlen = taosEncodeFixedI64(NULL, *(int64_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI64(dst, *(int64_t*)src); + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU8(dst, *(uint8_t*)src); + break; + case TSDB_DATA_TYPE_TINYINT: + tlen = taosEncodeFixedI8(NULL, *(uint8_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI8(dst, *(uint8_t*)src); + break; + case TSDB_DATA_TYPE_SMALLINT: + tlen = taosEncodeFixedI16(NULL, *(int16_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI16(dst, *(int16_t*)src); + break; + case TSDB_DATA_TYPE_USMALLINT: + tlen = taosEncodeFixedU16(NULL, *(uint16_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU16(dst, *(uint16_t*)src); + break; + case TSDB_DATA_TYPE_INT: + tlen = taosEncodeFixedI32(NULL, *(int32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI32(dst, *(int32_t*)src); + break; + case TSDB_DATA_TYPE_FLOAT: + tlen = taosEncodeBinary(NULL, src, sizeof(float)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, sizeof(float)); + break; + case TSDB_DATA_TYPE_UINT: + tlen = taosEncodeFixedU32(NULL, *(uint32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU32(dst, *(uint32_t*)src); + break; + case TSDB_DATA_TYPE_BIGINT: + tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI64(dst, *(uint32_t*)src); + break; + case TSDB_DATA_TYPE_DOUBLE: + tlen = taosEncodeBinary(NULL, src, sizeof(double)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, sizeof(double)); + break; + case TSDB_DATA_TYPE_UBIGINT: + tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU64(dst, *(uint32_t*)src); + break; + case TSDB_DATA_TYPE_NCHAR: { + tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src)); + + break; + } + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY +#if 1 + tlen = taosEncodeBinary(NULL, src, strlen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, strlen(src)); + break; +#endif + } + case TSDB_DATA_TYPE_VARBINARY: +#if 1 + tlen = taosEncodeBinary(NULL, src, strlen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, strlen(src)); + break; +#endif + default: + TASSERT(0); + break; + } + *dst = *dst - tlen; + if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY && + type == TSDB_DATA_TYPE_VARCHAR) { + uint8_t* p = *dst; + for (int i = 0; i < tlen; i++) { + if (p[i] == 0) { + p[i] = (uint8_t)'0'; + } + } + } + return tlen; +} diff --git a/source/libs/index/src/indexFstUtil.c b/source/libs/index/src/indexFstUtil.c index ec9a6943dc..a980c6b740 100644 --- a/source/libs/index/src/indexFstUtil.c +++ b/source/libs/index/src/indexFstUtil.c @@ -82,7 +82,10 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) { str->ref = 1; str->len = len; str->data = taosMemoryMalloc(len * sizeof(uint8_t)); - memcpy(str->data, data, len); + + if (data != NULL) { + memcpy(str->data, data, len); + } FstSlice s = {.str = str, .start = 0, .end = len - 1}; return s; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 4cc2a4975f..b787da117d 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -469,13 +469,19 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { FstSlice* s = &rt->data; - char* ch = (char*)fstSliceData(s, NULL); - if (0 != strncmp(ch, p, skip)) { + int32_t sz = 0; + char* ch = (char*)fstSliceData(s, &sz); + char* tmp = taosMemoryCalloc(1, sz + 1); + memcpy(tmp, ch, sz); + + if (0 != strncmp(tmp, p, skip)) { swsResultDestroy(rt); + taosMemoryFree(tmp); break; } - TExeCond cond = cmpFn(ch + skip, tem->colVal, tem->colType); + TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); + if (MATCH == cond) { tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); } else if (CONTINUE == cond) { @@ -483,6 +489,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR swsResultDestroy(rt); break; } + taosMemoryFree(tmp); swsResultDestroy(rt); } streamWithStateDestroy(st); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 08d58da07f..3de7cb66f2 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -17,12 +17,32 @@ #include "tutil.h" static std::string dir = "/tmp/json"; +static std::string logDir = "/tmp/log"; + +static void initLog() { + const char* defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + tsAsyncLog = 0; + sDebugFlag = 143; + strcpy(tsLogDir, logDir.c_str()); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} class JsonEnv : public ::testing::Test { protected: virtual void SetUp() { + taosRemoveDir(logDir.c_str()); + taosMkDir(logDir.c_str()); taosRemoveDir(dir.c_str()); taosMkDir(dir.c_str()); printf("set up\n"); + + initLog(); opts = indexOptsCreate(); int ret = tIndexJsonOpen(opts, dir.c_str(), &index); assert(ret == 0); From d6478273393a47021518d0f1a51fd197a7228cab Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 14 May 2022 21:14:58 +0800 Subject: [PATCH 08/11] refactor:add escape for schemaless --- source/client/src/clientSml.c | 7 +-- source/client/test/smlTest.cpp | 91 ++++++++++++++++++---------------- 2 files changed, 51 insertions(+), 47 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index f388afec1d..2a3c7f9cdd 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -70,6 +70,7 @@ for (int i = 1; i < keyLen; ++i) { \ #define BINARY_ADD_LEN 2 // "binary" 2 means " " #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " +#define CHAR_SAVE_LENGTH 8 //================================================================================================= typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; @@ -259,7 +260,7 @@ static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSi char tname[TSDB_TABLE_NAME_LEN] = {0}; memcpy(tname, field->key, field->keyLen); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - int32_t bytes = field->length; // todo + int32_t bytes = field->length > CHAR_SAVE_LENGTH ? (2*field->length) : CHAR_SAVE_LENGTH; int out = snprintf(buf, bufSize,"`%s` %s(%d)", tname, tDataTypes[field->type].name, bytes); *outBytes = out; @@ -459,7 +460,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); size_t superTableLen = 0; - void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); // todo escape + void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; strcpy(pName.dbname, info->pRequest->pDb); memcpy(pName.tname, superTable, superTableLen); @@ -1305,7 +1306,7 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col } for(size_t i = 0; i < taosArrayGetSize(cols); i++){ SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); - taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later + taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); } taosArrayPush(oneTable->cols, &kvHash); diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 3565815b6c..dabf2c0209 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -83,15 +83,15 @@ TEST(testCase, smlParseInfluxString_Test) { ASSERT_EQ(ret, 0); ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measureLen, strlen("st")); - ASSERT_EQ(elements.measureTagsLen, strlen("st")); + ASSERT_EQ(elements.measureTagsLen, strlen("st,")); - ASSERT_EQ(elements.tags, sql + elements.measureLen + 1); + ASSERT_EQ(elements.tags, sql + elements.measureTagsLen); ASSERT_EQ(elements.tagsLen, 0); - ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 2); + ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1); ASSERT_EQ(elements.colsLen, strlen("c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64")); - ASSERT_EQ(elements.timestamp, sql + elements.measureTagsLen + 2 + elements.colsLen + 1); + ASSERT_EQ(elements.timestamp, sql + elements.measureTagsLen + 1 + elements.colsLen + 1); ASSERT_EQ(elements.timestampLen, strlen("1626006833639000000")); // case 5 tag is null @@ -100,11 +100,10 @@ TEST(testCase, smlParseInfluxString_Test) { memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); ASSERT_EQ(ret, 0); - ASSERT_EQ(elements.measure, sql+1); + ASSERT_EQ(elements.measure, sql + 1); ASSERT_EQ(elements.measureLen, strlen("st")); ASSERT_EQ(elements.measureTagsLen, strlen("st")); - ASSERT_EQ(elements.tags, sql + 1 + elements.measureLen); ASSERT_EQ(elements.tagsLen, 0); ASSERT_EQ(elements.cols, sql + 1 + elements.measureTagsLen + 3); @@ -125,7 +124,7 @@ TEST(testCase, smlParseInfluxString_Test) { memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); ret = smlParseInfluxString(sql, &elements, &msgBuf); - ASSERT_NE(ret, 0); + ASSERT_EQ(ret, 0); // case 8 false tmp = ", st , "; @@ -238,7 +237,7 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); - ASSERT_EQ(kv->length, 17); + ASSERT_EQ(kv->length, 15); ASSERT_EQ(strncasecmp(kv->value, "\"passit", 7), 0); taosMemoryFree(kv); @@ -300,9 +299,9 @@ TEST(testCase, smlParseCols_Test) { // binary SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, 0); ASSERT_EQ(strncasecmp(kv->key, "cb=in", 5), 0); - ASSERT_EQ(kv->keyLen, 4); + ASSERT_EQ(kv->keyLen, 5); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BINARY); - ASSERT_EQ(kv->length, 16); + ASSERT_EQ(kv->length, 17); ASSERT_EQ(strncasecmp(kv->value, "pass,it ", 8), 0); taosMemoryFree(kv); @@ -311,7 +310,7 @@ TEST(testCase, smlParseCols_Test) { ASSERT_EQ(strncasecmp(kv->key, "cnch", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); - ASSERT_EQ(kv->length, 7); + ASSERT_EQ(kv->length, 8); ASSERT_EQ(strncasecmp(kv->value, "ii=sd", 5), 0); taosMemoryFree(kv); @@ -494,7 +493,7 @@ TEST(testCase, smlProcess_influx_Test) { SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, nullptr); - const char *sql[11] = { + const char *sql[] = { "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000", @@ -507,18 +506,20 @@ TEST(testCase, smlProcess_influx_Test) { "stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000", "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000", }; - smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); + int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); + ASSERT_EQ(ret, 0); TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); ASSERT_NE(res, nullptr); - int fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 5); - int rowNum = taos_affected_rows(res); - ASSERT_EQ(rowNum, 2); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); - } +// int fieldNum = taos_field_count(res); +// ASSERT_EQ(fieldNum, 5); +// int rowNum = taos_affected_rows(res); +// ASSERT_EQ(rowNum, 2); +// for (int i = 0; i < rowNum; ++i) { +// TAOS_ROW rows = taos_fetch_row(res); +// } taos_free_result(res); + destroyRequest(request); smlDestroyInfo(info); } @@ -539,7 +540,7 @@ TEST(testCase, smlParseLine_error_Test) { SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, nullptr); - const char *sql[2] = { + const char *sql[] = { "measure,t1=3 c1=8", "measure,t2=3 c1=8u8" }; @@ -588,7 +589,7 @@ TEST(testCase, smlProcess_telnet_Test) { SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, nullptr); - const char *sql[4] = { + const char *sql[] = { "sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0", "sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ", "sys.if.bytes.out 1479496102 1.3E3 network=tcp", @@ -599,25 +600,26 @@ TEST(testCase, smlProcess_telnet_Test) { TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a"); ASSERT_NE(res, nullptr); - int fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 2); - int rowNum = taos_affected_rows(res); - ASSERT_EQ(rowNum, 1); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); - } - taos_free_result(pRes); +// int fieldNum = taos_field_count(res); +// ASSERT_EQ(fieldNum, 2); +// int rowNum = taos_affected_rows(res); +// ASSERT_EQ(rowNum, 1); +// for (int i = 0; i < rowNum; ++i) { +// TAOS_ROW rows = taos_fetch_row(res); +// } + taos_free_result(res); - res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961"); - ASSERT_NE(res, nullptr); - fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 2); - rowNum = taos_affected_rows(res); - ASSERT_EQ(rowNum, 2); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); - } - taos_free_result(pRes); +// res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961"); +// ASSERT_NE(res, nullptr); +// fieldNum = taos_field_count(res); +// ASSERT_EQ(fieldNum, 2); +// rowNum = taos_affected_rows(res); +// ASSERT_EQ(rowNum, 2); +// for (int i = 0; i < rowNum; ++i) { +// TAOS_ROW rows = taos_fetch_row(res); +// } +// taos_free_result(res); + destroyRequest(request); smlDestroyInfo(info); } @@ -670,7 +672,8 @@ TEST(testCase, smlProcess_json1_Test) { // for (int i = 0; i < rowNum; ++i) { // TAOS_ROW rows = taos_fetch_row(res); // } - taos_free_result(pRes); + taos_free_result(res); + destroyRequest(request); smlDestroyInfo(info); } @@ -714,7 +717,7 @@ TEST(testCase, smlProcess_json2_Test) { "}"; int32_t ret = smlProcess(info, (char **)(&sql), -1); ASSERT_EQ(ret, 0); - taos_free_result(pRes); + destroyRequest(request); smlDestroyInfo(info); } @@ -786,7 +789,7 @@ TEST(testCase, smlProcess_json3_Test) { "}"; int32_t ret = smlProcess(info, (char **)(&sql), -1); ASSERT_EQ(ret, 0); - taos_free_result(pRes); + destroyRequest(request); smlDestroyInfo(info); } @@ -848,7 +851,7 @@ TEST(testCase, smlProcess_json4_Test) { "}"; int32_t ret = smlProcess(info, (char**)(&sql), -1); ASSERT_EQ(ret, 0); - taos_free_result(pRes); + destroyRequest(request); smlDestroyInfo(info); } From b6677e1a5d9ba4cce15033e781bb6e148b8aed93 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 14 May 2022 22:13:27 +0800 Subject: [PATCH 09/11] fix(query): remove expired moving data operation during extract data from in-memory buffer. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 ------------ source/libs/executor/src/executorimpl.c | 10 ---------- 2 files changed, 22 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b293f1399d..927babc26c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2769,20 +2769,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } while (moveToNextRowInMem(pCheckInfo)); taosMemoryFreeClear(pSchema); // free the STSChema - assert(numOfRows <= maxRowsToRead); - // if the buffer is not full in case of descending order query, move the data in the front of the buffer - if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) { - int32_t emptySize = maxRowsToRead - numOfRows; - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, - numOfRows * pColInfo->info.bytes); - } - } - int64_t elapsedTime = taosGetTimestampUs() - st; tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cb8bee1e6a..2cbfb9b344 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2138,16 +2138,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchIn blockDataUpdateTsWindow(pBlock); } -static int32_t colIdSearchCompar(const void* p1, const void* p2) { - int32_t colId = *(int32_t*)p1; - SColMatchInfo* pInfo = (SColMatchInfo*)p2; - if (colId == pInfo->targetSlotId) { - return 0; - } - - return (colId < pInfo->colId) ? -1 : 1; -} - void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { if (keep) { return; From 98a2d311b7fcf5068046f04d0df27eee38dba1bd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 14 May 2022 22:15:55 +0800 Subject: [PATCH 10/11] refactor:fix memory leak --- source/client/src/clientSml.c | 14 ++++++++++---- source/client/test/smlTest.cpp | 15 +++++++++++---- source/libs/parser/src/parInsert.c | 3 +++ source/libs/parser/src/parInsertData.c | 6 +++--- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 2a3c7f9cdd..8ed4308ea7 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -885,7 +885,7 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm // parse measure while (*sql != '\0') { - if(IS_SLASH_LETTER(sql)){ + if((sql != elements->measure) && IS_SLASH_LETTER(sql)){ MOVE_FORWARD_ONE(sql,strlen(sql) + 1); continue; } @@ -973,6 +973,8 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump const char *sql = data; while(*sql != '\0'){ JUMP_SPACE(sql) + if(*sql == '\0') break; + const char *key = sql; int32_t keyLen = 0; @@ -1005,6 +1007,8 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump while(*sql != '\0') { // parse value if (*sql == SPACE) { + valueLen = sql - value; + sql++; break; } if (*sql == EQUAL) { @@ -1013,14 +1017,15 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump } sql++; } - valueLen = sql - value; - sql++; - JUMP_SPACE(sql) + if(valueLen == 0){ + valueLen = sql - value; + } if(valueLen == 0){ smlBuildInvalidDataMsg(msg, "invalid value", value); return TSDB_CODE_SML_INVALID_DATA; } + // add kv to SSmlKv SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv) return TSDB_CODE_OUT_OF_MEMORY; @@ -1354,6 +1359,7 @@ static void smlDestroySTableMeta(SSmlSTableMeta *meta){ taosArrayDestroy(meta->tags); taosArrayDestroy(meta->cols); taosMemoryFree(meta->tableMeta); + taosMemoryFree(meta); } static void smlDestroyCols(SArray *cols) { diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index dabf2c0209..589c3d3a10 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -206,10 +206,16 @@ TEST(testCase, smlParseCols_Error_Test) { int32_t len = strlen(data[i]); char *sql = (char*)taosMemoryCalloc(256, 1); memcpy(sql, data[i], len + 1); - int32_t ret = smlParseCols(sql, len, NULL, false, dumplicateKey, &msgBuf); + SArray *cols = taosArrayInit(8, POINTER_BYTES); + int32_t ret = smlParseCols(sql, len, cols, false, dumplicateKey, &msgBuf); ASSERT_NE(ret, TSDB_CODE_SUCCESS); taosHashClear(dumplicateKey); taosMemoryFree(sql); + for(int j = 0; j < taosArrayGetSize(cols); j++){ + void *kv = taosArrayGetP(cols, j); + taosMemoryFree(kv); + } + taosArrayDestroy(cols); } taosHashCleanup(dumplicateKey); } @@ -239,7 +245,6 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); ASSERT_EQ(kv->length, 15); ASSERT_EQ(strncasecmp(kv->value, "\"passit", 7), 0); - taosMemoryFree(kv); // nchar kv = (SSmlKv *)taosArrayGetP(cols, 3); @@ -248,11 +253,13 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); ASSERT_EQ(kv->length, 7); ASSERT_EQ(strncasecmp(kv->value, "4.31f64", 7), 0); - taosMemoryFree(kv); + for(int i = 0; i < size; i++){ + void *tmp = taosArrayGetP(cols, i); + taosMemoryFree(tmp); + } taosArrayClear(cols); - // test tag is null data = "t=3e"; len = 0; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 27383c0a51..15e4b8ef36 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1826,6 +1826,9 @@ void smlDestroyHandle(void* pHandle) { if (!pHandle) return; SSmlExecHandle* handle = (SSmlExecHandle*)pHandle; destroyBlockHashmap(handle->pBlockHash); + tdDestroyKVRowBuilder(&handle->tagsBuilder); + destroyBoundColumnInfo(&handle->tags); + destroyCreateSubTbReq(&handle->createTblReq); taosMemoryFree(handle); } diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 677dbca0e9..8b649350b7 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -237,9 +237,9 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) { taosMemoryFreeClear(pDataBlock->pData); if (!pDataBlock->cloned) { // free the refcount for metermeta -// if (pDataBlock->pTableMeta != NULL) { -// taosMemoryFreeClear(pDataBlock->pTableMeta); -// } + if (pDataBlock->pTableMeta != NULL) { + taosMemoryFreeClear(pDataBlock->pTableMeta); + } destroyBoundColumnInfo(&pDataBlock->boundColumnInfo); } From 911cd1fe6694f74d843d2b943ba0531c3cae3a1b Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 14 May 2022 22:51:13 +0800 Subject: [PATCH 11/11] fix: reuse existing udf handles and teardown the handle later --- include/libs/function/function.h | 2 +- include/libs/function/tudf.h | 32 +++++----- source/libs/function/inc/udfc.h | 2 +- source/libs/function/src/tudf.c | 99 +++++++++++++++++++++++------- source/libs/function/test/runUdf.c | 6 +- source/libs/scalar/src/scalar.c | 14 +---- 6 files changed, 99 insertions(+), 56 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 8d0b93dde2..616aec8c02 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -309,7 +309,7 @@ void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); /** - * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf + * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * @return error code */ int32_t udfcOpen(); diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index b5c38e14f4..b37dcd2b61 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -39,16 +39,6 @@ extern "C" { //====================================================================================== //begin API to taosd and qworker -typedef void *UdfcFuncHandle; - -/** - * setup udf - * @param udf, in - * @param handle, out - * @return error code - */ -int32_t setupUdf(char udfName[], UdfcFuncHandle *handle); - typedef struct SUdfColumnMeta { int16_t type; int32_t bytes; @@ -95,32 +85,42 @@ typedef struct SUdfInterBuf { char* buf; int8_t numOfResult; //zero or one } SUdfInterBuf; +typedef void *UdfcFuncHandle; +/** + * setup udf + * @param udf, in + * @param funcHandle, out + * @return error code + */ +int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle); // output: interBuf -int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); +int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); // input: block, state // output: newState -int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); +int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); +int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); /** * tearn down udf * @param handle * @return */ -int32_t teardownUdf(UdfcFuncHandle handle); +int32_t doTeardownUdf(UdfcFuncHandle handle); bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); + +int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); // end API to taosd and qworker //============================================================================================================================= // begin API to UDF writer. diff --git a/source/libs/function/inc/udfc.h b/source/libs/function/inc/udfc.h index a693e476e8..f414c2b29e 100644 --- a/source/libs/function/inc/udfc.h +++ b/source/libs/function/inc/udfc.h @@ -43,7 +43,7 @@ int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle); int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t *newStateSize, SSDataBlock *output); -int32_t teardownUdf(UdfcFuncHandle handle); +int32_t doTeardownUdf(UdfcFuncHandle handle); typedef struct SUdfSetupRequest { char udfName[16]; // diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index d9e3ff0a5b..f2a0b4ec2c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -310,6 +310,11 @@ enum { }; int64_t gUdfTaskSeqNum = 0; +typedef struct SUdfcFuncStub { + char udfName[TSDB_FUNC_NAME_LEN]; + UdfcFuncHandle handle; +} SUdfcFuncStub; + typedef struct SUdfcProxy { char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_barrier_t initBarrier; @@ -325,6 +330,9 @@ typedef struct SUdfcProxy { QUEUE taskQueue; QUEUE uvProcTaskQueue; + uv_mutex_t udfStubsMutex; + SArray* udfStubs; // SUdfcFuncStub + int8_t initialized; } SUdfcProxy; @@ -1222,6 +1230,8 @@ int32_t udfcOpen() { atomic_store_8(&proxy->udfcState, UDFC_STATE_READY); proxy->udfcState = UDFC_STATE_READY; uv_barrier_wait(&proxy->initBarrier); + uv_mutex_init(&proxy->udfStubsMutex); + proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); fnInfo("udfc initialized") return 0; } @@ -1238,6 +1248,8 @@ int32_t udfcClose() { uv_thread_join(&udfc->loopThread); uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); + taosArrayDestroy(udfc->udfStubs); + uv_mutex_destroy(&udfc->udfStubsMutex); udfc->udfcState = UDFC_STATE_INITAL; fnInfo("udfc cleaned up"); return 0; @@ -1259,7 +1271,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { return task->errCode; } -int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { +int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { fnInfo("udfc setup udf. udfName: %s", udfName); if (gUdfdProxy.udfcState != UDFC_STATE_READY) { return TSDB_CODE_UDF_INVALID_STATE; @@ -1287,7 +1299,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->session->outputLen = rsp->outputLen; task->session->bufSize = rsp->bufSize; if (task->errCode != 0) { - fnError("failed to setup udf. err: %d", task->errCode) + fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) } else { fnInfo("sucessfully setup udf func handle. handle: %p", task->session); *funcHandle = task->session; @@ -1373,7 +1385,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf return err; } -int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { +int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int8_t callType = TSDB_UDF_CALL_AGG_INIT; int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); @@ -1383,7 +1395,7 @@ int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { // input: block, state // output: interbuf, -int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { +int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); return err; @@ -1391,7 +1403,7 @@ int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBu // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); return err; @@ -1399,13 +1411,13 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { +int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int8_t callType = TSDB_UDF_CALL_AGG_FIN; int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); return err; } -int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; SSDataBlock inputBlock = {0}; convertScalarParamToDataBlock(input, numOfCols, &inputBlock); @@ -1417,7 +1429,50 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu return err; } -int32_t teardownUdf(UdfcFuncHandle handle) { +int compareUdfcFuncSub(const void* elem1, const void* elem2) { + SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; + SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; + return strcmp(stub1->udfName, stub2->udfName); +} + +int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) { + int32_t code = 0; + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (foundStub != NULL) { + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + *pHandle = foundStub->handle; + return 0; + } + *pHandle = NULL; + code = doSetupUdf(udfName, pHandle); + if (code == TSDB_CODE_SUCCESS) { + SUdfcFuncStub stub = {0}; + strcpy(stub.udfName, udfName); + stub.handle = *pHandle; + taosArrayPush(gUdfdProxy.udfStubs, &stub); + taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); + } else { + *pHandle = NULL; + } + + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return code; +} + +int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { + UdfcFuncHandle handle = NULL; + int32_t code = setupUdf(udfName, &handle); + if (code != 0) { + return code; + } + code = doCallUdfScalarFunc(handle, input, numOfCols, output); + return code; +} + +int32_t doTeardownUdf(UdfcFuncHandle handle) { fnInfo("tear down udf. udf func handle: %p", handle); SClientUdfUvSession *session = (SClientUdfUvSession *) handle; @@ -1471,8 +1526,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } UdfcFuncHandle handle; int32_t udfCode = 0; - if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) { - fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode); + if ((udfCode = setupUdf((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); return false; } SClientUdfUvSession *session = (SClientUdfUvSession *)handle; @@ -1485,8 +1540,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult udfRes->session = (SClientUdfUvSession *)handle; SUdfInterBuf buf = {0}; - if ((udfCode = callUdfAggInit(handle, &buf)) != 0) { - fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode); + if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { + fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); return false; } udfRes->interResNum = buf.numOfResult; @@ -1533,7 +1588,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; - int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState); + int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); if (udfCode != 0) { fnError("udfAggProcess error. code: %d", udfCode); newState.numOfResult = 0; @@ -1567,9 +1622,9 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; int32_t udfCallCode= 0; - udfCallCode= callUdfAggFinalize(session, &state, &resultBuf); - if (udfCallCode!= 0) { - fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode); + udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf); + if (udfCallCode != 0) { + fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); GET_RES_INFO(pCtx)->numOfRes = 0; } else { memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); @@ -1577,11 +1632,11 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } - int32_t code = teardownUdf(session); - if (code != 0) { - fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code); - } - - return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); +// int32_t code = doTeardownUdf(session); +// if (code != 0) { +// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code); +// } + int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); + return udfCallCode == 0 ? numOfResults : udfCallCode; } \ No newline at end of file diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index a8d6fbd715..d7c539e5c2 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -47,7 +47,7 @@ int main(int argc, char *argv[]) { UdfcFuncHandle handle; - setupUdf("udf1", &handle); + doSetupUdf("udf1", &handle); SSDataBlock block = {0}; SSDataBlock *pBlock = █ @@ -73,12 +73,12 @@ int main(int argc, char *argv[]) { input.numOfRows = pBlock->info.rows; input.columnData = taosArrayGet(pBlock->pDataBlock, 0); SScalarParam output = {0}; - callUdfScalarFunc(handle, &input, 1, &output); + doCallUdfScalarFunc(handle, &input, 1, &output); SColumnInfoData *col = output.columnData; for (int32_t i = 0; i < output.numOfRows; ++i) { fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); } - teardownUdf(handle); + doTeardownUdf(handle); udfcClose(); } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 6c8326b09f..7e3dbaf7d0 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -362,19 +362,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); if (fmIsUserDefinedFunc(node->funcId)) { - UdfcFuncHandle udfHandle = NULL; - - code = setupUdf(node->functionName, &udfHandle); - if (code != 0) { - sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code); - goto _return; - } - code = callUdfScalarFunc(udfHandle, params, paramNum, output); - if (code != 0) { - sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); - goto _return; - } - code = teardownUdf(udfHandle); + code = callUdfScalarFunc(node->functionName, params, paramNum, output); if (code != 0) { sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); goto _return;