diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a137c10ed5..54b24fe0b4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2309,7 +2309,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { taosThreadMutexUnlock(&execInfo.lock); if (numOfNodes == 0) { - mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); + mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing"); execInfo.ts = ts; atomic_store_32(&mndNodeCheckSentinel, 0); return 0; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 8fb5bc8a99..a15b817784 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -84,6 +84,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; + int32_t replica = -1; // do the replica check *allReady = true; SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); @@ -97,6 +98,17 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); + if (replica == -1) { + replica = pVgroup->replica; + } else { + if (replica != pVgroup->replica) { + mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", + pVgroup->vgId); + *allReady = false; + break; + } + } + // if not all ready till now, no need to check the remaining vgroups. if (*allReady) { for (int32_t i = 0; i < pVgroup->replica; ++i) { @@ -107,8 +119,10 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } ESyncState state = pVgroup->vnodeGid[i].syncState; - if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { - mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId); + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER || + state == TAOS_SYNC_STATE_CANDIDATE) { + mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", + pVgroup->vgId, state); *allReady = false; break; }