From 220cef4a61c1c2a8d857c812549b2ab378e6dd47 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 9 Jan 2024 11:02:10 +0000 Subject: [PATCH] opt msg on mnode --- source/dnode/mnode/impl/src/mndMain.c | 164 +++++++++++++++++--------- 1 file changed, 109 insertions(+), 55 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 38a92b43e7..c0fd93c65d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "mndAcct.h" #include "mndCluster.h" +#include "mndCompact.h" +#include "mndCompactDetail.h" #include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" @@ -42,8 +44,6 @@ #include "mndUser.h" #include "mndVgroup.h" #include "mndView.h" -#include "mndCompact.h" -#include "mndCompactDetail.h" static inline int32_t mndAcquireRpc(SMnode *pMnode) { int32_t code = 0; @@ -158,16 +158,16 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { } } -static void mndStreamCheckpointRemain(SMnode* pMnode) { +static void mndStreamCheckpointRemain(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); + void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } -static void mndStreamCheckNode(SMnode* pMnode) { +static void mndStreamCheckNode(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -275,6 +275,100 @@ static void mndCheckDnodeOffline(SMnode *pMnode) { mndReleaseRpc(pMnode); } +static bool mnodeIsNotLeader(SMnode *pMnode) { + terrno = 0; + taosThreadRwlockRdlock(&pMnode->lock); + SSyncState state = syncGetState(pMnode->syncMgmt.sync); + if (terrno != 0) { + taosThreadRwlockUnlock(&pMnode->lock); + return true; + } + + if (state.state != TAOS_SYNC_STATE_LEADER) { + taosThreadRwlockUnlock(&pMnode->lock); + terrno = TSDB_CODE_SYN_NOT_LEADER; + return true; + } + if (!state.restored || !pMnode->restored) { + taosThreadRwlockUnlock(&pMnode->lock); + terrno = TSDB_CODE_SYN_RESTORING; + return true; + } + taosThreadRwlockUnlock(&pMnode->lock); + return false; +} + +static int32_t minCronTime() { + int64_t min = INT64_MAX; + min = TMIN(min, tsTtlPushIntervalSec); + min = TMIN(min, tsTrimVDbIntervalSec); + min = TMIN(min, tsTransPullupInterval); + min = TMIN(min, tsCompactPullupInterval); + min = TMIN(min, tsMqRebalanceInterval); + min = TMIN(min, tsStreamCheckpointInterval); + min = TMIN(min, 5); // checkpointRemain + min = TMIN(min, tsStreamNodeCheckInterval); + + int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); + min = TMIN(min, telemInt); + min = TMIN(min, tsGrantHBInterval); + min = TMIN(min, tsUptimeInterval); + + return min <= 1 ? 2 : min; +} +void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { + if (sec % tsTtlPushIntervalSec == 0) { + mndPullupTtl(pMnode); + } + + if (sec % tsTrimVDbIntervalSec == 0) { + mndPullupTrimDb(pMnode); + } + + if (sec % tsTransPullupInterval == 0) { + mndPullupTrans(pMnode); + } + + if (sec % tsCompactPullupInterval == 0) { + mndPullupCompacts(pMnode); + } + + if (sec % tsMqRebalanceInterval == 0) { + mndCalMqRebalance(pMnode); + } + + if (sec % tsStreamCheckpointInterval == 0) { + mndStreamCheckpointTick(pMnode, sec); + } + + if (sec % 5 == 0) { + mndStreamCheckpointRemain(pMnode); + } + + if (sec % tsStreamNodeCheckInterval == 0) { + mndStreamCheckNode(pMnode); + } + + if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { + mndPullupTelem(pMnode); + } + + if (sec % tsGrantHBInterval == 0) { + mndPullupGrant(pMnode); + } + + if (sec % tsUptimeInterval == 0) { + mndIncreaseUpTime(pMnode); + } +} +void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { + if (sec % (tsStatusInterval * 5) == 0) { + mndCheckDnodeOffline(pMnode); + } + if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) { + mndSyncCheckTimeout(pMnode); + } +} static void *mndThreadFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; @@ -287,57 +381,15 @@ static void *mndThreadFp(void *param) { if (lastTime % 10 != 0) continue; int64_t sec = lastTime / 10; - if (sec % tsTtlPushIntervalSec == 0) { - mndPullupTtl(pMnode); - } + mndDoTimerCheckTask(pMnode, sec); - if (sec % tsTrimVDbIntervalSec == 0) { - mndPullupTrimDb(pMnode); - } - - if (sec % tsTransPullupInterval == 0) { - mndPullupTrans(pMnode); - } - - if (sec % tsCompactPullupInterval == 0) { - mndPullupCompacts(pMnode); - } - - if (sec % tsMqRebalanceInterval == 0) { - mndCalMqRebalance(pMnode); - } - - if (sec % tsStreamCheckpointInterval == 0) { - mndStreamCheckpointTick(pMnode, sec); - } - - if (sec % 5 == 0) { - mndStreamCheckpointRemain(pMnode); - } - - if (sec % tsStreamNodeCheckInterval == 0) { - mndStreamCheckNode(pMnode); - } - - if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { - mndPullupTelem(pMnode); - } - - if (sec % tsGrantHBInterval == 0) { - mndPullupGrant(pMnode); - } - - if (sec % tsUptimeInterval == 0) { - mndIncreaseUpTime(pMnode); - } - - if (sec % (tsStatusInterval * 5) == 0) { - mndCheckDnodeOffline(pMnode); - } - - if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) { - mndSyncCheckTimeout(pMnode); + int64_t minCron = minCronTime(); + if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) { + // not leader, do nothing + mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)) terrno = 0; + continue; } + mndDoTimerPullupTask(pMnode, sec); } return NULL; @@ -712,7 +764,9 @@ _OVER: if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER || - pMsg->msgType == TDMT_MND_COMPACT_TIMER) { + pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER || + pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE || + pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, pMnode->stopped, state.restored, syncStr(state.state)); return -1;