refactor(sync): add trace log
This commit is contained in:
parent
862a51e6ee
commit
93efefcbfc
|
@ -70,6 +70,7 @@ typedef struct SSyncTimer {
|
||||||
uint64_t logicClock;
|
uint64_t logicClock;
|
||||||
uint64_t counter;
|
uint64_t counter;
|
||||||
int32_t timerMS;
|
int32_t timerMS;
|
||||||
|
int64_t timeStamp;
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
int64_t hbDataRid;
|
int64_t hbDataRid;
|
||||||
} SSyncTimer;
|
} SSyncTimer;
|
||||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SyncTimeout {
|
||||||
ESyncTimeoutType timeoutType;
|
ESyncTimeoutType timeoutType;
|
||||||
uint64_t logicClock;
|
uint64_t logicClock;
|
||||||
int32_t timerMS;
|
int32_t timerMS;
|
||||||
|
int64_t timeStamp;
|
||||||
void* data; // need optimized
|
void* data; // need optimized
|
||||||
} SyncTimeout;
|
} SyncTimeout;
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
|
|
||||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg, const char* debugStr);
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
||||||
|
|
|
@ -691,6 +691,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
|
||||||
pSyncTimer->timerMS = pSyncNode->hbBaseLine;
|
pSyncTimer->timerMS = pSyncNode->hbBaseLine;
|
||||||
pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
|
pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
|
||||||
pSyncTimer->destId = destId;
|
pSyncTimer->destId = destId;
|
||||||
|
pSyncTimer->timeStamp = taosGetTimestampMs();
|
||||||
atomic_store_64(&pSyncTimer->logicClock, 0);
|
atomic_store_64(&pSyncTimer->logicClock, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -704,6 +705,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||||
pData->rid = syncHbTimerDataAdd(pData);
|
pData->rid = syncHbTimerDataAdd(pData);
|
||||||
}
|
}
|
||||||
pSyncTimer->hbDataRid = pData->rid;
|
pSyncTimer->hbDataRid = pData->rid;
|
||||||
|
pSyncTimer->timeStamp = taosGetTimestampMs();
|
||||||
|
|
||||||
pData->syncNodeRid = pSyncNode->rid;
|
pData->syncNodeRid = pSyncNode->rid;
|
||||||
pData->pTimer = pSyncTimer;
|
pData->pTimer = pSyncTimer;
|
||||||
|
@ -1897,7 +1899,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sTrace("enqueue ping msg");
|
// sTrace("enqueue ping msg");
|
||||||
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
|
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("failed to sync enqueue ping msg since %s", terrstr());
|
sError("failed to sync enqueue ping msg since %s", terrstr());
|
||||||
|
@ -2041,8 +2043,15 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
pSyncMsg->privateTerm = 0;
|
pSyncMsg->privateTerm = 0;
|
||||||
pSyncMsg->timeStamp = taosGetTimestampMs();
|
pSyncMsg->timeStamp = taosGetTimestampMs();
|
||||||
|
|
||||||
|
// update reset time
|
||||||
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
|
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
||||||
|
pSyncTimer->timeStamp = tsNow;
|
||||||
|
char logBuf[64];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed);
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
||||||
|
@ -2151,8 +2160,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncHeartbeat* pMsg = pRpcMsg->pCont;
|
SyncHeartbeat* pMsg = pRpcMsg->pCont;
|
||||||
|
|
||||||
int64_t tsMs = taosGetTimestampMs();
|
int64_t tsMs = taosGetTimestampMs();
|
||||||
|
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
||||||
char buf[128];
|
char buf[128];
|
||||||
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs);
|
snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff);
|
||||||
syncLogRecvHeartbeat(ths, pMsg, buf);
|
syncLogRecvHeartbeat(ths, pMsg, buf);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
|
@ -2229,8 +2239,9 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
|
SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
|
||||||
|
|
||||||
int64_t tsMs = taosGetTimestampMs();
|
int64_t tsMs = taosGetTimestampMs();
|
||||||
|
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
||||||
char buf[128];
|
char buf[128];
|
||||||
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs);
|
snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff);
|
||||||
syncLogRecvHeartbeatReply(ths, pMsg, buf);
|
syncLogRecvHeartbeatReply(ths, pMsg, buf);
|
||||||
|
|
||||||
// update last reply time, make decision whether the other node is alive or not
|
// update last reply time, make decision whether the other node is alive or not
|
||||||
|
|
|
@ -35,6 +35,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
|
||||||
pTimeout->timeoutType = timeoutType;
|
pTimeout->timeoutType = timeoutType;
|
||||||
pTimeout->logicClock = logicClock;
|
pTimeout->logicClock = logicClock;
|
||||||
pTimeout->timerMS = timerMS;
|
pTimeout->timerMS = timerMS;
|
||||||
|
pTimeout->timeStamp = taosGetTimestampMs();
|
||||||
pTimeout->data = pNode;
|
pTimeout->data = pNode;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -207,8 +207,8 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg, const char* debugStr) {
|
||||||
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, "");
|
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, debugStr);
|
||||||
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||||
pSyncMsg->timeStamp = ts;
|
pSyncMsg->timeStamp = ts;
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, "x");
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -396,8 +396,11 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
||||||
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
|
int64_t timeDIff = tsNow - pMsg->timeStamp;
|
||||||
|
sNTrace(
|
||||||
|
pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
|
||||||
|
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, timeDIff, pMsg->data, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
|
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
|
||||||
|
|
Loading…
Reference in New Issue