feat: extract rows within limit before sort to disk

This commit is contained in:
shenglian zhou 2024-01-22 15:18:49 +08:00
parent 3980e1d955
commit 3f441bb8cf
1 changed files with 36 additions and 2 deletions

View File

@ -1040,6 +1040,33 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
return 0; return 0;
} }
static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) {
int64_t keepRows = pOrigBlk->info.rows;
int64_t nRows = 0;
int64_t prevRows = 0;
void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
if (pNum == NULL) {
prevRows = 0;
nRows = pOrigBlk->info.rows;
tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
} else {
prevRows = *(int64_t*)pNum;
*(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
nRows = *(int64_t*)pNum;
}
if (nRows >= pHandle->mergeLimit) {
keepRows = pHandle->mergeLimit - prevRows;
}
SSDataBlock* pBlock = NULL;
if (keepRows != pOrigBlk->info.rows) {
pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows);
} else {
pBlock = createOneDataBlock(pOrigBlk, true);
}
return pBlock;
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
@ -1062,10 +1089,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
pHandle->currMergeLimitTs = INT64_MIN; pHandle->currMergeLimitTs = INT64_MIN;
} }
SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) { while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
int64_t p = taosGetTimestampUs();
if (pBlk != NULL && pHandle->mergeLimit != -1) {
pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk);
}
if (pBlk != NULL) { if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData; int64_t firstRowTs = *(int64_t*)tsCol->pData;
@ -1074,6 +1108,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
continue; continue;
} }
} }
if (pBlk != NULL) { if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk); szSort += blockDataGetSize(pBlk);
@ -1091,7 +1126,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk); tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk); tSimpleHashCleanup(mUidBlk);
@ -1131,7 +1165,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
} }
taosArrayDestroy(aExtSrc); taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows);
pHandle->type = SORT_SINGLESOURCE_SORT; pHandle->type = SORT_SINGLESOURCE_SORT;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }