fix(stream): do nodeUpdate trans only after check if vnodes are all ready.
This commit is contained in:
parent
48202a1e45
commit
91121974d7
|
@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
|||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||
static SArray *extractNodeListFromStream(SMnode *pMnode);
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady);
|
||||
|
||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||
|
||||
|
@ -1157,7 +1157,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||
if (!allReady) {
|
||||
mWarn("not all vnodes are ready, ignore the checkpoint")
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||
|
@ -1205,7 +1211,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
|
||||
mndTransSetSerial(pTrans);
|
||||
|
||||
const char *pDb = mndGetStreamDB(pMnode);
|
||||
mndTransSetDbName(pTrans, pDb, "checkpoint");
|
||||
|
@ -2061,11 +2066,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
return info;
|
||||
}
|
||||
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
||||
*allReady = true;
|
||||
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
|
||||
|
||||
while (1) {
|
||||
|
@ -2077,7 +2083,22 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
|
|||
SNodeEntry entry = {0};
|
||||
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
entry.nodeId = pVgroup->vgId;
|
||||
entry.hbTimestamp = -1;
|
||||
entry.hbTimestamp = pVgroup->updateTime;
|
||||
|
||||
if (*allReady) {
|
||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||
if (!pVgroup->vnodeGid[i].syncRestore) {
|
||||
*allReady = false;
|
||||
break;
|
||||
}
|
||||
|
||||
ESyncState state = pVgroup->vnodeGid[i].syncState;
|
||||
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
|
||||
*allReady = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char buf[256] = {0};
|
||||
EPSET_TO_STR(&entry.epset, buf);
|
||||
|
@ -2111,7 +2132,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
return terrno;
|
||||
}
|
||||
mndTransSetSerial(pTrans);
|
||||
}
|
||||
|
||||
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||
|
@ -2327,10 +2347,16 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||
bool allVnodeReady = true;
|
||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
|
||||
if (!allVnodeReady) {
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
|
||||
return 0;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
removeExpirednodeEntryAndTask(pNodeSnapshot);
|
||||
|
||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
||||
|
|
Loading…
Reference in New Issue