fix(sync): update nextindex, matchindex when backto nolog

This commit is contained in:
Minghao Li 2022-06-23 10:10:57 +08:00
parent 2f2715aca4
commit 9786ba2fbe
5 changed files with 71 additions and 13 deletions

View File

@ -28,6 +28,7 @@ extern bool gRaftDetailLog;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;

View File

@ -43,7 +43,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS);
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS);
// for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// utils
const char* syncUtilState2String(ESyncState state);
@ -468,7 +468,7 @@ typedef struct SyncLeaderTransfer {
SRaftId destId;
*/
SNodeInfo newNodeInfo;
SRaftId newLeaderId;
SRaftId newLeaderId;
} SyncLeaderTransfer;
SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId);
@ -489,17 +489,16 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg);
// ---------------------------------------------
typedef struct SyncReconfigFinish {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SSyncCfg oldCfg;
SSyncCfg newCfg;
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SSyncCfg oldCfg;
SSyncCfg newCfg;
SyncIndex newCfgIndex;
SyncTerm newCfgTerm;
uint64_t newCfgSeqNum;
SyncTerm newCfgTerm;
uint64_t newCfgSeqNum;
} SyncReconfigFinish;
@ -521,8 +520,6 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
// on message ----------------------
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);

View File

@ -436,6 +436,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sError("vgId:%d sync get pre term error, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
if (gRaftDetailLog) {
sTrace(

View File

@ -345,7 +345,7 @@ bool syncCanLeaderTransfer(int64_t rid) {
return matchOK;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
}
@ -1888,6 +1888,16 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
return syncStartIndex;
}
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex preIndex = index - 1;
if (preIndex < SYNC_INDEX_INVALID) {
preIndex = SYNC_INDEX_INVALID;
}
return preIndex;
}
/*
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
@ -1900,7 +1910,42 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex preIndex = index - 1;
return preIndex;
}
*/
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if (index < SYNC_INDEX_BEGIN) {
return SYNC_TERM_INVALID;
}
if (index == SYNC_INDEX_BEGIN) {
return 0;
}
SyncTerm preTerm = 0;
SyncIndex preIndex = index - 1;
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
if (code == 0) {
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
return preTerm;
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
}
}
}
return SYNC_TERM_INVALID;
}
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
@ -1938,6 +1983,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(0);
return -1;
}
#endif
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {

View File

@ -139,6 +139,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
// pre index, pre term
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld",
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
return -1;
}
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);