diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6acd8589fb..28832ffec8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3200,6 +3200,27 @@ _error: return NULL; } +static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { + int64_t nRows = 0; + void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); + if (pNum == NULL) { + nRows = pBlock->info.rows; + tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); + } else { + *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows; + } + + if (nRows >= pInfo->mergeLimit) { + if (pInfo->mSkipTables == NULL) { + pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + } + int bSkip = 1; + taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip)); + } + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -3258,23 +3279,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); if (pInfo->mergeLimit != -1) { - int64_t nRows = 0; - void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); - if (pNum == NULL) { - nRows = pBlock->info.rows; - } else { - nRows += *(int64_t*)pNum + pBlock->info.rows; - } - tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); - if (nRows >= pInfo->mergeLimit) { - if (pInfo->mSkipTables == NULL) { - pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - } - int bSkip = 1; - taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip)); - } + tableMergeScanDoSkipTable(pInfo, pBlock); } + pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;