diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 093555c9c5..d7b60b2bcd 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -149,6 +149,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 9019fa0fef..a9858eeb96 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -344,6 +344,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d4e5dedd20..b80ea74006 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -147,6 +147,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b732fccd8e..210c073c6d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1814,6 +1814,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { QUERY_CHECK_CODE(code, lino, _end); res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res); + if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)res); + res = NULL; + } QUERY_CHECK_CODE(code, lino, _end); } #endif @@ -1945,7 +1949,7 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** SExprInfo* pExp = &pExprs[i]; code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pExprs); + destroyExprInfo(pExprs, *numOfExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 0b66834d45..5ece57cad1 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -579,6 +579,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 3e9ac2b10a..b899524988 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1510,6 +1510,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { + destroyOperator(*pDownstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 69a9045004..e5289fa216 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -561,6 +561,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo } initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pGroupCols = NULL; + code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); + QUERY_CHECK_CODE(code, lino, _error); + int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { @@ -568,10 +572,6 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); } - pInfo->pGroupCols = NULL; - code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); - QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -618,6 +618,9 @@ _error: if (pOperator) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } @@ -1162,6 +1165,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SExprInfo* pExprInfo = NULL; code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->exprSupp.pExprInfo = pExprInfo; pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); @@ -1227,8 +1232,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); @@ -1248,6 +1251,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } TAOS_RETURN(code); @@ -1797,6 +1803,9 @@ _error: if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 66a7408b13..4d2bdc62f8 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -107,10 +107,6 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pOperator->pTaskInfo = pTaskInfo; int32_t lino = 0; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); - TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); @@ -148,6 +144,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } initResultSizeInfo(&pOperator->resultInfo, numOfRows); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); @@ -182,6 +183,9 @@ _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -470,11 +474,6 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; - int32_t numOfExpr = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); - TSDB_CHECK_CODE(code, lino, _error); - if (pPhyNode->pExprs != NULL) { int32_t num = 0; SExprInfo* pSExpr = NULL; @@ -501,6 +500,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = blockDataEnsureCapacity(pResBlock, numOfRows); TSDB_CHECK_CODE(code, lino, _error); + int32_t numOfExpr = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); @@ -534,6 +538,9 @@ _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 59b4e1cbbb..36f9ac0954 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -166,6 +166,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -841,6 +844,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 62506858fc..44a383772d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -834,12 +834,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SExprSupp* pExpSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -863,7 +865,6 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); @@ -928,6 +929,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 93f30ea899..ff1ff579fc 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -895,14 +895,16 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -986,6 +988,9 @@ _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 39e602ee84..75b15dbea4 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1370,6 +1370,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; @@ -1440,9 +1443,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); - QUERY_CHECK_CODE(code, lino, _error); - pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -1463,6 +1463,9 @@ _error: if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index cf3b53bf02..756a6d71e1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1896,11 +1896,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN QUERY_CHECK_CODE(code, lino, _error); } - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1914,6 +1909,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -2018,6 +2019,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -3739,13 +3743,15 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } } SExprSupp* pExpSup = &pOperator->exprSupp; + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -3771,7 +3777,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } - pInfo->binfo.pRes = pResBlock; + pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); @@ -3843,6 +3849,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4102,6 +4111,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4915,14 +4927,16 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4998,6 +5012,9 @@ _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5206,10 +5223,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -5253,6 +5266,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -5337,6 +5353,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e346946a7a..59c19a706c 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -570,8 +570,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi _end: if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pFillInfo->next.pRowVal); - taosArrayDestroy(pFillInfo->prev.pRowVal); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pFillInfo = taosDestroyFillInfo(pFillInfo); } (*ppFillInfo) = pFillInfo; return code; @@ -764,6 +764,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); QUERY_CHECK_NULL(pv, code, lino, _end, terrno); code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); + QUERY_CHECK_CODE(code, lino, _end); } if (TSDB_CODE_SUCCESS != code) { goto _end; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 8cd547e333..258f886805 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1212,6 +1212,9 @@ _error: if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1249,10 +1252,11 @@ void destroyTimeSliceOperatorInfo(void* param) { } cleanupExprSupp(&pInfo->scalarSup); - - for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { - taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + if (pInfo->pFillColInfo != NULL) { + for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { + taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + } + taosMemoryFree(pInfo->pFillColInfo); } - taosMemoryFree(pInfo->pFillColInfo); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fa9dc79cc3..6a74c6a093 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1421,6 +1421,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1700,6 +1703,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1740,15 +1746,15 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); - initBasicInfo(&pInfo->binfo, pResBlock); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1796,6 +1802,9 @@ _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2113,6 +2122,9 @@ _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2380,11 +2392,6 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); - QUERY_CHECK_CODE(code, lino, _error); - SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, @@ -2408,6 +2415,11 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -2450,6 +2462,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code;