Merge pull request #28790 from taosdata/ehn/add-sync-msg-statisd

Ehn:add sync msg statis
This commit is contained in:
Shengliang Guan 2024-11-19 10:54:30 +08:00 committed by GitHub
commit 46e93820d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 31 additions and 3 deletions

View File

@ -234,6 +234,10 @@ struct SSyncNode {
bool isStart;
// statis
int32_t sendCount;
int32_t recvCount;
int32_t slowCount;
};
// open/close --------------

View File

@ -39,6 +39,7 @@ typedef struct SSyncLogReplMgr {
int64_t peerStartTime;
int32_t retryBackoff;
int32_t peerId;
int32_t sendCount;
} SSyncLogReplMgr;
typedef struct SSyncLogBufEntry {

View File

@ -104,6 +104,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}
int32_t nRef = atomic_fetch_add_32(&ths->recvCount, 1);
if (nRef <= 0) {
sError("vgId:%d, recv count is %d", ths->vgId, nRef);
}
int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
if (code != 0) {
syncLogRecvAppendEntries(ths, pMsg, "build rsp error");

View File

@ -88,6 +88,22 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
pMsg->destId = *destRaftId;
TAOS_CHECK_RETURN(syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg));
int32_t nRef = 0;
if (pSyncNode != NULL) {
nRef = atomic_fetch_add_32(&pSyncNode->sendCount, 1);
if (nRef <= 0) {
sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef);
}
}
SSyncLogReplMgr* mgr = syncNodeGetLogReplMgr(pSyncNode, (SRaftId*)destRaftId);
if (mgr != NULL) {
nRef = atomic_fetch_add_32(&mgr->sendCount, 1);
if (nRef <= 0) {
sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef);
}
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}

View File

@ -152,8 +152,9 @@ static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLe
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
if (pMgr == NULL) break;
len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "]", i, pMgr->restored,
len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "] ", i, pMgr->restored,
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
len += tsnprintf(buf + len, bufLen - len, "%d", pMgr->sendCount);
if (i + 1 < pSyncNode->replicaNum) {
len += tsnprintf(buf + len, bufLen - len, "%s", ", ");
}
@ -234,14 +235,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s",
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken);
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
pNode->slowCount);
}
}