diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 55c1b89c30..474ac5f98d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3052,7 +3052,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -5704,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);