diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 368107ca5c..cea1bbbb5a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -451,6 +451,8 @@ typedef struct SIntervalAggOperatorInfo { SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; + + SNode *pCondition; } SIntervalAggOperatorInfo; typedef struct SStreamFinalIntervalOperatorInfo { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fdef432d95..4683d66260 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1208,10 +1208,17 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - - if (pBlock->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + while (1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pBlock); + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + if (pBlock->info.rows > 0) { + break; + } } size_t rows = pBlock->info.rows; @@ -1662,6 +1669,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; pInfo->ignoreExpiredData = pPhyNode->window.igExpired; + pInfo->pCondition = pPhyNode->window.node.pConditions; if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index ea8bae8259..458e0f545d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -890,6 +890,20 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN return code; } +static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { + if (NULL == pLogicNode->pConditions || + OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { + return TSDB_CODE_SUCCESS; + } + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pLogicNode->pChildren, 0); + int32_t code = pushDownCondOptPushCondToChild(pCxt, pChild, &pLogicNode->pConditions); + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); + pCxt->optimized = true; + } + return code; +} + static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pLogicNode)) { @@ -905,6 +919,10 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog case QUERY_NODE_LOGIC_PLAN_PROJECT: code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode); break; + case QUERY_NODE_LOGIC_PLAN_SORT: + case QUERY_NODE_LOGIC_PLAN_PARTITION: + code = pushDownCondOptDealLogicNode(pCxt, pLogicNode); + break; default: break; }