fix/TS-5532-set-seperate-thread-update-status
This commit is contained in:
parent
a5a748bb46
commit
57cb652734
|
@ -49,6 +49,7 @@ typedef struct SDnodeMgmt {
|
||||||
// dmHandle.c
|
// dmHandle.c
|
||||||
SArray *dmGetMsgHandles();
|
SArray *dmGetMsgHandles();
|
||||||
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
||||||
|
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt);
|
||||||
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
|
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
|
||||||
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
@ -62,6 +63,7 @@ int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
// dmWorker.c
|
// dmWorker.c
|
||||||
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
||||||
|
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopStatusThread(SDnodeMgmt *pMgmt);
|
void dmStopStatusThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
|
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -22,6 +22,8 @@
|
||||||
|
|
||||||
extern SConfig *tsCfg;
|
extern SConfig *tsCfg;
|
||||||
|
|
||||||
|
SMonVloadInfo vinfo = {0};
|
||||||
|
|
||||||
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
|
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
|
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
|
||||||
|
@ -163,8 +165,6 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
|
||||||
|
|
||||||
dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
|
dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
|
||||||
SMonVloadInfo vinfo = {0};
|
|
||||||
(*pMgmt->getVnodeLoadsFp)(&vinfo);
|
|
||||||
req.pVloads = vinfo.pVloads;
|
req.pVloads = vinfo.pVloads;
|
||||||
|
|
||||||
dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
|
dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
|
||||||
|
@ -231,6 +231,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
|
||||||
|
dDebug("begin to get vnode loads");
|
||||||
|
(*pMgmt->getVnodeLoadsFp)(&vinfo);
|
||||||
|
}
|
||||||
|
|
||||||
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
|
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
|
||||||
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
|
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
|
|
|
@ -22,6 +22,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
|
||||||
if ((code = dmStartStatusThread(pMgmt)) != 0) {
|
if ((code = dmStartStatusThread(pMgmt)) != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
if ((code = dmStartStatusInfoThread(pMgmt)) != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
#if defined(TD_ENTERPRISE)
|
#if defined(TD_ENTERPRISE)
|
||||||
if ((code = dmStartNotifyThread(pMgmt)) != 0) {
|
if ((code = dmStartNotifyThread(pMgmt)) != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -47,6 +47,35 @@ static void *dmStatusThreadFp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *dmStatusInfoThreadFp(void *param) {
|
||||||
|
SDnodeMgmt *pMgmt = param;
|
||||||
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
setThreadName("dnode-status-info");
|
||||||
|
|
||||||
|
int32_t upTimeCount = 0;
|
||||||
|
int64_t upTime = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
taosMsleep(200);
|
||||||
|
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
|
||||||
|
|
||||||
|
int64_t curTime = taosGetTimestampMs();
|
||||||
|
if (curTime < lastTime) lastTime = curTime;
|
||||||
|
float interval = (curTime - lastTime) / 1000.0f;
|
||||||
|
if (interval >= tsStatusInterval) {
|
||||||
|
dmUpdateStatusInfo(pMgmt);
|
||||||
|
lastTime = curTime;
|
||||||
|
|
||||||
|
if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
|
||||||
|
upTime = taosGetOsUptime() - tsDndStartOsUptime;
|
||||||
|
tsDndUpTime = TMAX(tsDndUpTime, upTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SDmNotifyHandle dmNotifyHdl = {.state = 0};
|
SDmNotifyHandle dmNotifyHdl = {.state = 0};
|
||||||
#define TIMESERIES_STASH_NUM 5
|
#define TIMESERIES_STASH_NUM 5
|
||||||
static void *dmNotifyThreadFp(void *param) {
|
static void *dmNotifyThreadFp(void *param) {
|
||||||
|
@ -280,6 +309,22 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
(void)taosThreadAttrInit(&thAttr);
|
||||||
|
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to create status Info thread since %s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)taosThreadAttrDestroy(&thAttr);
|
||||||
|
tmsgReportStartup("dnode-status-info", "initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
|
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
if (taosCheckPthreadValid(pMgmt->statusThread)) {
|
if (taosCheckPthreadValid(pMgmt->statusThread)) {
|
||||||
(void)taosThreadJoin(pMgmt->statusThread, NULL);
|
(void)taosThreadJoin(pMgmt->statusThread, NULL);
|
||||||
|
|
Loading…
Reference in New Issue