Merge pull request #24584 from taosdata/szhou/feat/td-28228

feat: extract rows within limit before sort to disk
This commit is contained in:
dapan1121 2024-01-29 08:47:46 +08:00 committed by GitHub
commit 5cf16dea84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 71 additions and 30 deletions

View File

@ -204,6 +204,10 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo
*/
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);
/**
* @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param
*/
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
#ifdef __cplusplus
}
#endif

View File

@ -3311,26 +3311,16 @@ _error:
return NULL;
}
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (pNum == NULL) {
nRows = pBlock->info.rows;
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
} else {
*(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
nRows = *(int64_t*)pNum;
}
if (nRows >= pInfo->mergeLimit) {
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
if (pInfo->mSkipTables == NULL) {
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
int bSkip = 1;
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
if (pInfo->mSkipTables != NULL) {
taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip));
}
return TSDB_CODE_SUCCESS;
}
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
@ -3446,10 +3436,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
}
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
if (pInfo->mergeLimit != -1) {
tableMergeScanDoSkipTable(pInfo, pBlock);
}
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
@ -3516,6 +3502,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
@ -3743,8 +3730,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
pTableScanInfo->mTableNumRows = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL;
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
@ -3836,8 +3821,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
pInfo->mergeLimit = -1;
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
if (hasLimit) {

View File

@ -75,6 +75,9 @@ struct SSortHandle {
bool (*abortCheckFn)(void* param);
void* abortCheckParam;
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
void* mergeLimitReachedParam;
};
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
@ -1040,6 +1043,39 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
return 0;
}
static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) {
int64_t nRows = 0;
int64_t prevRows = 0;
void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
if (pNum == NULL) {
prevRows = 0;
nRows = pOrigBlk->info.rows;
tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
} else {
prevRows = *(int64_t*)pNum;
*(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
nRows = *(int64_t*)pNum;
}
int64_t keepRows = pOrigBlk->info.rows;
if (nRows >= pHandle->mergeLimit) {
if (pHandle->mergeLimitReachedFn) {
pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
}
keepRows = pHandle->mergeLimit - prevRows;
}
SSDataBlock* pBlock = NULL;
if (keepRows != pOrigBlk->info.rows) {
pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows);
*pExtractedBlock = true;
} else {
*pExtractedBlock = false;
pBlock = pOrigBlk;
}
return pBlock;
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
@ -1062,10 +1098,18 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
pHandle->currMergeLimitTs = INT64_MIN;
}
SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
int64_t p = taosGetTimestampUs();
bool bExtractedBlock = false;
if (pBlk != NULL && pHandle->mergeLimit > 0) {
pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock);
}
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
@ -1074,6 +1118,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
continue;
}
}
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
@ -1081,8 +1126,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
blockDataMerge(tBlk, pBlk);
if (bExtractedBlock) {
blockDataDestroy(pBlk);
}
} else {
SSDataBlock* tBlk = createOneDataBlock(pBlk, true);
SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true);
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
taosArrayPush(aBlkSort, &tBlk);
}
@ -1091,7 +1139,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk);
@ -1131,7 +1178,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
}
taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows);
pHandle->type = SORT_SINGLESOURCE_SORT;
return TSDB_CODE_SUCCESS;
}
@ -1610,3 +1657,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
}
return ret;
}
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) {
pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
pHandle->mergeLimitReachedParam = param;
}

View File

@ -129,6 +129,7 @@ endi
$offset = $tbNum * $rowNum
$offset = $offset - 1
print select * from $stb order by ts limit 2 offset $offset
sql select * from $stb order by ts limit 2 offset $offset
if $rows != 1 then
return -1