diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bd33d7f3c7..17a06a40c1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -774,7 +774,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, - SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d094dc0203..beb5e1152f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4524,8 +4524,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pDescNode); SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys); + int32_t numOfOutputCols = 0; + SArray* pColList = NULL; + //extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); - pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pTaskInfo); + + pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0618c80682..c219acb687 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -224,30 +224,11 @@ typedef struct SMultiwaySortMergeOperatorInfo { SArray* pSortInfo; SSortHandle* pSortHandle; + SArray* pColMatchInfo; // for index map from table scan output int64_t startTs; // sort start time } SMultiwaySortMergeOperatorInfo; -SSDataBlock* getMultiwaySortMergedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { - blockDataCleanup(pDataBlock); - - blockDataEnsureCapacity(pDataBlock, capacity); - - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= capacity) { - return pDataBlock; - } - } - - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { SMultiwaySortMergeOperatorInfo * pInfo = pOperator->info; @@ -261,7 +242,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); @@ -298,7 +279,10 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = getMultiwaySortMergedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); + SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, + pInfo->binfo.pRes, + pOperator->resultInfo.capacity, + pInfo->pColMatchInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -328,7 +312,8 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx } SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, - SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, + SExecTaskInfo* pTaskInfo) { SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); int32_t rowSize = pResBlock->info.rowSize; @@ -342,6 +327,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, initResultSizeInfo(pOperator, 1024); pInfo->pSortInfo = pSortInfo; + pInfo->pColMatchInfo= pColMatchColInfo; pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = true;