From e57a914f00816e93fa41fc578e2115dd544f2e52 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 15 Nov 2024 16:34:04 +0800 Subject: [PATCH 1/2] ehn/add-sync-msg-statisd --- source/libs/sync/inc/syncInt.h | 4 ++++ source/libs/sync/inc/syncPipeline.h | 1 + source/libs/sync/src/syncAppendEntries.c | 5 +++++ source/libs/sync/src/syncReplication.c | 11 +++++++++++ source/libs/sync/src/syncUtil.c | 8 +++++--- 5 files changed, 26 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0b653ddbe9..b19d1184a7 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -234,6 +234,10 @@ struct SSyncNode { bool isStart; + // statis + int32_t sendCount; + int32_t recvCount; + int32_t slowCount; }; // open/close -------------- diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 427a3690f2..eeb24d2f16 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -39,6 +39,7 @@ typedef struct SSyncLogReplMgr { int64_t peerStartTime; int32_t retryBackoff; int32_t peerId; + int32_t sendCount; } SSyncLogReplMgr; typedef struct SSyncLogBufEntry { diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0345880874..682d3f9e88 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -104,6 +104,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } + int32_t nRef = atomic_fetch_add_32(&ths->recvCount, 1); + if (nRef <= 0) { + sError("vgId:%d, recv count is %d", ths->vgId, nRef); + } + int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId); if (code != 0) { syncLogRecvAppendEntries(ths, pMsg, "build rsp error"); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 247b5624c3..a900b4707d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -88,6 +88,17 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI pMsg->destId = *destRaftId; TAOS_CHECK_RETURN(syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg)); + int32_t nRef = atomic_fetch_add_32(&pSyncNode->sendCount, 1); + if (nRef <= 0) { + sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef); + } + + SSyncLogReplMgr* mgr = syncNodeGetLogReplMgr(pSyncNode, (SRaftId*)destRaftId); + nRef = atomic_fetch_add_32(&mgr->sendCount, 1); + if (nRef <= 0) { + sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef); + } + TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9058b6ecef..3907bd5976 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -152,8 +152,9 @@ static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLe for (int32_t i = 0; i < pSyncNode->replicaNum; i++) { SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i]; if (pMgr == NULL) break; - len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "]", i, pMgr->restored, + len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "] ", i, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); + len += tsnprintf(buf + len, bufLen - len, "%d", pMgr->sendCount); if (i + 1 < pSyncNode->replicaNum) { len += tsnprintf(buf + len, bufLen - len, "%s", ", "); } @@ -234,14 +235,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 - ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s", + ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]", pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex, appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr, - replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken); + replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, + pNode->slowCount); } } From daa3c6bbf4b8b2eb260776253317cc5754291275 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 18 Nov 2024 15:57:06 +0800 Subject: [PATCH 2/2] ehn/add-sync-msg-statis --- source/libs/sync/src/syncReplication.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index a900b4707d..66c49834d8 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -88,15 +88,20 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI pMsg->destId = *destRaftId; TAOS_CHECK_RETURN(syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg)); - int32_t nRef = atomic_fetch_add_32(&pSyncNode->sendCount, 1); - if (nRef <= 0) { - sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef); + int32_t nRef = 0; + if (pSyncNode != NULL) { + nRef = atomic_fetch_add_32(&pSyncNode->sendCount, 1); + if (nRef <= 0) { + sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef); + } } SSyncLogReplMgr* mgr = syncNodeGetLogReplMgr(pSyncNode, (SRaftId*)destRaftId); - nRef = atomic_fetch_add_32(&mgr->sendCount, 1); - if (nRef <= 0) { - sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef); + if (mgr != NULL) { + nRef = atomic_fetch_add_32(&mgr->sendCount, 1); + if (nRef <= 0) { + sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef); + } } TAOS_RETURN(TSDB_CODE_SUCCESS);