diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index e7980bc2dd..a59b14259a 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -74,22 +74,18 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); } -static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) { - if (pMgr->endIndex == 0) return -1; - ASSERT(pMgr->startIndex <= index && index < pMgr->endIndex); - pMgr->states[(index + pMgr->size) % pMgr->size].term = term; - return 0; -} - SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); + int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); + +int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); + int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); @@ -111,7 +107,6 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); -int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 00f3791ea2..1fc6798471 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -150,7 +150,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { goto _err; } SyncIndex commitIndex = snapshot.lastApplyIndex; - SyncTerm commitTerm = snapshot.lastApplyTerm; + SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0); if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) { terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; goto _err;