Merge remote-tracking branch 'origin/fix/3_liaohj' into fix/3_liaohj

This commit is contained in:
Haojun Liao 2023-07-27 16:11:07 +08:00
commit 3fdbd5383f
1 changed files with 36 additions and 36 deletions

View File

@ -1902,35 +1902,35 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pRecoverRes, "scan recover"); printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes; return pInfo->pRecoverRes;
} break; } break;
case STREAM_SCAN_FROM_UPDATERES: { // case STREAM_SCAN_FROM_UPDATERES: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pInfo->pUpdateRes, "recover update"); // printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes; // return pInfo->pUpdateRes;
} break; // } break;
case STREAM_SCAN_FROM_DELETE_DATA: { // case STREAM_SCAN_FROM_DELETE_DATA: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); // copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; // pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pInfo->pDeleteDataRes, "recover delete"); // printDataBlock(pInfo->pDeleteDataRes, "recover delete");
return pInfo->pDeleteDataRes; // return pInfo->pDeleteDataRes;
} break; // } break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: { // case STREAM_SCAN_FROM_DATAREADER_RANGE: {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); // SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) { // if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; // STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; // pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); // checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "scan recover update"); // printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB); // calBlockTbName(pInfo, pSDB);
return pSDB; // return pSDB;
} // }
blockDataCleanup(pInfo->pUpdateDataRes); // blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; // pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; // } break;
default: default:
break; break;
} }
@ -1939,13 +1939,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) { if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes); calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { // if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
} else { // } else {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); // pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); // doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
} // }
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;