Merge pull request #18472 from taosdata/feature/3.0_mhli

refactor(sync): optimized heartbeat timer
This commit is contained in:
Shengliang Guan 2022-11-26 15:45:03 +08:00 committed by GitHub
commit fdcdba625b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 52 additions and 31 deletions

View File

@ -29,6 +29,7 @@ extern "C" {
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 1000 #define HEARTBEAT_TIMER_MS 1000
#define HEARTBEAT_TICK_NUM 20
typedef struct SSyncEnv { typedef struct SSyncEnv {
uint8_t isStart; uint8_t isStart;

View File

@ -61,6 +61,7 @@ typedef struct SSyncHbTimerData {
SSyncTimer* pTimer; SSyncTimer* pTimer;
SRaftId destId; SRaftId destId;
uint64_t logicClock; uint64_t logicClock;
int64_t execTime;
int64_t rid; int64_t rid;
} SSyncHbTimerData; } SSyncHbTimerData;

View File

@ -94,7 +94,7 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed); void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed, int64_t execTime);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff); void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);

View File

@ -698,6 +698,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t ret = 0;
int64_t tsNow = taosGetTimestampMs();
if (syncIsInit()) { if (syncIsInit()) {
SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid); SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
if (pData == NULL) { if (pData == NULL) {
@ -705,15 +706,16 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
pData->rid = syncHbTimerDataAdd(pData); pData->rid = syncHbTimerDataAdd(pData);
} }
pSyncTimer->hbDataRid = pData->rid; pSyncTimer->hbDataRid = pData->rid;
pSyncTimer->timeStamp = taosGetTimestampMs(); pSyncTimer->timeStamp = tsNow;
pData->syncNodeRid = pSyncNode->rid; pData->syncNodeRid = pSyncNode->rid;
pData->pTimer = pSyncTimer; pData->pTimer = pSyncTimer;
pData->destId = pSyncTimer->destId; pData->destId = pSyncTimer->destId;
pData->logicClock = pSyncTimer->logicClock; pData->logicClock = pSyncTimer->logicClock;
pData->execTime = tsNow + pSyncTimer->timerMS;
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager, taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
&pSyncTimer->pTimer); syncEnv()->pTimerManager, &pSyncTimer->pTimer);
} else { } else {
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
} }
@ -1979,6 +1981,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
int64_t hbDataRid = (int64_t)param; int64_t hbDataRid = (int64_t)param;
int64_t tsNow = taosGetTimestampMs();
SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid); SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
if (pData == NULL) { if (pData == NULL) {
@ -2023,35 +2026,51 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
int64_t msgLogicClock = atomic_load_64(&pData->logicClock); int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
if (timerLogicClock == msgLogicClock) { if (timerLogicClock == msgLogicClock) {
if (tsNow > pData->execTime) {
#if 0
sTrace(
"vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
"---------",
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
pData->execTime += pSyncTimer->timerMS;
SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId;
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = tsNow;
// update reset time
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
pSyncTimer->timeStamp = tsNow;
// send msg
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} else {
#if 0
sTrace(
"vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
}
if (syncIsInit()) { if (syncIsInit()) {
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId); // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid, syncEnv()->pTimerManager, taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
&pSyncTimer->pTimer); syncEnv()->pTimerManager, &pSyncTimer->pTimer);
} else { } else {
sError("sync env is stop, reset peer hb timer error"); sError("sync env is stop, reset peer hb timer error");
} }
SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
// update reset time
int64_t tsNow = taosGetTimestampMs();
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
pSyncTimer->timeStamp = tsNow;
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId;
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = tsNow;
// send msg
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} else { } else {
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId, sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
timerLogicClock, msgLogicClock); timerLogicClock, msgLogicClock);

View File

@ -230,7 +230,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
pSyncMsg->timeStamp = ts; pSyncMsg->timeStamp = ts;
// send msg // send msg
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0); syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} }

View File

@ -438,7 +438,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
} }
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) { void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed, int64_t execTime) {
if (!(sDebugFlag & DEBUG_TRACE)) return; if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
@ -453,8 +453,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
} else { } else {
sNTrace(pSyncNode, sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, timer-elapsed:%" PRId64, "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed); host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed, execTime);
} }
} }