Merge pull request #17709 from taosdata/feature/3.0_mhli
refactor(sync): delete some code
This commit is contained in:
commit
8f88339988
|
@ -89,188 +89,6 @@
|
|||
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
||||
//
|
||||
|
||||
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||
int32_t code;
|
||||
|
||||
SyncIndex delBegin = pMsg->prevLogIndex + 1;
|
||||
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||
|
||||
// invert roll back!
|
||||
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
||||
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||
SSyncRaftEntry* pRollBackEntry;
|
||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
|
||||
ASSERT(code == 0);
|
||||
ASSERT(pRollBackEntry != NULL);
|
||||
|
||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pRollBackEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||
cbMeta.code = 0;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pRollBackEntry->seqNum;
|
||||
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
syncEntryDestory(pRollBackEntry);
|
||||
}
|
||||
}
|
||||
|
||||
// delete confict entries
|
||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
||||
ASSERT(code == 0);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// if FromIndex > walCommitVer, return 0
|
||||
// else return num of pass entries
|
||||
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
|
||||
int32_t code = 0;
|
||||
int32_t pass = 0;
|
||||
|
||||
SyncIndex delBegin = FromIndex;
|
||||
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||
|
||||
// invert roll back!
|
||||
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
||||
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||
SSyncRaftEntry* pRollBackEntry;
|
||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
|
||||
ASSERT(code == 0);
|
||||
ASSERT(pRollBackEntry != NULL);
|
||||
|
||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pRollBackEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||
cbMeta.code = 0;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pRollBackEntry->seqNum;
|
||||
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
syncEntryDestory(pRollBackEntry);
|
||||
}
|
||||
}
|
||||
|
||||
// update delete begin
|
||||
SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore);
|
||||
|
||||
if (delBegin <= walCommitVer) {
|
||||
delBegin = walCommitVer + 1;
|
||||
pass = walCommitVer - delBegin + 1;
|
||||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "update delete begin to %" PRId64, delBegin);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
}
|
||||
|
||||
// delete confict entries
|
||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
||||
ASSERT(code == 0);
|
||||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "make log same from:%" PRId64 ", delbegin:%" PRId64 ", pass:%d", FromIndex,
|
||||
delBegin, pass);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
||||
return pass;
|
||||
}
|
||||
|
||||
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
// leader transfer
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
|
||||
int32_t code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.code = code;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) {
|
||||
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
||||
return true;
|
||||
}
|
||||
|
||||
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
if (pMsg->prevLogIndex > myLastIndex) {
|
||||
sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
||||
if (myPreLogTerm == SYNC_TERM_INVALID) {
|
||||
sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
||||
return true;
|
||||
}
|
||||
|
||||
sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
// really pre log match
|
||||
// prevLogIndex == -1
|
||||
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
|
||||
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
||||
return true;
|
||||
}
|
||||
|
||||
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
if (pMsg->prevLogIndex > myLastIndex) {
|
||||
sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
||||
if (myPreLogTerm == SYNC_TERM_INVALID) {
|
||||
sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
||||
return true;
|
||||
}
|
||||
|
||||
sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
||||
// maybe update commit index, leader notice me
|
||||
if (newCommitIndex > ths->commitIndex) {
|
||||
|
|
Loading…
Reference in New Issue