fix: in-place update when hash memember is found

This commit is contained in:
slzhou 2023-11-21 10:14:37 +08:00
parent f969579f7f
commit e9ac378d2c
1 changed files with 23 additions and 16 deletions

View File

@ -3200,6 +3200,27 @@ _error:
return NULL; return NULL;
} }
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (pNum == NULL) {
nRows = pBlock->info.rows;
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
} else {
*(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
}
if (nRows >= pInfo->mergeLimit) {
if (pInfo->mSkipTables == NULL) {
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
int bSkip = 1;
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getBlockForTableMergeScan(void* param) { static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator; SOperatorInfo* pOperator = source->pOperator;
@ -3258,23 +3279,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
if (pInfo->mergeLimit != -1) { if (pInfo->mergeLimit != -1) {
int64_t nRows = 0; tableMergeScanDoSkipTable(pInfo, pBlock);
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (pNum == NULL) {
nRows = pBlock->info.rows;
} else {
nRows += *(int64_t*)pNum + pBlock->info.rows;
}
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
if (nRows >= pInfo->mergeLimit) {
if (pInfo->mSkipTables == NULL) {
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
int bSkip = 1;
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
}
} }
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;