Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log
ehn:add-sync-heartbeat-sent-time-to-log
This commit is contained in:
commit
53d9ee5012
|
@ -29,6 +29,7 @@ typedef struct SSyncIndexMgr {
|
|||
SyncTerm privateTerm[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; // for advanced function
|
||||
int64_t startTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||
int64_t recvTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||
int64_t sentTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||
int32_t replicaNum;
|
||||
int32_t totalReplicaNum;
|
||||
SSyncNode *pNode;
|
||||
|
@ -45,7 +46,9 @@ void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr
|
|||
// Per-replica bookkeeping accessors of SSyncIndexMgr. Each takes the index manager and the raft id of
// the replica of interest.

// NOTE(review): implementation not visible here; presumably records startTime in startTimeArr for pRaftId.
void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime);
|
||||
// NOTE(review): implementation not visible here; presumably the read counterpart of syncIndexMgrSetStartTime.
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
|
||||
// NOTE(review): only the tail of the implementation is visible; presumably records recvTime in recvTimeArr.
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
|
||||
// Stores sentTime in sentTimeArr for the replica whose id matches pRaftId. If no replica matches, an
// error is logged and nothing is stored.
void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime);
|
||||
// NOTE(review): implementation only partially visible; it scans the replicas for pRaftId and appears to
// return TSDB_CODE_SYN_INVALID_ID when none matches.
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
|
||||
// Returns the sentTimeArr entry of the replica whose id matches pRaftId. If no replica matches, an error
// is logged and TSDB_CODE_SYN_INVALID_ID is returned in place of a timestamp.
int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
|
||||
// NOTE(review): implementation only partially visible; presumably records term for the matching replica.
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term);
|
||||
// NOTE(review): implementation not visible here; presumably the read counterpart of syncIndexMgrSetTerm.
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
|
||||
|
||||
|
|
|
@ -46,12 +46,12 @@ extern "C" {
|
|||
#define sLDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }
|
||||
#define sLTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }
|
||||
|
||||
// Node-scoped log macros: each one checks the matching bit of sDebugFlag and, if set, forwards to
// syncPrintNodeLog. The fourth argument is syncPrintNodeLog's formatTime parameter: the Fatal/Error/Warn/
// Info variants pass true, the Debug/Trace variants pass false.
// The stale definitions that called syncPrintNodeLog without formatTime are dropped; keeping both would
// redefine every macro with a different body and no longer match the function's prototype.
#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, true, pNode, __VA_ARGS__); }
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, false, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, false, pNode, __VA_ARGS__); }
|
||||
|
||||
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
|
||||
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
|
||||
|
@ -85,7 +85,8 @@ void syncUtilMsgHtoN(void* msg);
|
|||
|
||||
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
|
||||
|
||||
// Emits one log line describing pNode's state, followed by the caller's printf-style message.
//   flags      - prefix string for the line (e.g. "SYN ERROR ")
//   level      - log level of the line
//   dflag      - debug flag value forwarded to the logger
//   formatTime - true: heartbeat timestamps are rendered as formatted time strings;
//                false: they are printed as raw integer millisecond values
//   pNode      - node to describe; the call is a no-op when pNode or its log store is NULL
//   format,... - printf-style event message
// The stale declaration without formatTime is removed: two declarations of the same function with
// different parameter lists conflict.
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
                      const char* format, ...);
|
||||
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
|
||||
const char* format, ...);
|
||||
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
||||
|
|
|
@ -155,6 +155,18 @@ void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, i
|
|||
DID(pRaftId), CID(pRaftId));
|
||||
}
|
||||
|
||||
// Records sentTime for the replica identified by pRaftId.
// Scans the first totalReplicaNum entries of the manager's replica table; the first entry whose id
// matches pRaftId gets its sentTimeArr slot overwritten. When nothing matches, an error is logged and
// no slot is modified.
void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime) {
  int slot = -1;
  for (int idx = 0; idx < pIndexMgr->totalReplicaNum; ++idx) {
    if (syncUtilSameId(&((*(pIndexMgr->replicas))[idx]), pRaftId)) {
      slot = idx;
      break;
    }
  }

  if (slot >= 0) {
    (pIndexMgr->sentTimeArr)[slot] = sentTime;
    return;
  }

  sError("vgId:%d, indexmgr set sent-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, sentTime,
         DID(pRaftId), CID(pRaftId));
}
|
||||
|
||||
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
|
||||
for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) {
|
||||
if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
|
||||
|
@ -168,6 +180,19 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId
|
|||
return TSDB_CODE_SYN_INVALID_ID;
|
||||
}
|
||||
|
||||
// Looks up the recorded sent time of the replica identified by pRaftId.
// Returns the sentTimeArr slot of the first replica-table entry (among the first totalReplicaNum) whose
// id matches pRaftId. When nothing matches, an error is logged and TSDB_CODE_SYN_INVALID_ID is returned
// instead of a timestamp.
int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
  int idx = 0;
  while (idx < pIndexMgr->totalReplicaNum) {
    if (syncUtilSameId(&((*(pIndexMgr->replicas))[idx]), pRaftId)) {
      return (pIndexMgr->sentTimeArr)[idx];
    }
    ++idx;
  }

  sError("vgId:%d, indexmgr get sent-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
         CID(pRaftId));
  return TSDB_CODE_SYN_INVALID_ID;
}
|
||||
|
||||
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
|
||||
for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) {
|
||||
if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
|
||||
|
|
|
@ -108,7 +108,13 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
|||
}
|
||||
|
||||
// Sends the heartbeat message pMsg to destId and, on success, records the send time for that peer.
//   pSyncNode - sending node; its pMatchIndex manager receives the sent-time update
//   destId    - raft id of the destination peer
//   pMsg      - message handed to syncNodeSendMsgById
// Returns TSDB_CODE_SUCCESS after the send time has been recorded.
// NOTE(review): TAOS_CHECK_RETURN presumably returns the callee's code on failure — confirm.
// The stale unconditional `return syncNodeSendMsgById(...)` is removed: it made the sent-time recording
// below unreachable.
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
  // Copy the destination id before sending, because it is needed after the send call returns.
  // NOTE(review): presumably *destId may no longer be valid once the message is handed off — confirm.
  SRaftId destIdTmp = *destId;
  TAOS_CHECK_RETURN(syncNodeSendMsgById(destId, pSyncNode, pMsg));

  int64_t tsMs = taosGetTimestampMs();
  syncIndexMgrSetSentTime(pSyncNode->pMatchIndex, &destIdTmp, tsMs);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "syncRaftStore.h"
