diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 363f379ee4..35bdd4ee55 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -50,6 +50,7 @@ typedef struct SMsortComparParam { void **pSources; int32_t numOfSources; SArray *orderInfo; // SArray + bool cmpGroupId; } SMsortComparParam; typedef struct SSortHandle SSortHandle; @@ -99,6 +100,11 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc */ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp); +/** + * + */ +int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId); + /** * * @param pHandle diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9d13276e6d..0fabee5a1e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -22,10 +22,11 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } @@ -44,16 +45,17 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; - pInfo->pColMatchInfo = pColMatchColInfo; - pOperator->name = "SortOperator"; + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); + ; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; // lazy evaluation for the following parameter since the input datablock is not known till now. // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; @@ -146,8 +148,8 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; if (pOperator->exprSupp.pExprInfo != NULL) { - int32_t code = - projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); + int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, + pOperator->exprSupp.numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { longjmp(pOperator->pTaskInfo->env, code); } @@ -165,8 +167,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, - NULL, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -232,6 +233,265 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* return TSDB_CODE_SUCCESS; } +//===================================================================================== +// Group Sort Operator +typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; + +typedef struct SGroupSortOperatorInfo { + SOptrBasicInfo binfo; + SArray* pSortInfo; + SArray* pColMatchInfo; + + int64_t startTs; + uint64_t sortElapsed; + bool hasGroupId; + uint64_t currGroupId; + + SSDataBlock* prefetchedSortInput; + SSortHandle* pCurrSortHandle; + EChildOperatorStatus childOpStatus; + + SSortExecInfo sortExecInfo; +} SGroupSortOperatorInfo; + +SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, + SArray* pColMatchInfo, SGroupSortOperatorInfo* pInfo) { + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(p, pTupleHandle); + if (p->info.rows >= capacity) { + break; + } + } + + if (p->info.rows > 0) { + int32_t numOfCols = taosArrayGetSize(pColMatchInfo); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); + ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + + SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, pSrc, p->info.rows); + } + + pDataBlock->info.rows = p->info.rows; + pDataBlock->info.capacity = p->info.rows; + } + + blockDataDestroy(p); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + +typedef struct SGroupSortSourceParam { + SOperatorInfo* childOpInfo; + SGroupSortOperatorInfo* grpSortOpInfo; +} SGroupSortSourceParam; + +SSDataBlock* fetchNextGroupSortDataBlock(void* param) { + SGroupSortSourceParam* source = param; + SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; + if (grpSortOpInfo->prefetchedSortInput) { + SSDataBlock* block = grpSortOpInfo->prefetchedSortInput; + grpSortOpInfo->prefetchedSortInput = NULL; + return block; + } else { + SOperatorInfo* childOp = source->childOpInfo; + SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + if (block != NULL) { + if (block->info.groupId == grpSortOpInfo->currGroupId) { + grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; + return block; + } else { + grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; + grpSortOpInfo->prefetchedSortInput = block; + return NULL; + } + } else { + grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED; + return NULL; + } + } +} + +int32_t beginSortGroup(SOperatorInfo* pOperator) { + SGroupSortOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + // pInfo->binfo.pRes is not equalled to the input datablock. + pInfo->pCurrSortHandle = + tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); + param->childOpInfo = pOperator->pDownstream[0]; + param->grpSortOpInfo = pInfo; + ps->param = param; + tsortAddSource(pInfo->pCurrSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pCurrSortHandle); + taosMemoryFreeClear(ps); + + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t finishSortGroup(SOperatorInfo* pOperator) { + SGroupSortOperatorInfo* pInfo = pOperator->info; + + SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle); + pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; + pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; + pInfo->sortExecInfo.loops += sortExecInfo.loops; + pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; + pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; + if (pInfo->pCurrSortHandle != NULL) { + tsortDestroySortHandle(pInfo->pCurrSortHandle); + } + pInfo->pCurrSortHandle = NULL; + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGroupSortOperatorInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + if (!pInfo->hasGroupId) { + pInfo->hasGroupId = true; + + pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + pInfo->childOpStatus = CHILD_OP_NEW_GROUP; + beginSortGroup(pOperator); + } + + SSDataBlock* pBlock = NULL; + while (pInfo->pCurrSortHandle != NULL) { + // beginSortGroup would fetch all child blocks of pInfo->currGroupId; + ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); + pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); + if (pBlock != NULL) { + pBlock->info.groupId = pInfo->currGroupId; + pOperator->resultInfo.totalRows += pBlock->info.rows; + return pBlock; + } else { + if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { + finishSortGroup(pOperator); + pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + beginSortGroup(pOperator); + } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { + finishSortGroup(pOperator); + doSetOperatorCompleted(pOperator); + return NULL; + } + } + } + return NULL; +} + +int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info; + *pOptrExplain = &pInfo->sortExecInfo; + *len = sizeof(SSortExecInfo); + return TSDB_CODE_SUCCESS; +} + +// TODO: +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, + SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { + goto _error; + } + + SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; + + int32_t numOfCols = 0; + SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + + int32_t numOfOutputCols = 0; + SArray* pColMatchColInfo = + extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + + pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + pInfo->binfo.pRes = pResBlock; + + initResultSizeInfo(pOperator, 1024); + + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); + ; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; + // TODO + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, + NULL, getGroupSortExplainExecInfo); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->pColMatchInfo); +} + +// TODO: sort group +// TODO: msortCompare compare group id in multiway merge sort. +// TODO: table merge scan, group first, then for each group, multiple readers + +//===================================================================================== +// Multiway Sort Merge operator typedef struct SMultiwaySortMergeOperatorInfo { SOptrBasicInfo binfo; @@ -259,11 +519,12 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, - pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - + tsortSetCompareGroupId(pInfo->pSortHandle, true); + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[i]; @@ -286,7 +547,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SOperatorInfo* pOperator) { SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDataBlock); @@ -387,24 +648,23 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, goto _error; } - initResultSizeInfo(pOperator, 1024); - pInfo->binfo.pRes = pResBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->binfo.pRes = pResBlock; + pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; - pInfo->pInputBlock = pInputBlock; - pOperator->name = "MultiwaySortMerge"; + pInfo->pInputBlock = pInputBlock; + pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pInfo->bufPageSize = getProperSortPageSize(rowSize); + pInfo->bufPageSize = getProperSortPageSize(rowSize); // one additional is reserved for merged result. - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 581935514e..1dd72fd83f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -86,6 +86,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; + pSortHandle->cmpParam.cmpGroupId = false; tsortSetComparFp(pSortHandle, msortComparFn); @@ -374,6 +375,12 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { SSDataBlock* pLeftBlock = pLeftSource->src.pBlock; SSDataBlock* pRightBlock = pRightSource->src.pBlock; + if (pParam->cmpGroupId) { + if (pLeftBlock->info.groupId != pRightBlock->info.groupId) { + return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1; + } + } + for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); @@ -680,6 +687,11 @@ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) { return TSDB_CODE_SUCCESS; } +int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) { + pHandle->cmpParam.cmpGroupId = compareGroupId; + return TSDB_CODE_SUCCESS; +} + STupleHandle* tsortNextTuple(SSortHandle* pHandle) { if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { return NULL;