From fa8025f3375ce0e63f64b424efce098d9d5aa140 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 10 Jul 2023 10:14:30 +0800 Subject: [PATCH] scan operator checkpoinbt --- source/libs/executor/src/scanoperator.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 220740096d..dc800dd38b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1999,9 +1999,7 @@ FETCH_NEXT_BLOCK: } } break; case STREAM_CHECKPOINT: { - streamScanOperatorSaveCheckpoint(pInfo); - pAPI->stateStore.streamStateCommit(pInfo->pState); - pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); + qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); } break; default: break; @@ -2172,6 +2170,21 @@ FETCH_NEXT_BLOCK: } goto NEXT_SUBMIT_BLK; + } else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) { + if (pInfo->validBlockIndex >= total) { + doClearBufferedBlocks(pInfo); + return NULL; + } + + int32_t current = pInfo->validBlockIndex++; + qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); + + SSDataBlock* pBlock = taosArrayGet(pInfo->pBlockLists, current); + if (pBlock->info.type == STREAM_CHECKPOINT) { + streamScanOperatorSaveCheckpoint(pInfo); + pAPI->stateStore.streamStateCommit(pInfo->pState); + pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); + } } return NULL;