Merge pull request #18551 from taosdata/feature/3.0_mhli
refactor(sync): add interface syncSnapshotSending, syncSnapshotRecving
This commit is contained in:
commit
0357ee124c
|
@ -43,6 +43,9 @@ extern "C" {
|
|||
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
||||
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
|
||||
|
||||
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
||||
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 1
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
#define SYNC_INDEX_INVALID -1
|
||||
|
@ -226,6 +229,8 @@ int32_t syncEndSnapshot(int64_t rid);
|
|||
int32_t syncLeaderTransfer(int64_t rid);
|
||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
bool syncSnapshotSending(int64_t rid);
|
||||
bool syncSnapshotRecving(int64_t rid);
|
||||
|
||||
SSyncState syncGetState(int64_t rid);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
|
|
|
@ -195,6 +195,8 @@ typedef struct SSyncNode {
|
|||
int32_t electNum;
|
||||
int32_t becomeLeaderNum;
|
||||
int32_t configChangeNum;
|
||||
int32_t hbSlowNum;
|
||||
int32_t hbrSlowNum;
|
||||
|
||||
bool isStart;
|
||||
|
||||
|
@ -239,6 +241,8 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode,
|
|||
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
|
||||
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
|
||||
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
|
||||
bool syncNodeSnapshotSending(SSyncNode* pSyncNode);
|
||||
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode);
|
||||
|
||||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||
|
|
|
@ -447,6 +447,28 @@ bool syncIsReadyForRead(int64_t rid) {
|
|||
return ready;
|
||||
}
|
||||
|
||||
bool syncSnapshotSending(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool b = syncNodeSnapshotSending(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
return b;
|
||||
}
|
||||
|
||||
bool syncSnapshotRecving(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool b = syncNodeSnapshotRecving(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
return b;
|
||||
}
|
||||
|
||||
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->peersNum == 0) {
|
||||
sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
|
||||
|
@ -1013,6 +1035,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
pSyncNode->electNum = 0;
|
||||
pSyncNode->becomeLeaderNum = 0;
|
||||
pSyncNode->configChangeNum = 0;
|
||||
pSyncNode->hbSlowNum = 0;
|
||||
pSyncNode->hbrSlowNum = 0;
|
||||
|
||||
sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);
|
||||
|
||||
|
@ -1563,6 +1587,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||
}
|
||||
|
||||
pSyncNode->hbSlowNum = 0;
|
||||
|
||||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
@ -1607,6 +1633,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->leaderTime = taosGetTimestampMs();
|
||||
|
||||
pSyncNode->becomeLeaderNum++;
|
||||
pSyncNode->hbrSlowNum = 0;
|
||||
|
||||
// reset restoreFinish
|
||||
pSyncNode->restoreFinish = false;
|
||||
|
@ -2167,6 +2194,25 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
|
|||
return b;
|
||||
}
|
||||
|
||||
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode == NULL) return false;
|
||||
bool b = false;
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) {
|
||||
b = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode == NULL) return false;
|
||||
if (pSyncNode->pNewNodeReceiver == NULL) return false;
|
||||
if (pSyncNode->pNewNodeReceiver->start) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||
int32_t ret = 0;
|
||||
|
||||
|
|
|
@ -283,13 +283,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
"vgId:%d, sync %s "
|
||||
"%s"
|
||||
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
|
||||
", snap-tm:%" PRIu64 ", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, aq:%d, snaping:%" PRId64
|
||||
", snap-tm:%" PRIu64
|
||||
", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, aq:%d, snaping:%" PRId64
|
||||
", r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
|
||||
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
|
||||
pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum,
|
||||
pNode->configChangeNum, cacheHit, cacheMiss, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
|
||||
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock,
|
||||
pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, hbrTimeStr);
|
||||
pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems,
|
||||
pNode->snapshottingIndex, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing,
|
||||
pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr,
|
||||
hbTimeStr, hbrTimeStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -463,12 +465,23 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
|
|||
}
|
||||
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) {
|
||||
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
|
||||
pSyncNode->hbSlowNum++;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
sNInfo(pSyncNode,
|
||||
"recv sync-heartbeat from %s:%d slow {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||
"}, net elapsed:%" PRId64,
|
||||
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff);
|
||||
}
|
||||
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
|
||||
"}, net elapsed:%" PRId64,
|
||||
|
@ -487,6 +500,17 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
|
|||
}
|
||||
|
||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff) {
|
||||
if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
|
||||
pSyncNode->hbrSlowNum++;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
sNTrace(pSyncNode,
|
||||
"recv sync-heartbeat-reply from %s:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64, host,
|
||||
port, pMsg->term, pMsg->timeStamp, timeDiff);
|
||||
}
|
||||
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
|
@ -615,7 +639,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
|
|||
}
|
||||
|
||||
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) {
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
// if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char logBuf[256];
|
||||
char host[64];
|
||||
|
@ -623,42 +647,42 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, i
|
|||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
|
||||
if (voteGranted == -1) {
|
||||
sNTrace(pSyncNode,
|
||||
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
|
||||
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||
sNInfo(pSyncNode,
|
||||
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
|
||||
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||
} else {
|
||||
sNTrace(pSyncNode,
|
||||
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d",
|
||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
|
||||
sNInfo(pSyncNode,
|
||||
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d",
|
||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
|
||||
}
|
||||
}
|
||||
|
||||
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
// if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
|
||||
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||
sNInfo(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
|
||||
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||
}
|
||||
|
||||
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
// if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
|
||||
pMsg->voteGranted, s);
|
||||
sNInfo(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
|
||||
pMsg->voteGranted, s);
|
||||
}
|
||||
|
||||
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
||||
if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
// if (!(sDebugFlag & DEBUG_TRACE)) return;
|
||||
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
|
||||
pMsg->voteGranted, s);
|
||||
sNInfo(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
|
||||
pMsg->voteGranted, s);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue