diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h
index 7dfea31f5c..5785089a20 100644
--- a/source/libs/sync/inc/syncMessage.h
+++ b/source/libs/sync/inc/syncMessage.h
@@ -255,6 +255,7 @@ typedef struct SyncAppendEntries {
   SRaftId srcId;
   SRaftId destId;
   // private data
+  SyncTerm term;
   SyncIndex prevLogIndex;
   SyncTerm prevLogTerm;
   SyncIndex commitIndex;
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 55d369a115..87d6669f59 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -14,6 +14,11 @@
  */
 
 #include "syncAppendEntries.h"
+#include "syncInt.h"
+#include "syncRaftLog.h"
+#include "syncRaftStore.h"
+#include "syncUtil.h"
+#include "syncVoteMgr.h"
 
 // TLA+ Spec
 // HandleAppendEntriesRequest(i, j, m) ==
@@ -84,5 +89,125 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
   int32_t ret = 0;
   syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
 
+  // Adopt a newer term carried by the message before evaluating it.
+  if (pMsg->term > ths->pRaftStore->currentTerm) {
+    syncNodeUpdateTerm(ths, pMsg->term);
+  }
+  assert(pMsg->term <= ths->pRaftStore->currentTerm);
+
+  // Same term: remember the sender as leader and reset the election timer.
+  if (pMsg->term == ths->pRaftStore->currentTerm) {
+    ths->leaderCache = pMsg->srcId;
+    syncNodeResetElectTimer(ths);
+  }
+  assert(pMsg->dataLen >= 0);
+
+  // Term of the local entry stored at prevLogIndex; stays 0 when that index is not in the local log.
+  SyncTerm localPreLogTerm = 0;
+  if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
+    SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
+    assert(pEntry != NULL);
+    localPreLogTerm = pEntry->term;
+    syncEntryDestory(pEntry);
+  }
+
+  // logOK: the sender has no previous entry, or the local entry at prevLogIndex carries term prevLogTerm.
+  bool logOK =
+      (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
+      ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
+       (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
+
+  // reject
+  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
+      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
+    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
+    pReply->srcId = ths->myRaftId;
+    pReply->destId = pMsg->srcId;
+    pReply->term = ths->pRaftStore->currentTerm;
+    pReply->success = false;
+    pReply->matchIndex = SYNC_INDEX_INVALID;
+
+    SRpcMsg rpcMsg;
+    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
+    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
+    syncAppendEntriesReplyDestroy(pReply);
+
+    return ret;
+  }
+
+  // return to follower state
+  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
+    syncNodeBecomeFollower(ths);
+  }
+
+  // accept request
+  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
+    bool matchSuccess = false;
+    if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
+        ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
+      matchSuccess = true;
+    }
+    if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
+      // Fetch the entry by index; prevLogTerm is only compared against its term.
+      SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
+      assert(pEntry != NULL);
+      if (pMsg->prevLogTerm == pEntry->term) {
+        matchSuccess = true;
+      }
+      syncEntryDestory(pEntry);
+    }
+
+    if (matchSuccess) {
+      // delete conflict entries
+      if (ths->pLogStore->getLastIndex(ths->pLogStore) > pMsg->prevLogIndex) {
+        SyncIndex fromIndex = pMsg->prevLogIndex + 1;
+        ths->pLogStore->truncate(ths->pLogStore, fromIndex);
+      }
+
+      // append one entry
+      bool hasAppendEntries = pMsg->dataLen > 0;
+      if (hasAppendEntries) {
+        SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
+        assert(pEntry != NULL);
+        ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
+        syncEntryDestory(pEntry);
+      }
+
+      SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
+      pReply->srcId = ths->myRaftId;
+      pReply->destId = pMsg->srcId;
+      pReply->term = ths->pRaftStore->currentTerm;
+      pReply->success = true;
+      // matchIndex = prevLogIndex + number of entries appended by this request (0 or 1)
+      pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
+
+      SRpcMsg rpcMsg;
+      syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
+      syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
+
+      syncAppendEntriesReplyDestroy(pReply);
+    } else {
+      SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
+      pReply->srcId = ths->myRaftId;
+      pReply->destId = pMsg->srcId;
+      pReply->term = ths->pRaftStore->currentTerm;
+      pReply->success = false;
+      pReply->matchIndex = SYNC_INDEX_INVALID;
+
+      SRpcMsg rpcMsg;
+      syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
+      syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
+      syncAppendEntriesReplyDestroy(pReply);
+    }
+
+    if (pMsg->commitIndex > ths->commitIndex) {
+      if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
+        // commit
+        ths->commitIndex = pMsg->commitIndex;
+        ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
+      }
+    }
+  }
+
   return ret;
 }
diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c
index 2447ea2218..509ede274b 100644
--- a/source/libs/sync/src/syncMessage.c
+++ b/source/libs/sync/src/syncMessage.c
@@ -1108,6 +1108,9 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
   cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
   cJSON_AddItemToObject(pRoot, "destId", pDestId);
 
+  snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
+  cJSON_AddStringToObject(pRoot, "term", u64buf);
+
   snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex);
   cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf);
 