diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 585a0f3672..093555c9c5 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -343,6 +343,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; } @@ -366,7 +367,6 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = NULL; code = createDataBlock(&pBlock); if (code) { return code; @@ -414,6 +414,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 0a818a179f..9019fa0fef 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -294,10 +294,11 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); - initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index a72c1fecf1..d4e5dedd20 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -110,11 +110,11 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); - initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index b6c22f7335..e4ebb74252 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -735,6 +735,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } + pBlock = NULL; code = createDataBlock(&pBlock); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c2297d9fba..127e0f18f1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -278,7 +278,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); pBlock = NULL; - terrno = code; + terrno = TSDB_CODE_INVALID_PARA; break; } SColumnInfoData idata = @@ -1094,7 +1094,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S code = blockDataEnsureCapacity(pResBlock, numOfTables); if (code != TSDB_CODE_SUCCESS) { terrno = code; - taosMemoryFree(pResBlock); + blockDataDestroy(pResBlock); return NULL; } @@ -1166,7 +1166,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S _end: if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pResBlock); + blockDataDestroy(pResBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); terrno = code; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 97549e9ab0..647909cc13 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4319,11 +4319,12 @@ static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); + SArray* pBlockList = NULL; SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno); - SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); + pBlockList = taosArrayInit(1, POINTER_BYTES); QUERY_CHECK_NULL(pBlockList, code, lino, _end, terrno); void* tmp = taosArrayPush(pBlockList, &pResBlock); @@ -5836,7 +5837,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5849,16 +5851,10 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR int32_t numOfCols = 0; code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo); - int32_t lino = 0; - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pInfo->base.matchInfo.pList); - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); if (pTableScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; @@ -5873,10 +5869,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); - if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->base.metaCache.pTableMetaEntryCache, code, lino, _error, terrno); pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; @@ -5893,9 +5886,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pInfo->sample.seed = taosGetTimestampSec(); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); @@ -5957,6 +5948,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR return code; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } pTaskInfo->code = code; pInfo->base.pTableListInfo = NULL; if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2064c652b7..61d4eed156 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -5203,6 +5203,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->interval = (SInterval){ .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, @@ -5230,7 +5232,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pSup = &pOperator->exprSupp; pSup->hasWindowOrGroup = true; - initBasicInfo(&pInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 1efe9c1078..e11ee6b0dc 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -780,6 +780,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t ret = 0; if (pInfo->pCur == NULL) { pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); + QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno); } else { (void)pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0); } @@ -1578,6 +1579,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); + QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno); firstMetaCursor = 1; } if (!firstMetaCursor) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 9e902140f4..cedc23ed2d 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -152,8 +152,10 @@ static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } * @param colIndex the columnIndex, for setting null bitmap * @return the next offset to add field * */ -static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, - size_t length, bool isNull, uint32_t tupleLen) { +static inline int32_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, + size_t length, bool isNull, uint32_t tupleLen, uint32_t* pOffset) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; tupleSetOffset(*t, colIdx, offset); if (isNull) { @@ -161,16 +163,20 @@ static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, u } else { if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) { void* px = taosMemoryRealloc(*t, offset + length); - if (px == NULL) { - return terrno; - } + QUERY_CHECK_NULL(px, code, lino, _end, terrno); *t = px; } tupleSetData(*t, offset, data, length); } - return offset + length; + (*pOffset) = offset + length; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { @@ -200,6 +206,7 @@ typedef struct ReferencedTuple { } ReferencedTuple; static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) { + int32_t code = TSDB_CODE_SUCCESS; TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc)); if (t == NULL) { return terrno; @@ -216,15 +223,20 @@ static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); if (pCol == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); return terrno; } if (colDataIsNull_s(pCol, rowIdx)) { - offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); + code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen, &offset); } else { colLen = colDataGetRowLength(pCol, rowIdx); - offset = - tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen); + code = + tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen, &offset); + } + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; } } @@ -232,7 +244,7 @@ static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t t->data = pTuple; *pDesc = t; - return 0; + return code; } int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) { @@ -259,7 +271,7 @@ int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNu void destroyTuple(void* t) { TupleDesc* pDesc = t; - if (pDesc->type == AllocatedTupleType) { + if (pDesc != NULL && pDesc->type == AllocatedTupleType) { destoryAllocatedTuple(pDesc->data); taosMemoryFree(pDesc); } @@ -1686,6 +1698,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); void* p = taosArrayPush(pOrderInfoList, &biTs); if (p == NULL) { + taosArrayDestroy(pOrderInfoList); return terrno; } @@ -1698,6 +1711,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { void* px = taosArrayPush(pOrderInfoList, &biPk); if (px == NULL) { + taosArrayDestroy(pOrderInfoList); return terrno; } }