From 326714a0772e75858c2539242fd935cd6dd9569c Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 7 Jun 2022 14:35:20 +0800 Subject: [PATCH] fix(sync): snapshot maybe change when sending --- source/libs/sync/inc/syncSnapshot.h | 16 +++++++++------- source/libs/sync/src/syncAppendEntriesReply.c | 13 +++++-------- source/libs/sync/src/syncSnapshot.c | 4 ++-- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index c028e732b8..f9f890aa8a 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; @@ -50,20 +50,22 @@ typedef struct SSyncSnapshotSender { bool finish; } SSyncSnapshotSender; +void snapshotSenderDoStart(SSyncSnapshotSender *pSender); + SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SSyncNode *pSyncNode; @@ -74,8 +76,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 82b462eacd..47e76dab4c 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -174,18 +174,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } ASSERT(pSender != NULL); - // calculate sentryIndex SyncIndex sentryIndex; - if (pSender->start) { + if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { + // already start sentryIndex = pSender->snapshot.lastApplyIndex; } else { - // start send snapshot - if (!(pSender->term == ths->pRaftStore->currentTerm && pSender->finish == true)) { - snapshotSenderStart(pSender); - } else { - sInfo("snapshot send finish, send_term:%lu, current_term:%lu", pSender->term, ths->pRaftStore->currentTerm); - } + // start send snapshot, first time + snapshotSenderDoStart(pSender); + pSender->start = true; sentryIndex = pSender->snapshot.lastApplyIndex; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 36388e6d6a..4df955083b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -19,7 +19,7 @@ #include "syncUtil.h" #include "wal.h" -static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); +// static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { @@ -59,7 +59,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { } // begin send snapshot (current term, seq begin) -static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { +void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;