Merge pull request #20017 from taosdata/FIX/TD-22285-main
fix: synchronize access to raftStore
This commit is contained in:
commit
4353e70c89
|
@ -71,6 +71,7 @@ typedef struct SRaftId {
|
|||
typedef struct SRaftStore {
|
||||
SyncTerm currentTerm;
|
||||
SRaftId voteFor;
|
||||
TdThreadMutex mutex;
|
||||
} SRaftStore;
|
||||
|
||||
typedef struct SSyncHbTimerData {
|
||||
|
@ -282,7 +283,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
|
|||
|
||||
// raft vote --------------
|
||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
|
||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm term);
|
||||
|
||||
// log replication
|
||||
SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);
|
||||
|
|
|
@ -26,14 +26,15 @@ extern "C" {
|
|||
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
|
||||
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
||||
|
||||
int32_t raftStoreReadFile(SSyncNode *pNode);
|
||||
int32_t raftStoreWriteFile(SSyncNode *pNode);
|
||||
int32_t raftStoreOpen(SSyncNode *pNode);
|
||||
void raftStoreClose(SSyncNode *pNode);
|
||||
|
||||
bool raftStoreHasVoted(SSyncNode *pNode);
|
||||
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId);
|
||||
void raftStoreClearVote(SSyncNode *pNode);
|
||||
void raftStoreNextTerm(SSyncNode *pNode);
|
||||
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term);
|
||||
SyncTerm raftStoreGetTerm(SSyncNode *pNode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -120,17 +120,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
// prepare response msg
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->raftStore.currentTerm;
|
||||
pReply->term = raftStoreGetTerm(ths);
|
||||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
|
||||
pReply->startTime = ths->startTime;
|
||||
|
||||
if (pMsg->term < ths->raftStore.currentTerm) {
|
||||
if (pMsg->term < raftStoreGetTerm(ths)) {
|
||||
goto _SEND_RESPONSE;
|
||||
}
|
||||
|
||||
if (pMsg->term > ths->raftStore.currentTerm) {
|
||||
if (pMsg->term > raftStoreGetTerm(ths)) {
|
||||
pReply->term = pMsg->term;
|
||||
}
|
||||
|
||||
|
|
|
@ -50,19 +50,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
// drop stale response
|
||||
if (pMsg->term < ths->raftStore.currentTerm) {
|
||||
if (pMsg->term < raftStoreGetTerm(ths)) {
|
||||
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
if (pMsg->term > ths->raftStore.currentTerm) {
|
||||
if (pMsg->term > raftStoreGetTerm(ths)) {
|
||||
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pMsg->term == ths->raftStore.currentTerm);
|
||||
ASSERT(pMsg->term == raftStoreGetTerm(ths));
|
||||
|
||||
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
|
||||
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
|
||||
|
|
|
@ -111,7 +111,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
|
|||
SyncIndex commitIndex = indexLikely;
|
||||
syncNodeUpdateCommitIndex(ths, commitIndex);
|
||||
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
|
||||
ths->raftStore.currentTerm, commitIndex);
|
||||
raftStoreGetTerm(ths), commitIndex);
|
||||
}
|
||||
return ths->commitIndex;
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
|
|||
SyncRequestVote* pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pNode->myRaftId;
|
||||
pMsg->destId = pNode->peersId[i];
|
||||
pMsg->term = pNode->raftStore.currentTerm;
|
||||
pMsg->term = raftStoreGetTerm(pNode);
|
||||
|
||||
ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
|
||||
if (ret < 0) {
|
||||
|
@ -85,10 +85,12 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
|||
// start election
|
||||
raftStoreNextTerm(pSyncNode);
|
||||
raftStoreClearVote(pSyncNode);
|
||||
voteGrantedReset(pSyncNode->pVotesGranted, pSyncNode->raftStore.currentTerm);
|
||||
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->raftStore.currentTerm);
|
||||
|
||||
syncNodeVoteForSelf(pSyncNode);
|
||||
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
|
||||
voteGrantedReset(pSyncNode->pVotesGranted, currentTerm);
|
||||
votesRespondReset(pSyncNode->pVotesRespond, currentTerm);
|
||||
syncNodeVoteForSelf(pSyncNode, currentTerm);
|
||||
|
||||
if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
|
||||
// only myself, to leader
|
||||
ASSERT(!pSyncNode->pVotesGranted->toLeader);
|
||||
|
|
|
@ -41,7 +41,6 @@
|
|||
static void syncNodeEqPingTimer(void* param, void* tmrId);
|
||||
static void syncNodeEqElectTimer(void* param, void* tmrId);
|
||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
|
||||
static int32_t syncNodeEqNoop(SSyncNode* ths);
|
||||
static int32_t syncNodeAppendNoop(SSyncNode* ths);
|
||||
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
|
||||
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
|
||||
|
@ -468,7 +467,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
if (code == 0 && pEntry != NULL) {
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm) {
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == raftStoreGetTerm(pSyncNode)) {
|
||||
ready = true;
|
||||
}
|
||||
|
||||
|
@ -664,7 +663,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
|
|||
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
|
||||
if (code == 0) {
|
||||
pMsg->info.conn.applyIndex = retIndex;
|
||||
pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm;
|
||||
pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
|
||||
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
return 1;
|
||||
|
@ -911,7 +910,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
|
||||
// init TLA+ server vars
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
if (raftStoreReadFile(pSyncNode) != 0) {
|
||||
if (raftStoreOpen(pSyncNode) != 0) {
|
||||
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1212,7 +1211,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
if (pSyncNode == NULL) return;
|
||||
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
|
||||
|
||||
syncNodeStopPingTimer(pSyncNode);
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
syncNodeLogReplMgrDestroy(pSyncNode);
|
||||
|
||||
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
|
||||
pSyncNode->pSyncRespMgr = NULL;
|
||||
voteGrantedDestroy(pSyncNode->pVotesGranted);
|
||||
|
@ -1228,10 +1232,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
syncLogBufferDestroy(pSyncNode->pLogBuf);
|
||||
pSyncNode->pLogBuf = NULL;
|
||||
|
||||
syncNodeStopPingTimer(pSyncNode);
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (pSyncNode->senders[i] != NULL) {
|
||||
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
|
||||
|
@ -1259,6 +1259,8 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
taosMemoryFree(pSyncNode->pFsm);
|
||||
}
|
||||
|
||||
raftStoreClose(pSyncNode);
|
||||
|
||||
taosMemoryFree(pSyncNode);
|
||||
}
|
||||
|
||||
|
@ -1633,7 +1635,7 @@ _END:
|
|||
|
||||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||
if (term > pSyncNode->raftStore.currentTerm) {
|
||||
if (term > raftStoreGetTerm(pSyncNode)) {
|
||||
raftStoreSetTerm(pSyncNode, term);
|
||||
char tmpBuf[64];
|
||||
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
|
||||
|
@ -1643,24 +1645,23 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
|||
}
|
||||
|
||||
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
||||
if (term > pSyncNode->raftStore.currentTerm) {
|
||||
if (term > raftStoreGetTerm(pSyncNode)) {
|
||||
raftStoreSetTerm(pSyncNode, term);
|
||||
}
|
||||
}
|
||||
|
||||
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
||||
if (pSyncNode->raftStore.currentTerm > newTerm) {
|
||||
sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
|
||||
pSyncNode->raftStore.currentTerm);
|
||||
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
|
||||
if (currentTerm > newTerm) {
|
||||
sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
||||
return;
|
||||
}
|
||||
|
||||
do {
|
||||
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
|
||||
pSyncNode->raftStore.currentTerm);
|
||||
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
||||
} while (0);
|
||||
|
||||
if (pSyncNode->raftStore.currentTerm < newTerm) {
|
||||
if (currentTerm < newTerm) {
|
||||
raftStoreSetTerm(pSyncNode, newTerm);
|
||||
char tmpBuf[64];
|
||||
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
|
||||
|
@ -1820,8 +1821,8 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
|||
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
ASSERT(lastIndex >= 0);
|
||||
sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "",
|
||||
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
|
||||
sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
|
||||
raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
||||
}
|
||||
|
||||
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
|
||||
|
@ -1840,7 +1841,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
|||
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
||||
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
|
||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
||||
|
||||
sNTrace(pSyncNode, "follower to candidate");
|
||||
}
|
||||
|
@ -1850,7 +1851,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
|
|||
syncNodeBecomeFollower(pSyncNode, "leader to follower");
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
||||
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
|
||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
||||
|
||||
sNTrace(pSyncNode, "leader to follower");
|
||||
}
|
||||
|
@ -1860,7 +1861,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
|||
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
||||
pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
|
||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
||||
|
||||
sNTrace(pSyncNode, "candidate to follower");
|
||||
}
|
||||
|
@ -1868,7 +1869,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
|||
// just called by syncNodeVoteForSelf
|
||||
// need assert
|
||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
||||
ASSERT(term == pSyncNode->raftStore.currentTerm);
|
||||
ASSERT(term == raftStoreGetTerm(pSyncNode));
|
||||
bool voted = raftStoreHasVoted(pSyncNode);
|
||||
ASSERT(!voted);
|
||||
|
||||
|
@ -1876,8 +1877,8 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
|
|||
}
|
||||
|
||||
// simulate get vote from outside
|
||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||
syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId);
|
||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
|
||||
syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
|
||||
|
@ -1886,7 +1887,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
|||
SyncRequestVoteReply* pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSyncNode->myRaftId;
|
||||
pMsg->destId = pSyncNode->myRaftId;
|
||||
pMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pMsg->term = currentTerm;
|
||||
pMsg->voteGranted = true;
|
||||
|
||||
voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
|
||||
|
@ -2199,7 +2200,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||
pSyncMsg->destId = pData->destId;
|
||||
pSyncMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
|
||||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
|
@ -2238,30 +2239,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
syncNodeRelease(pSyncNode);
|
||||
}
|
||||
|
||||
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
|
||||
if (pNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncIndex index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
|
||||
SyncTerm term = pNode->raftStore.currentTerm;
|
||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
|
||||
if (pEntry == NULL) return -1;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
|
||||
syncEntryDestroy(pEntry);
|
||||
|
||||
sNTrace(pNode, "propose msg, type:noop");
|
||||
code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("failed to propose noop msg while enqueue since %s", terrstr());
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
|
||||
|
||||
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
|
||||
|
@ -2291,7 +2268,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
|||
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
|
||||
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
||||
terrno = TSDB_CODE_SYN_BUFFER_FULL;
|
||||
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL);
|
||||
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL);
|
||||
syncEntryDestroy(pEntry);
|
||||
return -1;
|
||||
}
|
||||
|
@ -2364,7 +2341,7 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
|
|||
|
||||
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
|
||||
SyncTerm term = ths->raftStore.currentTerm;
|
||||
SyncTerm term = raftStoreGetTerm(ths);
|
||||
|
||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
||||
if (pEntry == NULL) {
|
||||
|
@ -2380,7 +2357,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
|
|||
int32_t ret = 0;
|
||||
|
||||
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
||||
SyncTerm term = ths->raftStore.currentTerm;
|
||||
SyncTerm term = raftStoreGetTerm(ths);
|
||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
||||
ASSERT(pEntry != NULL);
|
||||
|
||||
|
@ -2418,16 +2395,17 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
|
||||
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
|
||||
pMsgReply->destId = pMsg->srcId;
|
||||
pMsgReply->srcId = ths->myRaftId;
|
||||
pMsgReply->term = ths->raftStore.currentTerm;
|
||||
pMsgReply->term = currentTerm;
|
||||
pMsgReply->privateTerm = 8864; // magic number
|
||||
pMsgReply->startTime = ths->startTime;
|
||||
pMsgReply->timeStamp = tsMs;
|
||||
|
||||
if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
|
||||
|
||||
syncNodeResetElectTimer(ths);
|
||||
|
@ -2456,7 +2434,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
// syncNodeStepDown(ths, pMsg->term);
|
||||
SRpcMsg rpcMsgLocalCmd = {0};
|
||||
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||
|
@ -2565,7 +2543,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
int32_t code = 0;
|
||||
|
||||
SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
|
||||
SyncTerm term = ths->raftStore.currentTerm;
|
||||
SyncTerm term = raftStoreGetTerm(ths);
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
|
||||
|
@ -2609,73 +2587,6 @@ const char* syncStr(ESyncState state) {
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
sNTrace(ths, "I am not follower, can not do leader transfer");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ths->restoreFinish) {
|
||||
sNTrace(ths, "restore not finish, can not do leader transfer");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pEntry->term < ths->raftStore.currentTerm) {
|
||||
sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pEntry->index < syncNodeGetLastIndex(ths)) {
|
||||
sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
if (ths->vgId > 1) {
|
||||
sNTrace(ths, "I am vnode, can not do leader transfer");
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
|
||||
SyncLeaderTransfer* pSyncLeaderTransfer = pRpcMsg->pCont;
|
||||
sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);
|
||||
|
||||
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
|
||||
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
|
||||
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;
|
||||
|
||||
bool same = sameId || sameNodeInfo;
|
||||
if (same) {
|
||||
// reset elect timer now!
|
||||
int32_t electMS = 1;
|
||||
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
|
||||
ASSERT(ret == 0);
|
||||
|
||||
sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
|
||||
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
|
||||
}
|
||||
|
||||
if (ths->pFsm->FpLeaderTransferCb != NULL) {
|
||||
SFsmCbMeta cbMeta = {
|
||||
.code = 0,
|
||||
.currentTerm = ths->raftStore.currentTerm,
|
||||
.flag = 0,
|
||||
.index = pEntry->index,
|
||||
.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
|
||||
.isWeak = pEntry->isWeak,
|
||||
.seqNum = pEntry->seqNum,
|
||||
.state = ths->state,
|
||||
.term = pEntry->term,
|
||||
};
|
||||
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
|
||||
for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) {
|
||||
SRaftId raftId = {
|
||||
|
|
|
@ -176,7 +176,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE
|
|||
pMsg->prevLogTerm = prevLogTerm;
|
||||
pMsg->vgId = pNode->vgId;
|
||||
pMsg->srcId = pNode->myRaftId;
|
||||
pMsg->term = pNode->raftStore.currentTerm;
|
||||
pMsg->term = raftStoreGetTerm(pNode);
|
||||
pMsg->commitIndex = pNode->commitIndex;
|
||||
pMsg->privateTerm = 0;
|
||||
return 0;
|
||||
|
|
|
@ -61,6 +61,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
|
||||
ASSERTS(pMatch != NULL, "no matched log entry");
|
||||
ASSERT(pMatch->index + 1 == index);
|
||||
ASSERT(pMatch->term <= pEntry->term);
|
||||
|
||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
|
@ -514,7 +515,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
SSyncLogStore* pLogStore = pNode->pLogStore;
|
||||
SSyncFSM* pFsm = pNode->pFsm;
|
||||
ESyncState role = pNode->state;
|
||||
SyncTerm term = pNode->raftStore.currentTerm;
|
||||
SyncTerm currentTerm = raftStoreGetTerm(pNode);
|
||||
SyncGroupId vgId = pNode->vgId;
|
||||
int32_t ret = -1;
|
||||
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
|
||||
|
@ -529,7 +530,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
}
|
||||
|
||||
sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64,
|
||||
pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term);
|
||||
pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);
|
||||
|
||||
// execute in fsm
|
||||
for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
|
||||
|
@ -545,16 +546,16 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||
}
|
||||
|
||||
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) {
|
||||
if (syncLogFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) {
|
||||
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||
", role:%d, current term:%" PRId64,
|
||||
vgId, pEntry->index, pEntry->term, role, term);
|
||||
vgId, pEntry->index, pEntry->term, role, currentTerm);
|
||||
goto _out;
|
||||
}
|
||||
pBuf->commitIndex = index;
|
||||
|
||||
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
|
||||
pEntry->index, pEntry->term, role, term);
|
||||
pEntry->index, pEntry->term, role, currentTerm);
|
||||
|
||||
if (!inBuf) {
|
||||
syncEntryDestroy(pEntry);
|
||||
|
@ -576,7 +577,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
_out:
|
||||
// mark as restored if needed
|
||||
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
|
||||
pNode->raftStore.currentTerm <= pEntry->term) {
|
||||
currentTerm <= pEntry->term) {
|
||||
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
|
||||
pNode->restoreFinish = true;
|
||||
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
#include "syncUtil.h"
|
||||
#include "tjson.h"
|
||||
|
||||
int32_t raftStoreReadFile(SSyncNode *pNode);
|
||||
int32_t raftStoreWriteFile(SSyncNode *pNode);
|
||||
|
||||
static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -150,27 +153,53 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t raftStoreOpen(SSyncNode *pNode) {
|
||||
taosThreadMutexInit(&pNode->raftStore.mutex, NULL);
|
||||
return raftStoreReadFile(pNode);
|
||||
}
|
||||
|
||||
void raftStoreClose(SSyncNode *pNode) { taosThreadMutexDestroy(&pNode->raftStore.mutex); }
|
||||
|
||||
bool raftStoreHasVoted(SSyncNode *pNode) {
|
||||
taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
bool b = syncUtilEmptyId(&pNode->raftStore.voteFor);
|
||||
taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
return (!b);
|
||||
}
|
||||
|
||||
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
|
||||
taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
pNode->raftStore.voteFor = *pRaftId;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
void raftStoreClearVote(SSyncNode *pNode) {
|
||||
taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
pNode->raftStore.voteFor = EMPTY_RAFT_ID;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
void raftStoreNextTerm(SSyncNode *pNode) {
|
||||
taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
pNode->raftStore.currentTerm++;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
|
||||
pNode->raftStore.currentTerm = term;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
if (pNode->raftStore.currentTerm < term) {
|
||||
pNode->raftStore.currentTerm = term;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
}
|
||||
taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
SyncTerm raftStoreGetTerm(SSyncNode *pNode) {
|
||||
taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
SyncTerm term = pNode->raftStore.currentTerm;
|
||||
taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
return term;
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
|||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||
pSyncMsg->destId = pSyncNode->peersId[i];
|
||||
pSyncMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
|
||||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
|
|
|
@ -97,15 +97,14 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
|
||||
|
||||
// maybe update term
|
||||
if (pMsg->term > ths->raftStore.currentTerm) {
|
||||
if (pMsg->term > raftStoreGetTerm(ths)) {
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
// syncNodeUpdateTerm(ths, pMsg->term);
|
||||
}
|
||||
ASSERT(pMsg->term <= ths->raftStore.currentTerm);
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
ASSERT(pMsg->term <= currentTerm);
|
||||
|
||||
bool grant = (pMsg->term == ths->raftStore.currentTerm) && logOK &&
|
||||
bool grant = (pMsg->term == currentTerm) && logOK &&
|
||||
((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
|
||||
if (grant) {
|
||||
// maybe has already voted for pMsg->srcId
|
||||
|
@ -113,7 +112,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
raftStoreVote(ths, &(pMsg->srcId));
|
||||
|
||||
// candidate ?
|
||||
syncNodeStepDown(ths, ths->raftStore.currentTerm);
|
||||
syncNodeStepDown(ths, currentTerm);
|
||||
|
||||
// forbid elect for this round
|
||||
syncNodeResetElectTimer(ths);
|
||||
|
@ -127,8 +126,9 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
SyncRequestVoteReply* pReply = rpcMsg.pCont;
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->raftStore.currentTerm;
|
||||
pReply->term = currentTerm;
|
||||
pReply->voteGranted = grant;
|
||||
ASSERT(!grant || pMsg->term == pReply->term);
|
||||
|
||||
// trace log
|
||||
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
|
||||
|
|
|
@ -47,27 +47,21 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
syncLogRecvRequestVoteReply(ths, pMsg, "not in my config");
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
// drop stale response
|
||||
if (pMsg->term < ths->raftStore.currentTerm) {
|
||||
if (pMsg->term < currentTerm) {
|
||||
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// ASSERT(!(pMsg->term > ths->raftStore.currentTerm));
|
||||
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
||||
// if (pMsg->term > ths->raftStore.currentTerm) {
|
||||
// syncNodeUpdateTerm(ths, pMsg->term);
|
||||
// }
|
||||
|
||||
if (pMsg->term > ths->raftStore.currentTerm) {
|
||||
if (pMsg->term > currentTerm) {
|
||||
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
return -1;
|
||||
}
|
||||
|
||||
syncLogRecvRequestVoteReply(ths, pMsg, "");
|
||||
ASSERT(pMsg->term == ths->raftStore.currentTerm);
|
||||
ASSERT(pMsg->term == currentTerm);
|
||||
|
||||
// This tallies votes even when the current state is not Candidate,
|
||||
// but they won't be looked at, so it doesn't matter.
|
||||
|
|
|
@ -143,7 +143,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
|||
.state = pNode->state,
|
||||
.seqNum = *pSeqNum,
|
||||
.term = SYNC_TERM_INVALID,
|
||||
.currentTerm = pNode->raftStore.currentTerm,
|
||||
.currentTerm = SYNC_TERM_INVALID,
|
||||
.flag = 0,
|
||||
};
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
|||
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
|
||||
pSender->pSyncNode = pSyncNode;
|
||||
pSender->replicaIndex = replicaIndex;
|
||||
pSender->term = pSyncNode->raftStore.currentTerm;
|
||||
pSender->term = raftStoreGetTerm(pSyncNode);
|
||||
pSender->startTime = 0;
|
||||
pSender->endTime = 0;
|
||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||
|
@ -90,7 +90,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
|
||||
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
|
||||
pSender->sendingMS = 0;
|
||||
pSender->term = pSender->pSyncNode->raftStore.currentTerm;
|
||||
pSender->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pSender->startTime = taosGetTimestampMs();
|
||||
pSender->lastSendTime = pSender->startTime;
|
||||
pSender->finish = false;
|
||||
|
@ -105,7 +105,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
|
||||
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
|
@ -185,7 +185,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
|
||||
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
|
@ -226,7 +226,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
|
||||
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
|
@ -314,7 +314,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
|
|||
pReceiver->pWriter = NULL;
|
||||
pReceiver->pSyncNode = pSyncNode;
|
||||
pReceiver->fromId = fromId;
|
||||
pReceiver->term = pSyncNode->raftStore.currentTerm;
|
||||
pReceiver->term = raftStoreGetTerm(pSyncNode);
|
||||
pReceiver->snapshot.data = NULL;
|
||||
pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
|
||||
pReceiver->snapshot.lastApplyTerm = 0;
|
||||
|
@ -380,7 +380,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
|
|||
|
||||
pReceiver->start = true;
|
||||
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
|
||||
pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm;
|
||||
pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
|
||||
pReceiver->fromId = pPreMsg->srcId;
|
||||
pReceiver->startTime = pPreMsg->startTime;
|
||||
|
||||
|
@ -437,9 +437,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
|||
}
|
||||
|
||||
// maybe update term
|
||||
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) {
|
||||
pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm;
|
||||
(void)raftStoreWriteFile(pReceiver->pSyncNode);
|
||||
if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
|
||||
raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
|
||||
}
|
||||
|
||||
// stop writer, apply data
|
||||
|
@ -584,7 +583,7 @@ _SEND_REPLY:
|
|||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pRspMsg->term = raftStoreGetTerm(pSyncNode);
|
||||
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||
pRspMsg->startTime = pReceiver->startTime;
|
||||
|
@ -640,7 +639,7 @@ _SEND_REPLY:
|
|||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pRspMsg->term = raftStoreGetTerm(pSyncNode);
|
||||
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||
pRspMsg->startTime = pReceiver->startTime;
|
||||
|
@ -690,7 +689,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
|
|||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pRspMsg->term = raftStoreGetTerm(pSyncNode);
|
||||
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||
pRspMsg->startTime = pReceiver->startTime;
|
||||
|
@ -737,7 +736,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
|
|||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->raftStore.currentTerm;
|
||||
pRspMsg->term = raftStoreGetTerm(pSyncNode);
|
||||
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||
pRspMsg->startTime = pReceiver->startTime;
|
||||
|
@ -786,13 +785,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pMsg->term < pSyncNode->raftStore.currentTerm) {
|
||||
if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
|
||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pMsg->term > pSyncNode->raftStore.currentTerm) {
|
||||
if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
|
||||
syncNodeStepDown(pSyncNode, pMsg->term);
|
||||
}
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
@ -800,7 +799,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
// state, term, seq/ack
|
||||
int32_t code = 0;
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||
if (pMsg->term == pSyncNode->raftStore.currentTerm) {
|
||||
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
|
||||
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
|
||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
|
||||
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
|
||||
|
@ -884,7 +883,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
|||
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
|
||||
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm;
|
||||
pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pSendMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
|
@ -943,10 +942,11 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
goto _ERROR;
|
||||
}
|
||||
|
||||
if (pMsg->term != pSyncNode->raftStore.currentTerm) {
|
||||
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
|
||||
if (pMsg->term != currentTerm) {
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
|
||||
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
|
||||
pSyncNode->raftStore.currentTerm);
|
||||
currentTerm);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _ERROR;
|
||||
}
|
||||
|
|
|
@ -154,7 +154,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
|||
|
||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
|
||||
if (pNode == NULL || pNode->pLogStore == NULL) return;
|
||||
int64_t currentTerm = pNode->raftStore.currentTerm;
|
||||
int64_t currentTerm = raftStoreGetTerm(pNode);
|
||||
|
||||
// save error code, otherwise it will be overwritten
|
||||
int32_t errCode = terrno;
|
||||
|
@ -260,7 +260,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
|||
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
|
||||
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
|
||||
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
|
||||
DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex,
|
||||
DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex,
|
||||
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
||||
pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex,
|
||||
pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
|
||||
|
@ -308,7 +308,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
|
|||
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term,
|
||||
pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
|
||||
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
|
||||
pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
|
||||
raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
|
||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
|
||||
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
|
||||
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
|
||||
|
|
|
@ -199,7 +199,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
|||
", sby:%d, "
|
||||
"r-num:%d, "
|
||||
"lcfg:%" PRId64 ", chging:%d, rsto:%d",
|
||||
pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex,
|
||||
pSyncNode->vgId, syncStr(pSyncNode->state), raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex,
|
||||
logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum,
|
||||
pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
|
||||
|
||||
|
|
|
@ -137,7 +137,7 @@ int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
|
|||
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
|
||||
pMsgReply->srcId = ths->myRaftId;
|
||||
pMsgReply->destId = pMsg->srcId;
|
||||
pMsgReply->term = ths->raftStore.currentTerm;
|
||||
pMsgReply->term = raftStoreGetTerm(ths);
|
||||
|
||||
SSyncLogStoreData *pData = ths->pLogStore->data;
|
||||
SWal *pWal = pData->pWal;
|
||||
|
|
Loading…
Reference in New Issue