diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index c231cd6cf4..0d7c3925af 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -92,7 +92,6 @@ struct SExecTaskInfo { STaskStopInfo stopInfo; SRWLatch lock; // secure the access of STableListInfo SStorageAPI storageAPI; - int64_t checkpointId; }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 522922dae6..6ec14bc218 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3052,7 +3052,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -5704,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);