fix(stream): generate the checkpoint id after checking the max checkpoint id from vnode.

This commit is contained in:
Haojun Liao 2024-04-16 14:15:18 +08:00
parent eba924776f
commit 810eb22eb2
1 changed files with 32 additions and 4 deletions

View File

@ -821,19 +821,47 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
SStreamObj *pStream = NULL;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
int64_t maxChkpId = 0;
int64_t maxChkptId = 0;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
pStream->checkpointId);
sdbRelease(pSdb, pStream);
}
mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1);
return maxChkpId + 1;
{ // check the max checkpoint id from all vnodes.
int64_t maxCheckpointId = -1;
taosThreadMutexLock(&execInfo.lock);
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->checkpointInfo.failed) {
continue;
}
if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
maxCheckpointId = pEntry->checkpointInfo.latestId;
}
}
taosThreadMutexUnlock(&execInfo.lock);
if (maxCheckpointId > maxChkptId) {
mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
maxCheckpointId);
maxChkptId = maxCheckpointId;
}
}
mDebug("generated checkpoint %" PRId64 "", maxChkptId + 1);
return maxChkptId + 1;
}
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {