diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 26a100bb1b..5f89d1f665 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -226,6 +226,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_SORT, + QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index cb09bf6a5f..6ea295434d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -420,6 +420,8 @@ typedef struct SSortPhysiNode { SNodeList* pTargets; } SSortPhysiNode; +typedef SSortPhysiNode SGroupSortPhysiNode; + typedef struct SPartitionPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of partition_by_clause diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 053e35c6c9..69500313da 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -841,9 +841,8 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); -int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, - uint64_t taskId); +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index afd57e83a1..8e153c3a9b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4225,6 +4225,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) { + pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 055a7caba3..f013eed9e4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2330,7 +2330,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; - + if (tableListSize == 0) { doSetOperatorCompleted(pOperator); return NULL; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 97be4645a8..4b5ad7b123 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -424,10 +424,17 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u return TSDB_CODE_SUCCESS; } -// TODO: -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, +void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->pColMatchInfo); +} + +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; @@ -452,8 +459,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi ; pInfo->pColMatchInfo = pColMatchColInfo; pOperator->name = "GroupSortOperator"; - // TODO - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; @@ -461,7 +467,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, NULL, NULL, getGroupSortExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -478,18 +484,6 @@ _error: return NULL; } -void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { - SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; - pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); - - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); -} - -// TODO: sort group -// TODO: msortCompare compare group id in multiway merge sort. -// TODO: table merge scan, group first, then for each group, multiple readers - //===================================================================================== // Multiway Sort Merge operator typedef struct SMultiwaySortMergeOperatorInfo {