From 8ab78009a9e0621ee1aa11001c2216b3a9f03b43 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 16:59:19 +0800 Subject: [PATCH] fix issue --- source/libs/executor/src/aggregateoperator.c | 3 ++- source/libs/executor/src/countwindowoperator.c | 3 ++- source/libs/executor/src/eventwindowoperator.c | 2 +- source/libs/executor/src/exchangeoperator.c | 4 +++- source/libs/executor/src/executil.c | 6 +++--- source/libs/executor/src/scanoperator.c | 3 ++- source/libs/executor/src/streamtimewindowoperator.c | 3 ++- source/libs/executor/src/tsort.c | 2 ++ 8 files changed, 17 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index af10bf8e49..3f67ac0395 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -340,6 +340,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; } @@ -363,7 +364,6 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = NULL; code = createDataBlock(&pBlock); if (code) { return code; @@ -411,6 +411,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 63c0c5fe87..1dcb141c62 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -294,10 +294,11 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); - initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index f9ae8be84f..acee61b44d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -110,11 +110,11 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); - initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 21b1c2838b..8f87a955ba 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -688,6 +688,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); code = blockDecode(pRes, pData, (const char**) pNextStart); @@ -710,7 +711,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = NULL; + pBlock = NULL; code = createDataBlock(&pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -739,6 +740,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c2297d9fba..127e0f18f1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -278,7 +278,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); pBlock = NULL; - terrno = code; + terrno = TSDB_CODE_INVALID_PARA; break; } SColumnInfoData idata = @@ -1094,7 +1094,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S code = blockDataEnsureCapacity(pResBlock, numOfTables); if (code != TSDB_CODE_SUCCESS) { terrno = code; - taosMemoryFree(pResBlock); + blockDataDestroy(pResBlock); return NULL; } @@ -1166,7 +1166,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S _end: if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pResBlock); + blockDataDestroy(pResBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); terrno = code; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index da0df786fb..7d88cc3948 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4279,11 +4279,12 @@ static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); + SArray* pBlockList = NULL; SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno); - SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); + pBlockList = taosArrayInit(1, POINTER_BYTES); QUERY_CHECK_NULL(pBlockList, code, lino, _end, terrno); void* tmp = taosArrayPush(pBlockList, &pResBlock); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 08b644b6ec..d719bbf9af 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -5191,6 +5191,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->interval = (SInterval){ .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, @@ -5218,7 +5220,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pSup = &pOperator->exprSupp; pSup->hasWindowOrGroup = true; - initBasicInfo(&pInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7d13566e0f..c39979d8a9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1698,6 +1698,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); void* p = taosArrayPush(pOrderInfoList, &biTs); if (p == NULL) { + taosArrayDestroy(pOrderInfoList); return terrno; } @@ -1710,6 +1711,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { void* px = taosArrayPush(pOrderInfoList, &biPk); if (px == NULL) { + taosArrayDestroy(pOrderInfoList); return terrno; } }