From c73a003ee316ad2f5cb2fba2d360ea5424ded5a0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Jul 2024 19:13:20 +0800 Subject: [PATCH] fix(stream): set the initial checkpointId for a new created stream. --- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 3 ++- source/dnode/mnode/impl/src/mndStreamUtil.c | 15 +++++++++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 69f430778b..b261f89057 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -153,6 +153,7 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numO void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); +int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fd9daf57db..a137c10ed5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -801,8 +801,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // add into buffer firstly // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. taosThreadMutexLock(&execInfo.lock); - mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name); + mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name); saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); + mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); taosThreadMutexUnlock(&execInfo.lock); // execute creation diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 66d32f53eb..8fb5bc8a99 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -930,6 +930,8 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTa int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) { if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0. + mDebug("existed consensus-checkpointId:%" PRId64 " for stream:0x%" PRIx64 " %s exist, and return", + pInfo->checkpointId, pStream->uid, pStream->name); return pInfo->checkpointId; } @@ -965,3 +967,16 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { numOfStreams); return TSDB_CODE_SUCCESS; } + +int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { + void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); + ASSERT(pInfo == NULL); + + SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL}; + taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); + + SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); + ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0); + mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file