fix: when rows is greater than merge limit, skip the block

This commit is contained in:
slzhou 2024-02-02 13:17:13 +08:00
parent 2bfdc88e21
commit 3d4764b383
1 changed files with 14 additions and 6 deletions

View File

@ -1043,7 +1043,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
return 0; return 0;
} }
static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock, bool *pSkipBlock) {
int64_t nRows = 0; int64_t nRows = 0;
int64_t prevRows = 0; int64_t prevRows = 0;
void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
@ -1062,9 +1062,15 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH
if (pHandle->mergeLimitReachedFn) { if (pHandle->mergeLimitReachedFn) {
pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam); pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
} }
keepRows = pHandle->mergeLimit - prevRows; keepRows = pHandle->mergeLimit > prevRows ? (pHandle->mergeLimit - prevRows) : 0;
} }
if (keepRows == 0) {
*pSkipBlock = true;
return pOrigBlk;
}
*pSkipBlock = false;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (keepRows != pOrigBlk->info.rows) { if (keepRows != pOrigBlk->info.rows) {
pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows);
@ -1106,8 +1112,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
bool bExtractedBlock = false; bool bExtractedBlock = false;
bool bSkipBlock = false;
if (pBlk != NULL && pHandle->mergeLimit > 0) { if (pBlk != NULL && pHandle->mergeLimit > 0) {
pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock);
if (bSkipBlock) {
continue;
}
} }
if (pBlk != NULL) { if (pBlk != NULL) {
@ -1121,7 +1131,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (pBlk != NULL) { if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk); szSort += blockDataGetSize(pBlk);
void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
if (ppBlk != NULL) { if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
@ -1138,7 +1147,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk); tSimpleHashClear(mUidBlk);
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk); tSimpleHashCleanup(mUidBlk);