feat: remove limit reached from merge scan operator

This commit is contained in:
slzhou 2024-01-24 11:37:47 +08:00
parent 70f869ce33
commit 57a9ac75a8
3 changed files with 24 additions and 25 deletions

View File

@ -204,6 +204,10 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo
*/
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);
/**
* @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param
*/
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
#ifdef __cplusplus
}
#endif

View File

@ -3324,26 +3324,16 @@ _error:
return NULL;
}
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (pNum == NULL) {
nRows = pBlock->info.rows;
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
} else {
*(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
nRows = *(int64_t*)pNum;
}
if (nRows >= pInfo->mergeLimit) {
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
if (pInfo->mSkipTables == NULL) {
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
int bSkip = 1;
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
if (pInfo->mSkipTables != NULL) {
taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip));
}
return TSDB_CODE_SUCCESS;
}
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
@ -3459,10 +3449,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
}
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
if (pInfo->mergeLimit != -1) {
tableMergeScanDoSkipTable(pInfo, pBlock);
}
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
@ -3529,6 +3515,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
@ -3756,8 +3743,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
pTableScanInfo->mTableNumRows = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL;
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
@ -3849,8 +3834,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
pInfo->mergeLimit = -1;
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
if (hasLimit) {

View File

@ -75,6 +75,9 @@ struct SSortHandle {
bool (*abortCheckFn)(void* param);
void* abortCheckParam;
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
void* mergeLimitReachedParam;
};
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
@ -1056,6 +1059,9 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH
int64_t keepRows = pOrigBlk->info.rows;
if (nRows >= pHandle->mergeLimit) {
if (pHandle->mergeLimitReachedFn) {
pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
}
keepRows = pHandle->mergeLimit - prevRows;
}
@ -1651,3 +1657,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
}
return ret;
}
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) {
pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
pHandle->mergeLimitReachedParam = param;
}