add telemetry timer
This commit is contained in:
parent
1db7368570
commit
8a09500429
|
@ -135,8 +135,9 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "mnode-trans-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "mnode-telem-tmr", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||
|
@ -145,7 +146,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||
|
||||
// Requests handled by VNODE
|
||||
|
|
|
@ -81,6 +81,7 @@ typedef struct SMnode {
|
|||
tmr_h timer;
|
||||
tmr_h transTimer;
|
||||
tmr_h mqTimer;
|
||||
tmr_h telemTimer;
|
||||
char *path;
|
||||
SMnodeCfg cfg;
|
||||
int64_t checkTime;
|
||||
|
|
|
@ -71,7 +71,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
|
|||
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
|
||||
|
||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
|
||||
|
|
|
@ -36,6 +36,10 @@
|
|||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
|
||||
#define MQ_TIMER_MS 3000
|
||||
#define TRNAS_TIMER_MS 6000
|
||||
#define TELEM_TIMER_MS 86400000
|
||||
|
||||
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) {
|
||||
terrno = TSDB_CODE_MND_NOT_READY;
|
||||
|
@ -73,16 +77,16 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
|
|||
return pReq;
|
||||
}
|
||||
|
||||
static void mndTransReExecute(void *param, void *tmrId) {
|
||||
static void mndExecuteTransaction(void *param, void *tmrId) {
|
||||
SMnode *pMnode = param;
|
||||
if (mndIsMaster(pMnode)) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen};
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
|
||||
}
|
||||
|
||||
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
|
||||
taosTmrReset(mndExecuteTransaction, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
|
||||
}
|
||||
|
||||
static void mndCalMqRebalance(void *param, void *tmrId) {
|
||||
|
@ -94,25 +98,39 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
|
|||
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
|
||||
}
|
||||
|
||||
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer);
|
||||
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
|
||||
}
|
||||
|
||||
static void mndExecuteTelemetry(void *param, void *tmrId) {
|
||||
SMnode *pMnode = param;
|
||||
if (mndIsMaster(pMnode)) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
|
||||
}
|
||||
|
||||
taosTmrReset(mndExecuteTelemetry, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
|
||||
}
|
||||
|
||||
static int32_t mndInitTimer(SMnode *pMnode) {
|
||||
if (pMnode->timer == NULL) {
|
||||
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
|
||||
}
|
||||
|
||||
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
|
||||
if (pMnode->timer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosTmrReset(mndTransReExecute, 6000, pMnode, pMnode->timer, &pMnode->transTimer)) {
|
||||
if (taosTmrReset(mndExecuteTransaction, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer)) {
|
||||
if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosTmrReset(mndCalMqRebalance, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -404,7 +422,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) {
|
||||
if (pRpcMsg->msgType != TDMT_MND_TRANS_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_TIMER &&
|
||||
pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE && pRpcMsg->msgType != TDMT_MND_TELEM_TIMER) {
|
||||
SRpcConnInfo connInfo = {0};
|
||||
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
|
||||
taosFreeQitem(pMsg);
|
||||
|
|
Loading…
Reference in New Issue