diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 13826a1a74..ef6ed4af1d 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 9bbda8309f..9a6a5329ae 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index b013d45911..17446d184d 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index 1b7f63510b..c86b631df4 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -190,7 +190,8 @@ The effective value of charset is UTF-8. |Parameter Name |Supported Version |Dynamic Modification|Description| |-----------------------|-------------------------|--------------------|------------| |supportVnodes | |Supported, effective immediately |Maximum number of vnodes supported by a dnode, range 0-4096, default value is twice the number of CPU cores + 5| -|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 0-1024, default value 4| +|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-1024, default value 4| +|numOfCompactThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-16, default value 2| |numOfMnodeReadThreads | |Supported, effective after restart|Number of Read threads for mnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)| |numOfVnodeQueryThreads | |Supported, effective after restart|Number of Query threads for vnode, range 0-1024, default value is twice the number of CPU cores (not exceeding 16)| |numOfVnodeFetchThreads | |Supported, effective after restart|Number of Fetch threads for vnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)| diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 7a0a306a41..74e6f485ff 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -185,7 +185,8 @@ charset 的有效值是 UTF-8。 |参数名称|支持版本|动态修改|参数含义| |--------------------------|----------|-------------------------|-| |supportVnodes | |支持动态修改 立即生效 |dnode 支持的最大 vnode 数目,取值范围 0-4096,默认值 CPU 核数的 2 倍 + 5| -|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 0-1024,默认值为 4| +|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-1024,默认值为 4| +|numOfCompactThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-16,默认值为 2| |numOfMnodeReadThreads | |支持动态修改 重启生效 |mnode 的 Read 线程数目,取值范围 0-1024,默认值为 CPU 核数的四分之一(不超过 4)| |numOfVnodeQueryThreads | |支持动态修改 重启生效 |vnode 的 Query 线程数目,取值范围 0-1024,默认值为 CPU 核数的两倍(不超过 16)| |numOfVnodeFetchThreads | |支持动态修改 重启生效 |vnode 的 Fetch 线程数目,取值范围 0-1024,默认值为 CPU 核数的四分之一(不超过 4)| diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 0b34e882c8..292e7f561a 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -182,7 +182,7 @@ void tColDataClear(SColData *pColData); void tColDataDeepClear(SColData *pColData); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward); -void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); +int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key); diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 501f1cabc1..ec130ad6b5 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -112,9 +112,8 @@ extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsQueueMemoryAllowed; extern int32_t tsRetentionSpeedLimitMB; -extern const char *tsAlterCompactTaskKeywords; -extern int32_t tsNumOfCompactThreads; -extern int32_t tsNumOfRetentionThreads; +extern int32_t tsNumOfCompactThreads; +extern int32_t tsNumOfRetentionThreads; // sync raft extern int32_t tsElectInterval; diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index acd118acc9..d48142811c 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -848,6 +848,7 @@ static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) { STMT_DLOG("start to set dbName: %s", dbName); pStmt->db = taosStrdup(dbName); + (void)strdequote(pStmt->db); STMT_ERR_RET(stmtCreateRequest(pStmt)); // The SQL statement specifies a database name, overriding the previously specified database diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 3e4667fbe7..60f0a72e39 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -126,6 +126,10 @@ void fetchCallback(void* param, void* res, int32_t numOfRow) { void queryCallback(void* param, void* res, int32_t code) { if (code != TSDB_CODE_SUCCESS) { (void)printf("failed to execute, reason:%s\n", taos_errstr(res)); + taos_free_result(res); + tsem_t *sem = (tsem_t *)param; + tsem_post(sem); + return; } (void)printf("start to fetch data\n"); taos_fetch_raw_block_a(res, fetchCallback, param); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f1aacfed15..3efccf23b4 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2671,8 +2671,13 @@ static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal tColDataGetValue6, // HAS_VALUE | HAS_NULL tColDataGetValue7 // HAS_VALUE | HAS_NULL | HAS_NONE }; -void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { +int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { + if (iVal < 0 || iVal >= pColData->nVal || + (pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl)/POINTER_BYTES)){ + return TSDB_CODE_INVALID_PARA; + } tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal); + return TSDB_CODE_SUCCESS; } uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) { @@ -3436,7 +3441,10 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S for (int32_t i = 0; i < nColData; i++) { SColVal cv = {0}; - tColDataGetValue(&aFromColData[i], iFromRow, &cv); + code = tColDataGetValue(&aFromColData[i], iFromRow, &cv); + if (code != TSDB_CODE_SUCCESS) { + return code; + } code = tColDataAppendValue(&aToColData[i], &cv); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3575,7 +3583,10 @@ static int32_t tColDataMerge(SArray **colArr) { SColData *dstCol = taosArrayGet(dst, j); SColVal cv; - tColDataGetValue(srcCol, i, &cv); + code = tColDataGetValue(srcCol, i, &cv); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } code = tColDataAppendValue(dstCol, &cv); if (code) { goto _exit; @@ -3588,7 +3599,10 @@ static int32_t tColDataMerge(SArray **colArr) { SColData *dstCol = taosArrayGet(dst, j); SColVal cv; - tColDataGetValue(srcCol, i, &cv); + code = tColDataGetValue(srcCol, i, &cv); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } code = tColDataUpdateValue(dstCol, &cv, true); if (code) { goto _exit; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c58ad32a18..b298cac725 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -102,9 +102,8 @@ int32_t tsMaxStreamBackendCache = 128; // M int32_t tsPQSortMemThreshold = 16; // M int32_t tsRetentionSpeedLimitMB = 0; // unlimited -const char *tsAlterCompactTaskKeywords = "max_compact_tasks"; -int32_t tsNumOfCompactThreads = 2; -int32_t tsNumOfRetentionThreads = 1; +int32_t tsNumOfCompactThreads = 2; +int32_t tsNumOfRetentionThreads = 1; // sync raft int32_t tsElectInterval = 25 * 1000; @@ -745,8 +744,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); - - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); + + TAOS_CHECK_RETURN( + cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -795,9 +795,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); - tsNumOfCompactThreads = tsNumOfCommitThreads; - tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4); - tsNumOfSupportVnodes = tsNumOfCores * 2 + 5; tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2); @@ -842,7 +839,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "maxCompactConcurrency", tsNumOfCompactThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCompactThreads", tsNumOfCompactThreads, 1, 16, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0); @@ -1040,10 +1037,8 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } - pItem = cfgGetItem(pCfg, "maxCompactConcurrency"); + pItem = cfgGetItem(pCfg, "numOfCompactThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfCompactThreads = numOfCores / 2; - tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4); pItem->i32 = tsNumOfCompactThreads; pItem->stype = stype; } @@ -1548,7 +1543,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCommitThreads"); tsNumOfCommitThreads = pItem->i32; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "maxCompactConcurrency"); + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCompactThreads"); tsNumOfCompactThreads = pItem->i32; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "retentionSpeedLimitMB"); @@ -2354,6 +2349,8 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, TAOS_RETURN(code); } +extern void tsdbAlterNumCompactThreads(); + static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = -1; @@ -2404,6 +2401,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { goto _exit; } + if (strcasecmp(name, "numOfCompactThreads") == 0) { +#ifdef TD_ENTERPRISE + tsNumOfCompactThreads = pItem->i32; + code = TSDB_CODE_SUCCESS; + // tsdbAlterNumCompactThreads(); +#else + code = TSDB_CODE_INVALID_CFG; +#endif + goto _exit; + } + { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, diff --git a/source/common/test/dataformatTest.cpp b/source/common/test/dataformatTest.cpp index ebf91025bb..10c1077697 100644 --- a/source/common/test/dataformatTest.cpp +++ b/source/common/test/dataformatTest.cpp @@ -449,6 +449,20 @@ static void checkTSRow(const char **data, STSRow *row, STSchema *pTSchema) { checkSColVal(data[i], &colVal, pCol->type); } } +#ifndef WINDOWS +TEST(testCase, tColDataGetValue) { + SColData pColData = {0}; + SColVal pColVal = {0}; + ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0); + + pColData = {.flag = 8}; + pColVal = {0}; + ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0); + + pColData = {.nVal = 1, .flag = 8}; + ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0); +} +#endif TEST(testCase, AllNormTest) { int16_t nCols = 14; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 8139e4aa98..9ed4ee83c4 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -475,27 +475,6 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -extern void tsdbAlterNumCompactThreads(); -static int32_t dmAlterMaxCompactTask(const char *value) { - int32_t max_compact_tasks; - char *endptr = NULL; - - max_compact_tasks = taosStr2Int32(value, &endptr, 10); - if (endptr == value || endptr[0] != '\0') { - return TSDB_CODE_INVALID_MSG; - } - - if (max_compact_tasks != tsNumOfCompactThreads) { - dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks); - tsNumOfCompactThreads = max_compact_tasks; -#ifdef TD_ENTERPRISE - (void)tsdbAlterNumCompactThreads(); -#endif - } - - return TSDB_CODE_SUCCESS; -} - int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t code = 0; SDCfgDnodeReq cfgReq = {0}; @@ -509,10 +488,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs); } - if (strncmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) { - return dmAlterMaxCompactTask(cfgReq.value); - } - dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value); code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8e595f76c9..165437ed28 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -129,7 +129,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 30a20cd68d..8fe7d3823a 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -325,7 +325,6 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8 ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (ctx->suidInfo == NULL) { return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - ; } taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c0c4c4a5a3..a2b6194375 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -796,7 +796,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* sourceIdx++; } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { - tColDataGetValue(pCol, i, &colVal); + code = tColDataGetValue(pCol, i, &colVal); + TSDB_CHECK_CODE(code, line, END); code = doSetVal(pColData, i, &colVal); TSDB_CHECK_CODE(code, line, END); } @@ -937,7 +938,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData pCol = taosArrayGet(pCols, j); TQ_NULL_GO_TO_END(pCol); SColVal colVal = {0}; - tColDataGetValue(pCol, i, &colVal); + TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal)); PROCESS_VAL } @@ -961,7 +962,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); TQ_NULL_GO_TO_END(pColData); SColVal colVal = {0}; - tColDataGetValue(pCol, i, &colVal); + TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal)); SET_DATA } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5151ea3958..4dff1d08d9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1715,7 +1715,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow); if (colType == 2) { SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); - tColDataGetValue(pColData, tRow.iRow, &colVal); + TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit); SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a9f3893b96..4ca140c03e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -123,7 +123,8 @@ static int32_t tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int pColData = &pBlock->aColData[slotId]; - tColDataGetValue(pColData, irow, &cv); + code = tColDataGetValue(pColData, irow, &cv); + TSDB_CHECK_CODE(code, lino, _end); pKey->numOfPKs = 1; pKey->pks[0].type = cv.value.type; @@ -1603,7 +1604,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro TSDB_CHECK_CODE(code, lino, _end); } else { // varchar/nchar type for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) { - tColDataGetValue(pData, j, &cv); + code = tColDataGetValue(pData, j, &cv); + TSDB_CHECK_CODE(code, lino, _end); code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); TSDB_CHECK_CODE(code, lino, _end); } @@ -5282,7 +5284,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]); if (pData->cid == pSupInfo->colId[i]) { - tColDataGetValue(pData, rowIndex, &cv); + code = tColDataGetValue(pData, rowIndex, &cv); + TSDB_CHECK_CODE(code, lino, _end); code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); TSDB_CHECK_CODE(code, lino, _end); j += 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 16f6777765..88c6ac3d00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -622,7 +622,9 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); if (pColData) { - tColDataGetValue(pColData, pRow->iRow, pColVal); + if (tColDataGetValue(pColData, pRow->iRow, pColVal) != 0){ + tsdbError("failed to tColDataGetValue"); + } } else { *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); } @@ -645,7 +647,9 @@ void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) { SColData *pColData = &pBlock->aColData[i]; if (pColData->cflag & COL_IS_KEY) { SColVal cv; - tColDataGetValue(pColData, irow, &cv); + if (tColDataGetValue(pColData, irow, &cv) != 0){ + break; + } key->pks[key->numOfPKs] = cv.value; key->numOfPKs++; } else { @@ -719,7 +723,9 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { } if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); + if (tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv) != 0){ + return NULL; + } ++pIter->iColData; return &pIter->cv; } else { @@ -1251,7 +1257,8 @@ static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlo cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type); if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit; } else { - tColDataGetValue(pColDataFrom, iRow, &cv); + code = tColDataGetValue(pColDataFrom, iRow, &cv); + if (code) goto _exit; if (flag) { code = tColDataUpdateValue(pColDataTo, &cv, flag > 0); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index f52a0c3aba..723fd14145 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -243,7 +243,7 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { code = TSDB_CODE_VND_HASH_MISMATCH; goto _exit; } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_LOCK); + metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; tstrncpy(cfgRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN); @@ -279,7 +279,8 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { } } else { vError("vnodeGetTableCfg get invalid table type:%d", mer1.me.type); - return TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; + goto _exit; } cfgRsp.numOfTags = schemaTag.nCols; diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index dd1a52022e..eb72edb964 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + size_t keyBufSize = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + const char* id = GET_TASKID(pTaskInfo); - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode; @@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p } if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { - qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); + qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt); code = TSDB_CODE_ANA_ALGO_NOT_FOUND; goto _error; } if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { - qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); + qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName); code = TSDB_CODE_ANA_ALGO_NOT_LOAD; goto _error; } @@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p SExprInfo* pScalarExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } - size_t keyBufSize = 0; - int32_t num = 0; - SExprInfo* pExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num); QUERY_CHECK_CODE(code, lino, _error); initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState, + &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc); @@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p pInfo->anomalyCol = extractColumnFromColumnNode(pColNode); pInfo->anomalyKey.type = pInfo->anomalyCol.type; pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes; + pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes); - if (pInfo->anomalyKey.pData == NULL) { - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno) int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes; pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize); - if (pInfo->anomalySup.pResultRow == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno) + pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); - if (pInfo->anomalySup.blocks == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno) + pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); - if (pInfo->anomalySup.windows == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno) code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p *pOptrInfo = pOperator; - qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl, + qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl, pInfo->anomalyOpt); return TSDB_CODE_SUCCESS; _error: + qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt, + tstrerror(code)); + if (pInfo != NULL) { anomalyDestroyOperatorInfo(pInfo); } destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code); + return code; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 13ae220116..3f08db0e98 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1441,7 +1441,7 @@ static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMet SMetaReader mr1 = {0}; pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn); - + int64_t suid = pMReader->me.ctbEntry.suid; code = pMetaReaderFn->getTableEntryByUid(&mr1, suid); if (code != TSDB_CODE_SUCCESS) { @@ -1752,7 +1752,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); - + uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid); if (code != TSDB_CODE_SUCCESS) { @@ -2284,7 +2284,7 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) { // db_name pColInfoData = taosArrayGet(p->pDataBlock, index++); QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno); - code = colDataSetVal(pColInfoData, numOfRows, db, false); + code = colDataSetVal(pColInfoData, numOfRows, dbname, false); QUERY_CHECK_CODE(code, lino, _end); // vgroup_id diff --git a/source/libs/index/inc/indexFstRegex.h b/source/libs/index/inc/indexFstRegex.h index 2814b5dc16..a6954afab6 100644 --- a/source/libs/index/inc/indexFstRegex.h +++ b/source/libs/index/inc/indexFstRegex.h @@ -28,7 +28,7 @@ extern "C" { #endif -typedef enum { MATCH, JUMP, SPLIT, RANGE } InstType; +typedef enum { INS_MATCH, INS_JUMP, INS_SPLIT, INS_RANGE } InstType; typedef struct MatchValue { #ifdef WINDOWS diff --git a/source/libs/index/src/indexFstDfa.c b/source/libs/index/src/indexFstDfa.c index 3b0014f16a..fa7dbb5f1f 100644 --- a/source/libs/index/src/indexFstDfa.c +++ b/source/libs/index/src/indexFstDfa.c @@ -159,14 +159,14 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r if (false == sparSetGet(set, i, &ip)) continue; Inst *inst = taosArrayGet(builder->dfa->insts, ip); - if (inst->ty == JUMP || inst->ty == SPLIT) { + if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) { continue; - } else if (inst->ty == RANGE) { + } else if (inst->ty == INS_RANGE) { if (taosArrayPush(tinsts, &ip) == NULL) { code = terrno; goto _exception; } - } else if (inst->ty == MATCH) { + } else if (inst->ty == INS_MATCH) { isMatch = true; if (taosArrayPush(tinsts, &ip) == NULL) { code = terrno; @@ -234,11 +234,11 @@ void dfaAdd(FstDfa *dfa, FstSparseSet *set, uint32_t ip) { } bool succ = sparSetAdd(set, ip, NULL); Inst *inst = taosArrayGet(dfa->insts, ip); - if (inst->ty == MATCH || inst->ty == RANGE) { + if (inst->ty == INS_MATCH || inst->ty == INS_RANGE) { // do nothing - } else if (inst->ty == JUMP) { + } else if (inst->ty == INS_JUMP) { dfaAdd(dfa, set, inst->jv.step); - } else if (inst->ty == SPLIT) { + } else if (inst->ty == INS_SPLIT) { dfaAdd(dfa, set, inst->sv.len1); dfaAdd(dfa, set, inst->sv.len2); } @@ -253,11 +253,11 @@ bool dfaRun(FstDfa *dfa, FstSparseSet *from, FstSparseSet *to, uint8_t byte) { if (false == sparSetGet(from, i, &ip)) continue; Inst *inst = taosArrayGet(dfa->insts, ip); - if (inst->ty == JUMP || inst->ty == SPLIT) { + if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) { continue; - } else if (inst->ty == MATCH) { + } else if (inst->ty == INS_MATCH) { isMatch = true; - } else if (inst->ty == RANGE) { + } else if (inst->ty == INS_RANGE) { if (inst->rv.start <= byte && byte <= inst->rv.end) { dfaAdd(dfa, to, ip + 1); } diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 299b62b6fb..5b22b51172 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -17,6 +17,7 @@ #include "tglobal.h" #include "tskiplist.h" #include "tutil.h" +#include "indexFstDfa.h" class UtilEnv : public ::testing::Test { protected: @@ -41,6 +42,29 @@ class UtilEnv : public ::testing::Test { SArray *rslt; }; +class UtilComm : public ::testing::Test { + protected: + virtual void SetUp() { + // src = (SArray *)taosArrayInit(2, sizeof(void *)); + // for (int i = 0; i < 3; i++) { + // SArray *m = taosArrayInit(10, sizeof(uint64_t)); + // taosArrayPush(src, &m); + // } + + // rslt = (SArray *)taosArrayInit(10, sizeof(uint64_t)); + } + virtual void TearDown() { + // for (int i = 0; i < taosArrayGetSize(src); i++) { + // SArray *m = (SArray *)taosArrayGetP(src, i); + // taosArrayDestroy(m); + // } + // taosArrayDestroy(src); + } + // SArray *src; + // SArray *rslt; + +}; + static void clearSourceArray(SArray *p) { for (int i = 0; i < taosArrayGetSize(p); i++) { SArray *m = (SArray *)taosArrayGetP(p, i); @@ -369,3 +393,35 @@ TEST_F(UtilEnv, testDictComm) { EXPECT_EQ(COMMON_INPUTS[v], i); } } + +TEST_F(UtilComm, testCompress) { + for (int32_t i = 0; i < 6; i++) { + _cache_range_compare cmpFunc = idxGetCompare((RangeType)(i)); + //char[32]a = 0, b = 1; + char a[32] = {0}; + char b[32] = {1}; + for (int32_t j = 0; j < TSDB_DATA_TYPE_MAX; j++) { + cmpFunc(a, b, j); + } + } +} +TEST_F(UtilComm, testfstDfa) { + { + FstDfaBuilder *builder = dfaBuilderCreate(NULL); + ASSERT_TRUE(builder != NULL); + dfaBuilderDestroy(builder); + } + { + SArray *pInst = taosArrayInit(32, sizeof(uint8_t)); + for (int32_t i = 0; i < 26; i++) { + uint8_t v = 'a' + i; + taosArrayPush(pInst, &v); + } + FstDfaBuilder *builder = dfaBuilderCreate(pInst); + FstDfa *dfa = dfaBuilderBuild(builder); + dfaBuilderDestroy(builder); + } +} + + + diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index e2069deefc..128fb50b8f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1841,6 +1841,9 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* } if (TK_NK_QUESTION == pToken->type) { + if (!pCxt->pComCxt->isStmtBind && i != 0) { + return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values"); + } pCxt->isStmtBind = true; pStmt->usingTableProcessing = true; if (pCols->pColIndex[i] == tbnameIdx) { @@ -1874,6 +1877,9 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound"); } } else { + if (pCxt->pComCxt->isStmtBind) { + return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values"); + } if (pCols->pColIndex[i] < numOfCols) { const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]]; SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 342bd6d66e..9f411c4296 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10086,7 +10086,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt const char* validConfigs[] = { "encrypt_key", - tsAlterCompactTaskKeywords, }; if (0 == strncasecmp(cfgReq.config, validConfigs[0], strlen(validConfigs[0]) + 1)) { int32_t klen = strlen(cfgReq.value); @@ -10097,28 +10096,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN); } code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ENCRYPT_KEY, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq); - } else if (0 == strncasecmp(cfgReq.config, validConfigs[1], strlen(validConfigs[1]) + 1)) { - char* endptr = NULL; - int32_t maxCompactTasks = taosStr2Int32(cfgReq.value, &endptr, 10); - int32_t minMaxCompactTasks = MIN_MAX_COMPACT_TASKS; - int32_t maxMaxCompactTasks = MAX_MAX_COMPACT_TASKS; - - // check format - if (endptr == cfgReq.value || endptr[0] != '\0') { - tFreeSMCfgDnodeReq(&cfgReq); - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS, - "Invalid max compact tasks: %s", cfgReq.value); - } - - // check range - if (maxCompactTasks < minMaxCompactTasks || maxCompactTasks > maxMaxCompactTasks) { - tFreeSMCfgDnodeReq(&cfgReq); - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS, - "Invalid max compact tasks: %d, valid range [%d,%d]", maxCompactTasks, - minMaxCompactTasks, maxMaxCompactTasks); - } - - code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq); } else { code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq); } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index b329bbbd44..6689e12edd 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -5128,8 +5128,8 @@ static int32_t fltSclCollectOperatorFromNode(SNode *pNode, SArray *sclOpList) { SOperatorNode *pOper = (SOperatorNode *)pNode; - SValueNode *valNode = (SValueNode *)pOper->pRight; - if (IS_NUMERIC_TYPE(valNode->node.resType.type) || valNode->node.resType.type == TSDB_DATA_TYPE_TIMESTAMP) { + SExprNode* pLeft = (SExprNode*)pOper->pLeft; + if (IS_NUMERIC_TYPE(pLeft->resType.type) || pLeft->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { SNode* pLeft = NULL, *pRight = NULL; int32_t code = nodesCloneNode(pOper->pLeft, &pLeft); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fa16cace25..42d7f44b62 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n int32_t vgId = pTask->pMeta->vgId; if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit", id, vgId, pTmrInfo->launchChkptId); @@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId); return -1; } if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); return -1; @@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + taosArrayDestroy(pTmp); return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, @@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); int32_t vgId = pTask->pMeta->vgId; - int32_t checkpointId = pActiveInfo->activeId; + int64_t checkpointId = pActiveInfo->activeId; const char* id = pTask->id.idStr; int32_t notRsp = 0; @@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* return code; } - code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); if (code) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code)); return code; } - notRsp = taosArrayGetSize(pNotRspList); + notRsp = taosArrayGetSize(*pNotRspList); if (notRsp == 0) { streamClearChkptReadyMsg(pActiveInfo); } else { - doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList); } return code; @@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); - code = chkptReadyMsgSendHelper(pTask, param, pNotRspList); + code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { + streamCleanBeforeQuitTmr(pTmrInfo, param); + streamMetaReleaseTask(pTask->pMeta, pTask); taosArrayDestroy(pNotRspList); return; @@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pList); if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, + stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); streamMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_STREAM_INTERNAL_ERROR; @@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 85f287f301..89cb4153fe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { while (1) { code = doStreamExecTask(pTask); if (code) { - stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); - return code; + stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } // check if continue streamMutexLock(&pTask->lock); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbde104f4e..4862a4b963 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -422,13 +422,11 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { SyncIndex minMatchIndex = SYNC_INDEX_INVALID; - if (pSyncNode->peersNum > 0) { - minMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0])); - } - - for (int32_t i = 1; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); - if (matchIndex < minMatchIndex) { + if (minMatchIndex == SYNC_INDEX_INVALID) { + minMatchIndex = matchIndex; + } else if (matchIndex > 0 && matchIndex < minMatchIndex) { minMatchIndex = matchIndex; } } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 19a3f211b1..c806bdfbd8 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -2023,16 +2023,29 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { memcpy(pKey, cd.pKey, (size_t)cd.kLen); if (ppVal) { - // TODO: vLen may be zero - pVal = tdbRealloc(*ppVal, cd.vLen); - if (pVal == NULL) { - tdbFree(pKey); - return terrno; + if (cd.vLen > 0) { + pVal = tdbRealloc(*ppVal, cd.vLen); + if (pVal == NULL) { + tdbFree(pKey); + return terrno; + } + + memcpy(pVal, cd.pVal, (size_t)cd.vLen); + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbTrace("tdb/btree-next decoder: %p pVal free: %p", &cd, cd.pVal); + tdbFree(cd.pVal); + } + } else { + pVal = NULL; } *ppVal = pVal; *vLen = cd.vLen; - memcpy(pVal, cd.pVal, (size_t)cd.vLen); + } else { + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal); + tdbFree(cd.pVal); + } } ret = tdbBtcMoveToPrev(pBtc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 468d9d9b50..3b84fc4574 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -151,7 +151,6 @@ typedef struct SCliThrd { TdThreadMutex msgMtx; SDelayQueue* delayQueue; SDelayQueue* timeoutQueue; - SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout STrans* pInst; // @@ -159,8 +158,6 @@ typedef struct SCliThrd { SHashObj* fqdn2ipCache; SCvtAddr* pCvtAddr; - SHashObj* failFastCache; - SHashObj* batchCache; SHashObj* connHeapCache; SCliReq* stopMsg; @@ -224,8 +221,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -SCliBatch* cliGetHeadFromList(SCliBatchList* pList); - static void destroyCliConnQTable(SCliConn* conn); static void cliHandleException(SCliConn* conn); @@ -1299,8 +1294,8 @@ static void cliHandleException(SCliConn* conn) { if (conn->registered) { int8_t ref = transGetRefCount(conn); if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { -// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, -// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); + // tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, + // conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); uv_close((uv_handle_t*)conn->stream, cliDestroy); } } @@ -2124,144 +2119,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) { tTrace("cli process batch size:%d", count); } } -SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { - if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) { - return NULL; - } - queue* hr = QUEUE_HEAD(&pList->wq); - QUEUE_REMOVE(hr); - pList->sending += 1; - pList->len -= 1; - - SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); - return batch; -} -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq); - -static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port); - -static void destroyBatchList(SCliBatchList* pList); -static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { - int32_t code = 0; - STrans* pInst = pThrd->pInst; - SReqCtx* pCtx = pReq->ctx; - - char* ip = EPSET_GET_INUSE_IP(pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - size_t klen = strlen(key); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); - if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = NULL; - code = createBatchList(&pBatchList, key, ip, port); - if (code != 0) { - destroyReq(pReq); - return; - } - - pBatchList->batchLenLimit = pInst->shareConnLimit; - - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, pBatchList, pReq); - if (code != 0) { - destroyBatchList(pBatchList); - destroyReq(pReq); - return; - } - - code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); - if (code != 0) { - destroyBatchList(pBatchList); - } - } else { - if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - cliDestroyBatch(pBatch); - } - } else { - queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); - SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { - QUEUE_PUSH(&pBatch->wq, h); - pBatch->shareConnLimit += pReq->msg.contLen; - pBatch->wLen += 1; - } else { - SCliBatch* tBatch = NULL; - code = createBatch(&tBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - } - } - } - } - return; -} -static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { - SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - if (pBatchList == NULL) { - tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - QUEUE_INIT(&pBatchList->wq); - pBatchList->port = port; - pBatchList->connMax = 1; - pBatchList->connCnt = 0; - pBatchList->batchLenLimit = 0; - pBatchList->len += 1; - - pBatchList->ip = taosStrdup(ip); - pBatchList->dst = taosStrdup(key); - if (pBatchList->ip == NULL || pBatchList->dst == NULL) { - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - *ppBatchList = pBatchList; - return 0; -} -static void destroyBatchList(SCliBatchList* pList) { - if (pList == NULL) { - return; - } - while (!QUEUE_IS_EMPTY(&pList->wq)) { - queue* h = QUEUE_HEAD(&pList->wq); - QUEUE_REMOVE(h); - - SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); - cliDestroyBatch(pBatch); - } - taosMemoryFree(pList->ip); - taosMemoryFree(pList->dst); - taosMemoryFree(pList); -} -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, &pReq->q); - pBatch->wLen += 1; - pBatch->shareConnLimit = pReq->msg.contLen; - pBatch->pList = pList; - - QUEUE_PUSH(&pList->wq, &pBatch->listq); - pList->len += 1; - - *ppBatch = pBatch; - return 0; -} static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); } static void cliAsyncCb(uv_async_t* handle) { @@ -2494,10 +2352,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(code, NULL, _end); } - if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) { - TAOS_CHECK_GOTO(code, NULL, _end); - } - pThrd->destroyAhandleFp = pInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -2505,11 +2359,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(terrno, NULL, _end); } - pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - if (pThrd->batchCache == NULL) { - TAOS_CHECK_GOTO(terrno, NULL, _end); - } - pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->connHeapCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); @@ -2553,10 +2402,7 @@ _end: transDQDestroy(pThrd->delayQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL); - transDQDestroy(pThrd->waitConnQueue, NULL); taosHashCleanup(pThrd->fqdn2ipCache); - taosHashCleanup(pThrd->failFastCache); - taosHashCleanup(pThrd->batchCache); taosHashCleanup(pThrd->pIdConnTable); taosArrayDestroy(pThrd->pQIdBuf); @@ -2580,7 +2426,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->delayQueue, destroyReqAndAhanlde); transDQDestroy(pThrd->timeoutQueue, NULL); - transDQDestroy(pThrd->waitConnQueue, NULL); tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { @@ -2592,24 +2437,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); - void** pIter = taosHashIterate(pThrd->batchCache, NULL); - while (pIter != NULL) { - SCliBatchList* pBatchList = (SCliBatchList*)(*pIter); - while (!QUEUE_IS_EMPTY(&pBatchList->wq)) { - queue* h = QUEUE_HEAD(&pBatchList->wq); - QUEUE_REMOVE(h); - - SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); - cliDestroyBatch(pBatch); - } - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - - pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); - } - taosHashCleanup(pThrd->batchCache); - void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL); while (pIter2 != NULL) { SHeap* heap = (SHeap*)(pIter2); diff --git a/source/libs/transport/test/transUT2.cpp b/source/libs/transport/test/transUT2.cpp index 6dfb5e503a..fabe9e9c4f 100644 --- a/source/libs/transport/test/transUT2.cpp +++ b/source/libs/transport/test/transUT2.cpp @@ -615,6 +615,21 @@ TEST_F(TransEnv, http) { #endif } +#if 1 + STelemAddrMgmt mgt; + taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com"); + int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + + taosMsleep(2000); + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + for (int32_t i = 0; i < 1; i++) { + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + taosMsleep(2000); + } + taosTelemetryDestroy(&mgt); +#endif { STelemAddrMgmt mgt; taosTelemetryMgtInit(&mgt, "error"); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 43a2ff6a23..4d47de98b4 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -208,28 +208,22 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); - SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - if (gloablPRet == NULL) { + SWalFileInfo *globalRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + if (globalRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } - SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo)); - if (pRet == NULL) { - wError("failed to allocate memory for localRet"); - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - TAOS_RETURN(terrno); - } - TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo)); + SWalFileInfo ret; + TAOS_MEMCPY(&ret, globalRet, sizeof(SWalFileInfo)); TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - if (pReader->curFileFirstVer != pRet->firstVer) { + if (pReader->curFileFirstVer != ret.firstVer) { // error code was set inner - TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet); + TAOS_CHECK_RETURN(walReadChangeFile(pReader, ret.firstVer)); } // error code was set inner - TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet); - taosMemoryFree(pRet); + TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, ret.firstVer, ver)); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; @@ -437,15 +431,15 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { seeked = true; continue; } else { - wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s", - pReader->pWal->cfg.vgId, ver, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); - if (contLen < 0) { - TAOS_RETURN(terrno); + code = terrno; } else { - TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + code = TSDB_CODE_WAL_FILE_CORRUPTED; } + wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s", + pReader->pWal->cfg.vgId, ver, tstrerror(code)); + TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); + TAOS_RETURN(code); } } @@ -478,15 +472,15 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { } if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) { - wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s", - pReader->pWal->cfg.vgId, ver, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); - if (contLen < 0) { - TAOS_RETURN(terrno); + code = terrno; } else { - TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + code = TSDB_CODE_WAL_FILE_CORRUPTED; } + wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s", + pReader->pWal->cfg.vgId, ver, tstrerror(code)); + TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); + TAOS_RETURN(code); } if (pReader->pHead->head.version != ver) { diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index e68edd4b76..bf2cb4fd07 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -20,7 +20,7 @@ #ifdef USE_ANALYTICS #include -#define ANAL_ALGO_SPLIT "," +#define ANALYTICS_ALOG_SPLIT_CHAR "," typedef struct { int64_t ver; @@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, return false; } - pEnd = strstr(pStart, ANAL_ALGO_SPLIT); + pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); if (optMaxLen > 0) { if (pEnd > pStart) { int32_t len = (int32_t)(pEnd - pStart); @@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); if (pos1 != NULL) { *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); return true; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 88eccfaffd..4df5b322a2 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -673,11 +673,11 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) { } _exit: - taosUnLockLogFile(tsLogObj.logHandle->pFile); if (code != 0) { + taosUnLockLogFile(tsLogObj.logHandle->pFile); TAOS_UNUSED(printf("failed to init normal log file:%s at line %d, reason:%s\n", name, lino, tstrerror(code))); } - return 0; + return code; } static void taosUpdateLogNums(ELogLevel level) { diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index bb7b8411f9..cdfe3ce8a0 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -100,7 +100,7 @@ class TDSql: if drop: s = f'drop database if exists {dbname}' self.cursor.execute(s) - s = f'create database {dbname}' + s = f'create database {dbname} stt_trigger 1' for k, v in kwargs.items(): s += f" {k} {v}" if "duration" not in kwargs: diff --git a/tests/script/api/stmt2-performance.c b/tests/script/api/stmt2-performance.c index a539affaf1..7ab20d873f 100644 --- a/tests/script/api/stmt2-performance.c +++ b/tests/script/api/stmt2-performance.c @@ -5,8 +5,8 @@ #include #include "taos.h" -int CTB_NUMS = 2; -int ROW_NUMS = 2; +int CTB_NUMS = 1; +int ROW_NUMS = 1; int CYC_NUMS = 2; void do_query(TAOS* taos, const char* sql) { @@ -217,7 +217,7 @@ int main() { exit(1); } - do_stmt(taos, "insert into db.stb(tbname,ts,b,t1,t2) values(?,?,?,?,?)"); + do_stmt(taos, "insert into `db`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)"); // do_stmt(taos, "insert into db.? using db.stb tags(?,?)values(?,?)"); // do_taosc(taos); taos_close(taos); diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh index 8923804547..da2083b013 100755 --- a/tests/script/sh/stop_dnodes.sh +++ b/tests/script/sh/stop_dnodes.sh @@ -13,7 +13,7 @@ if [ -n "$PID" ]; then systemctl stop taosd fi -PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 taosd @@ -38,10 +38,10 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'` done -PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 tmq_sim @@ -52,5 +52,5 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'` done \ No newline at end of file diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 77c9184e8f..3ac49b1fc3 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start sql connect -print =============== create anode +print =============== failed to create anode on '127.0.0.1:1101' +sql_error create anode '127.0.0.1:1101' + +sql show anodes +if $rows != 0 then + return -1 +endi + +sql_error drop anode 1 + +print ================ create anode sql create anode '192.168.1.116:6050' sql show anodes @@ -30,7 +40,7 @@ print $data00 $data01 $data02 sql use d0 print =============== create super table, include column type for count/sum/min/max/first -sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned) sql show stables if $rows != 1 then @@ -42,10 +52,11 @@ sql create table ct1 using stb tags(1000) print ==================== insert data # input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] -sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14) -sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22) -sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9) -sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40) +sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a') +sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a') +sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a') +sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a') +sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a') sql select count(*) from ct1 if $data00 != 17 then @@ -58,6 +69,87 @@ if $data00 != 1 then return -1 endi +print ================= try every loaded anomaly detection algorithm +sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=lof'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs'); + +print ================= try every column type of column +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2'); + +print =================== invalid column type +sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2'); +sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 + + +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1 + +print =================== valid column type +sql select forecast(c1, 'conf=50 ,algo = arima') from ct1 +sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1 +if $rows != 10 then + return -1 +endi + +if $data03 != 28 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + print expect 23-11-15 06:13:20.000 , actual $data00 + return -1 +endi + +if $data10 != @23-11-15 06:13:20.002@ then + print expect 23-11-15 06:13:20.002 , actual $data10 + return -1 +endi + +if $data20 != @23-11-15 06:13:20.004@ then + return -1 +endi + +print test the every option and rows option + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1 +if $rows != 5 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + return -1 +endi + +if $data10 != @23-11-15 06:13:20.100@ then + return -1 +endi sql drop anode 1 sql show anodes @@ -66,6 +158,13 @@ if $rows != 0 then return -1 endi +sleep 1000 + +print ===================== query without anodes +sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); + + _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check diff --git a/tests/system-test/2-query/smaTest.py b/tests/system-test/2-query/smaTest.py index 04fb893e75..355ac04707 100644 --- a/tests/system-test/2-query/smaTest.py +++ b/tests/system-test/2-query/smaTest.py @@ -39,6 +39,27 @@ class TDTestCase: self.create_tables(); self.ts = 1500000000000 + def test_TD_33336(self): + sql = "flush database db" + tdSql.execute(sql) + time.sleep(5) + sql = f'select last(ts) + 1d, last(ts) - 1d from db.t1' + tdSql.query(sql, queryTimes=1) + lastTs_add1d = tdSql.queryResult[0][0] + lastTs_sub1d = tdSql.queryResult[0][1] + + sql = f'select count(*) from db.t1 where ts < "{lastTs_add1d}" and vc1 = 1' + tdSql.query(sql, queryTimes=1) + all_row_count = tdSql.queryResult[0][0] + tdLog.debug(f"all rows: {all_row_count}") + + sql = f'select count(*) from db.t1 where ts < "{lastTs_sub1d}" and vc1 = 1' + tdSql.query(sql, queryTimes=1) + row_count_sub1d = tdSql.queryResult[0][0] + tdLog.debug(f"row_count_sub1d: {row_count_sub1d}") + + if row_count_sub1d > all_row_count: + tdLog.exit(f' err rows returned for sql: {sql} row_count_sub1d: {row_count_sub1d} > all_row_count: {all_row_count}') # run case def run(self): @@ -53,6 +74,8 @@ class TDTestCase: # self.test_case2() tdLog.debug(" LIMIT test_case2 ............ [OK]") + self.test_TD_33336() + # stop def stop(self): tdSql.close() @@ -65,11 +88,11 @@ class TDTestCase: # create table def create_tables(self, dbname="db"): # super table - tdSql.execute(f"create table {dbname}.st(ts timestamp, i1 int,i2 int) tags(area int)") + tdSql.execute(f"create table {dbname}.st(ts timestamp, i1 int,i2 int, vc1 varchar(255)) tags(area int)") # child table tdSql.execute(f"create table {dbname}.t1 using {dbname}.st tags(1)") - tdSql.execute(f"create table {dbname}.st1(ts timestamp, i1 int ,i2 int) tags(area int) sma(i2) ") + tdSql.execute(f"create table {dbname}.st1(ts timestamp, i1 int ,i2 int, vc1 varchar(255)) tags(area int) sma(i2) ") tdSql.execute(f"create table {dbname}.t4 using {dbname}.st1 tags(1)") return @@ -98,7 +121,11 @@ class TDTestCase: sql = pre_insert tdLog.debug("insert table %s rows=%d ..." % (tbname, count)) for i in range(count): - sql += " (%d,%d,%d)" % (ts_start + i*1000, i, i+1) + if random.randint(0, 4) > 2: + tail = '' + else: + tail = 'asd' + sql += " (%d,%d,%d,%s)" % (ts_start + i*1000, i, i+1, '"' + str(random.randint(0,5)) + f'{tail}"') if i > 0 and i % 20000 == 0: tdLog.info("%d rows inserted" % i) tdSql.execute(sql) @@ -125,4 +152,4 @@ class TDTestCase: # add case with filename # tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file +tdCases.addLinux(__file__, TDTestCase())