From d418abd07f8523cdd0dfd6e73ab195646dc7a8da Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 7 May 2022 16:24:30 +0800 Subject: [PATCH 1/8] refactor:modify schemaless function to speed --- include/libs/parser/parser.h | 2 +- source/client/src/clientSml.c | 157 +++++++++++++++-------------- source/libs/parser/src/parInsert.c | 39 +++---- 3 files changed, 100 insertions(+), 98 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 7c9602734b..998d45aee1 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -105,7 +105,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void* smlInitHandle(SQuery *pQuery); void smlDestroyHandle(void *pHandle); -int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen); +int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); #ifdef __cplusplus diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 5f9138bb43..fa54c98c47 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -37,8 +37,8 @@ typedef enum { typedef struct { char sTableName[TSDB_TABLE_NAME_LEN]; - SHashObj *tags; - SHashObj *fields; + SArray *tags; + SArray *fields; } SCreateSTableActionInfo; typedef struct { @@ -78,14 +78,17 @@ typedef struct { // colsFormat store cols formated, for quick parse, if info->formatData is true SArray *colsFormat; // elements are SArray - // cols & colsColumn store cols un formated + // cols store cols un formated SArray *cols; // elements are SHashObj for find by key quickly - SHashObj *columnsHash; // elements are , just for judge if key exists quickly. } SSmlTableInfo; typedef struct { - SHashObj *tagHash; + SArray *tags; // save the origin order to create table + SHashObj *tagHash; // elements are + + SArray *cols; SHashObj *fieldHash; + STableMeta *tableMeta; } SSmlSTableMeta; @@ -350,37 +353,26 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); char* pos = result + n; int freeBytes = capacity - n; - size_t size = taosHashGetSize(action->createSTable.fields); - SArray *cols = taosArrayInit(size, POINTER_BYTES); - SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL); - while(kv){ - if(strncmp((*kv)->key, TS, strlen(TS)) == 0 && (*kv)->type == TSDB_DATA_TYPE_TIMESTAMP){ - taosArrayInsert(cols, 0, kv); - }else{ - taosArrayPush(cols, kv); - } - kv = taosHashIterate(action->createSTable.fields, kv); - } + SArray *cols = action->createSTable.tags; for(int i = 0; i < taosArrayGetSize(cols); i++){ - SSmlKv *kvNew = taosArrayGetP(cols, i); - smlBuildColumnDescription(kvNew, pos, freeBytes, &outBytes); + SSmlKv *kv = taosArrayGetP(cols, i); + smlBuildColumnDescription(kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; *pos = ','; ++pos; --freeBytes; } - taosArrayDestroy(cols); --pos; ++freeBytes; outBytes = snprintf(pos, freeBytes, ") tags ("); pos += outBytes; freeBytes -= outBytes; - kv = taosHashIterate(action->createSTable.tags, NULL); - while(kv){ - smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes); + cols = action->createSTable.tags; + for(int i = 0; i < taosArrayGetSize(cols); i++){ + SSmlKv *kv = taosArrayGetP(cols, i); + smlBuildColumnDescription(kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; *pos = ','; ++pos; --freeBytes; - kv = taosHashIterate(action->createSTable.tags, kv); } pos--; ++freeBytes; outBytes = snprintf(pos, freeBytes, ")"); @@ -419,7 +411,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL); while (tableMetaSml) { - SSmlSTableMeta* cTablePoints = *tableMetaSml; + SSmlSTableMeta* sTableData = *tableMetaSml; STableMeta *pTableMeta = NULL; SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); @@ -436,8 +428,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { SSchemaAction schemaAction = {0}; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen); - schemaAction.createSTable.tags = cTablePoints->tagHash; - schemaAction.createSTable.fields = cTablePoints->fieldHash; + schemaAction.createSTable.tags = sTableData->tags; + schemaAction.createSTable.fields = sTableData->cols; code = smlApplySchemaAction(info, &schemaAction); if (code != 0) { uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName); @@ -454,7 +446,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); return code; } - cTablePoints->tableMeta = pTableMeta; + sTableData->tableMeta = pTableMeta; tableMetaSml = taosHashIterate(info->superTables, tableMetaSml); } @@ -1295,14 +1287,19 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlKv *kv = taosArrayGetP(tags, i); ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR); - SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); - if(value){ + uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); + if(index){ + SSmlKv **value = taosArrayGet(tableMeta->tags, *index); ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR); if(kv->valueLen > (*value)->valueLen){ // tags type is nchar *value = kv; } }else{ - taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); + size_t tmp = taosArrayGetSize(tableMeta->tags); + ASSERT(tmp <= UINT8_MAX); + uint8_t size = tmp; + taosArrayPush(tableMeta->tags, &kv); + taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES); } } } @@ -1310,8 +1307,10 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, if(cols){ for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp SSmlKv *kv = taosArrayGetP(cols, i); - SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); - if(value){ + + int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); + if(index){ + SSmlKv **value = taosArrayGet(tableMeta->cols, *index); if(kv->type != (*value)->type){ smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); return false; @@ -1323,7 +1322,11 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, } } }else{ - taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); + size_t tmp = taosArrayGetSize(tableMeta->cols); + ASSERT(tmp <= INT16_MAX); + int16_t size = tmp; + taosArrayPush(tableMeta->cols, &kv); + taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES); } } } @@ -1332,16 +1335,18 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ if(tags){ - for (int i = 0; i < taosArrayGetSize(tags); ++i) { + for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) { SSmlKv *kv = taosArrayGetP(tags, i); - taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); + taosArrayPush(tableMeta->tags, &kv); + taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES); } } if(cols){ - for (int i = 0; i < taosArrayGetSize(cols); ++i) { + for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { SSmlKv *kv = taosArrayGetP(cols, i); - taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); + taosArrayPush(tableMeta->cols, &kv); + taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES); } } } @@ -1364,12 +1369,6 @@ static SSmlTableInfo* smlBuildTableInfo(bool format){ uError("SML:smlParseLine failed to allocate memory"); goto cleanup; } - - tag->columnsHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if (tag->columnsHash == NULL) { - uError("SML:smlParseLine failed to allocate memory"); - goto cleanup; - } } tag->tags = taosArrayInit(16, POINTER_BYTES); @@ -1399,7 +1398,6 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){ } taosHashCleanup(kvHash); } - taosHashCleanup(tag->columnsHash); } taosArrayDestroy(tag->tags); taosMemoryFreeClear(tag); @@ -1408,23 +1406,20 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ if(dataFormat){ taosArrayPush(oneTable->colsFormat, &cols); - }else{ - SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if(!kvHash){ - uError("SML:smlDealCols failed to allocate memory"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - for(size_t i = 0; i < taosArrayGetSize(cols); i++){ - SSmlKv *kv = taosArrayGetP(cols, i); - taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later - - if(taosHashGet(oneTable->columnsHash, kv->key, kv->keyLen) != NULL){ - continue; - } - taosHashPut(oneTable->columnsHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); - } - taosArrayPush(oneTable->cols, &kvHash); + return TSDB_CODE_SUCCESS; } + + SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if(!kvHash){ + uError("SML:smlDealCols failed to allocate memory"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + for(size_t i = 0; i < taosArrayGetSize(cols); i++){ + SSmlKv *kv = taosArrayGetP(cols, i); + taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later + } + taosArrayPush(oneTable->cols, &kvHash); + return TSDB_CODE_SUCCESS; } @@ -1444,6 +1439,18 @@ static SSmlSTableMeta* smlBuildSTableMeta(){ uError("SML:smlBuildSTableMeta failed to allocate memory"); goto cleanup; } + + meta->tags = taosArrayInit(32, POINTER_BYTES); + if (meta->tags == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + + meta->cols = taosArrayInit(32, POINTER_BYTES); + if (meta->cols == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } return meta; cleanup: @@ -1454,6 +1461,8 @@ cleanup: static void smlDestroySTableMeta(SSmlSTableMeta *meta){ taosHashCleanup(meta->tagHash); taosHashCleanup(meta->fieldHash); + taosArrayDestroy(meta->tags); + taosArrayDestroy(meta->cols); taosMemoryFree(meta->tableMeta); } @@ -1500,45 +1509,45 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { return ret; } }else{ - SSmlTableInfo *tag = smlBuildTableInfo(info->dataFormat); - if(!tag){ + SSmlTableInfo *tinfo = smlBuildTableInfo(info->dataFormat); + if(!tinfo){ return TSDB_CODE_TSC_OUT_OF_MEMORY; } - ret = smlDealCols(tag, info->dataFormat, cols); + ret = smlDealCols(tinfo, info->dataFormat, cols); if(ret != TSDB_CODE_SUCCESS){ return ret; } - ret = smlParseCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf); + ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); return ret; } - if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){ + if(taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){ smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); return TSDB_CODE_SML_INVALID_DATA; } - tag->sTableName = elements.measure; - tag->sTableNameLen = elements.measureLen; - smlBuildChildTableName(tag); - uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tag->childTableName); + tinfo->sTableName = elements.measure; + tinfo->sTableNameLen = elements.measureLen; + smlBuildChildTableName(tinfo); + uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName); SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); if(tableMeta){ // update meta - ret = smlUpdateMeta(*tableMeta, tag->tags, cols, &info->msgBuf); + ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf); if(!ret){ uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id); return TSDB_CODE_SML_INVALID_DATA; } }else{ SSmlSTableMeta *meta = smlBuildSTableMeta(); - smlInsertMeta(meta, tag->tags, cols); + smlInsertMeta(meta, tinfo->tags, cols); taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES); } - taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tag, POINTER_BYTES); + taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES); } return TSDB_CODE_SUCCESS; } @@ -1651,7 +1660,7 @@ static int32_t smlInsertData(SSmlHandle* info) { (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid - code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, tableData->columnsHash, + code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, (*pMeta)->cols, tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len); if(code != TSDB_CODE_SUCCESS){ return code; @@ -1730,7 +1739,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr return NULL; } - SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, false); + SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true); if(!info){ return (TAOS_RES*)request; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 05d247c037..d80eb995eb 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1642,7 +1642,7 @@ static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDat return TSDB_CODE_SUCCESS; } -int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, +int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; @@ -1673,21 +1673,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co SSchema* pSchema = getTableColumnSchema(pTableMeta); - - if(format){ - ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema); - }else{ - SArray *columns = taosArrayInit(16, POINTER_BYTES); - void **p1 = taosHashIterate(colsHash, NULL); - while (p1) { - SSmlKv* kv = *p1; - taosArrayPush(columns, &kv); - p1 = taosHashIterate(colsHash, p1); - } - ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema); - taosArrayDestroy(columns); - } - + ret = smlBoundColumns(colsSchema, &pDataBlock->boundColumnInfo, pSchema); if(ret != TSDB_CODE_SUCCESS){ buildInvalidOperationMsg(&pBuf, "bound cols error"); return ret; @@ -1712,8 +1698,10 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row); void *rowData = NULL; + bool eleEqual = false; if(format){ rowData = taosArrayGetP(colsFormat, r); + eleEqual = (taosArrayGetSize(rowData) == spd->numOfBound); }else{ rowData = taosArrayGetP(cols, r); } @@ -1728,17 +1716,22 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co SSmlKv *kv = NULL; if(format){ kv = taosArrayGetP(rowData, c); - if (!kv){ - char msg[64] = {0}; - sprintf(msg, "cols num not the same like before:%d", r); - return buildInvalidOperationMsg(&pBuf, msg); - } + do{ + if (!eleEqual && kv && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){ + MemRowAppend(&pBuf, NULL, 0, ¶m); + c++; + if(c >= spd->numOfBound) break; + pColSchema = &pSchema[spd->boundColumns[c] - 1]; + continue; + } + break; + }while(1); }else{ void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); - kv = *p; + if(p) kv = *p; } - if (kv->length == 0) { + if (!kv || kv->length == 0) { MemRowAppend(&pBuf, NULL, 0, ¶m); } else { int32_t colLen = pColSchema->bytes; From ca3b779a7213a5ce95bf71126ed1ef477971f273 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 7 May 2022 18:06:26 +0800 Subject: [PATCH 2/8] refactor:modify schemaless function to speed --- source/client/src/clientSml.c | 52 +++++++++++++++++++++++------- source/client/test/smlTest.cpp | 14 +++++--- source/libs/parser/src/parInsert.c | 2 ++ 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index fa54c98c47..7a697bfad8 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -116,6 +116,8 @@ typedef struct { int32_t affectedRows; SSmlMsgBuf msgBuf; + SHashObj *dumplicateKey; // for dumplicate key + SArray *colsContainer; // for cols parse, if is dataFormat == false } SSmlHandle; //================================================================================================= @@ -177,8 +179,9 @@ static void smlBuildChildTableName(SSmlTableInfo *tags) { tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); tMD5Final(&context); uint64_t digest1 = *(uint64_t*)(context.digest); - uint64_t digest2 = *(uint64_t*)(context.digest + 8); - snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); + //uint64_t digest2 = *(uint64_t*)(context.digest + 8); + //snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); + snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1); taosStringBuilderDestroy(&sb); tags->uid = digest1; } @@ -353,7 +356,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); char* pos = result + n; int freeBytes = capacity - n; - SArray *cols = action->createSTable.tags; + SArray *cols = action->createSTable.fields; for(int i = 0; i < taosArrayGetSize(cols); i++){ SSmlKv *kv = taosArrayGetP(cols, i); @@ -1026,7 +1029,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu return TSDB_CODE_SUCCESS; } -static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SSmlMsgBuf *msg){ +static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ if(isTag && len == 0){ SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); kv->key = TAG; @@ -1054,6 +1057,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is return TSDB_CODE_SML_INVALID_DATA; } + if(taosHashGet(dumplicateKey, key, keyLen)){ + smlBuildInvalidDataMsg(msg, "dumplicate key", key); + return TSDB_CODE_SML_INVALID_DATA; + }else{ + taosHashPut(dumplicateKey, key, keyLen, key, CHAR_BYTES); + } + // parse value i++; const char *value = data + i; @@ -1474,10 +1484,15 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { return ret; } - SArray *cols = taosArrayInit(16, POINTER_BYTES); - if (cols == NULL) { - uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + SArray *cols = NULL; + if(info->dataFormat){ // if dataFormat, cols need new memory to save data + cols = taosArrayInit(16, POINTER_BYTES); + if (cols == NULL) { + uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + }else{ // if dataFormat is false, cols do not need to save data, there is another new memory to save data + cols = info->colsContainer; } ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); @@ -1485,7 +1500,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { uError("SML:0x%"PRIx64" smlParseTS failed", info->id); return ret; } - ret = smlParseCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf); + ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); return ret; @@ -1518,7 +1533,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { return ret; } - ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, &info->msgBuf); + ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, info->dumplicateKey, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); return ret; @@ -1549,6 +1564,11 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES); } + + if(!info->dataFormat){ + taosArrayClear(info->colsContainer); + } + taosHashClear(info->dumplicateKey); return TSDB_CODE_SUCCESS; } @@ -1577,6 +1597,7 @@ static void smlDestroyInfo(SSmlHandle* info){ // destroy info->pVgHash taosHashCleanup(info->pVgHash); + taosHashCleanup(info->dumplicateKey); taosMemoryFreeClear(info); } @@ -1623,8 +1644,17 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if(!dataFormat){ + info->colsContainer = taosArrayInit(32, POINTER_BYTES); + if(NULL == info->colsContainer){ + uError("SML:0x%"PRIx64" create info failed", info->id); + goto cleanup; + } + } if(NULL == info->exec || NULL == info->childTables - || NULL == info->superTables || NULL == info->pVgHash){ + || NULL == info->superTables || NULL == info->pVgHash + || NULL == info->dumplicateKey){ uError("SML:0x%"PRIx64" create info failed", info->id); goto cleanup; } diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 5f0f188b0b..11462b1c43 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -468,12 +468,18 @@ TEST(testCase, smlParseLine_Test) { SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, NULL); - const char *sql[3] = { - "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451606400000000000", + const char *sql[9] = { + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609400000000000", + "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619400000000000", "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000", - "readings,name=truck_2,fleet=North,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000" + "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000", + "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000", + "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000" }; - smlInsertLines(info, sql, 3); + smlInsertLines(info, sql, 9); // for (int i = 0; i < 3; i++) { // smlParseLine(info, sql[i]); // } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index d80eb995eb..0b0dc7226b 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1737,6 +1737,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols int32_t colLen = pColSchema->bytes; if (IS_VAR_DATA_TYPE(pColSchema->type)) { colLen = kv->length; + } else if(pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP){ + kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); } MemRowAppend(&pBuf, &(kv->value), colLen, ¶m); From 65c77f572a429ebd035ba0ad87eb95d3c4353198 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 7 May 2022 20:39:07 +0800 Subject: [PATCH 3/8] refactor:modify schemaless function to speed --- source/client/test/smlTest.cpp | 47 +++++++++++++++++++++++++++--- source/libs/parser/src/parInsert.c | 21 +++++++------ 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 11462b1c43..6a4823b855 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -190,17 +190,21 @@ TEST(testCase, smlParseCols_Error_Test) { "c=-3.402823466e+39u64", "c=-339u64", "c=18446744073709551616u64", + "c=1,c=2" }; + SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); for(int i = 0; i < sizeof(data)/sizeof(data[0]); i++){ char msg[256] = {0}; SSmlMsgBuf msgBuf; msgBuf.buf = msg; msgBuf.len = 256; int32_t len = strlen(data[i]); - int32_t ret = smlParseCols(data[i], len, NULL, false, &msgBuf); + int32_t ret = smlParseCols(data[i], len, NULL, false, dumplicateKey, &msgBuf); ASSERT_NE(ret, TSDB_CODE_SUCCESS); + taosHashClear(dumplicateKey); } + taosHashCleanup(dumplicateKey); } TEST(testCase, smlParseCols_tag_Test) { @@ -211,11 +215,12 @@ TEST(testCase, smlParseCols_tag_Test) { SArray *cols = taosArrayInit(16, POINTER_BYTES); ASSERT_NE(cols, NULL); + SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); - int32_t ret = smlParseCols(data, len, cols, true, &msgBuf); + int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); int32_t size = taosArrayGetSize(cols); ASSERT_EQ(size, 19); @@ -239,10 +244,14 @@ TEST(testCase, smlParseCols_tag_Test) { taosMemoryFree(kv); taosArrayClear(cols); + + + // test tag is null data = "t=3e"; len = 0; memset(msgBuf.buf, 0, msgBuf.len); - ret = smlParseCols(data, len, cols, true, &msgBuf); + taosHashClear(dumplicateKey); + ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); size = taosArrayGetSize(cols); ASSERT_EQ(size, 1); @@ -255,6 +264,9 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(kv->valueLen, strlen(TAG)); ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0); taosMemoryFree(kv); + + taosArrayDestroy(cols); + taosHashCleanup(dumplicateKey); } TEST(testCase, smlParseCols_Test) { @@ -266,9 +278,11 @@ TEST(testCase, smlParseCols_Test) { SArray *cols = taosArrayInit(16, POINTER_BYTES); ASSERT_NE(cols, NULL); + SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); - int32_t ret = smlParseCols(data, len, cols, false, &msgBuf); + int32_t ret = smlParseCols(data, len, cols, false, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); int32_t size = taosArrayGetSize(cols); ASSERT_EQ(size, 19); @@ -450,6 +464,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); taosArrayDestroy(cols); + taosHashCleanup(dumplicateKey); } TEST(testCase, smlParseLine_Test) { @@ -485,6 +500,30 @@ TEST(testCase, smlParseLine_Test) { // } } +TEST(testCase, smlParseLine_error_Test) { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(taos, NULL); + + TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); + taos_free_result(pRes); + + pRes = taos_query(taos, "use sml_db"); + taos_free_result(pRes); + + SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); + ASSERT_NE(request, NULL); + + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + ASSERT_NE(info, NULL); + + const char *sql[2] = { + "measure,t1=3 c1=8", + "measure,t2=3 c1=8u8" + }; + int ret = smlInsertLines(info, sql, 2); + ASSERT_NE(ret, 0); +} + // TEST(testCase, smlParseTS_Test) { // char msg[256] = {0}; // SSmlMsgBuf msgBuf; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 0b0dc7226b..c1ee4f48e9 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1549,7 +1549,7 @@ typedef struct SmlExecHandle { SQuery* pQuery; } SSmlExecHandle; -static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) { +static int32_t smlBoundColumnData(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) { col_id_t nCols = pColList->numOfCols; pColList->numOfBound = 0; @@ -1620,7 +1620,7 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche return TSDB_CODE_SUCCESS; } -static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) { +static int32_t smlBuildTagRow(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) { if (tdInitKVRowBuilder(tagsBuilder) < 0) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1649,13 +1649,13 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle; SSchema* pTagsSchema = getTableTagSchema(pTableMeta); setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta)); - int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema); + int ret = smlBoundColumnData(tags, &smlHandle->tags, pTagsSchema); if(ret != TSDB_CODE_SUCCESS){ buildInvalidOperationMsg(&pBuf, "bound tags error"); return ret; } SKVRow row = NULL; - ret = smlBoundTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf); + ret = smlBuildTagRow(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf); if(ret != TSDB_CODE_SUCCESS){ return ret; } @@ -1673,7 +1673,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols SSchema* pSchema = getTableColumnSchema(pTableMeta); - ret = smlBoundColumns(colsSchema, &pDataBlock->boundColumnInfo, pSchema); + ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema); if(ret != TSDB_CODE_SUCCESS){ buildInvalidOperationMsg(&pBuf, "bound cols error"); return ret; @@ -1698,10 +1698,10 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row); void *rowData = NULL; - bool eleEqual = false; + size_t rowDataSize = 0; if(format){ rowData = taosArrayGetP(colsFormat, r); - eleEqual = (taosArrayGetSize(rowData) == spd->numOfBound); + rowDataSize = taosArrayGetSize(rowData); }else{ rowData = taosArrayGetP(cols, r); } @@ -1715,9 +1715,12 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols SSmlKv *kv = NULL; if(format){ - kv = taosArrayGetP(rowData, c); do{ - if (!eleEqual && kv && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){ + if(rowDataSize >= c){ + break; + } + kv = taosArrayGetP(rowData, c); + if (rowDataSize != spd->numOfBound && kv && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){ MemRowAppend(&pBuf, NULL, 0, ¶m); c++; if(c >= spd->numOfBound) break; From 2e25f684a45e46d5e541257ab0aed9ffb19ead61 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 7 May 2022 22:24:24 +0800 Subject: [PATCH 4/8] refactor:fix error data incomplete if dataformat=true --- source/client/src/clientSml.c | 4 ++-- source/libs/parser/src/parInsert.c | 22 ++++++++-------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7a697bfad8..2a0e85092b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -148,8 +148,8 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const } static int smlCompareKv(const void* p1, const void* p2) { - SSmlKv* kv1 = (SSmlKv*)p1; - SSmlKv* kv2 = (SSmlKv*)p2; + SSmlKv* kv1 = *(SSmlKv**)p1; + SSmlKv* kv2 = *(SSmlKv**)p2; int32_t kvLen1 = kv1->keyLen; int32_t kvLen2 = kv2->keyLen; int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2)); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index c1ee4f48e9..11dfe60015 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1707,7 +1707,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols } // 1. set the parsed value from sql string - for (int c = 0; c < spd->numOfBound; ++c) { + for (int c = 0, j = 0; c < spd->numOfBound; ++c) { SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; param.schema = pColSchema; @@ -1715,20 +1715,14 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols SSmlKv *kv = NULL; if(format){ - do{ - if(rowDataSize >= c){ - break; + if(j < rowDataSize){ + kv = taosArrayGetP(rowData, j); + if (rowDataSize != spd->numOfBound && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){ + kv = NULL; + }else{ + j++; } - kv = taosArrayGetP(rowData, c); - if (rowDataSize != spd->numOfBound && kv && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){ - MemRowAppend(&pBuf, NULL, 0, ¶m); - c++; - if(c >= spd->numOfBound) break; - pColSchema = &pSchema[spd->boundColumns[c] - 1]; - continue; - } - break; - }while(1); + } }else{ void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); if(p) kv = *p; From 5b2d99e6f167d013e3eb7f5d732b09fa20becd8d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 7 May 2022 22:38:17 +0800 Subject: [PATCH 5/8] enh(index): support numberic and json filter --- source/libs/index/src/indexCache.c | 67 ++++++++++++++++++++++-------- source/libs/index/test/jsonUT.cc | 34 ++++++++++++++- 2 files changed, 83 insertions(+), 18 deletions(-) diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index b3ae7b7dbe..929f33909e 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -30,6 +30,7 @@ static void indexMemUnRef(MemTable* tbl); static void indexCacheTermDestroy(CacheTerm* ct); static int32_t indexCacheTermCompare(const void* l, const void* r); +static int32_t indexCacheJsonTermCompare(const void* l, const void* r); static char* indexCacheTermGet(const void* pData); static MemTable* indexInternalCacheCreate(int8_t type); @@ -63,6 +64,7 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond; typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { + // optime later int32_t ret = func(a, b); switch (comType) { case QUERY_LESS_THAN: { @@ -242,6 +244,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + if (0 == strcmp(c->colVal, pCt->colVal)) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) @@ -311,6 +314,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe } char* key = indexCacheTermGet(pCt); + // SSkipListIterator* iter = tSkipListCreateIter(mem->mem); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); @@ -318,6 +322,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + printf("json val: %s\n", c->colVal); + if (0 != strncmp(c->colVal, term->colName, term->nColName)) { + continue; + } TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType); if (cond == MATCH) { @@ -598,24 +606,11 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result indexMemRef(imm); taosThreadMutexUnlock(&pCache->mtx); - // SIndexTerm* term = query->term; - // EIndexQueryType qtype = query->qType; - - // bool isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); - // char* p = term->colVal; - // if (isJson) { - // p = indexPackJsonData(term); - //} - // CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)}; - int ret = indexQueryMem(mem, query, result, s); if (ret == 0 && *s != kTypeDeletion) { // continue search in imm ret = indexQueryMem(imm, query, result, s); } - // if (isJson) { - // taosMemoryFreeClear(p); - //} indexMemUnRef(mem); indexMemUnRef(imm); @@ -682,14 +677,52 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) { return cmp; } +static int indexFindCh(char* a, char c) { + char* p = a; + while (*p != 0 && *p++ != c) { + } + return p - a; +} +static int indexCacheJsonTermCompareImpl(char* a, char* b) { + int alen = indexFindCh(a, '&'); + int blen = indexFindCh(b, '&'); + + int cmp = strncmp(a, b, MIN(alen, blen)); + if (cmp == 0) { + cmp = alen - blen; + if (cmp != 0) { + return cmp; + } + cmp = *(a + alen) - *(b + blen); + if (cmp != 0) { + return cmp; + } + alen += 2; + blen += 2; + cmp = strcmp(a + alen, b + blen); + } + return cmp; +} +static int32_t indexCacheJsonTermCompare(const void* l, const void* r) { + CacheTerm* lt = (CacheTerm*)l; + CacheTerm* rt = (CacheTerm*)r; + // compare colVal + int cmp = indexCacheJsonTermCompareImpl(lt->colVal, rt->colVal); + if (cmp == 0) { + return rt->version - lt->version; + } + return cmp; +} static MemTable* indexInternalCacheCreate(int8_t type) { - type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; + int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; + int32_t (*cmpFn)(const void* l, const void* r) = + INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare; MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable)); indexMemRef(tbl); - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY, - indexCacheTermGet); + if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) { + tbl->mem = + tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, indexCacheTermGet); } return tbl; } diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index f789d23136..e5692b98f9 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -129,7 +129,7 @@ TEST_F(JsonEnv, testWriteMillonData) { SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); - for (size_t i = 0; i < 1000000; i++) { + for (size_t i = 0; i < 1000; i++) { tIndexJsonPut(index, terms, i); } indexMultiTermDestroy(terms); @@ -148,4 +148,36 @@ TEST_F(JsonEnv, testWriteMillonData) { assert(100 == taosArrayGetSize(result)); indexMultiTermQueryDestroy(mq); } + { + { + std::string colName("test"); + std::string colVal("ab"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); + tIndexJsonSearch(index, mq, result); + assert(0 == taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + { + std::string colName("test"); + std::string colVal("ab"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); + tIndexJsonSearch(index, mq, result); + assert(100 == taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + } + } } From 06fb98f0a4f928565385d8504bbf72bdee1ecc65 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 8 May 2022 17:18:11 +0800 Subject: [PATCH 6/8] fix(query): revise the length of var data type. --- source/libs/scalar/src/sclfunc.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5127ae534a..5fa8eb0c9a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,10 +709,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); - if (IS_VAR_DATA_TYPE(outputType)) { - outputLen += VARSTR_HEADER_SIZE; - } - char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; From 12beb4d04dd892e003db40f4e7b884e962de96d3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sun, 8 May 2022 22:20:30 +0800 Subject: [PATCH 7/8] fix: problem of sql command 'kill transaction' --- include/libs/nodes/nodes.h | 1 - source/libs/nodes/src/nodesCodeFuncs.c | 10 ++++------ source/libs/nodes/src/nodesUtilFuncs.c | 1 - source/libs/parser/src/parAstCreater.c | 5 +++-- source/libs/parser/src/parAuthenticator.c | 10 +++++++--- source/libs/parser/src/parTranslater.c | 1 + 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 34ccea2898..ad8a472d08 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -164,7 +164,6 @@ typedef enum ENodeType { QUERY_NODE_SHOW_TOPICS_STMT, QUERY_NODE_SHOW_CONSUMERS_STMT, QUERY_NODE_SHOW_SUBSCRIBES_STMT, - QUERY_NODE_SHOW_TRANS_STMT, QUERY_NODE_SHOW_SMAS_STMT, QUERY_NODE_SHOW_CONFIGS_STMT, QUERY_NODE_SHOW_CONNECTIONS_STMT, diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 5dcacc4354..507cd79411 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -168,8 +168,6 @@ const char* nodesNodeName(ENodeType type) { return "ShowConsumersStmt"; case QUERY_NODE_SHOW_SUBSCRIBES_STMT: return "ShowSubscribesStmt"; - case QUERY_NODE_SHOW_TRANS_STMT: - return "ShowTransStmt"; case QUERY_NODE_SHOW_SMAS_STMT: return "ShowSmasStmt"; case QUERY_NODE_SHOW_CONFIGS_STMT: @@ -1972,10 +1970,10 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) { code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); break; case TSDB_DATA_TYPE_NCHAR: { - //cJSON only support utf-8 encoding. Convert memory content to hex string. - char *buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char)); + // cJSON only support utf-8 encoding. Convert memory content to hex string. + char* buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char)); code = taosHexEncode(varDataVal(pNode->datum.p), buf, varDataLen(pNode->datum.p)); - if(code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(buf); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -2086,7 +2084,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { } varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) { - char *buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1); + char* buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1); if (NULL == buf) { code = TSDB_CODE_OUT_OF_MEMORY; break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4ae502b0d8..eeb069383f 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -190,7 +190,6 @@ SNodeptr nodesMakeNode(ENodeType type) { case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_CONSUMERS_STMT: case QUERY_NODE_SHOW_SUBSCRIBES_STMT: - case QUERY_NODE_SHOW_TRANS_STMT: case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT: case QUERY_NODE_SHOW_QUERIES_STMT: diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e36d0d9a54..e8ac562072 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1300,9 +1300,10 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const } SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) { - SNode* pStmt = nodesMakeNode(type); + SKillStmt* pStmt = nodesMakeNode(type); CHECK_OUT_OF_MEM(pStmt); - return pStmt; + pStmt->targetId = strtol(pId->z, NULL, 10); + return (SNode*)pStmt; } SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) { diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index c343c934ea..8f686cefce 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -23,12 +23,17 @@ typedef struct SAuthCxt { static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt); -static int32_t checkAuth(SParseContext* pCxt, const char* dbName, AUTH_TYPE type) { +static int32_t checkAuth(SParseContext* pCxt, const char* pDbName, AUTH_TYPE type) { if (pCxt->isSuperUser) { return TSDB_CODE_SUCCESS; } + SName name; + tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName)); + char dbFname[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, dbFname); bool pass = false; - int32_t code = catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbName, type, &pass); + int32_t code = + catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbFname, type, &pass); return TSDB_CODE_SUCCESS == code ? (pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code; } @@ -130,7 +135,6 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_CONSUMERS_STMT: case QUERY_NODE_SHOW_SUBSCRIBES_STMT: - case QUERY_NODE_SHOW_TRANS_STMT: case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5ad488e214..1bd7e28c74 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4131,6 +4131,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT: case QUERY_NODE_SHOW_TOPICS_STMT: + case QUERY_NODE_SHOW_TRANSACTIONS_STMT: code = rewriteShow(pCxt, pQuery); break; case QUERY_NODE_CREATE_TABLE_STMT: From 64e8e0c09bf913e4f74a4fca3a238d1b99ab4710 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sun, 8 May 2022 22:31:25 +0800 Subject: [PATCH 8/8] fix: problem of sql command 'kill transaction' --- source/libs/parser/test/mockCatalog.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index a82fbfce26..7297e4e93a 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -100,6 +100,14 @@ void generateInformationSchema(MockCatalogService* mcs) { } } +void generatePerformanceSchema(MockCatalogService* mcs) { + { + ITableBuilder& builder = mcs->createTableBuilder("performance_schema", "trans", TSDB_SYSTEM_TABLE, 1) + .addColumn("id", TSDB_DATA_TYPE_INT); + builder.done(); + } +} + /* * Table:t1 * Field | Type | DataType | Bytes | @@ -244,6 +252,7 @@ void initMetaDataEnv() { void generateMetaData() { generateInformationSchema(mockCatalogService.get()); + generatePerformanceSchema(mockCatalogService.get()); generateTestT1(mockCatalogService.get()); generateTestST1(mockCatalogService.get()); mockCatalogService->showTables();