diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 18b3f66a60..cbf1959e75 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -28,6 +28,7 @@ typedef struct SDnodeMgmt { const char *path; const char *name; TdThread statusThread; + TdThread statusInfoThread; TdThread notifyThread; TdThread monitorThread; TdThread auditThread; @@ -49,6 +50,7 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); +void dmUpdateStatusInfo(SDnodeMgmt *pMgmt); void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); @@ -62,7 +64,9 @@ int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); // dmWorker.c int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmStartStatusThread(SDnodeMgmt *pMgmt); +int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt); void dmStopStatusThread(SDnodeMgmt *pMgmt); +void dmStopStatusInfoThread(SDnodeMgmt *pMgmt); int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt); void dmStopNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 87b1ae0efa..d0f8fbae87 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -22,6 +22,8 @@ extern SConfig *tsCfg; +SMonVloadInfo tsVinfo = {0}; + static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { int32_t code = 0; if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { @@ -163,9 +165,16 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (void)taosThreadRwlockUnlock(&pMgmt->pData->lock); dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq); - SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsFp)(&vinfo); - req.pVloads = vinfo.pVloads; + if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) { + dError("failed to lock status info lock"); + return; + } + req.pVloads = tsVinfo.pVloads; + tsVinfo.pVloads = NULL; + if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) { + dError("failed to unlock status info lock"); + return; + } dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq); SMonMloadInfo minfo = {0}; @@ -231,6 +240,28 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dmProcessStatusRsp(pMgmt, &rpcRsp); } +void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) { + SMonVloadInfo vinfo = {0}; + dDebug("begin to get vnode loads"); + (*pMgmt->getVnodeLoadsFp)(&vinfo); + dDebug("begin to lock status info"); + if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) { + dError("failed to lock status info lock"); + return; + } + if (tsVinfo.pVloads == NULL) { + tsVinfo.pVloads = vinfo.pVloads; + vinfo.pVloads = NULL; + } else { + taosArrayDestroy(vinfo.pVloads); + vinfo.pVloads = NULL; + } + if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) { + dError("failed to unlock status info lock"); + return; + } +} + void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq); if (contLen < 0) { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 22c2b2f5b2..ed156ac1ec 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -22,6 +22,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if ((code = dmStartStatusThread(pMgmt)) != 0) { return code; } + if ((code = dmStartStatusInfoThread(pMgmt)) != 0) { + return code; + } #if defined(TD_ENTERPRISE) if ((code = dmStartNotifyThread(pMgmt)) != 0) { return code; @@ -44,6 +47,7 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) { dmStopMonitorThread(pMgmt); dmStopAuditThread(pMgmt); dmStopStatusThread(pMgmt); + dmStopStatusInfoThread(pMgmt); #if defined(TD_ENTERPRISE) dmStopNotifyThread(pMgmt); #endif diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 58b86b20b1..7fc9920816 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -47,6 +47,49 @@ static void *dmStatusThreadFp(void *param) { return NULL; } +extern SMonVloadInfo tsVinfo; +static void *dmStatusInfoThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-status-info"); + + int32_t upTimeCount = 0; + int64_t upTime = 0; + + while (1) { + taosMsleep(200); + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + + int64_t curTime = taosGetTimestampMs(); + if (curTime < lastTime) lastTime = curTime; + float interval = (curTime - lastTime) / 1000.0f; + if (interval >= tsStatusInterval) { + dmUpdateStatusInfo(pMgmt); + lastTime = curTime; + + if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) { + upTime = taosGetOsUptime() - tsDndStartOsUptime; + tsDndUpTime = TMAX(tsDndUpTime, upTime); + } + } + } + dDebug("begin to lock status info when thread exit"); + if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) { + dError("failed to lock status info lock"); + return NULL; + } + if (tsVinfo.pVloads != NULL) { + taosArrayDestroy(tsVinfo.pVloads); + tsVinfo.pVloads = NULL; + } + if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) { + dError("failed to unlock status info lock"); + return NULL; + } + + return NULL; +} + SDmNotifyHandle dmNotifyHdl = {.state = 0}; #define TIMESERIES_STASH_NUM 5 static void *dmNotifyThreadFp(void *param) { @@ -280,6 +323,22 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; + TdThreadAttr thAttr; + (void)taosThreadAttrInit(&thAttr); + (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create status Info thread since %s", tstrerror(code)); + return code; + } + + (void)taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-status-info", "initialized"); + return 0; +} + void dmStopStatusThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->statusThread)) { (void)taosThreadJoin(pMgmt->statusThread, NULL); @@ -287,6 +346,13 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) { } } +void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) { + if (taosCheckPthreadValid(pMgmt->statusInfoThread)) { + (void)taosThreadJoin(pMgmt->statusInfoThread, NULL); + taosThreadClear(&pMgmt->statusInfoThread); + } +} + int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) { int32_t code = 0; TdThreadAttr thAttr; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 277dd2e02a..5e4f7163e7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -214,6 +214,7 @@ int32_t dmInitVars(SDnode *pDnode) { } (void)taosThreadRwlockInit(&pData->lock, NULL); + (void)taosThreadMutexInit(&pData->statusInfolock, NULL); (void)taosThreadMutexInit(&pDnode->mutex, NULL); return 0; } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index b5842acbad..de20f807e9 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -144,6 +144,7 @@ typedef struct { char machineId[TSDB_MACHINE_ID_LEN + 1]; EEncryptAlgor encryptAlgorigthm; EEncryptScope encryptScope; + TdThreadMutex statusInfolock; } SDnodeData; typedef struct {