From c2b348bec5019ae28839a77f9a210c49b0c623a9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 15:34:09 +0800 Subject: [PATCH 1/6] refactor(sync): add trace log --- source/libs/sync/src/syncAppendEntries.c | 310 +++++++++++++----- source/libs/sync/src/syncAppendEntriesReply.c | 226 +++++++++---- source/libs/sync/src/syncMain.c | 6 +- source/libs/sync/src/syncRaftStore.c | 22 +- source/libs/sync/src/syncReplication.c | 30 +- 5 files changed, 427 insertions(+), 167 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 9678b335fd..0e26d1ea65 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -92,13 +92,24 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; - // print log - syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped"); - return ret; + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, maybe replica already dropped", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); + syncNodeErrorLog(ths, logBuf); + } while (0); + + return -1; } // maybe update term @@ -117,12 +128,25 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { do { // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { - syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower"); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, candidate to follower", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); syncNodeBecomeFollower(ths, "from candidate by append entries"); // ret or reply? - return ret; + return -1; } } while (0); @@ -149,10 +173,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { do { - char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries, reject, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, reject", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); syncNodeEventLog(ths, logBuf); } while (0); @@ -165,12 +196,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); SRpcMsg rpcMsg; @@ -193,10 +227,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { bool hasAppendEntries = pMsg->dataLen > 0; do { - char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries, accept, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, accept", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); syncNodeEventLog(ths, logBuf); } while (0); @@ -349,12 +390,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); SRpcMsg rpcMsg; @@ -558,7 +602,21 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncNodeEventLog(ths, "recv sync-append-entries-batch, maybe replica already dropped"); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "count:%d}, maybe replica already dropped", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataCount); + syncNodeErrorLog(ths, logBuf); + } while (0); + return ret; } @@ -582,7 +640,20 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc do { bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; if (condition) { - syncNodeEventLog(ths, "recv sync-append-entries-batch, candidate to follower"); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "count:%d}, candidate to follower", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataCount); + syncNodeEventLog(ths, logBuf); + } while (0); syncNodeBecomeFollower(ths, "from candidate by append entries"); // do not reply? @@ -603,11 +674,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc (pMsg->prevLogIndex <= ths->commitIndex); if (condition) { do { - char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch, fake match2, {pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", datalen:%d, datacount:%d}", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount); + "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "count:%d}, fake match2", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataCount); syncNodeEventLog(ths, logBuf); } while (0); @@ -663,12 +740,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); // send response @@ -703,11 +783,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc if (condition) { do { - char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch, not match, {pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", datalen:%d, datacount:%d}", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount); + "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "count:%d}, not match", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataCount); syncNodeEventLog(ths, logBuf); } while (0); @@ -725,12 +811,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); // send response @@ -763,11 +852,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); do { - char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch, match, {pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", datalen:%d, datacount:%d}", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount); + "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "count:%d}, match", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataCount); syncNodeEventLog(ths, logBuf); } while (0); @@ -809,12 +904,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); // send response @@ -866,12 +964,23 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs int32_t ret = 0; int32_t code = 0; - // print log - syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped"); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, maybe replica dropped", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); + syncNodeErrorLog(ths, logBuf); + } while (0); + return ret; } @@ -895,7 +1004,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs do { bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; if (condition) { - syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower"); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, candidate to follower", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); syncNodeBecomeFollower(ths, "from candidate by append entries"); // do not reply? @@ -976,10 +1098,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs (pMsg->prevLogIndex <= ths->commitIndex); if (condition) { do { - char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries, fake match2, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, fake match2", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); syncNodeEventLog(ths, logBuf); } while (0); @@ -1028,12 +1157,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); // send response @@ -1067,11 +1199,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = condition1 || condition2; if (condition) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries, not match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); - syncNodeEventLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, not match", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); @@ -1084,12 +1225,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); // send response @@ -1120,11 +1264,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries, match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); - syncNodeEventLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, match", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); if (hasExtraEntries) { // make log same, rollback deleted entries @@ -1160,12 +1313,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // msg event log do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 + ", success:%d, match-index:%" PRId64 "}", + host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); + syncNodeEventLog(ths, logBuf); } while (0); // send response diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5137922522..5149b47147 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -40,39 +40,59 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; - // print log - syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); - return 0; + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, maybe replica " + "already dropped", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeErrorLog(ths, logBuf); + } while (0); + + return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response", - pMsg->term); - syncNodeEventLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, drop stale response", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeEventLog(ths, logBuf); + } while (0); + return 0; } - if (gRaftDetailLog) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, before"); - } - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); - // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term); - syncNodeErrorLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, error term", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeErrorLog(ths, logBuf); + } while (0); + return -1; } @@ -100,13 +120,23 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } - if (gRaftDetailLog) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, after"); - } - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex); - syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, after next:" PRId64 + ", " + "match:" PRId64 "", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex); + syncNodeEventLog(ths, logBuf); + } while (0); - return ret; + return 0; } // only start once @@ -147,35 +177,55 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; - // print log - do { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term, - pMsg->matchIndex, pMsg->success); - syncNodeEventLog(ths, logBuf); - - } while (0); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, maybe replica " + "already dropped", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeErrorLog(ths, logBuf); + } while (0); + return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response", - pMsg->term); - syncNodeEventLog(ths, logBuf); - return -1; + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, drop stale response", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeEventLog(ths, logBuf); + } while (0); + + return 0; } // error term if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term); - syncNodeErrorLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, error term", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeErrorLog(ths, logBuf); + } while (0); + return -1; } @@ -293,45 +343,81 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie } while (0); } + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, after next:" PRId64 + ", " + "match:" PRId64 "", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex); + syncNodeEventLog(ths, logBuf); + } while (0); + return 0; } int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; - // print log - syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); - return 0; + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, maybe replica " + "already dropped", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeErrorLog(ths, logBuf); + } while (0); + + return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response", - pMsg->term); - syncNodeEventLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, drop stale response", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeEventLog(ths, logBuf); + } while (0); + return 0; } - if (gRaftDetailLog) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, before"); - } - syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex); - syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex); - // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term); - syncNodeErrorLog(ths, logBuf); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, error term", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); + syncNodeErrorLog(ths, logBuf); + } while (0); + return -1; } @@ -404,11 +490,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } } - if (gRaftDetailLog) { - syncNodeEventLog(ths, "recv sync-append-entries-reply, after"); - } - syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex); - syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, after next:" PRId64 + ", " + "match:" PRId64 "", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex); + syncNodeEventLog(ths, logBuf); + } while (0); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d55a385bb8..c91fce57e3 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1564,7 +1564,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { snprintf(logBuf, sizeof(logBuf), "%s", str); } // sDebug("%s", logBuf); - sInfo("%s", logBuf); + // sInfo("%s", logBuf); + sTrace("%s", logBuf); } else { int len = 256 + userStrLen; @@ -1586,7 +1587,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { snprintf(s, len, "%s", str); } // sDebug("%s", s); - sInfo("%s", s); + // sInfo("%s", s); + sTrace("%s", s); taosMemoryFree(s); } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 9f5cba6c66..e7fa757c36 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -108,10 +108,10 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { cJSON *pRoot = cJSON_CreateObject(); char u64Buf[128] = {0}; - snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm); + snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->currentTerm); cJSON_AddStringToObject(pRoot, "current_term", u64Buf); - snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr); + snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->voteFor.addr); cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf); cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); @@ -142,11 +142,11 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); ASSERT(cJSON_IsString(pCurrentTerm)); - sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm)); + sscanf(pCurrentTerm->valuestring, "" PRIu64 "", &(pRaftStore->currentTerm)); cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); ASSERT(cJSON_IsString(pVoteForAddr)); - sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr)); + sscanf(pVoteForAddr->valuestring, "" PRIu64 "", &(pRaftStore->voteFor.addr)); cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); pRaftStore->voteFor.vgId = pVoteForVgid->valueint; @@ -188,11 +188,11 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) { cJSON *pRoot = cJSON_CreateObject(); if (pRaftStore != NULL) { - snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm); + snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->currentTerm); cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); cJSON *pVoteFor = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr); + snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->voteFor.addr); cJSON_AddStringToObject(pVoteFor, "addr", u64buf); { uint64_t u64 = pRaftStore->voteFor.addr; @@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) { char *raftStore2Str(SRaftStore *pRaftStore) { cJSON *pJson = raftStore2Json(pRaftStore); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -224,25 +224,25 @@ char *raftStore2Str(SRaftStore *pRaftStore) { // for debug ------------------- void raftStorePrint(SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized); + printf("raftStorePrint | len:" PRIu64 " | %s \n", strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void raftStorePrint2(char *s, SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + printf("raftStorePrint2 | len:" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void raftStoreLog(SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized); + sTrace("raftStoreLog | len:" PRIu64 " | %s", strlen(serialized), serialized); taosMemoryFree(serialized); } void raftStoreLog2(char *s, SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + sTrace("raftStoreLog2 | len:" PRIu64 " | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 968026a3aa..c55b00003c 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -315,15 +315,18 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c int32_t ret = 0; do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", commit:%" PRId64 - ", " - "datalen:%d}", - pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, - pMsg->commitIndex, pMsg->dataLen); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 + ", pterm:%" PRIu64 ", commit:%" PRId64 + ", " + "datalen:%d}", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen); + syncNodeEventLog(pSyncNode, logBuf); } while (0); SRpcMsg rpcMsg; @@ -335,13 +338,16 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg) { do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 - ", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}", - pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, - pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 + ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}", + pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, + pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount); + syncNodeEventLog(pSyncNode, logBuf); } while (0); SRpcMsg rpcMsg; From 7808fdfccbfd98567e8cf69ae03905625b33093d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 17:19:42 +0800 Subject: [PATCH 2/6] refactor(sync): add trace log --- source/libs/sync/inc/syncInt.h | 16 + source/libs/sync/src/syncAppendEntries.c | 340 ++---------------- source/libs/sync/src/syncAppendEntriesReply.c | 186 ++-------- source/libs/sync/src/syncElection.c | 13 +- source/libs/sync/src/syncMain.c | 124 ++++++- source/libs/sync/src/syncReplication.c | 29 +- source/libs/sync/src/syncRequestVote.c | 52 +-- source/libs/sync/src/syncRequestVoteReply.c | 76 +--- 8 files changed, 229 insertions(+), 607 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3a30cf801e..64f66e390a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -257,6 +257,22 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); +// trace log +void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); + +void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); +void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); + +void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); + +void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s); +void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s); + +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); +void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0e26d1ea65..50c66172da 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -94,21 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, maybe replica already dropped", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped"); return -1; } @@ -125,30 +111,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } ASSERT(pMsg->dataLen >= 0); - do { - // return to follower state - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, candidate to follower", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); - - syncNodeBecomeFollower(ths, "from candidate by append entries"); - - // ret or reply? - return -1; - } - } while (0); + // return to follower state + if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { + syncLogRecvAppendEntries(ths, pMsg, "candidate to follower"); + syncNodeBecomeFollower(ths, "from candidate by append entries"); + return -1; // ret or reply? + } SyncTerm localPreLogTerm = 0; if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { @@ -172,20 +140,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // reject request if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, reject", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntries(ths, pMsg, "reject"); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; @@ -195,17 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->matchIndex = SYNC_INDEX_INVALID; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); @@ -226,20 +171,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, accept", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntries(ths, pMsg, "accept"); if (hasExtraEntries && hasAppendEntries) { // not conflict by default @@ -389,17 +321,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); @@ -602,22 +524,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "count:%d}, maybe replica already dropped", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataCount); - syncNodeErrorLog(ths, logBuf); - } while (0); - - return ret; + syncLogRecvAppendEntriesBatch(ths, pMsg, "maybe replica already dropped"); + return -1; } // maybe update term @@ -640,28 +548,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc do { bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; if (condition) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "count:%d}, candidate to follower", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataCount); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesBatch(ths, pMsg, "candidate to follower"); syncNodeBecomeFollower(ths, "from candidate by append entries"); - // do not reply? - return ret; + return 0; // do not reply? } } while (0); - // fake match2 + // fake match // // condition1: // preIndex <= my commit index @@ -673,20 +566,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && (pMsg->prevLogIndex <= ths->commitIndex); if (condition) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "count:%d}, fake match2", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataCount); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntriesBatch(ths, pMsg, "fake match"); SyncIndex matchIndex = ths->commitIndex; bool hasAppendEntries = pMsg->dataLen > 0; @@ -739,17 +619,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->matchIndex = matchIndex; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); // send response SRpcMsg rpcMsg; @@ -782,20 +652,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc bool condition = condition1 || condition2; if (condition) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "count:%d}, not match", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataCount); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntriesBatch(ths, pMsg, "not match"); // maybe update commit index by snapshot syncNodeMaybeUpdateCommitBySnapshot(ths); @@ -810,17 +667,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->matchIndex = ths->commitIndex; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); // send response SRpcMsg rpcMsg; @@ -851,20 +698,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc bool hasAppendEntries = pMsg->dataLen > 0; SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "count:%d}, match", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataCount); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntriesBatch(ths, pMsg, "really match"); if (hasExtraEntries) { // make log same, rollback deleted entries @@ -903,17 +737,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); // send response SRpcMsg rpcMsg; @@ -966,22 +790,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, maybe replica dropped", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeErrorLog(ths, logBuf); - } while (0); - - return ret; + syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped"); + return -1; } // maybe update term @@ -1004,24 +814,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs do { bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; if (condition) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, candidate to follower", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntries(ths, pMsg, "candidate to follower"); syncNodeBecomeFollower(ths, "from candidate by append entries"); - // do not reply? - return ret; + return 0; // do not reply? } } while (0); @@ -1084,7 +879,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } while (0); #endif - // fake match2 + // fake match // // condition1: // preIndex <= my commit index @@ -1097,20 +892,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && (pMsg->prevLogIndex <= ths->commitIndex); if (condition) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, fake match2", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntries(ths, pMsg, "fake match"); SyncIndex matchIndex = ths->commitIndex; bool hasAppendEntries = pMsg->dataLen > 0; @@ -1156,17 +938,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->matchIndex = matchIndex; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); // send response SRpcMsg rpcMsg; @@ -1199,20 +971,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = condition1 || condition2; if (condition) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, not match", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntries(ths, pMsg, "not match"); // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); @@ -1224,17 +983,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->matchIndex = SYNC_INDEX_INVALID; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); // send response SRpcMsg rpcMsg; @@ -1264,20 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 - ", " - "datalen:%d}, match", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogRecvAppendEntries(ths, pMsg, "really match"); if (hasExtraEntries) { // make log same, rollback deleted entries @@ -1312,17 +1048,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; // msg event log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 - ", success:%d, match-index:%" PRId64 "}", - host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); + syncLogSendAppendEntriesReply(ths, pReply, ""); // send response SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5149b47147..81d050e179 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -42,36 +42,13 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, maybe replica " - "already dropped", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, drop stale response", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); return 0; } @@ -81,23 +58,15 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p // } if (pMsg->term > ths->pRaftStore->currentTerm) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, error term", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); @@ -120,20 +89,13 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } + SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, after next:" PRId64 - ", " - "match:" PRId64 "", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex); - syncNodeEventLog(ths, logBuf); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex, + beforeMatchIndex, afterNextIndex, afterMatchIndex); + syncLogRecvAppendEntriesReply(ths, pMsg, logBuf); } while (0); return 0; @@ -179,58 +141,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, maybe replica " - "already dropped", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, drop stale response", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); return 0; } // error term if (pMsg->term > ths->pRaftStore->currentTerm) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, error term", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + if (pMsg->success) { SyncIndex newNextIndex = pMsg->matchIndex + 1; SyncIndex newMatchIndex = pMsg->matchIndex; @@ -343,20 +274,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie } while (0); } + SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, after next:" PRId64 - ", " - "match:" PRId64 "", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex); - syncNodeEventLog(ths, logBuf); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex, + beforeMatchIndex, afterNextIndex, afterMatchIndex); + syncLogRecvAppendEntriesReply(ths, pMsg, logBuf); } while (0); return 0; @@ -367,36 +291,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, maybe replica " - "already dropped", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, drop stale response", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); return 0; } @@ -406,23 +307,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // } if (pMsg->term > ths->pRaftStore->currentTerm) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, error term", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex); - syncNodeErrorLog(ths, logBuf); - } while (0); - + syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); @@ -490,20 +383,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } } + SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 - "}, after next:" PRId64 - ", " - "match:" PRId64 "", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex); - syncNodeEventLog(ths, logBuf); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex, + beforeMatchIndex, afterNextIndex, afterMatchIndex); + syncLogRecvAppendEntriesReply(ths, pMsg, logBuf); } while (0); return 0; diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 53a708c6c2..375f2e5730 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -120,18 +120,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { int32_t ret = 0; - - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port, - pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); - syncNodeEventLog(pSyncNode, logBuf); - - } while (0); + syncLogSendRequestVote(pSyncNode, pMsg, ""); SRpcMsg rpcMsg; syncRequestVote2RpcMsg(pMsg, &rpcMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c91fce57e3..1c2d4c793c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2925,4 +2925,126 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { } return true; -} \ No newline at end of file +} + +void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host, port, + pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { + char logBuf[256]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + snprintf(logBuf, sizeof(logBuf), + "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host, + port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, port, + pMsg->term, pMsg->voteGranted, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, + port, pMsg->term, pMsg->voteGranted, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 + ", pterm:%" PRIu64 ", commit:%" PRId64 + ", " + "datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 + ", commit:" PRId64 ", pterm:" PRIu64 + ", " + "datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen, s); + syncNodeErrorLog(pSyncNode, logBuf); +} + +void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 + ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s", + pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, + pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 + ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s", + pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, + pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); + syncNodeErrorLog(pSyncNode, logBuf); +} + +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); + syncNodeErrorLog(pSyncNode, logBuf); +} diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index c55b00003c..fa3b5d52d7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -313,21 +313,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { int32_t ret = 0; - - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", commit:%" PRId64 - ", " - "datalen:%d}", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, - pMsg->dataLen); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); + syncLogSendAppendEntries(pSyncNode, pMsg, ""); SRpcMsg rpcMsg; syncAppendEntries2RpcMsg(pMsg, &rpcMsg); @@ -337,18 +323,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}", - pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, - pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); + syncLogSendAppendEntriesBatch(pSyncNode, pMsg, ""); SRpcMsg rpcMsg; syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 3eaec65cfe..bad32c5f91 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -47,18 +47,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - "}, maybe replica already dropped", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped"); return -1; } @@ -91,15 +80,10 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // trace log do { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - "}, reply-grant:%d", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); - syncNodeEventLog(ths, logBuf); + char logBuf[32]; + snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted); + syncLogRecvRequestVote(ths, pMsg, logBuf); + syncLogSendRequestVoteReply(ths, pReply, ""); } while (0); SRpcMsg rpcMsg; @@ -212,18 +196,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - do { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - "}, maybe replica already dropped", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); - syncNodeEventLog(ths, logBuf); - } while (0); - + syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped"); return -1; } @@ -254,15 +227,10 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { // trace log do { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - "}, reply-grant:%d", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); - syncNodeEventLog(ths, logBuf); + char logBuf[32]; + snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted); + syncLogRecvRequestVote(ths, pMsg, logBuf); + syncLogSendRequestVoteReply(ths, pReply, ""); } while (0); SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 9ae70ca8da..566b80881f 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -40,40 +40,15 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; - // trace log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, - port, pMsg->term, pMsg->voteGranted); - syncNodeEventLog(ths, logBuf); - } while (0); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, - pMsg->term, pMsg->voteGranted); - syncNodeErrorLog(ths, logBuf); + syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped"); return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, - pMsg->term, pMsg->voteGranted); - syncNodeErrorLog(ths, logBuf); + syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); return -1; } @@ -84,16 +59,11 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", - host, port, pMsg->term, pMsg->voteGranted); - syncNodeErrorLog(ths, logBuf); + syncLogRecvRequestVoteReply(ths, pMsg, "error term"); return -1; } + syncLogRecvRequestVoteReply(ths, pMsg, ""); ASSERT(pMsg->term == ths->pRaftStore->currentTerm); // This tallies votes even when the current state is not Candidate, @@ -185,40 +155,15 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; - // trace log - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, - port, pMsg->term, pMsg->voteGranted); - syncNodeEventLog(ths, logBuf); - } while (0); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, - pMsg->term, pMsg->voteGranted); - syncNodeErrorLog(ths, logBuf); + syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped"); return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, - pMsg->term, pMsg->voteGranted); - syncNodeErrorLog(ths, logBuf); + syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); return -1; } @@ -229,16 +174,11 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", - host, port, pMsg->term, pMsg->voteGranted); - syncNodeErrorLog(ths, logBuf); + syncLogRecvRequestVoteReply(ths, pMsg, "error term"); return -1; } + syncLogRecvRequestVoteReply(ths, pMsg, ""); ASSERT(pMsg->term == ths->pRaftStore->currentTerm); // This tallies votes even when the current state is not Candidate, From 94b3e9d2f097d01335235b19a4ec5764e5e490fa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 19:00:55 +0800 Subject: [PATCH 3/6] refactor(sync): add trace log --- source/libs/sync/src/syncMain.c | 6 +++--- source/libs/sync/src/syncRaftStore.c | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1c2d4c793c..1cefdf73ef 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2990,8 +2990,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64 - ", commit:" PRId64 ", pterm:" PRIu64 + "recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRIu64 ", pre-term:%" PRIu64 + ", commit:%" PRIu64 ", pterm:%" PRIu64 ", " "datalen:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, @@ -3043,7 +3043,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64 + "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRIu64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); syncNodeErrorLog(pSyncNode, logBuf); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index e7fa757c36..f552570337 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -224,25 +224,25 @@ char *raftStore2Str(SRaftStore *pRaftStore) { // for debug ------------------- void raftStorePrint(SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - printf("raftStorePrint | len:" PRIu64 " | %s \n", strlen(serialized), serialized); + printf("raftStorePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void raftStorePrint2(char *s, SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - printf("raftStorePrint2 | len:" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); + printf("raftStorePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void raftStoreLog(SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - sTrace("raftStoreLog | len:" PRIu64 " | %s", strlen(serialized), serialized); + sTrace("raftStoreLog | len:%" PRIu64 " | %s", strlen(serialized), serialized); taosMemoryFree(serialized); } void raftStoreLog2(char *s, SRaftStore *pObj) { char *serialized = raftStore2Str(pObj); - sTrace("raftStoreLog2 | len:" PRIu64 " | %s | %s", strlen(serialized), s, serialized); + sTrace("raftStoreLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } From 3491896b7a24477a564b2cd34df0bfd079c4fa9d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 19:12:02 +0800 Subject: [PATCH 4/6] refactor(sync): add trace log --- source/libs/sync/src/syncMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1cefdf73ef..b769bf5273 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3043,7 +3043,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRIu64 + "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); syncNodeErrorLog(pSyncNode, logBuf); From b77e0a6750a8f765e5683418254a2fb345dddceb Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 19:58:26 +0800 Subject: [PATCH 5/6] refactor(sync): add trace log --- source/libs/sync/src/syncIndexMgr.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index a9c1147fc1..39bede23f6 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -79,8 +79,7 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf } } - syncNodeLog3("syncIndexMgrGetIndex", pSyncIndexMgr->pSyncNode); - ASSERT(0); + return SYNC_INDEX_INVALID; } cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { @@ -126,7 +125,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } From 540f519ad75665e0f617d640a58dcf310c22aafe Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 20:24:49 +0800 Subject: [PATCH 6/6] refactor(sync): add trace log --- source/libs/sync/src/syncMain.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b769bf5273..94f22c3601 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3007,8 +3007,8 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries snprintf(logBuf, sizeof(logBuf), "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s", - pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, - pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, pMsg->dataCount, s); syncNodeEventLog(pSyncNode, logBuf); } @@ -3020,8 +3020,8 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s", - pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, - pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, pMsg->dataCount, s); syncNodeErrorLog(pSyncNode, logBuf); }