From ac90f61b6376c3bf365b109bd311728ebed8f480 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 2 Jun 2022 13:53:19 +0800 Subject: [PATCH] fix(sync): wal write from middle --- source/libs/sync/src/syncAppendEntries.c | 6 +++++- source/libs/sync/src/syncRaftLog.c | 10 +++++++--- source/libs/sync/src/syncSnapshot.c | 5 +++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4ff3b95890..4ebe142315 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -20,6 +20,7 @@ #include "syncRaftStore.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "wal.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == @@ -687,7 +688,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // execute fsm if (ths->pFsm != NULL) { for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - if (i != SYNC_INDEX_INVALID) { + // notice! wal maybe deleted, update firstVer + SyncIndex walFirstVer = walGetFirstVer(ths->pWal); + + if (i != SYNC_INDEX_INVALID && i >= walFirstVer) { SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i); assert(pEntry != NULL); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a6397f8cba..a15786e758 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -208,9 +208,13 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON_AddItemToObject(pRoot, "pEntries", pEntries); SyncIndex lastIndex = logStoreLastIndex(pLogStore); for (SyncIndex i = 0; i <= lastIndex; ++i) { - SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); - cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); - syncEntryDestory(pEntry); + SyncIndex walFirstVer = walGetFirstVer(pData->pWal); + + if (i != SYNC_INDEX_INVALID && i >= walFirstVer) { + SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); + syncEntryDestory(pEntry); + } } } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index e423d4d369..c894a34fc0 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -16,6 +16,7 @@ #include "syncSnapshot.h" #include "syncRaftStore.h" #include "syncUtil.h" +#include "wal.h" static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver); @@ -434,6 +435,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // end, finish FSM pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); + + walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex); + sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex); + pReceiver->pWriter = NULL; snapshotReceiverStop(pReceiver); pReceiver->ack = pMsg->seq;