From 984af8a877511c7f4f88b548a1abe7503021ea9b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 21 Oct 2022 15:13:51 +0800 Subject: [PATCH 1/3] refactor(sync): if data is null, do not update state mgr --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 46 ++++++++++++++++++++++---- source/libs/sync/src/syncReplication.c | 6 ++-- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 635ebd6308..a158430a0f 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -248,6 +248,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); +char* syncNodePeerState2Str(const SSyncNode* pSyncNode); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 93742a6d5e..e72c1aca54 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -322,6 +322,38 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { return minMatchIndex; } +char* syncNodePeerState2Str(const SSyncNode* pSyncNode) { + int32_t len = 128; + int32_t useLen = 0; + int32_t leftLen = len - useLen; + char* pStr = taosMemoryMalloc(len); + memset(pStr, 0, len); + + char* p = pStr; + int32_t use = snprintf(p, leftLen, "{"); + useLen += use; + leftLen -= use; + + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { + SPeerState* pState = syncNodeGetPeerState((SSyncNode*)pSyncNode, &(pSyncNode->replicasId[i])); + ASSERT(pState != NULL); + + p = pStr + useLen; + use = snprintf(p, leftLen, "%d:%ld,%ld, ", i, pState->lastSendIndex, pState->lastSendTime); + useLen += use; + leftLen -= use; + } + + p = pStr + useLen; + use = snprintf(p, 
leftLen, "}"); + useLen += use; + leftLen -= use; + + // sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr); + + return pStr; +} + int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -1822,8 +1854,6 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { } inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { - int32_t userStrLen = strlen(str); - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -1842,6 +1872,9 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { printStr = pCfgStr; } + char* peerStateStr = syncNodePeerState2Str(pSyncNode); + int32_t userStrLen = strlen(str) + strlen(peerStateStr); + if (userStrLen < 256) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { @@ -1851,13 +1884,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), - pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); + 
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1875,19 +1908,20 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), - pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); + pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr); } else { snprintf(s, len, "%s", str); } // sDebug("%s", s); // sInfo("%s", s); sTrace("%s", s); + taosMemoryFree(peerStateStr); taosMemoryFree(s); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 431e1b46d0..4231033e5d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -165,8 +165,10 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); ASSERT(pState != NULL); - pState->lastSendIndex = pMsg->prevLogIndex + 1; - pState->lastSendTime = taosGetTimestampMs(); + if (pMsg->dataLen > 0) { + pState->lastSendIndex = pMsg->prevLogIndex + 1; + pState->lastSendTime = taosGetTimestampMs(); + } return ret; } From 3c94d614935505c2df679572f2bd48a259a715ad 
Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 21 Oct 2022 15:22:34 +0800 Subject: [PATCH 2/3] refactor(sync): judge peer state null --- source/libs/sync/src/syncMain.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e72c1aca54..b30717a1ef 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3453,7 +3453,9 @@ SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) { bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) { SPeerState* pState = syncNodeGetPeerState(ths, pDestId); - ASSERT(pState != NULL); + if (pState == NULL) { + return false; + } SyncIndex sendIndex = pMsg->prevLogIndex + 1; int64_t tsNow = taosGetTimestampMs(); From b492082ac24dddfb9d2097f08c96d25d439eae04 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 21 Oct 2022 15:31:55 +0800 Subject: [PATCH 3/3] refactor(sync): delete DynamicQuorum --- source/libs/sync/src/syncCommit.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f377b08671..b9d7789ca2 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -170,6 +170,8 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { } int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { + return pSyncNode->quorum; + int32_t quorum = 1; // self int64_t timeNow = taosGetTimestampMs();