Replace ASSERT() aborts in the sync library with error logging and early
returns (syncElection.c, syncMain.c, syncPipeline.c).

Notes for reviewers:
- syncNodeGetSnapshotConfigIndex() now sets terrno and returns -2 when
  configIndexCount < 1; syncFsmExecute() treats any value below -1 as a
  failure and returns terrno (or TSDB_CODE_SYN_INTERNAL_ERROR if terrno is 0).
- syncNodeVoteForTerm() reports "already voted" when a vote is already
  recorded, so the log text matches the condition that triggers it.
- Leading whitespace of context lines was reconstructed; re-generate the patch
  from the tree if it must apply byte-exactly.

diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c
index fc056c9eba..901b43023a 100644
--- a/source/libs/sync/src/syncElection.c
+++ b/source/libs/sync/src/syncElection.c
@@ -100,7 +100,11 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
 
   if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
     // only myself, to leader
-    ASSERT(!pSyncNode->pVotesGranted->toLeader);
+    if (pSyncNode->pVotesGranted->toLeader) {
+      ret = TSDB_CODE_SYN_INTERNAL_ERROR;
+      sError("vgId:%d, failed to elect since already be to leader", pSyncNode->vgId);
+      return ret;
+    }
     syncNodeCandidate2Leader(pSyncNode);
     pSyncNode->pVotesGranted->toLeader = true;
     return ret;
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index d947a488bf..215c8f9e58 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -726,7 +726,12 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
 }
 
 SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
-  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
+  if (!(pSyncNode->raftCfg.configIndexCount >= 1)) {
+    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
+           pSyncNode->raftCfg.configIndexCount);
+    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
+    return -2;
+  }
   SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
 
   for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
@@ -1477,9 +1482,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
 #endif
 
 void syncNodePreClose(SSyncNode* pSyncNode) {
-  ASSERT(pSyncNode != NULL);
-  ASSERT(pSyncNode->pFsm != NULL);
-  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
+  if (pSyncNode == NULL) {
+    sError("failed to pre close sync node since sync node is null");
+    return;
+  }
+  if (pSyncNode->pFsm == NULL) {
+    sError("failed to pre close sync node since fsm is null");
+    return;
+  }
+  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
+    sError("failed to pre close sync node since FpApplyQueueItems is null");
+    return;
+  }
 
   // stop elect timer
   (void)syncNodeStopElectTimer(pSyncNode);
@@ -2080,7 +2094,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
     SyncIndex lastIndex;
     SyncTerm lastTerm;
     int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
-    ASSERT(code == 0);
+    if (code != 0) {
+      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
+      return;
+    }
     pSyncNode->pNextIndex->index[i] = lastIndex + 1;
   }
 
@@ -2153,7 +2170,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
     SyncIndex lastIndex;
     SyncTerm lastTerm;
     int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
-    ASSERT(code == 0);
+    if (code != 0) {
+      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
+      return;
+    }
     pSyncNode->pNextIndex->index[i] = lastIndex + 1;
   }
 
@@ -2196,7 +2216,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
 }
 
 void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
-  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
+  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
+    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
+    return;
+  }
   bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
   if (!granted) {
     sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
@@ -2229,7 +2252,10 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
 }
 
 void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
-  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
+  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
+    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
+    return;
+  }
   pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
   pSyncNode->roleTimeMs = taosGetTimestampMs();
   SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@@ -2259,11 +2285,17 @@ int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
 }
 
 // just called by syncNodeVoteForSelf
-// need assert
 void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
-  ASSERT(term == raftStoreGetTerm(pSyncNode));
+  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
+  if (term != storeTerm) {
+    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
+    return;
+  }
   bool voted = raftStoreHasVoted(pSyncNode);
-  ASSERT(!voted);
+  if (voted) {
+    sError("vgId:%d, failed to vote for term since already voted", pSyncNode->vgId);
+    return;
+  }
   raftStoreVote(pSyncNode, pRaftId);
 }
 
@@ -2407,7 +2439,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
                         .lastConfigIndex = SYNC_INDEX_INVALID};
 
   if (code == 0) {
-    ASSERT(pPreEntry != NULL);
+    if (pPreEntry == NULL) return -1;
     preTerm = pPreEntry->term;
 
     if (h) {
@@ -3238,7 +3270,6 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
   // append to log buffer
   if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
     sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
-    ASSERT(terrno != 0);
     (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
     syncEntryDestroy(pEntry);
     goto _out;
@@ -3347,7 +3378,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
   SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
   SyncTerm term = raftStoreGetTerm(ths);
   SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
-  ASSERT(pEntry != NULL);
+  if (pEntry == NULL) return -1;
 
   LRUHandle* h = NULL;
 
diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c
index 5e6204dde8..a6e9c7de32 100644
--- a/source/libs/sync/src/syncPipeline.c
+++ b/source/libs/sync/src/syncPipeline.c
@@ -682,12 +682,18 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
   int32_t code = 0, lino = 0;
   bool retry = false;
   do {
+    SFsmCbMeta cbMeta = {0};
+    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
+    if (cbMeta.lastConfigIndex < -1) {
+      code = TSDB_CODE_SYN_INTERNAL_ERROR;
+      if (terrno != 0) code = terrno;
+      return code;
+    }
+
     SRpcMsg rpcMsg = {.code = applyCode};
     TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));
 
-    SFsmCbMeta cbMeta = {0};
     cbMeta.index = pEntry->index;
-    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
     cbMeta.isWeak = pEntry->isWeak;
     cbMeta.code = applyCode;
     cbMeta.state = role;