refactor(sync): add trace log

This commit is contained in:
Minghao Li 2022-07-20 15:34:09 +08:00
parent 90e7d794f3
commit c2b348bec5
5 changed files with 427 additions and 167 deletions

View File

@ -92,13 +92,24 @@
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
// print log
syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
return ret;
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, maybe replica already dropped",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
// maybe update term
@ -117,12 +128,25 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
do {
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, candidate to follower",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
return -1;
}
} while (0);
@ -149,10 +173,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, reject, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, reject",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
@ -165,12 +196,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
SRpcMsg rpcMsg;
@ -193,10 +227,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
bool hasAppendEntries = pMsg->dataLen > 0;
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, accept, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, accept",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
@ -349,12 +390,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
SRpcMsg rpcMsg;
@ -558,7 +602,21 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-batch, maybe replica already dropped");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, maybe replica already dropped",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeErrorLog(ths, logBuf);
} while (0);
return ret;
}
@ -582,7 +640,20 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
syncNodeEventLog(ths, "recv sync-append-entries-batch, candidate to follower");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, candidate to follower",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
@ -603,11 +674,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, fake match2, {pre-index:%" PRId64 ", pre-term:%" PRIu64
", datalen:%d, datacount:%d}",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, fake match2",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
@ -663,12 +740,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response
@ -703,11 +783,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
if (condition) {
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, not match, {pre-index:%" PRId64 ", pre-term:%" PRIu64
", datalen:%d, datacount:%d}",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, not match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
@ -725,12 +811,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response
@ -763,11 +852,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, match, {pre-index:%" PRId64 ", pre-term:%" PRIu64
", datalen:%d, datacount:%d}",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
@ -809,12 +904,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response
@ -866,12 +964,23 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
int32_t ret = 0;
int32_t code = 0;
// print log
syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, maybe replica dropped",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeErrorLog(ths, logBuf);
} while (0);
return ret;
}
@ -895,7 +1004,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, candidate to follower",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
@ -976,10 +1098,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, fake match2, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, fake match2",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
@ -1028,12 +1157,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response
@ -1067,11 +1199,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2;
if (condition) {
char logBuf[128];
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, not match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, not match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
@ -1084,12 +1225,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response
@ -1120,11 +1264,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
char logBuf[128];
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
if (hasExtraEntries) {
// make log same, rollback deleted entries
@ -1160,12 +1313,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// msg event log
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response

View File

@ -40,39 +40,59 @@
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
return 0;
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, maybe replica "
"already dropped",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
pMsg->term);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, drop stale response",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0;
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, error term",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
@ -100,13 +120,23 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, after next:" PRId64
", "
"match:" PRId64 "",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return ret;
return 0;
}
// only start once
@ -147,35 +177,55 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
pMsg->matchIndex, pMsg->success);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, maybe replica "
"already dropped",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
pMsg->term);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, drop stale response",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeEventLog(ths, logBuf);
return -1;
} while (0);
return 0;
}
// error term
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, error term",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
@ -293,45 +343,81 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} while (0);
}
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, after next:" PRId64
", "
"match:" PRId64 "",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0;
}
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
return 0;
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, maybe replica "
"already dropped",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
pMsg->term);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, drop stale response",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0;
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
}
syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, error term",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1;
}
@ -404,11 +490,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
}
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, after next:" PRId64
", "
"match:" PRId64 "",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0;
}

View File

@ -1564,7 +1564,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
// sDebug("%s", logBuf);
sInfo("%s", logBuf);
// sInfo("%s", logBuf);
sTrace("%s", logBuf);
} else {
int len = 256 + userStrLen;
@ -1586,7 +1587,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
snprintf(s, len, "%s", str);
}
// sDebug("%s", s);
sInfo("%s", s);
// sInfo("%s", s);
sTrace("%s", s);
taosMemoryFree(s);
}

View File

@ -108,10 +108,10 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_CreateObject();
char u64Buf[128] = {0};
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm);
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr);
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
@ -142,11 +142,11 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
ASSERT(cJSON_IsString(pCurrentTerm));
sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
sscanf(pCurrentTerm->valuestring, "" PRIu64 "", &(pRaftStore->currentTerm));
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
ASSERT(cJSON_IsString(pVoteForAddr));
sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
sscanf(pVoteForAddr->valuestring, "" PRIu64 "", &(pRaftStore->voteFor.addr));
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
@ -188,11 +188,11 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
cJSON *pRoot = cJSON_CreateObject();
if (pRaftStore != NULL) {
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm);
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
cJSON *pVoteFor = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr);
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
{
uint64_t u64 = pRaftStore->voteFor.addr;
@ -224,25 +224,25 @@ char *raftStore2Str(SRaftStore *pRaftStore) {
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
printf("raftStorePrint | len:" PRIu64 " | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
printf("raftStorePrint2 | len:" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStoreLog(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
sTrace("raftStoreLog | len:" PRIu64 " | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
sTrace("raftStoreLog2 | len:" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}

View File

@ -315,15 +315,18 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
int32_t ret = 0;
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64
", "
"datalen:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen);
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SRpcMsg rpcMsg;
@ -335,13 +338,16 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
const SyncAppendEntriesBatch* pMsg) {
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64
", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}",
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SRpcMsg rpcMsg;