From 88973c7464b2685c3d87be05c1d42ed410c4468a Mon Sep 17 00:00:00 2001
From: shenglian zhou
Date: Fri, 17 Nov 2023 15:32:14 +0800
Subject: [PATCH] feat: skip tables out of limit

When the table merge scan has a limit or offset, compute
mergeLimit = limit + offset once at operator creation and count the rows
read per table uid in mTableNumRows. As soon as a table's count reaches
mergeLimit, its uid is recorded in mSkipTables; the address of that hash
is handed to tsdReaderOpen (presumably so the reader can stop reading
those tables -- confirm against the reader implementation).

The SORT_SINGLESOURCE_SORT branch is commented out, so the scan always
uses SORT_BLOCK_TS_MERGE together with tsortSetMergeLimit.

mSkipTables is created lazily per group. It is released and reset to
NULL both when a group scan stops and when the operator is destroyed, so
the lazy "== NULL" check re-creates it for the next group and the
destroy path never frees it twice.

---
 source/libs/executor/inc/executorInt.h  |  3 ++
 source/libs/executor/src/scanoperator.c | 55 +++++++++++++++++++------
 2 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index 331ce44366..41ffc26cd2 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -293,6 +293,9 @@ typedef struct STableMergeScanInfo {
   int32_t         readIdx;
   SSDataBlock*    pResBlock;
   SSampleExecInfo sample;  // sample execution info
+  SSHashObj*      mTableNumRows; // uid->num of table rows
+  SHashObj*       mSkipTables;
+  int64_t         mergeLimit;
   SSortExecInfo   sortExecInfo;
 } STableMergeScanInfo;
 
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index c47e14ad0d..4b2b7889d2 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -3255,6 +3255,24 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
 
     pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
 
+    if (pInfo->mergeLimit != -1) {
+      int64_t nRows = 0;
+      void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
+      if (pNum == NULL) {
+        nRows = pBlock->info.rows;
+      } else {
+        nRows += *(int64_t*)pNum + pBlock->info.rows;
+      }
+      tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
+      if (nRows >= pInfo->mergeLimit) {
+        if (pInfo->mSkipTables == NULL) {
+          pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
+                                            taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
+        }
+        int bSkip = 1;
+        taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
+      }
+    }
     pOperator->resultInfo.totalRows += pBlock->info.rows;
     pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
 
@@ -3314,22 +3332,20 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
   int32_t tableStartIdx = pInfo->tableStartIndex;
   int32_t tableEndIdx = pInfo->tableEndIndex;
 
-  bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
-  int64_t mergeLimit = -1;
-  if (hasLimit) {
-    mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
-  }
+  tSimpleHashClear(pInfo->mTableNumRows);
+
   size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
-  if (hasLimit) {
-    pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
-                                               NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
-  } else {
+//  if (pInfo->mergeLimit != -1) {
+//    pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
+//                                               NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
+//  } else
+  {
     pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
     int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
     pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
                                                pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
 
-    tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
+    tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
     tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
   }
 
@@ -3341,7 +3357,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
   STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
   param->pOperator = pOperator;
   STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
-  pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
+  pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
+                                (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
 
   SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
   ps->param = param;
@@ -3383,6 +3400,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
   pInfo->pSortHandle = NULL;
 
   resetLimitInfoForNextGroup(&pInfo->limitInfo);
+  taosHashCleanup(pInfo->mSkipTables);
+  pInfo->mSkipTables = NULL;
   return TSDB_CODE_SUCCESS;
 }
 
@@ -3491,7 +3510,10 @@ void destroyTableMergeScanOperatorInfo(void* param) {
   taosArrayDestroy(pTableScanInfo->sortSourceParams);
   tsortDestroySortHandle(pTableScanInfo->pSortHandle);
   pTableScanInfo->pSortHandle = NULL;
-
+  tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
+  pTableScanInfo->mTableNumRows = NULL;
+  taosHashCleanup(pTableScanInfo->mSkipTables);
+  pTableScanInfo->mSkipTables = NULL;
   destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
 
   pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
@@ -3581,7 +3603,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
   pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
   pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
   initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
-
+  pInfo->mTableNumRows = tSimpleHashInit(1024,
+                                         taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
+  pInfo->mergeLimit = -1;
+  bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
+  if (hasLimit) {
+    pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
+    pInfo->mSkipTables = NULL;
+  }
   pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
 
   int32_t rowSize = pInfo->pResBlock->info.rowSize;