scan operator checkpoinbt

This commit is contained in:
liuyao 2023-07-10 10:14:30 +08:00
parent ed1181a1d1
commit fa8025f337
1 changed files with 16 additions and 3 deletions

View File

@ -1999,9 +1999,7 @@ FETCH_NEXT_BLOCK:
}
} break;
case STREAM_CHECKPOINT: {
streamScanOperatorSaveCheckpoint(pInfo);
pAPI->stateStore.streamStateCommit(pInfo->pState);
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
} break;
default:
break;
@ -2172,6 +2170,21 @@ FETCH_NEXT_BLOCK:
}
goto NEXT_SUBMIT_BLK;
} else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
return NULL;
}
int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
SSDataBlock* pBlock = taosArrayGet(pInfo->pBlockLists, current);
if (pBlock->info.type == STREAM_CHECKPOINT) {
streamScanOperatorSaveCheckpoint(pInfo);
pAPI->stateStore.streamStateCommit(pInfo->pState);
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
}
}
return NULL;