fix:remove-lock-from-heartbeat-log
This commit is contained in:
parent
aa7cb880bb
commit
12249bb512
|
@ -53,6 +53,13 @@ extern "C" {
|
||||||
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); }
|
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); }
|
||||||
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); }
|
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); }
|
||||||
|
|
||||||
|
#define sHFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintHbLog("SYN FATAL ", DEBUG_FATAL, 255, true, pNode, __VA_ARGS__); }
|
||||||
|
#define sHError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintHbLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); }
|
||||||
|
#define sHWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintHbLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); }
|
||||||
|
#define sHInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintHbLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); }
|
||||||
|
#define sHDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintHbLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); }
|
||||||
|
#define sHTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintHbLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); }
|
||||||
|
|
||||||
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
|
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
|
||||||
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
|
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
|
||||||
#define sSWarn(pSender, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotSenderLog("SYN WARN ", DEBUG_WARN, 255, pSender, __VA_ARGS__); }
|
#define sSWarn(pSender, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotSenderLog("SYN WARN ", DEBUG_WARN, 255, pSender, __VA_ARGS__); }
|
||||||
|
@ -87,6 +94,8 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
|
||||||
|
|
||||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
|
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
|
||||||
const char* format, ...);
|
const char* format, ...);
|
||||||
|
void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
|
||||||
|
const char* format, ...);
|
||||||
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
|
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
|
||||||
const char* format, ...);
|
const char* format, ...);
|
||||||
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
||||||
|
|
|
@ -253,11 +253,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
|
||||||
va_end(argpointer);
|
va_end(argpointer);
|
||||||
|
|
||||||
int32_t aqItems = 0;
|
int32_t aqItems = 0;
|
||||||
/*
|
|
||||||
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
|
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
|
||||||
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems
|
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
// restore error code
|
// restore error code
|
||||||
terrno = errCode;
|
terrno = errCode;
|
||||||
|
@ -272,15 +270,71 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
|
||||||
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
|
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
|
||||||
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
|
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
|
||||||
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
|
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
|
||||||
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
|
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, "
|
||||||
|
"recv:%d, slow-recv:%d]",
|
||||||
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
|
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
|
||||||
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
|
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||||
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
|
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
|
||||||
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
|
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
|
||||||
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
|
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
|
||||||
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
|
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
|
||||||
replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
|
replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount,
|
||||||
pNode->slowCount);
|
pNode->recvCount, pNode->slowCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
|
||||||
|
const char* format, ...) {
|
||||||
|
if (pNode == NULL || pNode->pLogStore == NULL) return;
|
||||||
|
int64_t currentTerm = raftStoreGetTerm(pNode);
|
||||||
|
|
||||||
|
// save error code, otherwise it will be overwritten
|
||||||
|
int32_t errCode = terrno;
|
||||||
|
|
||||||
|
int32_t cacheHit = pNode->pLogStore->cacheHit;
|
||||||
|
int32_t cacheMiss = pNode->pLogStore->cacheMiss;
|
||||||
|
|
||||||
|
char cfgStr[1024] = "";
|
||||||
|
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
|
||||||
|
|
||||||
|
char replMgrStatesStr[1024] = "";
|
||||||
|
syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));
|
||||||
|
|
||||||
|
char bufferStatesStr[256] = "";
|
||||||
|
syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));
|
||||||
|
|
||||||
|
char hbrTimeStr[256] = "";
|
||||||
|
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime);
|
||||||
|
|
||||||
|
char hbTimeStr[256] = "";
|
||||||
|
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime);
|
||||||
|
|
||||||
|
char sentHbTimeStr[512] = "";
|
||||||
|
syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime);
|
||||||
|
|
||||||
|
char eventLog[512]; // {0};
|
||||||
|
va_list argpointer;
|
||||||
|
va_start(argpointer, format);
|
||||||
|
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
|
||||||
|
va_end(argpointer);
|
||||||
|
|
||||||
|
terrno = errCode;
|
||||||
|
|
||||||
|
if (pNode != NULL) {
|
||||||
|
taosPrintLog(
|
||||||
|
flags, level, dflag,
|
||||||
|
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", assigned-index:%" PRId64 ", min:%" PRId64
|
||||||
|
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
|
||||||
|
", snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
|
||||||
|
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
|
||||||
|
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, "
|
||||||
|
"recv:%d, slow-recv:%d]",
|
||||||
|
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
|
||||||
|
pNode->minMatchIndex, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
|
||||||
|
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, pNode->snapshottingIndex, pNode->replicaNum,
|
||||||
|
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
|
||||||
|
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr,
|
||||||
|
sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, pNode->slowCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,12 +468,12 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
|
||||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
|
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
|
||||||
int64_t execTime) {
|
int64_t execTime) {
|
||||||
if (printX) {
|
if (printX) {
|
||||||
sNTrace(pSyncNode,
|
sHTrace(pSyncNode,
|
||||||
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
||||||
", ts:%" PRId64 "}, x",
|
", ts:%" PRId64 "}, x",
|
||||||
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
|
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
|
||||||
} else {
|
} else {
|
||||||
sNTrace(pSyncNode,
|
sHTrace(pSyncNode,
|
||||||
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
||||||
", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
|
", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
|
||||||
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
|
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
|
||||||
|
@ -428,29 +482,39 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
|
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
|
||||||
|
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
|
||||||
|
pSyncNode->hbSlowNum++;
|
||||||
|
|
||||||
char pBuf[TD_TIME_STR_LEN] = {0};
|
char pBuf[TD_TIME_STR_LEN] = {0};
|
||||||
if (pMsg->timeStamp > 0) {
|
if (pMsg->timeStamp > 0) {
|
||||||
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
|
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
|
||||||
pBuf[0] = '\0';
|
pBuf[0] = '\0';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
|
|
||||||
pSyncNode->hbSlowNum++;
|
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
sHError(pSyncNode,
|
||||||
"recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64
|
"recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64
|
||||||
", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
|
", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
|
||||||
DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
|
DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s,
|
||||||
|
timeDiff);
|
||||||
} else {
|
} else {
|
||||||
sNTrace(pSyncNode,
|
if (sDebugFlag & DEBUG_TRACE) {
|
||||||
|
char pBuf[TD_TIME_STR_LEN] = {0};
|
||||||
|
if (pMsg->timeStamp > 0) {
|
||||||
|
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
|
||||||
|
pBuf[0] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sHTrace(pSyncNode,
|
||||||
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
||||||
", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
|
", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
|
||||||
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
|
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||||
sNTrace(pSyncNode, "send sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
|
sHTrace(pSyncNode, "send sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
|
||||||
DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
|
DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,15 +522,30 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
|
||||||
if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
|
if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
|
||||||
pSyncNode->hbrSlowNum++;
|
pSyncNode->hbrSlowNum++;
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
char pBuf[TD_TIME_STR_LEN] = {0};
|
||||||
"recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
|
if (pMsg->timeStamp > 0) {
|
||||||
DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
|
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
|
||||||
|
pBuf[0] = '\0';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
sHError(pSyncNode,
|
||||||
|
"recv sync-heartbeat-reply from dnode:%d slow(%d ms) {term:%" PRId64 ", ts:%s}, %s, net elapsed:%" PRId64,
|
||||||
|
DID(&pMsg->srcId), SYNC_HEARTBEAT_REPLY_SLOW_MS, pMsg->term, pBuf, s, timeDiff);
|
||||||
|
} else {
|
||||||
|
if (sDebugFlag & DEBUG_TRACE) {
|
||||||
|
char pBuf[TD_TIME_STR_LEN] = {0};
|
||||||
|
if (pMsg->timeStamp > 0) {
|
||||||
|
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
|
||||||
|
pBuf[0] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sHTrace(pSyncNode,
|
||||||
"recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
|
"recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
|
||||||
DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
|
DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
|
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
|
||||||
sNDebug(pSyncNode,
|
sNDebug(pSyncNode,
|
||||||
|
|
Loading…
Reference in New Issue