enh(sync): syncNodeAppendEntriesPeersSnapshot

This commit is contained in:
Minghao Li 2022-05-31 17:53:38 +08:00
parent ba212ce396
commit b9472dc386
1 changed files with 92 additions and 1 deletions

View File

@ -20,6 +20,7 @@
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
// TLA+ Spec
@ -114,7 +115,97 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
return ret;
}
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { return 0; }
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeAppendEntriesPeersSnapshot== pMatchIndex", pSyncNode->pMatchIndex);
logStoreSimpleLog2("==syncNodeAppendEntriesPeersSnapshot==", pSyncNode->pLogStore);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
// set prevLogIndex
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex;
SyncTerm preLogTerm;
ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm);
ASSERT(ret == 0);
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
// to claim leader
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
assert(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg);
// send AppendEntries
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
syncAppendEntriesDestroy(pMsg);
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(&((pSyncNode->replicasId)[i]), pDestId)) {
pSender = (pSyncNode->senders)[i];
break;
}
}
ASSERT(pSender != NULL);
snapshotSenderStart(pSender);
} else {
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) {
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
assert(pMsg != NULL);
// add pEntry into msg
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(len == pEntry->bytes);
memcpy(pMsg->data, serialized, len);
taosMemoryFree(serialized);
syncEntryDestory(pEntry);
} else {
// maybe overflow, send empty record
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
assert(pMsg != NULL);
}
assert(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
syncAppendEntriesLog2("==syncNodeAppendEntriesPeersSnapshot==", pMsg);
// send AppendEntries
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
syncAppendEntriesDestroy(pMsg);
}
}
return ret;
}
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
// start replicate