From faebe29ab7240b952f5f2885ab681ac30a859640 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 09:42:04 +0800 Subject: [PATCH] feat: group sort read child input during tsortOpen --- source/libs/executor/src/sortoperator.c | 203 ++++++++++++------------ 1 file changed, 104 insertions(+), 99 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 1880d169bf..eebe5f2d8a 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -22,10 +22,11 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } @@ -44,16 +45,17 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; - pInfo->pColMatchInfo = pColMatchColInfo; - pOperator->name = "SortOperator"; + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); + ; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; // lazy evaluation for the following parameter since the input datablock is not known till now. // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; @@ -146,8 +148,8 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; if (pOperator->exprSupp.pExprInfo != NULL) { - int32_t code = - projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); + int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, + pOperator->exprSupp.numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { longjmp(pOperator->pTaskInfo->env, code); } @@ -165,8 +167,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, - NULL, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -234,28 +235,27 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* //===================================================================================== // Group Sort Operator -typedef enum EChildOperatorStatus {CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED} EChildOperatorStatus; +typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; typedef struct SGroupSortOperatorInfo { SOptrBasicInfo binfo; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SArray* pColMatchInfo; // for index map from table scan output - int32_t bufPageSize; + SArray* pSortInfo; + SArray* pColMatchInfo; - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SSDataBlock* prefetchedDatablock; - bool hasGroupId; - uint64_t currGroupId; - SSortHandle* pCurrSortHandle; + int64_t startTs; + uint64_t sortElapsed; + bool hasGroupId; + uint64_t currGroupId; + SSDataBlock* prefetchedSortInput; + SSortHandle* pCurrSortHandle; EChildOperatorStatus childOpStatus; + + SSortExecInfo sortExecInfo; } SGroupSortOperatorInfo; - -SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, - SGroupSortOperatorInfo* pInfo) { +SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, + SArray* pColMatchInfo, SGroupSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -296,30 +296,28 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } - typedef struct SGroupSortSourceParam { - SOperatorInfo* childOpInfo; + SOperatorInfo* childOpInfo; SGroupSortOperatorInfo* grpSortOpInfo; } SGroupSortSourceParam; - SSDataBlock* fetchNextGroupSortDataBlock(void* param) { - SGroupSortSourceParam* source = param; + SGroupSortSourceParam* source = param; SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; - if (grpSortOpInfo->prefetchedDatablock) { - SSDataBlock* pBlock = grpSortOpInfo->prefetchedDatablock; - grpSortOpInfo->prefetchedDatablock = NULL; - return pBlock; + if (grpSortOpInfo->prefetchedSortInput) { + SSDataBlock* block = grpSortOpInfo->prefetchedSortInput; + grpSortOpInfo->prefetchedSortInput = NULL; + return block; } else { SOperatorInfo* childOp = source->childOpInfo; - SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + SSDataBlock* block = childOp->fpSet.getNextFn(childOp); if (block != NULL) { if (block->info.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; return block; } else { grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; - grpSortOpInfo->prefetchedDatablock = block; + grpSortOpInfo->prefetchedSortInput = block; return NULL; } } else { @@ -329,19 +327,17 @@ SSDataBlock* fetchNextGroupSortDataBlock(void* param) { } } - int32_t beginSortGroup(SOperatorInfo* pOperator) { SGroupSortOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pCurrSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, - NULL, pTaskInfo->id.str); + pInfo->pCurrSortHandle = + tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; @@ -360,12 +356,18 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { int32_t finishSortGroup(SOperatorInfo* pOperator) { SGroupSortOperatorInfo* pInfo = pOperator->info; + + SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle); + pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; + pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; + pInfo->sortExecInfo.loops += sortExecInfo.loops; + pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; + pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; if (pInfo->pCurrSortHandle != NULL) { tsortDestroySortHandle(pInfo->pCurrSortHandle); } pInfo->pCurrSortHandle = NULL; return TSDB_CODE_SUCCESS; - } SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { @@ -373,7 +375,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupSortOperatorInfo* pInfo = pOperator->info; int32_t code = pOperator->fpSet._openFn(pOperator); @@ -384,46 +386,50 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; - pInfo->prefetchedDatablock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); - pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; + pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; pInfo->childOpStatus = CHILD_OP_NEW_GROUP; beginSortGroup(pOperator); } - SSDataBlock* pBlock = NULL; - while (pInfo->childOpStatus != CHILD_OP_FINISHED) { - pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + SSDataBlock* pBlock = NULL; + while (pInfo->pCurrSortHandle != NULL) { + // beginSortGroup would fetch all child blocks of pInfo->currGroupId; + ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); + pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); if (pBlock != NULL) { pBlock->info.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; + return pBlock; + } else { + if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { + finishSortGroup(pOperator); + pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + beginSortGroup(pOperator); + } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { + finishSortGroup(pOperator); + doSetOperatorCompleted(pOperator); + return NULL; + } } - if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { - finishSortGroup(pOperator); - pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; - beginSortGroup(pOperator); - } - return pBlock; } - - if (pInfo->childOpStatus == CHILD_OP_FINISHED) { - finishSortGroup(pOperator); - doSetOperatorCompleted(pOperator); - } - - return pBlock; + return NULL; } int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - //TODO: accumulate all sort handles; + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info; + *pOptrExplain = &pInfo->sortExecInfo; + *len = sizeof(SSortExecInfo); return TSDB_CODE_SUCCESS; } -//TODO: -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { +// TODO: +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } @@ -442,20 +448,21 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; - pInfo->pColMatchInfo = pColMatchColInfo; - pOperator->name = "GroupSortOperator"; - //TODO + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); + ; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; + // TODO pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, - getGroupSortExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, + NULL, getGroupSortExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -479,10 +486,9 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pColMatchInfo); } - -//TODO: sort group -//TODO: msortCompare compare group id in multiway merge sort. -//TODO: table merge scan, group first, then for each group, multiple readers +// TODO: sort group +// TODO: msortCompare compare group id in multiway merge sort. +// TODO: table merge scan, group first, then for each group, multiple readers //===================================================================================== // Multiway Sort Merge operator @@ -513,8 +519,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, - pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); @@ -540,7 +546,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SOperatorInfo* pOperator) { SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDataBlock); @@ -641,24 +647,23 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, goto _error; } - initResultSizeInfo(pOperator, 1024); - pInfo->binfo.pRes = pResBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->binfo.pRes = pResBlock; + pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; - pInfo->pInputBlock = pInputBlock; - pOperator->name = "MultiwaySortMerge"; + pInfo->pInputBlock = pInputBlock; + pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pInfo->bufPageSize = getProperSortPageSize(rowSize); + pInfo->bufPageSize = getProperSortPageSize(rowSize); // one additional is reserved for merged result. - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,