Merge pull request #27451 from taosdata/fix/TD-31542-remove-assert-sync2
fix/TD-31542-remove-assert-sync2
This commit is contained in:
commit
7eec514863
|
@ -100,7 +100,11 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
|||
|
||||
if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
|
||||
// only myself, to leader
|
||||
ASSERT(!pSyncNode->pVotesGranted->toLeader);
|
||||
if (pSyncNode->pVotesGranted->toLeader) {
|
||||
ret = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, failed to elect since already be to leader", pSyncNode->vgId);
|
||||
return ret;
|
||||
}
|
||||
syncNodeCandidate2Leader(pSyncNode);
|
||||
pSyncNode->pVotesGranted->toLeader = true;
|
||||
return ret;
|
||||
|
|
|
@ -726,7 +726,12 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
|
|||
}
|
||||
|
||||
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
|
||||
ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
|
||||
if (!(pSyncNode->raftCfg.configIndexCount >= 1)) {
|
||||
sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
|
||||
pSyncNode->raftCfg.configIndexCount);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -2;
|
||||
}
|
||||
SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
|
||||
|
||||
for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
|
||||
|
@ -1477,9 +1482,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
|||
#endif
|
||||
|
||||
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode != NULL);
|
||||
ASSERT(pSyncNode->pFsm != NULL);
|
||||
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
|
||||
if (pSyncNode == NULL) {
|
||||
sError("failed to pre close sync node since sync node is null");
|
||||
return;
|
||||
}
|
||||
if (pSyncNode->pFsm == NULL) {
|
||||
sError("failed to pre close sync node since fsm is null");
|
||||
return;
|
||||
}
|
||||
if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
|
||||
sError("failed to pre close sync node since FpApplyQueueItems is null");
|
||||
return;
|
||||
}
|
||||
|
||||
// stop elect timer
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
|
@ -2080,7 +2094,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
SyncIndex lastIndex;
|
||||
SyncTerm lastTerm;
|
||||
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
||||
ASSERT(code == 0);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
||||
}
|
||||
|
||||
|
@ -2153,7 +2170,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
SyncIndex lastIndex;
|
||||
SyncTerm lastTerm;
|
||||
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
||||
ASSERT(code == 0);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
||||
}
|
||||
|
||||
|
@ -2196,7 +2216,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
|
||||
sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
|
||||
return;
|
||||
}
|
||||
bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
|
||||
if (!granted) {
|
||||
sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
|
||||
|
@ -2229,7 +2252,10 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
|
||||
return;
|
||||
}
|
||||
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
|
@ -2259,11 +2285,17 @@ int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// just called by syncNodeVoteForSelf
|
||||
// need assert
|
||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
||||
ASSERT(term == raftStoreGetTerm(pSyncNode));
|
||||
SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
|
||||
if (term != storeTerm) {
|
||||
sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
|
||||
return;
|
||||
}
|
||||
bool voted = raftStoreHasVoted(pSyncNode);
|
||||
ASSERT(!voted);
|
||||
if (voted) {
|
||||
sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
|
||||
return;
|
||||
}
|
||||
|
||||
raftStoreVote(pSyncNode, pRaftId);
|
||||
}
|
||||
|
@ -2407,7 +2439,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
|||
.lastConfigIndex = SYNC_INDEX_INVALID};
|
||||
|
||||
if (code == 0) {
|
||||
ASSERT(pPreEntry != NULL);
|
||||
if (pPreEntry == NULL) return -1;
|
||||
preTerm = pPreEntry->term;
|
||||
|
||||
if (h) {
|
||||
|
@ -3238,7 +3270,6 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
|||
// append to log buffer
|
||||
if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
|
||||
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
||||
ASSERT(terrno != 0);
|
||||
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
|
||||
syncEntryDestroy(pEntry);
|
||||
goto _out;
|
||||
|
@ -3347,7 +3378,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
|
|||
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
||||
SyncTerm term = raftStoreGetTerm(ths);
|
||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
||||
ASSERT(pEntry != NULL);
|
||||
if (pEntry == NULL) return -1;
|
||||
|
||||
LRUHandle* h = NULL;
|
||||
|
||||
|
|
|
@ -682,12 +682,18 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
|
|||
int32_t code = 0, lino = 0;
|
||||
bool retry = false;
|
||||
do {
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
||||
if (cbMeta.lastConfigIndex < -1) {
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
if (terrno != 0) code = terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {.code = applyCode};
|
||||
TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));
|
||||
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.code = applyCode;
|
||||
cbMeta.state = role;
|
||||
|
|
Loading…
Reference in New Issue