From 681993884f12727f777d59879ce8abb078e40c79 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 15 Jul 2022 14:55:33 +0800 Subject: [PATCH] refactor(sync): add leader transfer callback --- source/dnode/mnode/impl/src/mndMain.c | 16 +++++++++------- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncAppendEntries.c | 7 +++++++ source/libs/sync/src/syncMain.c | 5 ++++- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 40ac4514c5..041fc2a2d1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -371,13 +371,15 @@ void mndPreClose(SMnode *pMnode) { atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0); syncLeaderTransfer(pMnode->syncMgmt.sync); - mDebug("vgId:1, mnode start leader transfer"); - // wait for leader transfer finish - while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { - taosMsleep(10); - mDebug("vgId:1, mnode waiting for leader transfer"); - } - mDebug("vgId:1, mnode finish leader transfer"); + /* + mDebug("vgId:1, mnode start leader transfer"); + // wait for leader transfer finish + while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { + taosMsleep(10); + mDebug("vgId:1, mnode waiting for leader transfer"); + } + mDebug("vgId:1, mnode finish leader transfer"); + */ } } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e361c8021c..1f26033f63 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -253,6 +253,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); +int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 8409e1c711..dca80f6826 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -477,6 +477,13 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); + + // leader transfer + if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { + int32_t code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); + ASSERT(code == 0); + } + if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta = {0}; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 80b2032ab7..387400c25e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2598,7 +2598,7 @@ const char* syncStr(ESyncState state) { } } -static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { +int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); syncNodeEventLog(ths, "do leader transfer"); @@ -2811,11 +2811,14 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ASSERT(code == 0); } +#if 0 + // execute in pre-commit // leader transfer if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); ASSERT(code == 0); } +#endif // restore finish // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok