refactor: adjust syncLogHeartbeat
This commit is contained in:
parent
5035e5e953
commit
39ddf9faf8
|
@ -48,7 +48,7 @@ extern "C" {
|
|||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg, const char* debugStr);
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
||||
|
||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
||||
|
|
|
@ -94,8 +94,8 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
|
|||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||
|
||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
|
||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed);
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff);
|
||||
|
||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||
|
|
|
@ -2034,6 +2034,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||
|
||||
// update reset time
|
||||
int64_t tsNow = taosGetTimestampMs();
|
||||
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
||||
pSyncTimer->timeStamp = tsNow;
|
||||
|
||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||
pSyncMsg->destId = pData->destId;
|
||||
|
@ -2041,17 +2046,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
pSyncMsg->timeStamp = taosGetTimestampMs();
|
||||
|
||||
// update reset time
|
||||
int64_t tsNow = taosGetTimestampMs();
|
||||
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
||||
pSyncTimer->timeStamp = tsNow;
|
||||
char logBuf[64];
|
||||
snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed);
|
||||
pSyncMsg->timeStamp = tsNow;
|
||||
|
||||
// send msg
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf);
|
||||
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed);
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
|
||||
} else {
|
||||
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
||||
|
@ -2161,9 +2160,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
int64_t tsMs = taosGetTimestampMs();
|
||||
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
||||
char buf[128];
|
||||
snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff);
|
||||
syncLogRecvHeartbeat(ths, pMsg, buf);
|
||||
syncLogRecvHeartbeat(ths, pMsg, timeDiff);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
|
||||
|
@ -2173,7 +2170,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
pMsgReply->srcId = ths->myRaftId;
|
||||
pMsgReply->term = ths->pRaftStore->currentTerm;
|
||||
pMsgReply->privateTerm = 8864; // magic number
|
||||
pMsgReply->timeStamp = taosGetTimestampMs();
|
||||
pMsgReply->timeStamp = tsMs;
|
||||
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
|
||||
|
|
|
@ -207,8 +207,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg, const char* debugStr) {
|
||||
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, debugStr);
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||
}
|
||||
|
||||
|
@ -231,7 +230,8 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
|||
pSyncMsg->timeStamp = ts;
|
||||
|
||||
// send msg
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, "x");
|
||||
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0);
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -430,25 +430,37 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
|
|||
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
|
||||
}
|
||||
|
||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) {
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
||||
if (printX) {
|
||||
sNTrace(pSyncNode,
|
||||
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s",
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
|
||||
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||
"}, x",
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
|
||||
} else {
|
||||
sNTrace(pSyncNode,
|
||||
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||
"}, timer-elapsed:%" PRId64,
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed);
|
||||
}
|
||||
}
|
||||
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) {
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||
"}, %s",
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
|
||||
"}, net elapsed:%" PRId64,
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff);
|
||||
}
|
||||
|
||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||
|
|
Loading…
Reference in New Issue