From 805ce63fdd049dc60de02c64ff393ed2e74ef3c4 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 21 Jun 2022 18:39:06 +0800 Subject: [PATCH 1/3] feat: add sort group operator --- source/libs/executor/src/sortoperator.c | 254 ++++++++++++++++++++++++ 1 file changed, 254 insertions(+) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9d13276e6d..1880d169bf 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -232,6 +232,260 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* return TSDB_CODE_SUCCESS; } +//===================================================================================== +// Group Sort Operator +typedef enum EChildOperatorStatus {CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED} EChildOperatorStatus; + +typedef struct SGroupSortOperatorInfo { + SOptrBasicInfo binfo; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SArray* pColMatchInfo; // for index map from table scan output + int32_t bufPageSize; + + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + SSDataBlock* prefetchedDatablock; + bool hasGroupId; + uint64_t currGroupId; + SSortHandle* pCurrSortHandle; + + EChildOperatorStatus childOpStatus; +} SGroupSortOperatorInfo; + + +SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, + SGroupSortOperatorInfo* pInfo) { + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(p, pTupleHandle); + if (p->info.rows >= capacity) { + break; + } + } + + if (p->info.rows > 0) { + int32_t numOfCols = taosArrayGetSize(pColMatchInfo); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); + ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + + SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, pSrc, p->info.rows); + } + + pDataBlock->info.rows = p->info.rows; + pDataBlock->info.capacity = p->info.rows; + } + + blockDataDestroy(p); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + + +typedef struct SGroupSortSourceParam { + SOperatorInfo* childOpInfo; + SGroupSortOperatorInfo* grpSortOpInfo; +} SGroupSortSourceParam; + + +SSDataBlock* fetchNextGroupSortDataBlock(void* param) { + SGroupSortSourceParam* source = param; + SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; + if (grpSortOpInfo->prefetchedDatablock) { + SSDataBlock* pBlock = grpSortOpInfo->prefetchedDatablock; + grpSortOpInfo->prefetchedDatablock = NULL; + return pBlock; + } else { + SOperatorInfo* childOp = source->childOpInfo; + SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + if (block != NULL) { + if (block->info.groupId == grpSortOpInfo->currGroupId) { + grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; + return block; + } else { + grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; + grpSortOpInfo->prefetchedDatablock = block; + return NULL; + } + } else { + grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED; + return NULL; + } + } +} + + +int32_t beginSortGroup(SOperatorInfo* pOperator) { + SGroupSortOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + + // pInfo->binfo.pRes is not equalled to the input datablock. + pInfo->pCurrSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, + NULL, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); + param->childOpInfo = pOperator->pDownstream[0]; + param->grpSortOpInfo = pInfo; + ps->param = param; + tsortAddSource(pInfo->pCurrSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pCurrSortHandle); + taosMemoryFreeClear(ps); + + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t finishSortGroup(SOperatorInfo* pOperator) { + SGroupSortOperatorInfo* pInfo = pOperator->info; + if (pInfo->pCurrSortHandle != NULL) { + tsortDestroySortHandle(pInfo->pCurrSortHandle); + } + pInfo->pCurrSortHandle = NULL; + return TSDB_CODE_SUCCESS; + +} + +SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGroupSortOperatorInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + if (!pInfo->hasGroupId) { + pInfo->hasGroupId = true; + + pInfo->prefetchedDatablock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; + pInfo->childOpStatus = CHILD_OP_NEW_GROUP; + beginSortGroup(pOperator); + } + SSDataBlock* pBlock = NULL; + while (pInfo->childOpStatus != CHILD_OP_FINISHED) { + pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); + + if (pBlock != NULL) { + pBlock->info.groupId = pInfo->currGroupId; + pOperator->resultInfo.totalRows += pBlock->info.rows; + } + if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { + finishSortGroup(pOperator); + pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; + beginSortGroup(pOperator); + } + return pBlock; + } + + if (pInfo->childOpStatus == CHILD_OP_FINISHED) { + finishSortGroup(pOperator); + doSetOperatorCompleted(pOperator); + } + + return pBlock; +} + +int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + //TODO: accumulate all sort handles; + return TSDB_CODE_SUCCESS; +} + +//TODO: +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + goto _error; + } + + SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; + + int32_t numOfCols = 0; + SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + + int32_t numOfOutputCols = 0; + SArray* pColMatchColInfo = + extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + + pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + pInfo->binfo.pRes = pResBlock; + + initResultSizeInfo(pOperator, 1024); + + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; + //TODO + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, + getGroupSortExplainExecInfo); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->pColMatchInfo); +} + + +//TODO: sort group +//TODO: msortCompare compare group id in multiway merge sort. +//TODO: table merge scan, group first, then for each group, multiple readers + +//===================================================================================== +// Multiway Sort Merge operator typedef struct SMultiwaySortMergeOperatorInfo { SOptrBasicInfo binfo; From faebe29ab7240b952f5f2885ab681ac30a859640 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 09:42:04 +0800 Subject: [PATCH 2/3] feat: group sort read child input during tsortOpen --- source/libs/executor/src/sortoperator.c | 203 ++++++++++++------------ 1 file changed, 104 insertions(+), 99 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 1880d169bf..eebe5f2d8a 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -22,10 +22,11 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } @@ -44,16 +45,17 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; - pInfo->pColMatchInfo = pColMatchColInfo; - pOperator->name = "SortOperator"; + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); + ; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; // lazy evaluation for the following parameter since the input datablock is not known till now. // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; @@ -146,8 +148,8 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; if (pOperator->exprSupp.pExprInfo != NULL) { - int32_t code = - projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); + int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, + pOperator->exprSupp.numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { longjmp(pOperator->pTaskInfo->env, code); } @@ -165,8 +167,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, - NULL, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -234,28 +235,27 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* //===================================================================================== // Group Sort Operator -typedef enum EChildOperatorStatus {CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED} EChildOperatorStatus; +typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; typedef struct SGroupSortOperatorInfo { SOptrBasicInfo binfo; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SArray* pColMatchInfo; // for index map from table scan output - int32_t bufPageSize; + SArray* pSortInfo; + SArray* pColMatchInfo; - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SSDataBlock* prefetchedDatablock; - bool hasGroupId; - uint64_t currGroupId; - SSortHandle* pCurrSortHandle; + int64_t startTs; + uint64_t sortElapsed; + bool hasGroupId; + uint64_t currGroupId; + SSDataBlock* prefetchedSortInput; + SSortHandle* pCurrSortHandle; EChildOperatorStatus childOpStatus; + + SSortExecInfo sortExecInfo; } SGroupSortOperatorInfo; - -SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, - SGroupSortOperatorInfo* pInfo) { +SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, + SArray* pColMatchInfo, SGroupSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -296,30 +296,28 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } - typedef struct SGroupSortSourceParam { - SOperatorInfo* childOpInfo; + SOperatorInfo* childOpInfo; SGroupSortOperatorInfo* grpSortOpInfo; } SGroupSortSourceParam; - SSDataBlock* fetchNextGroupSortDataBlock(void* param) { - SGroupSortSourceParam* source = param; + SGroupSortSourceParam* source = param; SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; - if (grpSortOpInfo->prefetchedDatablock) { - SSDataBlock* pBlock = grpSortOpInfo->prefetchedDatablock; - grpSortOpInfo->prefetchedDatablock = NULL; - return pBlock; + if (grpSortOpInfo->prefetchedSortInput) { + SSDataBlock* block = grpSortOpInfo->prefetchedSortInput; + grpSortOpInfo->prefetchedSortInput = NULL; + return block; } else { SOperatorInfo* childOp = source->childOpInfo; - SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + SSDataBlock* block = childOp->fpSet.getNextFn(childOp); if (block != NULL) { if (block->info.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; return block; } else { grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; - grpSortOpInfo->prefetchedDatablock = block; + grpSortOpInfo->prefetchedSortInput = block; return NULL; } } else { @@ -329,19 +327,17 @@ SSDataBlock* fetchNextGroupSortDataBlock(void* param) { } } - int32_t beginSortGroup(SOperatorInfo* pOperator) { SGroupSortOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pCurrSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, - NULL, pTaskInfo->id.str); + pInfo->pCurrSortHandle = + tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; @@ -360,12 +356,18 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { int32_t finishSortGroup(SOperatorInfo* pOperator) { SGroupSortOperatorInfo* pInfo = pOperator->info; + + SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle); + pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; + pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; + pInfo->sortExecInfo.loops += sortExecInfo.loops; + pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; + pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; if (pInfo->pCurrSortHandle != NULL) { tsortDestroySortHandle(pInfo->pCurrSortHandle); } pInfo->pCurrSortHandle = NULL; return TSDB_CODE_SUCCESS; - } SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { @@ -373,7 +375,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupSortOperatorInfo* pInfo = pOperator->info; int32_t code = pOperator->fpSet._openFn(pOperator); @@ -384,46 +386,50 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; - pInfo->prefetchedDatablock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); - pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; + pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; pInfo->childOpStatus = CHILD_OP_NEW_GROUP; beginSortGroup(pOperator); } - SSDataBlock* pBlock = NULL; - while (pInfo->childOpStatus != CHILD_OP_FINISHED) { - pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + SSDataBlock* pBlock = NULL; + while (pInfo->pCurrSortHandle != NULL) { + // beginSortGroup would fetch all child blocks of pInfo->currGroupId; + ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); + pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); if (pBlock != NULL) { pBlock->info.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; + return pBlock; + } else { + if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { + finishSortGroup(pOperator); + pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + beginSortGroup(pOperator); + } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { + finishSortGroup(pOperator); + doSetOperatorCompleted(pOperator); + return NULL; + } } - if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { - finishSortGroup(pOperator); - pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; - beginSortGroup(pOperator); - } - return pBlock; } - - if (pInfo->childOpStatus == CHILD_OP_FINISHED) { - finishSortGroup(pOperator); - doSetOperatorCompleted(pOperator); - } - - return pBlock; + return NULL; } int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - //TODO: accumulate all sort handles; + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info; + *pOptrExplain = &pInfo->sortExecInfo; + *len = sizeof(SSortExecInfo); return TSDB_CODE_SUCCESS; } -//TODO: -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { +// TODO: +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } @@ -442,20 +448,21 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; - pInfo->pColMatchInfo = pColMatchColInfo; - pOperator->name = "GroupSortOperator"; - //TODO + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); + ; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; + // TODO pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, - getGroupSortExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, + NULL, getGroupSortExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -479,10 +486,9 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pColMatchInfo); } - -//TODO: sort group -//TODO: msortCompare compare group id in multiway merge sort. -//TODO: table merge scan, group first, then for each group, multiple readers +// TODO: sort group +// TODO: msortCompare compare group id in multiway merge sort. +// TODO: table merge scan, group first, then for each group, multiple readers //===================================================================================== // Multiway Sort Merge operator @@ -513,8 +519,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, - pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); @@ -540,7 +546,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SOperatorInfo* pOperator) { SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDataBlock); @@ -641,24 +647,23 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, goto _error; } - initResultSizeInfo(pOperator, 1024); - pInfo->binfo.pRes = pResBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->binfo.pRes = pResBlock; + pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; - pInfo->pInputBlock = pInputBlock; - pOperator->name = "MultiwaySortMerge"; + pInfo->pInputBlock = pInputBlock; + pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pInfo->bufPageSize = getProperSortPageSize(rowSize); + pInfo->bufPageSize = getProperSortPageSize(rowSize); // one additional is reserved for merged result. - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, From fadc11b68543c71fea5131f0aab9ddfe90abab59 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 10:21:05 +0800 Subject: [PATCH 3/3] feat: add compare parameter that enable/disable group id comparision --- source/libs/executor/inc/tsort.h | 6 ++++++ source/libs/executor/src/sortoperator.c | 3 ++- source/libs/executor/src/tsort.c | 12 ++++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 363f379ee4..35bdd4ee55 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -50,6 +50,7 @@ typedef struct SMsortComparParam { void **pSources; int32_t numOfSources; SArray *orderInfo; // SArray + bool cmpGroupId; } SMsortComparParam; typedef struct SSortHandle SSortHandle; @@ -99,6 +100,11 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc */ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp); +/** + * + */ +int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId); + /** * * @param pHandle diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index eebe5f2d8a..0fabee5a1e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -523,7 +523,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - + tsortSetCompareGroupId(pInfo->pSortHandle, true); + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[i]; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 581935514e..1dd72fd83f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -86,6 +86,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; + pSortHandle->cmpParam.cmpGroupId = false; tsortSetComparFp(pSortHandle, msortComparFn); @@ -374,6 +375,12 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { SSDataBlock* pLeftBlock = pLeftSource->src.pBlock; SSDataBlock* pRightBlock = pRightSource->src.pBlock; + if (pParam->cmpGroupId) { + if (pLeftBlock->info.groupId != pRightBlock->info.groupId) { + return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1; + } + } + for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); @@ -680,6 +687,11 @@ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) { return TSDB_CODE_SUCCESS; } +int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) { + pHandle->cmpParam.cmpGroupId = compareGroupId; + return TSDB_CODE_SUCCESS; +} + STupleHandle* tsortNextTuple(SSortHandle* pHandle) { if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { return NULL;