feat: support push agg operator condition to scan
This commit is contained in:
parent
8ac13a1234
commit
b6d6657174
|
@ -1728,7 +1728,6 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
|
||||||
} else {
|
} else {
|
||||||
pCxt->hasOtherCol = true;
|
pCxt->hasOtherCol = true;
|
||||||
}
|
}
|
||||||
return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END;
|
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -587,11 +587,160 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
|
typedef struct SPartAggCondContext {
|
||||||
// todo
|
SAggLogicNode* pAgg;
|
||||||
|
bool hasAggFunc;
|
||||||
|
} SPartAggCondContext;
|
||||||
|
|
||||||
|
static EDealRes partAggCondHasAggFuncImpl(SNode* pNode, void* pContext) {
|
||||||
|
SPartAggCondContext* pCxt = pContext;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
SNode* pAggFunc = NULL;
|
||||||
|
FOREACH(pAggFunc, pCxt->pAgg->pAggFuncs) {
|
||||||
|
if (strcmp(((SColumnNode*)pNode)->colName, ((SFunctionNode*)pAggFunc)->node.aliasName) == 0) {
|
||||||
|
pCxt->hasAggFunc = true;
|
||||||
|
return DEAL_RES_END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t partitionAggCondHasAggFunc(SAggLogicNode* pAgg, SNode* pCond) {
|
||||||
|
SPartAggCondContext cxt = {.pAgg = pAgg, .hasAggFunc = false};
|
||||||
|
nodesWalkExpr(pCond, partAggCondHasAggFuncImpl, &cxt);
|
||||||
|
return cxt.hasAggFunc;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t partitionAggCondConj(SAggLogicNode* pAgg, SNode** ppAggFuncCond, SNode** ppGroupKeyCond) {
|
||||||
|
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pAgg->node.pConditions;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SNodeList* pAggFuncConds = NULL;
|
||||||
|
SNodeList* pGroupKeyConds = NULL;
|
||||||
|
SNode* pCond = NULL;
|
||||||
|
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||||
|
if (partitionAggCondHasAggFunc(pAgg, pCond)) {
|
||||||
|
code = nodesListMakeAppend(&pAggFuncConds, nodesCloneNode(pCond));
|
||||||
|
} else {
|
||||||
|
code = nodesListMakeAppend(&pGroupKeyConds, nodesCloneNode(pCond));
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pTempAggFuncCond = NULL;
|
||||||
|
SNode* pTempGroupKeyCond = NULL;
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesMergeConds(&pTempAggFuncCond, &pAggFuncConds);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesMergeConds(&pTempGroupKeyCond, &pGroupKeyConds);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*ppAggFuncCond = pTempAggFuncCond;
|
||||||
|
*ppGroupKeyCond = pTempGroupKeyCond;
|
||||||
|
} else {
|
||||||
|
nodesDestroyList(pAggFuncConds);
|
||||||
|
nodesDestroyList(pGroupKeyConds);
|
||||||
|
nodesDestroyNode(pTempAggFuncCond);
|
||||||
|
nodesDestroyNode(pTempGroupKeyCond);
|
||||||
|
}
|
||||||
|
pAgg->node.pConditions = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t partitionAggCond(SAggLogicNode* pAgg, SNode** ppAggFunCond, SNode** ppGroupKeyCond) {
|
||||||
|
SNode* pAggNodeCond = pAgg->node.pConditions;
|
||||||
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pAggNodeCond) &&
|
||||||
|
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pAggNodeCond))->condType) {
|
||||||
|
return partitionAggCondConj(pAgg, ppAggFunCond, ppGroupKeyCond);
|
||||||
|
}
|
||||||
|
if (partitionAggCondHasAggFunc(pAgg, pAggNodeCond)) {
|
||||||
|
*ppAggFunCond = pAggNodeCond;
|
||||||
|
} else {
|
||||||
|
*ppGroupKeyCond = pAggNodeCond;
|
||||||
|
}
|
||||||
|
pAgg->node.pConditions = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t pushCondToAggCond(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode** pAggFuncCond) {
|
||||||
|
pushDownCondOptAppendCond(&pAgg->node.pConditions, pAggFuncCond);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SRewriteAggGroupKeyCondContext{
|
||||||
|
SAggLogicNode *pAgg;
|
||||||
|
int32_t errCode;
|
||||||
|
} SRewriteAggGroupKeyCondContext;
|
||||||
|
|
||||||
|
static EDealRes rewriteAggGroupKeyCondForPushDownImpl(SNode** pNode, void* pContext) {
|
||||||
|
SRewriteAggGroupKeyCondContext* pCxt = pContext;
|
||||||
|
SAggLogicNode* pAgg = pCxt->pAgg;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||||
|
SNode* pGroupKey = NULL;
|
||||||
|
FOREACH(pGroupKey, pAgg->pGroupKeys) {
|
||||||
|
SNode* pGroup = NULL;
|
||||||
|
FOREACH(pGroup, ((SGroupingSetNode*)pGroupKey)->pParameterList) {
|
||||||
|
if (0 == strcmp(((SExprNode*)pGroup)->aliasName, ((SColumnNode*)(*pNode))->colName)) {
|
||||||
|
SNode* pExpr = nodesCloneNode(pGroup);
|
||||||
|
if (pExpr == NULL) {
|
||||||
|
pCxt->errCode = terrno;
|
||||||
|
return DEAL_RES_ERROR;
|
||||||
|
}
|
||||||
|
nodesDestroyNode(*pNode);
|
||||||
|
*pNode = pExpr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t rewriteAggGroupKeyCondForPushDown(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode* pGroupKeyCond) {
|
||||||
|
SRewriteAggGroupKeyCondContext cxt = {.pAgg = pAgg, .errCode = TSDB_CODE_SUCCESS};
|
||||||
|
nodesRewriteExpr(&pGroupKeyCond, rewriteAggGroupKeyCondForPushDownImpl, &cxt);
|
||||||
|
return cxt.errCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
|
||||||
|
if (NULL == pAgg->node.pConditions ||
|
||||||
|
OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
//TODO: remove it after full implementation of pushing down to child
|
||||||
|
if (1 != LIST_LENGTH(pAgg->node.pChildren) ||
|
||||||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pAggFuncCond = NULL;
|
||||||
|
SNode* pGroupKeyCond = NULL;
|
||||||
|
int32_t code = partitionAggCond(pAgg, &pAggFuncCond, &pGroupKeyCond);
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncCond) {
|
||||||
|
code = pushCondToAggCond(pCxt, pAgg, &pAggFuncCond);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) {
|
||||||
|
code = rewriteAggGroupKeyCondForPushDown(pCxt, pAgg, pGroupKeyCond);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) {
|
||||||
|
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
|
||||||
|
code = pushDownCondOptPushCondToChild(pCxt, pChild, &pGroupKeyCond);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
||||||
|
pCxt->optimized = true;
|
||||||
|
} else {
|
||||||
|
nodesDestroyNode(pGroupKeyCond);
|
||||||
|
nodesDestroyNode(pAggFuncCond);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch (nodeType(pLogicNode)) {
|
switch (nodeType(pLogicNode)) {
|
||||||
|
@ -1668,8 +1817,8 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
|
||||||
|
|
||||||
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
|
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
|
||||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
|
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
|
||||||
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
|
|
||||||
|
|
||||||
|
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
|
||||||
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
|
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
|
||||||
int32_t code = cxt.errCode;
|
int32_t code = cxt.errCode;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue