From 810eb22eb284513d968449148914ca7e6aa77358 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 14:15:18 +0800 Subject: [PATCH] fix(stream): generate the checkpoint id after checking the max checkpoint id from vnode. --- source/dnode/mnode/impl/src/mndStream.c | 36 ++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 566e1a28c3..ed492fe254 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -821,19 +821,47 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { SStreamObj *pStream = NULL; void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; - int64_t maxChkpId = 0; + int64_t maxChkptId = 0; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - maxChkpId = TMAX(maxChkpId, pStream->checkpointId); + maxChkptId = TMAX(maxChkptId, pStream->checkpointId); mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, pStream->checkpointId); sdbRelease(pSdb, pStream); } - mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1); - return maxChkpId + 1; + { // check the max checkpoint id from all vnodes. + int64_t maxCheckpointId = -1; + taosThreadMutexLock(&execInfo.lock); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->checkpointInfo.failed) { + continue; + } + + if (maxCheckpointId < pEntry->checkpointInfo.latestId) { + maxCheckpointId = pEntry->checkpointInfo.latestId; + } + } + + taosThreadMutexUnlock(&execInfo.lock); + if (maxCheckpointId > maxChkptId) { + mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId, + maxCheckpointId); + maxChkptId = maxCheckpointId; + } + } + + mDebug("generated checkpoint %" PRId64 "", maxChkptId + 1); + return maxChkptId + 1; } static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {