diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9d13276e6d..1880d169bf 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -232,6 +232,260 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* return TSDB_CODE_SUCCESS; } +//===================================================================================== +// Group Sort Operator +typedef enum EChildOperatorStatus {CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED} EChildOperatorStatus; + +typedef struct SGroupSortOperatorInfo { + SOptrBasicInfo binfo; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SArray* pColMatchInfo; // for index map from table scan output + int32_t bufPageSize; + + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + SSDataBlock* prefetchedDatablock; + bool hasGroupId; + uint64_t currGroupId; + SSortHandle* pCurrSortHandle; + + EChildOperatorStatus childOpStatus; +} SGroupSortOperatorInfo; + + +SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, + SGroupSortOperatorInfo* pInfo) { + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(p, pTupleHandle); + if (p->info.rows >= capacity) { + break; + } + } + + if (p->info.rows > 0) { + int32_t numOfCols = taosArrayGetSize(pColMatchInfo); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); + ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + + SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, pSrc, p->info.rows); + } + + pDataBlock->info.rows = p->info.rows; + pDataBlock->info.capacity = p->info.rows; + } + + blockDataDestroy(p); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + + +typedef struct SGroupSortSourceParam { + SOperatorInfo* childOpInfo; + SGroupSortOperatorInfo* grpSortOpInfo; +} SGroupSortSourceParam; + + +SSDataBlock* fetchNextGroupSortDataBlock(void* param) { + SGroupSortSourceParam* source = param; + SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; + if (grpSortOpInfo->prefetchedDatablock) { + SSDataBlock* pBlock = grpSortOpInfo->prefetchedDatablock; + grpSortOpInfo->prefetchedDatablock = NULL; + return pBlock; + } else { + SOperatorInfo* childOp = source->childOpInfo; + SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + if (block != NULL) { + if (block->info.groupId == grpSortOpInfo->currGroupId) { + grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; + return block; + } else { + grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; + grpSortOpInfo->prefetchedDatablock = block; + return NULL; + } + } else { + grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED; + return NULL; + } + } +} + + +int32_t beginSortGroup(SOperatorInfo* pOperator) { + SGroupSortOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + + // pInfo->binfo.pRes is not equalled to the input datablock. + pInfo->pCurrSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, + NULL, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); + param->childOpInfo = pOperator->pDownstream[0]; + param->grpSortOpInfo = pInfo; + ps->param = param; + tsortAddSource(pInfo->pCurrSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pCurrSortHandle); + taosMemoryFreeClear(ps); + + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t finishSortGroup(SOperatorInfo* pOperator) { + SGroupSortOperatorInfo* pInfo = pOperator->info; + if (pInfo->pCurrSortHandle != NULL) { + tsortDestroySortHandle(pInfo->pCurrSortHandle); + } + pInfo->pCurrSortHandle = NULL; + return TSDB_CODE_SUCCESS; + +} + +SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGroupSortOperatorInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + if (!pInfo->hasGroupId) { + pInfo->hasGroupId = true; + + pInfo->prefetchedDatablock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; + pInfo->childOpStatus = CHILD_OP_NEW_GROUP; + beginSortGroup(pOperator); + } + SSDataBlock* pBlock = NULL; + while (pInfo->childOpStatus != CHILD_OP_FINISHED) { + pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); + + if (pBlock != NULL) { + pBlock->info.groupId = pInfo->currGroupId; + pOperator->resultInfo.totalRows += pBlock->info.rows; + } + if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { + finishSortGroup(pOperator); + pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; + beginSortGroup(pOperator); + } + return pBlock; + } + + if (pInfo->childOpStatus == CHILD_OP_FINISHED) { + finishSortGroup(pOperator); + doSetOperatorCompleted(pOperator); + } + + return pBlock; +} + +int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + //TODO: accumulate all sort handles; + return TSDB_CODE_SUCCESS; +} + +//TODO: +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { + goto _error; + } + + SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; + + int32_t numOfCols = 0; + SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + + int32_t numOfOutputCols = 0; + SArray* pColMatchColInfo = + extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + + pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + pInfo->binfo.pRes = pResBlock; + + initResultSizeInfo(pOperator, 1024); + + pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; + //TODO + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, + getGroupSortExplainExecInfo); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { + SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->pColMatchInfo); +} + + +//TODO: sort group +//TODO: msortCompare compare group id in multiway merge sort. +//TODO: table merge scan, group first, then for each group, multiple readers + +//===================================================================================== +// Multiway Sort Merge operator typedef struct SMultiwaySortMergeOperatorInfo { SOptrBasicInfo binfo;