Merge pull request #14389 from taosdata/szhou/feature/pushdown-agg-cond
feat: support push agg operator condition to scan
This commit is contained in:
commit
d6e1d1bcd0
|
@ -1348,7 +1348,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
|||
SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
// it is a reserved column for scalar function, and no data in this column yet.
|
||||
if (pDst->pData == NULL) {
|
||||
if (pDst->pData == NULL || pSrc->pData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -1728,7 +1728,6 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
|
|||
} else {
|
||||
pCxt->hasOtherCol = true;
|
||||
}
|
||||
return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
|
|
@ -586,11 +586,160 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
|
||||
// todo
|
||||
typedef struct SPartAggCondContext {
|
||||
SAggLogicNode* pAgg;
|
||||
bool hasAggFunc;
|
||||
} SPartAggCondContext;
|
||||
|
||||
static EDealRes partAggCondHasAggFuncImpl(SNode* pNode, void* pContext) {
|
||||
SPartAggCondContext* pCxt = pContext;
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SNode* pAggFunc = NULL;
|
||||
FOREACH(pAggFunc, pCxt->pAgg->pAggFuncs) {
|
||||
if (strcmp(((SColumnNode*)pNode)->colName, ((SFunctionNode*)pAggFunc)->node.aliasName) == 0) {
|
||||
pCxt->hasAggFunc = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t partitionAggCondHasAggFunc(SAggLogicNode* pAgg, SNode* pCond) {
|
||||
SPartAggCondContext cxt = {.pAgg = pAgg, .hasAggFunc = false};
|
||||
nodesWalkExpr(pCond, partAggCondHasAggFuncImpl, &cxt);
|
||||
return cxt.hasAggFunc;
|
||||
}
|
||||
|
||||
static int32_t partitionAggCondConj(SAggLogicNode* pAgg, SNode** ppAggFuncCond, SNode** ppGroupKeyCond) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pAgg->node.pConditions;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNodeList* pAggFuncConds = NULL;
|
||||
SNodeList* pGroupKeyConds = NULL;
|
||||
SNode* pCond = NULL;
|
||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||
if (partitionAggCondHasAggFunc(pAgg, pCond)) {
|
||||
code = nodesListMakeAppend(&pAggFuncConds, nodesCloneNode(pCond));
|
||||
} else {
|
||||
code = nodesListMakeAppend(&pGroupKeyConds, nodesCloneNode(pCond));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SNode* pTempAggFuncCond = NULL;
|
||||
SNode* pTempGroupKeyCond = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesMergeConds(&pTempAggFuncCond, &pAggFuncConds);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesMergeConds(&pTempGroupKeyCond, &pGroupKeyConds);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*ppAggFuncCond = pTempAggFuncCond;
|
||||
*ppGroupKeyCond = pTempGroupKeyCond;
|
||||
} else {
|
||||
nodesDestroyList(pAggFuncConds);
|
||||
nodesDestroyList(pGroupKeyConds);
|
||||
nodesDestroyNode(pTempAggFuncCond);
|
||||
nodesDestroyNode(pTempGroupKeyCond);
|
||||
}
|
||||
pAgg->node.pConditions = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t partitionAggCond(SAggLogicNode* pAgg, SNode** ppAggFunCond, SNode** ppGroupKeyCond) {
|
||||
SNode* pAggNodeCond = pAgg->node.pConditions;
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pAggNodeCond) &&
|
||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pAggNodeCond))->condType) {
|
||||
return partitionAggCondConj(pAgg, ppAggFunCond, ppGroupKeyCond);
|
||||
}
|
||||
if (partitionAggCondHasAggFunc(pAgg, pAggNodeCond)) {
|
||||
*ppAggFunCond = pAggNodeCond;
|
||||
} else {
|
||||
*ppGroupKeyCond = pAggNodeCond;
|
||||
}
|
||||
pAgg->node.pConditions = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t pushCondToAggCond(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode** pAggFuncCond) {
|
||||
pushDownCondOptAppendCond(&pAgg->node.pConditions, pAggFuncCond);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SRewriteAggGroupKeyCondContext{
|
||||
SAggLogicNode *pAgg;
|
||||
int32_t errCode;
|
||||
} SRewriteAggGroupKeyCondContext;
|
||||
|
||||
static EDealRes rewriteAggGroupKeyCondForPushDownImpl(SNode** pNode, void* pContext) {
|
||||
SRewriteAggGroupKeyCondContext* pCxt = pContext;
|
||||
SAggLogicNode* pAgg = pCxt->pAgg;
|
||||
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
SNode* pGroupKey = NULL;
|
||||
FOREACH(pGroupKey, pAgg->pGroupKeys) {
|
||||
SNode* pGroup = NULL;
|
||||
FOREACH(pGroup, ((SGroupingSetNode*)pGroupKey)->pParameterList) {
|
||||
if (0 == strcmp(((SExprNode*)pGroup)->aliasName, ((SColumnNode*)(*pNode))->colName)) {
|
||||
SNode* pExpr = nodesCloneNode(pGroup);
|
||||
if (pExpr == NULL) {
|
||||
pCxt->errCode = terrno;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = pExpr;
|
||||
}
|
||||
}
|
||||
}
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t rewriteAggGroupKeyCondForPushDown(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode* pGroupKeyCond) {
|
||||
SRewriteAggGroupKeyCondContext cxt = {.pAgg = pAgg, .errCode = TSDB_CODE_SUCCESS};
|
||||
nodesRewriteExpr(&pGroupKeyCond, rewriteAggGroupKeyCondForPushDownImpl, &cxt);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
|
||||
if (NULL == pAgg->node.pConditions ||
|
||||
OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
//TODO: remove it after full implementation of pushing down to child
|
||||
if (1 != LIST_LENGTH(pAgg->node.pChildren) ||
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pAggFuncCond = NULL;
|
||||
SNode* pGroupKeyCond = NULL;
|
||||
int32_t code = partitionAggCond(pAgg, &pAggFuncCond, &pGroupKeyCond);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncCond) {
|
||||
code = pushCondToAggCond(pCxt, pAgg, &pAggFuncCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) {
|
||||
code = rewriteAggGroupKeyCondForPushDown(pCxt, pAgg, pGroupKeyCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) {
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
|
||||
code = pushDownCondOptPushCondToChild(pCxt, pChild, &pGroupKeyCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
OPTIMIZE_FLAG_SET_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
||||
pCxt->optimized = true;
|
||||
} else {
|
||||
nodesDestroyNode(pGroupKeyCond);
|
||||
nodesDestroyNode(pAggFuncCond);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pLogicNode)) {
|
||||
|
@ -1707,8 +1856,8 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
|
|||
|
||||
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
|
||||
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
|
||||
|
||||
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
|
||||
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
|
||||
int32_t code = cxt.errCode;
|
||||
|
||||
|
|
Loading…
Reference in New Issue