From f77c72c25a5500250830f24ff81a688ba654ee67 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 6 Jun 2022 17:51:25 +0800 Subject: [PATCH] feat: order by distributed split --- include/libs/nodes/nodes.h | 2 +- include/libs/nodes/plannodes.h | 8 +- source/dnode/mnode/impl/src/mndTopic.c | 2 +- source/libs/nodes/src/nodesCloneFuncs.c | 2 + source/libs/nodes/src/nodesCodeFuncs.c | 18 ++ source/libs/nodes/src/nodesUtilFuncs.c | 4 +- source/libs/parser/test/parInitialDTest.cpp | 2 +- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 24 ++- source/libs/planner/src/planSpliter.c | 190 +++++++++++++------ source/libs/planner/test/planOrderByTest.cpp | 3 + 11 files changed, 186 insertions(+), 71 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 29fabb5418..6f52f8dab0 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -211,7 +211,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 38c3f150ef..d646effa58 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -42,6 +42,7 @@ typedef struct SScanLogicNode { SNodeList* pScanPseudoCols; int8_t tableType; uint64_t tableId; + uint64_t stableId; SVgroupsInfo* pVgroupList; EScanType scanType; uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count @@ -109,6 +110,7 @@ typedef struct SExchangeLogicNode { typedef struct SMergeLogicNode { SLogicNode node; SNodeList* pMergeKeys; + SNodeList* pInputs; int32_t numOfChannels; int32_t srcGroupId; } SMergeLogicNode; @@ -117,7 +119,7 @@ typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW typedef enum EIntervalAlgorithm { INTERVAL_ALGO_HASH = 1, - INTERVAL_ALGO_SORT_MERGE, + INTERVAL_ALGO_MERGE, INTERVAL_ALGO_STREAM_FINAL, INTERVAL_ALGO_STREAM_SEMI, INTERVAL_ALGO_STREAM_SINGLE, @@ -220,6 +222,7 @@ typedef struct SScanPhysiNode { SNodeList* pScanCols; SNodeList* pScanPseudoCols; uint64_t uid; // unique id of the table + uint64_t suid; int8_t tableType; SName tableName; } SScanPhysiNode; @@ -296,6 +299,7 @@ typedef struct SExchangePhysiNode { typedef struct SMergePhysiNode { SPhysiNode node; SNodeList* pMergeKeys; + SNodeList* pTargets; int32_t numOfChannels; int32_t srcGroupId; } SMergePhysiNode; @@ -319,7 +323,7 @@ typedef struct SIntervalPhysiNode { int8_t slidingUnit; } SIntervalPhysiNode; -typedef SIntervalPhysiNode SSortMergeIntervalPhysiNode; +typedef SIntervalPhysiNode SMergeIntervalPhysiNode; typedef SIntervalPhysiNode SStreamIntervalPhysiNode; typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode; typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index a810a6a487..060bd94889 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -91,7 +91,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId } SHashObj *pColHash = NULL; - SNodeList *pNodeList; + SNodeList *pNodeList = NULL; nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); SNode *pNode = NULL; FOREACH(pNode, pNodeList) { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index f277d30eb1..657ffd32a5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -318,6 +318,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pScanPseudoCols); COPY_SCALAR_FIELD(tableType); COPY_SCALAR_FIELD(tableId); + COPY_SCALAR_FIELD(stableId); CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); COPY_SCALAR_FIELD(scanType); COPY_OBJECT_FIELD(scanSeq[0], sizeof(uint8_t) * 2); @@ -387,6 +388,7 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pMergeKeys); + CLONE_NODE_LIST_FIELD(pInputs); COPY_SCALAR_FIELD(numOfChannels); COPY_SCALAR_FIELD(srcGroupId); return (SNode*)pDst; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c54289bc56..246ee2bfed 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -230,6 +230,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiSort"; case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: return "PhysiHashInterval"; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + return "PhysiMergeInterval"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return "PhysiStreamInterval"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: @@ -1249,6 +1251,7 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) { static const char* jkScanPhysiPlanScanCols = "ScanCols"; static const char* jkScanPhysiPlanScanPseudoCols = "ScanPseudoCols"; static const char* jkScanPhysiPlanTableId = "TableId"; +static const char* jkScanPhysiPlanSTableId = "STableId"; static const char* jkScanPhysiPlanTableType = "TableType"; static const char* jkScanPhysiPlanTableName = "TableName"; @@ -1265,6 +1268,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanSTableId, pNode->suid); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType); } @@ -1288,6 +1294,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanSTableId, &pNode->suid); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType); } @@ -1644,6 +1653,7 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { } static const char* jkMergePhysiPlanMergeKeys = "MergeKeys"; +static const char* jkMergePhysiPlanTargets = "Targets"; static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; @@ -1654,6 +1664,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkMergePhysiPlanTargets, pNode->pTargets); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels); } @@ -1671,6 +1684,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkMergePhysiPlanTargets, &pNode->pTargets); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels); } @@ -3802,6 +3818,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_SORT: return physiSortNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: @@ -3930,6 +3947,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_SORT: return jsonToPhysiSortNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 16ce4d7a48..0aec211e4c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -260,8 +260,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSortPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: return makeNode(type, sizeof(SIntervalPhysiNode)); - case QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL: - return makeNode(type, sizeof(SSortMergeIntervalPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + return makeNode(type, sizeof(SMergeIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return makeNode(type, sizeof(SStreamIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index d4f9a540ae..528d465dbd 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -51,7 +51,7 @@ TEST_F(ParserInitialDTest, dropBnode) { } // DROP CONSUMER GROUP [ IF EXISTS ] cgroup_name ON topic_name -TEST_F(ParserInitialDTest, dropCGroup) { +TEST_F(ParserInitialDTest, dropConsumerGroup) { useDb("root", "test"); SMDropCgroupReq expect = {0}; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index c3aad632c7..d37e22b316 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -219,9 +219,9 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT return TSDB_CODE_OUT_OF_MEMORY; } - // TSWAP(pScan->pMeta, pRealTable->pMeta); TSWAP(pScan->pVgroupList, pRealTable->pVgroupList); pScan->tableId = pRealTable->pMeta->uid; + pScan->stableId = pRealTable->pMeta->suid; pScan->tableType = pRealTable->pMeta->tableType; pScan->scanSeq[0] = hasRepeatScanFuncs ? 2 : 1; pScan->scanSeq[1] = 0; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 3c000e3ff7..b2a675dbb1 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -425,6 +425,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS if (TSDB_CODE_SUCCESS == code) { pScanPhysiNode->uid = pScanLogicNode->tableId; + pScanPhysiNode->suid = pScanLogicNode->stableId; pScanPhysiNode->tableType = pScanLogicNode->tableType; memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName)); if (NULL != pScanLogicNode->pTagCond) { @@ -923,8 +924,8 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { switch (intervalAlgo) { case INTERVAL_ALGO_HASH: return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; - case INTERVAL_ALGO_SORT_MERGE: - return QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL; + case INTERVAL_ALGO_MERGE: + return QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; case INTERVAL_ALGO_STREAM_FINAL: return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; case INTERVAL_ALGO_STREAM_SEMI: @@ -1155,6 +1156,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { nodesDestroyNode(pExchange); return TSDB_CODE_OUT_OF_MEMORY; } + SNode* pSlot = NULL; + FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; } return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange); } @@ -1168,12 +1171,14 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM pMerge->numOfChannels = pMergeLogicNode->numOfChannels; pMerge->srcGroupId = pMergeLogicNode->srcGroupId; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); - for (int32_t i = 0; i < pMerge->numOfChannels; ++i) { - code = createExchangePhysiNodeByMerge(pMerge); - if (TSDB_CODE_SUCCESS != code) { - break; + if (TSDB_CODE_SUCCESS == code) { + for (int32_t i = 0; i < pMerge->numOfChannels; ++i) { + code = createExchangePhysiNodeByMerge(pMerge); + if (TSDB_CODE_SUCCESS != code) { + break; + } } } @@ -1182,6 +1187,11 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM &pMerge->pMergeKeys); } + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets, + &pMerge->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pMerge; } else { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 9e6c25ce78..7be33d54e3 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -80,30 +80,36 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE return TSDB_CODE_SUCCESS; } -static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, - ESubplanType subplanType) { - SExchangeLogicNode* pExchange = NULL; - if (TSDB_CODE_SUCCESS != splCreateExchangeNode(pCxt, pSplitNode, &pExchange)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pSubplan->subplanType = subplanType; - - if (NULL == pSplitNode->pParent) { - pSubplan->pNode = (SLogicNode*)pExchange; +static int32_t splReplaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) { + if (NULL == pOld->pParent) { + pSubplan->pNode = (SLogicNode*)pNew; return TSDB_CODE_SUCCESS; } SNode* pNode; - FOREACH(pNode, pSplitNode->pParent->pChildren) { - if (nodesEqualNode(pNode, pSplitNode)) { - REPLACE_NODE(pExchange); - pExchange->node.pParent = pSplitNode->pParent; + FOREACH(pNode, pOld->pParent->pChildren) { + if (nodesEqualNode(pNode, pOld)) { + REPLACE_NODE(pNew); + pNew->pParent = pOld->pParent; return TSDB_CODE_SUCCESS; } } - nodesDestroyNode(pExchange); - return TSDB_CODE_FAILED; + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, + ESubplanType subplanType) { + SExchangeLogicNode* pExchange = NULL; + int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange); + if (TSDB_CODE_SUCCESS == code) { + code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange); + } + if (TSDB_CODE_SUCCESS == code) { + pSubplan->subplanType = subplanType; + } else { + nodesDestroyNode(pExchange); + } + return code; } static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) { @@ -295,24 +301,34 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, - SLogicNode* pPartChild) { +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, + SNodeList* pMergeKeys, SLogicNode* pPartChild) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; } pMerge->numOfChannels = ((SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0))->pVgroupList->numOfVgroups; pMerge->srcGroupId = pCxt->groupId; - pMerge->node.pParent = pParent; pMerge->node.precision = pPartChild->precision; pMerge->pMergeKeys = pMergeKeys; - pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); - if (NULL == pMerge->node.pTargets) { - nodesDestroyNode(pMerge); - return TSDB_CODE_OUT_OF_MEMORY; - } - return nodesListMakeAppend(&pParent->pChildren, pMerge); + int32_t code = TSDB_CODE_SUCCESS; + pMerge->pInputs = nodesCloneList(pPartChild->pTargets); + pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets); + if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + if (NULL == pSubplan) { + code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge); + } else { + code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); + } + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pMerge); + } + return code; } static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { @@ -329,8 +345,15 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH; - ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_SORT_MERGE; - code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); + ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE; + SNodeList* pMergeKeys = NULL; + code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -424,37 +447,99 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } -static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { - SNodeList* pSortKeys = pMergeSort->pSortKeys; - pMergeSort->pSortKeys = NULL; - SNodeList* pTargets = pMergeSort->node.pTargets; - pMergeSort->node.pTargets = NULL; - SNodeList* pChildren = pMergeSort->node.pChildren; - pMergeSort->node.pChildren = NULL; +static SNode* stbSplCreateColumnNode(SExprNode* pExpr) { + SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + if (QUERY_NODE_COLUMN == nodeType(pExpr)) { + strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias); + } + strcpy(pCol->colName, pExpr->aliasName); + strcpy(pCol->node.aliasName, pExpr->aliasName); + pCol->node.resType = pExpr->resType; + return (SNode*)pCol; +} + +static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) { + SOrderByExprNode* pOutput = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (NULL == pOutput) { + return NULL; + } + pOutput->pExpr = nodesCloneNode(pCol); + if (NULL == pOutput->pExpr) { + nodesDestroyNode(pOutput); + return NULL; + } + pOutput->order = pSortKey->order; + pOutput->nullOrder = pSortKey->nullOrder; + return (SNode*)pOutput; +} + +static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) { + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pMergeKeys = NULL; + SNode* pNode = NULL; + FOREACH(pNode, pSortKeys) { + SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode; + SNode* pTarget = NULL; + bool found = false; + FOREACH(pTarget, pTargets) { + if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) { + code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + found = true; + } + } + if (TSDB_CODE_SUCCESS == code && !found) { + SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr); + code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol)); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppend(pTargets, pCol); + } else { + nodesDestroyNode(pCol); + } + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + if (TSDB_CODE_SUCCESS == code) { + *pOutput = pMergeKeys; + } else { + nodesDestroyList(pMergeKeys); + } + return code; +} + +static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort, + SNodeList** pOutputMergeKeys) { + SNodeList* pSortKeys = pSort->pSortKeys; + pSort->pSortKeys = NULL; + SNodeList* pChildren = pSort->node.pChildren; + pSort->node.pChildren = NULL; int32_t code = TSDB_CODE_SUCCESS; - SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); + SSortLogicNode* pPartSort = nodesCloneNode(pSort); if (NULL == pPartSort) { code = TSDB_CODE_OUT_OF_MEMORY; } - pMergeSort->node.pTargets = pTargets; - pPartSort->node.pChildren = pChildren; + SNodeList* pMergeKeys = NULL; if (TSDB_CODE_SUCCESS == code) { + pPartSort->node.pChildren = pChildren; pPartSort->pSortKeys = pSortKeys; - code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); - } - if (TSDB_CODE_SUCCESS == code) { - pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets); - if (NULL == pMergeSort->pSortKeys) { - code = TSDB_CODE_OUT_OF_MEMORY; - } + code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys); } if (TSDB_CODE_SUCCESS == code) { - *pOutput = (SLogicNode*)pPartSort; + *pOutputPartSort = (SLogicNode*)pPartSort; + *pOutputMergeKeys = pMergeKeys; } else { nodesDestroyNode(pPartSort); + nodesDestroyList(pMergeKeys); } return code; @@ -462,17 +547,10 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartSort = NULL; - int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); + SNodeList* pMergeKeys = NULL; + int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); - if (NULL != pMergeKeys) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort); - if (TSDB_CODE_SUCCESS != code) { - nodesDestroyList(pMergeKeys); - } - } else { - code = TSDB_CODE_OUT_OF_MEMORY; - } + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index ef6f438b55..851eda81b5 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -46,4 +46,7 @@ TEST_F(PlanOrderByTest, stable) { // ORDER BY key is in the projection list run("SELECT c1 FROM st1 ORDER BY c1"); + + // ORDER BY key is not in the projection list + run("SELECT c2 FROM st1 ORDER BY c1"); }