feat: multiway merge handles group+mergekeys, group and only merge keys
This commit is contained in:
parent
87b8131409
commit
11f8d302fa
|
@ -730,10 +730,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
|
||||
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
|
|
@ -159,7 +159,12 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
|
|||
}
|
||||
|
||||
SArray* createSortInfo(SNodeList* pNodeList) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
size_t numOfCols = 0;
|
||||
if (pNodeList != NULL) {
|
||||
numOfCols = LIST_LENGTH(pNodeList);
|
||||
} else {
|
||||
numOfCols = 0;
|
||||
}
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
|
||||
if (pList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -4269,17 +4269,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
|
||||
SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
|
||||
|
||||
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||
|
||||
SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColList =
|
||||
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
||||
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo);
|
||||
pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
|
||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
|
|
|
@ -475,8 +475,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
|||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, NULL,
|
||||
NULL, getGroupSortExplainExecInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
|
||||
NULL, NULL, getGroupSortExplainExecInfo);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -494,7 +494,7 @@ _error:
|
|||
|
||||
//=====================================================================================
|
||||
// Multiway Sort Merge operator
|
||||
typedef struct SMultiwaySortMergeOperatorInfo {
|
||||
typedef struct SMultiwayMergeOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
|
||||
int32_t bufPageSize;
|
||||
|
@ -506,13 +506,14 @@ typedef struct SMultiwaySortMergeOperatorInfo {
|
|||
|
||||
SSDataBlock* pInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
bool groupSort;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
} SMultiwaySortMergeOperatorInfo;
|
||||
} SMultiwayMergeOperatorInfo;
|
||||
|
||||
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (OPTR_IS_OPENED(pOperator)) {
|
||||
|
@ -527,7 +528,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
|||
pInfo->pInputBlock, pTaskInfo->id.str);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||
tsortSetCompareGroupId(pInfo->pSortHandle, true);
|
||||
|
||||
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
|
@ -550,7 +552,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
|||
|
||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
@ -564,6 +566,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->groupSort) {
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
|
@ -571,11 +574,18 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
}
|
||||
else {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
pInfo->groupId = 0;
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pInfo->groupSort)
|
||||
{
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
|
@ -587,6 +597,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
}
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
|
@ -614,13 +627,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -637,8 +650,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
||||
void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
|
||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
||||
|
||||
|
@ -646,11 +659,11 @@ void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||
}
|
||||
|
||||
int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
ASSERT(pOptr != NULL);
|
||||
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
||||
|
||||
SMultiwaySortMergeOperatorInfo* pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info;
|
||||
SMultiwayMergeOperatorInfo* pOperatorInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
|
||||
|
||||
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
|
||||
*pOptrExplain = pInfo;
|
||||
|
@ -658,24 +671,35 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
||||
SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo,
|
||||
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
||||
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams,
|
||||
SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) {
|
||||
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
|
||||
|
||||
SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||
|
||||
int32_t rowSize = pResBlock->info.rowSize;
|
||||
|
||||
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColMatchColInfo =
|
||||
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
|
||||
pInfo->groupSort = pMergePhyNode->groupSort;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
pInfo->pInputBlock = pInputBlock;
|
||||
pOperator->name = "MultiwaySortMerge";
|
||||
pOperator->name = "MultiwayMerge";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
@ -687,9 +711,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
|||
// one additional is reserved for merged result.
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
|
||||
destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, downStreams, numStreams);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
Loading…
Reference in New Issue