From 2ad65998107b88498917bcb902500e9912c4efef Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 30 Jun 2022 11:04:24 +0800 Subject: [PATCH] feat: partition by distributed split --- source/libs/planner/src/planLogicCreater.c | 10 +++------- source/libs/planner/src/planPhysiCreater.c | 2 +- source/libs/planner/src/planSpliter.c | 19 ++++++++++++++++--- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 05256e2696..d170482c48 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -154,16 +154,12 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot); } -static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pScanPseudoCols, - SNodeList* pScanCols, int8_t tableType) { +static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols, + int8_t tableType) { if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) { return SCAN_TYPE_STREAM; } - // if (pSelect->hasLastRowFunc) { - // return SCAN_TYPE_LAST_ROW; - // } - if (NULL == pScanCols) { // select count(*) from t return NULL == pScanPseudoCols @@ -279,7 +275,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); } - pScan->scanType = getScanType(pCxt, pSelect, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType); + pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType); if (TSDB_CODE_SUCCESS == code) { code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 46747af3a9..0f19db26a5 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1344,7 +1344,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM } } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) { code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, &pMerge->pMergeKeys); } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 2bc226804f..60c04c2c30 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -197,8 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); case QUERY_NODE_LOGIC_PLAN_JOIN: return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); - // case QUERY_NODE_LOGIC_PLAN_PARTITION: - // return stbSplHasMultiTbScan(streamQuery, pNode); + case QUERY_NODE_LOGIC_PLAN_PARTITION: + return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: @@ -433,7 +433,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo SNodeList* pMergeKeys = NULL; code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false); + code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); } if (TSDB_CODE_SUCCESS != code) { nodesDestroyList(pMergeKeys); @@ -889,6 +889,16 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); + } + ++(pCxt->groupId); + return code; +} + static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { if (pCxt->pPlanCxt->rSmaQuery) { return TSDB_CODE_SUCCESS; @@ -907,6 +917,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { case QUERY_NODE_LOGIC_PLAN_JOIN: code = stbSplSplitJoinNode(pCxt, &info); break; + case QUERY_NODE_LOGIC_PLAN_PARTITION: + code = stbSplSplitPartitionNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_AGG: code = stbSplSplitAggNode(pCxt, &info); break;