Merge pull request #22171 from taosdata/szhou/enhance-limit-proc
enhance: skip data blocks with the merge limit ts when create initial…
This commit is contained in:
commit
10f9c40dba
|
@ -51,6 +51,7 @@ struct SSortHandle {
|
||||||
uint32_t tmpRowIdx;
|
uint32_t tmpRowIdx;
|
||||||
|
|
||||||
int64_t mergeLimit;
|
int64_t mergeLimit;
|
||||||
|
int64_t currMergeLimitTs;
|
||||||
|
|
||||||
int32_t sourceId;
|
int32_t sourceId;
|
||||||
SSDataBlock* pDataBlock;
|
SSDataBlock* pDataBlock;
|
||||||
|
@ -921,7 +922,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
int32_t nMergedRows = 0;
|
int32_t nMergedRows = 0;
|
||||||
bool mergeLimitReached = false;
|
bool mergeLimitReached = false;
|
||||||
size_t blkPgSz = pgHeaderSz;
|
size_t blkPgSz = pgHeaderSz;
|
||||||
|
int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||||
|
int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||||
while (nRows < totalRows) {
|
while (nRows < totalRows) {
|
||||||
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||||
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||||
|
@ -929,14 +931,21 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
|
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
|
||||||
|
|
||||||
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
||||||
|
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
nMergedRows += pHandle->pDataBlock->info.rows;
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
|
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
blkPgSz = pgHeaderSz;
|
blkPgSz = pgHeaderSz;
|
||||||
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
||||||
|
|
||||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
mergeLimitReached = true;
|
mergeLimitReached = true;
|
||||||
|
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
|
||||||
|
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
|
||||||
|
pHandle->currMergeLimitTs = lastPageBufTs;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -955,8 +964,17 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
}
|
}
|
||||||
if (pHandle->pDataBlock->info.rows > 0) {
|
if (pHandle->pDataBlock->info.rows > 0) {
|
||||||
if (!mergeLimitReached) {
|
if (!mergeLimitReached) {
|
||||||
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
||||||
|
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
nMergedRows += pHandle->pDataBlock->info.rows;
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
|
mergeLimitReached = true;
|
||||||
|
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
|
||||||
|
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
|
||||||
|
pHandle->currMergeLimitTs = lastPageBufTs;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
}
|
}
|
||||||
|
@ -982,11 +1000,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
|
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||||
int32_t szSort = 0;
|
int32_t szSort = 0;
|
||||||
|
|
||||||
|
if (pOrder->order == TSDB_ORDER_ASC) {
|
||||||
|
pHandle->currMergeLimitTs = INT64_MAX;
|
||||||
|
} else {
|
||||||
|
pHandle->currMergeLimitTs = INT64_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
|
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
|
||||||
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
||||||
|
if (pBlk != NULL) {
|
||||||
|
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
|
||||||
|
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||||
|
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||||
|
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (pBlk != NULL) {
|
if (pBlk != NULL) {
|
||||||
szSort += blockDataGetSize(pBlk);
|
szSort += blockDataGetSize(pBlk);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue