From 0b20b914bbd5d7f0ff8a0f770fec3ff056dd470e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 31 Oct 2022 14:17:26 +0800 Subject: [PATCH 1/2] refactor(sync): add sync local cmd --- include/libs/sync/sync.h | 2 ++ source/dnode/mnode/impl/src/mndMain.c | 5 ++++ source/dnode/vnode/src/vnd/vnodeSync.c | 5 ++++ source/libs/sync/src/syncMain.c | 39 +++++++++++++++++++++----- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b6c4a58150..95ee2ca2bc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -232,6 +232,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncEndSnapshot(int64_t rid); +int32_t syncStepDown(int64_t rid, SyncTerm newTerm); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7207343582..98a24286f6 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -586,6 +586,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SRpcMsg rsp = {.code = code, .info = pMsg->info}; tmsgSendRsp(&rsp); + } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { + SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); + code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); + syncLocalCmdDestroy(pSyncMsg); + } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); code = -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7acf5b4003..bf665fd6db 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -350,6 +350,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { + SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); + code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); + syncLocalCmdDestroy(pSyncMsg); + } else { vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); code = -1; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 02e5c643a4..535db4181c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -528,6 +528,20 @@ int32_t syncEndSnapshot(int64_t rid) { return code; } +int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + ASSERT(rid == pSyncNode->rid); + + syncNodeStepDown(pSyncNode, newTerm); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { if (pSyncNode->peersNum == 0) { sDebug("only one replica, cannot leader transfer"); @@ -3041,12 +3055,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { SRpcMsg rpcMsg; syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); -#if 1 - if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { - syncNodeStepDown(ths, pMsg->term); - } -#endif - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; @@ -3058,6 +3066,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { #endif } +#if 1 + if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeStepDown(ths, pMsg->term); + } +#endif + /* // htonl SMsgHead* pHead = rpcMsg.pCont; @@ -3081,6 +3095,17 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { return 0; } +int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { + if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { + syncNodeStepDown(ths, pMsg->sdNewTerm); + + } else { + syncNodeErrorLog(ths, "error local cmd"); + } + + return 0; +} + // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader @@ -3379,7 +3404,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // ASSERT(code == 0); // ASSERT(pEntry != NULL); if (code != 0 || pEntry == NULL) { - syncNodeErrorLog(ths, "get log entry error"); + syncNodeErrorLog(ths, "get log entry error"); continue; } } From 77ba900776d63143cc95f5f26e2d5375703b4ffe Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 31 Oct 2022 16:12:40 +0800 Subject: [PATCH 2/2] refactor(sync): add sync local cmd, step down --- include/libs/sync/syncTools.h | 3 +++ source/libs/sync/inc/syncInt.h | 2 ++ source/libs/sync/src/syncMain.c | 31 +++++++++++++++++++++++++++--- source/libs/sync/src/syncMessage.c | 8 ++++++++ 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index eedc403493..a1cff2b738 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -679,10 +679,13 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); // --------------------------------------------- + typedef enum { SYNC_LOCAL_CMD_STEP_DOWN = 100, } ESyncLocalCmd; +const char* syncLocalCmdGetStr(int32_t cmd); + typedef struct SyncLocalCmd { uint32_t bytes; int32_t vgId; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ae053328ab..f4949e1016 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -341,6 +341,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 535db4181c..7142e8fb22 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3066,11 +3066,27 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { #endif } -#if 1 if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { - syncNodeStepDown(ths, pMsg->term); + // syncNodeStepDown(ths, pMsg->term); + SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; + pSyncMsg->sdNewTerm = pMsg->term; + + SRpcMsg rpcMsgLocalCmd; + syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); + + if (ths->FpEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (code != 0) { + sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); + rpcFreeCont(rpcMsgLocalCmd.pCont); + } else { + sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm); + } + } + + syncLocalCmdDestroy(pSyncMsg); } -#endif /* // htonl @@ -3096,6 +3112,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { } int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { + syncLogRecvLocalCmd(ths, pMsg, ""); + if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->sdNewTerm); @@ -3768,3 +3786,10 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p host, port, pMsg->term, pMsg->privateTerm, s); syncNodeEventLog(pSyncNode, logBuf); } + +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd, + syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3c36633fe8..f9609d9c39 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3097,6 +3097,14 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { } // --------------------------------------------- +const char* syncLocalCmdGetStr(int32_t cmd) { + if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { + return "step-down"; + } + + return "unknown-local-cmd"; +} + SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncLocalCmd); SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);