Merge pull request #18467 from taosdata/fix/TD-20050_2
refactor: adjust syncLogHeartbeat
This commit is contained in:
commit
a98b8f9397
|
@ -48,7 +48,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
|
|
||||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg, const char* debugStr);
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
||||||
|
|
|
@ -94,11 +94,11 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
|
||||||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||||
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||||
|
|
||||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
|
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed);
|
||||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
|
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff);
|
||||||
|
|
||||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff);
|
||||||
|
|
||||||
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
|
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
|
||||||
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
|
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
|
||||||
|
@ -115,7 +115,7 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
|
||||||
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
|
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
|
||||||
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
|
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
|
||||||
|
|
||||||
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
|
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s);
|
||||||
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s);
|
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s);
|
||||||
|
|
||||||
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
|
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
|
||||||
|
|
|
@ -2034,6 +2034,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||||
|
|
||||||
|
// update reset time
|
||||||
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
|
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
||||||
|
pSyncTimer->timeStamp = tsNow;
|
||||||
|
|
||||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||||
pSyncMsg->destId = pData->destId;
|
pSyncMsg->destId = pData->destId;
|
||||||
|
@ -2041,17 +2046,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||||
pSyncMsg->privateTerm = 0;
|
pSyncMsg->privateTerm = 0;
|
||||||
pSyncMsg->timeStamp = taosGetTimestampMs();
|
pSyncMsg->timeStamp = tsNow;
|
||||||
|
|
||||||
// update reset time
|
|
||||||
int64_t tsNow = taosGetTimestampMs();
|
|
||||||
int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
|
|
||||||
pSyncTimer->timeStamp = tsNow;
|
|
||||||
char logBuf[64];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed);
|
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf);
|
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed);
|
||||||
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
|
||||||
|
@ -2161,9 +2160,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
int64_t tsMs = taosGetTimestampMs();
|
int64_t tsMs = taosGetTimestampMs();
|
||||||
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
||||||
char buf[128];
|
syncLogRecvHeartbeat(ths, pMsg, timeDiff);
|
||||||
snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff);
|
|
||||||
syncLogRecvHeartbeat(ths, pMsg, buf);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
|
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
|
||||||
|
@ -2173,7 +2170,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
pMsgReply->srcId = ths->myRaftId;
|
pMsgReply->srcId = ths->myRaftId;
|
||||||
pMsgReply->term = ths->pRaftStore->currentTerm;
|
pMsgReply->term = ths->pRaftStore->currentTerm;
|
||||||
pMsgReply->privateTerm = 8864; // magic number
|
pMsgReply->privateTerm = 8864; // magic number
|
||||||
pMsgReply->timeStamp = taosGetTimestampMs();
|
pMsgReply->timeStamp = tsMs;
|
||||||
|
|
||||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
|
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
|
||||||
|
@ -2240,9 +2237,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
int64_t tsMs = taosGetTimestampMs();
|
int64_t tsMs = taosGetTimestampMs();
|
||||||
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
int64_t timeDiff = tsMs - pMsg->timeStamp;
|
||||||
char buf[128];
|
syncLogRecvHeartbeatReply(ths, pMsg, timeDiff);
|
||||||
snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff);
|
|
||||||
syncLogRecvHeartbeatReply(ths, pMsg, buf);
|
|
||||||
|
|
||||||
// update last reply time, make decision whether the other node is alive or not
|
// update last reply time, make decision whether the other node is alive or not
|
||||||
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
|
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
|
||||||
|
|
|
@ -207,8 +207,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg, const char* debugStr) {
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||||
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, debugStr);
|
|
||||||
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,7 +230,8 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||||
pSyncMsg->timeStamp = ts;
|
pSyncMsg->timeStamp = ts;
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, "x");
|
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0);
|
||||||
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -94,7 +94,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
syncLogRecvRequestVote(ths, pMsg, "not in my config");
|
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,13 +133,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
pReply->voteGranted = grant;
|
pReply->voteGranted = grant;
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
|
||||||
char logBuf[32];
|
syncLogSendRequestVoteReply(ths, pReply, "");
|
||||||
snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
|
|
||||||
syncLogRecvRequestVote(ths, pMsg, logBuf);
|
|
||||||
syncLogSendRequestVoteReply(ths, pReply, "");
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -396,6 +396,8 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
int64_t tsNow = taosGetTimestampMs();
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
int64_t timeDIff = tsNow - pMsg->timeStamp;
|
int64_t timeDIff = tsNow - pMsg->timeStamp;
|
||||||
sNTrace(
|
sNTrace(
|
||||||
|
@ -404,11 +406,15 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char*
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
|
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
|
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
|
||||||
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
|
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -420,6 +426,8 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -430,28 +438,42 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
|
||||||
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
|
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
if (printX) {
|
||||||
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s",
|
sNTrace(pSyncNode,
|
||||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
|
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||||
|
"}, x",
|
||||||
|
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
|
||||||
|
} else {
|
||||||
|
sNTrace(pSyncNode,
|
||||||
|
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||||
|
"}, timer-elapsed:%" PRId64,
|
||||||
|
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
sNTrace(pSyncNode,
|
||||||
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||||
"}, %s",
|
"}, net elapsed:%" PRId64,
|
||||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
|
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -460,15 +482,19 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
|
||||||
pMsg->term, pMsg->timeStamp, s);
|
pMsg->term, pMsg->timeStamp, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
|
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64,
|
||||||
pMsg->term, pMsg->timeStamp, s);
|
host, port, pMsg->term, pMsg->timeStamp, timeDiff);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
|
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -476,6 +502,8 @@ void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
|
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -483,6 +511,8 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
|
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -491,6 +521,8 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
|
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -499,6 +531,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
|
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -510,6 +544,8 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
|
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -522,6 +558,8 @@ void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
|
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -533,6 +571,8 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
|
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -544,6 +584,8 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
|
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -556,6 +598,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
|
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -566,16 +610,28 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
|
||||||
pMsg->dataLen, s);
|
pMsg->dataLen, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
|
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
|
|
||||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
if (voteGranted == -1) {
|
||||||
|
sNTrace(pSyncNode,
|
||||||
|
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
|
||||||
|
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||||
|
} else {
|
||||||
|
sNTrace(pSyncNode,
|
||||||
|
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d",
|
||||||
|
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
|
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
@ -584,6 +640,8 @@ void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
@ -592,6 +650,8 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
||||||
|
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||||
|
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
|
|
@ -709,8 +709,8 @@
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropStb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||||
|
@ -718,8 +718,8 @@
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||||
|
|
Loading…
Reference in New Issue