|
||||
#include "syncSnapshot.h"
|
||||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
|
||||
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
||||
int32_t len = tsnprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
|
||||
|
@ -108,13 +109,40 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
|
|||
(void)snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal);
|
||||
}
|
||||
|
||||
static void syncPrintTime(bool formatTime, int32_t* len, int64_t tsMs, int32_t i, char* buf, int32_t bufLen) {
|
||||
if (formatTime) {
|
||||
char pBuf[TD_TIME_STR_LEN] = {0};
|
||||
if (tsMs > 0) {
|
||||
if (taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI) != 0) {
|
||||
pBuf[0] = '\0';
|
||||
}
|
||||
}
|
||||
(*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%s", i, pBuf);
|
||||
} else {
|
||||
(*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%" PRId64, i, tsMs);
|
||||
}
|
||||
}
|
||||
|
||||
// for leader
|
||||
// Renders, for every replica index i in [0, replicaNum), the value syncIndexMgrGetRecvTime returns from
// pMatchIndex as "{0:<t0>,1:<t1>,...}" into buf.
//   pSyncNode  - node whose pMatchIndex and replicasId are read
//   buf/bufLen - destination buffer and its size
//   formatTime - forwarded to syncPrintTime to choose formatted time strings or raw integers
// The stale three-argument signature and the stale raw-integer tsnprintf are removed: with both present
// each entry would be printed twice and formatTime would not be honoured.
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t len = 0;
  len += tsnprintf(buf + len, bufLen - len, "%s", "{");
  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
    syncPrintTime(formatTime, &len, tsMs, i, buf, bufLen);
    // Separate entries with a comma, but not after the last one.
    if (i < pSyncNode->replicaNum - 1) {
      len += tsnprintf(buf + len, bufLen - len, "%s", ",");
    }
  }
  (void)tsnprintf(buf + len, bufLen - len, "%s", "}");
}
|
||||
|
||||
static void syncSentHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
|
||||
int32_t len = 0;
|
||||
len += tsnprintf(buf + len, bufLen - len, "%s", "{");
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
int64_t tsMs = syncIndexMgrGetSentTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
|
||||
syncPrintTime(formatTime, &len, tsMs, i, buf, bufLen);
|
||||
if (i < pSyncNode->replicaNum - 1) {
|
||||
len += tsnprintf(buf + len, bufLen - len, "%s", ",");
|
||||
}
|
||||
|
@ -123,12 +151,12 @@ static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t b
|
|||
}
|
||||
|
||||
// for follower
|
||||
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
||||
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
|
||||
int32_t len = 0;
|
||||
len += tsnprintf(buf + len, bufLen - len, "%s", "{");
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
|
||||
len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
|
||||
syncPrintTime(formatTime, &len, tsMs, i, buf, bufLen);
|
||||
if (i < pSyncNode->replicaNum - 1) {
|
||||
len += tsnprintf(buf + len, bufLen - len, "%s", ",");
|
||||
}
|
||||
|
@ -174,7 +202,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
|||
len += tsnprintf(buf + len, bufLen - len, "%s", "}");
|
||||
}
|
||||
|
||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
|
||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
|
||||
const char* format, ...) {
|
||||
if (pNode == NULL || pNode->pLogStore == NULL) return;
|
||||
int64_t currentTerm = raftStoreGetTerm(pNode);
|
||||
|
||||
|
@ -206,10 +235,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));
|
||||
|
||||
char hbrTimeStr[256] = "";
|
||||
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));
|
||||
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime);
|
||||
|
||||
char hbTimeStr[256] = "";
|
||||
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));
|
||||
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime);
|
||||
|
||||
char sentHbTimeStr[512] = "";
|
||||
syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime);
|
||||
|
||||
char eventLog[512]; // {0};
|
||||
va_list argpointer;
|
||||
|
@ -235,14 +267,14 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
|
||||
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
|
||||
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
|
||||
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
|
||||
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
|
||||
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
|
||||
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
|
||||
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
|
||||
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
|
||||
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
|
||||
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
|
||||
replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
|
||||
pNode->slowCount);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue