From a82c220645b01760f1738250960cd11a5185629c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 15 Aug 2024 17:38:29 +0800 Subject: [PATCH] fix issue --- source/libs/executor/src/groupoperator.c | 12 +++---- source/libs/executor/src/projectoperator.c | 10 +++--- .../executor/src/streamcountwindowoperator.c | 7 ++-- .../executor/src/streameventwindowoperator.c | 6 ++-- source/libs/executor/src/streamfilloperator.c | 6 ++-- .../executor/src/streamtimewindowoperator.c | 32 +++++++++++-------- source/libs/executor/src/tfill.c | 1 + source/libs/executor/src/timewindowoperator.c | 18 +++++------ 8 files changed, 50 insertions(+), 42 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 064ce42840..e5289fa216 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -561,6 +561,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo } initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pGroupCols = NULL; + code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); + QUERY_CHECK_CODE(code, lino, _error); + int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { @@ -568,10 +572,6 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); } - pInfo->pGroupCols = NULL; - code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); - QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1165,6 +1165,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SExprInfo* pExprInfo = NULL; code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->exprSupp.pExprInfo = pExprInfo; pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); @@ -1230,8 +1232,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 8426cb73fe..4d2bdc62f8 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -474,11 +474,6 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; - int32_t numOfExpr = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); - TSDB_CHECK_CODE(code, lino, _error); - if (pPhyNode->pExprs != NULL) { int32_t num = 0; SExprInfo* pSExpr = NULL; @@ -505,6 +500,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = blockDataEnsureCapacity(pResBlock, numOfRows); TSDB_CHECK_CODE(code, lino, _error); + int32_t numOfExpr = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index fb4b9db05a..44a383772d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -834,12 +834,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SExprSupp* pExpSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -863,7 +865,6 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 67929678e5..ff1ff579fc 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -895,14 +895,16 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 507ae724e0..75b15dbea4 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1370,6 +1370,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; @@ -1440,9 +1443,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); - QUERY_CHECK_CODE(code, lino, _error); - pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 823897eccd..756a6d71e1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1896,11 +1896,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN QUERY_CHECK_CODE(code, lino, _error); } - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1914,6 +1909,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -3742,13 +3743,15 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } } SExprSupp* pExpSup = &pOperator->exprSupp; + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -3774,7 +3777,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } - pInfo->binfo.pRes = pResBlock; + pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); @@ -4924,14 +4927,16 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5218,10 +5223,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -5265,6 +5266,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index a7e2ea3429..59c19a706c 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -764,6 +764,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); QUERY_CHECK_NULL(pv, code, lino, _end, terrno); code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); + QUERY_CHECK_CODE(code, lino, _end); } if (TSDB_CODE_SUCCESS != code) { goto _end; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b3f060e213..6a74c6a093 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1746,15 +1746,15 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); - initBasicInfo(&pInfo->binfo, pResBlock); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -2392,11 +2392,6 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); - QUERY_CHECK_CODE(code, lino, _error); - SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, @@ -2420,6 +2415,11 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) {