scan multiple range
This commit is contained in:
parent
8992721e20
commit
259815fd83
|
@ -1183,14 +1183,19 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
|
||||||
|
|
||||||
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||||
qInfo("do stream range scan. windows index:%d", *pRowIndex);
|
qInfo("do stream range scan. windows index:%d", *pRowIndex);
|
||||||
|
bool prepareRes = true;
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pResult = NULL;
|
SSDataBlock* pResult = NULL;
|
||||||
pResult = doTableScan(pInfo->pTableScanOp);
|
pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
|
if (!pResult) {
|
||||||
|
prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex);
|
||||||
// scan next window data
|
// scan next window data
|
||||||
pResult = doTableScan(pInfo->pTableScanOp);
|
pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
}
|
}
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
|
if (prepareRes) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
blockDataCleanup(pSDB);
|
blockDataCleanup(pSDB);
|
||||||
*pRowIndex = 0;
|
*pRowIndex = 0;
|
||||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
@ -2057,7 +2062,7 @@ FETCH_NEXT_BLOCK:
|
||||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
// printDataBlock(pSDB, "stream scan update");
|
printDataBlock(pSDB, "stream scan update");
|
||||||
calBlockTbName(pInfo, pSDB);
|
calBlockTbName(pInfo, pSDB);
|
||||||
return pSDB;
|
return pSDB;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue