feat: skip tables out of limit

This commit is contained in:
shenglian zhou 2023-11-17 15:32:14 +08:00
parent ba752adc40
commit 88973c7464
2 changed files with 45 additions and 13 deletions

View File

@ -293,6 +293,9 @@ typedef struct STableMergeScanInfo {
int32_t readIdx;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSHashObj* mTableNumRows; // uid->num of table rows
SHashObj* mSkipTables;
int64_t mergeLimit;
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;

View File

@ -3255,6 +3255,24 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
if (pInfo->mergeLimit != -1) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (pNum == NULL) {
nRows = pBlock->info.rows;
} else {
nRows += *(int64_t*)pNum + pBlock->info.rows;
}
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
if (nRows >= pInfo->mergeLimit) {
if (pInfo->mSkipTables == NULL) {
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
int bSkip = 1;
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
}
}
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
@ -3314,22 +3332,20 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
int64_t mergeLimit = -1;
if (hasLimit) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
tSimpleHashClear(pInfo->mTableNumRows);
size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
if (hasLimit) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
} else {
// if (pInfo->mergeLimit != -1) {
// pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
// NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
// } else
{
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
}
@ -3341,7 +3357,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
param->pOperator = pOperator;
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = param;
@ -3383,6 +3400,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortHandle = NULL;
resetLimitInfoForNextGroup(&pInfo->limitInfo);
taosHashCleanup(pInfo->mSkipTables);
pInfo->pSortHandle = NULL;
return TSDB_CODE_SUCCESS;
}
@ -3491,7 +3510,10 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
pTableScanInfo->mTableNumRows = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->pSortHandle = NULL;
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
@ -3581,7 +3603,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
pInfo->mergeLimit = -1;
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
if (hasLimit) {
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
pInfo->mSkipTables = NULL;
}
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;