diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 2d20562a6c..3819dd7ca8 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -380,7 +380,8 @@ typedef struct SStateStore { SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key); struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, - uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id); + uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, + const char* id, int64_t ckId); void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); void (*streamFileStateClear)(struct SStreamFileState* pFileState); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index b2255013ca..052231fe39 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -31,7 +31,8 @@ typedef struct SStreamFileState SStreamFileState; typedef SList SStreamSnapshot; SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, - GetTsFun fp, void* pFile, TSKEY delMark, const char* id); + GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, + int64_t checkpointId); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState); @@ -44,7 +45,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); -int32_t recoverSnapshot(SStreamFileState* pFileState); +int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 0d7c3925af..c231cd6cf4 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -92,6 +92,7 @@ struct SExecTaskInfo { STaskStopInfo stopInfo; SRWLatch lock; // secure the access of STableListInfo SStorageAPI storageAPI; + int64_t checkpointId; }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7d90c7e644..522922dae6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3050,8 +3050,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); - pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, - compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); + pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId); pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -5703,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index dd857141c1..063c15e4f3 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -49,7 +49,8 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, - GetTsFun fp, void* pFile, TSKEY delMark, const char* idstr) { + GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, + int64_t checkpointId) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -83,9 +84,9 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->deleteMark = delMark; pFileState->flushMark = INT64_MIN; pFileState->maxTs = INT64_MIN; - pFileState->id = taosStrdup(idstr); + pFileState->id = taosStrdup(taskId); - recoverSnapshot(pFileState); + recoverSnapshot(pFileState, checkpointId); return pFileState; _error: @@ -479,7 +480,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return code; } -int32_t recoverSnapshot(SStreamFileState* pFileState) { +int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; if (pFileState->maxTs != INT64_MIN) { int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)