Merge pull request #24405 from taosdata/fix/optMsgOnMnd

opt msg on mnode
This commit is contained in:
Haojun Liao 2024-01-10 09:43:48 +08:00 committed by GitHub
commit 273e47a1fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 109 additions and 55 deletions

View File

@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndAcct.h" #include "mndAcct.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndCompact.h"
#include "mndCompactDetail.h"
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
@ -42,8 +44,6 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "mndView.h" #include "mndView.h"
#include "mndCompact.h"
#include "mndCompactDetail.h"
static inline int32_t mndAcquireRpc(SMnode *pMnode) { static inline int32_t mndAcquireRpc(SMnode *pMnode) {
int32_t code = 0; int32_t code = 0;
@ -158,16 +158,16 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
} }
} }
static void mndStreamCheckpointRemain(SMnode* pMnode) { static void mndStreamCheckpointRemain(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); void *pReq = mndBuildCheckpointTickMsg(&contLen, 0);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
} }
static void mndStreamCheckNode(SMnode* pMnode) { static void mndStreamCheckNode(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
@ -275,6 +275,100 @@ static void mndCheckDnodeOffline(SMnode *pMnode) {
mndReleaseRpc(pMnode); mndReleaseRpc(pMnode);
} }
static bool mnodeIsNotLeader(SMnode *pMnode) {
terrno = 0;
taosThreadRwlockRdlock(&pMnode->lock);
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (terrno != 0) {
taosThreadRwlockUnlock(&pMnode->lock);
return true;
}
if (state.state != TAOS_SYNC_STATE_LEADER) {
taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_SYN_NOT_LEADER;
return true;
}
if (!state.restored || !pMnode->restored) {
taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_SYN_RESTORING;
return true;
}
taosThreadRwlockUnlock(&pMnode->lock);
return false;
}
static int32_t minCronTime() {
int64_t min = INT64_MAX;
min = TMIN(min, tsTtlPushIntervalSec);
min = TMIN(min, tsTrimVDbIntervalSec);
min = TMIN(min, tsTransPullupInterval);
min = TMIN(min, tsCompactPullupInterval);
min = TMIN(min, tsMqRebalanceInterval);
min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 5); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
min = TMIN(min, telemInt);
min = TMIN(min, tsGrantHBInterval);
min = TMIN(min, tsUptimeInterval);
return min <= 1 ? 2 : min;
}
void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
if (sec % tsTtlPushIntervalSec == 0) {
mndPullupTtl(pMnode);
}
if (sec % tsTrimVDbIntervalSec == 0) {
mndPullupTrimDb(pMnode);
}
if (sec % tsTransPullupInterval == 0) {
mndPullupTrans(pMnode);
}
if (sec % tsCompactPullupInterval == 0) {
mndPullupCompacts(pMnode);
}
if (sec % tsMqRebalanceInterval == 0) {
mndCalMqRebalance(pMnode);
}
if (sec % tsStreamCheckpointInterval == 0) {
mndStreamCheckpointTick(pMnode, sec);
}
if (sec % 5 == 0) {
mndStreamCheckpointRemain(pMnode);
}
if (sec % tsStreamNodeCheckInterval == 0) {
mndStreamCheckNode(pMnode);
}
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}
if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode);
}
if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode);
}
}
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode);
}
if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) {
mndSyncCheckTimeout(pMnode);
}
}
static void *mndThreadFp(void *param) { static void *mndThreadFp(void *param) {
SMnode *pMnode = param; SMnode *pMnode = param;
int64_t lastTime = 0; int64_t lastTime = 0;
@ -287,57 +381,15 @@ static void *mndThreadFp(void *param) {
if (lastTime % 10 != 0) continue; if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10; int64_t sec = lastTime / 10;
if (sec % tsTtlPushIntervalSec == 0) { mndDoTimerCheckTask(pMnode, sec);
mndPullupTtl(pMnode);
}
if (sec % tsTrimVDbIntervalSec == 0) { int64_t minCron = minCronTime();
mndPullupTrimDb(pMnode); if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) {
} // not leader, do nothing
mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)) terrno = 0;
if (sec % tsTransPullupInterval == 0) { continue;
mndPullupTrans(pMnode);
}
if (sec % tsCompactPullupInterval == 0) {
mndPullupCompacts(pMnode);
}
if (sec % tsMqRebalanceInterval == 0) {
mndCalMqRebalance(pMnode);
}
if (sec % tsStreamCheckpointInterval == 0) {
mndStreamCheckpointTick(pMnode, sec);
}
if (sec % 5 == 0) {
mndStreamCheckpointRemain(pMnode);
}
if (sec % tsStreamNodeCheckInterval == 0) {
mndStreamCheckNode(pMnode);
}
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}
if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode);
}
if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode);
}
if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode);
}
if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) {
mndSyncCheckTimeout(pMnode);
} }
mndDoTimerPullupTask(pMnode, sec);
} }
return NULL; return NULL;
@ -712,7 +764,9 @@ _OVER:
if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER || pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
pMsg->msgType == TDMT_MND_COMPACT_TIMER) { pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE ||
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state)); pMnode->stopped, state.restored, syncStr(state.state));
return -1; return -1;