diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e29750d8a0..de8e1edb72 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -123,12 +123,15 @@ typedef struct SScanLogicNode { typedef struct SJoinLogicNode { SLogicNode node; EJoinType joinType; + EJoinSubType subType; + SNode* pWindowOffset; + SNode* pJLimit; EJoinAlgorithm joinAlgo; SNode* pPrimKeyEqCond; SNode* pColEqCond; SNode* pTagEqCond; SNode* pTagOnCond; - SNode* pOtherOnCond; + SNode* pFullOnCond; bool isSingleTableJoin; bool hasSubQuery; bool isLowLevelJoin; @@ -468,7 +471,7 @@ typedef struct SSortMergeJoinPhysiNode { EJoinType joinType; SNode* pPrimKeyCond; SNode* pColEqCond; - SNode* pOtherOnCond; + SNode* pFullOnCond; SNodeList* pTargets; } SSortMergeJoinPhysiNode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 8b8f7fefd6..0c98820408 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -203,21 +203,25 @@ typedef struct SViewNode { int8_t cacheLastMode; } SViewNode; +#define IS_INNER_NONE_JOIN(_type, _stype) ((_type) == JOIN_TYPE_INNER && (_stype) == JOIN_STYPE_NONE) + typedef enum EJoinType { - JOIN_TYPE_INNER = 1, + JOIN_TYPE_INNER = 0, JOIN_TYPE_LEFT, JOIN_TYPE_RIGHT, JOIN_TYPE_FULL, + JOIN_TYPE_MAX_VALUE } EJoinType; typedef enum EJoinSubType { - JOIN_STYPE_NONE = 1, + JOIN_STYPE_NONE = 0, JOIN_STYPE_OUTER, JOIN_STYPE_SEMI, JOIN_STYPE_ANTI, JOIN_STYPE_ANY, JOIN_STYPE_ASOF, JOIN_STYPE_WIN, + JOIN_STYPE_MAX_VALUE } EJoinSubType; typedef enum EJoinAlgorithm { @@ -238,6 +242,7 @@ typedef struct SJoinTableNode { SNode* pJLimit; bool hasSubQuery; bool isLowLevelJoin; + SNode* pParent; SNode* pLeft; SNode* pRight; SNode* pOnCond; @@ -546,12 +551,15 @@ typedef struct SQuery { bool stableQuery; } SQuery; +void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext, bool ignoreFrom); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext); typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType; int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, SNodeList** pCols); +int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, + SNodeList** pCols, bool ignoreFrom); int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols); typedef bool (*FFuncClassifier)(int32_t funcId); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 66b50bcb47..de7d7ee111 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -631,10 +631,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); QRY_ERR_RET( nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - if (pJoinNode->pOtherOnCond) { + if (pJoinNode->pFullOnCond) { EXPLAIN_ROW_APPEND(" AND "); QRY_ERR_RET( - nodesNodeToSQL(pJoinNode->pOtherOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index b4461f20b1..5e74edc47f 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -255,7 +255,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo)); - if (pJoinNode->pOtherOnCond != NULL && pJoinNode->node.pConditions != NULL) { + if (pJoinNode->pFullOnCond != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); if (pInfo->pCondAfterMerge == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -269,11 +269,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOtherOnCond)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFullOnCond)); nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); pLogicCond->condType = LOGIC_COND_TYPE_AND; - } else if (pJoinNode->pOtherOnCond != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOtherOnCond); + } else if (pJoinNode->pFullOnCond != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pFullOnCond); } else if (pJoinNode->pColEqCond != NULL) { pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond); } else if (pJoinNode->node.pConditions != NULL) { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index e21cab40b9..aa4661cf3e 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -443,7 +443,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pColEqCond); CLONE_NODE_FIELD(pTagEqCond); CLONE_NODE_FIELD(pTagOnCond); - CLONE_NODE_FIELD(pOtherOnCond); + CLONE_NODE_FIELD(pFullOnCond); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(hasSubQuery); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 82f89f4471..3246c81e48 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1567,7 +1567,7 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqCondition, nodeToJson, pNode->pTagEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond); + code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pFullOnCond); } return code; } @@ -1592,7 +1592,7 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqCondition, &pNode->pTagEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond); + code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pFullOnCond); } return code; @@ -2080,7 +2080,7 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddObject(pJson, jkJoinPhysiPlanPrimKeyCondition, nodeToJson, pNode->pPrimKeyCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOtherOnCond); + code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); @@ -2099,7 +2099,7 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOtherOnCond); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanPrimKeyCondition, &pNode->pPrimKeyCond); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 479ea2e5d2..bef4730ec6 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2413,7 +2413,7 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, nodeToMsg, pNode->pPrimKeyCond); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pOtherOnCond); + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); @@ -2441,7 +2441,7 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { code = msgToNodeFromTlv(pTlv, (void**)&pNode->pPrimKeyCond); break; case PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOtherOnCond); + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond); break; case PHY_SORT_MERGE_JOIN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 9040551d37..1df71cce2c 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -412,14 +412,16 @@ void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* (void)rewriteExprs(pList, TRAVERSAL_POSTORDER, rewriter, pContext); } -void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) { +void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext, bool ignoreFrom) { if (NULL == pSelect) { return; } switch (clause) { case SQL_CLAUSE_FROM: - nodesWalkExpr(pSelect->pFromTable, walker, pContext); + if (!ignoreFrom) { + nodesWalkExpr(pSelect->pFromTable, walker, pContext); + } nodesWalkExpr(pSelect->pWhere, walker, pContext); case SQL_CLAUSE_WHERE: nodesWalkExprs(pSelect->pPartitionByList, walker, pContext); @@ -448,6 +450,10 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa return; } +void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) { + nodesWalkSelectStmtImpl(pSelect, clause, walker, pContext, false); +} + void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext) { if (NULL == pSelect) { return; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 15ff382bf2..c9014e91db 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1194,7 +1194,7 @@ void nodesDestroyNode(SNode* pNode) { SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pPrimKeyEqCond); - nodesDestroyNode(pLogicNode->pOtherOnCond); + nodesDestroyNode(pLogicNode->pFullOnCond); nodesDestroyNode(pLogicNode->pColEqCond); break; } @@ -1339,7 +1339,7 @@ void nodesDestroyNode(SNode* pNode) { SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pPrimKeyCond); - nodesDestroyNode(pPhyNode->pOtherOnCond); + nodesDestroyNode(pPhyNode->pFullOnCond); nodesDestroyList(pPhyNode->pTargets); nodesDestroyNode(pPhyNode->pColEqCond); break; @@ -2071,6 +2071,37 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* return TSDB_CODE_SUCCESS; } +int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, + SNodeList** pCols, bool ignoreFrom) { + if (NULL == pSelect || NULL == pCols) { + return TSDB_CODE_FAILED; + } + + SCollectColumnsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .pTableAlias = pTableAlias, + .collectType = type, + .pCols = (NULL == *pCols ? nodesMakeList() : *pCols), + .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)}; + if (NULL == cxt.pCols || NULL == cxt.pColHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *pCols = NULL; + nodesWalkSelectStmtImpl(pSelect, clause, collectColumns, &cxt, ignoreFrom); + taosHashCleanup(cxt.pColHash); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pCols); + return cxt.errCode; + } + if (LIST_LENGTH(cxt.pCols) > 0) { + *pCols = cxt.pCols; + } else { + nodesDestroyList(cxt.pCols); + } + + return TSDB_CODE_SUCCESS; +} + int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols) { if (NULL == pCols) { return TSDB_CODE_FAILED; diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index a4a7812474..cfad073f48 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -38,7 +38,7 @@ int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray** pPlaceholderValues); -int32_t translateTable(STranslateContext* pCxt, SNode** pTable); +int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent); int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput); void tfreeSParseQueryRes(void* p); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4ebe4f824a..2a0fe849d1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2963,7 +2963,7 @@ static int32_t translateJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoin return code; } -int32_t translateTable(STranslateContext* pCxt, SNode** pTable) { +int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(*pTable)) { case QUERY_NODE_REAL_TABLE: { @@ -3023,12 +3023,13 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable) { } case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTable = (SJoinTableNode*)*pTable; + pJoinTable->pParent = pJoinParent; code = translateJoinTable(pCxt, pJoinTable); if (TSDB_CODE_SUCCESS == code) { - code = translateTable(pCxt, &pJoinTable->pLeft); + code = translateTable(pCxt, &pJoinTable->pLeft, (SNode*)pJoinTable); } if (TSDB_CODE_SUCCESS == code) { - code = translateTable(pCxt, &pJoinTable->pRight); + code = translateTable(pCxt, &pJoinTable->pRight, (SNode*)pJoinTable); } if (TSDB_CODE_SUCCESS == code) { code = checkJoinTable(pCxt, pJoinTable); @@ -4275,7 +4276,7 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateFrom(STranslateContext* pCxt, SNode** pTable) { pCxt->currClause = SQL_CLAUSE_FROM; - return translateTable(pCxt, pTable); + return translateTable(pCxt, pTable, NULL); } static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 452a322a98..bdd8a5f753 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -502,14 +502,26 @@ static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe return createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode); } +int32_t collectJoinResColumns(SSelectStmt* pSelect, SJoinTableNode* pJoinTable, SNodeList** pCols) { + int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((STableNode*)pJoinTable->pLeft)->tableAlias, COLLECT_COL_TYPE_ALL, pCols); + if (TSDB_CODE_SUCCESS == code) { + code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((STableNode*)pJoinTable->pRight)->tableAlias, COLLECT_COL_TYPE_ALL, pCols); + } + + return code; +} + + static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable, SLogicNode** pLogicNode) { + int32_t code = TSDB_CODE_SUCCESS; SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_JOIN); if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } pJoin->joinType = pJoinTable->joinType; + pJoin->subType = pJoinTable->subType; pJoin->joinAlgo = JOIN_ALGO_UNKNOWN; pJoin->isSingleTableJoin = pJoinTable->table.singleTable; pJoin->hasSubQuery = pJoinTable->hasSubQuery; @@ -518,10 +530,8 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; - - int32_t code = TSDB_CODE_SUCCESS; - - // set left and right node + pJoin->pWindowOffset = nodesCloneNode(pJoinTable->pWindowOffset); + pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit); pJoin->node.pChildren = nodesMakeList(); if (NULL == pJoin->node.pChildren) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -548,12 +558,13 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set on conditions if (TSDB_CODE_SUCCESS == code && NULL != pJoinTable->pOnCond) { - pJoin->pOtherOnCond = nodesCloneNode(pJoinTable->pOnCond); - if (NULL == pJoin->pOtherOnCond) { + pJoin->pFullOnCond = nodesCloneNode(pJoinTable->pOnCond); + if (NULL == pJoin->pFullOnCond) { code = TSDB_CODE_OUT_OF_MEMORY; } } +#if 0 // set the output if (TSDB_CODE_SUCCESS == code) { SNodeList* pColList = NULL; @@ -600,6 +611,19 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect } } +#else + // set the output + if (TSDB_CODE_SUCCESS == code) { + SNodeList* pColList = NULL; + code = collectJoinResColumns(pSelect, pJoinTable, &pColList); + if (TSDB_CODE_SUCCESS == code && NULL != pColList) { + code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets); + } + nodesDestroyList(pColList); + } +#endif + + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pJoin; } else { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e71e18d37d..9fa45efd12 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -65,9 +65,26 @@ typedef enum ECondAction { // after supporting outer join, there are other possibilities } ECondAction; +#define PUSH_DOWN_LEFT_FLT (1 << 0) +#define PUSH_DOWN_RIGHT_FLT (1 << 1) +#define PUSH_DOWN_ON_COND (1 << 2) +#define PUSH_DOWN_ALL_COND (PUSH_DOWN_LEFT_FLT | PUSH_DOWN_RIGHT_FLT | PUSH_DOWN_ON_COND) + +typedef struct SJoinOptimizeOpt { + int8_t pushDownFlag; +} SJoinOptimizeOpt; + typedef bool (*FMayBeOptimized)(SLogicNode* pNode); typedef bool (*FShouldBeOptimized)(SLogicNode* pNode, void* pInfo); +static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { + /* NONE OUTER SEMI ANTI ANY ASOF WINDOW */ +/*INNER*/ {{PUSH_DOWN_ALL_COND}, {0}, {0}, {0}, {PUSH_DOWN_ALL_COND}, {0}, {0}}, +/*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}}, +/*RIGHT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}}, +/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}, {0}}, +}; + static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) { if (func(pNode)) { return pNode; @@ -392,7 +409,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub return code; } -static int32_t pushDownCondOptMergeCond(SNode** pDst, SNode** pSrc) { +static int32_t pdcMergeCondsToLogic(SNode** pDst, SNode** pSrc) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); if (NULL == pLogicCond) { return TSDB_CODE_OUT_OF_MEMORY; @@ -413,7 +430,7 @@ static int32_t pushDownCondOptMergeCond(SNode** pDst, SNode** pSrc) { return code; } -static int32_t pushDownCondOptAppendCond(SNode** pCond, SNode** pAdditionalCond) { +static int32_t pdcMergeConds(SNode** pCond, SNode** pAdditionalCond) { if (NULL == *pCond) { TSWAP(*pCond, *pAdditionalCond); return TSDB_CODE_SUCCESS; @@ -427,7 +444,7 @@ static int32_t pushDownCondOptAppendCond(SNode** pCond, SNode** pAdditionalCond) *pAdditionalCond = NULL; } } else { - code = pushDownCondOptMergeCond(pCond, pAdditionalCond); + code = pdcMergeCondsToLogic(pCond, pAdditionalCond); } return code; } @@ -436,7 +453,7 @@ static int32_t pushDownCondOptCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNo SNode** pOtherCond) { int32_t code = TSDB_CODE_SUCCESS; if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) { - code = pushDownCondOptAppendCond(pOtherCond, pPrimaryKeyCond); + code = pdcMergeConds(pOtherCond, pPrimaryKeyCond); } else { bool isStrict = false; code = filterGetTimeRange(*pPrimaryKeyCond, &pScan->scanRange, &isStrict); @@ -444,7 +461,7 @@ static int32_t pushDownCondOptCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNo if (isStrict) { nodesDestroyNode(*pPrimaryKeyCond); } else { - code = pushDownCondOptAppendCond(pOtherCond, pPrimaryKeyCond); + code = pdcMergeConds(pOtherCond, pPrimaryKeyCond); } *pPrimaryKeyCond = NULL; } @@ -458,7 +475,7 @@ static int32_t pushDownCondOptRebuildTbanme(SNode** pTagCond) { return code; } -static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* pScan) { +static int32_t pdcDealScan(SOptimizeContext* pCxt, SScanLogicNode* pScan) { if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE) || TSDB_SYSTEM_TABLE == pScan->tableType) { @@ -490,7 +507,7 @@ static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* p return code; } -static bool pushDownCondOptBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) { +static bool pdcColBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) { SNode* pTableCol = NULL; FOREACH(pTableCol, pTableCols) { if (nodesEqualNode(pCondCol, pTableCol)) { @@ -509,7 +526,7 @@ static bool pushDownCondOptColInTableList(SNode* pCondCol, SSHashObj* pTables) { } -static EDealRes pushDownCondOptIsCrossTableCond(SNode* pNode, void* pContext) { +static EDealRes pdcJoinIsCrossTableCond(SNode* pNode, void* pContext) { SCpdIsMultiTableCondCxt* pCxt = pContext; if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (pushDownCondOptColInTableList(pNode, pCxt->pLeftTbls)) { @@ -522,19 +539,20 @@ static EDealRes pushDownCondOptIsCrossTableCond(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static ECondAction pushDownCondOptGetCondAction(EJoinType joinType, SSHashObj* pLeftTbls, SSHashObj* pRightTbls, +static ECondAction pdcJoinGetCondAction(SJoinLogicNode* pJoin, SSHashObj* pLeftTbls, SSHashObj* pRightTbls, SNode* pNode) { + EJoinType t = pJoin->joinType; SCpdIsMultiTableCondCxt cxt = { .pLeftTbls = pLeftTbls, .pRightTbls = pRightTbls, .havaLeftCol = false, .haveRightCol = false}; - nodesWalkExpr(pNode, pushDownCondOptIsCrossTableCond, &cxt); - return (JOIN_TYPE_INNER != joinType + nodesWalkExpr(pNode, pdcJoinIsCrossTableCond, &cxt); + return (JOIN_TYPE_INNER != t ? COND_ACTION_STAY : (cxt.havaLeftCol && cxt.haveRightCol ? COND_ACTION_PUSH_JOIN : (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD))); } -static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, +static int32_t pdcJoinSplitLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions; if (LOGIC_COND_TYPE_AND != pLogicCond->condType) { @@ -553,7 +571,7 @@ static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCo SNodeList* pRemainConds = NULL; SNode* pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { - ECondAction condAction = pushDownCondOptGetCondAction(pJoin->joinType, pLeftTables, pRightTables, pCond); + ECondAction condAction = pdcJoinGetCondAction(pJoin, pLeftTables, pRightTables, pCond); if (COND_ACTION_PUSH_JOIN == condAction) { code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); } else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) { @@ -608,15 +626,14 @@ static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCo return code; } -static int32_t pushDownCondOptPartOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, +static int32_t pdcJoinSplitOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { SSHashObj* pLeftTables = NULL; SSHashObj* pRightTables = NULL; collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pLeftTables); collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 1), &pRightTables); - ECondAction condAction = - pushDownCondOptGetCondAction(pJoin->joinType, pLeftTables, pRightTables, pJoin->node.pConditions); + ECondAction condAction = pdcJoinGetCondAction(pJoin, pLeftTables, pRightTables, pJoin->node.pConditions); tSimpleHashCleanup(pLeftTables); tSimpleHashCleanup(pRightTables); @@ -634,21 +651,21 @@ static int32_t pushDownCondOptPartOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, return TSDB_CODE_SUCCESS; } -static int32_t pushDownCondOptPartCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, +static int32_t pdcJoinSplitOrigCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->node.pConditions)) { - return pushDownCondOptPartLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); + return pdcJoinSplitLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); } else { - return pushDownCondOptPartOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); + return pdcJoinSplitOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); } } -static int32_t pushDownCondOptPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { - return pushDownCondOptAppendCond(&pJoin->pOtherOnCond, pCond); +static int32_t pdcJoinPushDownOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { + return pdcMergeConds(&pJoin->pFullOnCond, pCond); } -static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { - return pushDownCondOptAppendCond(&pChild->pConditions, pCond); +static int32_t pdcPushDownCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { + return pdcMergeConds(&pChild->pConditions, pCond); } static bool pushDownCondOptIsPriKey(SNode* pNode, SSHashObj* pTables) { @@ -662,7 +679,7 @@ static bool pushDownCondOptIsPriKey(SNode* pNode, SSHashObj* pTables) { return pushDownCondOptColInTableList(pNode, pTables); } -static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { +static bool pdcJoinIsPrimEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { if (QUERY_NODE_OPERATOR != nodeType(pCond)) { return false; } @@ -690,7 +707,7 @@ static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond return res; } -static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond, bool* errCond) { +static bool pdcJoinHasPrimEqualCond(SJoinLogicNode* pJoin, SNode* pCond, bool* errCond) { if (QUERY_NODE_LOGIC_CONDITION == nodeType(pCond)) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pCond; if (LOGIC_COND_TYPE_AND != pLogicCond->condType) { @@ -702,39 +719,25 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* bool hasPrimaryKeyEqualCond = false; SNode* pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { - if (pushDownCondOptContainPriKeyEqualCond(pJoin, pCond, NULL)) { + if (pdcJoinHasPrimEqualCond(pJoin, pCond, NULL)) { hasPrimaryKeyEqualCond = true; break; } } return hasPrimaryKeyEqualCond; } else { - return pushDownCondOptIsPriKeyEqualCond(pJoin, pCond); + return pdcJoinIsPrimEqualCond(pJoin, pCond); } } -static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->pOtherOnCond) { - return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); - } - bool errCond = false; - if (!pushDownCondOptContainPriKeyEqualCond(pJoin, pJoin->pOtherOnCond, &errCond)) { - if (errCond) { - return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); - } - return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); - } - return TSDB_CODE_SUCCESS; -} - static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) { - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOtherOnCond); + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pFullOnCond); int32_t code = TSDB_CODE_SUCCESS; SNodeList* pOnConds = NULL; SNode* pCond = NULL; WHERE_EACH(pCond, pLogicCond->pParameterList) { - if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) { + if (pdcJoinIsPrimEqualCond(pJoin, pCond)) { nodesDestroyNode(*ppPrimKeyEqCond); *ppPrimKeyEqCond = nodesCloneNode(pCond); ERASE_NODE(pLogicCond->pParameterList); @@ -751,8 +754,8 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo if (TSDB_CODE_SUCCESS == code && NULL != *ppPrimKeyEqCond) { *ppOnCond = pTempOnCond; - nodesDestroyNode(pJoin->pOtherOnCond); - pJoin->pOtherOnCond = NULL; + nodesDestroyNode(pJoin->pFullOnCond); + pJoin->pFullOnCond = NULL; return TSDB_CODE_SUCCESS; } else { nodesDestroyList(pOnConds); @@ -761,32 +764,29 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo } } -static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) { - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond) && - LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOtherOnCond))->condType) { +static int32_t pdcPartJoinPrimCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pFullOnCond) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pFullOnCond))->condType) { return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppPrimKeyEqCond, ppOnCond); } - if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) { - *ppPrimKeyEqCond = pJoin->pOtherOnCond; + if (pdcJoinIsPrimEqualCond(pJoin, pJoin->pFullOnCond)) { + *ppPrimKeyEqCond = pJoin->pFullOnCond; *ppOnCond = NULL; - pJoin->pOtherOnCond = NULL; + pJoin->pFullOnCond = NULL; return TSDB_CODE_SUCCESS; } else { return TSDB_CODE_PLAN_INTERNAL_ERROR; } } -static int32_t pushDownCondOptJoinExtractCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); +static int32_t pdcJoinCheckPartPrimOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { SNode* pPrimKeyEqCond = NULL; SNode* pJoinOnCond = NULL; - if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptPartJoinOnCond(pJoin, &pPrimKeyEqCond, &pJoinOnCond); - } + int32_t code = pdcPartJoinPrimCond(pJoin, &pPrimKeyEqCond, &pJoinOnCond); if (TSDB_CODE_SUCCESS == code) { pJoin->pPrimKeyEqCond = pPrimKeyEqCond; - pJoin->pOtherOnCond = pJoinOnCond; + pJoin->pFullOnCond = pJoinOnCond; } else { nodesDestroyNode(pPrimKeyEqCond); nodesDestroyNode(pJoinOnCond); @@ -794,15 +794,15 @@ static int32_t pushDownCondOptJoinExtractCond(SOptimizeContext* pCxt, SJoinLogic return code; } -static bool pushDownCondOptIsTableColumn(SNode* pNode, SNodeList* pTableCols) { +static bool pdcIsTableColumn(SNode* pNode, SNodeList* pTableCols) { if (QUERY_NODE_COLUMN != nodeType(pNode)) { return false; } SColumnNode* pCol = (SColumnNode*)pNode; - return pushDownCondOptBelongThisTable(pNode, pTableCols); + return pdcColBelongThisTable(pNode, pTableCols); } -static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, bool* allTags) { +static bool pdcIsEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, bool* allTags) { if (QUERY_NODE_OPERATOR != nodeType(pCond)) { return false; } @@ -826,18 +826,17 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; bool isEqual = false; - if (pushDownCondOptIsTableColumn(pOper->pLeft, pLeftCols)) { - isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pRightCols); - } else if (pushDownCondOptIsTableColumn(pOper->pLeft, pRightCols)) { - isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols); - } - if (isEqual) { + if (pdcIsTableColumn(pOper->pLeft, pLeftCols)) { + isEqual = pdcIsTableColumn(pOper->pRight, pRightCols); + } else if (pdcIsTableColumn(pOper->pLeft, pRightCols)) { + isEqual = pdcIsTableColumn(pOper->pRight, pLeftCols); } + return isEqual; } -static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) { - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOtherOnCond); +static int32_t pdcJoinPartLogicEqualOnCond(SJoinLogicNode* pJoin) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pFullOnCond); int32_t code = TSDB_CODE_SUCCESS; SNodeList* pColEqOnConds = NULL; @@ -847,7 +846,7 @@ static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) bool allTags = false; FOREACH(pCond, pLogicCond->pParameterList) { allTags = false; - if (pushDownCondOptIsColEqualOnCond(pJoin, pCond, &allTags)) { + if (pdcIsEqualOnCond(pJoin, pCond, &allTags)) { if (allTags) { code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond)); } else { @@ -887,33 +886,33 @@ static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) return TSDB_CODE_SUCCESS; } -static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->pOtherOnCond) { +static int32_t pdcJoinPartEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pFullOnCond) { pJoin->pColEqCond = NULL; pJoin->pTagEqCond = NULL; return TSDB_CODE_SUCCESS; } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond) && - LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOtherOnCond))->condType) { - return pushDownCondOptJoinExtractEqualOnLogicCond(pJoin); + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pFullOnCond) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pFullOnCond))->condType) { + return pdcJoinPartLogicEqualOnCond(pJoin); } bool allTags = false; - if (pushDownCondOptIsColEqualOnCond(pJoin, pJoin->pOtherOnCond, &allTags)) { + if (pdcIsEqualOnCond(pJoin, pJoin->pFullOnCond, &allTags)) { if (allTags) { - pJoin->pTagEqCond = nodesCloneNode(pJoin->pOtherOnCond); + pJoin->pTagEqCond = nodesCloneNode(pJoin->pFullOnCond); } else { - pJoin->pColEqCond = nodesCloneNode(pJoin->pOtherOnCond); + pJoin->pColEqCond = nodesCloneNode(pJoin->pFullOnCond); } } else if (allTags) { - pJoin->pTagOnCond = nodesCloneNode(pJoin->pOtherOnCond); + pJoin->pTagOnCond = nodesCloneNode(pJoin->pFullOnCond); } return TSDB_CODE_SUCCESS; } -static int32_t pushDownCondOptAppendFilterCol(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->pOtherOnCond) { +static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pFullOnCond) { return TSDB_CODE_SUCCESS; } @@ -923,7 +922,7 @@ static int32_t pushDownCondOptAppendFilterCol(SOptimizeContext* pCxt, SJoinLogic if (NULL == pCondCols) { code = TSDB_CODE_OUT_OF_MEMORY; } else { - code = nodesCollectColumnsFromNode(pJoin->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); + code = nodesCollectColumnsFromNode(pJoin->pFullOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); } if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pCondCols, &pTargets); @@ -953,8 +952,26 @@ static int32_t pushDownCondOptAppendFilterCol(SOptimizeContext* pCxt, SJoinLogic return code; } +static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pFullOnCond) { + if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); + } + } -static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + SNode* pCond = pJoin->pFullOnCond ? pJoin->pFullOnCond : pJoin->node.pConditions; + bool errCond = false; + if (!pdcJoinHasPrimEqualCond(pJoin, pCond, &errCond)) { + if (errCond) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); + } + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; } @@ -962,13 +979,33 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p return TSDB_CODE_SUCCESS; } + EJoinType t = pJoin->joinType; + EJoinSubType s = pJoin->subType; + SNode* pOnCond = NULL; + SNode* pLeftChildCond = NULL; + SNode* pRightChildCond = NULL; + int32_t code = pdcJoinCheckAllCond(pCxt, pJoin); + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinOpt[t][s].pushDownFlag) { + code = pdcJoinSplitOrigCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { + code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { + code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) { + code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); + } + + if (NULL == pJoin->node.pConditions) { - int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin); + int32_t code = pdcJoinCheckPartPrimOnCond(pCxt, pJoin); if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin); + code = pdcJoinPartEqualOnCond(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptAppendFilterCol(pCxt, pJoin); + code = pdcJoinAddPreFilterColsToTarget(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); @@ -977,32 +1014,18 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p return code; } - SNode* pOnCond = NULL; - SNode* pLeftChildCond = NULL; - SNode* pRightChildCond = NULL; - int32_t code = pushDownCondOptPartCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond); - if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { - code = pushDownCondOptPushCondToOnCond(pCxt, pJoin, &pOnCond); - } - if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { - code = - pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); - } - if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) { - code = - pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); + + + if (TSDB_CODE_SUCCESS == code) { + code = pdcJoinCheckPartPrimOnCond(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptJoinExtractCond(pCxt, pJoin); + code = pdcJoinPartEqualOnCond(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin); - } - - if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptAppendFilterCol(pCxt, pJoin); + code = pdcJoinAddPreFilterColsToTarget(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { @@ -1099,7 +1122,7 @@ static int32_t partitionAggCond(SAggLogicNode* pAgg, SNode** ppAggFunCond, SNode } static int32_t pushCondToAggCond(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode** pAggFuncCond) { - return pushDownCondOptAppendCond(&pAgg->node.pConditions, pAggFuncCond); + return pdcMergeConds(&pAgg->node.pConditions, pAggFuncCond); } typedef struct SRewriteAggGroupKeyCondContext { @@ -1137,7 +1160,7 @@ static int32_t rewriteAggGroupKeyCondForPushDown(SOptimizeContext* pCxt, SAggLog return cxt.errCode; } -static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { +static int32_t pdcDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { if (NULL == pAgg->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -1158,7 +1181,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg } if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); - code = pushDownCondOptPushCondToChild(pCxt, pChild, &pGroupKeyCond); + code = pdcPushDownCondToChild(pCxt, pChild, &pGroupKeyCond); } if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); @@ -1211,7 +1234,7 @@ static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLog return cxt.errCode; } -static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject) { +static int32_t pdcDealProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject) { if (NULL == pProject->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -1230,7 +1253,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond); if (TSDB_CODE_SUCCESS == code) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0); - code = pushDownCondOptPushCondToChild(pCxt, pChild, &pProjCond); + code = pdcPushDownCondToChild(pCxt, pChild, &pProjCond); } if (TSDB_CODE_SUCCESS == code) { @@ -1242,13 +1265,13 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN return code; } -static int32_t pushDownCondOptTrivialPushDown(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { +static int32_t pdcTrivialPushDown(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { if (NULL == pLogicNode->pConditions || OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pLogicNode->pChildren, 0); - int32_t code = pushDownCondOptPushCondToChild(pCxt, pChild, &pLogicNode->pConditions); + int32_t code = pdcPushDownCondToChild(pCxt, pChild, &pLogicNode->pConditions); if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; @@ -1256,24 +1279,24 @@ static int32_t pushDownCondOptTrivialPushDown(SOptimizeContext* pCxt, SLogicNode return code; } -static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { +static int32_t pdcOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pLogicNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: - code = pushDownCondOptDealScan(pCxt, (SScanLogicNode*)pLogicNode); + code = pdcDealScan(pCxt, (SScanLogicNode*)pLogicNode); break; case QUERY_NODE_LOGIC_PLAN_JOIN: - code = pushDownCondOptDealJoin(pCxt, (SJoinLogicNode*)pLogicNode); + code = pdcDealJoin(pCxt, (SJoinLogicNode*)pLogicNode); break; case QUERY_NODE_LOGIC_PLAN_AGG: - code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode); + code = pdcDealAgg(pCxt, (SAggLogicNode*)pLogicNode); break; case QUERY_NODE_LOGIC_PLAN_PROJECT: - code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode); + code = pdcDealProject(pCxt, (SProjectLogicNode*)pLogicNode); break; case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_PARTITION: - code = pushDownCondOptTrivialPushDown(pCxt, pLogicNode); + code = pdcTrivialPushDown(pCxt, pLogicNode); break; default: break; @@ -1281,7 +1304,7 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog if (TSDB_CODE_SUCCESS == code) { SNode* pChild = NULL; FOREACH(pChild, pLogicNode->pChildren) { - code = pushDownCondOptimizeImpl(pCxt, (SLogicNode*)pChild); + code = pdcOptimizeImpl(pCxt, (SLogicNode*)pChild); if (TSDB_CODE_SUCCESS != code) { break; } @@ -1290,8 +1313,8 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog return code; } -static int32_t pushDownCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { - return pushDownCondOptimizeImpl(pCxt, pLogicSubplan->pNode); +static int32_t pdcOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + return pdcOptimizeImpl(pCxt, pLogicSubplan->pNode); } static bool sortPriKeyOptIsPriKeyOrderBy(SNodeList* pSortKeys) { @@ -1966,10 +1989,10 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->joinAlgo != JOIN_ALGO_UNKNOWN) { return false; } - if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->pOtherOnCond) { + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->pFullOnCond) { SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild; CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false}; - nodesWalkExpr(pJoinLogicNode->pOtherOnCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + nodesWalkExpr(pJoinLogicNode->pFullOnCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } return true; @@ -3629,7 +3652,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond); - pJoin->pOtherOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); + pJoin->pFullOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); int32_t code = TSDB_CODE_SUCCESS; pJoin->node.pChildren = pChildren; @@ -3741,12 +3764,12 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi } static void stbJoinOptRemoveTagEqCond(SJoinLogicNode* pJoin) { - if (QUERY_NODE_OPERATOR == nodeType(pJoin->pOtherOnCond) && nodesEqualNode(pJoin->pOtherOnCond, pJoin->pTagEqCond)) { - NODES_DESTORY_NODE(pJoin->pOtherOnCond); + if (QUERY_NODE_OPERATOR == nodeType(pJoin->pFullOnCond) && nodesEqualNode(pJoin->pFullOnCond, pJoin->pTagEqCond)) { + NODES_DESTORY_NODE(pJoin->pFullOnCond); return; } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond)) { - SLogicConditionNode* pLogic = (SLogicConditionNode*)pJoin->pOtherOnCond; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pFullOnCond)) { + SLogicConditionNode* pLogic = (SLogicConditionNode*)pJoin->pFullOnCond; SNode* pNode = NULL; FOREACH(pNode, pLogic->pParameterList) { if (nodesEqualNode(pNode, pJoin->pTagEqCond)) { @@ -3765,7 +3788,7 @@ static void stbJoinOptRemoveTagEqCond(SJoinLogicNode* pJoin) { } if (pLogic->pParameterList->length <= 0) { - NODES_DESTORY_NODE(pJoin->pOtherOnCond); + NODES_DESTORY_NODE(pJoin->pFullOnCond); } } } @@ -4042,7 +4065,7 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub // clang-format off static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, - {.pName = "PushDownCondition", .optimizeFunc = pushDownCondOptimize}, + {.pName = "PushDownCondition", .optimizeFunc = pdcOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize}, {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize}, diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 598bce3133..aa563034bc 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -787,9 +787,9 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi &pJoin->pTargets); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) { code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, - pJoinLogicNode->pOtherOnCond, &pJoin->pOtherOnCond); + pJoinLogicNode->pFullOnCond, &pJoin->pFullOnCond); } if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) { @@ -963,8 +963,8 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { - code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions); + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pFullOnCond, &pJoin->pFilterConditions); } if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); diff --git a/source/libs/qcom/inc/queryInt.h b/source/libs/qcom/inc/queryInt.h index 8f52f21d23..ee7d4499d2 100644 --- a/source/libs/qcom/inc/queryInt.h +++ b/source/libs/qcom/inc/queryInt.h @@ -20,6 +20,11 @@ extern "C" { #endif +#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) +#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) + + + #ifdef __cplusplus } #endif diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 5c9c3da5f4..f2d6668863 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -21,18 +21,17 @@ #include "tsched.h" // clang-format off #include "cJSON.h" +#include "queryInt.h" -#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) -#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) - -static struct SSchema _s = { - .colId = TSDB_TBNAME_COLUMN_INDEX, - .type = TSDB_DATA_TYPE_BINARY, - .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, - .name = "tbname", -}; - -const SSchema* tGetTbnameColumnSchema() { return &_s; } +const SSchema* tGetTbnameColumnSchema() { + static struct SSchema _s = { + .colId = TSDB_TBNAME_COLUMN_INDEX, + .type = TSDB_DATA_TYPE_BINARY, + .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, + .name = "tbname", + }; + return &_s; +} static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) { int32_t rowLen = 0;