From 2071c5a74e7a47b10f658627c7c73b2b5cb0fa06 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 6 Jun 2022 20:02:27 +0800 Subject: [PATCH] fix(sync): sending snapshot --- source/libs/sync/src/syncAppendEntries.c | 28 ++++++++++++++++-------- source/libs/sync/src/syncMain.c | 2 +- source/libs/sync/src/syncRaftLog.c | 11 ++++++++-- source/libs/sync/src/syncSnapshot.c | 14 ++++++++---- source/libs/sync/test/syncTestTool.cpp | 4 ++-- 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 97600c3f15..4fb3df3d9a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -451,9 +451,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries return false; } -static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry) { +static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry, + bool* pEntryAlreadyWritten) { int32_t code; *ppAppendEntry = NULL; + *pEntryAlreadyWritten = false; // not conflict by default bool conflict = false; @@ -466,10 +468,15 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyn *ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); ASSERT(*ppAppendEntry != NULL); - // log not match, conflict, need delete ASSERT(extraIndex == (*ppAppendEntry)->index); if (pExtraEntry->term != (*ppAppendEntry)->term) { + // log not match, conflict, need delete conflict = true; + } else { + // log match, already written + ASSERT(extraIndex == (*ppAppendEntry)->index && pExtraEntry->term == (*ppAppendEntry)->term); + *pEntryAlreadyWritten = true; + sInfo("entry already written, term:%lu, index:%ld", pExtraEntry->term, pExtraEntry->index); } syncEntryDestory(pExtraEntry); @@ -606,17 +613,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (hasExtraEntries && hasAppendEntries) { // make log same SSyncRaftEntry* pAppendEntry; - code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry); + bool entryAlreadyWritten; + code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry, &entryAlreadyWritten); ASSERT(code == 0); ASSERT(pAppendEntry != NULL); - // append new entries - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - ASSERT(code == 0); + if (!entryAlreadyWritten) { + // append new entries + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); - // pre commit - code = syncNodePreCommit(ths, pAppendEntry); - ASSERT(code == 0); + // pre commit + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); + } syncEntryDestory(pAppendEntry); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4656a8e579..2aafbb1fb5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1612,7 +1612,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); - SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); assert(pEntry != NULL); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 95f5926a6f..a375152ae3 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -477,13 +477,20 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) { cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); + + SyncIndex endIndex = raftLogEndIndex(pLogStore); + snprintf(u64buf, sizeof(u64buf), "%ld", endIndex); + cJSON_AddStringToObject(pRoot, "endIndex", u64buf); + + int32_t count = raftLogEntryCount(pLogStore); + cJSON_AddNumberToObject(pRoot, "entryCount", count); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index bfb60e448c..36388e6d6a 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,6 +14,7 @@ */ #include "syncSnapshot.h" +#include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" #include "wal.h" @@ -284,7 +285,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -398,7 +399,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -427,8 +428,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); - walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex); - sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex); + pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); + char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); + sInfo("snapshot receive finish, update log begin index:%ld, raft log:%s", pMsg->lastIndex + 1, logSimpleStr); + taosMemoryFree(logSimpleStr); + + // walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex); + // sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex); pReceiver->pWriter = NULL; snapshotReceiverStop(pReceiver); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index f733b6f007..f772b225fe 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -295,7 +295,7 @@ int main(int argc, char** argv) { int32_t lastApplyTerm = atoi(argv[5]); int32_t writeRecordNum = atoi(argv[6]); bool isStandBy = atoi(argv[7]); - bool isConfigChange = atoi(argv[8]); + int32_t isConfigChange = atoi(argv[8]); int32_t iterTimes = atoi(argv[9]); int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyTerm = atoi(argv[11]); @@ -308,7 +308,7 @@ int main(int argc, char** argv) { // check parameter assert(replicaNum >= 1 && replicaNum <= 5); - assert(myIndex >= 0 && myIndex < replicaNum); + //assert(myIndex >= 0 && myIndex < replicaNum); assert(lastApplyIndex >= -1); assert(lastApplyTerm >= 0); assert(writeRecordNum >= 0);