fix(sync): wal write from middle
This commit is contained in:
parent
e4f04491df
commit
ac90f61b63
|
@ -20,6 +20,7 @@
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// HandleAppendEntriesRequest(i, j, m) ==
|
// HandleAppendEntriesRequest(i, j, m) ==
|
||||||
|
@ -687,7 +688,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
// execute fsm
|
// execute fsm
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||||
if (i != SYNC_INDEX_INVALID) {
|
// notice! wal maybe deleted, update firstVer
|
||||||
|
SyncIndex walFirstVer = walGetFirstVer(ths->pWal);
|
||||||
|
|
||||||
|
if (i != SYNC_INDEX_INVALID && i >= walFirstVer) {
|
||||||
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
|
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
|
||||||
assert(pEntry != NULL);
|
assert(pEntry != NULL);
|
||||||
|
|
||||||
|
|
|
@ -208,9 +208,13 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
||||||
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
|
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
|
||||||
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
|
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
|
||||||
for (SyncIndex i = 0; i <= lastIndex; ++i) {
|
for (SyncIndex i = 0; i <= lastIndex; ++i) {
|
||||||
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
|
SyncIndex walFirstVer = walGetFirstVer(pData->pWal);
|
||||||
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
|
||||||
syncEntryDestory(pEntry);
|
if (i != SYNC_INDEX_INVALID && i >= walFirstVer) {
|
||||||
|
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
|
||||||
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "syncSnapshot.h"
|
#include "syncSnapshot.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
|
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
|
||||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver);
|
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver);
|
||||||
|
@ -434,6 +435,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
// end, finish FSM
|
// end, finish FSM
|
||||||
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
|
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
|
||||||
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
|
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
|
||||||
|
|
||||||
|
walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
|
||||||
|
sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex);
|
||||||
|
|
||||||
pReceiver->pWriter = NULL;
|
pReceiver->pWriter = NULL;
|
||||||
snapshotReceiverStop(pReceiver);
|
snapshotReceiverStop(pReceiver);
|
||||||
pReceiver->ack = pMsg->seq;
|
pReceiver->ack = pMsg->seq;
|
||||||
|
|
Loading…
Reference in New Issue