From 1a4b7622e418c8a5c8aa8263f317db7e8e3dff50 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 09:39:20 +0800 Subject: [PATCH 1/3] enh(sync): add sync pre stop --- include/libs/sync/sync.h | 1 + source/dnode/mnode/impl/src/mndMain.c | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 17 +++++++++++++++++ 5 files changed, 21 insertions(+) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 694a6ef62b..b6ff93ec85 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -211,6 +211,7 @@ void syncCleanUp(); int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); +void syncPreStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f604a9289a..1b2d85bd29 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -429,6 +429,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { syncLeaderTransfer(pMnode->syncMgmt.sync); + syncPreStop(pMnode->syncMgmt.sync); } } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0696ec0901..f7164c4ac3 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -245,6 +245,7 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); + syncPreStop(pVnode->sync); } } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 6ec29d69f5..a5ff653b69 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -218,6 +218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); +void syncNodePreClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); // option diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index eb78ea2894..81077e5361 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -81,6 +81,15 @@ void syncStop(int64_t rid) { } } +void syncPreStop(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) return; + + syncNodePreClose(pSyncNode); + + syncNodeRelease(pSyncNode); +} + static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { if (!syncNodeInConfig(pSyncNode, pCfg)) return false; return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; @@ -1222,6 +1231,14 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ASSERT(ret == 0); } +void syncNodePreClose(SSyncNode* pSyncNode) { + // stop elect timer + syncNodeStopElectTimer(pSyncNode); + + // stop heartbeat timer + syncNodeStopHeartbeatTimer(pSyncNode); +} + void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { return; From 102969086da4cb6a58cc27c7dc00e47f8a067db3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 10:25:38 +0800 Subject: [PATCH 2/3] refactor(sync): add local-cmd:follower-commit --- source/libs/sync/inc/syncTools.h | 2 ++ source/libs/sync/src/syncAppendEntries.c | 5 +++++ source/libs/sync/src/syncMain.c | 23 +++++++++++++++++++--- source/libs/sync/src/syncMessage.c | 5 +++++ source/libs/sync/test/syncLocalCmdTest.cpp | 1 + 5 files changed, 33 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index b48519a5b0..2d87fcf7fa 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); typedef enum { SYNC_LOCAL_CMD_STEP_DOWN = 100, + SYNC_LOCAL_CMD_FOLLOWER_CMT, } ESyncLocalCmd; const char* syncLocalCmdGetStr(int32_t cmd); @@ -742,6 +743,7 @@ typedef struct SyncLocalCmd { int32_t cmd; SyncTerm sdNewTerm; // step down new term + SyncIndex fcIndex;// follower commit index } SyncLocalCmd; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4638475e71..f0e296d872 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -90,6 +90,11 @@ // int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { + if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeEventLog(ths, "can not do follower commit"); + return -1; + } + // maybe update commit index, leader notice me if (newCommitIndex > ths->commitIndex) { // has commit entry in local diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 81077e5361..014ad0425d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2842,11 +2842,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; -#if 0 if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { - syncNodeFollowerCommit(ths, pMsg->commitIndex); + // syncNodeFollowerCommit(ths, pMsg->commitIndex); + SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; + pSyncMsg->fcIndex = pMsg->commitIndex; + + SRpcMsg rpcMsgLocalCmd; + syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); + + if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (code != 0) { + sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code); + rpcFreeCont(rpcMsgLocalCmd.pCont); + } else { + sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex); + } + } } -#endif } if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { @@ -2900,6 +2914,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->sdNewTerm); + } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + syncNodeFollowerCommit(ths, pMsg->fcIndex); + } else { syncNodeErrorLog(ths, "error local cmd"); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 91e8ec91b7..d0df931a88 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { const char* syncLocalCmdGetStr(int32_t cmd) { if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { return "step-down"; + } else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + return "follower-commit"; } return "unknown-local-cmd"; @@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); + cJSON_AddStringToObject(pRoot, "fc-index", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncLocalCmdTest.cpp b/source/libs/sync/test/syncLocalCmdTest.cpp index de908bf9c1..b42626df29 100644 --- a/source/libs/sync/test/syncLocalCmdTest.cpp +++ b/source/libs/sync/test/syncLocalCmdTest.cpp @@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->sdNewTerm = 123; + pMsg->fcIndex = 456; pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; return pMsg; From 1fc79e289d5b56a8964c56a2f16e8caec31a514d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 11:50:24 +0800 Subject: [PATCH 3/3] refactor(sync): modify leader transfer --- source/libs/sync/src/syncMain.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 014ad0425d..5cd1ba3025 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -444,8 +444,12 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { return -1; } - SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; - int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + int32_t ret = 0; + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + } + return ret; }