ehn/add-sync-heartbeat-sent-time-to-log

This commit is contained in:
dmchen 2024-11-18 18:25:52 +08:00
parent 6c835302a7
commit 472e8361ea
5 changed files with 93 additions and 17 deletions

View File

@ -29,6 +29,7 @@ typedef struct SSyncIndexMgr {
SyncTerm privateTerm[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; // for advanced function
int64_t startTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
int64_t recvTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
int64_t sentTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
int32_t replicaNum;
int32_t totalReplicaNum;
SSyncNode *pNode;
@ -45,7 +46,9 @@ void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr
void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime);
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime);
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term);
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);

View File

@ -46,12 +46,12 @@ extern "C" {
#define sLDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }
#define sLTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }
#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, pNode, __VA_ARGS__); }
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, pNode, __VA_ARGS__); }
#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, true, pNode, __VA_ARGS__); }
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, false, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, false, pNode, __VA_ARGS__); }
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
@ -85,7 +85,8 @@ void syncUtilMsgHtoN(void* msg);
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
const char* format, ...);
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...);
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,

View File

@ -155,6 +155,18 @@ void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, i
DID(pRaftId), CID(pRaftId));
}
void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime) {
for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) {
if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
(pIndexMgr->sentTimeArr)[i] = sentTime;
return;
}
}
sError("vgId:%d, indexmgr set sent-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, sentTime,
DID(pRaftId), CID(pRaftId));
}
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) {
if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
@ -168,6 +180,19 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId
return TSDB_CODE_SYN_INVALID_ID;
}
int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) {
if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
int64_t recvTime = (pIndexMgr->sentTimeArr)[i];
return recvTime;
}
}
sError("vgId:%d, indexmgr get sent-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
CID(pRaftId));
return TSDB_CODE_SYN_INVALID_ID;
}
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) {
if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {

View File

@ -92,7 +92,12 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
}
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
TAOS_CHECK_RETURN(syncNodeSendMsgById(destId, pSyncNode, pMsg));
int64_t tsMs = taosGetTimestampMs();
syncIndexMgrSetSentTime(pSyncNode->pMatchIndex, destId, tsMs);
return TSDB_CODE_SUCCESS;
}
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {

View File

@ -22,6 +22,7 @@
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "tglobal.h"
#include "ttime.h"
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
int32_t len = tsnprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
@ -109,12 +110,41 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
}
// for leader
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
int32_t len = 0;
len += tsnprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
if (formatTime) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (tsMs > 0) {
taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI);
}
len += tsnprintf(buf + len, bufLen - len, "%d:%s", i, pBuf);
} else {
len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
}
if (i < pSyncNode->replicaNum - 1) {
len += tsnprintf(buf + len, bufLen - len, "%s", ",");
}
}
len += tsnprintf(buf + len, bufLen - len, "%s", "}");
}
static void syncSentHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
int32_t len = 0;
len += tsnprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetSentTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
if (formatTime) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (tsMs > 0) {
taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI);
}
len += tsnprintf(buf + len, bufLen - len, "%d:%s", i, pBuf);
} else {
len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
}
if (i < pSyncNode->replicaNum - 1) {
len += tsnprintf(buf + len, bufLen - len, "%s", ",");
}
@ -123,12 +153,20 @@ static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t b
}
// for follower
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
int32_t len = 0;
len += tsnprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
if (formatTime) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (tsMs > 0) {
taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI);
}
len += tsnprintf(buf + len, bufLen - len, "%d:%s", i, pBuf);
} else {
len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
}
if (i < pSyncNode->replicaNum - 1) {
len += tsnprintf(buf + len, bufLen - len, "%s", ",");
}
@ -173,7 +211,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
len += tsnprintf(buf + len, bufLen - len, "%s", "}");
}
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
const char* format, ...) {
if (pNode == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = raftStoreGetTerm(pNode);
@ -205,10 +244,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));
char hbrTimeStr[256] = "";
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime);
char hbTimeStr[256] = "";
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime);
char sentHbTimeStr[512] = "";
syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime);
char eventLog[512]; // {0};
va_list argpointer;
@ -234,14 +276,14 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s",
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s:, recv hb:%s, recv hb-reply:%s, arb-token:%s",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken);
replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken);
}
}