diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d48accb2f9..690b78cd1a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -1157,7 +1157,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + bool allReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + if (!allReady) { + mWarn("not all vnodes are ready, ignore the checkpoint"); + taosArrayDestroy(pNodeSnapshot); + return 0; + } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); @@ -1205,7 +1211,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); - mndTransSetSerial(pTrans); const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, "checkpoint"); @@ -2061,11 +2066,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; + *allReady = true; SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { @@ -2077,7 +2083,22 @@ static 
SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { SNodeEntry entry = {0}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); entry.nodeId = pVgroup->vgId; - entry.hbTimestamp = -1; + entry.hbTimestamp = pVgroup->updateTime; + + if (*allReady) { + for (int32_t i = 0; i < pVgroup->replica; ++i) { + if (!pVgroup->vnodeGid[i].syncRestore) { + *allReady = false; + break; + } + + ESyncState state = pVgroup->vnodeGid[i].syncState; + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { + *allReady = false; + break; + } + } + } char buf[256] = {0}; EPSET_TO_STR(&entry.epset, buf); @@ -2111,7 +2132,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno; } - mndTransSetSerial(pTrans); } void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); @@ -2327,10 +2347,16 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + bool allVnodeReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); + if (!allVnodeReady) { + taosArrayDestroy(pNodeSnapshot); + atomic_store_32(&mndNodeCheckSentinel, 0); + mWarn("not all vnodes are ready, ignore the exec nodeUpdate check"); + return 0; + } taosThreadMutexLock(&execInfo.lock); - removeExpirednodeEntryAndTask(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);