Merge pull request #12686 from taosdata/fix/dnode
refactor: dnode monitor
This commit is contained in:
commit
012c500264
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "bmInt.h"
|
||||
|
||||
static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
|
||||
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
|
||||
|
||||
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||
SMonBmInfo bmInfo = {0};
|
||||
|
|
|
@ -32,7 +32,9 @@ typedef struct SDnodeMgmt {
|
|||
SSingleWorker mgmtWorker;
|
||||
ProcessCreateNodeFp processCreateNodeFp;
|
||||
ProcessDropNodeFp processDropNodeFp;
|
||||
IsNodeRequiredFp isNodeRequiredFp;
|
||||
SendMonitorReportFp sendMonitorReportFp;
|
||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||
} SDnodeMgmt;
|
||||
|
||||
// dmHandle.c
|
||||
|
@ -43,11 +45,6 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
// dmMonitor.c
|
||||
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo);
|
||||
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo);
|
||||
void dmSendMonitorReport(SDnodeMgmt *pMgmt);
|
||||
|
||||
// dmWorker.c
|
||||
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
||||
|
|
|
@ -72,11 +72,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
taosRUnLockLatch(&pMgmt->pData->latch);
|
||||
|
||||
SMonVloadInfo vinfo = {0};
|
||||
dmGetVnodeLoads(pMgmt, &vinfo);
|
||||
(*pMgmt->getVnodeLoadsFp)(&vinfo);
|
||||
req.pVloads = vinfo.pVloads;
|
||||
|
||||
SMonMloadInfo minfo = {0};
|
||||
dmGetMnodeLoads(pMgmt, &minfo);
|
||||
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||
|
||||
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
||||
void *pHead = rpcMallocCont(contLen);
|
||||
|
@ -115,7 +115,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
|||
|
||||
SServerStatusRsp statusRsp = {0};
|
||||
SMonMloadInfo minfo = {0};
|
||||
dmGetMnodeLoads(pMgmt, &minfo);
|
||||
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||
if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
|
||||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
|
||||
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
|
||||
|
@ -123,7 +123,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
|||
}
|
||||
|
||||
SMonVloadInfo vinfo = {0};
|
||||
dmGetVnodeLoads(pMgmt, &vinfo);
|
||||
(*pMgmt->getVnodeLoadsFp)(&vinfo);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
|
||||
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
|
||||
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {
|
||||
|
|
|
@ -45,7 +45,9 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
pMgmt->name = pInput->name;
|
||||
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
|
||||
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
||||
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
|
||||
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
||||
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
||||
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
||||
|
||||
if (dmStartWorker(pMgmt) != 0) {
|
||||
return -1;
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmInt.h"
|
||||
|
||||
#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \
|
||||
if (!tsMultiProcess) { \
|
||||
SRpcMsg rsp = {0}; \
|
||||
SRpcMsg req = {.msgType = mtype}; \
|
||||
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
|
||||
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
|
||||
epset.eps[0].port = tsServerPort; \
|
||||
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
|
||||
if (rsp.code == 0 && rsp.contLen > 0) { \
|
||||
func(rsp.pCont, rsp.contLen, pInfo); \
|
||||
} \
|
||||
rpcFreeCont(rsp.pCont); \
|
||||
}
|
||||
|
||||
static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
|
||||
pInfo->protocol = 1;
|
||||
pInfo->dnode_id = pMgmt->pData->dnodeId;
|
||||
pInfo->cluster_id = pMgmt->pData->clusterId;
|
||||
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
|
||||
}
|
||||
|
||||
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
|
||||
pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f);
|
||||
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE);
|
||||
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE);
|
||||
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE);
|
||||
pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE);
|
||||
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
||||
pInfo->logdir.size = tsLogSpace.size;
|
||||
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
||||
pInfo->tempdir.size = tsTempSpace.size;
|
||||
}
|
||||
|
||||
static void dmGetMonitorInfo(SDnodeMgmt *pMgmt, SMonDmInfo *pInfo) {
|
||||
dmGetMonitorBasicInfo(pMgmt, &pInfo->basic);
|
||||
dmGetMonitorDnodeInfo(pMgmt, &pInfo->dnode);
|
||||
dmGetMonitorSystemInfo(&pInfo->sys);
|
||||
}
|
||||
|
||||
void dmSendMonitorReport(SDnodeMgmt *pMgmt) {
|
||||
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
||||
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
|
||||
|
||||
SMonDmInfo dmInfo = {0};
|
||||
SMonMmInfo mmInfo = {0};
|
||||
SMonVmInfo vmInfo = {0};
|
||||
SMonQmInfo qmInfo = {0};
|
||||
SMonSmInfo smInfo = {0};
|
||||
SMonBmInfo bmInfo = {0};
|
||||
|
||||
dmGetMonitorInfo(pMgmt, &dmInfo);
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
|
||||
if (dmInfo.dnode.has_mnode) {
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
|
||||
}
|
||||
if (dmInfo.dnode.has_qnode) {
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
|
||||
}
|
||||
if (dmInfo.dnode.has_snode) {
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
|
||||
}
|
||||
if (dmInfo.dnode.has_bnode) {
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
|
||||
}
|
||||
|
||||
monSetDmInfo(&dmInfo);
|
||||
monSetMmInfo(&mmInfo);
|
||||
monSetVmInfo(&vmInfo);
|
||||
monSetQmInfo(&qmInfo);
|
||||
monSetSmInfo(&smInfo);
|
||||
monSetBmInfo(&bmInfo);
|
||||
tFreeSMonMmInfo(&mmInfo);
|
||||
tFreeSMonVmInfo(&vmInfo);
|
||||
tFreeSMonQmInfo(&qmInfo);
|
||||
tFreeSMonSmInfo(&smInfo);
|
||||
tFreeSMonBmInfo(&bmInfo);
|
||||
monSendReport();
|
||||
}
|
||||
|
||||
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
|
||||
}
|
||||
|
||||
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
||||
dmSendLocalRecv(pMgmt, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
|
||||
}
|
|
@ -50,7 +50,7 @@ static void *dmMonitorThreadFp(void *param) {
|
|||
int64_t curTime = taosGetTimestampMs();
|
||||
float interval = (curTime - lastTime) / 1000.0f;
|
||||
if (interval >= tsMonitorInterval) {
|
||||
dmSendMonitorReport(pMgmt);
|
||||
(*pMgmt->sendMonitorReportFp)();
|
||||
lastTime = curTime;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -154,6 +154,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
dInfo("successed to write %s, deployed:%d", realfile, deployed);
|
||||
dDebug("successed to write %s, deployed:%d", realfile, deployed);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -16,8 +16,13 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "mmInt.h"
|
||||
|
||||
static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) {
|
||||
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
|
||||
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
|
||||
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant);
|
||||
}
|
||||
|
||||
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
||||
pInfo->isMnode = 1;
|
||||
mndGetLoad(pMgmt->pMnode, &pInfo->load);
|
||||
}
|
||||
|
||||
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||
|
@ -45,11 +50,6 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
||||
pInfo->isMnode = 1;
|
||||
mndGetLoad(pMgmt->pMnode, &pInfo->load);
|
||||
}
|
||||
|
||||
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||
SMonMloadInfo mloads = {0};
|
||||
mmGetMnodeLoads(pMgmt, &mloads);
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "qmInt.h"
|
||||
|
||||
static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
|
||||
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
|
||||
|
||||
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||
SMonQmInfo qmInfo = {0};
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "smInt.h"
|
||||
|
||||
static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
||||
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
||||
|
||||
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||
SMonSmInfo smInfo = {0};
|
||||
|
|
|
@ -128,7 +128,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
|
|||
|
||||
*numOfVnodes = vnodesNum;
|
||||
code = 0;
|
||||
dInfo("succcessed to read file %s", file);
|
||||
dDebug("succcessed to read file %s", file);
|
||||
|
||||
_OVER:
|
||||
if (content != NULL) taosMemoryFree(content);
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "vmInt.h"
|
||||
|
||||
static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
|
||||
if (pInfo->pVloads == NULL) return;
|
||||
|
||||
|
@ -37,7 +37,7 @@ static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
|||
taosRUnLockLatch(&pMgmt->latch);
|
||||
}
|
||||
|
||||
static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||
SMonVloadInfo vloads = {0};
|
||||
vmGetVnodeLoads(pMgmt, &vloads);
|
||||
|
||||
|
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DND_IMP_H_
|
||||
#define _TD_DND_IMP_H_
|
||||
#ifndef _TD_DND_MGMT_H_
|
||||
#define _TD_DND_MGMT_H_
|
||||
|
||||
// tobe deleted
|
||||
#include "uv.h"
|
||||
|
@ -165,16 +165,13 @@ SMsgCb dmGetMsgcb(SDnode *pDnode);
|
|||
int32_t dmInitMsgHandle(SDnode *pDnode);
|
||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
|
||||
// mgmt nodes
|
||||
SMgmtFunc dmGetMgmtFunc();
|
||||
SMgmtFunc bmGetMgmtFunc();
|
||||
SMgmtFunc qmGetMgmtFunc();
|
||||
SMgmtFunc smGetMgmtFunc();
|
||||
SMgmtFunc vmGetMgmtFunc();
|
||||
SMgmtFunc mmGetMgmtFunc();
|
||||
// dmMonitor.c
|
||||
void dmSendMonitorReport();
|
||||
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
||||
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_IMP_H_*/
|
||||
#endif /*_TD_DND_MGMT_H_*/
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DND_NODES_H_
|
||||
#define _TD_DND_NODES_H_
|
||||
|
||||
#include "dmInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
SMgmtFunc dmGetMgmtFunc();
|
||||
SMgmtFunc bmGetMgmtFunc();
|
||||
SMgmtFunc qmGetMgmtFunc();
|
||||
SMgmtFunc smGetMgmtFunc();
|
||||
SMgmtFunc vmGetMgmtFunc();
|
||||
SMgmtFunc mmGetMgmtFunc();
|
||||
|
||||
void mmGetMonitorInfo(void *pMgmt, SMonMmInfo *pInfo);
|
||||
void vmGetMonitorInfo(void *pMgmt, SMonVmInfo *pInfo);
|
||||
void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo);
|
||||
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
|
||||
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
||||
|
||||
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
|
||||
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_NODES_H_*/
|
|
@ -168,11 +168,6 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool dmIsNodeRequired(EDndNodeType ntype) {
|
||||
SDnode *pDnode = dmInstance();
|
||||
return pDnode->wrappers[ntype].required;
|
||||
}
|
||||
|
||||
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||
SMgmtInputOpt opt = {
|
||||
.path = pWrapper->path,
|
||||
|
@ -180,7 +175,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
|||
.pData = &pWrapper->pDnode->data,
|
||||
.processCreateNodeFp = dmProcessCreateNodeReq,
|
||||
.processDropNodeFp = dmProcessDropNodeReq,
|
||||
.isNodeRequiredFp = dmIsNodeRequired,
|
||||
.sendMonitorReportFp = dmSendMonitorReport,
|
||||
.getVnodeLoadsFp = dmGetVnodeLoads,
|
||||
.getMnodeLoadsFp = dmGetMnodeLoads,
|
||||
};
|
||||
|
||||
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "dmNodes.h"
|
||||
|
||||
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
@ -189,7 +190,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
|
|||
}
|
||||
|
||||
dmReportStartup("dnode-transport", "initialized");
|
||||
dInfo("dnode is created, ptr:%p", pDnode);
|
||||
dDebug("dnode is created, ptr:%p", pDnode);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
@ -208,7 +209,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
|||
dmCleanupClient(pDnode);
|
||||
dmCleanupServer(pDnode);
|
||||
dmClearVars(pDnode);
|
||||
dInfo("dnode is closed, ptr:%p", pDnode);
|
||||
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||
}
|
||||
|
||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "dmNodes.h"
|
||||
|
||||
#define dmSendLocalRecv(pDnode, mtype, func, pInfo) \
|
||||
SRpcMsg rsp = {0}; \
|
||||
SRpcMsg req = {.msgType = mtype}; \
|
||||
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
|
||||
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
|
||||
epset.eps[0].port = tsServerPort; \
|
||||
rpcSendRecv(pDnode->trans.clientRpc, &epset, &req, &rsp); \
|
||||
if (rsp.code == 0 && rsp.contLen > 0) { \
|
||||
func(rsp.pCont, rsp.contLen, pInfo); \
|
||||
} \
|
||||
rpcFreeCont(rsp.pCont);
|
||||
|
||||
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
||||
pInfo->protocol = 1;
|
||||
pInfo->dnode_id = pDnode->data.dnodeId;
|
||||
pInfo->cluster_id = pDnode->data.clusterId;
|
||||
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
|
||||
}
|
||||
|
||||
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
||||
pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f);
|
||||
pInfo->has_mnode = pDnode->wrappers[MNODE].required;
|
||||
pInfo->has_qnode = pDnode->wrappers[QNODE].required;
|
||||
pInfo->has_snode = pDnode->wrappers[SNODE].required;
|
||||
pInfo->has_bnode = pDnode->wrappers[BNODE].required;
|
||||
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
||||
pInfo->logdir.size = tsLogSpace.size;
|
||||
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
||||
pInfo->tempdir.size = tsTempSpace.size;
|
||||
}
|
||||
|
||||
static void dmGetDmMonitorInfo(SDnode *pDnode) {
|
||||
SMonDmInfo dmInfo = {0};
|
||||
dmGetMonitorBasicInfo(pDnode, &dmInfo.basic);
|
||||
dmGetMonitorDnodeInfo(pDnode, &dmInfo.dnode);
|
||||
dmGetMonitorSystemInfo(&dmInfo.sys);
|
||||
monSetDmInfo(&dmInfo);
|
||||
}
|
||||
|
||||
static void dmGetMmMonitorInfo(SDnode *pDnode) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
SMonMmInfo mmInfo = {0};
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
mmGetMonitorInfo(pWrapper->pMgmt, &mmInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
monSetMmInfo(&mmInfo);
|
||||
tFreeSMonMmInfo(&mmInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void dmGetVmMonitorInfo(SDnode *pDnode) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
SMonVmInfo vmInfo = {0};
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
vmGetMonitorInfo(pWrapper->pMgmt, &vmInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
monSetVmInfo(&vmInfo);
|
||||
tFreeSMonVmInfo(&vmInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void dmGetQmMonitorInfo(SDnode *pDnode) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
SMonQmInfo qmInfo = {0};
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
qmGetMonitorInfo(pWrapper->pMgmt, &qmInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
monSetQmInfo(&qmInfo);
|
||||
tFreeSMonQmInfo(&qmInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void dmGetSmMonitorInfo(SDnode *pDnode) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[SNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
SMonSmInfo smInfo = {0};
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
smGetMonitorInfo(pWrapper->pMgmt, &smInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
monSetSmInfo(&smInfo);
|
||||
tFreeSMonSmInfo(&smInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void dmGetBmMonitorInfo(SDnode *pDnode) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[BNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
SMonBmInfo bmInfo = {0};
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
bmGetMonitorInfo(pWrapper->pMgmt, &bmInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
monSetBmInfo(&bmInfo);
|
||||
tFreeSMonBmInfo(&bmInfo);
|
||||
}
|
||||
}
|
||||
|
||||
void dmSendMonitorReport() {
|
||||
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
||||
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
|
||||
|
||||
SDnode *pDnode = dmInstance();
|
||||
dmGetDmMonitorInfo(pDnode);
|
||||
dmGetMmMonitorInfo(pDnode);
|
||||
dmGetVmMonitorInfo(pDnode);
|
||||
dmGetQmMonitorInfo(pDnode);
|
||||
dmGetSmMonitorInfo(pDnode);
|
||||
dmGetBmMonitorInfo(pDnode);
|
||||
monSendReport();
|
||||
}
|
||||
|
||||
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
|
||||
SDnode *pDnode = dmInstance();
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
vmGetVnodeLoads(pWrapper->pMgmt, pInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
}
|
||||
}
|
||||
|
||||
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
|
||||
SDnode *pDnode = dmInstance();
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
}
|
|
@ -89,21 +89,23 @@ typedef enum {
|
|||
|
||||
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||
typedef bool (*IsNodeRequiredFp)(EDndNodeType ntype);
|
||||
typedef void (*SendMonitorReportFp)();
|
||||
typedef void (*GetVnodeLoadsFp)();
|
||||
typedef void (*GetMnodeLoadsFp)();
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
int64_t dnodeVer;
|
||||
int64_t updateTime;
|
||||
int64_t rebootTime;
|
||||
bool dropped;
|
||||
bool stopped;
|
||||
SEpSet mnodeEps;
|
||||
SArray *dnodeEps;
|
||||
SHashObj *dnodeHash;
|
||||
SRWLatch latch;
|
||||
SMsgCb msgCb;
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
int64_t dnodeVer;
|
||||
int64_t updateTime;
|
||||
int64_t rebootTime;
|
||||
bool dropped;
|
||||
bool stopped;
|
||||
SEpSet mnodeEps;
|
||||
SArray *dnodeEps;
|
||||
SHashObj *dnodeHash;
|
||||
SRWLatch latch;
|
||||
SMsgCb msgCb;
|
||||
} SDnodeData;
|
||||
|
||||
typedef struct {
|
||||
|
@ -113,7 +115,9 @@ typedef struct {
|
|||
SMsgCb msgCb;
|
||||
ProcessCreateNodeFp processCreateNodeFp;
|
||||
ProcessDropNodeFp processDropNodeFp;
|
||||
IsNodeRequiredFp isNodeRequiredFp;
|
||||
SendMonitorReportFp sendMonitorReportFp;
|
||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||
} SMgmtInputOpt;
|
||||
|
||||
typedef struct {
|
||||
|
|
Loading…
Reference in New Issue