refactor(sync): optimized heartbeat timer
This commit is contained in:
parent
93efefcbfc
commit
4a25963732
|
@ -29,6 +29,7 @@ extern "C" {
|
||||||
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
|
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
|
||||||
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
||||||
#define HEARTBEAT_TIMER_MS 1000
|
#define HEARTBEAT_TIMER_MS 1000
|
||||||
|
#define HEARTBEAT_TICK_NUM 20
|
||||||
|
|
||||||
typedef struct SSyncEnv {
|
typedef struct SSyncEnv {
|
||||||
uint8_t isStart;
|
uint8_t isStart;
|
||||||
|
|
|
@ -61,6 +61,7 @@ typedef struct SSyncHbTimerData {
|
||||||
SSyncTimer* pTimer;
|
SSyncTimer* pTimer;
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
uint64_t logicClock;
|
uint64_t logicClock;
|
||||||
|
int64_t execTime;
|
||||||
int64_t rid;
|
int64_t rid;
|
||||||
} SSyncHbTimerData;
|
} SSyncHbTimerData;
|
||||||
|
|
||||||
|
|
|
@ -711,9 +711,10 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||||
pData->pTimer = pSyncTimer;
|
pData->pTimer = pSyncTimer;
|
||||||
pData->destId = pSyncTimer->destId;
|
pData->destId = pSyncTimer->destId;
|
||||||
pData->logicClock = pSyncTimer->logicClock;
|
pData->logicClock = pSyncTimer->logicClock;
|
||||||
|
pData->execTime = taosGetTimestampMs() + pSyncTimer->timerMS;
|
||||||
|
|
||||||
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager,
|
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
|
||||||
&pSyncTimer->pTimer);
|
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
|
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
|
@ -1979,6 +1980,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
int64_t hbDataRid = (int64_t)param;
|
int64_t hbDataRid = (int64_t)param;
|
||||||
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
|
|
||||||
SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
|
SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
|
@ -2023,36 +2025,53 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
|
int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
|
||||||
|
|
||||||
if (timerLogicClock == msgLogicClock) {
|
if (timerLogicClock == msgLogicClock) {
|
||||||
|
if (tsNow > pData->execTime) {
|
||||||
|
#if 0
|
||||||
|
sTrace(
|
||||||
|
"vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
|
||||||
|
"---------",
|
||||||
|
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
pData->execTime += pSyncTimer->timerMS;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||||
|
|
||||||
|
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||||
|
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||||
|
pSyncMsg->destId = pData->destId;
|
||||||
|
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
|
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||||
|
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||||
|
pSyncMsg->privateTerm = 0;
|
||||||
|
pSyncMsg->timeStamp = taosGetTimestampMs();
|
||||||
|
|
||||||
|
// update reset time
|
||||||
|
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
||||||
|
pSyncTimer->timeStamp = tsNow;
|
||||||
|
char logBuf[64];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64 ", next-exec:%" PRId64, timerElapsed,
|
||||||
|
pData->execTime);
|
||||||
|
|
||||||
|
// send msg
|
||||||
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf);
|
||||||
|
} else {
|
||||||
|
#if 0
|
||||||
|
sTrace(
|
||||||
|
"vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
|
||||||
|
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
|
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
|
||||||
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid, syncEnv()->pTimerManager,
|
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
|
||||||
&pSyncTimer->pTimer);
|
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||||
} else {
|
} else {
|
||||||
sError("sync env is stop, reset peer hb timer error");
|
sError("sync env is stop, reset peer hb timer error");
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
|
||||||
|
|
||||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
|
||||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
|
||||||
pSyncMsg->destId = pData->destId;
|
|
||||||
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
|
||||||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
|
||||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
|
||||||
pSyncMsg->privateTerm = 0;
|
|
||||||
pSyncMsg->timeStamp = taosGetTimestampMs();
|
|
||||||
|
|
||||||
// update reset time
|
|
||||||
int64_t tsNow = taosGetTimestampMs();
|
|
||||||
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
|
||||||
pSyncTimer->timeStamp = tsNow;
|
|
||||||
char logBuf[64];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed);
|
|
||||||
|
|
||||||
// send msg
|
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf);
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
||||||
timerLogicClock, msgLogicClock);
|
timerLogicClock, msgLogicClock);
|
||||||
|
|
Loading…
Reference in New Issue