From 03ca300520c1479f64f788cd0f81e953c66b2c10 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 11 Nov 2022 09:20:05 +0800 Subject: [PATCH 1/5] refactor(sync): modify test, syncRespMgrTest.cpp --- source/libs/sync/test/syncRespMgrTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index cad6eec91d..bf9c070b47 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -64,10 +64,10 @@ void syncRespMgrGetTest(uint64_t i) { void syncRespMgrGetAndDelTest(uint64_t i) { printf("------syncRespMgrGetAndDelTest-------%" PRIu64 "-- \n", i); - SRespStub stub; + SRpcHandleInfo stub; int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub); if (ret == 1) { - printStub(&stub); + //printStub(&stub); } else if (ret == 0) { printf("%" PRId64 " notFound \n", i); } From a623ec31df185ce31719531d3fb8459aa6cdc0c4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 11 Nov 2022 09:25:38 +0800 Subject: [PATCH 2/5] refactor(sync): modify CMakeLists.txt in test --- source/libs/sync/test/CMakeLists.txt | 9 --------- 1 file changed, 9 deletions(-) diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index e584893ae0..935efbc0d3 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -287,10 +287,6 @@ target_sources(syncLeaderTransferTest PRIVATE "syncLeaderTransferTest.cpp" ) -target_sources(syncReconfigFinishTest - PRIVATE - "syncReconfigFinishTest.cpp" -) target_sources(syncRestoreFromSnapshot PRIVATE "syncRestoreFromSnapshot.cpp" @@ -601,11 +597,6 @@ target_include_directories(syncLeaderTransferTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(syncReconfigFinishTest - PUBLIC - "${TD_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) target_include_directories(syncRestoreFromSnapshot PUBLIC "${TD_SOURCE_DIR}/include/libs/sync" From 1c2892988f6822dbe44d3beaa1c0b0d65762a6b3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 11 Nov 2022 09:30:08 +0800 Subject: [PATCH 3/5] refactor(sync): modify CMakeLists.txt in test --- source/libs/sync/test/CMakeLists.txt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 935efbc0d3..b46b775b52 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -858,10 +858,6 @@ target_link_libraries(syncLeaderTransferTest sync gtest_main ) -target_link_libraries(syncReconfigFinishTest - sync - gtest_main -) target_link_libraries(syncRestoreFromSnapshot sync gtest_main From 58fd2228d94ec4ea227f09a3b0247adbad27793e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 13 Nov 2022 10:45:33 +0800 Subject: [PATCH 4/5] refactor(sync): pre snapshot --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncSnapshot.h | 1 + source/libs/sync/src/syncAppendEntries.c | 6 +++++- source/libs/sync/src/syncSnapshot.c | 7 ++++++- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7419841dc8..6060da6d3b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -38,7 +38,7 @@ extern "C" { #define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_VNODE_LOG_RETENTION 100 #define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 -#define SNAPSHOT_WAIT_MS 1000 * 60 +#define SNAPSHOT_WAIT_MS 1000 * 30 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 645548c919..f64f4a9c8b 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender { int64_t sendingMS; SyncTerm term; int64_t startTime; + int64_t endTime; bool finish; // init when create diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 792ce67cd4..12c0430760 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -167,7 +167,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { if (pMsg->prevLogIndex >= startIndex) { SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1); - ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + // ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + if (myPreLogTerm == SYNC_TERM_INVALID) { + syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid"); + goto _SEND_RESPONSE; + } if (myPreLogTerm != pMsg->prevLogTerm) { syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match"); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index e8c5ced5bb..41dc3d3c39 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -45,6 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->startTime = 0; + pSender->endTime = 0; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { @@ -132,6 +133,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // update flag pSender->start = false; pSender->finish = finish; + pSender->endTime = taosGetTimestampMs(); // close reader if (pSender->pReader != NULL) { @@ -265,7 +267,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { } if (!snapshotSenderIsStart(pSender) && pSender->finish && - taosGetTimestampMs() - pSender->startTime < SNAPSHOT_WAIT_MS) { + taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { sNTrace(pSyncNode, "snapshot sender too frequently, ignore"); return 1; } @@ -794,6 +796,9 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) return -1; } + // update next index + syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); + // update seq pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; From ce4f32b8e7263ff6ca693afb5a1316fd98f9d1f4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 13 Nov 2022 12:21:14 +0800 Subject: [PATCH 5/5] refactor(sync): pre snapshot --- source/libs/sync/inc/syncInt.h | 5 +++-- source/libs/sync/inc/syncSnapshot.h | 1 - source/libs/sync/src/syncReplication.c | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a951ba38d..706c494048 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -290,8 +290,9 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); // snapshot -------------- -bool syncNodeHasSnapshot(SSyncNode* pSyncNode); -void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); +bool syncNodeHasSnapshot(SSyncNode* pSyncNode); +void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); +int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId); SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index f64f4a9c8b..93b2531235 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -88,7 +88,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); // start -int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 882a4764f7..00a3d72f25 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -20,7 +20,6 @@ #include "syncRaftEntry.h" #include "syncRaftLog.h" #include "syncRaftStore.h" -#include "syncSnapshot.h" #include "syncUtil.h" // TLA+ Spec