From c3bf3cc97c39def76b075bee0e05b36166561125 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 1 Jun 2022 21:57:30 +0800 Subject: [PATCH] fix: send snapshot --- source/libs/sync/src/syncMain.c | 13 ++++++++----- source/libs/sync/src/syncSnapshot.c | 5 +++-- .../libs/sync/test/syncConfigChangeSnapshotTest.cpp | 6 ++++++ 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7878bfe7ee..ab696384a0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -562,15 +562,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb; if (pSyncNode->pRaftCfg->snapshotEnable) { - pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; - pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; - pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; - pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; - } else { + sInfo("sync node use snapshot"); pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesSnapshotCb; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplySnapshotCb; + + } else { + sInfo("sync node do not use snapshot"); + pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; + pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; + pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; + pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; } // tools diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index cfd5e2f5da..e06f1a9ec4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -279,7 +279,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -393,7 +393,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -432,6 +432,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // end, finish FSM pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); + pReceiver->pWriter = NULL; snapshotReceiverStop(pReceiver); pReceiver->ack = pMsg->seq; needRsp = false; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 8f36f19238..ba7199f9b0 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -127,6 +127,12 @@ int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter, isApply); sTrace("%s", logBuf); + + if (isApply) { + gSnapshotLastApplyIndex = 7; + gSnapshotLastApplyTerm = 1; + } + return 0; }