From d1a1976aa47cdac58e4fe56e3634c121ac43c96c Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 19 Jul 2023 13:59:43 +0800 Subject: [PATCH] enhance: add limit to merge sort --- source/libs/executor/inc/tsort.h | 12 +++-- source/libs/executor/src/scanoperator.c | 8 +++- source/libs/executor/src/tsort.c | 58 ++++++++++++++++--------- 3 files changed, 51 insertions(+), 27 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index a15b1e0eaf..538a9f18f6 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -26,7 +26,7 @@ extern "C" { enum { SORT_MULTISOURCE_MERGE = 0x1, SORT_SINGLESOURCE_SORT = 0x2, - SORT_TABLE_MERGE_SCAN = 0x3 + SORT_BLOCK_TS_MERGE = 0x3 }; typedef struct SMultiMergeSource { @@ -56,7 +56,7 @@ typedef struct SMsortComparParam { bool cmpGroupId; int32_t sortType; - // the following field to speed up when sortType == SORT_TABLE_MERGE_SCAN + // the following field to speed up when sortType == SORT_BLOCK_TS_MERGE int32_t tsSlotId; int32_t order; __compar_fn_t cmpFn; @@ -77,8 +77,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @return */ SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, - uint32_t sortBufSize); + SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, + uint32_t pqSortBufSize); void tsortSetForceUsePQSort(SSortHandle* pHandle); @@ -117,6 +117,10 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc */ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp); +/** + * +*/ +void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit); /** * */ diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ab8991bd05..6edd7b2d47 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2842,9 +2842,13 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortBufSize = 1024 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); - + int64_t mergeLimit = -1; + if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) { + mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; + } + tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); // one table has one data block diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 4bcf455ef7..c56dab1e33 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -42,13 +42,15 @@ struct SSortHandle { int64_t startTs; uint64_t totalElapsed; - uint64_t maxRows; - uint32_t maxTupleLength; - uint32_t sortBufSize; + uint64_t pqMaxRows; + uint32_t pqMaxTupleLength; + uint32_t pqSortBufSize; bool forceUsePQSort; BoundedQueue* pBoundedQueue; uint32_t tmpRowIdx; + int64_t mergeLimit; + int32_t sourceId; SSDataBlock* pDataBlock; SMsortComparParam cmpParam; @@ -173,8 +175,8 @@ void destroyTuple(void* t) { * @return */ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, - uint32_t sortBufSize) { + SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, + uint32_t pqSortBufSize) { SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); pSortHandle->type = type; @@ -183,10 +185,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pSortInfo = pSortInfo; pSortHandle->loops = 0; - pSortHandle->maxTupleLength = maxTupleLength; - if (maxRows != 0) { - pSortHandle->sortBufSize = sortBufSize; - pSortHandle->maxRows = maxRows; + pSortHandle->pqMaxTupleLength = pqMaxTupleLength; + if (pqMaxRows != 0) { + pSortHandle->pqSortBufSize = pqSortBufSize; + pSortHandle->pqMaxRows = pqMaxRows; } pSortHandle->forceUsePQSort = false; @@ -194,11 +196,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); } + pSortHandle->mergeLimit = -1; + pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.cmpGroupId = false; pSortHandle->cmpParam.sortType = type; - if (type == SORT_TABLE_MERGE_SCAN) { + if (type == SORT_BLOCK_TS_MERGE) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0); pSortHandle->cmpParam.tsSlotId = pOrder->slotId; pSortHandle->cmpParam.order = pOrder->order; @@ -586,7 +590,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { } } - if (pParam->sortType == SORT_TABLE_MERGE_SCAN) { + if (pParam->sortType == SORT_BLOCK_TS_MERGE) { SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId); int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex; @@ -709,18 +713,23 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { taosArrayDestroy(pResList); return code; } - + int nRows = 0; SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { if (tsortIsClosed(pHandle)) { code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; return code; } - + + if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { + break; + } + SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { break; } + nRows += pDataBlock->info.rows; int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); @@ -740,7 +749,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); - blockDataCleanup(pDataBlock); } @@ -846,7 +854,6 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); - blockDataCleanup(blk); return 0; } @@ -913,6 +920,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO size_t blkPgSz = pgHeaderSz; while (nRows < totalRows) { + if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { + break; + } int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; @@ -920,6 +930,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; bufInc = getPageBufIncForRow(minBlk, minRow, 0); } @@ -939,6 +950,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO } if (pHandle->pDataBlock->info.rows > 0) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + blockDataCleanup(pHandle->pDataBlock); } SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); @@ -1061,7 +1073,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); code = doAddToBuf(pHandle->pDataBlock, pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1086,7 +1098,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { return code; } - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; @@ -1111,7 +1123,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (pHandle->type == SORT_SINGLESOURCE_SORT) { code = createBlocksQuickSortInitialSources(pHandle); - } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + } else if (pHandle->type == SORT_BLOCK_TS_MERGE) { code = createBlocksMergeSortInitialSources(pHandle); } uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); @@ -1165,6 +1177,10 @@ void tsortSetClosed(SSortHandle* pHandle) { atomic_store_8(&pHandle->closed, 2); } +void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) { + pHandle->mergeLimit = mergeLimit; +} + int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) { pHandle->fetchfp = fetchFp; @@ -1244,8 +1260,8 @@ void tsortSetForceUsePQSort(SSortHandle* pHandle) { static bool tsortIsPQSortApplicable(SSortHandle* pHandle) { if (pHandle->type != SORT_SINGLESOURCE_SORT) return false; if (tsortIsForceUsePQSort(pHandle)) return true; - uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*)); - return maxRowsFitInMemory > pHandle->maxRows; + uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*)); + return maxRowsFitInMemory > pHandle->pqMaxRows; } static bool tsortPQCompFn(void* a, void* b, void* param) { @@ -1291,7 +1307,7 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) } static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { - pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle); + pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle); if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; tsortSetComparFp(pHandle, tupleComparFn);