From 413a57f4cbf1863fdd0fe5d651624cf80e7a3c47 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 13 Aug 2024 18:55:48 +0800 Subject: [PATCH 01/20] fix(query): retry when column dropped in latest schema --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 4 +++- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 2665cc1aaf..e6720466ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -463,7 +463,9 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe if (cid < blockCol.cid) { const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); - ASSERT(tcol); + if (tcol == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, &lino, _exit); + } SBlockCol none = { .cid = cid, .type = tcol->type, diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 6e24311017..3d30ea0d93 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -309,7 +309,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * if (cid < blockCol.cid) { const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); - ASSERT(tcol); + if (tcol == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, &lino, _exit); + } SBlockCol none = { .cid = cid, .type = tcol->type, From fe07324d72bf289d27764e9d664e988817f9f2bc Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 12:42:46 +0800 Subject: [PATCH 02/20] fix(tsdb): return error code when loading data --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e55ede560e..e9761d8c87 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -1004,6 +1004,10 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF pSttDataInfo->numOfRows += numOfRows; } } else { + if(terrno != TSDB_CODE_SUCCESS) { + code = terrno; + goto _end; + } if (!pMTree->ignoreEarlierTs) { pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; } From cec7643ea825920081f491e66be52f4ae3145e5d Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 13:57:33 +0800 Subject: [PATCH 03/20] enh: code optimization --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 4 +--- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 5 +---- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 4 +--- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index e6720466ce..a9c7ea3961 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -463,9 +463,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe if (cid < blockCol.cid) { const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); - if (tcol == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, &lino, _exit); - } + TSDB_CHECK_NULL(tcol, code,lino,_exit,TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER); SBlockCol none = { .cid = cid, .type = tcol->type, diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e9761d8c87..9d99c51587 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -1004,10 +1004,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF pSttDataInfo->numOfRows += numOfRows; } } else { - if(terrno != TSDB_CODE_SUCCESS) { - code = terrno; - goto _end; - } + TAOS_CHECK_GOTO(terrno, NULL, _end); if (!pMTree->ignoreEarlierTs) { pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 3d30ea0d93..d3990c645c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -309,9 +309,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * if (cid < blockCol.cid) { const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); - if (tcol == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, &lino, _exit); - } + TSDB_CHECK_NULL(tcol, code,lino,_exit,TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER); SBlockCol none = { .cid = cid, .type = tcol->type, From f6ad63ac14516b7a904f9746744bc8980457f830 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 13:59:33 +0800 Subject: [PATCH 04/20] enh: code optimization --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 2 +- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index a9c7ea3961..7e7ea59a5b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -463,7 +463,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe if (cid < blockCol.cid) { const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); - TSDB_CHECK_NULL(tcol, code,lino,_exit,TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER); + TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER); SBlockCol none = { .cid = cid, .type = tcol->type, diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index d3990c645c..e3d7f9d45f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -309,7 +309,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * if (cid < blockCol.cid) { const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); - TSDB_CHECK_NULL(tcol, code,lino,_exit,TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER); + TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER); SBlockCol none = { .cid = cid, .type = tcol->type, From 6c917646b2aa30182772e0ddd6bae4376ffb7995 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 16:15:37 +0800 Subject: [PATCH 05/20] fix(tsdb/cache): return error code --- source/dnode/vnode/src/tsdb/tsdbCache.c | 44 +++++++++++-------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1216f0da81..46a498409b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1368,7 +1368,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; + int32_t code = 0, lino = 0; rocksdb_writebatch_t *wb = NULL; SArray *pTmpColArray = NULL; @@ -1413,9 +1413,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr if (NULL == lastTmpIndexArray) { lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); if (!lastTmpIndexArray) { - taosArrayDestroy(lastrowTmpIndexArray); - - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } } (void)taosArrayPush(lastTmpIndexArray, &(i)); @@ -1426,9 +1424,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr if (NULL == lastrowTmpIndexArray) { lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); if (!lastrowTmpIndexArray) { - taosArrayDestroy(lastTmpIndexArray); - - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } } (void)taosArrayPush(lastrowTmpIndexArray, &(i)); @@ -1440,13 +1436,11 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol)); if (!pTmpColArray) { - taosArrayDestroy(lastrowTmpIndexArray); - taosArrayDestroy(lastTmpIndexArray); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } if (lastTmpIndexArray != NULL) { - (void)mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds); + TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds)); for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) { (void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i)); @@ -1454,7 +1448,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (lastrowTmpIndexArray != NULL) { - (void)mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); + TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds)); for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) { (void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i)); @@ -1475,7 +1469,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; if (!pLastCol) { pLastCol = &noneCol; - TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); + TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal)); } taosArraySet(pLastArray, idxKey->idx, pLastCol); @@ -1490,12 +1484,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); if (!pTmpLastCol) { - taosMemoryFree(slotIds); - taosMemoryFree(lastColIds); - taosMemoryFree(lastSlotIds); - taosMemoryFree(lastrowColIds); - taosMemoryFree(lastrowSlotIds); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -1504,12 +1493,12 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { SValue *pValue = &pLastCol->rowKey.pks[i]; if (IS_VAR_DATA_TYPE(pValue->type)) { - TAOS_CHECK_RETURN(reallocVarDataVal(pValue)); + TAOS_CHECK_EXIT(reallocVarDataVal(pValue)); charge += pValue->nData; } } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); + TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal)); charge += pLastCol->colVal.value.nData; } @@ -1538,6 +1527,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr rocksMayWrite(pTsdb, false, true, false); } +_exit: taosArrayDestroy(lastrowTmpIndexArray); taosArrayDestroy(lastrowTmpColArray); taosArrayDestroy(lastTmpIndexArray); @@ -3015,11 +3005,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC // inverse iterator CacheNextRowIter iter = {0}; - (void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); + code = + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); + TAOS_CHECK_GOTO(code, &lino, _err); do { TSDBROW *pRow = NULL; - (void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); if (!pRow) { break; @@ -3199,11 +3191,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, // inverse iterator CacheNextRowIter iter = {0}; - (void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); + code = + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); + TAOS_CHECK_GOTO(code, &lino, _err); do { TSDBROW *pRow = NULL; - (void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); if (!pRow) { break; From ab45ab1459801a068b8d339d777c3656ccd343d2 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 16:40:12 +0800 Subject: [PATCH 06/20] fix(tsdb/read): memory leak --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6783eb2cbd..31101165fd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2440,20 +2440,23 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(info.pKeyRangeList); pReader->code = code; return false; } code = initMemDataIterator(pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(info.pKeyRangeList); pReader->code = code; return false; } code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return code; + return false; } if (conf.rspRows) { From 92704f8b7add1cdc5ce20c9ef0e92e2ece11d96b Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 22:06:18 +0800 Subject: [PATCH 07/20] fix: check error code --- source/dnode/vnode/src/tsdb/tsdbCache.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 46a498409b..0b020830e1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2891,11 +2891,13 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI if (!pIter->pSkyline) { pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY); uint64_t uid = pIter->idx.uid; STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid); if (pInfo->pTombData == NULL) { pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); + TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY); } (void)taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData); @@ -2903,6 +2905,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI size_t delSize = TARRAY_SIZE(pInfo->pTombData); if (delSize > 0) { code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline); + TAOS_CHECK_GOTO(code, &lino, _err); } pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; } From 1059650e573a9a8795574449f04449868edd9dd5 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 14 Aug 2024 22:20:03 +0800 Subject: [PATCH 08/20] enh: code optimization --- source/dnode/vnode/src/tsdb/tsdbCache.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index cc5759a151..aa92597211 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1440,11 +1440,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr int lastrowIndex = 0; if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) { - taosMemoryFree(slotIds); - taosMemoryFree(lastColIds); - taosMemoryFree(lastSlotIds); - taosMemoryFree(lastrowColIds); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } for (int i = 0; i < num_keys; ++i) { From cc8bff453a1e0fa14a0b8aad6c26a0de67305262 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 15 Aug 2024 10:45:16 +0800 Subject: [PATCH 09/20] fix(insert): return error when parsing csv file --- source/libs/parser/src/parInsertSql.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 70bd43559c..cb94cd42f7 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2238,6 +2238,8 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt if (pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT) { return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", NULL); } + } else { + return buildInvalidOperationMsg(&pCxt->msg, tstrerror(code)); } // just record pTableCxt whose data come from file From d0059d2d9dfa0bd681bcf3a7dec22764519ddc29 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 15 Aug 2024 13:25:04 +0800 Subject: [PATCH 10/20] enh: support config randErrorScope dynamically --- include/os/os.h | 1 + include/util/tdef.h | 7 +++++++ source/common/src/tglobal.c | 23 +++++++++++++++-------- source/os/src/osFile.c | 17 +++++++++-------- source/os/src/osMemory.c | 12 +++++++----- 5 files changed, 39 insertions(+), 21 deletions(-) diff --git a/include/os/os.h b/include/os/os.h index 08b68f36d4..9e5e9221e4 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -126,6 +126,7 @@ extern "C" { extern int32_t tsRandErrChance; extern int64_t tsRandErrDivisor; +extern int64_t tsRandErrScope; extern threadlocal bool tsEnableRandErr; #ifdef __cplusplus diff --git a/include/util/tdef.h b/include/util/tdef.h index 890f1d8f95..35c4adab50 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -568,6 +568,13 @@ enum { SND_WORKER_TYPE__UNIQUE, }; +enum { + RAND_ERR_MEMORY = 1, + RAND_ERR_FILE = 2, + // RAND_ERR_SCOPE_XXX... = 4, + // ... +}; + #define DEFAULT_HANDLE 0 #define MNODE_HANDLE 1 #define QNODE_HANDLE -1 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 46ad263d3d..a013c98b73 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -592,6 +592,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { CFG_SCOPE_CLIENT, CFG_DYN_NONE)); TAOS_CHECK_RETURN( cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -774,8 +777,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); @@ -1210,6 +1211,15 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "metaCacheMaxSize"); tsMetaCacheMaxSize = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorChance"); + tsRandErrChance = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorDivisor"); + tsRandErrDivisor = pItem->i64; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorScope"); + tsRandErrScope = pItem->i64; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "countAlwaysReturnValue"); tsCountAlwaysReturnValue = pItem->i32; @@ -1466,12 +1476,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "mqRebalanceInterval"); tsMqRebalanceInterval = pItem->i32; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorChance"); - tsRandErrChance = pItem->i32; - - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorDivisor"); - tsRandErrDivisor = pItem->i64; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "ttlUnit"); tsTtlUnit = pItem->i32; @@ -1927,6 +1931,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"mndSdbWriteDelta", &tsMndSdbWriteDelta}, {"minDiskFreeSize", &tsMinDiskFreeSize}, {"randErrorDivisor", &tsRandErrDivisor}, + {"randErrorScope", &tsRandErrScope}, {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, {"checkpointInterval", &tsStreamCheckpointInterval}, @@ -2205,6 +2210,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"queryPlannerTrace", &tsQueryPlannerTrace}, {"queryNodeChunkSize", &tsQueryNodeChunkSize}, {"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, + {"randErrorDivisor", &tsRandErrDivisor}, + {"randErrorScope", &tsRandErrScope}, {"smlDot2Underline", &tsSmlDot2Underline}, {"shellActivityTimer", &tsShellActivityTimer}, {"useAdapter", &tsUseAdapter}, diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index b8160a14b7..a5df4f63f3 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -15,6 +15,7 @@ #define ALLOW_FORBID_FUNC #include "os.h" #include "osSemaphore.h" +#include "tdef.h" #include "zlib.h" #ifdef WINDOWS @@ -65,14 +66,14 @@ typedef struct TdFile { #define FILE_WITH_LOCK 1 #ifdef BUILD_WITH_RAND_ERR -#define STUB_RAND_IO_ERR(ret) \ - if (tsEnableRandErr) { \ - uint32_t r = taosRand() % tsRandErrDivisor; \ - if ((r + 1) <= tsRandErrChance) { \ - errno = EIO; \ - terrno = TAOS_SYSTEM_ERROR(errno); \ - return (ret); \ - } \ +#define STUB_RAND_IO_ERR(ret) \ + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_FILE)) { \ + uint32_t r = taosRand() % tsRandErrDivisor; \ + if ((r + 1) <= tsRandErrChance) { \ + errno = EIO; \ + terrno = TAOS_SYSTEM_ERROR(errno); \ + return (ret); \ + } \ } #else #define STUB_RAND_IO_ERR(ret) diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 297b17b957..7a5a547354 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -20,9 +20,11 @@ #include #endif #include "os.h" +#include "tdef.h" int32_t tsRandErrChance = 1; int64_t tsRandErrDivisor = 10001; +int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE); threadlocal bool tsEnableRandErr = 0; #if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE) @@ -272,7 +274,7 @@ void *taosMemoryMalloc(int64_t size) { #else #ifdef BUILD_WITH_RAND_ERR - if (tsEnableRandErr) { + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) { uint32_t r = taosRand() % tsRandErrDivisor; if ((r + 1) <= tsRandErrChance) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -302,7 +304,7 @@ void *taosMemoryCalloc(int64_t num, int64_t size) { return (char *)tmp + sizeof(TdMemoryInfo); #else #ifdef BUILD_WITH_RAND_ERR - if (tsEnableRandErr) { + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) { uint32_t r = taosRand() % tsRandErrDivisor; if ((r + 1) <= tsRandErrChance) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -342,7 +344,7 @@ void *taosMemoryRealloc(void *ptr, int64_t size) { return (char *)tmp + sizeof(TdMemoryInfo); #else #ifdef BUILD_WITH_RAND_ERR - if (tsEnableRandErr) { + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) { uint32_t r = taosRand() % tsRandErrDivisor; if ((r + 1) <= tsRandErrChance) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -377,7 +379,7 @@ char *taosStrdup(const char *ptr) { return (char *)tmp + sizeof(TdMemoryInfo); #else #ifdef BUILD_WITH_RAND_ERR - if (tsEnableRandErr) { + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) { uint32_t r = taosRand() % tsRandErrDivisor; if ((r + 1) <= tsRandErrChance) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -443,7 +445,7 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) { #else #if defined(LINUX) #ifdef BUILD_WITH_RAND_ERR - if (tsEnableRandErr) { + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) { uint32_t r = taosRand() % tsRandErrDivisor; if ((r + 1) <= tsRandErrChance) { terrno = TSDB_CODE_OUT_OF_MEMORY; From ce7d70c3b3de7f832cbd875a333e2f73f31a63d6 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Thu, 15 Aug 2024 14:02:31 +0800 Subject: [PATCH 11/20] test: (fix case) add timeout-return --- tests/pytest/util/common.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index dee3f505c9..1141ca403d 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -1909,6 +1909,8 @@ class TDCom: if latency < self.stream_timeout: latency += 1 time.sleep(1) + else: + return False return tbname def get_group_id_from_stb(self, stbname): From f62e849222ab9f4f28177ebdcd159b32344bb088 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 15 Aug 2024 16:07:57 +0800 Subject: [PATCH 12/20] fix: support customized version --- source/dnode/mgmt/exe/dmMain.c | 6 +++--- source/os/src/osSysinfo.c | 6 +++++- tools/shell/src/shellArguments.c | 10 +++++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index b3e5015706..e5c37e3d55 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -125,7 +125,7 @@ void dmLogCrash(int signum, void *sigInfo, void *context) { _return: - taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo); + taosLogCrashInfo(CUS_PROMPT "d", pMsg, msgLen, signum, sigInfo); #ifdef _TD_DARWIN_64 exit(signum); @@ -258,7 +258,7 @@ static void dmPrintArgs(int32_t argc, char const *argv[]) { static void dmGenerateGrant() { mndGenerateMachineCode(); } static void dmPrintVersion() { - printf("%s\ntaosd version: %s compatible_version: %s\n", TD_PRODUCT_NAME, version, compatible_version); + printf("%s\n%sd version: %s compatible_version: %s\n", TD_PRODUCT_NAME, CUS_PROMPT, version, compatible_version); printf("git: %s\n", gitinfo); #ifdef TD_ENTERPRISE printf("gitOfInternal: %s\n", gitinfoOfInternal); @@ -268,7 +268,7 @@ static void dmPrintVersion() { static void dmPrintHelp() { char indent[] = " "; - printf("Usage: taosd [OPTION...] \n\n"); + printf("Usage: %sd [OPTION...] \n\n", CUS_PROMPT); printf("%s%s%s%s\n", indent, "-a,", indent, DM_APOLLO_URL); printf("%s%s%s%s\n", indent, "-c,", indent, DM_CFG_DIR); printf("%s%s%s%s\n", indent, "-s,", indent, DM_SDB_INFO); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 67a2cd97c4..92e5967416 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -19,6 +19,10 @@ #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" +#else +#ifndef CUS_PROMPT +#define CUS_PROMPT "taos" +#endif #endif #define PROCESS_ITEM 12 @@ -987,7 +991,7 @@ void taosKillSystem() { exit(0); #else // SIGINT - (void)printf("taosd will shut down soon"); + (void)printf("%sd will shut down soon", CUS_PROMPT); (void)kill(tsProcId, 2); #endif } diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index 1eb61d2394..cf3c7824fa 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -22,6 +22,10 @@ #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" +#else +#ifndef CUS_PROMPT +#define CUS_PROMPT "taos" +#endif #endif #define TAOS_CONSOLE_PROMPT_CONTINUE " -> " @@ -435,11 +439,11 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) { shell.info.promptSize = strlen(shell.info.promptHeader); #ifdef TD_ENTERPRISE snprintf(shell.info.programVersion, sizeof(shell.info.programVersion), - "%s\ntaos version: %s compatible_version: %s\ngit: %s\ngitOfInternal: %s\nbuild: %s", TD_PRODUCT_NAME, - version, compatible_version, gitinfo, gitinfoOfInternal, buildinfo); + "%s\n%s version: %s compatible_version: %s\ngit: %s\ngitOfInternal: %s\nbuild: %s", TD_PRODUCT_NAME, + CUS_PROMPT, version, compatible_version, gitinfo, gitinfoOfInternal, buildinfo); #else snprintf(shell.info.programVersion, sizeof(shell.info.programVersion), - "%s\ntaos version: %s compatible_version: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, version, + "%s\n%s version: %s compatible_version: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, CUS_PROMPT, version, compatible_version, gitinfo, buildinfo); #endif From cce4d1104f72a5c19411ef52a0231a7c943267d1 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 15 Aug 2024 16:29:20 +0800 Subject: [PATCH 13/20] fix: support customized version --- tools/shell/src/shellArguments.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index cf3c7824fa..4638f2ad74 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -61,7 +61,7 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg); void shellPrintHelp() { char indent[] = " "; - printf("Usage: taos [OPTION...] \r\n\r\n"); + printf("Usage: %s [OPTION...] \r\n\r\n", CUS_PROMPT); printf("%s%s%s%s\r\n", indent, "-a,", indent, SHELL_AUTH); printf("%s%s%s%s\r\n", indent, "-A,", indent, SHELL_GEN_AUTH); printf("%s%s%s%s\r\n", indent, "-B,", indent, SHELL_BI_MODE); From 6adc0543e86555517f36006f886ea13826a56baf Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 15 Aug 2024 19:19:48 +0800 Subject: [PATCH 14/20] enh: support config randErrorChance dynamically --- source/common/src/tglobal.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a013c98b73..6cd99d4443 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -592,7 +592,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { CFG_SCOPE_CLIENT, CFG_DYN_NONE)); TAOS_CHECK_RETURN( cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); @@ -1930,6 +1930,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"mndSdbWriteDelta", &tsMndSdbWriteDelta}, {"minDiskFreeSize", &tsMinDiskFreeSize}, + {"randErrorChance", &tsRandErrChance}, {"randErrorDivisor", &tsRandErrDivisor}, {"randErrorScope", &tsRandErrScope}, @@ -2210,6 +2211,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"queryPlannerTrace", &tsQueryPlannerTrace}, {"queryNodeChunkSize", &tsQueryNodeChunkSize}, {"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, + {"randErrorChance", &tsRandErrChance}, {"randErrorDivisor", &tsRandErrDivisor}, {"randErrorScope", &tsRandErrScope}, {"smlDot2Underline", &tsSmlDot2Underline}, From 0f922fb3730231725e3a23b90dfd25c9006266da Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 16 Aug 2024 01:23:01 +0000 Subject: [PATCH 15/20] fix/TD-31485 --- source/dnode/mnode/impl/src/mndDb.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index fe5c12419c..d0eed37f99 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1211,22 +1211,22 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p if (pTrans == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; - return -1; + TAOS_RETURN(code); } mInfo("trans:%d, used to alter db:%s", pTrans->id, pOld->name); mndTransSetDbName(pTrans, pOld->name, NULL); - TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); + TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); - TAOS_CHECK_RETURN(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew)); - TAOS_CHECK_RETURN(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew)); - TAOS_CHECK_RETURN(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew)); - TAOS_CHECK_RETURN(mndTransPrepare(pMnode, pTrans)); + TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; _OVER: mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { From 76f56d940b19ce48df661dff6bb6993a47e9a2ba Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 16 Aug 2024 10:26:09 +0800 Subject: [PATCH 16/20] fix issue --- .../executor/src/streamtimewindowoperator.c | 28 +++++++++++++------ source/libs/stream/src/tstreamFileState.c | 25 +++++++++++------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 756a6d71e1..5c12db1ab9 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -477,10 +477,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); - pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + if (pInfo->stateStore.streamFileStateDestroy != NULL) { + pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + } taosArrayDestroy(pInfo->pMidPullDatas); - if (pInfo->pState->dump == 1) { + if (pInfo->pState !=NULL && pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); taosMemoryFreeClear(pInfo->pState->pTdbState); } @@ -1953,12 +1955,14 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); + QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno); + pInfo->dataVersion = 0; - pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; pInfo->recvPullover = false; pInfo->recvRetrive = false; @@ -2032,7 +2036,9 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { tSimpleHashCleanup(pSup->pResultRows); destroyDiskbasedBuf(pSup->pResultBuf); blockDataDestroy(pSup->pScanBlock); - pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState); + if (pSup->stateStore.streamFileStateDestroy != NULL) { + pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState); + } taosMemoryFreeClear(pSup->pState); taosMemoryFreeClear(pSup->pDummyCtx); } @@ -2141,7 +2147,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); - + int32_t lino = 0; int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock); if (code) { return code; @@ -2156,6 +2162,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in } pSup->stateStore = *pStore; + pSup->pSessionAPI = pApi; initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); @@ -2168,6 +2175,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); + QUERY_CHECK_NULL(pSup->pState->pFileState, code, lino, _end, terrno); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); @@ -2179,8 +2187,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } - pSup->pSessionAPI = pApi; - return TSDB_CODE_SUCCESS; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { @@ -5308,9 +5319,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pUpdatedMap = NULL; int32_t funResSize = getMaxFunResSize(pSup, numOfCols); + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); + QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -5319,7 +5332,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); - pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 5dacd4c80c..3cdbad2dd5 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -131,21 +131,27 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } if (rowSize == 0) { + code = TSDB_CODE_INVALID_PARA; goto _error; } SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState)); - if (!pFileState) { - goto _error; - } + QUERY_CHECK_NULL(pFileState, code, lino, _error, terrno); + rowSize += selectRowSize; pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); + QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _error, terrno); + pFileState->freeBuffs = tdListNew(POINTER_BYTES); + QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _error, terrno); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); if (type == STREAM_STATE_BUFF_HASH) { @@ -171,10 +177,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->cfName = taosStrdup("sess"); pFileState->stateFunctionGetFn = getSessionRowBuff; } - - if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { - goto _error; - } + QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno); pFileState->keyLen = keySize; pFileState->rowSize = rowSize; @@ -188,6 +191,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->flushMark = INT64_MIN; pFileState->maxTs = INT64_MIN; pFileState->id = taosStrdup(taskId); + QUERY_CHECK_NULL(pFileState->id, code, lino, _error, terrno); // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { @@ -198,8 +202,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ void* valBuf = NULL; int32_t len = 0; - int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); - if (code == TSDB_CODE_SUCCESS) { + int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); + if (tmpRes == TSDB_CODE_SUCCESS) { ASSERT(len == sizeof(TSKEY)); streamFileStateDecode(&pFileState->flushMark, valBuf, len); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); @@ -208,6 +212,9 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ return pFileState; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } streamFileStateDestroy(pFileState); return NULL; } From 5361d5f38a0ff405b1511dfd1fd2fdaf0bf71d1f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Aug 2024 13:27:51 +0800 Subject: [PATCH 17/20] fix: possible delete data loss when stt_trigger = 1 --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 48 ++++++++++------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 4467102d6f..3c407b31cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -157,41 +157,35 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { int64_t numRecord = 0; SMetaInfo info; - if (committer->tsdb->imem->nDel == 0) { - goto _exit; - } + // if no history data and no new timestamp data, skip tomb data + if (committer->ctx->info->fset || committer->ctx->hasTSData) { + committer->ctx->tbid->suid = 0; + committer->ctx->tbid->uid = 0; + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { + if (record->uid != committer->ctx->tbid->uid) { + committer->ctx->tbid->suid = record->suid; + committer->ctx->tbid->uid = record->uid; - // do not need to write tomb data if there is no ts data - bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData); - - committer->ctx->tbid->suid = 0; - committer->ctx->tbid->uid = 0; - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { - if (record->uid != committer->ctx->tbid->uid) { - committer->ctx->tbid->suid = record->suid; - committer->ctx->tbid->uid = record->uid; - - if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { - TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit); - continue; + if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { + TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit); + continue; + } } - } - if (record->ekey < committer->ctx->minKey) { - // do nothing - } else if (record->skey > committer->ctx->maxKey) { - // committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); - } else { - record->skey = TMAX(record->skey, committer->ctx->minKey); - record->ekey = TMIN(record->ekey, committer->ctx->maxKey); + if (record->ekey < committer->ctx->minKey) { + // do nothing + } else if (record->skey > committer->ctx->maxKey) { + // committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); + } else { + record->skey = TMAX(record->skey, committer->ctx->minKey); + record->ekey = TMIN(record->ekey, committer->ctx->maxKey); - if (!skip) { numRecord++; TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(committer->writer, record), &lino, _exit); } - } - TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit); + TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit); + } } _exit: From a54e7bec9976628a5f5b772a45ead96b84729879 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Fri, 16 Aug 2024 15:34:34 +0800 Subject: [PATCH 18/20] fix TD-31500 --- packaging/tools/remove.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 093c81eef4..7af64fab1e 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -209,11 +209,11 @@ function clean_service_on_launchctl() { } function remove_data_and_config() { - data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` + data_dir=`grep dataDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` if [ X"$data_dir" == X"" ]; then data_dir="/var/lib/${PREFIX}" fi - log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` + log_dir=`grep logDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` if [ X"$log_dir" == X"" ]; then log_dir="/var/log/${PREFIX}" fi From 3c3507f283f678bae2be88645624e141494863f1 Mon Sep 17 00:00:00 2001 From: sima Date: Fri, 16 Aug 2024 15:54:07 +0800 Subject: [PATCH 19/20] fix:[TD-31503] Return null when expr in timediff is null, and use ms as default time_unit when time_unit is null. --- source/libs/function/src/builtins.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 760a3c4a33..a93ae8e574 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2573,13 +2573,14 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le for (int32_t i = 0; i < 2; ++i) { uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i))->type; - if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType)) { + if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType) && !IS_NULL_TYPE(paraType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } - + uint8_t para2Type; if (3 == numOfParams) { - if (!IS_INTEGER_TYPE(getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type)) { + para2Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type; + if (!IS_INTEGER_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } @@ -2587,7 +2588,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - if (3 == numOfParams) { + if (3 == numOfParams && !IS_NULL_TYPE(para2Type)) { int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2)); if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) { return buildFuncErrMsg(pErrBuf, len, code, From adc583a93653f490a2c26604e860fc9a73d0cfb7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 17:08:58 +0800 Subject: [PATCH 20/20] fix(stream): fix memory leak. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c50c3c484e..07c67ba007 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1051,7 +1051,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { } else { // todo this should replace the existed object put by replay creating stream task msg from mnode stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); - taosMemoryFree(pTask); + tFreeStreamTask(pTask); continue; }