From 58fd2228d94ec4ea227f09a3b0247adbad27793e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 13 Nov 2022 10:45:33 +0800 Subject: [PATCH] refactor(sync): pre snapshot --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncSnapshot.h | 1 + source/libs/sync/src/syncAppendEntries.c | 6 +++++- source/libs/sync/src/syncSnapshot.c | 7 ++++++- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7419841dc8..6060da6d3b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -38,7 +38,7 @@ extern "C" { #define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_VNODE_LOG_RETENTION 100 #define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 -#define SNAPSHOT_WAIT_MS 1000 * 60 +#define SNAPSHOT_WAIT_MS 1000 * 30 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 645548c919..f64f4a9c8b 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender { int64_t sendingMS; SyncTerm term; int64_t startTime; + int64_t endTime; bool finish; // init when create diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 792ce67cd4..12c0430760 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -167,7 +167,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { if (pMsg->prevLogIndex >= startIndex) { SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1); - ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + // ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + if (myPreLogTerm == SYNC_TERM_INVALID) { + syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid"); + goto _SEND_RESPONSE; + } if (myPreLogTerm != pMsg->prevLogTerm) { syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match"); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index e8c5ced5bb..41dc3d3c39 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -45,6 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->startTime = 0; + pSender->endTime = 0; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { @@ -132,6 +133,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // update flag pSender->start = false; pSender->finish = finish; + pSender->endTime = taosGetTimestampMs(); // close reader if (pSender->pReader != NULL) { @@ -265,7 +267,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { } if (!snapshotSenderIsStart(pSender) && pSender->finish && - taosGetTimestampMs() - pSender->startTime < SNAPSHOT_WAIT_MS) { + taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { sNTrace(pSyncNode, "snapshot sender too frequently, ignore"); return 1; } @@ -794,6 +796,9 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) return -1; } + // update next index + syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); + // update seq pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;