refactor: review syncNodeAppendEntriesPeersSnapshot
This commit is contained in:
parent
403f6085d5
commit
78db753a64
|
@ -1169,7 +1169,15 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
||||||
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
||||||
// maybe overwrite myself, no harm
|
// maybe overwrite myself, no harm
|
||||||
// just do it!
|
// just do it!
|
||||||
pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
|
||||||
|
// pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
||||||
|
|
||||||
|
// maybe wal is deleted
|
||||||
|
SyncIndex lastIndex;
|
||||||
|
SyncTerm lastTerm;
|
||||||
|
int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||||
|
@ -1299,9 +1307,13 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
|
||||||
*pPreTerm = snapshot.lastApplyTerm;
|
*pPreTerm = snapshot.lastApplyTerm;
|
||||||
} else {
|
} else {
|
||||||
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preIndex);
|
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preIndex);
|
||||||
ASSERT(pPreEntry != NULL);
|
if (pPreEntry != NULL) {
|
||||||
*pPreIndex = pPreEntry->index;
|
*pPreIndex = pPreEntry->index;
|
||||||
*pPreTerm = pPreEntry->term;
|
*pPreTerm = pPreEntry->term;
|
||||||
|
} else {
|
||||||
|
*pPreIndex = SYNC_INDEX_INVALID;
|
||||||
|
*pPreTerm = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -126,7 +126,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
SRaftId* pDestId = &(pSyncNode->peersId[i]);
|
SRaftId* pDestId = &(pSyncNode->peersId[i]);
|
||||||
|
|
||||||
// set prevLogIndex
|
|
||||||
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
||||||
|
|
||||||
SyncIndex preLogIndex;
|
SyncIndex preLogIndex;
|
||||||
|
@ -139,6 +138,12 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
|
||||||
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
|
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
|
||||||
|
|
||||||
if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
|
if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
|
||||||
|
// will send this msg until snapshot receive finish!
|
||||||
|
SSnapshot snapshot;
|
||||||
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
sInfo("nextIndex:%ld in snapshot: <lastApplyIndex:%ld, lastApplyTerm:%lu>, begin snapshot", nextIndex,
|
||||||
|
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||||
|
|
||||||
// to claim leader
|
// to claim leader
|
||||||
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
|
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
|
||||||
assert(pMsg != NULL);
|
assert(pMsg != NULL);
|
||||||
|
@ -163,7 +168,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ASSERT(pSender != NULL);
|
ASSERT(pSender != NULL);
|
||||||
|
|
||||||
snapshotSenderStart(pSender);
|
snapshotSenderStart(pSender);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue