Merge pull request #14605 from taosdata/szhou/feature/push-project-condition

feat: push condition down the tree
This commit is contained in:
shenglian-zhou 2022-07-06 17:45:37 +08:00 committed by GitHub
commit 7aebf05699
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 4 deletions

View File

@ -451,6 +451,8 @@ typedef struct SIntervalAggOperatorInfo {
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
SNode *pCondition;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo { typedef struct SStreamFinalIntervalOperatorInfo {

View File

@ -1208,10 +1208,17 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
} }
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBlock->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doFilter(pInfo->pCondition, pBlock);
doSetOperatorCompleted(pOperator); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pBlock->info.rows > 0) {
break;
}
} }
size_t rows = pBlock->info.rows; size_t rows = pBlock->info.rows;
@ -1662,6 +1669,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->ignoreExpiredData = pPhyNode->window.igExpired; pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
pInfo->pCondition = pPhyNode->window.node.pConditions;
if (pPhyNode->window.pExprs != NULL) { if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;

View File

@ -890,6 +890,20 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
return code; return code;
} }
static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
if (NULL == pLogicNode->pConditions ||
OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
}
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pLogicNode->pChildren, 0);
int32_t code = pushDownCondOptPushCondToChild(pCxt, pChild, &pLogicNode->pConditions);
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;
}
return code;
}
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pLogicNode)) { switch (nodeType(pLogicNode)) {
@ -905,6 +919,10 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
case QUERY_NODE_LOGIC_PLAN_PROJECT: case QUERY_NODE_LOGIC_PLAN_PROJECT:
code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode); code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode);
break; break;
case QUERY_NODE_LOGIC_PLAN_SORT:
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = pushDownCondOptDealLogicNode(pCxt, pLogicNode);
break;
default: default:
break; break;
} }