From a8383369ba3959eb77b6069bfefdb2489fdf6874 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 28 May 2024 15:13:58 +0800 Subject: [PATCH] enh: arb check roletime before check sync --- include/dnode/mnode/mnode.h | 2 ++ source/dnode/mnode/impl/src/mndArbGroup.c | 9 ++++++- source/dnode/mnode/impl/src/mndMain.c | 31 ++++++++++++++--------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 108e6f18a6..fe96fe1117 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -101,6 +101,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr */ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); +int64_t mndGetRoleTimeMs(SMnode *pMnode); + /** * @brief Process the rpc, sync request. * diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 50338fe889..b00da9ba3f 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -540,6 +540,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { return -1; } + int64_t roleTimeMs = mndGetRoleTimeMs(pMnode); + int64_t nowMs = taosGetTimestampMs(); + if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) { + mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs, + nowMs); + return 0; + } + SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup)); while (1) { @@ -551,7 +559,6 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { taosThreadMutexUnlock(&pArbGroup->mutex); int32_t vgId = arbGroupDup.vgId; - int64_t nowMs = taosGetTimestampMs(); bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index a78edcb05e..850c527a14 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -334,6 +334,8 @@ static int32_t minCronTime() { min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, 6); // checkpointRemain min = TMIN(min, tsStreamNodeCheckInterval); + min = TMIN(min, tsArbHeartBeatIntervalSec); + min = TMIN(min, tsArbCheckSyncIntervalSec); int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); min = TMIN(min, telemInt); @@ -390,6 +392,18 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } + + if (sec % (tsArbHeartBeatIntervalSec) == 0) { + if (mndPullupArbHeartbeat(pMnode) != 0) { + mError("failed to pullup arb heartbeat, since:%s", terrstr()); + } + } + + if (sec % (tsArbCheckSyncIntervalSec) == 0) { + if (mndPullupArbCheckSync(pMnode) != 0) { + mError("failed to pullup arb check sync, since:%s", terrstr()); + } + } } void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { if (sec % (tsStatusInterval * 5) == 0) { @@ -421,18 +435,6 @@ static void *mndThreadFp(void *param) { continue; } mndDoTimerPullupTask(pMnode, sec); - - if (sec % (tsArbHeartBeatIntervalSec) == 0) { - if (mndPullupArbHeartbeat(pMnode) != 0) { - mError("failed to pullup arb heartbeat, since:%s", terrstr()); - } - } - - if (sec % (tsArbCheckSyncIntervalSec) == 0) { - if (mndPullupArbCheckSync(pMnode) != 0) { - mError("failed to pullup arb check sync, since:%s", terrstr()); - } - } } return NULL; @@ -1076,6 +1078,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; } +int64_t mndGetRoleTimeMs(SMnode *pMnode) { + SSyncState state = syncGetState(pMnode->syncMgmt.sync); + return state.roleTimeMs; +} + void mndSetRestored(SMnode *pMnode, bool restored) { if (restored) { taosThreadRwlockWrlock(&pMnode->lock);