From 3b71401e9698efb77199a1f0c52d34326cc410d3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 15:09:21 +0800 Subject: [PATCH 1/6] fix issue --- source/libs/executor/src/tsort.c | 36 +++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 9e902140f4..7d13566e0f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -152,8 +152,10 @@ static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } * @param colIndex the columnIndex, for setting null bitmap * @return the next offset to add field * */ -static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, - size_t length, bool isNull, uint32_t tupleLen) { +static inline int32_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, + size_t length, bool isNull, uint32_t tupleLen, uint32_t* pOffset) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; tupleSetOffset(*t, colIdx, offset); if (isNull) { @@ -161,16 +163,20 @@ static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, u } else { if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) { void* px = taosMemoryRealloc(*t, offset + length); - if (px == NULL) { - return terrno; - } + QUERY_CHECK_NULL(px, code, lino, _end, terrno); *t = px; } tupleSetData(*t, offset, data, length); } - return offset + length; + (*pOffset) = offset + length; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { @@ -200,6 +206,7 @@ typedef struct ReferencedTuple { } ReferencedTuple; static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) { + int32_t code = TSDB_CODE_SUCCESS; TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc)); if (t == NULL) { return terrno; @@ -216,15 +223,20 @@ static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); if (pCol == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); return terrno; } if (colDataIsNull_s(pCol, rowIdx)) { - offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); + code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen, &offset); } else { colLen = colDataGetRowLength(pCol, rowIdx); - offset = - tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen); + code = + tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen, &offset); + } + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; } } @@ -232,7 +244,7 @@ static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t t->data = pTuple; *pDesc = t; - return 0; + return code; } int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) { @@ -259,7 +271,7 @@ int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNu void destroyTuple(void* t) { TupleDesc* pDesc = t; - if (pDesc->type == AllocatedTupleType) { + if (pDesc != NULL && pDesc->type == AllocatedTupleType) { destoryAllocatedTuple(pDesc->data); taosMemoryFree(pDesc); } @@ -2713,7 +2725,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { pqNode.data = &refTuple; PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode); if (!pPushedNode) { - // do nothing if push failed + return terrno; } else { pPushedNode->data = NULL; int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data); From 8ab78009a9e0621ee1aa11001c2216b3a9f03b43 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 16:59:19 +0800 Subject: [PATCH 2/6] fix issue --- source/libs/executor/src/aggregateoperator.c | 3 ++- source/libs/executor/src/countwindowoperator.c | 3 ++- source/libs/executor/src/eventwindowoperator.c | 2 +- source/libs/executor/src/exchangeoperator.c | 4 +++- source/libs/executor/src/executil.c | 6 +++--- source/libs/executor/src/scanoperator.c | 3 ++- source/libs/executor/src/streamtimewindowoperator.c | 3 ++- source/libs/executor/src/tsort.c | 2 ++ 8 files changed, 17 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index af10bf8e49..3f67ac0395 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -340,6 +340,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; } @@ -363,7 +364,6 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = NULL; code = createDataBlock(&pBlock); if (code) { return code; @@ -411,6 +411,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 63c0c5fe87..1dcb141c62 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -294,10 +294,11 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); - initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index f9ae8be84f..acee61b44d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -110,11 +110,11 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); - initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 21b1c2838b..8f87a955ba 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -688,6 +688,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); code = blockDecode(pRes, pData, (const char**) pNextStart); @@ -710,7 +711,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = NULL; + pBlock = NULL; code = createDataBlock(&pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -739,6 +740,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c2297d9fba..127e0f18f1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -278,7 +278,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); pBlock = NULL; - terrno = code; + terrno = TSDB_CODE_INVALID_PARA; break; } SColumnInfoData idata = @@ -1094,7 +1094,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S code = blockDataEnsureCapacity(pResBlock, numOfTables); if (code != TSDB_CODE_SUCCESS) { terrno = code; - taosMemoryFree(pResBlock); + blockDataDestroy(pResBlock); return NULL; } @@ -1166,7 +1166,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S _end: if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pResBlock); + blockDataDestroy(pResBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); terrno = code; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index da0df786fb..7d88cc3948 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4279,11 +4279,12 @@ static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); + SArray* pBlockList = NULL; SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno); - SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); + pBlockList = taosArrayInit(1, POINTER_BYTES); QUERY_CHECK_NULL(pBlockList, code, lino, _end, terrno); void* tmp = taosArrayPush(pBlockList, &pResBlock); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 08b644b6ec..d719bbf9af 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -5191,6 +5191,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->interval = (SInterval){ .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, @@ -5218,7 +5220,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pSup = &pOperator->exprSupp; pSup->hasWindowOrGroup = true; - initBasicInfo(&pInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7d13566e0f..c39979d8a9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1698,6 +1698,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); void* p = taosArrayPush(pOrderInfoList, &biTs); if (p == NULL) { + taosArrayDestroy(pOrderInfoList); return terrno; } @@ -1710,6 +1711,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { void* px = taosArrayPush(pOrderInfoList, &biPk); if (px == NULL) { + taosArrayDestroy(pOrderInfoList); return terrno; } } From 531dfbb135cfce103b40179a0eb4a8af62f82d59 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 17:08:50 +0800 Subject: [PATCH 3/6] fix issue --- source/libs/executor/src/tsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c39979d8a9..bc9cc920a4 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2727,7 +2727,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { pqNode.data = &refTuple; PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode); if (!pPushedNode) { - return terrno; + if (terrno != TSDB_CODE_SUCCESS) return terrno; } else { pPushedNode->data = NULL; int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data); From 2039bb593481503422be09f6e8a3c3638f88fd83 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 8 Aug 2024 08:53:43 +0800 Subject: [PATCH 4/6] fix issue --- source/libs/executor/src/tsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index bc9cc920a4..cedc23ed2d 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2727,7 +2727,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { pqNode.data = &refTuple; PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode); if (!pPushedNode) { - if (terrno != TSDB_CODE_SUCCESS) return terrno; + // do nothing if push failed } else { pPushedNode->data = NULL; int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data); From 4bebadd0002f387f62101a80bae9832180a3071f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 8 Aug 2024 11:23:50 +0800 Subject: [PATCH 5/6] fix issue --- source/libs/executor/src/scanoperator.c | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 76ba88ef34..647909cc13 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -5837,7 +5837,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5850,16 +5851,10 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR int32_t numOfCols = 0; code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo); - int32_t lino = 0; - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pInfo->base.matchInfo.pList); - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); if (pTableScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; @@ -5874,10 +5869,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); - if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->base.metaCache.pTableMetaEntryCache, code, lino, _error, terrno); pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; @@ -5894,9 +5886,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pInfo->sample.seed = taosGetTimestampSec(); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); @@ -5958,6 +5948,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR return code; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } pTaskInfo->code = code; pInfo->base.pTableListInfo = NULL; if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo); From 3515af8099bcde60ae5fdc68418ebeb793f28184 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 8 Aug 2024 11:36:36 +0800 Subject: [PATCH 6/6] check function res --- source/libs/executor/src/sysscanoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 1ca8999e79..0de0575746 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -780,6 +780,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t ret = 0; if (pInfo->pCur == NULL) { pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); + QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno); } else { pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0); } @@ -1578,6 +1579,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); + QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno); firstMetaCursor = 1; } if (!firstMetaCursor) {