From d5541cf1b0572027961703dcfd0fb38786a0663b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 14 Jun 2022 21:13:10 +0800 Subject: [PATCH] feat: super table join --- source/libs/planner/src/planSpliter.c | 198 ++++++++++----------- source/libs/planner/test/planOtherTest.cpp | 7 + source/libs/planner/test/planTestUtil.h | 4 + 3 files changed, 109 insertions(+), 100 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 54bc206619..4b372bb7d4 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -15,6 +15,7 @@ #include "functionMgt.h" #include "planInt.h" +#include "tglobal.h" #define SPLIT_FLAG_MASK(n) (1 << n) @@ -37,7 +38,8 @@ typedef struct SSplitRule { FSplit splitFunc; } SSplitRule; -typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo); +// typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo); +typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo); static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { @@ -95,9 +97,23 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla return code; } +static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func, + void* pInfo) { + if (func(pCxt, pSubplan, pNode, pInfo)) { + return true; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) { + return true; + } + } + return NULL; +} + static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) { if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) { - if (func(pCxt, pSubplan, pInfo)) { + if (splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) { return true; } } @@ -110,6 +126,11 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, return false; } +static void splSetParent(SLogicNode* pNode) { + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { ((SLogicNode*)pChild)->pParent = pNode; } +} + typedef struct SStableSplitInfo { SLogicNode* pSplitNode; SLogicSubplan* pSubplan; @@ -149,8 +170,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); - case QUERY_NODE_LOGIC_PLAN_JOIN: - return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); + // case QUERY_NODE_LOGIC_PLAN_JOIN: + // return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { @@ -168,27 +189,14 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return false; } -static SLogicNode* stbSplMatchByNode(bool streamQuery, SLogicNode* pNode) { - if (stbSplNeedSplit(streamQuery, pNode)) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = stbSplMatchByNode(streamQuery, (SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} - -static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) { - SLogicNode* pSplitNode = stbSplMatchByNode(pCxt->pPlanCxt->streamQuery, pSubplan->pNode); - if (NULL != pSplitNode) { - pInfo->pSplitNode = pSplitNode; +static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SStableSplitInfo* pInfo) { + if (stbSplNeedSplit(pCxt->pPlanCxt->streamQuery, pNode)) { + pInfo->pSplitNode = pNode; pInfo->pSubplan = pSubplan; + return true; } - return NULL != pSplitNode; + return false; } static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) { @@ -266,6 +274,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic if (TSDB_CODE_SUCCESS == code) { pMergeWindow->node.pTargets = pTargets; pPartWin->node.pChildren = pChildren; + splSetParent((SLogicNode*)pPartWin); code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs); } int32_t index = 0; @@ -458,6 +467,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO pMergeAgg->node.pConditions = pConditions; pMergeAgg->node.pTargets = pTargets; pPartAgg->node.pChildren = pChildren; + splSetParent((SLogicNode*)pPartAgg); code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); } @@ -572,6 +582,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut SNodeList* pMergeKeys = NULL; if (TSDB_CODE_SUCCESS == code) { pPartSort->node.pChildren = pChildren; + splSetParent((SLogicNode*)pPartSort); pPartSort->pSortKeys = pSortKeys; code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys); } @@ -703,7 +714,12 @@ typedef struct SSigTbJoinSplitInfo { SLogicSubplan* pSubplan; } SSigTbJoinSplitInfo; -static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) { +static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) { + return false; + } + + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; if (!pJoin->isSingleTableJoin) { return false; } @@ -711,28 +727,15 @@ static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) { QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1)); } -static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && sigTbJoinSplNeedSplit((SJoinLogicNode*)pNode)) { - return (SJoinLogicNode*)pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SJoinLogicNode* pSplitNode = sigTbJoinSplMatchByNode((SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} - -static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) { - SJoinLogicNode* pJoin = sigTbJoinSplMatchByNode(pSubplan->pNode); - if (NULL != pJoin) { - pInfo->pJoin = pJoin; - pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1); +static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SSigTbJoinSplitInfo* pInfo) { + if (sigTbJoinSplNeedSplit(pNode)) { + pInfo->pJoin = (SJoinLogicNode*)pNode; + pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1); pInfo->pSubplan = pSubplan; + return true; } - return NULL != pJoin; + return false; } static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { @@ -825,27 +828,14 @@ typedef struct SUnionAllSplitInfo { SLogicSubplan* pSubplan; } SUnionAllSplitInfo; -static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) { +static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SUnionAllSplitInfo* pInfo) { if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = unAllSplMatchByNode((SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} - -static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) { - SLogicNode* pSplitNode = unAllSplMatchByNode(pSubplan->pNode); - if (NULL != pSplitNode) { - pInfo->pProject = (SProjectLogicNode*)pSplitNode; + pInfo->pProject = (SProjectLogicNode*)pNode; pInfo->pSubplan = pSubplan; + return true; } - return NULL != pSplitNode; + return false; } static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) { @@ -900,20 +890,6 @@ typedef struct SUnionDistinctSplitInfo { SLogicSubplan* pSubplan; } SUnionDistinctSplitInfo; -static SLogicNode* unDistSplMatchByNode(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = unDistSplMatchByNode((SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} - static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) { SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { @@ -931,13 +907,14 @@ static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* p return nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange); } -static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) { - SLogicNode* pSplitNode = unDistSplMatchByNode(pSubplan->pNode); - if (NULL != pSplitNode) { - pInfo->pAgg = (SAggLogicNode*)pSplitNode; +static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SUnionDistinctSplitInfo* pInfo) { + if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { + pInfo->pAgg = (SAggLogicNode*)pNode; pInfo->pSubplan = pSubplan; + return true; } - return NULL != pSplitNode; + return false; } static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { @@ -960,27 +937,14 @@ typedef struct SSmaIndexSplitInfo { SLogicSubplan* pSubplan; } SSmaIndexSplitInfo; -static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) { +static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SSmaIndexSplitInfo* pInfo) { if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = smaIdxSplMatchByNode((SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} - -static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) { - SLogicNode* pSplitNode = smaIdxSplMatchByNode(pSubplan->pNode); - if (NULL != pSplitNode) { - pInfo->pMerge = (SMergeLogicNode*)pSplitNode; + pInfo->pMerge = (SMergeLogicNode*)pNode; pInfo->pSubplan = pSubplan; + return true; } - return NULL != pSplitNode; + return false; } static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { @@ -998,13 +962,47 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return code; } +typedef struct SQnodeSplitInfo { + SLogicNode* pSplitNode; + SLogicSubplan* pSubplan; +} SQnodeSplitInfo; + +static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SQnodeSplitInfo* pInfo) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent) { + pInfo->pSplitNode = pNode; + pInfo->pSubplan = pSubplan; + return true; + } + return false; +} + +static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + if (QUERY_POLICY_QNODE != tsQueryPolicy) { + return TSDB_CODE_SUCCESS; + } + + SQnodeSplitInfo info = {0}; + if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0)); + } + ++(pCxt->groupId); + pCxt->split = true; + return code; +} + // clang-format off static const SSplitRule splitRuleSet[] = { {.pName = "SuperTableSplit", .splitFunc = stableSplit}, {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, - {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit} + {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, + {.pName = "QnodeSplit", .splitFunc = qnodeSplit} }; // clang-format on diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 4bfb9a6fda..f9feb8d1fe 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -83,3 +83,10 @@ TEST_F(PlanOtherTest, delete) { run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"); } + +TEST_F(PlanOtherTest, queryPolicy) { + useDb("root", "test"); + + tsQueryPolicy = QUERY_POLICY_QNODE; + run("SELECT COUNT(*) FROM st1"); +} diff --git a/source/libs/planner/test/planTestUtil.h b/source/libs/planner/test/planTestUtil.h index 6b75573fb3..b188a7a054 100644 --- a/source/libs/planner/test/planTestUtil.h +++ b/source/libs/planner/test/planTestUtil.h @@ -18,6 +18,10 @@ #include +#define ALLOW_FORBID_FUNC + +#include "planInt.h" + class PlannerTestBaseImpl; struct TAOS_MULTI_BIND;