From 9728518db030fd556f7ef12a692bf2b5a935e363 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 22 Jan 2024 15:18:49 +0800 Subject: [PATCH 1/6] feat: extract rows within limit before sort to disk --- source/libs/executor/src/tsort.c | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ee1d831a24..24df12d06b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,6 +1040,33 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } +static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { + int64_t keepRows = pOrigBlk->info.rows; + int64_t nRows = 0; + int64_t prevRows = 0; + void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); + if (pNum == NULL) { + prevRows = 0; + nRows = pOrigBlk->info.rows; + tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows)); + } else { + prevRows = *(int64_t*)pNum; + *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows; + nRows = *(int64_t*)pNum; + } + + if (nRows >= pHandle->mergeLimit) { + keepRows = pHandle->mergeLimit - prevRows; + } + SSDataBlock* pBlock = NULL; + if (keepRows != pOrigBlk->info.rows) { + pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + } else { + pBlock = createOneDataBlock(pOrigBlk, true); + } + return pBlock; +} + static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); @@ -1062,10 +1089,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { pHandle->currMergeLimitTs = INT64_MIN; } + SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + int64_t p = taosGetTimestampUs(); + if (pBlk != NULL && pHandle->mergeLimit != -1) { + pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + } + if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; @@ -1074,6 +1108,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { continue; } } + if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); @@ -1091,7 +1126,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - int64_t p = taosGetTimestampUs(); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); @@ -1131,7 +1165,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); } taosArrayDestroy(aExtSrc); - + tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; return TSDB_CODE_SUCCESS; } From 3f441bb8cfa9d2652fe86cf25c48c24930a6c940 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 22 Jan 2024 15:18:49 +0800 Subject: [PATCH 2/6] feat: extract rows within limit before sort to disk --- source/libs/executor/src/tsort.c | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ee1d831a24..24df12d06b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,6 +1040,33 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } +static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { + int64_t keepRows = pOrigBlk->info.rows; + int64_t nRows = 0; + int64_t prevRows = 0; + void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); + if (pNum == NULL) { + prevRows = 0; + nRows = pOrigBlk->info.rows; + tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows)); + } else { + prevRows = *(int64_t*)pNum; + *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows; + nRows = *(int64_t*)pNum; + } + + if (nRows >= pHandle->mergeLimit) { + keepRows = pHandle->mergeLimit - prevRows; + } + SSDataBlock* pBlock = NULL; + if (keepRows != pOrigBlk->info.rows) { + pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + } else { + pBlock = createOneDataBlock(pOrigBlk, true); + } + return pBlock; +} + static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); @@ -1062,10 +1089,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { pHandle->currMergeLimitTs = INT64_MIN; } + SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + int64_t p = taosGetTimestampUs(); + if (pBlk != NULL && pHandle->mergeLimit != -1) { + pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + } + if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; @@ -1074,6 +1108,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { continue; } } + if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); @@ -1091,7 +1126,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - int64_t p = taosGetTimestampUs(); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); @@ -1131,7 +1165,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); } taosArrayDestroy(aExtSrc); - + tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; return TSDB_CODE_SUCCESS; } From 6ca92a3d925ec5f364969b4c2df0f5f2cb5b22ed Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 23 Jan 2024 08:41:59 +0800 Subject: [PATCH 3/6] fix: meory leak --- source/libs/executor/src/tsort.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 24df12d06b..3e8d1628aa 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,7 +1040,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } -static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { +static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { int64_t keepRows = pOrigBlk->info.rows; int64_t nRows = 0; int64_t prevRows = 0; @@ -1061,8 +1061,9 @@ static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* m SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + *pExtractedBlock = true; } else { - pBlock = createOneDataBlock(pOrigBlk, true); + *pExtractedBlock = false; } return pBlock; } @@ -1096,8 +1097,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); int64_t p = taosGetTimestampUs(); + bool bExtractedBlock = false; if (pBlk != NULL && pHandle->mergeLimit != -1) { - pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); } if (pBlk != NULL) { @@ -1116,8 +1118,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (ppBlk != NULL) { SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); blockDataMerge(tBlk, pBlk); + if (bExtractedBlock) { + blockDataDestroy(pBlk); + } } else { - SSDataBlock* tBlk = createOneDataBlock(pBlk, true); + SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true); tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); taosArrayPush(aBlkSort, &tBlk); } From 5c9edce538b8f5d36e22869802542d260c98c56f Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 23 Jan 2024 10:45:26 +0800 Subject: [PATCH 4/6] fix: whole block error --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 6 ++++-- tests/script/tsim/parser/limit_stb.sim | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..808956ff5c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3660,7 +3660,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* terrno = TSDB_CODE_TSC_QUERY_CANCELLED; T_LONG_JMP(pOperator->pTaskInfo->env, terrno); } - + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3e8d1628aa..38aac39d8e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -885,7 +885,7 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); ASSERT(size <= getBufPageSize(pHandle->pBuf)); - + blockDataToBuf(pPage, blk); setBufPageDirty(pPage, true); @@ -1041,7 +1041,6 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO } static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { - int64_t keepRows = pOrigBlk->info.rows; int64_t nRows = 0; int64_t prevRows = 0; void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); @@ -1055,15 +1054,18 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH nRows = *(int64_t*)pNum; } + int64_t keepRows = pOrigBlk->info.rows; if (nRows >= pHandle->mergeLimit) { keepRows = pHandle->mergeLimit - prevRows; } + SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); *pExtractedBlock = true; } else { *pExtractedBlock = false; + pBlock = pOrigBlk; } return pBlock; } diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 7d6aff3b51..2e8f029260 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -129,6 +129,7 @@ endi $offset = $tbNum * $rowNum $offset = $offset - 1 +print select * from $stb order by ts limit 2 offset $offset sql select * from $stb order by ts limit 2 offset $offset if $rows != 1 then return -1 From 70f869ce33f30eeeb446219f2979204f9c968c5d Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 23 Jan 2024 17:05:03 +0800 Subject: [PATCH 5/6] fix: memory error when limit 0 --- source/libs/executor/src/tsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 38aac39d8e..64f47baca9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1100,7 +1100,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t p = taosGetTimestampUs(); bool bExtractedBlock = false; - if (pBlk != NULL && pHandle->mergeLimit != -1) { + if (pBlk != NULL && pHandle->mergeLimit > 0) { pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); } From 57a9ac75a8644510e17f9d3359f8ef54ba2f35f2 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 24 Jan 2024 11:37:47 +0800 Subject: [PATCH 6/6] feat: remove limit reached from merge scan operator --- source/libs/executor/inc/tsort.h | 4 +++ source/libs/executor/src/scanoperator.c | 34 +++++++------------------ source/libs/executor/src/tsort.c | 11 ++++++++ 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 365acf2bff..436d1cefb8 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -204,6 +204,10 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo */ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple); +/** + * @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param +*/ +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 808956ff5c..18ab5fc17d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3324,26 +3324,16 @@ _error: return NULL; } -static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { - int64_t nRows = 0; - void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); - if (pNum == NULL) { - nRows = pBlock->info.rows; - tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); - } else { - *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows; - nRows = *(int64_t*)pNum; - } - - if (nRows >= pInfo->mergeLimit) { - if (pInfo->mSkipTables == NULL) { +static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { + STableMergeScanInfo* pInfo = pTableMergeScanInfo; + if (pInfo->mSkipTables == NULL) { pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - } - int bSkip = 1; - taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip)); } - return TSDB_CODE_SUCCESS; + int bSkip = 1; + if (pInfo->mSkipTables != NULL) { + taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip)); + } } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { @@ -3459,10 +3449,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); - if (pInfo->mergeLimit != -1) { - tableMergeScanDoSkipTable(pInfo, pBlock); - } - pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; return pBlock; @@ -3529,6 +3515,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); + tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); @@ -3756,8 +3743,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tSimpleHashCleanup(pTableScanInfo->mTableNumRows); - pTableScanInfo->mTableNumRows = NULL; taosHashCleanup(pTableScanInfo->mSkipTables); pTableScanInfo->mSkipTables = NULL; destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); @@ -3849,8 +3834,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); - pInfo->mTableNumRows = tSimpleHashInit(1024, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + pInfo->mergeLimit = -1; bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; if (hasLimit) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 64f47baca9..db9266cb8f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -75,6 +75,9 @@ struct SSortHandle { bool (*abortCheckFn)(void* param); void* abortCheckParam; + + void (*mergeLimitReachedFn)(uint64_t tableUid, void* param); + void* mergeLimitReachedParam; }; void tsortSetSingleTableMerge(SSortHandle* pHandle) { @@ -1056,6 +1059,9 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH int64_t keepRows = pOrigBlk->info.rows; if (nRows >= pHandle->mergeLimit) { + if (pHandle->mergeLimitReachedFn) { + pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam); + } keepRows = pHandle->mergeLimit - prevRows; } @@ -1651,3 +1657,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke } return ret; } + +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) { + pHandle->mergeLimitReachedFn = mergeLimitReachedCb; + pHandle->mergeLimitReachedParam = param; +}