From 0a2d006c02a99e9a81a62f8f87ea56c3020c8f59 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 5 Aug 2024 09:26:36 +0000 Subject: [PATCH 01/17] fix/TD-31091 --- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 38fddd8bf0..4f84d5184e 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -484,7 +484,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); return code; From 6a225d5639d424dd3caedd2999865da3593eeebe Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 7 Aug 2024 01:51:09 +0000 Subject: [PATCH 02/17] fix/TD-31091-fix-case --- source/dnode/mnode/impl/src/mndStreamUtil.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 4f84d5184e..3c50c9f41d 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -540,7 +540,8 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, + TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); return code; From 1e0b6b08b216f01cdc6daa88b72dbc59eadfcfca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 7 Aug 2024 09:54:35 +0800 Subject: [PATCH 03/17] fix(stream):set accept code for trans. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0b7562b923..c6b4de74d1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -533,7 +533,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { return code; } - code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0); + code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code) { taosMemoryFree(buf); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 38fddd8bf0..684749956f 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -320,7 +320,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT return terrno; } - code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); return terrno; @@ -424,7 +424,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa (void) epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); - code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0); + code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); return code; @@ -484,7 +484,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); return code; @@ -713,7 +713,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa return code; } - code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); } From e6cc6a47e4ae073ccd6a4e679690ae74a3ec6610 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 10:06:04 +0800 Subject: [PATCH 04/17] fix mem leak --- source/libs/executor/src/exchangeoperator.c | 8 +++++--- source/libs/executor/src/sysscanoperator.c | 19 ++++++++++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 21b1c2838b..77587b41d3 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -686,8 +686,9 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); code = blockDecode(pRes, pData, (const char**) pNextStart); @@ -710,7 +711,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = NULL; code = createDataBlock(&pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -735,10 +735,12 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo QUERY_CHECK_CODE(code, lino, _end); blockDataDestroy(pBlock); + pBlock = NULL; } _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index cdd22c2adc..e50d638caf 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -672,6 +672,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { } blockDataDestroy(pDataBlock); + pDataBlock = NULL; if (ret != 0) { pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -683,6 +684,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pDataBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); @@ -695,6 +697,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* dataBlock = NULL; SSysTableScanInfo* pInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { @@ -704,7 +707,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); int32_t numOfRows = 0; - SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS); + dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS); code = blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); @@ -826,6 +829,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } blockDataDestroy(dataBlock); + dataBlock = NULL; if (ret != 0) { pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -837,6 +841,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(dataBlock); pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; pTaskInfo->code = code; @@ -1310,9 +1315,11 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { QUERY_CHECK_CODE(code, lino, _end); blockDataDestroy(p); + p = NULL; _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(p); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; @@ -1325,6 +1332,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; SSysTableScanInfo* pInfo = pOperator->info; SSysTableIndex* pIdx = pInfo->pIdx; + SSDataBlock* p = NULL; blockDataCleanup(pInfo->pRes); int32_t numOfRows = 0; @@ -1344,7 +1352,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); - SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); @@ -1545,12 +1553,14 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { } blockDataDestroy(p); + p = NULL; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } @@ -1563,6 +1573,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int8_t firstMetaCursor = 0; + SSDataBlock* p = NULL; SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { @@ -1590,7 +1601,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); - SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); QUERY_CHECK_NULL(p, code, lino, _end, terrno); code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); @@ -1783,6 +1794,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { } blockDataDestroy(p); + p = NULL; // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { @@ -1796,6 +1808,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } From 621bc6750db80f1bfd19036430fdff064f1a43c7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 7 Aug 2024 13:59:47 +0800 Subject: [PATCH 05/17] fix:[TD-31047] change poll delay bigger to avoid consumer null if split too long --- tests/system-test/7-tmq/tmqVnodeSplit-column-false.py | 2 +- tests/system-test/7-tmq/tmqVnodeSplit-column.py | 4 ++-- tests/system-test/7-tmq/tmqVnodeSplit-db-false.py | 2 +- tests/system-test/7-tmq/tmqVnodeSplit-db.py | 4 ++-- .../7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py | 2 +- .../7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py | 2 +- tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py | 2 +- tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py | 2 +- tests/system-test/7-tmq/tmqVnodeSplit-stb.py | 4 ++-- tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py | 4 ++-- tests/system-test/7-tmq/tmqVnodeTransform-db.py | 6 +++--- 11 files changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-column-false.py index 6ef28a4e77..b5105def37 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column-false.py @@ -52,7 +52,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 8987cf5251..1488e304cb 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -52,7 +52,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -121,7 +121,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-db-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-db-false.py index bad9e09da5..f77fb53c85 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-db-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-db-false.py @@ -121,7 +121,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-db.py b/tests/system-test/7-tmq/tmqVnodeSplit-db.py index a9fb1c2d4b..979d75d558 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-db.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-db.py @@ -52,7 +52,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -121,7 +121,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py index 3965168fa7..7c8f56f40d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py @@ -54,7 +54,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py index d4c76c4f61..5ff2ca6e27 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py @@ -54,7 +54,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py index a5e61adc8d..9d89e3b1c0 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py @@ -56,7 +56,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index eb35ebc718..3c5f3ecb30 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -56,7 +56,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py index 5aa2054e96..b7cebb51e0 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py @@ -54,7 +54,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -123,7 +123,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py b/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py index a853489c3f..c34c50edb6 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py @@ -140,7 +140,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 2, + 'pollDelay': 60, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -219,7 +219,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 60, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db.py b/tests/system-test/7-tmq/tmqVnodeTransform-db.py index cef90ff65f..74d1aaf3f8 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db.py @@ -49,7 +49,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 30, + 'pollDelay': 60, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -141,7 +141,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 60, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -220,7 +220,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 60, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} From 7559c1e1319e5b278406207ec6fa1aa2d6e13cb5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 15:33:09 +0800 Subject: [PATCH 06/17] fix mem leak --- source/libs/executor/src/cachescanoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 10 ++++++---- source/libs/executor/src/sysscanoperator.c | 6 ++++-- source/libs/executor/src/timewindowoperator.c | 2 +- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 652ebc0f9a..a6224bf2f0 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -448,7 +448,7 @@ void destroyCacheScanOperator(void* param) { taosArrayDestroy(pInfo->matchInfo.pList); tableListDestroy(pInfo->pTableList); - if (pInfo->pLastrowReader != NULL) { + if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) { pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader); pInfo->pLastrowReader = NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index da0df786fb..c7bb7a333d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3743,7 +3743,7 @@ static void destroyStreamScanOperatorInfo(void* param) { destroyOperator(pStreamScan->pTableScanOp); } - if (pStreamScan->tqReader) { + if (pStreamScan->tqReader != NULL && pStreamScan->readerFn.tqReaderClose != NULL) { pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader); } if (pStreamScan->matchInfo.pList) { @@ -4573,7 +4573,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; - if (pInfo->pCtbCursor != NULL) { + if (pInfo->pCtbCursor != NULL && pInfo->pStorageAPI != NULL) { pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor); } taosHashCleanup(pInfo->filterCtx.colHash); @@ -5735,8 +5735,10 @@ void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; // start one reader variable - pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; + if (pTableScanInfo->base.readerAPI.tsdReaderClose != NULL) { + pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; + } for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) { if (pTableScanInfo->nextDurationBlocks[i] != NULL) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index e50d638caf..2ce9b1d756 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2257,7 +2257,7 @@ void destroySysScanOperator(void* param) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { + if (pInfo->pAPI != NULL && pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); } @@ -2739,7 +2739,9 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); - pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle); + if (pDistInfo->readHandle.api.tsdReader.tsdReaderClose != NULL) { + pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle); + } tableListDestroy(pDistInfo->pTableListInfo); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a367afeed4..71c22d208c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2082,7 +2082,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge return code; _error: - destroyMAIOperatorInfo(miaInfo); + if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); destroyOperator(pOperator); pTaskInfo->code = code; return code; From db0c2e4be461ef10f05a550048198e07cda3c66e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 15:53:23 +0800 Subject: [PATCH 07/17] free operator --- source/libs/executor/src/aggregateoperator.c | 5 +++- source/libs/executor/src/cachescanoperator.c | 5 +++- .../libs/executor/src/countwindowoperator.c | 5 +++- .../libs/executor/src/eventwindowoperator.c | 5 +++- source/libs/executor/src/exchangeoperator.c | 5 +++- source/libs/executor/src/filloperator.c | 5 +++- source/libs/executor/src/groupcacheoperator.c | 5 +++- source/libs/executor/src/groupoperator.c | 10 ++++++-- source/libs/executor/src/projectoperator.c | 10 ++++++-- source/libs/executor/src/scanoperator.c | 25 +++++++++++++++---- source/libs/executor/src/sortoperator.c | 10 ++++++-- .../executor/src/streamcountwindowoperator.c | 5 +++- .../executor/src/streameventwindowoperator.c | 5 +++- source/libs/executor/src/streamfilloperator.c | 5 +++- .../executor/src/streamtimewindowoperator.c | 25 +++++++++++++++---- source/libs/executor/src/sysscanoperator.c | 10 ++++++-- source/libs/executor/src/timesliceoperator.c | 5 +++- source/libs/executor/src/timewindowoperator.c | 25 +++++++++++++++---- 18 files changed, 136 insertions(+), 34 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index af10bf8e49..585a0f3672 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -147,7 +147,10 @@ _error: if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 652ebc0f9a..0501a71e1a 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -247,7 +247,10 @@ _error: } pInfo->pTableList = NULL; destroyCacheScanOperator(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } return code; } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 63c0c5fe87..0a818a179f 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -341,7 +341,10 @@ _error: destroyCountWindowOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index f9ae8be84f..a72c1fecf1 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -145,7 +145,10 @@ _error: destroyEWindowOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 21b1c2838b..d4e720cb19 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -443,7 +443,10 @@ _error: doDestroyExchangeOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 882a0dc4b6..f61b514626 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -567,7 +567,10 @@ _error: } pTaskInfo->code = code; - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } return code; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index f2e24f160c..00b8c3b9ae 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1504,7 +1504,10 @@ _error: destroyGroupCacheOperator(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 309fb4b52f..69a9045004 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1246,7 +1246,10 @@ _error: destroyPartitionOperatorInfo(pInfo); } pTaskInfo->code = code; - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } TAOS_RETURN(code); } @@ -1792,7 +1795,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart _error: pTaskInfo->code = code; if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 7c9f242a63..3f1eb43578 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -180,7 +180,10 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* _error: destroyProjectOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -529,7 +532,10 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _error: destroyIndefinitOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index da0df786fb..488c92bba0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1432,7 +1432,10 @@ _error: destroyTableScanOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -4113,7 +4116,10 @@ _error: destroyStreamScanOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -4670,7 +4676,10 @@ _error: } if (pInfo != NULL) destroyTagScanOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } return code; } @@ -5915,7 +5924,10 @@ _error: pTaskInfo->code = code; pInfo->base.pTableListInfo = NULL; if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } return code; } @@ -6072,7 +6084,10 @@ _error: if (pInfo != NULL) { destoryTableCountScanOperator(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 2a1b4f3e23..59b4e1cbbb 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -164,7 +164,10 @@ _error: destroySortOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -836,6 +839,9 @@ _error: if (pInfo != NULL) { destroyGroupSortOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } return code; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 39fe78502d..62506858fc 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -926,7 +926,10 @@ _error: destroyStreamCountAggOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 57e31cfebe..bde6198709 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -981,7 +981,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b8686d0f19..8d91af46b1 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1459,7 +1459,10 @@ _error: qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 08b644b6ec..2064c652b7 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2013,7 +2013,10 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -3832,7 +3835,10 @@ _error: destroyStreamSessionAggOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -4088,7 +4094,10 @@ _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); @@ -4978,7 +4987,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -5313,7 +5325,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index cdd22c2adc..8fe8c07ee2 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2209,7 +2209,10 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -2815,6 +2818,9 @@ _error: pInfo->pTableListInfo = NULL; destroyBlockDistScanOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } return code; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 47ba8e560a..65fcc4d4bc 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1208,7 +1208,10 @@ _error: qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 99fa941071..44c69a7f54 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1414,7 +1414,10 @@ _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -1690,7 +1693,10 @@ _error: destroyStateWindowOperatorInfo(pInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -1783,7 +1789,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -2095,7 +2104,10 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge _error: destroyMAIOperatorInfo(miaInfo); - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } @@ -2427,7 +2439,10 @@ _error: destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); } - destroyOperator(pOperator); + if (pOperator != NULL) { + pOperator->info = NULL; + destroyOperator(pOperator); + } pTaskInfo->code = code; return code; } From 3fdf936c88ce5a31a11c035d29769343e579caac Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 7 Aug 2024 16:46:17 +0800 Subject: [PATCH 08/17] fix: drop user notification --- source/dnode/mnode/impl/src/mndUser.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 8bb2f11e7c..80b1af8bbb 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1677,7 +1677,7 @@ int32_t mndAcquireUser(SMnode *pMnode, const char *userName, SUserObj **ppUser) *ppUser = sdbAcquire(pSdb, SDB_USER, userName); if (*ppUser == NULL) { - if (code == TSDB_CODE_SDB_OBJ_NOT_THERE) { + if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { code = TSDB_CODE_MND_USER_NOT_EXIST; } else { code = TSDB_CODE_MND_USER_NOT_AVAILABLE; @@ -3150,6 +3150,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_ (void)taosArrayPush(batchRsp.pArray, &rsp); } mError("user:%s, failed to auth user since %s", pUsers[i].user, terrstr()); + code = 0; // reset since it is not an error continue; } From 6f599c9a74d1954fed8c4977dd6d2212a5a53dc0 Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 7 Aug 2024 15:24:16 +0800 Subject: [PATCH 09/17] fix:[TD-31298] fix core on show connection after kill connection. --- source/client/src/clientEnv.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 35e6651c41..44abf71bd2 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -526,19 +526,17 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj int32_t code = TSDB_CODE_SUCCESS; *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); if (NULL == *pRequest) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } STscObj *pTscObj = acquireTscObj(connId); if (pTscObj == NULL) { - code = TSDB_CODE_TSC_DISCONNECTED; - goto _return; + TSC_ERR_JRET(terrno); } SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); if (interParam == NULL) { releaseTscObj(connId); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _return; + TSC_ERR_JRET(terrno); } TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0)); interParam->pRequest = *pRequest; @@ -566,7 +564,11 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj return TSDB_CODE_SUCCESS; _return: - doDestroyRequest(*pRequest); + if ((*pRequest)->pTscObj) { + doDestroyRequest(*pRequest); + } else { + taosMemoryFree(*pRequest); + } return code; } From 0ca2ced9514ded0793c9f0c890dfdb74627569bd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 7 Aug 2024 17:33:43 +0800 Subject: [PATCH 10/17] fix(query): check return value. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 148 ++++++++------------ 1 file changed, 57 insertions(+), 91 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e943ef2442..d192dfafed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -371,13 +371,15 @@ static int32_t tValueDupPayload(SValue *pVal) { static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; void* px = NULL; + int32_t startIndex = 0; + int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); if (numOfBlocks <= 0) { return code; } - int32_t startIndex = 0; while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) { ++startIndex; } @@ -413,150 +415,113 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl // existed if (i < rows) { - if (pBlockLoadInfo->info.pUid == NULL) { - pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t)); - pBlockLoadInfo->info.pFirstTs = taosArrayInit(rows, sizeof(int64_t)); - pBlockLoadInfo->info.pLastTs = taosArrayInit(rows, sizeof(int64_t)); - pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t)); + SSttTableRowsInfo* pInfo = &pBlockLoadInfo->info; - pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(SValue)); - pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(SValue)); + if (pInfo->pUid == NULL) { + pInfo->pUid = taosArrayInit(rows, sizeof(int64_t)); + pInfo->pFirstTs = taosArrayInit(rows, sizeof(int64_t)); + pInfo->pLastTs = taosArrayInit(rows, sizeof(int64_t)); + pInfo->pCount = taosArrayInit(rows, sizeof(int64_t)); + + pInfo->pFirstKey = taosArrayInit(rows, sizeof(SValue)); + pInfo->pLastKey = taosArrayInit(rows, sizeof(SValue)); + + if (pInfo->pUid == NULL || pInfo->pFirstTs == NULL || pInfo->pLastTs == NULL || pInfo->pCount == NULL || + pInfo->pFirstKey == NULL || pInfo->pLastKey == NULL) { + code = terrno; + goto _end; + } } if (pStatisBlkArray->data[k].maxTbid.suid == suid) { int32_t size = rows - i; int32_t offset = i * sizeof(int64_t); - px = taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, offset), size); - if (px == NULL) { - return terrno; - } + px = taosArrayAddBatch(pInfo->pUid, tBufferGetDataAt(&block.uids, offset), size); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size); - if (px == NULL){ - return terrno; - } + px = taosArrayAddBatch(pInfo->pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size); - if (px == NULL){ - return terrno; - } + px = taosArrayAddBatch(pInfo->pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, offset), size); - if (px == NULL){ - return terrno; - } + px = taosArrayAddBatch(pInfo->pCount, tBufferGetDataAt(&block.counts, offset), size); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); if (block.numOfPKs > 0) { SValue vFirst = {0}, vLast = {0}; for (int32_t f = i; f < rows; ++f) { code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); - if (code) { - break; - } + TSDB_CHECK_CODE(code, lino, _end); code = tValueDupPayload(&vFirst); - if (code) { - break; - } + TSDB_CHECK_CODE(code, lino, _end); - px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pFirstKey, &vFirst); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); // todo add api to clone the original data code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); - if (code) { - break; - } + TSDB_CHECK_CODE(code, lino, _end); code = tValueDupPayload(&vLast); - if (code) { - break; - } + TSDB_CHECK_CODE(code, lino, _end); - px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pLastKey, &vLast); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); } } else { SValue vFirst = {0}; for (int32_t j = 0; j < size; ++j) { - px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pFirstKey, &vFirst); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vFirst); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pLastKey, &vFirst); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); } } } else { STbStatisRecord record = {0}; - while (i < rows) { (void)tStatisBlockGet(&block, i, &record); if (record.suid != suid) { break; } - px = taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pUid, &record.uid); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayPush(pBlockLoadInfo->info.pCount, &record.count); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pCount, &record.count); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pFirstTs, &record.firstKey.ts); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pLastTs, &record.lastKey.ts); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); if (record.firstKey.numOfPKs > 0) { SValue s = record.firstKey.pks[0]; code = tValueDupPayload(&s); - if (code) { - return code; - } + TSDB_CHECK_CODE(code, lino, _end); - px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pFirstKey, &s); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); s = record.lastKey.pks[0]; code = tValueDupPayload(&s); - if (code) { - return code; - } + TSDB_CHECK_CODE(code, lino, _end); - px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pLastKey, &s); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); } else { SValue v = {0}; - px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &v); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pFirstKey, &v); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); - px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &v); - if (px == NULL) { - return terrno; - } + px = taosArrayPush(pInfo->pLastKey, &v); + TSDB_CHECK_NULL(px, code, lino, _end, terrno); } i += 1; @@ -565,6 +530,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } } + _end: (void)tStatisBlockDestroy(&block); double el = (taosGetTimestampUs() - st) / 1000.0; From 9712357b87a68f414df1b645752b8c0cfb8cda02 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 7 Aug 2024 17:35:33 +0800 Subject: [PATCH 11/17] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d192dfafed..e55ede560e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -28,7 +28,7 @@ int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t num SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo)); if (pLoadInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pLoadInfo->blockData[0].sttBlockIndex = -1; @@ -50,9 +50,8 @@ int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t num pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); if (pLoadInfo->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(pLoadInfo); - return code; + return terrno; } pLoadInfo->pSchema = pSchema; @@ -358,7 +357,7 @@ static int32_t tValueDupPayload(SValue *pVal) { char *p = (char *)pVal->pData; char *pBuf = taosMemoryMalloc(pVal->nData); if (pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } memcpy(pBuf, p, pVal->nData); From 06532977281ef2ca50d0ace120bf7607bd09f9d2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 7 Aug 2024 18:20:11 +0800 Subject: [PATCH 12/17] fix:[TD-31250] change poll delay 1s->5s to avoid no data generated by stream --- utils/test/c/tmq_taosx_ci.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 8b9cfce395..49cfa3dff8 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -633,7 +633,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } int32_t cnt = 0; while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000); if (tmqmessage) { cnt++; msg_process(tmqmessage); From d9477c8dd7ee0398f1b9bc26e6c5fb5303e59887 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 7 Aug 2024 18:41:03 +0800 Subject: [PATCH 13/17] Update mndUser.c --- source/dnode/mnode/impl/src/mndUser.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 80b1af8bbb..c4823dc62e 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -3149,8 +3149,8 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_ (void)memcpy(rsp.user, pUsers[i].user, TSDB_USER_LEN); (void)taosArrayPush(batchRsp.pArray, &rsp); } - mError("user:%s, failed to auth user since %s", pUsers[i].user, terrstr()); - code = 0; // reset since it is not an error + mError("user:%s, failed to auth user since %s", pUsers[i].user, tstrerror(code)); + code = 0; continue; } From c9eb3ca2f9bbd08f2d15aec203b443e3301acb0e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 7 Aug 2024 19:06:42 +0800 Subject: [PATCH 14/17] fix:[TD-31047] change poll delay bigger to avoid consumer null if split too long --- tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py | 7 ++----- tests/system-test/7-tmq/tmqVnodeTransform-db.py | 6 +++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py b/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py index c34c50edb6..034197e0f9 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db-removewal.py @@ -140,7 +140,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -190,9 +190,6 @@ class TDTestCase: # redistribute vgroup self.redistributeVgroups() - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -219,7 +216,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 10, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db.py b/tests/system-test/7-tmq/tmqVnodeTransform-db.py index 74d1aaf3f8..cef90ff65f 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db.py @@ -49,7 +49,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -141,7 +141,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 10, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -220,7 +220,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 10, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} From f313004b4cc94a57c08a0eb823f40f51f945bb9d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 7 Aug 2024 19:09:04 +0800 Subject: [PATCH 15/17] fix: keep ETIMEDOUT error code in timewait state --- source/os/src/osThread.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 2f418d5a01..3e37d12759 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -242,7 +242,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const return EINVAL; #else int32_t code = pthread_cond_timedwait(cond, mutex, abstime); - if (code) { + if (code && code != ETIMEDOUT) { terrno = TAOS_SYSTEM_ERROR(code); return terrno; } From cccec022eb7808a0c7cee81a1f6d795d440f4b87 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Wed, 7 Aug 2024 19:20:07 +0800 Subject: [PATCH 16/17] add test case for TS-4236 --- tests/system-test/2-query/interval_limit_opt.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/system-test/2-query/interval_limit_opt.py b/tests/system-test/2-query/interval_limit_opt.py index aa1702fe3c..2f222d5b43 100644 --- a/tests/system-test/2-query/interval_limit_opt.py +++ b/tests/system-test/2-query/interval_limit_opt.py @@ -195,6 +195,10 @@ class TDTestCase: tdSql.checkData(1, 4, 2) tdSql.checkData(2, 4, 9) tdSql.checkData(3, 4, 9) + + sql = "SELECT _wstart, last(c1) FROM t6 INTERVAL(1w);" + tdSql.query(sql) + tdSql.checkRows(11) def test_partition_by_limit_no_agg(self): sql_template = 'select t1 from meters partition by t1 limit %d' From f180cf091cc20e24748d6593065e6c1bd3e40011 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 8 Aug 2024 08:38:17 +0800 Subject: [PATCH 17/17] free operator --- source/libs/executor/src/operator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 174e68ea7a..af52c31364 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -650,7 +650,7 @@ void destroyOperator(SOperatorInfo* pOperator) { freeResetOperatorParams(pOperator, OP_GET_PARAM, true); freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true); - if (pOperator->fpSet.closeFn != NULL) { + if (pOperator->fpSet.closeFn != NULL && pOperator->info != NULL) { pOperator->fpSet.closeFn(pOperator->info); }