From b9472dc386931b76ef648e1af4f4241d8d95fbaa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 31 May 2022 17:53:38 +0800 Subject: [PATCH] enh(sync): syncNodeAppendEntriesPeersSnapshot --- source/libs/sync/src/syncReplication.c | 93 +++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index cdf05b5560..864acb2993 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -20,6 +20,7 @@ #include "syncRaftEntry.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncSnapshot.h" #include "syncUtil.h" // TLA+ Spec @@ -114,7 +115,97 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { return ret; } -int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { return 0; } +int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + + syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pNextIndex", pSyncNode->pNextIndex); + syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pMatchIndex", pSyncNode->pMatchIndex); + logStoreSimpleLog2("==syncNodeAppendEntriesPeersSnapshot==", pSyncNode->pLogStore); + + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId* pDestId = &(pSyncNode->peersId[i]); + + // set prevLogIndex + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + + SyncIndex preLogIndex; + SyncTerm preLogTerm; + + ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm); + ASSERT(ret == 0); + + // batch optimized + // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); + + if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { + // to claim leader + SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); + assert(pMsg != NULL); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + + syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg); + + // send AppendEntries + syncNodeAppendEntries(pSyncNode, pDestId, pMsg); + syncAppendEntriesDestroy(pMsg); + + SSyncSnapshotSender* pSender = NULL; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncUtilSameId(&((pSyncNode->replicasId)[i]), pDestId)) { + pSender = (pSyncNode->senders)[i]; + break; + } + } + ASSERT(pSender != NULL); + + snapshotSenderStart(pSender); + + } else { + SyncAppendEntries* pMsg = NULL; + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); + if (pEntry != NULL) { + pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); + assert(pMsg != NULL); + + // add pEntry into msg + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(len == pEntry->bytes); + memcpy(pMsg->data, serialized, len); + + taosMemoryFree(serialized); + syncEntryDestory(pEntry); + + } else { + // maybe overflow, send empty record + pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); + assert(pMsg != NULL); + } + + assert(pMsg != NULL); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + + syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg); + + // send AppendEntries + syncNodeAppendEntries(pSyncNode, pDestId, pMsg); + syncAppendEntriesDestroy(pMsg); + } + } + + return ret; +} int32_t syncNodeReplicate(SSyncNode* pSyncNode) { // start replicate