From 8d789bba31d31f389ff15fdb96ade77d1493123d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 4 Mar 2022 15:48:09 +0800 Subject: [PATCH 1/4] sync enq --- include/libs/sync/sync.h | 13 ++-- source/libs/sync/inc/syncElection.h | 3 + source/libs/sync/inc/syncIO.h | 3 +- source/libs/sync/inc/syncInt.h | 28 ++++++-- source/libs/sync/inc/syncMessage.h | 5 ++ source/libs/sync/inc/syncReplication.h | 2 + source/libs/sync/src/syncElection.c | 4 ++ source/libs/sync/src/syncIO.c | 42 ++++++++--- source/libs/sync/src/syncMain.c | 79 ++++++++++++++++---- source/libs/sync/src/syncMessage.c | 53 ++++++++++++++ source/libs/sync/src/syncReplication.c | 2 + source/libs/sync/test/CMakeLists.txt | 14 ++++ source/libs/sync/test/syncEnqTest.cpp | 99 ++++++++++++++++++++++++++ source/libs/sync/test/syncPingTest.cpp | 10 +-- 14 files changed, 315 insertions(+), 42 deletions(-) create mode 100644 source/libs/sync/test/syncEnqTest.cpp diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 53fad4607a..d6fdadc1fb 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -138,6 +138,8 @@ typedef struct SSyncInfo { void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); } SSyncInfo; @@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); - -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); - +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7e9e637854..ed5b86fa98 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,6 +26,9 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeElect(SSyncNode* pSyncNode); +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 238948b403..40fa31d969 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -58,9 +58,10 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOTickQ(); int32_t syncIOTickPing(); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index aedb9662b1..8268624501 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -113,6 +113,8 @@ typedef struct SSyncNode { char path[TSDB_FILENAME_LEN]; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); // init internal SNodeInfo me; @@ -149,19 +151,19 @@ typedef struct SSyncNode { // timer tmr_h pPingTimer; int32_t pingTimerMS; - uint8_t pingTimerStart; + uint8_t pingTimerEnable; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; tmr_h pElectTimer; int32_t electTimerMS; - uint8_t electTimerStart; + uint8_t electTimerEnable; TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; - uint8_t heartbeatTimerStart; + uint8_t heartbeatTimerEnable; TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp uint64_t heartbeatTimerCounter; @@ -180,8 +182,24 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); -int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); -int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); + +void syncNodeBecomeFollower(SSyncNode* pSyncNode); +void syncNodeBecomeLeader(SSyncNode* pSyncNode); +void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +void syncNodeLeader2Follower(SSyncNode* pSyncNode); +void syncNodeCandidate2Follower(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3057e23bc2..5aa27e616b 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,6 +30,7 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { + SYNC_UNKNOWN = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -40,6 +41,10 @@ typedef enum ESyncMessageType { SYNC_APPEND_ENTRIES_REPLY = 115, } ESyncMessageType; +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); +cJSON* syncRpcUnknownMsg2Json(); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7f97ae9e49..a9875d5cae 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,6 +26,8 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 329105e2a1..433201b849 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -14,3 +14,7 @@ */ #include "syncElection.h" + +void syncNodeElect(SSyncNode* pSyncNode) {} + +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 3ba145a96b..52cc0cda50 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId); // ---------------------------- // public function ------------ -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { - sTrace( - "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " - "pMsg->msgType:%d, pMsg->contLen:%d", - clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, - pMsg->msgType, pMsg->contLen); - pMsg->handle = NULL; - rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); - return 0; -} - int32_t syncIOStart(char *host, uint16_t port) { gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); @@ -83,6 +72,35 @@ int32_t syncIOTickPing() { return ret; } +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace( + "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " + "pMsg->msgType:%d, pMsg->contLen:%d", + clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, + pMsg->msgType, pMsg->contLen); + { + cJSON *pJson = syncRpcMsg2Json(pMsg); + char * serialized = cJSON_Print(pJson); + sTrace("process syncMessage send: pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + pMsg->handle = NULL; + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + return 0; +} + +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + STaosQueue *pMsgQ = queue; + taosWriteQitem(pMsgQ, pTemp); + + return 0; +} + // local function ------------ static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); @@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingFromRpcMsg(pRpcMsg, pSyncMsg); // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { @@ -223,6 +242,7 @@ static void *syncIOConsumerFunc(void *param) { pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); } } else { ; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7e01e7e81c..4183bfbe9a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -56,8 +56,6 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -76,6 +74,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->queue = pSyncInfo->queue; + pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; @@ -93,7 +93,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; - atomic_store_8(&pSyncNode->pingTimerStart, 0); + atomic_store_8(&pSyncNode->pingTimerEnable, 0); pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->pingTimerCounter = 0; @@ -148,22 +148,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = - taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); } else { - taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } - atomic_store_8(&pSyncNode->pingTimerStart, 1); + atomic_store_8(&pSyncNode->pingTimerEnable, 1); return 0; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->pingTimerStart, 0); - pSyncNode->pingTimerCounter = TIMER_MAX_MS; + atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->pingTimerMS = TIMER_MAX_MS; return 0; } +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pElectTimer == NULL) { + pSyncNode->pElectTimer = + taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } + + atomic_store_8(&pSyncNode->electTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->electTimerEnable, 0); + pSyncNode->electTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pHeartbeatTimer == NULL) { + pSyncNode->pHeartbeatTimer = + taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + } + + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0); + pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} + +void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} + +void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} + // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -204,7 +258,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM } static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); @@ -225,7 +278,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -245,7 +298,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -275,7 +328,7 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerStart)) { + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { ++(pSyncNode->pingTimerCounter); // pSyncNode->pingTimerMS += 100; @@ -289,6 +342,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart); + sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4c44b4691c..4dbae9bcba 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -20,6 +20,59 @@ void onMessage(SRaft* pRaft, void* pMsg) {} +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { + cJSON* pRoot; + + if (pRpcMsg->msgType == SYNC_PING) { + SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; + pRoot = syncPing2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; + pRoot = syncPingReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; + pRoot = syncRequestVote2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; + pRoot = syncRequestVoteReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; + pRoot = syncAppendEntries2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; + pRoot = syncAppendEntriesReply2Json(pSyncMsg); + + } else { + pRoot = syncRpcUnknownMsg2Json(); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "RpcMsg", pRoot); + return pJson; +} + +cJSON* syncRpcUnknownMsg2Json() { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN); + cJSON_AddStringToObject(pRoot, "data", "known message"); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4cea7c150e..e3e551fd2b 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,3 +14,5 @@ */ #include "syncReplication.h" + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2913d230b2..4c5f7ffa56 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "") add_executable(syncIOSendMsgClientTest "") add_executable(syncIOSendMsgServerTest "") add_executable(syncRaftStoreTest "") +add_executable(syncEnqTest "") target_sources(syncTest @@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest PRIVATE "syncRaftStoreTest.cpp" ) +target_sources(syncEnqTest + PRIVATE + "syncEnqTest.cpp" +) target_include_directories(syncTest @@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEnqTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest sync gtest_main ) +target_link_libraries(syncEnqTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp new file mode 100644 index 0000000000..57e2da6193 --- /dev/null +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -0,0 +1,99 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* doSync(int myIndex) { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = ports[0]; + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = ports[1]; + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = ports[2]; + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); +} + +int main(int argc, char** argv) { + // taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + if (myIndex > 2 || myIndex < 0) { + fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex); + return 1; + } + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = doSync(myIndex); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + + for (int i = 0; i < 10; ++i) { + SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + taosMsleep(1000); + } + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 8268128347..06ba5ba6ce 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) { syncInfo.vgId = 1; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); @@ -80,11 +82,9 @@ int main(int argc, char** argv) { ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); - /* - taosMsleep(10000); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); - */ + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); while (1) { taosMsleep(1000); From 98b35306998cd9e0be9c27be9b782b8d09ec12aa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 4 Mar 2022 16:54:25 +0800 Subject: [PATCH 2/4] sync timeout --- source/libs/sync/inc/syncIO.h | 7 ++++--- source/libs/sync/inc/syncInt.h | 4 ++++ source/libs/sync/inc/syncMessage.h | 14 ++++++++++++++ source/libs/sync/src/syncIO.c | 8 +++++--- source/libs/sync/src/syncMain.c | 6 ++++++ source/libs/sync/test/syncEnqTest.cpp | 2 +- 6 files changed, 34 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 40fa31d969..a948de8ac1 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; void *ioTimerTickQ; @@ -49,6 +49,7 @@ typedef struct SSyncIO { int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg); int8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8268624501..d67b419b24 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -71,6 +71,9 @@ extern int32_t sDebugFlag; struct SRaft; typedef struct SRaft SRaft; +struct SyncTimeout; +typedef struct SyncTimeout SyncTimeout; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -174,6 +177,7 @@ typedef struct SSyncNode { int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg); } SSyncNode; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 5aa27e616b..a2e745b3d9 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -39,12 +39,26 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, + SYNC_TIMEOUT = 117, } ESyncMessageType; // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); +// --------------------------------------------- +typedef enum ESyncTimeoutType { + SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_ELECTION, + SYNC_TIMEOUT_HEARTBEAT, + +} ESyncTimeoutType; + +typedef struct SyncTimeout { + ESyncTimeoutType type; + void* data; +} SyncTimeout; + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 52cc0cda50..ffd982f233 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->msgType, pMsg->contLen); { cJSON *pJson = syncRpcMsg2Json(pMsg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); sTrace("process syncMessage send: pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); @@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -244,7 +244,9 @@ static void *syncIOConsumerFunc(void *param) { io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } - } else { + + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4183bfbe9a..57bc754735 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,6 +37,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); // --------------------------------- int32_t syncInit() { @@ -326,6 +327,11 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR return ret; } +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + return ret; +} + static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 57e2da6193..0bf43f933e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -85,7 +85,7 @@ int main(int argc, char** argv) { for (int i = 0; i < 10; ++i) { SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); taosMsleep(1000); From b75da82c964dcc1c864507a2abc07240aff03f6f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 5 Mar 2022 12:28:34 +0800 Subject: [PATCH 3/4] sync timeout --- source/libs/sync/inc/syncIO.h | 6 +-- source/libs/sync/inc/syncMessage.h | 20 ++++++-- source/libs/sync/src/syncIO.c | 9 +++- source/libs/sync/src/syncMain.c | 53 ++++++++++++++++++-- source/libs/sync/src/syncMessage.c | 67 +++++++++++++++++++++++++- source/libs/sync/test/syncPingTest.cpp | 1 + 6 files changed, 144 insertions(+), 12 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index a948de8ac1..160fefd086 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset *pQset; + STaosQset * pQset; pthread_t consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; void *ioTimerTickQ; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index a2e745b3d9..b022044528 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,7 +30,8 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { - SYNC_UNKNOWN = 99, + SYNC_UNKNOWN = 77, + SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -39,7 +40,7 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, - SYNC_TIMEOUT = 117, + } ESyncMessageType; // --------------------------------------------- @@ -48,17 +49,28 @@ cJSON* syncRpcUnknownMsg2Json(); // --------------------------------------------- typedef enum ESyncTimeoutType { - SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT, } ESyncTimeoutType; typedef struct SyncTimeout { - ESyncTimeoutType type; + uint32_t bytes; + uint32_t msgType; + ESyncTimeoutType timeoutType; void* data; } SyncTimeout; +SyncTimeout* syncTimeoutBuild(); +void syncTimeoutDestroy(SyncTimeout* pMsg); +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +cJSON* syncTimeout2Json(const SyncTimeout* pMsg); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index ffd982f233..55c4d57201 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -246,7 +246,14 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { - } else { + if (io->FpOnSyncTimeout != NULL) { + SyncTimeout *pSyncMsg; + pSyncMsg = syncTimeoutBuild(); + syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 57bc754735..5f3a276a68 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -27,6 +27,10 @@ static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNo static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); static void syncNodePingTimerCb(void* param, void* tmrId); +static void syncNodeEqPingTimer(void* param, void* tmrId); +static void syncNodeEqElectTimer(void* param, void* tmrId); +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); + static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); @@ -95,7 +99,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; atomic_store_8(&pSyncNode->pingTimerEnable, 0); - pSyncNode->FpPingTimer = syncNodePingTimerCb; + pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; pSyncNode->FpOnPing = syncNodeOnPingCb; @@ -104,6 +108,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; + pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; return pSyncNode; } @@ -329,6 +334,27 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; + sTrace("<-- syncNodeOnTimeoutCb -->"); + + { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { + if (atomic_load_8(&ths->pingTimerEnable)) { + ++(ths->pingTimerCounter); + syncNodePingAll(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { + } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { + } else { + } + return ret; } @@ -336,7 +362,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { ++(pSyncNode->pingTimerCounter); - // pSyncNode->pingTimerMS += 100; sTrace( "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " @@ -350,4 +375,26 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { } else { sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } -} \ No newline at end of file +} + +static void syncNodeEqPingTimer(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { + // pSyncNode->pingTimerMS += 100; + + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); + + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); + } +} + +static void syncNodeEqElectTimer(void* param, void* tmrId) {} + +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4dbae9bcba..f1434947a1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -24,7 +24,12 @@ void onMessage(SRaft* pRaft, void* pMsg) {} cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; - if (pRpcMsg->msgType == SYNC_PING) { + // in compiler optimization, switch case = if else constants + if (pRpcMsg->msgType == SYNC_TIMEOUT) { + SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; + pRoot = syncTimeout2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING) { SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; pRoot = syncPing2Json(pSyncMsg); @@ -73,6 +78,66 @@ cJSON* syncRpcUnknownMsg2Json() { return pJson; } +// ---- message process SyncTimeout---- +SyncTimeout* syncTimeoutBuild() { + uint32_t bytes = sizeof(SyncTimeout); + SyncTimeout* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_TIMEOUT; + return pMsg; +} + +void syncTimeoutDestroy(SyncTimeout* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { + syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); + cJSON_AddStringToObject(pRoot, "data", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); + return pJson; +} + +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { + SyncTimeout* pMsg = syncTimeoutBuild(); + pMsg->timeoutType = timeoutType; + pMsg->data = data; + return pMsg; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 06ba5ba6ce..77413a713b 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -78,6 +78,7 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = doSync(myIndex); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); From b7e22154b281f36035e702c97fe868bf3024187f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 5 Mar 2022 15:03:49 +0800 Subject: [PATCH 4/4] sync timeout --- source/libs/sync/inc/syncInt.h | 9 ++-- source/libs/sync/inc/syncUtil.h | 4 +- source/libs/sync/inc/syncVoteMgr.h | 27 +++++++++- source/libs/sync/src/syncMain.c | 20 -------- source/libs/sync/src/syncUtil.c | 7 ++- source/libs/sync/src/syncVoteMgr.c | 80 ++++++++++++++++++++++++++++++ 6 files changed, 118 insertions(+), 29 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d67b419b24..2b62f9b8b3 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -121,14 +121,17 @@ typedef struct SSyncNode { // init internal SNodeInfo me; + SRaftId raftId; + int32_t peersNum; SNodeInfo peers[TSDB_MAX_REPLICA]; + SRaftId peersId[TSDB_MAX_REPLICA]; + + int32_t replicaNum; + SRaftId replicasId[TSDB_MAX_REPLICA]; // raft algorithm SSyncFSM* pFsm; - SRaftId raftId; - SRaftId peersId[TSDB_MAX_REPLICA]; - int32_t replicaNum; int32_t quorum; // life cycle diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 93d2c12525..e1078d5738 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); + // ---- SSyncBuffer ---- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); void syncUtilbufDestroy(SSyncBuffer* syncBuf); @@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf); void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); -#endif #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index b841f2e316..e2307e9e66 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -24,13 +24,36 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" #include "taosdef.h" typedef struct SVotesGranted { + SyncTerm term; + int32_t quorum; + int32_t votes; + bool toLeader; + SSyncNode *pSyncNode; } SVotesGranted; -typedef struct SVotesResponded { -} SVotesResponded; +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); +void voteGrantedDestroy(SVotesGranted *pVotesGranted); +bool voteGrantedMajority(SVotesGranted *pVotesGranted); +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); + +typedef struct SVotesRespond { + SRaftId (*replicas)[TSDB_MAX_REPLICA]; + bool isRespond[TSDB_MAX_REPLICA]; + int32_t replicaNum; + SyncTerm term; + SSyncNode *pSyncNode; +} SVotesRespond; + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); +void Reset(SVotesRespond *pVotesRespond, SyncTerm term); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5f3a276a68..6a0663dd57 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -25,7 +25,6 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); -static void syncNodePingTimerCb(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); @@ -358,25 +357,6 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { return ret; } -static void syncNodePingTimerCb(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerEnable)) { - ++(pSyncNode->pingTimerCounter); - - sTrace( - "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " - "tmrId:%p ", - pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); - - syncNodePingAll(pSyncNode); - - taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); - } else { - sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); - } -} - static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b4959a810b..7b4d6ee366 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft raftId->vgId = vgId; } +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { + bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId; + return ret; +} + // ---- SSyncBuffer ----- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { syncBuf->len = len; syncBuf->data = malloc(syncBuf->len); @@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { dest->data = malloc(dest->len); memcpy(dest->data, src->data, dest->len); } -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 02cf4ac033..c9f0ceab57 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -14,3 +14,83 @@ */ #include "syncVoteMgr.h" + +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { + SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted)); + assert(pVotesGranted != NULL); + memset(pVotesGranted, 0, sizeof(SVotesGranted)); + + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->term = 0; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; + + return pVotesGranted; +} + +void voteGrantedDestroy(SVotesGranted *pVotesGranted) { + if (pVotesGranted != NULL) { + free(pVotesGranted); + } +} + +bool voteGrantedMajority(SVotesGranted *pVotesGranted) { + bool ret = pVotesGranted->votes >= pVotesGranted->quorum; + return ret; +} + +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { + assert(pMsg->voteGranted == true); + assert(pMsg->term == pVotesGranted->term); + pVotesGranted->votes++; +} + +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { + pVotesGranted->term = term; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; +} + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { + SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); + assert(pVotesRespond != NULL); + memset(pVotesRespond, 0, sizeof(SVotesRespond)); + + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; + + return pVotesRespond; +} + +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { + bool ret = false; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) { + ret = true; + break; + } + } + return ret; +} + +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { + assert(pVotesRespond->term == pMsg->term); + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) { + assert(pVotesRespond->isRespond[i] == false); + pVotesRespond->isRespond[i] = true; + return; + } + } + assert(0); +} + +void Reset(SVotesRespond *pVotesRespond, SyncTerm term) { + pVotesRespond->term = term; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + pVotesRespond->isRespond[i] = false; + } +} \ No newline at end of file