diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 839a936e72..52d301dde8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1348,7 +1348,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); // it is a reserved column for scalar function, and no data in this column yet. - if (pDst->pData == NULL) { + if (pDst->pData == NULL || pSrc->pData == NULL) { continue; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index dc9d9b92ee..1972010e25 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1728,7 +1728,6 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) { } else { pCxt->hasOtherCol = true; } - return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END; } return DEAL_RES_CONTINUE; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f11ef7f797..c303c63ab7 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -586,11 +586,160 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p return code; } -static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { - // todo +typedef struct SPartAggCondContext { + SAggLogicNode* pAgg; + bool hasAggFunc; +} SPartAggCondContext; + +static EDealRes partAggCondHasAggFuncImpl(SNode* pNode, void* pContext) { + SPartAggCondContext* pCxt = pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SNode* pAggFunc = NULL; + FOREACH(pAggFunc, pCxt->pAgg->pAggFuncs) { + if (strcmp(((SColumnNode*)pNode)->colName, ((SFunctionNode*)pAggFunc)->node.aliasName) == 0) { + pCxt->hasAggFunc = true; + return DEAL_RES_END; + } + } + } + return DEAL_RES_CONTINUE; +} + +static int32_t partitionAggCondHasAggFunc(SAggLogicNode* pAgg, SNode* pCond) { + SPartAggCondContext cxt = {.pAgg = pAgg, .hasAggFunc = false}; + nodesWalkExpr(pCond, partAggCondHasAggFuncImpl, &cxt); + return cxt.hasAggFunc; +} + +static int32_t partitionAggCondConj(SAggLogicNode* pAgg, SNode** ppAggFuncCond, SNode** ppGroupKeyCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pAgg->node.pConditions; + int32_t code = TSDB_CODE_SUCCESS; + + SNodeList* pAggFuncConds = NULL; + SNodeList* pGroupKeyConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (partitionAggCondHasAggFunc(pAgg, pCond)) { + code = nodesListMakeAppend(&pAggFuncConds, nodesCloneNode(pCond)); + } else { + code = nodesListMakeAppend(&pGroupKeyConds, nodesCloneNode(pCond)); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + SNode* pTempAggFuncCond = NULL; + SNode* pTempGroupKeyCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempAggFuncCond, &pAggFuncConds); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempGroupKeyCond, &pGroupKeyConds); + } + + if (TSDB_CODE_SUCCESS == code) { + *ppAggFuncCond = pTempAggFuncCond; + *ppGroupKeyCond = pTempGroupKeyCond; + } else { + nodesDestroyList(pAggFuncConds); + nodesDestroyList(pGroupKeyConds); + nodesDestroyNode(pTempAggFuncCond); + nodesDestroyNode(pTempGroupKeyCond); + } + pAgg->node.pConditions = NULL; + return code; +} + +static int32_t partitionAggCond(SAggLogicNode* pAgg, SNode** ppAggFunCond, SNode** ppGroupKeyCond) { + SNode* pAggNodeCond = pAgg->node.pConditions; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pAggNodeCond) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pAggNodeCond))->condType) { + return partitionAggCondConj(pAgg, ppAggFunCond, ppGroupKeyCond); + } + if (partitionAggCondHasAggFunc(pAgg, pAggNodeCond)) { + *ppAggFunCond = pAggNodeCond; + } else { + *ppGroupKeyCond = pAggNodeCond; + } + pAgg->node.pConditions = NULL; return TSDB_CODE_SUCCESS; } +static int32_t pushCondToAggCond(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode** pAggFuncCond) { + pushDownCondOptAppendCond(&pAgg->node.pConditions, pAggFuncCond); + return TSDB_CODE_SUCCESS; +} + +typedef struct SRewriteAggGroupKeyCondContext{ + SAggLogicNode *pAgg; + int32_t errCode; +} SRewriteAggGroupKeyCondContext; + +static EDealRes rewriteAggGroupKeyCondForPushDownImpl(SNode** pNode, void* pContext) { + SRewriteAggGroupKeyCondContext* pCxt = pContext; + SAggLogicNode* pAgg = pCxt->pAgg; + if (QUERY_NODE_COLUMN == nodeType(*pNode)) { + SNode* pGroupKey = NULL; + FOREACH(pGroupKey, pAgg->pGroupKeys) { + SNode* pGroup = NULL; + FOREACH(pGroup, ((SGroupingSetNode*)pGroupKey)->pParameterList) { + if (0 == strcmp(((SExprNode*)pGroup)->aliasName, ((SColumnNode*)(*pNode))->colName)) { + SNode* pExpr = nodesCloneNode(pGroup); + if (pExpr == NULL) { + pCxt->errCode = terrno; + return DEAL_RES_ERROR; + } + nodesDestroyNode(*pNode); + *pNode = pExpr; + } + } + } + return DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + +static int32_t rewriteAggGroupKeyCondForPushDown(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode* pGroupKeyCond) { + SRewriteAggGroupKeyCondContext cxt = {.pAgg = pAgg, .errCode = TSDB_CODE_SUCCESS}; + nodesRewriteExpr(&pGroupKeyCond, rewriteAggGroupKeyCondForPushDownImpl, &cxt); + return cxt.errCode; +} + +static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { + if (NULL == pAgg->node.pConditions || + OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { + return TSDB_CODE_SUCCESS; + } + //TODO: remove it after full implementation of pushing down to child + if (1 != LIST_LENGTH(pAgg->node.pChildren) || + QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) { + return TSDB_CODE_SUCCESS; + } + + SNode* pAggFuncCond = NULL; + SNode* pGroupKeyCond = NULL; + int32_t code = partitionAggCond(pAgg, &pAggFuncCond, &pGroupKeyCond); + if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncCond) { + code = pushCondToAggCond(pCxt, pAgg, &pAggFuncCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) { + code = rewriteAggGroupKeyCondForPushDown(pCxt, pAgg, pGroupKeyCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) { + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); + code = pushDownCondOptPushCondToChild(pCxt, pChild, &pGroupKeyCond); + } + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); + pCxt->optimized = true; + } else { + nodesDestroyNode(pGroupKeyCond); + nodesDestroyNode(pAggFuncCond); + } + return code; +} + static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pLogicNode)) { @@ -1707,8 +1856,8 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) { static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); - SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS}; + SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS}; nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt); int32_t code = cxt.errCode;