diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f4696cd019..219aecd22d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1169,7 +1169,15 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { // maybe overwrite myself, no harm // just do it! - pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; + + // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; + + // maybe wal is deleted + SyncIndex lastIndex; + SyncTerm lastTerm; + int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm); + ASSERT(code == 0); + pSyncNode->pNextIndex->index[i] = lastIndex + 1; } for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { @@ -1299,9 +1307,13 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex *pPreTerm = snapshot.lastApplyTerm; } else { SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preIndex); - ASSERT(pPreEntry != NULL); - *pPreIndex = pPreEntry->index; - *pPreTerm = pPreEntry->term; + if (pPreEntry != NULL) { + *pPreIndex = pPreEntry->index; + *pPreTerm = pPreEntry->term; + } else { + *pPreIndex = SYNC_INDEX_INVALID; + *pPreTerm = 0; + } } return 0; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 864acb2993..7f2968e2d7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -126,7 +126,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); - // set prevLogIndex SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); SyncIndex preLogIndex; @@ -139,6 +138,12 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) { + // will send this msg until snapshot receive finish! + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + sInfo("nextIndex:%ld in snapshot: , begin snapshot", nextIndex, + snapshot.lastApplyIndex, snapshot.lastApplyTerm); + // to claim leader SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); assert(pMsg != NULL); @@ -163,7 +168,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { } } ASSERT(pSender != NULL); - snapshotSenderStart(pSender); } else {