From 5589799e90a7dce1a7ab013d9536d923961bb209 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 15 Aug 2024 10:58:53 +0800 Subject: [PATCH 1/4] fix issue --- source/libs/executor/src/tfill.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e346946a7a..a7e2ea3429 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -570,8 +570,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi _end: if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pFillInfo->next.pRowVal); - taosArrayDestroy(pFillInfo->prev.pRowVal); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pFillInfo = taosDestroyFillInfo(pFillInfo); } (*ppFillInfo) = pFillInfo; return code; From ef5f69e3cf97eb77fe7cb46ff478c16768a6d0b3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 15 Aug 2024 14:13:08 +0800 Subject: [PATCH 2/4] fix issue --- source/libs/executor/src/timesliceoperator.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 8cd547e333..b14f4f0266 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1249,10 +1249,11 @@ void destroyTimeSliceOperatorInfo(void* param) { } cleanupExprSupp(&pInfo->scalarSup); - - for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { - taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + if (pInfo->pFillColInfo != NULL) { + for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { + taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + } + taosMemoryFree(pInfo->pFillColInfo); } - taosMemoryFree(pInfo->pFillColInfo); taosMemoryFreeClear(param); } From cf796a45c1bb867330ac2447f49f9adc2ffd7d26 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 15 Aug 2024 17:03:32 +0800 Subject: [PATCH 3/4] fix issue --- source/libs/executor/src/aggregateoperator.c | 3 +++ source/libs/executor/src/countwindowoperator.c | 3 +++ source/libs/executor/src/eventwindowoperator.c | 3 +++ source/libs/executor/src/executil.c | 6 +++++- source/libs/executor/src/filloperator.c | 3 +++ source/libs/executor/src/groupcacheoperator.c | 3 +++ source/libs/executor/src/groupoperator.c | 9 +++++++++ source/libs/executor/src/projectoperator.c | 15 +++++++++++---- source/libs/executor/src/sortoperator.c | 6 ++++++ .../libs/executor/src/streamcountwindowoperator.c | 3 +++ .../libs/executor/src/streameventwindowoperator.c | 3 +++ source/libs/executor/src/streamfilloperator.c | 3 +++ .../libs/executor/src/streamtimewindowoperator.c | 15 +++++++++++++++ source/libs/executor/src/timesliceoperator.c | 3 +++ source/libs/executor/src/timewindowoperator.c | 15 +++++++++++++++ 15 files changed, 88 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 093555c9c5..d7b60b2bcd 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -149,6 +149,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 9019fa0fef..a9858eeb96 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -344,6 +344,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d4e5dedd20..b80ea74006 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -147,6 +147,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b732fccd8e..210c073c6d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1814,6 +1814,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { QUERY_CHECK_CODE(code, lino, _end); res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res); + if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)res); + res = NULL; + } QUERY_CHECK_CODE(code, lino, _end); } #endif @@ -1945,7 +1949,7 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** SExprInfo* pExp = &pExprs[i]; code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pExprs); + destroyExprInfo(pExprs, *numOfExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 0b66834d45..5ece57cad1 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -579,6 +579,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 00b8c3b9ae..d5e6061a0f 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1506,6 +1506,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { + destroyOperator(*pDownstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 69a9045004..064ce42840 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -618,6 +618,9 @@ _error: if (pOperator) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } @@ -1248,6 +1251,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } TAOS_RETURN(code); @@ -1797,6 +1803,9 @@ _error: if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 66a7408b13..8426cb73fe 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -107,10 +107,6 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pOperator->pTaskInfo = pTaskInfo; int32_t lino = 0; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); - TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); @@ -148,6 +144,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } initResultSizeInfo(&pOperator->resultInfo, numOfRows); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); @@ -182,6 +183,9 @@ _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -534,6 +538,9 @@ _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 59b4e1cbbb..36f9ac0954 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -166,6 +166,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -841,6 +844,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 62506858fc..fb4b9db05a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -928,6 +928,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 93f30ea899..67929678e5 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -986,6 +986,9 @@ _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 39e602ee84..507ae724e0 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1463,6 +1463,9 @@ _error: if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index cf3b53bf02..823897eccd 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2018,6 +2018,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -3843,6 +3846,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4102,6 +4108,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4998,6 +5007,9 @@ _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5337,6 +5349,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index b14f4f0266..258f886805 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1212,6 +1212,9 @@ _error: if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fa9dc79cc3..b3f060e213 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1421,6 +1421,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1700,6 +1703,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1796,6 +1802,9 @@ _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2113,6 +2122,9 @@ _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2450,6 +2462,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; From a82c220645b01760f1738250960cd11a5185629c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 15 Aug 2024 17:38:29 +0800 Subject: [PATCH 4/4] fix issue --- source/libs/executor/src/groupoperator.c | 12 +++---- source/libs/executor/src/projectoperator.c | 10 +++--- .../executor/src/streamcountwindowoperator.c | 7 ++-- .../executor/src/streameventwindowoperator.c | 6 ++-- source/libs/executor/src/streamfilloperator.c | 6 ++-- .../executor/src/streamtimewindowoperator.c | 32 +++++++++++-------- source/libs/executor/src/tfill.c | 1 + source/libs/executor/src/timewindowoperator.c | 18 +++++------ 8 files changed, 50 insertions(+), 42 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 064ce42840..e5289fa216 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -561,6 +561,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo } initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pGroupCols = NULL; + code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); + QUERY_CHECK_CODE(code, lino, _error); + int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { @@ -568,10 +572,6 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); } - pInfo->pGroupCols = NULL; - code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); - QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1165,6 +1165,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SExprInfo* pExprInfo = NULL; code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->exprSupp.pExprInfo = pExprInfo; pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); @@ -1230,8 +1232,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 8426cb73fe..4d2bdc62f8 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -474,11 +474,6 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; - int32_t numOfExpr = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); - TSDB_CHECK_CODE(code, lino, _error); - if (pPhyNode->pExprs != NULL) { int32_t num = 0; SExprInfo* pSExpr = NULL; @@ -505,6 +500,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = blockDataEnsureCapacity(pResBlock, numOfRows); TSDB_CHECK_CODE(code, lino, _error); + int32_t numOfExpr = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index fb4b9db05a..44a383772d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -834,12 +834,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SExprSupp* pExpSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -863,7 +865,6 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 67929678e5..ff1ff579fc 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -895,14 +895,16 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 507ae724e0..75b15dbea4 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1370,6 +1370,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; @@ -1440,9 +1443,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); - QUERY_CHECK_CODE(code, lino, _error); - pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 823897eccd..756a6d71e1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1896,11 +1896,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN QUERY_CHECK_CODE(code, lino, _error); } - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1914,6 +1909,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -3742,13 +3743,15 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } } SExprSupp* pExpSup = &pOperator->exprSupp; + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -3774,7 +3777,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } - pInfo->binfo.pRes = pResBlock; + pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); @@ -4924,14 +4927,16 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5218,10 +5223,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -5265,6 +5266,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index a7e2ea3429..59c19a706c 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -764,6 +764,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); QUERY_CHECK_NULL(pv, code, lino, _end, terrno); code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); + QUERY_CHECK_CODE(code, lino, _end); } if (TSDB_CODE_SUCCESS != code) { goto _end; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b3f060e213..6a74c6a093 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1746,15 +1746,15 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); - initBasicInfo(&pInfo->binfo, pResBlock); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -2392,11 +2392,6 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); - QUERY_CHECK_CODE(code, lino, _error); - SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, @@ -2420,6 +2415,11 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) {