fix(stream): add more check before launching update stream task nodeEp

This commit is contained in:
Haojun Liao 2024-07-03 09:20:37 +08:00
parent c73a003ee3
commit 27cb3638c2
2 changed files with 17 additions and 3 deletions

View File

@ -2309,7 +2309,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) {
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
execInfo.ts = ts;
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;

View File

@ -84,6 +84,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t replica = -1; // do the replica check
*allReady = true;
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
@ -97,6 +98,17 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (replica == -1) {
replica = pVgroup->replica;
} else {
if (replica != pVgroup->replica) {
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
pVgroup->vgId);
*allReady = false;
break;
}
}
// if not all ready till now, no need to check the remaining vgroups.
if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) {
@ -107,8 +119,10 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
}
ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
state == TAOS_SYNC_STATE_CANDIDATE) {
mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups",
pVgroup->vgId, state);
*allReady = false;
break;
}