From f1a41f5a3777336b098925734a585c757fadf2ba Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 13 Nov 2022 17:21:30 +0800 Subject: [PATCH] refact: sync message headfile --- source/libs/sync/inc/syncInt.h | 15 +++++++++++++-- source/libs/sync/inc/syncMessage.h | 23 +++-------------------- source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/src/syncMain.c | 5 +---- source/libs/sync/src/syncMessage.c | 5 ++--- source/libs/sync/src/syncTimeout.c | 2 +- 6 files changed, 21 insertions(+), 31 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2a3c07c2ef..940cc4f391 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -239,11 +239,22 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); -// option -bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex); +// on message --------------------- +int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); +int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); + // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index ebd57edd73..92e7b555a4 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -236,26 +236,6 @@ typedef struct SyncLocalCmd { SyncIndex fcIndex; // follower commit index } SyncLocalCmd; -// on message ---------------------- -int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); - -int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); - -// option ---------------------------------- -bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); -ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); - -const char* syncTimerTypeStr( ESyncTimeoutType timerType); -const char* syncLocalCmdGetStr(ESyncLocalCmd cmd); - int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId); int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId); @@ -273,6 +253,9 @@ int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId); +const char* syncTimerTypeStr(ESyncTimeoutType timerType); +const char* syncLocalCmdGetStr(ESyncLocalCmd cmd); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 85c8e8f58f..66f6e6ee18 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -34,7 +34,7 @@ extern "C" { // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ UNCHANGED <> // -int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8f2d9ad4dd..33813af28a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -138,7 +138,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { code = syncNodeOnHeartbeatReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - code = syncNodeOnTimer(pSyncNode, pMsg); + code = syncNodeOnTimeout(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { @@ -1034,9 +1034,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode); } -// option -// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; } - ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; } // timer control -------------- diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 8da522a4cf..ce98419980 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,10 +15,7 @@ #define _DEFAULT_SOURCE #include "syncMessage.h" -#include "syncRaftCfg.h" #include "syncRaftEntry.h" -#include "syncUtil.h" -#include "tcoding.h" int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, SSyncNode* pNode) { @@ -189,6 +186,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } +#if 0 int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) { int32_t bytes = sizeof(SyncPreSnapshot); pMsg->pCont = rpcMallocCont(bytes); @@ -222,6 +220,7 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { pPreSnapshotReply->vgId = vgId; return 0; } +#endif int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { int32_t bytes = sizeof(SyncSnapshotSend) + dataLen; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 9bf2bb5697..7793ca4104 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -85,7 +85,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { return 0; } -int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pRpc) { +int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) { int32_t ret = 0; SyncTimeout* pMsg = pRpc->pCont;