diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 093555c9c5..d7b60b2bcd 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -149,6 +149,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 9019fa0fef..a9858eeb96 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -344,6 +344,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d4e5dedd20..b80ea74006 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -147,6 +147,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b732fccd8e..210c073c6d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1814,6 +1814,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { QUERY_CHECK_CODE(code, lino, _end); res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res); + if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)res); + res = NULL; + } QUERY_CHECK_CODE(code, lino, _end); } #endif @@ -1945,7 +1949,7 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** SExprInfo* pExp = &pExprs[i]; code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pExprs); + destroyExprInfo(pExprs, *numOfExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 0b66834d45..5ece57cad1 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -579,6 +579,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 00b8c3b9ae..d5e6061a0f 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1506,6 +1506,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { + destroyOperator(*pDownstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 69a9045004..064ce42840 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -618,6 +618,9 @@ _error: if (pOperator) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } @@ -1248,6 +1251,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } TAOS_RETURN(code); @@ -1797,6 +1803,9 @@ _error: if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 66a7408b13..8426cb73fe 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -107,10 +107,6 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pOperator->pTaskInfo = pTaskInfo; int32_t lino = 0; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); - TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); @@ -148,6 +144,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } initResultSizeInfo(&pOperator->resultInfo, numOfRows); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); @@ -182,6 +183,9 @@ _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -534,6 +538,9 @@ _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 59b4e1cbbb..36f9ac0954 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -166,6 +166,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -841,6 +844,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 62506858fc..fb4b9db05a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -928,6 +928,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 93f30ea899..67929678e5 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -986,6 +986,9 @@ _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 39e602ee84..507ae724e0 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1463,6 +1463,9 @@ _error: if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index cf3b53bf02..823897eccd 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2018,6 +2018,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -3843,6 +3846,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4102,6 +4108,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4998,6 +5007,9 @@ _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5337,6 +5349,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index b14f4f0266..258f886805 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1212,6 +1212,9 @@ _error: if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fa9dc79cc3..b3f060e213 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1421,6 +1421,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1700,6 +1703,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1796,6 +1802,9 @@ _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2113,6 +2122,9 @@ _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2450,6 +2462,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code;