From 0cfc81c16b16594f368390de56a85aa789fafb1b Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Jul 2023 11:08:12 +0800 Subject: [PATCH] add checkpoint id for recover --- source/libs/executor/inc/querytask.h | 1 - source/libs/executor/src/timewindowoperator.c | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index c231cd6cf4..0d7c3925af 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -92,7 +92,6 @@ struct SExecTaskInfo { STaskStopInfo stopInfo; SRWLatch lock; // secure the access of STableListInfo SStorageAPI storageAPI; - int64_t checkpointId; }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 522922dae6..6ec14bc218 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3052,7 +3052,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -5704,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);