refactor(sync): add leader transfer callback

This commit is contained in:
Minghao Li 2022-07-15 14:55:33 +08:00
parent 95023fd56e
commit 681993884f
4 changed files with 21 additions and 8 deletions

View File

@ -371,13 +371,15 @@ void mndPreClose(SMnode *pMnode) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0); atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
mDebug("vgId:1, mnode start leader transfer"); /*
// wait for leader transfer finish mDebug("vgId:1, mnode start leader transfer");
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { // wait for leader transfer finish
taosMsleep(10); while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
mDebug("vgId:1, mnode waiting for leader transfer"); taosMsleep(10);
} mDebug("vgId:1, mnode waiting for leader transfer");
mDebug("vgId:1, mnode finish leader transfer"); }
mDebug("vgId:1, mnode finish leader transfer");
*/
} }
} }

View File

@ -253,6 +253,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);

View File

@ -477,6 +477,13 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
// leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
int32_t code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {0};

View File

@ -2598,7 +2598,7 @@ const char* syncStr(ESyncState state) {
} }
} }
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
syncNodeEventLog(ths, "do leader transfer"); syncNodeEventLog(ths, "do leader transfer");
@ -2811,11 +2811,14 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ASSERT(code == 0); ASSERT(code == 0);
} }
#if 0
// execute in pre-commit
// leader transfer // leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0); ASSERT(code == 0);
} }
#endif
// restore finish // restore finish
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok