From a8109b7f370adfa49b05dbab62e5414c906c4cec Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 27 Oct 2022 14:43:07 +0800 Subject: [PATCH] refactor(sync): delete some code --- source/libs/sync/src/syncAppendEntries.c | 182 ----------------------- 1 file changed, 182 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 80e0cbbf99..7dab112a51 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -89,188 +89,6 @@ // /\ UNCHANGED <> // -static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t code; - - SyncIndex delBegin = pMsg->prevLogIndex + 1; - SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - - // invert roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry; - code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry); - ASSERT(code == 0); - ASSERT(pRollBackEntry != NULL); - - if (syncUtilUserRollback(pRollBackEntry->msgType)) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pRollBackEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pRollBackEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pRollBackEntry->seqNum; - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); - rpcFreeCont(rpcMsg.pCont); - } - - syncEntryDestory(pRollBackEntry); - } - } - - // delete confict entries - code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); - ASSERT(code == 0); - - return code; -} - -// if FromIndex > walCommitVer, return 0 -// else return num of pass entries -static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { - int32_t code = 0; - int32_t pass = 0; - - SyncIndex delBegin = FromIndex; - SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - - // invert roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry; - code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry); - ASSERT(code == 0); - ASSERT(pRollBackEntry != NULL); - - if (syncUtilUserRollback(pRollBackEntry->msgType)) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pRollBackEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pRollBackEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pRollBackEntry->seqNum; - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); - rpcFreeCont(rpcMsg.pCont); - } - - syncEntryDestory(pRollBackEntry); - } - } - - // update delete begin - SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore); - - if (delBegin <= walCommitVer) { - delBegin = walCommitVer + 1; - pass = walCommitVer - delBegin + 1; - - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "update delete begin to %" PRId64, delBegin); - syncNodeEventLog(ths, logBuf); - } while (0); - } - - // delete confict entries - code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); - ASSERT(code == 0); - - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "make log same from:%" PRId64 ", delbegin:%" PRId64 ", pass:%d", FromIndex, - delBegin, pass); - syncNodeEventLog(ths, logBuf); - } while (0); - - return pass; -} - -int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - - // leader transfer - if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { - int32_t code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); - ASSERT(code == 0); - } - - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = code; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); - return 0; -} - -static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) { - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { - return true; - } - - SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); - if (pMsg->prevLogIndex > myLastIndex) { - sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex); - return false; - } - - SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); - if (myPreLogTerm == SYNC_TERM_INVALID) { - sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex); - return false; - } - - if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { - return true; - } - - sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex); - return false; -} - -// really pre log match -// prevLogIndex == -1 -static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { - return true; - } - - SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); - if (pMsg->prevLogIndex > myLastIndex) { - sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex); - return false; - } - - SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); - if (myPreLogTerm == SYNC_TERM_INVALID) { - sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex); - return false; - } - - if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { - return true; - } - - sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex); - return false; -} - int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { // maybe update commit index, leader notice me if (newCommitIndex > ths->commitIndex) {