From 4a25963732d3c26ff25b13b8b3af7fc95bf906d4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 26 Nov 2022 10:46:57 +0800 Subject: [PATCH 1/3] refactor(sync): optimized heartbeat timer --- source/libs/sync/inc/syncEnv.h | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 71 +++++++++++++++++++++------------ 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 04e8e5edd4..265da9703d 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -29,6 +29,7 @@ extern "C" { #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define HEARTBEAT_TIMER_MS 1000 +#define HEARTBEAT_TICK_NUM 20 typedef struct SSyncEnv { uint8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e882f7461d..22ae922f62 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -61,6 +61,7 @@ typedef struct SSyncHbTimerData { SSyncTimer* pTimer; SRaftId destId; uint64_t logicClock; + int64_t execTime; int64_t rid; } SSyncHbTimerData; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3de14b917f..7881ee0f3a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -711,9 +711,10 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; pData->logicClock = pSyncTimer->logicClock; + pData->execTime = taosGetTimestampMs() + pSyncTimer->timerMS; - taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager, - &pSyncTimer->pTimer); + taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), + syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } @@ -1979,6 +1980,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { int64_t hbDataRid = (int64_t)param; + int64_t tsNow = taosGetTimestampMs(); SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid); if (pData == NULL) { @@ -2023,36 +2025,53 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { int64_t msgLogicClock = atomic_load_64(&pData->logicClock); if (timerLogicClock == msgLogicClock) { + if (tsNow > pData->execTime) { +#if 0 + sTrace( + "vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, " + "---------", + pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime); +#endif + + pData->execTime += pSyncTimer->timerMS; + + SRpcMsg rpcMsg = {0}; + (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); + + SyncHeartbeat* pSyncMsg = rpcMsg.pCont; + pSyncMsg->srcId = pSyncNode->myRaftId; + pSyncMsg->destId = pData->destId; + pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; + pSyncMsg->commitIndex = pSyncNode->commitIndex; + pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); + pSyncMsg->privateTerm = 0; + pSyncMsg->timeStamp = taosGetTimestampMs(); + + // update reset time + int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; + pSyncTimer->timeStamp = tsNow; + char logBuf[64]; + snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64 ", next-exec:%" PRId64, timerElapsed, + pData->execTime); + + // send msg + syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf); + } else { +#if 0 + sTrace( + "vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------", + pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime); +#endif + } + if (syncIsInit()) { // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId); - taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid, syncEnv()->pTimerManager, - &pSyncTimer->pTimer); + taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid, + syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("sync env is stop, reset peer hb timer error"); } - SRpcMsg rpcMsg = {0}; - (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); - - SyncHeartbeat* pSyncMsg = rpcMsg.pCont; - pSyncMsg->srcId = pSyncNode->myRaftId; - pSyncMsg->destId = pData->destId; - pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; - pSyncMsg->commitIndex = pSyncNode->commitIndex; - pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); - pSyncMsg->privateTerm = 0; - pSyncMsg->timeStamp = taosGetTimestampMs(); - - // update reset time - int64_t tsNow = taosGetTimestampMs(); - int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; - pSyncTimer->timeStamp = tsNow; - char logBuf[64]; - snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed); - - // send msg - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf); - } else { sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId, timerLogicClock, msgLogicClock); From 9026a46c6fc3cb4b9dfa6eaf3f8218d597109669 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 26 Nov 2022 11:54:38 +0800 Subject: [PATCH 2/3] fix: compile error --- source/libs/sync/inc/syncUtil.h | 2 +- source/libs/sync/src/syncMain.c | 5 +++-- source/libs/sync/src/syncReplication.c | 2 +- source/libs/sync/src/syncUtil.c | 6 +++--- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 8c0793a9ea..ce6ff25c89 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -94,7 +94,7 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed); +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed, int64_t execTime); void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 00811b8497..74bc364bc6 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -698,6 +698,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; + int64_t tsNow = taosGetTimestampMs(); if (syncIsInit()) { SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid); if (pData == NULL) { @@ -705,13 +706,13 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->rid = syncHbTimerDataAdd(pData); } pSyncTimer->hbDataRid = pData->rid; - pSyncTimer->timeStamp = taosGetTimestampMs(); + pSyncTimer->timeStamp = tsNow; pData->syncNodeRid = pSyncNode->rid; pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; pData->logicClock = pSyncTimer->logicClock; - pData->execTime = taosGetTimestampMs() + pSyncTimer->timerMS; + pData->execTime = tsNow + pSyncTimer->timerMS; taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), syncEnv()->pTimerManager, &pSyncTimer->pTimer); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 9dbd9fb370..7d2a8b46fd 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -230,7 +230,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { pSyncMsg->timeStamp = ts; // send msg - syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0); + syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index ee383baac0..caf23ac84b 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -438,7 +438,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) { +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed, int64_t execTime) { if (!(sDebugFlag & DEBUG_TRACE)) return; char host[64]; @@ -453,8 +453,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool } else { sNTrace(pSyncNode, "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 - "}, timer-elapsed:%" PRId64, - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed); + "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64, + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed, execTime); } } From b6dbc462c8b5d8f41419ba0848d68f54bf95d872 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 26 Nov 2022 11:56:13 +0800 Subject: [PATCH 3/3] fix: compile error --- source/libs/sync/src/syncMain.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 74bc364bc6..d0fe16aaaa 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2051,9 +2051,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { // update reset time int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; pSyncTimer->timeStamp = tsNow; - char logBuf[64]; - snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64 ", next-exec:%" PRId64, timerElapsed, - pData->execTime); // send msg syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);