sync index
This commit is contained in:
parent
e3ef9d49e4
commit
17e6071033
|
@ -255,6 +255,7 @@ typedef struct SyncAppendEntries {
|
|||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
// private data
|
||||
SyncTerm term;
|
||||
SyncIndex prevLogIndex;
|
||||
SyncTerm prevLogTerm;
|
||||
SyncIndex commitIndex;
|
||||
|
|
|
@ -14,6 +14,11 @@
|
|||
*/
|
||||
|
||||
#include "syncAppendEntries.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "syncVoteMgr.h"
|
||||
|
||||
// TLA+ Spec
|
||||
// HandleAppendEntriesRequest(i, j, m) ==
|
||||
|
@ -84,5 +89,117 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
int32_t ret = 0;
|
||||
syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
|
||||
|
||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||
syncNodeUpdateTerm(ths, pMsg->term);
|
||||
}
|
||||
assert(pMsg->term <= ths->pRaftStore->currentTerm);
|
||||
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm) {
|
||||
ths->leaderCache = pMsg->srcId;
|
||||
syncNodeResetElectTimer(ths);
|
||||
}
|
||||
assert(pMsg->dataLen >= 0);
|
||||
|
||||
SyncTerm localPreLogTerm = 0;
|
||||
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
|
||||
assert(pEntry != NULL);
|
||||
localPreLogTerm = pEntry->term;
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
bool logOK =
|
||||
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
|
||||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
|
||||
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogIndex == localPreLogTerm));
|
||||
|
||||
// reject
|
||||
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
||||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
syncAppendEntriesReplyDestroy(pReply);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// return to follower state
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||
syncNodeBecomeFollower(ths);
|
||||
}
|
||||
|
||||
// accept request
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
|
||||
bool matchSuccess = false;
|
||||
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
|
||||
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
|
||||
matchSuccess = true;
|
||||
}
|
||||
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
|
||||
assert(pEntry != NULL);
|
||||
if (pMsg->prevLogTerm == pEntry->term) {
|
||||
matchSuccess = true;
|
||||
}
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
if (matchSuccess) {
|
||||
// delete conflict entries
|
||||
if (ths->pLogStore->getLastIndex(ths->pLogStore) > pMsg->prevLogIndex) {
|
||||
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
|
||||
ths->pLogStore->truncate(ths->pLogStore, fromIndex);
|
||||
}
|
||||
|
||||
// append one entry
|
||||
if (pMsg->dataLen > 0) {
|
||||
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = true;
|
||||
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
|
||||
syncAppendEntriesReplyDestroy(pReply);
|
||||
} else {
|
||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
syncAppendEntriesReplyDestroy(pReply);
|
||||
}
|
||||
|
||||
if (pMsg->commitIndex > ths->commitIndex) {
|
||||
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
// commit
|
||||
ths->commitIndex = pMsg->commitIndex;
|
||||
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -1108,6 +1108,9 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
|||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex);
|
||||
cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf);
|
||||
|
||||
|
|
Loading…
Reference in New Issue