From 56e65d13d10752daa800b938762c632aa5805684 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 8 Jul 2022 17:37:25 +0800 Subject: [PATCH] refactor(sync): add fake syncRestoreFromSnapshot --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 + source/dnode/vnode/src/vnd/vnodeSync.c | 12 +++++- source/libs/sync/src/syncAppendEntriesReply.c | 7 ++-- source/libs/sync/src/syncCommit.c | 3 +- source/libs/sync/src/syncReplication.c | 2 +- .../tsim/sync/vnodesnapshot-restart.sim | 14 +++++++ tests/script/tsim/sync/vnodesnapshot.sim | 7 +--- tests/script/tsim/sync/vnodesnapshot2.sim | 40 +++++++++++++++++++ 8 files changed, 73 insertions(+), 14 deletions(-) create mode 100644 tests/script/tsim/sync/vnodesnapshot-restart.sim create mode 100644 tests/script/tsim/sync/vnodesnapshot2.sim diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c81f77fa56..2f1042231e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -383,6 +383,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6f6102ea14..5ca2fddcce 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -330,12 +330,12 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg); @@ -347,6 +347,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ASSERT(pSyncMsg != NULL); code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { code = vnodeSetStandBy(pVnode); if (code != 0 && terrno != 0) code = terrno; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 3f19b6759e..afea32d672 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -193,9 +193,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie // start snapshot SSnapshot oldSnapshot; ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); - ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1); - syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, - pMsg); // term maybe not ok? + if (oldSnapshot.lastApplyIndex > newMatchIndex) { + syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, + pMsg); // term maybe not ok? + } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 982d37826a..3e8b020230 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -57,8 +57,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->commitIndex = snapshot.lastApplyIndex; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", pSyncNode->commitIndex, - snapshot.lastApplyIndex); + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", commitBegin, commitEnd); syncNodeEventLog(pSyncNode, eventLog); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f1c093fe37..c52a96a514 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -336,7 +336,7 @@ int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaft sDebug( "vgId:%d, send sync-append-entries-batch to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, " "commit:%ld, " - "datalen:%d, dataCount:%d}", + "datalen:%d, datacount:%d}", pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount); } while (0); diff --git a/tests/script/tsim/sync/vnodesnapshot-restart.sim b/tests/script/tsim/sync/vnodesnapshot-restart.sim new file mode 100644 index 0000000000..3ed82fdfe7 --- /dev/null +++ b/tests/script/tsim/sync/vnodesnapshot-restart.sim @@ -0,0 +1,14 @@ +system sh/stop_dnodes.sh + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT +#system sh/exec.sh -n dnode2 -s stop -x SIGINT +#system sh/exec.sh -n dnode3 -s stop -x SIGINT +#system sh/exec.sh -n dnode4 -s stop -x SIGINT + diff --git a/tests/script/tsim/sync/vnodesnapshot.sim b/tests/script/tsim/sync/vnodesnapshot.sim index aa42047400..347255b489 100644 --- a/tests/script/tsim/sync/vnodesnapshot.sim +++ b/tests/script/tsim/sync/vnodesnapshot.sim @@ -140,7 +140,6 @@ endi sql create table ct1 using stb tags(1000) -system sh/exec.sh -n dnode4 -s stop -x SIGINT $N = 100 $count = 0 @@ -154,12 +153,8 @@ endw #sql insert into ct1 values(now+0s, 10, 2.0, 3.0) #sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) - #sql flush database db; -#system sh/exec.sh -n dnode4 -s start - -#sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3) @@ -169,7 +164,7 @@ sleep 5000 system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT -#system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT diff --git a/tests/script/tsim/sync/vnodesnapshot2.sim b/tests/script/tsim/sync/vnodesnapshot2.sim new file mode 100644 index 0000000000..c5df5644b1 --- /dev/null +++ b/tests/script/tsim/sync/vnodesnapshot2.sim @@ -0,0 +1,40 @@ +system sh/stop_dnodes.sh + + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + + + +sleep 5000 + + +sql use db + + + + +$N = 100 +$count = 0 +while $count < $N + $ms = 1591201000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + + + + + + + +sleep 5000 + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT + + +