From faf5424b188156452dc466883dc284aaa998eb74 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 23:01:55 +0800 Subject: [PATCH] feat: add group sort operator --- include/libs/nodes/nodes.h | 1 + include/libs/nodes/plannodes.h | 2 ++ source/libs/executor/inc/executorimpl.h | 5 ++--- source/libs/executor/src/executorimpl.c | 2 ++ source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/sortoperator.c | 30 ++++++++++--------------- 6 files changed, 20 insertions(+), 22 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 58e2393970..f460c7d8ed 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -225,6 +225,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_SORT, + QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index b07e8f39d5..b98c6bfb15 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -417,6 +417,8 @@ typedef struct SSortPhysiNode { SNodeList* pTargets; } SSortPhysiNode; +typedef SSortPhysiNode SGroupSortPhysiNode; + typedef struct SPartitionPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of partition_by_clause diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index eaf9a8439f..a809e3b089 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -837,9 +837,8 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); -int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, - uint64_t taskId); +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c974d90fed..e62d239849 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4210,6 +4210,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) { + pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 13a6a8855c..922402a921 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2323,7 +2323,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; - + if (tableListSize == 0) { doSetOperatorCompleted(pOperator); return NULL; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0fabee5a1e..6086138301 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -424,10 +424,17 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u return TSDB_CODE_SUCCESS; } -// TODO: -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, +void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->pColMatchInfo); +} + +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; @@ -452,8 +459,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi ; pInfo->pColMatchInfo = pColMatchColInfo; pOperator->name = "GroupSortOperator"; - // TODO - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; @@ -461,7 +467,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, NULL, NULL, getGroupSortExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -478,18 +484,6 @@ _error: return NULL; } -void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { - SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; - pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); - - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); -} - -// TODO: sort group -// TODO: msortCompare compare group id in multiway merge sort. -// TODO: table merge scan, group first, then for each group, multiple readers - //===================================================================================== // Multiway Sort Merge operator typedef struct SMultiwaySortMergeOperatorInfo {