From 12249bb51271356a9027eb7466f3479f1dc3b37c Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 28 Feb 2025 16:02:18 +0800 Subject: [PATCH] fix:remove-lock-from-heartbeat-log --- source/libs/sync/inc/syncUtil.h | 9 +++ source/libs/sync/src/syncUtil.c | 133 +++++++++++++++++++++++++------- 2 files changed, 115 insertions(+), 27 deletions(-) diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index e7110c0964..95ea75731c 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -53,6 +53,13 @@ extern "C" { #define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); } #define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); } +#define sHFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintHbLog("SYN FATAL ", DEBUG_FATAL, 255, true, pNode, __VA_ARGS__); } +#define sHError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintHbLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); } +#define sHWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintHbLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); } +#define sHInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintHbLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); } +#define sHDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintHbLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); } +#define sHTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintHbLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); } + #define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); } #define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); } #define sSWarn(pSender, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotSenderLog("SYN WARN ", DEBUG_WARN, 255, pSender, __VA_ARGS__); } @@ -87,6 +94,8 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf); void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode, const char* format, ...); +void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode, + const char* format, ...); void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, const char* format, ...); void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 44e8d97aca..435586041a 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -253,11 +253,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo va_end(argpointer); int32_t aqItems = 0; - /* if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) { aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems } - */ // restore error code terrno = errCode; @@ -272,15 +270,71 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 - ", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]", + ", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, " + "recv:%d, slow-recv:%d]", pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex, appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr, - replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, - pNode->slowCount); + replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, + pNode->recvCount, pNode->slowCount); + } +} + +void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode, + const char* format, ...) { + if (pNode == NULL || pNode->pLogStore == NULL) return; + int64_t currentTerm = raftStoreGetTerm(pNode); + + // save error code, otherwise it will be overwritten + int32_t errCode = terrno; + + int32_t cacheHit = pNode->pLogStore->cacheHit; + int32_t cacheMiss = pNode->pLogStore->cacheMiss; + + char cfgStr[1024] = ""; + syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); + + char replMgrStatesStr[1024] = ""; + syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr)); + + char bufferStatesStr[256] = ""; + syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr)); + + char hbrTimeStr[256] = ""; + syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime); + + char hbTimeStr[256] = ""; + syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime); + + char sentHbTimeStr[512] = ""; + syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime); + + char eventLog[512]; // {0}; + va_list argpointer; + va_start(argpointer, format); + int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer); + va_end(argpointer); + + terrno = errCode; + + if (pNode != NULL) { + taosPrintLog( + flags, level, dflag, + "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", assigned-index:%" PRId64 ", min:%" PRId64 + ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " + ", snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 + ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 + ", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, " + "recv:%d, slow-recv:%d]", + pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex, + pNode->minMatchIndex, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum, + pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, pNode->snapshottingIndex, pNode->replicaNum, + pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), + pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, + sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, pNode->slowCount); } } @@ -414,12 +468,12 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed, int64_t execTime) { if (printX) { - sNTrace(pSyncNode, + sHTrace(pSyncNode, "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, x", DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp); } else { - sNTrace(pSyncNode, + sHTrace(pSyncNode, "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64, DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed, @@ -428,29 +482,39 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool } void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) { - char pBuf[TD_TIME_STR_LEN] = {0}; - if (pMsg->timeStamp > 0) { - if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) { - pBuf[0] = '\0'; - } - } if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) { pSyncNode->hbSlowNum++; - sNTrace(pSyncNode, + char pBuf[TD_TIME_STR_LEN] = {0}; + if (pMsg->timeStamp > 0) { + if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) { + pBuf[0] = '\0'; + } + } + + sHError(pSyncNode, "recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms", - DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff); + DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, + timeDiff); } else { - sNTrace(pSyncNode, - "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 - ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms", - DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff); + if (sDebugFlag & DEBUG_TRACE) { + char pBuf[TD_TIME_STR_LEN] = {0}; + if (pMsg->timeStamp > 0) { + if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) { + pBuf[0] = '\0'; + } + } + sHTrace(pSyncNode, + "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 + ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms", + DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff); + } } } void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { - sNTrace(pSyncNode, "send sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", + sHTrace(pSyncNode, "send sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s); } @@ -458,14 +522,29 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) { pSyncNode->hbrSlowNum++; - sNTrace(pSyncNode, - "recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, - DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff); - } + char pBuf[TD_TIME_STR_LEN] = {0}; + if (pMsg->timeStamp > 0) { + if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) { + pBuf[0] = '\0'; + } + } - sNTrace(pSyncNode, - "recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, - DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff); + sHError(pSyncNode, + "recv sync-heartbeat-reply from dnode:%d slow(%d ms) {term:%" PRId64 ", ts:%s}, %s, net elapsed:%" PRId64, + DID(&pMsg->srcId), SYNC_HEARTBEAT_REPLY_SLOW_MS, pMsg->term, pBuf, s, timeDiff); + } else { + if (sDebugFlag & DEBUG_TRACE) { + char pBuf[TD_TIME_STR_LEN] = {0}; + if (pMsg->timeStamp > 0) { + if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) { + pBuf[0] = '\0'; + } + } + sHTrace(pSyncNode, + "recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, + DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff); + } + } } void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {