From 6b329f791bda6bee971036807eb4d45a4123dc08 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 6 Jun 2022 16:35:44 +0800 Subject: [PATCH] enh(sync): update raft core functions --- source/libs/sync/inc/syncInt.h | 2 -- source/libs/sync/src/syncAppendEntriesReply.c | 23 ++++++++++++++++++- source/libs/sync/src/syncMain.c | 18 +-------------- source/libs/sync/src/syncSnapshot.c | 19 ++++----------- 4 files changed, 27 insertions(+), 35 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3f269625e1..ba7e35ec5b 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -152,11 +152,9 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - // sem_t restoreSem; bool restoreFinish; // SSnapshot* pSnapshot; SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; - SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* pNewNodeReceiver; } SSyncNode; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index b4520f55a9..f246e4fb29 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -19,6 +19,7 @@ #include "syncInt.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" @@ -137,7 +138,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit - syncMaybeAdvanceCommitIndex(ths); + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncMaybeAdvanceCommitIndex(ths); + } } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); @@ -151,8 +154,26 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); if (nextIndex <= snapshot.lastApplyIndex) { + ASSERT(nextIndex == snapshot.lastApplyIndex); + nextIndex = snapshot.lastApplyIndex + 1; sInfo("reset new nextIndex %ld, snapshot.lastApplyIndex:%ld", nextIndex, snapshot.lastApplyIndex); + + // start send snapshot + // get sender + SSyncSnapshotSender* pSender = NULL; + for (int i = 0; i < ths->replicaNum; ++i) { + if (syncUtilSameId(&(pMsg->srcId), &((ths->replicasId)[i]))) { + pSender = (ths->senders)[i]; + } + } + ASSERT(pSender != NULL); + + if (!(pSender->term == ths->pRaftStore->currentTerm && pSender->finish == true)) { + snapshotSenderStart(pSender); + } else { + sInfo("snapshot send finish, send_term:%lu, current_term:%lu", pSender->term, ths->pRaftStore->currentTerm); + } } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fe6d2b672a..4656a8e579 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -598,12 +598,6 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } // snapshot receivers - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, i); - // ASSERT(pReceiver != NULL); - (pSyncNode->receivers)[i] = pReceiver; - } - pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100); // start in syncNodeStart @@ -705,13 +699,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { } } - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - if ((pSyncNode->receivers)[i] != NULL) { - snapshotReceiverDestroy((pSyncNode->receivers)[i]); - (pSyncNode->receivers)[i] = NULL; - } - } - if (pSyncNode->pNewNodeReceiver != NULL) { snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); pSyncNode->pNewNodeReceiver = NULL; @@ -1025,10 +1012,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // snapshot receivers cJSON* pReceivers = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "receivers", pReceivers); - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - cJSON_AddItemToArray(pReceivers, snapshotReceiver2Json((pSyncNode->receivers)[i])); - } + cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver)); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 6802fc0ed8..19f14e8e1f 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -284,7 +284,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -398,7 +398,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -406,19 +406,8 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { // receiver do something int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // get receiver - SSyncSnapshotReceiver *pReceiver = NULL; - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { - pReceiver = (pSyncNode->receivers)[i]; - } - } - - // add new replica - if (pReceiver == NULL) { - pReceiver = pSyncNode->pNewNodeReceiver; - } - - bool needRsp = false; + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + bool needRsp = false; // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {