feat: partition by distributed split

This commit is contained in:
Xiaoyu Wang 2022-06-30 11:04:24 +08:00
parent cf35174d53
commit 2ad6599810
3 changed files with 20 additions and 11 deletions

View File

@ -154,16 +154,12 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot); return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot);
} }
static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pScanPseudoCols, static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
SNodeList* pScanCols, int8_t tableType) { int8_t tableType) {
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
return SCAN_TYPE_STREAM; return SCAN_TYPE_STREAM;
} }
// if (pSelect->hasLastRowFunc) {
// return SCAN_TYPE_LAST_ROW;
// }
if (NULL == pScanCols) { if (NULL == pScanCols) {
// select count(*) from t // select count(*) from t
return NULL == pScanPseudoCols return NULL == pScanPseudoCols
@ -279,7 +275,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
} }
pScan->scanType = getScanType(pCxt, pSelect, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType); pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols); code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);

View File

@ -1344,7 +1344,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys); &pMerge->pMergeKeys);
} }

View File

@ -197,8 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
// case QUERY_NODE_LOGIC_PLAN_PARTITION: case QUERY_NODE_LOGIC_PLAN_PARTITION:
// return stbSplHasMultiTbScan(streamQuery, pNode); return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
@ -433,7 +433,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false); code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys); nodesDestroyList(pMergeKeys);
@ -889,6 +889,16 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code; return code;
} }
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}
++(pCxt->groupId);
return code;
}
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (pCxt->pPlanCxt->rSmaQuery) { if (pCxt->pPlanCxt->rSmaQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -907,6 +917,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
code = stbSplSplitJoinNode(pCxt, &info); code = stbSplSplitJoinNode(pCxt, &info);
break; break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = stbSplSplitPartitionNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
code = stbSplSplitAggNode(pCxt, &info); code = stbSplSplitAggNode(pCxt, &info);
break; break;