Merge pull request #15211 from taosdata/feature/3.0_mhli
refactor(sync): add trace log
This commit is contained in:
commit
f391570f02
|
@ -257,6 +257,22 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
|
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
|
||||||
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
|
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
|
||||||
|
|
||||||
|
// trace log
|
||||||
|
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
|
||||||
|
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
|
||||||
|
|
||||||
|
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
|
||||||
|
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
|
||||||
|
|
||||||
|
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
|
||||||
|
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
|
||||||
|
|
||||||
|
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
|
||||||
|
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
|
||||||
|
|
||||||
|
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||||
|
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||||
|
|
||||||
// for debug --------------
|
// for debug --------------
|
||||||
void syncNodePrint(SSyncNode* pObj);
|
void syncNodePrint(SSyncNode* pObj);
|
||||||
void syncNodePrint2(char* s, SSyncNode* pObj);
|
void syncNodePrint2(char* s, SSyncNode* pObj);
|
||||||
|
|
|
@ -92,13 +92,10 @@
|
||||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// print log
|
|
||||||
syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
|
syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped");
|
||||||
return ret;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybe update term
|
// maybe update term
|
||||||
|
@ -114,17 +111,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
}
|
}
|
||||||
ASSERT(pMsg->dataLen >= 0);
|
ASSERT(pMsg->dataLen >= 0);
|
||||||
|
|
||||||
do {
|
// return to follower state
|
||||||
// return to follower state
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
syncLogRecvAppendEntries(ths, pMsg, "candidate to follower");
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
|
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||||
|
return -1; // ret or reply?
|
||||||
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
}
|
||||||
|
|
||||||
// ret or reply?
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SyncTerm localPreLogTerm = 0;
|
SyncTerm localPreLogTerm = 0;
|
||||||
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
|
@ -148,13 +140,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
// reject request
|
// reject request
|
||||||
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
||||||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
||||||
do {
|
syncLogRecvAppendEntries(ths, pMsg, "reject");
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries, reject, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
pReply->srcId = ths->myRaftId;
|
pReply->srcId = ths->myRaftId;
|
||||||
|
@ -164,14 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
@ -192,13 +171,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
// has entries in SyncAppendEntries msg
|
// has entries in SyncAppendEntries msg
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
|
||||||
do {
|
syncLogRecvAppendEntries(ths, pMsg, "accept");
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries, accept, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
if (hasExtraEntries && hasAppendEntries) {
|
if (hasExtraEntries && hasAppendEntries) {
|
||||||
// not conflict by default
|
// not conflict by default
|
||||||
|
@ -348,14 +321,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
@ -558,8 +524,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-batch, maybe replica already dropped");
|
syncLogRecvAppendEntriesBatch(ths, pMsg, "maybe replica already dropped");
|
||||||
return ret;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybe update term
|
// maybe update term
|
||||||
|
@ -582,15 +548,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
do {
|
do {
|
||||||
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
||||||
if (condition) {
|
if (condition) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-batch, candidate to follower");
|
syncLogRecvAppendEntriesBatch(ths, pMsg, "candidate to follower");
|
||||||
|
|
||||||
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||||
// do not reply?
|
return 0; // do not reply?
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// fake match2
|
// fake match
|
||||||
//
|
//
|
||||||
// condition1:
|
// condition1:
|
||||||
// preIndex <= my commit index
|
// preIndex <= my commit index
|
||||||
|
@ -602,14 +566,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||||
(pMsg->prevLogIndex <= ths->commitIndex);
|
(pMsg->prevLogIndex <= ths->commitIndex);
|
||||||
if (condition) {
|
if (condition) {
|
||||||
do {
|
syncLogRecvAppendEntriesBatch(ths, pMsg, "fake match");
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries-batch, fake match2, {pre-index:%" PRId64 ", pre-term:%" PRIu64
|
|
||||||
", datalen:%d, datacount:%d}",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SyncIndex matchIndex = ths->commitIndex;
|
SyncIndex matchIndex = ths->commitIndex;
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
@ -662,14 +619,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
pReply->matchIndex = matchIndex;
|
pReply->matchIndex = matchIndex;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -702,14 +652,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
bool condition = condition1 || condition2;
|
bool condition = condition1 || condition2;
|
||||||
|
|
||||||
if (condition) {
|
if (condition) {
|
||||||
do {
|
syncLogRecvAppendEntriesBatch(ths, pMsg, "not match");
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries-batch, not match, {pre-index:%" PRId64 ", pre-term:%" PRIu64
|
|
||||||
", datalen:%d, datacount:%d}",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// maybe update commit index by snapshot
|
// maybe update commit index by snapshot
|
||||||
syncNodeMaybeUpdateCommitBySnapshot(ths);
|
syncNodeMaybeUpdateCommitBySnapshot(ths);
|
||||||
|
@ -724,14 +667,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
pReply->matchIndex = ths->commitIndex;
|
pReply->matchIndex = ths->commitIndex;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -762,14 +698,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
|
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
|
||||||
|
|
||||||
do {
|
syncLogRecvAppendEntriesBatch(ths, pMsg, "really match");
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries-batch, match, {pre-index:%" PRId64 ", pre-term:%" PRIu64
|
|
||||||
", datalen:%d, datacount:%d}",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
if (hasExtraEntries) {
|
if (hasExtraEntries) {
|
||||||
// make log same, rollback deleted entries
|
// make log same, rollback deleted entries
|
||||||
|
@ -808,14 +737,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
|
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -866,13 +788,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// print log
|
|
||||||
syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
|
syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped");
|
||||||
return ret;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybe update term
|
// maybe update term
|
||||||
|
@ -895,11 +814,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
do {
|
do {
|
||||||
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
||||||
if (condition) {
|
if (condition) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
|
syncLogRecvAppendEntries(ths, pMsg, "candidate to follower");
|
||||||
|
|
||||||
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||||
// do not reply?
|
return 0; // do not reply?
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -962,7 +879,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
} while (0);
|
} while (0);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// fake match2
|
// fake match
|
||||||
//
|
//
|
||||||
// condition1:
|
// condition1:
|
||||||
// preIndex <= my commit index
|
// preIndex <= my commit index
|
||||||
|
@ -975,13 +892,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||||
(pMsg->prevLogIndex <= ths->commitIndex);
|
(pMsg->prevLogIndex <= ths->commitIndex);
|
||||||
if (condition) {
|
if (condition) {
|
||||||
do {
|
syncLogRecvAppendEntries(ths, pMsg, "fake match");
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries, fake match2, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SyncIndex matchIndex = ths->commitIndex;
|
SyncIndex matchIndex = ths->commitIndex;
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
@ -1027,14 +938,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
pReply->matchIndex = matchIndex;
|
pReply->matchIndex = matchIndex;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -1067,11 +971,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
bool condition = condition1 || condition2;
|
bool condition = condition1 || condition2;
|
||||||
|
|
||||||
if (condition) {
|
if (condition) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntries(ths, pMsg, "not match");
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries, not match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
@ -1083,14 +983,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -1120,11 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
// has entries in SyncAppendEntries msg
|
// has entries in SyncAppendEntries msg
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntries(ths, pMsg, "really match");
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-append-entries, match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
|
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
|
|
||||||
if (hasExtraEntries) {
|
if (hasExtraEntries) {
|
||||||
// make log same, rollback deleted entries
|
// make log same, rollback deleted entries
|
||||||
|
@ -1159,14 +1048,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
|
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
|
|
||||||
", success:%d, match-index:%" PRId64 "}",
|
|
||||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
|
|
@ -40,44 +40,33 @@
|
||||||
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// print log
|
|
||||||
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
|
|
||||||
pMsg->term);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
|
|
||||||
}
|
|
||||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
|
|
||||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
|
|
||||||
|
|
||||||
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
||||||
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
// syncNodeUpdateTerm(ths, pMsg->term);
|
// syncNodeUpdateTerm(ths, pMsg->term);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
|
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
|
|
||||||
if (pMsg->success) {
|
if (pMsg->success) {
|
||||||
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
||||||
|
@ -100,13 +89,16 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
|
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
}
|
do {
|
||||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
|
char logBuf[256];
|
||||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
|
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
|
||||||
|
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
||||||
|
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// only start once
|
// only start once
|
||||||
|
@ -147,40 +139,29 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
|
||||||
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// print log
|
|
||||||
do {
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
|
|
||||||
pMsg->matchIndex, pMsg->success);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
|
return 0;
|
||||||
pMsg->term);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// error term
|
// error term
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
|
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
|
|
||||||
if (pMsg->success) {
|
if (pMsg->success) {
|
||||||
SyncIndex newNextIndex = pMsg->matchIndex + 1;
|
SyncIndex newNextIndex = pMsg->matchIndex + 1;
|
||||||
SyncIndex newMatchIndex = pMsg->matchIndex;
|
SyncIndex newMatchIndex = pMsg->matchIndex;
|
||||||
|
@ -293,50 +274,48 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
||||||
} while (0);
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
|
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
|
do {
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
|
||||||
|
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
||||||
|
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// print log
|
|
||||||
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
|
|
||||||
pMsg->term);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
|
|
||||||
}
|
|
||||||
syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
|
|
||||||
syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
|
|
||||||
|
|
||||||
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
||||||
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
// syncNodeUpdateTerm(ths, pMsg->term);
|
// syncNodeUpdateTerm(ths, pMsg->term);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128];
|
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
|
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
|
|
||||||
if (pMsg->success) {
|
if (pMsg->success) {
|
||||||
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
||||||
|
@ -404,11 +383,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
|
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
}
|
do {
|
||||||
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
|
char logBuf[256];
|
||||||
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
|
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
|
||||||
|
beforeMatchIndex, afterNextIndex, afterMatchIndex);
|
||||||
|
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -120,18 +120,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
syncLogSendRequestVote(pSyncNode, pMsg, "");
|
||||||
do {
|
|
||||||
char host[64];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port,
|
|
||||||
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
|
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
|
||||||
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
|
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
|
|
@ -79,8 +79,7 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
syncNodeLog3("syncIndexMgrGetIndex", pSyncIndexMgr->pSyncNode);
|
return SYNC_INDEX_INVALID;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
|
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
|
@ -126,7 +125,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
|
|
||||||
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
|
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
|
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
|
||||||
char * serialized = cJSON_Print(pJson);
|
char *serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1564,7 +1564,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||||
}
|
}
|
||||||
// sDebug("%s", logBuf);
|
// sDebug("%s", logBuf);
|
||||||
sInfo("%s", logBuf);
|
// sInfo("%s", logBuf);
|
||||||
|
sTrace("%s", logBuf);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
int len = 256 + userStrLen;
|
int len = 256 + userStrLen;
|
||||||
|
@ -1586,7 +1587,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
snprintf(s, len, "%s", str);
|
snprintf(s, len, "%s", str);
|
||||||
}
|
}
|
||||||
// sDebug("%s", s);
|
// sDebug("%s", s);
|
||||||
sInfo("%s", s);
|
// sInfo("%s", s);
|
||||||
|
sTrace("%s", s);
|
||||||
taosMemoryFree(s);
|
taosMemoryFree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2923,4 +2925,126 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host, port,
|
||||||
|
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
|
||||||
|
char logBuf[256];
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host,
|
||||||
|
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, port,
|
||||||
|
pMsg->term, pMsg->voteGranted, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s", host,
|
||||||
|
port, pMsg->term, pMsg->voteGranted, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
|
||||||
|
", pterm:%" PRIu64 ", commit:%" PRId64
|
||||||
|
", "
|
||||||
|
"datalen:%d}, %s",
|
||||||
|
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
|
||||||
|
pMsg->dataLen, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRIu64 ", pre-term:%" PRIu64
|
||||||
|
", commit:%" PRIu64 ", pterm:%" PRIu64
|
||||||
|
", "
|
||||||
|
"datalen:%d}, %s",
|
||||||
|
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
|
||||||
|
pMsg->dataLen, s);
|
||||||
|
syncNodeErrorLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
|
||||||
|
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
|
||||||
|
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
|
||||||
|
pMsg->dataLen, pMsg->dataCount, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
|
||||||
|
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
|
||||||
|
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
|
||||||
|
pMsg->dataLen, pMsg->dataCount, s);
|
||||||
|
syncNodeErrorLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
|
||||||
|
"}, %s",
|
||||||
|
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
|
||||||
|
"}, %s",
|
||||||
|
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
|
||||||
|
syncNodeErrorLog(pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
|
|
@ -108,10 +108,10 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
char u64Buf[128] = {0};
|
char u64Buf[128] = {0};
|
||||||
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm);
|
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->currentTerm);
|
||||||
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
|
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
|
||||||
|
|
||||||
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr);
|
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->voteFor.addr);
|
||||||
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
|
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
|
||||||
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
|
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
|
||||||
|
@ -142,11 +142,11 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||||
|
|
||||||
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
|
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
|
||||||
ASSERT(cJSON_IsString(pCurrentTerm));
|
ASSERT(cJSON_IsString(pCurrentTerm));
|
||||||
sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
|
sscanf(pCurrentTerm->valuestring, "" PRIu64 "", &(pRaftStore->currentTerm));
|
||||||
|
|
||||||
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
|
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
|
||||||
ASSERT(cJSON_IsString(pVoteForAddr));
|
ASSERT(cJSON_IsString(pVoteForAddr));
|
||||||
sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
|
sscanf(pVoteForAddr->valuestring, "" PRIu64 "", &(pRaftStore->voteFor.addr));
|
||||||
|
|
||||||
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
|
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
|
||||||
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
|
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
|
||||||
|
@ -188,11 +188,11 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
if (pRaftStore != NULL) {
|
if (pRaftStore != NULL) {
|
||||||
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm);
|
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->currentTerm);
|
||||||
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
|
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
|
||||||
|
|
||||||
cJSON *pVoteFor = cJSON_CreateObject();
|
cJSON *pVoteFor = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr);
|
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->voteFor.addr);
|
||||||
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
|
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pRaftStore->voteFor.addr;
|
uint64_t u64 = pRaftStore->voteFor.addr;
|
||||||
|
@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
|
||||||
|
|
||||||
char *raftStore2Str(SRaftStore *pRaftStore) {
|
char *raftStore2Str(SRaftStore *pRaftStore) {
|
||||||
cJSON *pJson = raftStore2Json(pRaftStore);
|
cJSON *pJson = raftStore2Json(pRaftStore);
|
||||||
char * serialized = cJSON_Print(pJson);
|
char *serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
@ -224,25 +224,25 @@ char *raftStore2Str(SRaftStore *pRaftStore) {
|
||||||
// for debug -------------------
|
// for debug -------------------
|
||||||
void raftStorePrint(SRaftStore *pObj) {
|
void raftStorePrint(SRaftStore *pObj) {
|
||||||
char *serialized = raftStore2Str(pObj);
|
char *serialized = raftStore2Str(pObj);
|
||||||
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
|
printf("raftStorePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftStorePrint2(char *s, SRaftStore *pObj) {
|
void raftStorePrint2(char *s, SRaftStore *pObj) {
|
||||||
char *serialized = raftStore2Str(pObj);
|
char *serialized = raftStore2Str(pObj);
|
||||||
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
printf("raftStorePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
void raftStoreLog(SRaftStore *pObj) {
|
void raftStoreLog(SRaftStore *pObj) {
|
||||||
char *serialized = raftStore2Str(pObj);
|
char *serialized = raftStore2Str(pObj);
|
||||||
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
|
sTrace("raftStoreLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftStoreLog2(char *s, SRaftStore *pObj) {
|
void raftStoreLog2(char *s, SRaftStore *pObj) {
|
||||||
char *serialized = raftStore2Str(pObj);
|
char *serialized = raftStore2Str(pObj);
|
||||||
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("raftStoreLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
|
@ -313,18 +313,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
|
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
syncLogSendAppendEntries(pSyncNode, pMsg, "");
|
||||||
do {
|
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
|
|
||||||
", pterm:%" PRIu64 ", commit:%" PRId64
|
|
||||||
", "
|
|
||||||
"datalen:%d}",
|
|
||||||
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
|
|
||||||
pMsg->commitIndex, pMsg->dataLen);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
@ -334,15 +323,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
|
||||||
|
|
||||||
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
|
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
|
||||||
const SyncAppendEntriesBatch* pMsg) {
|
const SyncAppendEntriesBatch* pMsg) {
|
||||||
do {
|
syncLogSendAppendEntriesBatch(pSyncNode, pMsg, "");
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
|
||||||
sDebug("vgId:%d, send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64
|
|
||||||
", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}",
|
|
||||||
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
|
|
||||||
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg);
|
syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
|
|
@ -47,18 +47,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
do {
|
syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
|
||||||
char logBuf[256];
|
|
||||||
char host[64];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
|
||||||
"}, maybe replica already dropped",
|
|
||||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,15 +80,10 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
do {
|
||||||
char logBuf[256];
|
char logBuf[32];
|
||||||
char host[64];
|
snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
|
||||||
uint16_t port;
|
syncLogRecvRequestVote(ths, pMsg, logBuf);
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncLogSendRequestVoteReply(ths, pReply, "");
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
|
||||||
"}, reply-grant:%d",
|
|
||||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -212,18 +196,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
do {
|
syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
|
||||||
char logBuf[256];
|
|
||||||
char host[64];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
|
||||||
"}, maybe replica already dropped",
|
|
||||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,15 +227,10 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
do {
|
||||||
char logBuf[256];
|
char logBuf[32];
|
||||||
char host[64];
|
snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
|
||||||
uint16_t port;
|
syncLogRecvRequestVote(ths, pMsg, logBuf);
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncLogSendRequestVoteReply(ths, pReply, "");
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
|
||||||
"}, reply-grant:%d",
|
|
||||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
|
|
@ -40,40 +40,15 @@
|
||||||
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
|
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// trace log
|
|
||||||
do {
|
|
||||||
char host[64];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
|
|
||||||
port, pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
char host[64];
|
syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
|
|
||||||
pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
char host[64];
|
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
|
|
||||||
pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,16 +59,11 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
char host[64];
|
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
|
|
||||||
host, port, pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncLogRecvRequestVoteReply(ths, pMsg, "");
|
||||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
// This tallies votes even when the current state is not Candidate,
|
// This tallies votes even when the current state is not Candidate,
|
||||||
|
@ -185,40 +155,15 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
|
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// trace log
|
|
||||||
do {
|
|
||||||
char host[64];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
|
|
||||||
port, pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeEventLog(ths, logBuf);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
char host[64];
|
syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
|
|
||||||
pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
char host[64];
|
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
|
|
||||||
pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,16 +174,11 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
char host[64];
|
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
|
||||||
char logBuf[256];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
|
|
||||||
host, port, pMsg->term, pMsg->voteGranted);
|
|
||||||
syncNodeErrorLog(ths, logBuf);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncLogRecvRequestVoteReply(ths, pMsg, "");
|
||||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
// This tallies votes even when the current state is not Candidate,
|
// This tallies votes even when the current state is not Candidate,
|
||||||
|
|
Loading…
Reference in New Issue