From 1b36ad119c671180b499edfb73fc5d79372c7875 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 18 Oct 2022 15:24:00 +0800 Subject: [PATCH] refactor(sync): rename functions --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 2 +- source/libs/sync/src/syncCommit.c | 8 +++++--- source/libs/sync/src/syncMain.c | 17 ++++++++++++++++- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 8c9e86f525..f3c771518a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -36,6 +36,7 @@ extern bool gRaftDetailLog; #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_RECV_TIME_RANGE_MS 1200 #define SYNC_ADD_QUORUM_COUNT 3 +#define SYNC_MNODE_MAX_LOG_NUM 10000 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 @@ -203,7 +204,6 @@ void syncStop(int64_t rid); int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); -bool syncIsReadyForRead(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); bool syncRestoreFinish(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 6ef0988aab..3c3fbc7574 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -281,7 +281,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index); int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm); bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg); -int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex); int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 5ea07249c6..414a4dd915 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -299,7 +299,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { int32_t code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + code = syncNodeDoCommit(ths, beginIndex, endIndex, ths->state); ASSERT(code == 0); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f83b640876..8889fe02a0 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -45,8 +45,10 @@ // /\ UNCHANGED <> // void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { - syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); - syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); + return; + } // advance commit index to sanpshot first SSnapshot snapshot; @@ -129,7 +131,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // execute fsm if (pSyncNode->pFsm != NULL) { - int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); + int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); ASSERT(code == 0); } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index dcd35c3e25..18345ea2fe 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -297,6 +297,20 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { ASSERT(rid == pSyncNode->rid); int32_t code = 0; + if (syncNodeIsMnode(pSyncNode)) { + SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); + SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); + int64_t logNum = endIndex - beginIndex; + if (logNum > SYNC_MNODE_MAX_LOG_NUM) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "mnode log num:%ld, do not delete", logNum); + syncNodeEventLog(pSyncNode, logBuf); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; + } + } + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); if (lastApplyIndex > matchIndex) { @@ -311,6 +325,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeEventLog(pSyncNode, logBuf); } while (0); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; } } @@ -3117,7 +3132,7 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1); } -int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { +int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { if (beginIndex > endIndex) { return 0; }