refactor(sync): delete some code
This commit is contained in:
parent
6c5372aad2
commit
a8109b7f37
|
@ -89,188 +89,6 @@
|
||||||
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
||||||
//
|
//
|
||||||
|
|
||||||
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|
||||||
int32_t code;
|
|
||||||
|
|
||||||
SyncIndex delBegin = pMsg->prevLogIndex + 1;
|
|
||||||
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
|
||||||
|
|
||||||
// invert roll back!
|
|
||||||
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
|
||||||
if (ths->pFsm->FpRollBackCb != NULL) {
|
|
||||||
SSyncRaftEntry* pRollBackEntry;
|
|
||||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
ASSERT(pRollBackEntry != NULL);
|
|
||||||
|
|
||||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
|
||||||
|
|
||||||
SFsmCbMeta cbMeta = {0};
|
|
||||||
cbMeta.index = pRollBackEntry->index;
|
|
||||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
|
||||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
|
||||||
cbMeta.code = 0;
|
|
||||||
cbMeta.state = ths->state;
|
|
||||||
cbMeta.seqNum = pRollBackEntry->seqNum;
|
|
||||||
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
|
||||||
}
|
|
||||||
|
|
||||||
syncEntryDestory(pRollBackEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete confict entries
|
|
||||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if FromIndex > walCommitVer, return 0
|
|
||||||
// else return num of pass entries
|
|
||||||
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t pass = 0;
|
|
||||||
|
|
||||||
SyncIndex delBegin = FromIndex;
|
|
||||||
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
|
||||||
|
|
||||||
// invert roll back!
|
|
||||||
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
|
||||||
if (ths->pFsm->FpRollBackCb != NULL) {
|
|
||||||
SSyncRaftEntry* pRollBackEntry;
|
|
||||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
ASSERT(pRollBackEntry != NULL);
|
|
||||||
|
|
||||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
|
||||||
|
|
||||||
SFsmCbMeta cbMeta = {0};
|
|
||||||
cbMeta.index = pRollBackEntry->index;
|
|
||||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
|
||||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
|
||||||
cbMeta.code = 0;
|
|
||||||
cbMeta.state = ths->state;
|
|
||||||
cbMeta.seqNum = pRollBackEntry->seqNum;
|
|
||||||
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
|
||||||
}
|
|
||||||
|
|
||||||
syncEntryDestory(pRollBackEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// update delete begin
|
|
||||||
SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore);
|
|
||||||
|
|
||||||
if (delBegin <= walCommitVer) {
|
|
||||||
delBegin = walCommitVer + 1;
|
|
||||||
pass = walCommitVer - delBegin + 1;
|
|
||||||
|
|
||||||
do {
|
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "update delete begin to %" PRId64, delBegin);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete confict entries
|
|
||||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
do {
|
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "make log same from:%" PRId64 ", delbegin:%" PRId64 ", pass:%d", FromIndex,
|
|
||||||
delBegin, pass);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
return pass;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
|
||||||
|
|
||||||
// leader transfer
|
|
||||||
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
|
|
||||||
int32_t code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ths->pFsm != NULL) {
|
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
|
||||||
SFsmCbMeta cbMeta = {0};
|
|
||||||
cbMeta.index = pEntry->index;
|
|
||||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
|
||||||
cbMeta.code = code;
|
|
||||||
cbMeta.state = ths->state;
|
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
|
||||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) {
|
|
||||||
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
|
||||||
if (pMsg->prevLogIndex > myLastIndex) {
|
|
||||||
sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
|
||||||
if (myPreLogTerm == SYNC_TERM_INVALID) {
|
|
||||||
sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// really pre log match
|
|
||||||
// prevLogIndex == -1
|
|
||||||
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
|
|
||||||
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
|
||||||
if (pMsg->prevLogIndex > myLastIndex) {
|
|
||||||
sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
|
||||||
if (myPreLogTerm == SYNC_TERM_INVALID) {
|
|
||||||
sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
||||||
// maybe update commit index, leader notice me
|
// maybe update commit index, leader notice me
|
||||||
if (newCommitIndex > ths->commitIndex) {
|
if (newCommitIndex > ths->commitIndex) {
|
||||||
|
|
Loading…
Reference in New Issue