From 57cb65273401febe29121c503b3f1723a1631985 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 18 Oct 2024 04:41:22 +0000 Subject: [PATCH] fix/TS-5532-set-seperate-thread-update-status --- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 2 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 9 ++++- source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 3 ++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 45 +++++++++++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 18b3f66a60..2d0b9c5a81 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -49,6 +49,7 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); +void dmUpdateStatusInfo(SDnodeMgmt *pMgmt); void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); @@ -62,6 +63,7 @@ int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); // dmWorker.c int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmStartStatusThread(SDnodeMgmt *pMgmt); +int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt); void dmStopStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt); void dmStopNotifyThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 87b1ae0efa..7be3af0c25 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -22,6 +22,8 @@ extern SConfig *tsCfg; +SMonVloadInfo vinfo = {0}; + static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { int32_t code = 0; if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { @@ -163,8 +165,6 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (void)taosThreadRwlockUnlock(&pMgmt->pData->lock); dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq); - SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsFp)(&vinfo); req.pVloads = vinfo.pVloads; dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq); @@ -231,6 +231,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dmProcessStatusRsp(pMgmt, &rpcRsp); } +void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) { + dDebug("begin to get vnode loads"); + (*pMgmt->getVnodeLoadsFp)(&vinfo); +} + void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq); if (contLen < 0) { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 22c2b2f5b2..701aa2d65e 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -22,6 +22,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if ((code = dmStartStatusThread(pMgmt)) != 0) { return code; } + if ((code = dmStartStatusInfoThread(pMgmt)) != 0) { + return code; + } #if defined(TD_ENTERPRISE) if ((code = dmStartNotifyThread(pMgmt)) != 0) { return code; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 58b86b20b1..7c9416fac6 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -47,6 +47,35 @@ static void *dmStatusThreadFp(void *param) { return NULL; } +static void *dmStatusInfoThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-status-info"); + + int32_t upTimeCount = 0; + int64_t upTime = 0; + + while (1) { + taosMsleep(200); + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + + int64_t curTime = taosGetTimestampMs(); + if (curTime < lastTime) lastTime = curTime; + float interval = (curTime - lastTime) / 1000.0f; + if (interval >= tsStatusInterval) { + dmUpdateStatusInfo(pMgmt); + lastTime = curTime; + + if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) { + upTime = taosGetOsUptime() - tsDndStartOsUptime; + tsDndUpTime = TMAX(tsDndUpTime, upTime); + } + } + } + + return NULL; +} + SDmNotifyHandle dmNotifyHdl = {.state = 0}; #define TIMESERIES_STASH_NUM 5 static void *dmNotifyThreadFp(void *param) { @@ -280,6 +309,22 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; + TdThreadAttr thAttr; + (void)taosThreadAttrInit(&thAttr); + (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create status Info thread since %s", tstrerror(code)); + return code; + } + + (void)taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-status-info", "initialized"); + return 0; +} + void dmStopStatusThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->statusThread)) { (void)taosThreadJoin(pMgmt->statusThread, NULL);