From a334337755b6a87c083aca8bfc8628a48914edca Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 27 Jun 2022 15:57:02 +0800 Subject: [PATCH] feat: merge and merge scan add group sort field --- include/libs/nodes/plannodes.h | 9 +++-- source/libs/executor/src/scanoperator.c | 4 +- source/libs/nodes/src/nodesCloneFuncs.c | 5 ++- source/libs/nodes/src/nodesCodeFuncs.c | 26 +++++++++--- source/libs/nodes/src/nodesUtilFuncs.c | 4 +- source/libs/planner/src/planOptimizer.c | 14 +++---- source/libs/planner/src/planPhysiCreater.c | 6 ++- source/libs/planner/src/planSpliter.c | 47 +++++++++++++++------- 8 files changed, 77 insertions(+), 38 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 4b84850cc0..0bd917a9c6 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -76,8 +76,8 @@ typedef struct SScanLogicNode { int16_t tsColId; double filesFactor; SArray* pSmaIndexes; - SNodeList* pPartTags; - bool partSort; + SNodeList* pGroupTags; + bool groupSort; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -141,6 +141,7 @@ typedef struct SMergeLogicNode { SNodeList* pInputs; int32_t numOfChannels; int32_t srcGroupId; + bool groupSort; } SMergeLogicNode; typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType; @@ -284,7 +285,8 @@ typedef struct STableScanPhysiNode { double ratio; int32_t dataRequired; SNodeList* pDynamicScanFuncs; - SNodeList* pPartitionTags; + SNodeList* pGroupTags; + bool groupSort; int64_t interval; int64_t offset; int64_t sliding; @@ -358,6 +360,7 @@ typedef struct SMergePhysiNode { SNodeList* pTargets; int32_t numOfChannels; int32_t srcGroupId; + bool groupSort; } SMergePhysiNode; typedef struct SWinodwPhysiNode { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ac57d3a9da..47a195dc85 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2051,7 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); return TSDB_CODE_SUCCESS; } - code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags); + code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2455,7 +2455,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN if (pInfo == NULL || pOperator == NULL) { goto _error; } - if (pTableScanNode->pPartitionTags) { + if (pTableScanNode->pGroupTags) { taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index dae9e4b33e..9d8baf472b 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -351,7 +351,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(tsColId); COPY_SCALAR_FIELD(filesFactor); - CLONE_NODE_LIST_FIELD(pPartTags); + CLONE_NODE_LIST_FIELD(pGroupTags); return TSDB_CODE_SUCCESS; } @@ -401,6 +401,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst CLONE_NODE_LIST_FIELD(pInputs); COPY_SCALAR_FIELD(numOfChannels); COPY_SCALAR_FIELD(srcGroupId); + COPY_SCALAR_FIELD(groupSort); return TSDB_CODE_SUCCESS; } @@ -501,7 +502,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy COPY_SCALAR_FIELD(ratio); COPY_SCALAR_FIELD(dataRequired); CLONE_NODE_LIST_FIELD(pDynamicScanFuncs); - CLONE_NODE_LIST_FIELD(pPartitionTags); + CLONE_NODE_LIST_FIELD(pGroupTags); COPY_SCALAR_FIELD(interval); COPY_SCALAR_FIELD(offset); COPY_SCALAR_FIELD(sliding); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 547c78f485..df7429bd88 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -541,7 +541,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols"; static const char* jkScanLogicPlanTableId = "TableId"; static const char* jkScanLogicPlanTableType = "TableType"; static const char* jkScanLogicPlanTagCond = "TagCond"; -static const char* jkScanLogicPlanPartTags = "PartTags"; +static const char* jkScanLogicPlanGroupTags = "GroupTags"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -563,7 +563,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond); } if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags); + code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags); } return code; @@ -590,7 +590,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags); + code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags); } return code; @@ -1432,7 +1432,8 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType"; static const char* jkTableScanPhysiPlanWatermark = "watermark"; static const char* jkTableScanPhysiPlanTsColId = "tsColId"; static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor"; -static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags"; +static const char* jkTableScanPhysiPlanGroupTags = "GroupTags"; +static const char* jkTableScanPhysiPlanGroupSort = "GroupSort"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1487,7 +1488,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor); } if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags); + code = nodeListToJson(pJson, jkTableScanPhysiPlanGroupTags, pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort); } return code; @@ -1546,7 +1550,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags); + code = jsonToNodeList(pJson, jkTableScanPhysiPlanGroupTags, &pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort); } return code; @@ -1727,6 +1734,7 @@ static const char* jkMergePhysiPlanMergeKeys = "MergeKeys"; static const char* jkMergePhysiPlanTargets = "Targets"; static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; +static const char* jkMergePhysiPlanGroupSort = "GroupSort"; static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; @@ -1744,6 +1752,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanGroupSort, pNode->groupSort); + } return code; } @@ -1764,6 +1775,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkMergePhysiPlanGroupSort, &pNode->groupSort); + } return code; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index d47c8c0816..be6faa92cb 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -711,7 +711,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pTagCond); nodesDestroyNode(pLogicNode->pTagIndexCond); taosArrayDestroy(pLogicNode->pSmaIndexes); - nodesDestroyList(pLogicNode->pPartTags); + nodesDestroyList(pLogicNode->pGroupTags); break; } case QUERY_NODE_LOGIC_PLAN_JOIN: { @@ -815,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) { STableScanPhysiNode* pPhyNode = (STableScanPhysiNode*)pNode; destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pDynamicScanFuncs); - nodesDestroyList(pPhyNode->pPartitionTags); + nodesDestroyList(pPhyNode->pGroupTags); break; } case QUERY_NODE_PHYSICAL_PLAN_PROJECT: { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 2493e966ec..53c9ef8dd0 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -20,7 +20,7 @@ #define OPTIMIZE_FLAG_MASK(n) (1 << n) -#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0) +#define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0) #define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1) #define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask) @@ -91,7 +91,7 @@ static bool scanPathOptHaveNormalCol(SNodeList* pList) { } static bool scanPathOptMayBeOptimized(SLogicNode* pNode) { - if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) { + if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH)) { return false; } if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) { @@ -241,7 +241,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); info.pScan->pDynamicScanFuncs = info.pDsoFuncs; - OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD); + OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH); pCxt->optimized = true; } nodesDestroyList(info.pSdrFuncs); @@ -1073,7 +1073,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub int32_t code = TSDB_CODE_SUCCESS; SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { - TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pPartTags); + TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags); int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan); if (TSDB_CODE_SUCCESS == code) { NODES_CLEAR_LIST(pNode->pChildren); @@ -1083,7 +1083,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub SNode* pGroupKey = NULL; FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) { code = nodesListMakeStrictAppend( - &pScan->pPartTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); + &pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); if (TSDB_CODE_SUCCESS != code) { break; } @@ -1091,7 +1091,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); } if (TSDB_CODE_SUCCESS == code) { - code = partTagsOptRebuildTbanme(pScan->pPartTags); + code = partTagsOptRebuildTbanme(pScan->pGroupTags); } return code; } @@ -1212,7 +1212,7 @@ static bool rewriteTailOptNeedGroupSort(SIndefRowsFuncLogicNode* pIndef) { } SNode* pChild = nodesListGetNode(pIndef->node.pChildren, 0); return QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild) || - (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pPartTags); + (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pGroupTags); } static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index dee316f8aa..46747af3a9 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -520,12 +520,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); pTableScan->dataRequired = pScanLogicNode->dataRequired; pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs); - pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags); + pTableScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags); if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) || - (NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) { + (NULL != pScanLogicNode->pGroupTags && NULL == pTableScan->pGroupTags)) { nodesDestroyNode((SNode*)pTableScan); return TSDB_CODE_OUT_OF_MEMORY; } + pTableScan->groupSort = pScanLogicNode->groupSort; pTableScan->interval = pScanLogicNode->interval; pTableScan->offset = pScanLogicNode->offset; pTableScan->sliding = pScanLogicNode->sliding; @@ -1330,6 +1331,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM pMerge->numOfChannels = pMergeLogicNode->numOfChannels; pMerge->srcGroupId = pMergeLogicNode->srcGroupId; + pMerge->groupSort = pMergeLogicNode->groupSort; int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 500184718d..49ab50f913 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -362,7 +362,7 @@ static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) { } static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, - SNodeList* pMergeKeys, SLogicNode* pPartChild) { + SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) { SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -371,6 +371,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla pMerge->srcGroupId = pCxt->groupId; pMerge->node.precision = pPartChild->precision; pMerge->pMergeKeys = pMergeKeys; + pMerge->groupSort = groupSort; int32_t code = TSDB_CODE_SUCCESS; pMerge->pInputs = nodesCloneList(pPartChild->pTargets); @@ -430,7 +431,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo SNodeList* pMergeKeys = NULL; code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow); + code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false); } if (TSDB_CODE_SUCCESS != code) { nodesDestroyList(pMergeKeys); @@ -497,12 +498,16 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo return code; } -static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) { +static void stbSplSetTableMergeScan(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - ((SScanLogicNode*)pNode)->scanType = scanType; + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + if (NULL != pScan->pGroupTags) { + pScan->groupSort = true; + } } else { if (1 == LIST_LENGTH(pNode->pChildren)) { - splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType); + stbSplSetTableMergeScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0)); } } } @@ -515,7 +520,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild); + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true); } if (TSDB_CODE_SUCCESS == code) { @@ -524,13 +529,10 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl } if (TSDB_CODE_SUCCESS == code) { - splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE); - ++(pCxt->groupId); - } - - if (TSDB_CODE_SUCCESS == code) { + stbSplSetTableMergeScan(pChild); pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); + ++(pCxt->groupId); } else { nodesDestroyList(pMergeKeys); } @@ -560,7 +562,7 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - return ((SScanLogicNode*)pNode)->pPartTags; + return ((SScanLogicNode*)pNode)->pGroupTags; } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { return ((SPartitionLogicNode*)pNode)->pPartitionKeys; } else { @@ -790,12 +792,29 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut return code; } +static void stbSplSetScanPartSort(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (NULL != pScan->pGroupTags) { + pScan->groupSort = true; + } + } else { + if (1 == LIST_LENGTH(pNode->pChildren)) { + stbSplSetScanPartSort((SLogicNode*)nodesListGetNode(pNode->pChildren, 0)); + } + } +} + static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartSort = NULL; SNodeList* pMergeKeys = NULL; + bool groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort; int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort); + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort); + } + if (TSDB_CODE_SUCCESS == code && groupSort) { + stbSplSetScanPartSort(pPartSort); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -830,7 +849,7 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS SNodeList* pMergeKeys = NULL; int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan); + code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, false); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pSubplan->pChildren,