diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 3c372a3b12..ed7a17b4c7 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -29,6 +29,7 @@ typedef struct SSyncIndexMgr { SyncTerm privateTerm[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; // for advanced function int64_t startTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; int64_t recvTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; + int64_t sentTimeArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; int32_t replicaNum; int32_t totalReplicaNum; SSyncNode *pNode; @@ -45,7 +46,9 @@ void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime); int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId); void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime); +void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime); int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId); +int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId); void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term); SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 1606f47592..7b71491f47 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -46,12 +46,12 @@ extern "C" { #define sLDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); } #define sLTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); } -#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, pNode, __VA_ARGS__); } -#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, pNode, __VA_ARGS__); } -#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, pNode, __VA_ARGS__); } -#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, pNode, __VA_ARGS__); } -#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, pNode, __VA_ARGS__); } -#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, pNode, __VA_ARGS__); } +#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, true, pNode, __VA_ARGS__); } +#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); } +#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); } +#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); } +#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, false, pNode, __VA_ARGS__); } +#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, false, pNode, __VA_ARGS__); } #define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); } #define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); } @@ -85,7 +85,8 @@ void syncUtilMsgHtoN(void* msg); void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf); -void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...); +void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode, + const char* format, ...); void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, const char* format, ...); void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 4946912941..ec7354040f 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -155,6 +155,18 @@ void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, i DID(pRaftId), CID(pRaftId)); } +void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime) { + for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) { + if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) { + (pIndexMgr->sentTimeArr)[i] = sentTime; + return; + } + } + + sError("vgId:%d, indexmgr set sent-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, sentTime, + DID(pRaftId), CID(pRaftId)); +} + int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) { @@ -168,6 +180,19 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId return TSDB_CODE_SYN_INVALID_ID; } +int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { + for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) { + if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) { + int64_t recvTime = (pIndexMgr->sentTimeArr)[i]; + return recvTime; + } + } + + sError("vgId:%d, indexmgr get sent-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), + CID(pRaftId)); + return TSDB_CODE_SYN_INVALID_ID; +} + void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) { for (int i = 0; i < pIndexMgr->totalReplicaNum; ++i) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 66c49834d8..d90f7d2521 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -108,7 +108,13 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI } int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { - return syncNodeSendMsgById(destId, pSyncNode, pMsg); + SRaftId destIdTmp = *destId; + TAOS_CHECK_RETURN(syncNodeSendMsgById(destId, pSyncNode, pMsg)); + + int64_t tsMs = taosGetTimestampMs(); + syncIndexMgrSetSentTime(pSyncNode->pMatchIndex, &destIdTmp, tsMs); + + return TSDB_CODE_SUCCESS; } int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 3907bd5976..65c7f9761e 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -22,6 +22,7 @@ #include "syncRaftStore.h" #include "syncSnapshot.h" #include "tglobal.h" +#include "ttime.h" static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { int32_t len = tsnprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex); @@ -108,13 +109,40 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) { (void)snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal); } +static void syncPrintTime(bool formatTime, int32_t* len, int64_t tsMs, int32_t i, char* buf, int32_t bufLen) { + if (formatTime) { + char pBuf[TD_TIME_STR_LEN] = {0}; + if (tsMs > 0) { + if (taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI) != 0) { + pBuf[0] = '\0'; + } + } + (*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%s", i, pBuf); + } else { + (*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%" PRId64, i, tsMs); + } +} + // for leader -static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { +static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) { int32_t len = 0; len += tsnprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i])); - len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs); + syncPrintTime(formatTime, &len, tsMs, i, buf, bufLen); + if (i < pSyncNode->replicaNum - 1) { + len += tsnprintf(buf + len, bufLen - len, "%s", ","); + } + } + len += tsnprintf(buf + len, bufLen - len, "%s", "}"); +} + +static void syncSentHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) { + int32_t len = 0; + len += tsnprintf(buf + len, bufLen - len, "%s", "{"); + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { + int64_t tsMs = syncIndexMgrGetSentTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i])); + syncPrintTime(formatTime, &len, tsMs, i, buf, bufLen); if (i < pSyncNode->replicaNum - 1) { len += tsnprintf(buf + len, bufLen - len, "%s", ","); } @@ -123,12 +151,12 @@ static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t b } // for follower -static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { +static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) { int32_t len = 0; len += tsnprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i])); - len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs); + syncPrintTime(formatTime, &len, tsMs, i, buf, bufLen); if (i < pSyncNode->replicaNum - 1) { len += tsnprintf(buf + len, bufLen - len, "%s", ","); } @@ -174,7 +202,8 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { len += tsnprintf(buf + len, bufLen - len, "%s", "}"); } -void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { +void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode, + const char* format, ...) { if (pNode == NULL || pNode->pLogStore == NULL) return; int64_t currentTerm = raftStoreGetTerm(pNode); @@ -206,10 +235,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr)); char hbrTimeStr[256] = ""; - syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr)); + syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime); char hbTimeStr[256] = ""; - syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr)); + syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime); + + char sentHbTimeStr[512] = ""; + syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime); char eventLog[512]; // {0}; va_list argpointer; @@ -235,14 +267,14 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 - ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]", + ", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]", pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex, appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr, - replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, + replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, pNode->slowCount); } }