add checkpoint id for recover

This commit is contained in:
liuyao 2023-07-12 10:48:58 +08:00
parent 9484f3da94
commit 7d4bb1b932
5 changed files with 15 additions and 10 deletions

View File

@ -380,7 +380,8 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id); uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark,
const char* id, int64_t ckId);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState); void (*streamFileStateClear)(struct SStreamFileState* pFileState);

View File

@ -31,7 +31,8 @@ typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot; typedef SList SStreamSnapshot;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* id); GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId);
void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState);
@ -44,7 +45,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);

View File

@ -92,6 +92,7 @@ struct SExecTaskInfo {
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
SRWLatch lock; // secure the access of STableListInfo SRWLatch lock; // secure the access of STableListInfo
SStorageAPI storageAPI; SStorageAPI storageAPI;
int64_t checkpointId;
}; };
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);

View File

@ -3050,8 +3050,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId);
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
@ -5703,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->checkpointId);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);

View File

@ -49,7 +49,8 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo; typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* idstr) { GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId) {
if (memSize <= 0) { if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
} }
@ -83,9 +84,9 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->deleteMark = delMark; pFileState->deleteMark = delMark;
pFileState->flushMark = INT64_MIN; pFileState->flushMark = INT64_MIN;
pFileState->maxTs = INT64_MIN; pFileState->maxTs = INT64_MIN;
pFileState->id = taosStrdup(idstr); pFileState->id = taosStrdup(taskId);
recoverSnapshot(pFileState); recoverSnapshot(pFileState, checkpointId);
return pFileState; return pFileState;
_error: _error:
@ -479,7 +480,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code; return code;
} }
int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) { if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)