From 97a71fd81e93a48e4939311ecb10db3e454026c3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 6 Mar 2022 17:59:24 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncEnv.h | 1 + source/libs/sync/inc/syncInt.h | 8 ++++--- source/libs/sync/inc/syncMessage.h | 4 ++-- source/libs/sync/src/syncIO.c | 4 ++-- source/libs/sync/src/syncMain.c | 24 ++++++++++++------- source/libs/sync/src/syncMessage.c | 5 +++- .../sync/test/syncIOSendMsgClientTest.cpp | 2 +- .../sync/test/syncIOSendMsgServerTest.cpp | 2 +- source/libs/sync/test/syncIOSendMsgTest.cpp | 2 +- source/libs/sync/test/syncIOTickPingTest.cpp | 2 +- source/libs/sync/test/syncIOTickQTest.cpp | 2 +- source/libs/sync/test/syncPingTest.cpp | 11 +++++++++ source/libs/sync/test/syncRaftStoreTest.cpp | 4 ++-- source/libs/sync/test/syncTest.cpp | 2 +- 14 files changed, 48 insertions(+), 25 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 7e60822a28..44d0efb033 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -29,6 +29,7 @@ extern "C" { #include "ttimer.h" #define TIMER_MAX_MS 0x7FFFFFFF +#define PING_TIMER_MS 1000 typedef struct SSyncEnv { tmr_h pEnvTickTimer; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8d85ac8ccb..0ee33f0912 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -154,9 +154,11 @@ typedef struct SSyncNode { SyncIndex commitIndex; // timer - tmr_h pPingTimer; - int32_t pingTimerMS; - uint8_t pingTimerEnable; + tmr_h pPingTimer; + int32_t pingTimerMS; + // uint8_t pingTimerEnable; + uint64_t pingTimerLogicClock; + uint64_t pingTimerLogicClockUser; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 54a29004d6..95135d161b 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -52,13 +52,13 @@ typedef enum ESyncTimeoutType { SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT, - } ESyncTimeoutType; typedef struct SyncTimeout { uint32_t bytes; uint32_t msgType; ESyncTimeoutType timeoutType; + uint64_t logicClock; void* data; } SyncTimeout; @@ -69,7 +69,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); cJSON* syncTimeout2Json(const SyncTimeout* pMsg); -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data); // --------------------------------------------- typedef struct SyncPing { diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 354240cc6f..3250d4c303 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->msgType, pMsg->contLen); { cJSON *pJson = syncRpcMsg2Json(pMsg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); sTrace("process syncMessage send: pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); @@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg *pRpcMsg, rpcMsg; + SRpcMsg * pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fe2025ae45..ed7ab23b0c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -99,8 +99,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); pSyncNode->pPingTimer = NULL; - pSyncNode->pingTimerMS = 1000; - atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->pingTimerMS = PING_TIMER_MS; + atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); + atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; @@ -154,6 +155,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { } int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { + atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + pSyncNode->pingTimerMS = PING_TIMER_MS; + if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); @@ -162,12 +166,11 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { &pSyncNode->pPingTimer); } - atomic_store_8(&pSyncNode->pingTimerEnable, 1); return 0; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->pingTimerEnable, 0); + atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); pSyncNode->pingTimerMS = TIMER_MAX_MS; return 0; } @@ -301,7 +304,7 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { } if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { - if (atomic_load_8(&ths->pingTimerEnable)) { + if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->pingTimerCounter); syncNodePingAll(ths); } @@ -316,11 +319,13 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerEnable)) { + if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { // pSyncNode->pingTimerMS += 100; - SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); - SRpcMsg rpcMsg; + SyncTimeout* pSyncMsg = + syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode); + + SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncTimeoutDestroy(pSyncMsg); @@ -328,7 +333,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); + sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, + pSyncNode->pingTimerLogicClockUser); } } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index f1434947a1..d1b4a6a2c6 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -123,6 +123,8 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock); + cJSON_AddStringToObject(pRoot, "logicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); cJSON_AddStringToObject(pRoot, "data", u64buf); @@ -131,9 +133,10 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { return pJson; } -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data) { SyncTimeout* pMsg = syncTimeoutBuild(); pMsg->timeoutType = timeoutType; + pMsg->logicClock = logicClock; pMsg->data = data; return pMsg; } diff --git a/source/libs/sync/test/syncIOSendMsgClientTest.cpp b/source/libs/sync/test/syncIOSendMsgClientTest.cpp index 0d06c4f811..83ac724789 100644 --- a/source/libs/sync/test/syncIOSendMsgClientTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgClientTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOSendMsgServerTest.cpp b/source/libs/sync/test/syncIOSendMsgServerTest.cpp index 1582e097d3..b0f177962f 100644 --- a/source/libs/sync/test/syncIOSendMsgServerTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgServerTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 9b2cfcf1d8..c25ad3b1dd 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOTickPingTest.cpp b/source/libs/sync/test/syncIOTickPingTest.cpp index 42b9a73432..777fc035e2 100644 --- a/source/libs/sync/test/syncIOTickPingTest.cpp +++ b/source/libs/sync/test/syncIOTickPingTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOTickQTest.cpp b/source/libs/sync/test/syncIOTickQTest.cpp index 3d31c596bf..5615058cc3 100644 --- a/source/libs/sync/test/syncIOTickQTest.cpp +++ b/source/libs/sync/test/syncIOTickQTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 77413a713b..4e5f7b78b5 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -84,6 +84,17 @@ int main(int argc, char** argv) { assert(ret == 0); taosMsleep(10000); + + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + + taosMsleep(10000); + + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp index 9cfbe5a4ea..447dab0cbc 100644 --- a/source/libs/sync/test/syncRaftStoreTest.cpp +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -1,6 +1,6 @@ -#include -#include #include "syncRaftStore.h" +//#include +#include #include "syncIO.h" #include "syncInt.h" diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index c1c5658aba..1f9f4846cc 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h"