diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9d77c9badd..88f2fedd28 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -479,6 +479,8 @@ typedef struct SAggOperatorInfo { uint64_t groupId; SGroupResInfo groupResInfo; SExprSupp scalarExprSup; + + SNode *pCondition; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { @@ -758,7 +760,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 25b61e15c3..f1f424af2e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3012,11 +3012,19 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - if (pInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pAggInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - } + while (1) { + doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); + doFilter(pAggInfo->pCondition, pInfo->pRes); + if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pInfo->pRes->info.rows > 0) { + break; + } + } size_t rows = blockDataGetNumOfRows(pInfo->pRes); pOperator->resultInfo.totalRows += rows; @@ -3557,7 +3565,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) { } SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, + SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3581,6 +3589,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->groupId = INT32_MIN; + pInfo->pCondition = pCondition; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->blocking = true; @@ -4328,7 +4337,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pScalarExprInfo, numOfScalarExpr, pTaskInfo); } else { pOptr = - createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pTaskInfo); + createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;