fix/TD-31542-remove-assert-sync2
This commit is contained in:
parent
f240c78f76
commit
e4a4832bb9
|
@ -100,7 +100,11 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
|
if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
|
||||||
// only myself, to leader
|
// only myself, to leader
|
||||||
ASSERT(!pSyncNode->pVotesGranted->toLeader);
|
if (pSyncNode->pVotesGranted->toLeader) {
|
||||||
|
ret = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
sError("vgId:%d, failed to elect since already be to leader", pSyncNode->vgId);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
syncNodeCandidate2Leader(pSyncNode);
|
syncNodeCandidate2Leader(pSyncNode);
|
||||||
pSyncNode->pVotesGranted->toLeader = true;
|
pSyncNode->pVotesGranted->toLeader = true;
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -726,7 +726,12 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
|
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
|
||||||
ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
|
if (!(pSyncNode->raftCfg.configIndexCount >= 1)) {
|
||||||
|
sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
|
||||||
|
pSyncNode->raftCfg.configIndexCount);
|
||||||
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
|
SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
|
for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
|
||||||
|
@ -1477,9 +1482,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void syncNodePreClose(SSyncNode* pSyncNode) {
|
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||||
ASSERT(pSyncNode != NULL);
|
if (pSyncNode == NULL) {
|
||||||
ASSERT(pSyncNode->pFsm != NULL);
|
sError("failed to pre close sync node since sync node is null");
|
||||||
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
|
return;
|
||||||
|
}
|
||||||
|
if (pSyncNode->pFsm == NULL) {
|
||||||
|
sError("failed to pre close sync node since fsm is null");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
|
||||||
|
sError("failed to pre close sync node since FpApplyQueueItems is null");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// stop elect timer
|
// stop elect timer
|
||||||
(void)syncNodeStopElectTimer(pSyncNode);
|
(void)syncNodeStopElectTimer(pSyncNode);
|
||||||
|
@ -2080,7 +2094,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
SyncIndex lastIndex;
|
SyncIndex lastIndex;
|
||||||
SyncTerm lastTerm;
|
SyncTerm lastTerm;
|
||||||
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
||||||
ASSERT(code == 0);
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
|
||||||
|
return;
|
||||||
|
}
|
||||||
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2153,7 +2170,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
||||||
SyncIndex lastIndex;
|
SyncIndex lastIndex;
|
||||||
SyncTerm lastTerm;
|
SyncTerm lastTerm;
|
||||||
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
||||||
ASSERT(code == 0);
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
|
||||||
|
return;
|
||||||
|
}
|
||||||
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2196,7 +2216,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
|
sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
|
||||||
|
return;
|
||||||
|
}
|
||||||
bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
|
bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
|
||||||
if (!granted) {
|
if (!granted) {
|
||||||
sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
|
sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
|
||||||
|
@ -2229,7 +2252,10 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
|
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
|
||||||
|
return;
|
||||||
|
}
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||||
|
@ -2259,11 +2285,17 @@ int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// just called by syncNodeVoteForSelf
|
// just called by syncNodeVoteForSelf
|
||||||
// need assert
|
|
||||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
||||||
ASSERT(term == raftStoreGetTerm(pSyncNode));
|
SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
|
||||||
|
if (term != storeTerm) {
|
||||||
|
sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
|
||||||
|
return;
|
||||||
|
}
|
||||||
bool voted = raftStoreHasVoted(pSyncNode);
|
bool voted = raftStoreHasVoted(pSyncNode);
|
||||||
ASSERT(!voted);
|
if (voted) {
|
||||||
|
sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
raftStoreVote(pSyncNode, pRaftId);
|
raftStoreVote(pSyncNode, pRaftId);
|
||||||
}
|
}
|
||||||
|
@ -2407,7 +2439,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
.lastConfigIndex = SYNC_INDEX_INVALID};
|
.lastConfigIndex = SYNC_INDEX_INVALID};
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
ASSERT(pPreEntry != NULL);
|
if (pPreEntry == NULL) return -1;
|
||||||
preTerm = pPreEntry->term;
|
preTerm = pPreEntry->term;
|
||||||
|
|
||||||
if (h) {
|
if (h) {
|
||||||
|
@ -3238,7 +3270,6 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
// append to log buffer
|
// append to log buffer
|
||||||
if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
|
if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
|
||||||
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
||||||
ASSERT(terrno != 0);
|
|
||||||
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
|
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
|
||||||
syncEntryDestroy(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
goto _out;
|
goto _out;
|
||||||
|
@ -3347,7 +3378,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
|
||||||
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
||||||
SyncTerm term = raftStoreGetTerm(ths);
|
SyncTerm term = raftStoreGetTerm(ths);
|
||||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
||||||
ASSERT(pEntry != NULL);
|
if (pEntry == NULL) return -1;
|
||||||
|
|
||||||
LRUHandle* h = NULL;
|
LRUHandle* h = NULL;
|
||||||
|
|
||||||
|
|
|
@ -617,6 +617,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
|
||||||
SFsmCbMeta cbMeta = {0};
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
||||||
|
if (cbMeta.lastConfigIndex < 0) {
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
cbMeta.code = applyCode;
|
cbMeta.code = applyCode;
|
||||||
cbMeta.state = role;
|
cbMeta.state = role;
|
||||||
|
|
Loading…
Reference in New Issue