From 3fe2f16213339ddb24b29c7e71f9e523da06e574 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 2 Jun 2022 17:58:01 +0800 Subject: [PATCH 1/9] feat: group by distributed split --- source/libs/planner/src/planSpliter.c | 67 ++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e3c8b82e39..3aad68ae7d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { - // case QUERY_NODE_LOGIC_PLAN_AGG: - // return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_AGG: + return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL != pWindow->winType) { @@ -365,6 +365,66 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf } } +static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { + SNodeList* pFunc = pMergeAgg->pAggFuncs; + pMergeAgg->pAggFuncs = NULL; + SNodeList* pGroupKeys = pMergeAgg->pGroupKeys; + pMergeAgg->pGroupKeys = NULL; + SNodeList* pTargets = pMergeAgg->node.pTargets; + pMergeAgg->node.pTargets = NULL; + SNodeList* pChildren = pMergeAgg->node.pChildren; + pMergeAgg->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg); + if (NULL == pPartAgg) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pPartAgg->pGroupKeys = pGroupKeys; + code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets); + if (NULL == pMergeAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + pMergeAgg->node.pTargets = pTargets; + pPartAgg->node.pChildren = pChildren; + + code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); + } + + nodesDestroyList(pFunc); + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartAgg; + } else { + nodesDestroyNode(pPartAgg); + } + + return code; +} + +static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartAgg = NULL; + int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -386,6 +446,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(info.pSplitNode)) { + case QUERY_NODE_LOGIC_PLAN_AGG: + code = stbSplSplitAggNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; From e607f5a16e7f818040a73f6a0007aeb9e961f8c7 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 2 Jun 2022 21:01:24 +0800 Subject: [PATCH 2/9] fix: add htonl to numOfBlocks --- include/common/tmsg.h | 1 + source/common/src/tmsg.c | 1 + 2 files changed, 2 insertions(+) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 768f02d9ba..972181f77e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -241,6 +241,7 @@ typedef struct { int32_t schemaLen; // schema length, if length is 0, no schema exists int16_t numOfRows; // total number of rows in current submit block // head of SSubmitBlk + int32_t numOfBlocks; const void* pMsg; } SSubmitMsgIter; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9c6c532bcd..e7ee164be7 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -35,6 +35,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { } pIter->totalLen = htonl(pMsg->length); + pIter->numOfBlocks = htonl(pMsg->numOfBlocks); ASSERT(pIter->totalLen > 0); pIter->len = 0; pIter->pMsg = pMsg; From ed87c39ff1d4df083a91aa5845ebef315aed13ca Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 3 Jun 2022 16:45:33 +0800 Subject: [PATCH 3/9] fix: a problem of parser async --- source/libs/parser/src/parAstParser.c | 18 ++++++++++ source/libs/parser/test/parInitialDTest.cpp | 38 +++++++++++++-------- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 18c1341da8..7abe244261 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -199,6 +199,22 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre return code; } +static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pStmt->pTables) { + SDropTableClause* pClause = (SDropTableClause*)pNode; + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) { int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); if (TSDB_CODE_SUCCESS == code) { @@ -341,6 +357,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); case QUERY_NODE_CREATE_MULTI_TABLE_STMT: return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt); + case QUERY_NODE_DROP_TABLE_STMT: + return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt); case QUERY_NODE_ALTER_TABLE_STMT: return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt); case QUERY_NODE_USE_DATABASE_STMT: diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index 5ad427d964..1b1b930851 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -24,7 +24,7 @@ class ParserInitialDTest : public ParserDdlTest {}; // todo delete // todo desc // todo describe -// todo drop account +// todo DROP account TEST_F(ParserInitialDTest, dropBnode) { useDb("root", "test"); @@ -62,51 +62,61 @@ TEST_F(ParserInitialDTest, dropCGroup) { run("DROP CONSUMER GROUP IF EXISTS cg1 ON tp1"); } -// todo drop database -// todo drop dnode -// todo drop function +// todo DROP database +// todo DROP dnode +// todo DROP function TEST_F(ParserInitialDTest, dropIndex) { useDb("root", "test"); - run("drop index index1 on t1"); + run("DROP index index1 on t1"); } TEST_F(ParserInitialDTest, dropMnode) { useDb("root", "test"); - run("drop mnode on dnode 1"); + run("DROP mnode on dnode 1"); } TEST_F(ParserInitialDTest, dropQnode) { useDb("root", "test"); - run("drop qnode on dnode 1"); + run("DROP qnode on dnode 1"); } TEST_F(ParserInitialDTest, dropSnode) { useDb("root", "test"); - run("drop snode on dnode 1"); + run("DROP snode on dnode 1"); } -// todo drop stable -// todo drop stream -// todo drop table +TEST_F(ParserInitialDTest, dropSTable) { + useDb("root", "test"); + + run("DROP STABLE st1"); +} + +// todo DROP stream + +TEST_F(ParserInitialDTest, dropTable) { + useDb("root", "test"); + + run("DROP TABLE t1"); +} TEST_F(ParserInitialDTest, dropTopic) { useDb("root", "test"); - run("drop topic tp1"); + run("DROP topic tp1"); - run("drop topic if exists tp1"); + run("DROP topic if exists tp1"); } TEST_F(ParserInitialDTest, dropUser) { login("root"); useDb("root", "test"); - run("drop user wxy"); + run("DROP user wxy"); } } // namespace ParserTest From bc6326788a311110c3eedf610ef3c4cf310d670f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 3 Jun 2022 16:57:52 +0800 Subject: [PATCH 4/9] feat: group by distributed split --- source/libs/planner/test/planGroupByTest.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index cf51603470..201df2efde 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) { run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3"); run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3"); } + +TEST_F(PlanGroupByTest, stable) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM st1"); + + run("SELECT COUNT(*) FROM st1 GROUP BY c1"); +} From 0b5cddeec6ae4dbd0b4e54c798ede5bfada1918b Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 3 Jun 2022 20:08:17 +0800 Subject: [PATCH 5/9] fix: use num of block from msg iter --- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 47eaf2d5a5..136ecd41d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -715,8 +715,8 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in goto _exit; } - submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp)); - newTbUids = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(int64_t)); + submitRsp.pArray = taosArrayInit(msgIter->numOfBlocks, sizeof(SSubmitBlkRsp)); + newTbUids = taosArrayInit(msgIter->numOfBlocks, sizeof(int64_t)); if (!submitRsp.pArray) { pRsp->code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; From 401610ab3c6f4560f32ea69fcb87ff88c3ebcaeb Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 3 Jun 2022 20:13:48 +0800 Subject: [PATCH 6/9] fix: use num of blocks from msg iter --- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 136ecd41d4..4b0016a746 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -715,8 +715,8 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in goto _exit; } - submitRsp.pArray = taosArrayInit(msgIter->numOfBlocks, sizeof(SSubmitBlkRsp)); - newTbUids = taosArrayInit(msgIter->numOfBlocks, sizeof(int64_t)); + submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp)); + newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t)); if (!submitRsp.pArray) { pRsp->code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; From 0c94c65acb6e81c367f0d9364f803dbb274444e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 3 Jun 2022 23:13:21 +0800 Subject: [PATCH 7/9] fix(query): add ref for job --- source/client/src/clientImpl.c | 2 +- source/libs/catalog/src/ctgAsync.c | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 774ef5f248..4c68edaff0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1087,7 +1087,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc tsem_wait(&pParam->sem); } - if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) { + if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 312f0c9250..a9e808e749 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -328,6 +328,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); } + pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); + if (pJob->refId < 0) { + ctgError("add job to ref failed, error: %s", tstrerror(terrno)); + CTG_ERR_JRET(terrno); + } + + taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); + + qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum); + return TSDB_CODE_SUCCESS; + + _return: taosMemoryFreeClear(*job); CTG_RET(code); From 7ede8db3c4addd25f001fb5df4d599e337e7b479 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 4 Jun 2022 09:33:13 +0800 Subject: [PATCH 8/9] fix: a problem of parser async --- source/libs/function/src/functionMgt.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 611ae8d81f..67efdc5c0b 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { } bool fmIsDistExecFunc(int32_t funcId) { + if (fmIsUserDefinedFunc(funcId)) { + return false; + } if (!fmIsVectorFunc(funcId)) { return true; } From 6e6c9bda1750b37b26375935fbeabe2272686815 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 4 Jun 2022 10:16:52 +0800 Subject: [PATCH 9/9] feat: order by distributed split --- source/libs/planner/src/planSpliter.c | 91 +++++++++++++++++--- source/libs/planner/test/planOrderByTest.cpp | 19 ++-- 2 files changed, 91 insertions(+), 19 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 3aad68ae7d..a3d5086908 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(pNode); + // return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: @@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, + SLogicNode* pPartChild) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S pMerge->srcGroupId = pCxt->groupId; pMerge->node.pParent = pParent; pMerge->node.precision = pPartChild->precision; - int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); - if (TSDB_CODE_SUCCESS == code) { - pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); - if (NULL == pMerge->node.pTargets) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeAppend(&pParent->pChildren, pMerge); + pMerge->pMergeKeys = pMergeKeys; + pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); + if (NULL == pMerge->node.pTargets) { + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; } - return code; + return nodesListMakeAppend(&pParent->pChildren, pMerge); } static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); + SNodeList* pMergeKeys = NULL; + code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -425,6 +429,64 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { + SNodeList* pSortKeys = pMergeSort->pSortKeys; + pMergeSort->pSortKeys = NULL; + SNodeList* pTargets = pMergeSort->node.pTargets; + pMergeSort->node.pTargets = NULL; + SNodeList* pChildren = pMergeSort->node.pChildren; + pMergeSort->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); + if (NULL == pPartSort) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + pMergeSort->node.pTargets = pTargets; + pPartSort->node.pChildren = pChildren; + if (TSDB_CODE_SUCCESS == code) { + pPartSort->pSortKeys = pSortKeys; + code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets); + if (NULL == pMergeSort->pSortKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartSort; + } else { + nodesDestroyNode(pPartSort); + } + + return code; +} + +static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartSort = NULL; + int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); + if (TSDB_CODE_SUCCESS == code) { + SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); + if (NULL != pMergeKeys) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -452,6 +514,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = stbSplSplitSortNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_SCAN: code = stbSplSplitScanNode(pCxt, &info); break; diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index d95d1bdf1d..ef6f438b55 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {}; TEST_F(PlanOrderByTest, basic) { useDb("root", "test"); - // order by key is in the projection list - run("select c1 from t1 order by c1"); - // order by key is not in the projection list - run("select c1 from t1 order by c2"); + // ORDER BY key is in the projection list + run("SELECT c1 FROM t1 ORDER BY c1"); + // ORDER BY key is not in the projection list + run("SELECT c1 FROM t1 ORDER BY c2"); } TEST_F(PlanOrderByTest, expr) { useDb("root", "test"); - run("select * from t1 order by c1 + 10, c2"); + run("SELECT * FROM t1 ORDER BY c1 + 10, c2"); } TEST_F(PlanOrderByTest, nullsOrder) { useDb("root", "test"); - run("select * from t1 order by c1 desc nulls first"); + run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST"); +} + +TEST_F(PlanOrderByTest, stable) { + useDb("root", "test"); + + // ORDER BY key is in the projection list + run("SELECT c1 FROM st1 ORDER BY c1"); }