From 7c9898cfa51495ea49dd0556057a5911a4d5c476 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 15:56:06 +0800 Subject: [PATCH] adj stream scan --- source/libs/executor/src/scanoperator.c | 72 ++++++++++++------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f39204974..85c7024932 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1902,35 +1902,35 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; } break; - case STREAM_SCAN_FROM_UPDATERES: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printDataBlock(pInfo->pUpdateRes, "recover update"); - return pInfo->pUpdateRes; - } break; - case STREAM_SCAN_FROM_DELETE_DATA: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - printDataBlock(pInfo->pDeleteDataRes, "recover delete"); - return pInfo->pDeleteDataRes; - } break; - case STREAM_SCAN_FROM_DATAREADER_RANGE: { - SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB) { - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - checkUpdateData(pInfo, true, pSDB, false); - printDataBlock(pSDB, "scan recover update"); - calBlockTbName(pInfo, pSDB); - return pSDB; - } - blockDataCleanup(pInfo->pUpdateDataRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } break; + // case STREAM_SCAN_FROM_UPDATERES: { + // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + // printDataBlock(pInfo->pUpdateRes, "recover update"); + // return pInfo->pUpdateRes; + // } break; + // case STREAM_SCAN_FROM_DELETE_DATA: { + // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + // copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + // pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + // printDataBlock(pInfo->pDeleteDataRes, "recover delete"); + // return pInfo->pDeleteDataRes; + // } break; + // case STREAM_SCAN_FROM_DATAREADER_RANGE: { + // SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + // if (pSDB) { + // STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + // pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; + // checkUpdateData(pInfo, true, pSDB, false); + // printDataBlock(pSDB, "scan recover update"); + // calBlockTbName(pInfo, pSDB); + // return pSDB; + // } + // blockDataCleanup(pInfo->pUpdateDataRes); + // pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + // } break; default: break; } @@ -1939,13 +1939,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { calBlockTbName(pInfo, pInfo->pRecoverRes); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { - TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); - } else { - pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); - doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); - } + // if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { + TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + // } else { + // pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); + // doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); + // } } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES;