diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bef323cdba..c7251a26b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -717,6 +717,18 @@ typedef struct { char charset[TD_LOCALE_LEN]; // tsCharset } SClusterCfg; +typedef struct { + int32_t openVnodes; + int32_t totalVnodes; + int32_t masterNum; + int64_t numOfSelectReqs; + int64_t numOfInsertReqs; + int64_t numOfInsertSuccessReqs; + int64_t numOfBatchInsertReqs; + int64_t numOfBatchInsertSuccessReqs; + int64_t errors; +} SVnodesStat; + typedef struct { int32_t vgId; int8_t role; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 63d1fc5014..6010c80f31 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -218,6 +218,15 @@ enum { TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "scheduler-link-broken", NULL, NULL) + // Monitor info exchange between processes + TD_NEW_MSG_SEG(TDMT_MON_MSG) + TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_QM_INFO, "monitor-qinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_SM_INFO, "monitor-sinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_BM_INFO, "monitor-binfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) + #if defined(TD_MSG_NUMBER_) TDMT_MAX #endif diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index fc7a4bb95d..f5080fbe7b 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -19,6 +19,7 @@ #include "tarray.h" #include "tdef.h" #include "tlog.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -29,6 +30,44 @@ extern "C" { #define MON_VER_LEN 12 #define MON_LOG_LEN 1024 +typedef struct { + int64_t ts; + ELogLevel level; + char content[MON_LOG_LEN]; +} SMonLogItem; + +typedef struct { + SArray *logs; // array of SMonLogItem + int32_t numOfErrorLogs; + int32_t numOfInfoLogs; + int32_t numOfDebugLogs; + int32_t numOfTraceLogs; +} SMonLogs; + +typedef struct { + char name[TSDB_FILENAME_LEN]; + int8_t level; + SDiskSize size; +} SMonDiskDesc; + +typedef struct { + double cpu_engine; + double cpu_system; + float cpu_cores; + int64_t mem_engine; // KB + int64_t mem_system; // KB + int64_t mem_total; // KB + int64_t disk_engine; // Byte + int64_t disk_used; // Byte + int64_t disk_total; // Byte + int64_t net_in; // bytes + int64_t net_out; // bytes + int64_t io_read; // bytes + int64_t io_write; // bytes + int64_t io_read_disk; // bytes + int64_t io_write_disk; // bytes +} SMonSysInfo; + typedef struct { int32_t dnode_id; char dnode_ep[TSDB_EP_LEN]; @@ -36,6 +75,19 @@ typedef struct { int32_t protocol; } SMonBasicInfo; +typedef struct { + float uptime; // day + int8_t has_mnode; + SMonDiskDesc logdir; + SMonDiskDesc tempdir; +} SMonDnodeInfo; + +typedef struct { + SMonBasicInfo basic; + SMonDnodeInfo dnode; + SMonSysInfo sys; +} SMonDmInfo; + typedef struct { int32_t dnode_id; char dnode_ep[TSDB_EP_LEN]; @@ -87,46 +139,65 @@ typedef struct { } SMonGrantInfo; typedef struct { - float uptime; // day - double cpu_engine; - double cpu_system; - float cpu_cores; - int64_t mem_engine; // KB - int64_t mem_system; // KB - int64_t mem_total; // KB - int64_t disk_engine; // Byte - int64_t disk_used; // Byte - int64_t disk_total; // Byte - int64_t net_in; // bytes - int64_t net_out; // bytes - int64_t io_read; // bytes - int64_t io_write; // bytes - int64_t io_read_disk; // bytes - int64_t io_write_disk; // bytes - int64_t req_select; - int64_t req_insert; - int64_t req_insert_success; - int64_t req_insert_batch; - int64_t req_insert_batch_success; - int32_t errors; - int32_t vnodes_num; - int32_t masters; - int8_t has_mnode; -} SMonDnodeInfo; + SMonClusterInfo cluster; + SMonVgroupInfo vgroup; + SMonGrantInfo grant; + SMonSysInfo sys; + SMonLogs log; +} SMonMmInfo; + +int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo); +int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo); +void tFreeSMonMmInfo(SMonMmInfo *pInfo); typedef struct { - char name[TSDB_FILENAME_LEN]; - int8_t level; - SDiskSize size; -} SMonDiskDesc; - -typedef struct { - SArray *datadirs; // array of SMonDiskDesc - SMonDiskDesc logdir; - SMonDiskDesc tempdir; + SArray *datadirs; // array of SMonDiskDesc } SMonDiskInfo; -typedef struct SMonInfo SMonInfo; +typedef struct { + SMonDiskInfo tfs; + SVnodesStat vstat; + SMonSysInfo sys; + SMonLogs log; +} SMonVmInfo; + +int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo); +int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo); +void tFreeSMonVmInfo(SMonVmInfo *pInfo); + +typedef struct { + SMonSysInfo sys; + SMonLogs log; +} SMonQmInfo; + +int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo); +int32_t tDeserializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo); +void tFreeSMonQmInfo(SMonQmInfo *pInfo); + +typedef struct { + SMonSysInfo sys; + SMonLogs log; +} SMonSmInfo; + +int32_t tSerializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo); +int32_t tDeserializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo); +void tFreeSMonSmInfo(SMonSmInfo *pInfo); +typedef struct { + SMonSysInfo sys; + SMonLogs log; +} SMonBmInfo; + +int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo); +int32_t tDeserializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo); +void tFreeSMonBmInfo(SMonBmInfo *pInfo); + +typedef struct { + SArray *pVloads; // SVnodeLoad +} SMonVloadInfo; + +int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo); +int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo); +void tFreeSMonVloadInfo(SMonVloadInfo *pInfo); typedef struct { const char *server; @@ -138,16 +209,14 @@ typedef struct { int32_t monInit(const SMonCfg *pCfg); void monCleanup(); void monRecordLog(int64_t ts, ELogLevel level, const char *content); - -SMonInfo *monCreateMonitorInfo(); -void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo); -void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo); -void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo); -void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo); -void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo); -void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo); -void monSendReport(SMonInfo *pMonitor); -void monCleanupMonitorInfo(SMonInfo *pMonitor); +int32_t monGetLogs(SMonLogs *logs); +void monSetDmInfo(SMonDmInfo *pInfo); +void monSetMmInfo(SMonMmInfo *pInfo); +void monSetVmInfo(SMonVmInfo *pInfo); +void monSetQmInfo(SMonQmInfo *pInfo); +void monSetSmInfo(SMonSmInfo *pInfo); +void monSetBmInfo(SMonBmInfo *pInfo); +void monSendReport(); #ifdef __cplusplus } diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index e1cc6f7829..022f11bb0e 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -44,8 +44,8 @@ int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); -int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); -int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes); +void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); +void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes); void taosKillSystem(); int32_t taosGetSystemUUID(char *uid, int32_t uidlen); diff --git a/source/dnode/mgmt/bm/bmHandle.c b/source/dnode/mgmt/bm/bmHandle.c index 2ae9b3817f..b73acd14c3 100644 --- a/source/dnode/mgmt/bm/bmHandle.c +++ b/source/dnode/mgmt/bm/bmHandle.c @@ -16,6 +16,34 @@ #define _DEFAULT_SOURCE #include "bmInt.h" +void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) { +} + +int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonBmInfo bmInfo = {0}; + bmGetMonitorInfo(pWrapper, &bmInfo); + dmGetMonitorSysInfo(&bmInfo.sys); + monGetLogs(&bmInfo.log); + + int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonBmInfo(&bmInfo); + return 0; +} + int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -54,4 +82,6 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } } -void bmInitMsgHandle(SMgmtWrapper *pWrapper) {} +void bmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, DEFAULT_HANDLE); +} diff --git a/source/dnode/mgmt/bm/bmWorker.c b/source/dnode/mgmt/bm/bmWorker.c index a5a97f6af0..cf2d7ac939 100644 --- a/source/dnode/mgmt/bm/bmWorker.c +++ b/source/dnode/mgmt/bm/bmWorker.c @@ -33,7 +33,37 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num } } -static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { +static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen}; + tmsgSendRsp(&rsp); +} + +static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SBnodeMgmt *pMgmt = pInfo->ahandle; + + dTrace("msg:%p, get from bnode monitor queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; + + if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) { + code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg); + } + + if (pRpc->msgType & 1U) { + if (code != 0 && terrno != 0) code = terrno; + bmSendRsp(pMgmt->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + +static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SBnodeMgmt *pMgmt = pInfo->ahandle; SMgmtWrapper *pWrapper = pMgmt->pWrapper; @@ -72,18 +102,37 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SBnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt}; if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { - dError("failed to start bnode write worker since %s", terrstr()); + dError("failed to start bnode-write worker since %s", terrstr()); return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg mCfg = { + .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start bnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("bnode workers are initialized"); return 0; } void bmStopWorker(SBnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tMultiWorkerCleanup(&pMgmt->writeWorker); dDebug("bnode workers are closed"); } diff --git a/source/dnode/mgmt/dm/dmHandle.c b/source/dnode/mgmt/dm/dmHandle.c index cb712bfb48..685958aaf0 100644 --- a/source/dnode/mgmt/dm/dmHandle.c +++ b/source/dnode/mgmt/dm/dmHandle.c @@ -42,8 +42,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES); if (pWrapper != NULL) { - req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); - vmMonitorVnodeLoads(pWrapper, req.pVloads); + SMonVloadInfo info = {0}; + dmGetVnodeLoads(pWrapper, &info); + req.pVloads = info.pVloads; dndReleaseWrapper(pWrapper); } @@ -117,7 +118,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { return TSDB_CODE_OPS_NOT_SUPPORT; } - static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { @@ -209,7 +209,7 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, DEFAULT_HANDLE); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessStatusMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/dm/dmMonitor.c b/source/dnode/mgmt/dm/dmMonitor.c index 22d9086971..fb5855070c 100644 --- a/source/dnode/mgmt/dm/dmMonitor.c +++ b/source/dnode/mgmt/dm/dmMonitor.c @@ -14,21 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dndInt.h" - -static int32_t dmGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { - tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); - pInfo->logdir.size = tsLogSpace.size; - tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); - pInfo->tempdir.size = tsTempSpace.size; - - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES); - if (pWrapper != NULL) { - vmMonitorTfsInfo(pWrapper, pInfo); - dndReleaseWrapper(pWrapper); - } - return 0; -} +#include "dmInt.h" static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; @@ -39,6 +25,163 @@ static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f); + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE); + if (pWrapper != NULL) { + pInfo->has_mnode = pWrapper->required; + dndReleaseWrapper(pWrapper); + } + tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); + pInfo->logdir.size = tsLogSpace.size; + tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); + pInfo->tempdir.size = tsTempSpace.size; +} + +static void dmGetMonitorInfo(SDnode *pDnode, SMonDmInfo *pInfo) { + dmGetMonitorBasicInfo(pDnode, &pInfo->basic); + dmGetMonitorSysInfo(&pInfo->sys); + dmGetMonitorDnodeInfo(pDnode, &pInfo->dnode); +} + +void dmSendMonitorReport(SDnode *pDnode) { + if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; + dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); + + SMonDmInfo dmInfo = {0}; + SMonMmInfo mmInfo = {0}; + SMonVmInfo vmInfo = {0}; + SMonQmInfo qmInfo = {0}; + SMonSmInfo smInfo = {0}; + SMonBmInfo bmInfo = {0}; + + SRpcMsg req = {0}; + SRpcMsg rsp; + SEpSet epset = {.inUse = 0, .numOfEps = 1}; + tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); + epset.eps[0].port = tsServerPort; + + SMgmtWrapper *pWrapper = NULL; + dmGetMonitorInfo(pDnode, &dmInfo); + + bool getFromAPI = !tsMultiProcess; + pWrapper = &pDnode->wrappers[MNODE]; + if (getFromAPI) { + if (dndMarkWrapper(pWrapper) != 0) { + mmGetMonitorInfo(pWrapper, &mmInfo); + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { + req.msgType = TDMT_MON_MM_INFO; + dndSendRecv(pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo); + } + rpcFreeCont(rsp.pCont); + } + } + + pWrapper = &pDnode->wrappers[VNODES]; + if (getFromAPI) { + if (dndMarkWrapper(pWrapper) != 0) { + vmGetMonitorInfo(pWrapper, &vmInfo); + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { + req.msgType = TDMT_MON_VM_INFO; + dndSendRecv(pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo); + } + rpcFreeCont(rsp.pCont); + } + } + + pWrapper = &pDnode->wrappers[QNODE]; + if (getFromAPI) { + if (dndMarkWrapper(pWrapper) != 0) { + qmGetMonitorInfo(pWrapper, &qmInfo); + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { + req.msgType = TDMT_MON_QM_INFO; + dndSendRecv(pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo); + } + rpcFreeCont(rsp.pCont); + } + } + + pWrapper = &pDnode->wrappers[SNODE]; + if (getFromAPI) { + if (dndMarkWrapper(pWrapper) != 0) { + smGetMonitorInfo(pWrapper, &smInfo); + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { + req.msgType = TDMT_MON_SM_INFO; + dndSendRecv(pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo); + } + rpcFreeCont(rsp.pCont); + } + } + + pWrapper = &pDnode->wrappers[BNODE]; + if (getFromAPI) { + if (dndMarkWrapper(pWrapper) != 0) { + bmGetMonitorInfo(pWrapper, &bmInfo); + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { + req.msgType = TDMT_MON_BM_INFO; + dndSendRecv(pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo); + } + rpcFreeCont(rsp.pCont); + } + } + + monSetDmInfo(&dmInfo); + monSetMmInfo(&mmInfo); + monSetVmInfo(&vmInfo); + monSetQmInfo(&qmInfo); + monSetSmInfo(&smInfo); + monSetBmInfo(&bmInfo); + tFreeSMonMmInfo(&mmInfo); + tFreeSMonVmInfo(&vmInfo); + tFreeSMonQmInfo(&qmInfo); + tFreeSMonSmInfo(&smInfo); + tFreeSMonBmInfo(&bmInfo); + monSendReport(); +} + +void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { + bool getFromAPI = !tsMultiProcess; + if (getFromAPI) { + vmGetVnodeLoads(pWrapper, pInfo); + } else { + SRpcMsg req = {.msgType = TDMT_MON_VM_LOAD}; + SRpcMsg rsp = {0}; + SEpSet epset = {.inUse = 0, .numOfEps = 1}; + tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); + epset.eps[0].port = tsServerPort; + + dndSendRecv(pWrapper->pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonVloadInfo(rsp.pCont, rsp.contLen, pInfo); + } + rpcFreeCont(rsp.pCont); + } +} + +void dmGetMonitorSysInfo(SMonSysInfo *pInfo) { taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); taosGetCpuCores(&pInfo->cpu_cores); taosGetProcMemory(&pInfo->mem_engine); @@ -47,61 +190,6 @@ static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->disk_engine = 0; pInfo->disk_used = tsDataSpace.size.used; pInfo->disk_total = tsDataSpace.size.total; - taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); - taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); - - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES); - if (pWrapper != NULL) { - vmMonitorVnodeReqs(pWrapper, pInfo); - dndReleaseWrapper(pWrapper); - } - - pWrapper = dndAcquireWrapper(pDnode, MNODE); - if (pWrapper != NULL) { - pInfo->has_mnode = pWrapper->required; - dndReleaseWrapper(pWrapper); - } + taosGetCardInfoDelta(&pInfo->net_in, &pInfo->net_out); + taosGetProcIODelta(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); } - -void dmSendMonitorReport(SDnode *pDnode) { - if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; - dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); - - SMonInfo *pMonitor = monCreateMonitorInfo(); - if (pMonitor == NULL) return; - - SMonBasicInfo basicInfo = {0}; - dmGetMonitorBasicInfo(pDnode, &basicInfo); - monSetBasicInfo(pMonitor, &basicInfo); - - SMonClusterInfo clusterInfo = {0}; - SMonVgroupInfo vgroupInfo = {0}; - SMonGrantInfo grantInfo = {0}; - - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE); - if (pWrapper != NULL) { - if (mmMonitorMnodeInfo(pWrapper, &clusterInfo, &vgroupInfo, &grantInfo) == 0) { - monSetClusterInfo(pMonitor, &clusterInfo); - monSetVgroupInfo(pMonitor, &vgroupInfo); - monSetGrantInfo(pMonitor, &grantInfo); - } - dndReleaseWrapper(pWrapper); - } - - SMonDnodeInfo dnodeInfo = {0}; - dmGetMonitorDnodeInfo(pDnode, &dnodeInfo); - monSetDnodeInfo(pMonitor, &dnodeInfo); - - SMonDiskInfo diskInfo = {0}; - if (dmGetMonitorDiskInfo(pDnode, &diskInfo) == 0) { - monSetDiskInfo(pMonitor, &diskInfo); - } - - taosArrayDestroy(clusterInfo.dnodes); - taosArrayDestroy(clusterInfo.mnodes); - taosArrayDestroy(vgroupInfo.vgroups); - taosArrayDestroy(diskInfo.datadirs); - - monSendReport(pMonitor); - monCleanupMonitorInfo(pMonitor); -} \ No newline at end of file diff --git a/source/dnode/mgmt/dm/dmWorker.c b/source/dnode/mgmt/dm/dmWorker.c index ec7392d7e4..41b38c8bb7 100644 --- a/source/dnode/mgmt/dm/dmWorker.c +++ b/source/dnode/mgmt/dm/dmWorker.c @@ -101,9 +101,9 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg scfg = {.min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->statusWorker, &scfg) != 0) { - dError("failed to start dnode status worker since %s", terrstr()); + SSingleWorkerCfg scfg = {.min = 1, .max = 1, .name = "dnode-monitor", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &scfg) != 0) { + dError("failed to start dnode monitor worker since %s", terrstr()); return -1; } @@ -113,7 +113,7 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) { void dmStopWorker(SDnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->mgmtWorker); - tSingleWorkerCleanup(&pMgmt->statusWorker); + tSingleWorkerCleanup(&pMgmt->monitorWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -131,9 +131,9 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; - SSingleWorker *pWorker = &pMgmt->statusWorker; + SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); diff --git a/source/dnode/mgmt/inc/bmInt.h b/source/dnode/mgmt/inc/bmInt.h index 919b1d2c7c..84a6a53e99 100644 --- a/source/dnode/mgmt/inc/bmInt.h +++ b/source/dnode/mgmt/inc/bmInt.h @@ -17,6 +17,7 @@ #define _TD_DND_BNODE_INT_H_ #include "dndInt.h" + #include "bnode.h" #ifdef __cplusplus @@ -29,6 +30,7 @@ typedef struct SBnodeMgmt { SMgmtWrapper *pWrapper; const char *path; SMultiWorker writeWorker; + SSingleWorker monitorWorker; } SBnodeMgmt; // bmInt.c @@ -39,11 +41,13 @@ int32_t bmDrop(SMgmtWrapper *pWrapper); void bmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // bmWorker.c int32_t bmStartWorker(SBnodeMgmt *pMgmt); void bmStopWorker(SBnodeMgmt *pMgmt); int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dmInt.h b/source/dnode/mgmt/inc/dmInt.h index 5dcd1be47c..a671368f06 100644 --- a/source/dnode/mgmt/inc/dmInt.h +++ b/source/dnode/mgmt/inc/dmInt.h @@ -32,7 +32,7 @@ typedef struct SDnodeMgmt { TdThread *threadId; SRWLatch latch; SSingleWorker mgmtWorker; - SSingleWorker statusWorker; + SSingleWorker monitorWorker; SMsgCb msgCb; const char *path; SDnode *pDnode; @@ -54,6 +54,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); // dmMonitor.c +void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); void dmSendMonitorReport(SDnode *pDnode); // dmWorker.c @@ -61,7 +62,7 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt); int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dndInt.h b/source/dnode/mgmt/inc/dndInt.h index 5659becb20..7faf1e4276 100644 --- a/source/dnode/mgmt/inc/dndInt.h +++ b/source/dnode/mgmt/inc/dndInt.h @@ -169,6 +169,7 @@ void dndCleanupTrans(SDnode *pDnode); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); int32_t dndInitMsgHandle(SDnode *pDnode); +void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); // mgmt void dmSetMgmtFp(SMgmtWrapper *pWrapper); @@ -182,22 +183,13 @@ void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg); -typedef struct { - int32_t openVnodes; - int32_t totalVnodes; - int32_t masterNum; - int64_t numOfSelectReqs; - int64_t numOfInsertReqs; - int64_t numOfInsertSuccessReqs; - int64_t numOfBatchInsertReqs; - int64_t numOfBatchInsertSuccessReqs; -} SVnodesStat; - -void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads); -int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); -void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); -int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo); +void dmGetMonitorSysInfo(SMonSysInfo *pInfo); +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo); +void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo); +void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo); +void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo); +void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/mmInt.h b/source/dnode/mgmt/inc/mmInt.h index d09d15255d..df63e22059 100644 --- a/source/dnode/mgmt/inc/mmInt.h +++ b/source/dnode/mgmt/inc/mmInt.h @@ -32,6 +32,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; + SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; @@ -51,6 +52,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); @@ -59,6 +61,7 @@ int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); diff --git a/source/dnode/mgmt/inc/qmInt.h b/source/dnode/mgmt/inc/qmInt.h index 02905413ab..012869d637 100644 --- a/source/dnode/mgmt/inc/qmInt.h +++ b/source/dnode/mgmt/inc/qmInt.h @@ -30,6 +30,7 @@ typedef struct SQnodeMgmt { const char *path; SSingleWorker queryWorker; SSingleWorker fetchWorker; + SSingleWorker monitorWorker; } SQnodeMgmt; // qmInt.c @@ -40,6 +41,7 @@ int32_t qmDrop(SMgmtWrapper *pWrapper); void qmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // qmWorker.c int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); @@ -50,6 +52,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/smInt.h b/source/dnode/mgmt/inc/smInt.h index 285ec84942..039dea2491 100644 --- a/source/dnode/mgmt/inc/smInt.h +++ b/source/dnode/mgmt/inc/smInt.h @@ -32,6 +32,7 @@ typedef struct SSnodeMgmt { int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray SSingleWorker sharedWorker; + SSingleWorker monitorWorker; } SSnodeMgmt; // smInt.c @@ -42,6 +43,7 @@ int32_t smDrop(SMgmtWrapper *pWrapper); void smInitMsgHandle(SMgmtWrapper *pWrapper); int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); @@ -50,6 +52,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/vmInt.h b/source/dnode/mgmt/inc/vmInt.h index 889ad7c164..f8466fe4f2 100644 --- a/source/dnode/mgmt/inc/vmInt.h +++ b/source/dnode/mgmt/inc/vmInt.h @@ -28,6 +28,7 @@ typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; SVnodesStat state; + SVnodesStat lastState; STfs *pTfs; SQWorkerPool queryPool; SQWorkerPool fetchPool; @@ -38,6 +39,7 @@ typedef struct SVnodesMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; SSingleWorker mgmtWorker; + SSingleWorker monitorWorker; } SVnodesMgmt; typedef struct { @@ -91,6 +93,8 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); +int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // vmFile.c int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); @@ -114,6 +118,7 @@ int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg); +int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/main/dndTransport.c b/source/dnode/mgmt/main/dndTransport.c index e76633bb1f..3b0aca4b39 100644 --- a/source/dnode/mgmt/main/dndTransport.c +++ b/source/dnode/mgmt/main/dndTransport.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - + #define _DEFAULT_SOURCE #include "dndInt.h" @@ -482,4 +482,8 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { .parent = pWrapper, .name = pWrapper->name}; return cfg; +} + +void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { + rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); } \ No newline at end of file diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index e5495c7e1d..eb72106e4b 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -16,6 +16,36 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant); +} + +int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonMmInfo mmInfo = {0}; + mmGetMonitorInfo(pWrapper, &mmInfo); + dmGetMonitorSysInfo(&mmInfo.sys); + monGetLogs(&mmInfo.log); + + int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonMmInfo(&mmInfo); + return 0; +} + int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -74,6 +104,8 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { } void mmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); @@ -163,5 +195,4 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE); - } diff --git a/source/dnode/mgmt/mm/mmInt.c b/source/dnode/mgmt/mm/mmInt.c index cd6ea1499f..8d4ac80e72 100644 --- a/source/dnode/mgmt/mm/mmInt.c +++ b/source/dnode/mgmt/mm/mmInt.c @@ -241,8 +241,3 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) { pWrapper->fp = mgmtFp; } -int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); -} diff --git a/source/dnode/mgmt/mm/mmWorker.c b/source/dnode/mgmt/mm/mmWorker.c index 735ef53b37..44bac61906 100644 --- a/source/dnode/mgmt/mm/mmWorker.c +++ b/source/dnode/mgmt/mm/mmWorker.c @@ -23,19 +23,18 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; - if (pMsg->rpcMsg.msgType != TDMT_DND_ALTER_MNODE) { + if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) { + code = mmProcessAlterReq(pMgmt, pMsg); + } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) { + code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); + } else { pMsg->pNode = pMgmt->pMnode; code = mndProcessMsg(pMsg); - } else { - code = mmProcessAlterReq(pMgmt, pMsg); } if (pRpc->msgType & 1U) { if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - if (code != 0) { - code = terrno; - dError("msg:%p, failed to process since %s", pMsg, terrstr()); - } + if (code != 0 && terrno != 0) code = terrno; SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; tmsgSendRsp(&rsp); } @@ -98,6 +97,15 @@ int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) return -1; @@ -157,15 +165,24 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { - dError("failed to start mnode sync-worker since %s", terrstr()); + dError("failed to start mnode mnode-sync worker since %s", terrstr()); return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg mCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start mnode mnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("mnode workers are initialized"); return 0; } void mmStopWorker(SMnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->writeWorker); diff --git a/source/dnode/mgmt/qm/qmHandle.c b/source/dnode/mgmt/qm/qmHandle.c index 77a9db1175..11b4ff0622 100644 --- a/source/dnode/mgmt/qm/qmHandle.c +++ b/source/dnode/mgmt/qm/qmHandle.c @@ -16,6 +16,34 @@ #define _DEFAULT_SOURCE #include "qmInt.h" +void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) { +} + +int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonQmInfo qmInfo = {0}; + qmGetMonitorInfo(pWrapper, &qmInfo); + dmGetMonitorSysInfo(&qmInfo.sys); + monGetLogs(&qmInfo.log); + + int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonQmInfo(&qmInfo); + return 0; +} + int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -55,6 +83,8 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void qmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE); diff --git a/source/dnode/mgmt/qm/qmWorker.c b/source/dnode/mgmt/qm/qmWorker.c index db0752949d..6c8382aef9 100644 --- a/source/dnode/mgmt/qm/qmWorker.c +++ b/source/dnode/mgmt/qm/qmWorker.c @@ -16,11 +16,36 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; +static inline void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen}; tmsgSendRsp(&rsp); } +static void qmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + + dTrace("msg:%p, get from qnode monitor queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; + + if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { + code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg); + } + + if (pRpc->msgType & 1U) { + if (code != 0 && terrno != 0) code = terrno; + qmSendRsp(pMgmt->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; @@ -66,6 +91,15 @@ int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { @@ -128,11 +162,21 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg mCfg = { + .min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start qnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("qnode workers are initialized"); return 0; } void qmStopWorker(SQnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker); dDebug("qnode workers are closed"); diff --git a/source/dnode/mgmt/sm/smHandle.c b/source/dnode/mgmt/sm/smHandle.c index a1fa41a686..fed6b6528c 100644 --- a/source/dnode/mgmt/sm/smHandle.c +++ b/source/dnode/mgmt/sm/smHandle.c @@ -16,6 +16,33 @@ #define _DEFAULT_SOURCE #include "smInt.h" +void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {} + +int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonSmInfo smInfo = {0}; + smGetMonitorInfo(pWrapper, &smInfo); + dmGetMonitorSysInfo(&smInfo.sys); + monGetLogs(&smInfo.log); + + int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonSmInfo(&smInfo); + return 0; +} + int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -55,6 +82,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void smInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by SNODE dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/sm/smWorker.c b/source/dnode/mgmt/sm/smWorker.c index afa843953b..a29d5d1abc 100644 --- a/source/dnode/mgmt/sm/smWorker.c +++ b/source/dnode/mgmt/sm/smWorker.c @@ -16,6 +16,36 @@ #define _DEFAULT_SOURCE #include "smInt.h" +static inline void smSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen}; + tmsgSendRsp(&rsp); +} + +static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + + dTrace("msg:%p, get from snode monitor queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; + + if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { + code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg); + } + + if (pRpc->msgType & 1U) { + if (code != 0 && terrno != 0) code = terrno; + smSendRsp(pMgmt->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = pInfo->ahandle; @@ -80,11 +110,21 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg mCfg = { + .min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start snode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("snode workers are initialized"); return 0; } void smStopWorker(SSnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); tMultiWorkerCleanup(pWorker); @@ -120,6 +160,15 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SSnodeMgmt *pMgmt = pWrapper->pMgmt; int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); diff --git a/source/dnode/mgmt/vm/vmHandle.c b/source/dnode/mgmt/vm/vmHandle.c index 52af9cac60..ecad414fd6 100644 --- a/source/dnode/mgmt/vm/vmHandle.c +++ b/source/dnode/mgmt/vm/vmHandle.c @@ -16,6 +16,71 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + tfsGetMonitorInfo(pMgmt->pTfs, &vmInfo->tfs); + + taosWLockLatch(&pMgmt->latch); + vmInfo->vstat.totalVnodes = pMgmt->state.totalVnodes; + vmInfo->vstat.masterNum = pMgmt->state.masterNum; + vmInfo->vstat.numOfSelectReqs = pMgmt->state.numOfSelectReqs - pMgmt->lastState.numOfSelectReqs; + vmInfo->vstat.numOfInsertReqs = pMgmt->state.numOfInsertReqs - pMgmt->lastState.numOfInsertReqs; + vmInfo->vstat.numOfInsertSuccessReqs = pMgmt->state.numOfInsertSuccessReqs - pMgmt->lastState.numOfInsertSuccessReqs; + vmInfo->vstat.numOfBatchInsertReqs = pMgmt->state.numOfBatchInsertReqs - pMgmt->lastState.numOfBatchInsertReqs; + vmInfo->vstat.numOfBatchInsertSuccessReqs = + pMgmt->state.numOfBatchInsertSuccessReqs - pMgmt->lastState.numOfBatchInsertSuccessReqs; + pMgmt->lastState = pMgmt->state; + taosWUnLockLatch(&pMgmt->latch); +} + +int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonVmInfo vmInfo = {0}; + vmGetMonitorInfo(pWrapper, &vmInfo); + dmGetMonitorSysInfo(&vmInfo.sys); + monGetLogs(&vmInfo.log); + + int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonVmInfo(&vmInfo); + return 0; +} + +int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonVloadInfo vloads = {0}; + vmGetVnodeLoads(pWrapper, &vloads); + + int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonVloadInfo(&vloads); + return 0; +} + static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; pCfg->wsize = pCreate->cacheBlockSize; @@ -239,6 +304,9 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } void vmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/vm/vmInt.c b/source/dnode/mgmt/vm/vmInt.c index b3390ba31c..6a1a5c3987 100644 --- a/source/dnode/mgmt/vm/vmInt.c +++ b/source/dnode/mgmt/vm/vmInt.c @@ -344,38 +344,21 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) { pWrapper->fp = mgmtFp; } -int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return -1; - - return tfsGetMonitorInfo(pMgmt->pTfs, pInfo); -} - -void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - SVnodesStat *pStat = &pMgmt->state; - pInfo->req_select = pStat->numOfSelectReqs; - pInfo->req_insert = pStat->numOfInsertReqs; - pInfo->req_insert_success = pStat->numOfInsertSuccessReqs; - pInfo->req_insert_batch = pStat->numOfBatchInsertReqs; - pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs; - pInfo->errors = tsNumOfErrorLogs; - pInfo->vnodes_num = pStat->totalVnodes; - pInfo->masters = pStat->masterNum; -} - -void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesStat *pStat = &pMgmt->state; - int32_t totalVnodes = 0; - int32_t masterNum = 0; - int64_t numOfSelectReqs = 0; - int64_t numOfInsertReqs = 0; - int64_t numOfInsertSuccessReqs = 0; - int64_t numOfBatchInsertReqs = 0; - int64_t numOfBatchInsertSuccessReqs = 0; + SArray *pLoads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); + + int32_t totalVnodes = 0; + int32_t masterNum = 0; + int64_t numOfSelectReqs = 0; + int64_t numOfInsertReqs = 0; + int64_t numOfInsertSuccessReqs = 0; + int64_t numOfBatchInsertReqs = 0; + int64_t numOfBatchInsertSuccessReqs = 0; + + pInfo->pVloads = pLoads; + if (pLoads == NULL) return; taosRLockLatch(&pMgmt->latch); @@ -402,6 +385,7 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { taosRUnLockLatch(&pMgmt->latch); + taosWLockLatch(&pMgmt->latch); pStat->totalVnodes = totalVnodes; pStat->masterNum = masterNum; pStat->numOfSelectReqs = numOfSelectReqs; @@ -409,4 +393,5 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; + taosWUnLockLatch(&pMgmt->latch); } \ No newline at end of file diff --git a/source/dnode/mgmt/vm/vmWorker.c b/source/dnode/mgmt/vm/vmWorker.c index ed1a4ca2f4..ff92cf880b 100644 --- a/source/dnode/mgmt/vm/vmWorker.c +++ b/source/dnode/mgmt/vm/vmWorker.c @@ -16,8 +16,12 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; +static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen}; tmsgSendRsp(&rsp); } @@ -26,9 +30,15 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); + dTrace("msg:%p, will be processed in vnode-m queue", pMsg); switch (msgType) { + case TDMT_MON_VM_INFO: + code = vmProcessGetMonVmInfoReq(pMgmt->pWrapper, pMsg); + break; + case TDMT_MON_VM_LOAD: + code = vmProcessGetVnodeLoadsReq(pMgmt->pWrapper, pMsg); + break; case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); break; @@ -255,6 +265,15 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SMsgHead *pHead = pRpc->pCont; @@ -412,11 +431,21 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg mCfg = { + .min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start mnode vnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("vnode workers are initialized"); return 0; } void vmStopWorker(SVnodesMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 314e70db9b..6bc3ea9fc0 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -237,7 +237,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { int64_t interval = TABS(pDnode->lastAccessTime - curMs); - if (interval > 3500 * tsStatusInterval) { + if (interval > 30000 * tsStatusInterval) { if (pDnode->rebootTime > 0) { pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 5c3dd778e1..86bbc0257b 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -502,7 +502,11 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonVgroupDesc desc = {0}; desc.vgroup_id = pVgroup->vgId; - strncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name)); + + SName name = {0}; + tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + tNameGetDbName(&name, desc.database_name); + desc.tables_num = pVgroup->numOfTables; pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index 452c38f66b..ae1af4ba62 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -18,43 +18,32 @@ #include "monitor.h" -#include "tarray.h" #include "tjson.h" typedef struct { - int64_t ts; - ELogLevel level; - char content[MON_LOG_LEN]; -} SMonLogItem; - -typedef struct { - int64_t time; - int64_t req_select; - int64_t req_insert; - int64_t req_insert_batch; - int64_t net_in; - int64_t net_out; - int64_t io_read; - int64_t io_write; - int64_t io_read_disk; - int64_t io_write_disk; -} SMonState; - -typedef struct SMonInfo { - int64_t curTime; - SMonState lastState; - SArray *logs; // array of SMonLogItem - SJson *pJson; + int64_t curTime; + int64_t lastTime; + SJson *pJson; + SMonLogs log; + SMonDmInfo dmInfo; + SMonMmInfo mmInfo; + SMonVmInfo vmInfo; + SMonSmInfo smInfo; + SMonQmInfo qmInfo; + SMonBmInfo bmInfo; } SMonInfo; typedef struct { TdThreadMutex lock; - SArray *logs; // array of SMonLogItem - int32_t maxLogs; - const char *server; - uint16_t port; - bool comp; - SMonState state; + SArray *logs; // array of SMonLogItem + SMonCfg cfg; + int64_t lastTime; + SMonDmInfo dmInfo; + SMonMmInfo mmInfo; + SMonVmInfo vmInfo; + SMonSmInfo smInfo; + SMonQmInfo qmInfo; + SMonBmInfo bmInfo; } SMonitor; #ifdef __cplusplus diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monMain.c similarity index 52% rename from source/libs/monitor/src/monitor.c rename to source/libs/monitor/src/monMain.c index b92c08d51c..c90b1f58e8 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monMain.c @@ -17,7 +17,6 @@ #include "monInt.h" #include "taoserror.h" #include "thttp.h" -#include "tlog.h" #include "ttime.h" static SMonitor tsMonitor = {0}; @@ -25,7 +24,7 @@ static SMonitor tsMonitor = {0}; void monRecordLog(int64_t ts, ELogLevel level, const char *content) { taosThreadMutexLock(&tsMonitor.lock); int32_t size = taosArrayGetSize(tsMonitor.logs); - if (size < tsMonitor.maxLogs) { + if (size < tsMonitor.cfg.maxLogs) { SMonLogItem item = {.ts = ts, .level = level}; SMonLogItem *pItem = taosArrayPush(tsMonitor.logs, &item); if (pItem != NULL) { @@ -35,6 +34,68 @@ void monRecordLog(int64_t ts, ELogLevel level, const char *content) { taosThreadMutexUnlock(&tsMonitor.lock); } +int32_t monGetLogs(SMonLogs *logs) { + taosThreadMutexLock(&tsMonitor.lock); + logs->logs = taosArrayDup(tsMonitor.logs); + logs->numOfInfoLogs = tsNumOfInfoLogs; + logs->numOfErrorLogs = tsNumOfErrorLogs; + logs->numOfDebugLogs = tsNumOfDebugLogs; + logs->numOfTraceLogs = tsNumOfTraceLogs; + tsNumOfInfoLogs = 0; + tsNumOfErrorLogs = 0; + tsNumOfDebugLogs = 0; + tsNumOfTraceLogs = 0; + taosArrayClear(tsMonitor.logs); + taosThreadMutexUnlock(&tsMonitor.lock); + if (logs->logs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +void monSetDmInfo(SMonDmInfo *pInfo) { + taosThreadMutexLock(&tsMonitor.lock); + memcpy(&tsMonitor.dmInfo, pInfo, sizeof(SMonDmInfo)); + taosThreadMutexUnlock(&tsMonitor.lock); + memset(pInfo, 0, sizeof(SMonDmInfo)); +} + +void monSetMmInfo(SMonMmInfo *pInfo) { + taosThreadMutexLock(&tsMonitor.lock); + memcpy(&tsMonitor.mmInfo, pInfo, sizeof(SMonMmInfo)); + taosThreadMutexUnlock(&tsMonitor.lock); + memset(pInfo, 0, sizeof(SMonMmInfo)); +} + +void monSetVmInfo(SMonVmInfo *pInfo) { + taosThreadMutexLock(&tsMonitor.lock); + memcpy(&tsMonitor.vmInfo, pInfo, sizeof(SMonVmInfo)); + taosThreadMutexUnlock(&tsMonitor.lock); + memset(pInfo, 0, sizeof(SMonVmInfo)); +} + +void monSetQmInfo(SMonQmInfo *pInfo) { + taosThreadMutexLock(&tsMonitor.lock); + memcpy(&tsMonitor.qmInfo, pInfo, sizeof(SMonQmInfo)); + taosThreadMutexUnlock(&tsMonitor.lock); + memset(pInfo, 0, sizeof(SMonQmInfo)); +} + +void monSetSmInfo(SMonSmInfo *pInfo) { + taosThreadMutexLock(&tsMonitor.lock); + memcpy(&tsMonitor.smInfo, pInfo, sizeof(SMonSmInfo)); + taosThreadMutexUnlock(&tsMonitor.lock); + memset(pInfo, 0, sizeof(SMonSmInfo)); +} + +void monSetBmInfo(SMonBmInfo *pInfo) { + taosThreadMutexLock(&tsMonitor.lock); + memcpy(&tsMonitor.bmInfo, pInfo, sizeof(SMonBmInfo)); + taosThreadMutexUnlock(&tsMonitor.lock); + memset(pInfo, 0, sizeof(SMonBmInfo)); +} + int32_t monInit(const SMonCfg *pCfg) { tsMonitor.logs = taosArrayInit(16, sizeof(SMonLogItem)); if (tsMonitor.logs == NULL) { @@ -42,12 +103,9 @@ int32_t monInit(const SMonCfg *pCfg) { return -1; } - tsMonitor.maxLogs = pCfg->maxLogs; - tsMonitor.server = pCfg->server; - tsMonitor.port = pCfg->port; - tsMonitor.comp = pCfg->comp; + tsMonitor.cfg = *pCfg; tsLogFp = monRecordLog; - tsMonitor.state.time = taosGetTimestampMs(); + tsMonitor.lastTime = taosGetTimestampMs(); taosThreadMutexInit(&tsMonitor.lock, NULL); return 0; } @@ -56,42 +114,65 @@ void monCleanup() { tsLogFp = NULL; taosArrayDestroy(tsMonitor.logs); tsMonitor.logs = NULL; + tFreeSMonMmInfo(&tsMonitor.mmInfo); + tFreeSMonVmInfo(&tsMonitor.vmInfo); + tFreeSMonSmInfo(&tsMonitor.smInfo); + tFreeSMonQmInfo(&tsMonitor.qmInfo); + tFreeSMonBmInfo(&tsMonitor.bmInfo); taosThreadMutexDestroy(&tsMonitor.lock); } -SMonInfo *monCreateMonitorInfo() { +static void monCleanupMonitorInfo(SMonInfo *pMonitor) { + tsMonitor.lastTime = pMonitor->curTime; + taosArrayDestroy(pMonitor->log.logs); + tFreeSMonMmInfo(&pMonitor->mmInfo); + tFreeSMonVmInfo(&pMonitor->vmInfo); + tFreeSMonSmInfo(&pMonitor->smInfo); + tFreeSMonQmInfo(&pMonitor->qmInfo); + tFreeSMonBmInfo(&pMonitor->bmInfo); + tjsonDelete(pMonitor->pJson); + taosMemoryFree(pMonitor); +} + +static SMonInfo *monCreateMonitorInfo() { SMonInfo *pMonitor = taosMemoryCalloc(1, sizeof(SMonInfo)); if (pMonitor == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + monGetLogs(&pMonitor->log); + taosThreadMutexLock(&tsMonitor.lock); - pMonitor->logs = taosArrayDup(tsMonitor.logs); - taosArrayClear(tsMonitor.logs); + memcpy(&pMonitor->dmInfo, &tsMonitor.dmInfo, sizeof(SMonDmInfo)); + memcpy(&pMonitor->mmInfo, &tsMonitor.mmInfo, sizeof(SMonMmInfo)); + memcpy(&pMonitor->vmInfo, &tsMonitor.vmInfo, sizeof(SMonVmInfo)); + memcpy(&pMonitor->smInfo, &tsMonitor.smInfo, sizeof(SMonSmInfo)); + memcpy(&pMonitor->qmInfo, &tsMonitor.qmInfo, sizeof(SMonQmInfo)); + memcpy(&pMonitor->bmInfo, &tsMonitor.bmInfo, sizeof(SMonBmInfo)); + memset(&tsMonitor.dmInfo, 0, sizeof(SMonDmInfo)); + memset(&tsMonitor.mmInfo, 0, sizeof(SMonMmInfo)); + memset(&tsMonitor.vmInfo, 0, sizeof(SMonVmInfo)); + memset(&tsMonitor.smInfo, 0, sizeof(SMonSmInfo)); + memset(&tsMonitor.qmInfo, 0, sizeof(SMonQmInfo)); + memset(&tsMonitor.bmInfo, 0, sizeof(SMonBmInfo)); taosThreadMutexUnlock(&tsMonitor.lock); pMonitor->pJson = tjsonCreateObject(); - if (pMonitor->pJson == NULL || pMonitor->logs == NULL) { + if (pMonitor->pJson == NULL || pMonitor->log.logs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; monCleanupMonitorInfo(pMonitor); return NULL; } pMonitor->curTime = taosGetTimestampMs(); - pMonitor->lastState = tsMonitor.state; + pMonitor->lastTime = tsMonitor.lastTime; return pMonitor; } -void monCleanupMonitorInfo(SMonInfo *pMonitor) { - tsMonitor.state = pMonitor->lastState; - tsMonitor.state.time = pMonitor->curTime; - taosArrayDestroy(pMonitor->logs); - tjsonDelete(pMonitor->pJson); - taosMemoryFree(pMonitor); -} +static void monGenBasicJson(SMonInfo *pMonitor) { + SMonBasicInfo *pInfo = &pMonitor->dmInfo.basic; -void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) { SJson *pJson = pMonitor->pJson; char buf[40] = {0}; taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI); @@ -104,7 +185,10 @@ void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) { tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol); } -void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) { +static void monGenClusterJson(SMonInfo *pMonitor) { + SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; + SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; if (tjsonAddItemToObject(pMonitor->pJson, "cluster_info", pJson) != 0) { @@ -154,7 +238,10 @@ void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) { } } -void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo) { +static void monGenVgroupJson(SMonInfo *pMonitor) { + SMonVgroupInfo *pInfo = &pMonitor->mmInfo.vgroup; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; + SJson *pJson = tjsonAddArrayToObject(pMonitor->pJson, "vgroup_infos"); if (pJson == NULL) return; @@ -190,7 +277,10 @@ void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo) { } } -void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo) { +static void monGenGrantJson(SMonInfo *pMonitor) { + SMonGrantInfo *pInfo = &pMonitor->mmInfo.grant; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; + SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; if (tjsonAddItemToObject(pMonitor->pJson, "grant_info", pJson) != 0) { @@ -203,7 +293,11 @@ void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo) { tjsonAddDoubleToObject(pJson, "timeseries_total", pInfo->timeseries_total); } -void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { +static void monGenDnodeJson(SMonInfo *pMonitor) { + SMonDnodeInfo *pInfo = &pMonitor->dmInfo.dnode; + SMonSysInfo *pSys = &pMonitor->dmInfo.sys; + SVnodesStat *pStat = &pMonitor->vmInfo.vstat; + SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; if (tjsonAddItemToObject(pMonitor->pJson, "dnode_info", pJson) != 0) { @@ -211,58 +305,83 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { return; } - SMonState *pLast = &pMonitor->lastState; - double interval = (pMonitor->curTime - pLast->time) / 1000.0; - double req_select_rate = (pInfo->req_select - pLast->req_select) / interval; - double req_insert_rate = (pInfo->req_insert - pLast->req_insert) / interval; - double req_insert_batch_rate = (pInfo->req_insert_batch - pLast->req_insert_batch) / interval; - double net_in_rate = (pInfo->net_in - pLast->net_in) / interval; - double net_out_rate = (pInfo->net_out - pLast->net_out) / interval; - double io_read_rate = (pInfo->io_read - pLast->io_read) / interval; - double io_write_rate = (pInfo->io_write - pLast->io_write) / interval; - double io_read_disk_rate = (pInfo->io_read_disk - pLast->io_read_disk) / interval; - double io_write_disk_rate = (pInfo->io_write_disk - pLast->io_write_disk) / interval; - pLast->req_select = pInfo->req_select; - pLast->req_insert = pInfo->req_insert; - pLast->req_insert_batch = pInfo->req_insert_batch; - pLast->net_in = pInfo->net_in; - pLast->net_out = pInfo->net_out; - pLast->io_read = pInfo->io_read; - pLast->io_write = pInfo->io_write; - pLast->io_read_disk = pInfo->io_read_disk; - pLast->io_write_disk = pInfo->io_write_disk; + double interval = (pMonitor->curTime - pMonitor->lastTime) / 1000.0; + if (pMonitor->curTime - pMonitor->lastTime == 0) { + interval = 1; + } + + double cpu_engine = 0; + double mem_engine = 0; + double net_in = 0; + double net_out = 0; + double io_read = 0; + double io_write = 0; + double io_read_disk = 0; + double io_write_disk = 0; + + SMonSysInfo *sysArrays[6]; + sysArrays[0] = &pMonitor->dmInfo.sys; + sysArrays[1] = &pMonitor->mmInfo.sys; + sysArrays[2] = &pMonitor->vmInfo.sys; + sysArrays[3] = &pMonitor->qmInfo.sys; + sysArrays[4] = &pMonitor->smInfo.sys; + sysArrays[5] = &pMonitor->bmInfo.sys; + for (int32_t i = 0; i < 6; ++i) { + cpu_engine += sysArrays[i]->cpu_engine; + mem_engine += sysArrays[i]->mem_engine; + net_in += sysArrays[i]->net_in; + net_out += sysArrays[i]->net_out; + io_read += sysArrays[i]->io_read; + io_write += sysArrays[i]->io_write; + io_read_disk += sysArrays[i]->io_read_disk; + io_write_disk += sysArrays[i]->io_write_disk; + } + + double req_select_rate = pStat->numOfSelectReqs / interval; + double req_insert_rate = pStat->numOfInsertReqs / interval; + double req_insert_batch_rate = pStat->numOfBatchInsertReqs / interval; + double net_in_rate = net_in / interval; + double net_out_rate = net_out / interval; + double io_read_rate = io_read / interval; + double io_write_rate = io_write / interval; + double io_read_disk_rate = io_read_disk / interval; + double io_write_disk_rate = io_write_disk / interval; tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime); - tjsonAddDoubleToObject(pJson, "cpu_engine", pInfo->cpu_engine); - tjsonAddDoubleToObject(pJson, "cpu_system", pInfo->cpu_system); - tjsonAddDoubleToObject(pJson, "cpu_cores", pInfo->cpu_cores); - tjsonAddDoubleToObject(pJson, "mem_engine", pInfo->mem_engine); - tjsonAddDoubleToObject(pJson, "mem_system", pInfo->mem_system); - tjsonAddDoubleToObject(pJson, "mem_total", pInfo->mem_total); - tjsonAddDoubleToObject(pJson, "disk_engine", pInfo->disk_engine); - tjsonAddDoubleToObject(pJson, "disk_used", pInfo->disk_used); - tjsonAddDoubleToObject(pJson, "disk_total", pInfo->disk_total); + tjsonAddDoubleToObject(pJson, "cpu_engine", cpu_engine); + tjsonAddDoubleToObject(pJson, "cpu_system", pSys->cpu_system); + tjsonAddDoubleToObject(pJson, "cpu_cores", pSys->cpu_cores); + tjsonAddDoubleToObject(pJson, "mem_engine", mem_engine); + tjsonAddDoubleToObject(pJson, "mem_system", pSys->mem_system); + tjsonAddDoubleToObject(pJson, "mem_total", pSys->mem_total); + tjsonAddDoubleToObject(pJson, "disk_engine", pSys->disk_engine); + tjsonAddDoubleToObject(pJson, "disk_used", pSys->disk_used); + tjsonAddDoubleToObject(pJson, "disk_total", pSys->disk_total); tjsonAddDoubleToObject(pJson, "net_in", net_in_rate); tjsonAddDoubleToObject(pJson, "net_out", net_out_rate); tjsonAddDoubleToObject(pJson, "io_read", io_read_rate); tjsonAddDoubleToObject(pJson, "io_write", io_write_rate); tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate); tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate); - tjsonAddDoubleToObject(pJson, "req_select", pInfo->req_select); + tjsonAddDoubleToObject(pJson, "req_select", pStat->numOfSelectReqs); tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate); - tjsonAddDoubleToObject(pJson, "req_insert", pInfo->req_insert); - tjsonAddDoubleToObject(pJson, "req_insert_success", pInfo->req_insert_success); + tjsonAddDoubleToObject(pJson, "req_insert", pStat->numOfInsertReqs); + tjsonAddDoubleToObject(pJson, "req_insert_success", pStat->numOfInsertSuccessReqs); tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate); - tjsonAddDoubleToObject(pJson, "req_insert_batch", pInfo->req_insert_batch); - tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pInfo->req_insert_batch_success); + tjsonAddDoubleToObject(pJson, "req_insert_batch", pStat->numOfBatchInsertReqs); + tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pStat->numOfBatchInsertSuccessReqs); tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate); - tjsonAddDoubleToObject(pJson, "errors", pInfo->errors); - tjsonAddDoubleToObject(pJson, "vnodes_num", pInfo->vnodes_num); - tjsonAddDoubleToObject(pJson, "masters", pInfo->masters); + tjsonAddDoubleToObject(pJson, "errors", pStat->errors); + tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes); + tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum); tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode); } -void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo) { +static void monGenDiskJson(SMonInfo *pMonitor) { + SMonDiskInfo *pInfo = &pMonitor->vmInfo.tfs; + SMonDiskDesc *pLogDesc = &pMonitor->dmInfo.dnode.logdir; + SMonDiskDesc *pTempDesc = &pMonitor->dmInfo.dnode.tempdir; + SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; if (tjsonAddItemToObject(pMonitor->pJson, "disk_infos", pJson) != 0) { @@ -290,18 +409,18 @@ void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo) { SJson *pLogdirJson = tjsonCreateObject(); if (pLogdirJson == NULL) return; if (tjsonAddItemToObject(pJson, "logdir", pLogdirJson) != 0) return; - tjsonAddStringToObject(pLogdirJson, "name", pInfo->logdir.name); - tjsonAddDoubleToObject(pLogdirJson, "avail", pInfo->logdir.size.avail); - tjsonAddDoubleToObject(pLogdirJson, "used", pInfo->logdir.size.used); - tjsonAddDoubleToObject(pLogdirJson, "total", pInfo->logdir.size.total); + tjsonAddStringToObject(pLogdirJson, "name", pLogDesc->name); + tjsonAddDoubleToObject(pLogdirJson, "avail", pLogDesc->size.avail); + tjsonAddDoubleToObject(pLogdirJson, "used", pLogDesc->size.used); + tjsonAddDoubleToObject(pLogdirJson, "total", pLogDesc->size.total); SJson *pTempdirJson = tjsonCreateObject(); if (pTempdirJson == NULL) return; if (tjsonAddItemToObject(pJson, "tempdir", pTempdirJson) != 0) return; - tjsonAddStringToObject(pTempdirJson, "name", pInfo->tempdir.name); - tjsonAddDoubleToObject(pTempdirJson, "avail", pInfo->tempdir.size.avail); - tjsonAddDoubleToObject(pTempdirJson, "used", pInfo->tempdir.size.used); - tjsonAddDoubleToObject(pTempdirJson, "total", pInfo->tempdir.size.total); + tjsonAddStringToObject(pTempdirJson, "name", pTempDesc->name); + tjsonAddDoubleToObject(pTempdirJson, "avail", pTempDesc->size.avail); + tjsonAddDoubleToObject(pTempdirJson, "used", pTempDesc->size.used); + tjsonAddDoubleToObject(pTempdirJson, "total", pTempDesc->size.total); } static const char *monLogLevelStr(ELogLevel level) { @@ -319,7 +438,7 @@ static const char *monLogLevelStr(ELogLevel level) { } } -static void monSetLogInfo(SMonInfo *pMonitor) { +static void monGenLogJson(SMonInfo *pMonitor) { SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; if (tjsonAddItemToObject(pMonitor->pJson, "log_infos", pJson) != 0) { @@ -330,20 +449,41 @@ static void monSetLogInfo(SMonInfo *pMonitor) { SJson *pLogsJson = tjsonAddArrayToObject(pJson, "logs"); if (pLogsJson == NULL) return; - for (int32_t i = 0; i < taosArrayGetSize(pMonitor->logs); ++i) { - SJson *pLogJson = tjsonCreateObject(); - if (pLogJson == NULL) continue; + SMonLogs *logs[6]; + logs[0] = &pMonitor->log; + logs[1] = &pMonitor->mmInfo.log; + logs[2] = &pMonitor->vmInfo.log; + logs[3] = &pMonitor->smInfo.log; + logs[4] = &pMonitor->qmInfo.log; + logs[5] = &pMonitor->bmInfo.log; - SMonLogItem *pLogItem = taosArrayGet(pMonitor->logs, i); + int32_t numOfErrorLogs = 0; + int32_t numOfInfoLogs = 0; + int32_t numOfDebugLogs = 0; + int32_t numOfTraceLogs = 0; - char buf[40] = {0}; - taosFormatUtcTime(buf, sizeof(buf), pLogItem->ts, TSDB_TIME_PRECISION_MILLI); + for (int32_t j = 0; j < 6; j++) { + SMonLogs *pLog = logs[j]; + numOfErrorLogs += pLog->numOfErrorLogs; + numOfInfoLogs += pLog->numOfInfoLogs; + numOfDebugLogs += pLog->numOfDebugLogs; + numOfTraceLogs += pLog->numOfTraceLogs; - tjsonAddStringToObject(pLogJson, "ts", buf); - tjsonAddStringToObject(pLogJson, "level", monLogLevelStr(pLogItem->level)); - tjsonAddStringToObject(pLogJson, "content", pLogItem->content); + for (int32_t i = 0; i < taosArrayGetSize(pLog->logs); ++i) { + SJson *pLogJson = tjsonCreateObject(); + if (pLogJson == NULL) continue; - if (tjsonAddItemToArray(pLogsJson, pLogJson) != 0) tjsonDelete(pLogJson); + SMonLogItem *pLogItem = taosArrayGet(pLog->logs, i); + + char buf[40] = {0}; + taosFormatUtcTime(buf, sizeof(buf), pLogItem->ts, TSDB_TIME_PRECISION_MILLI); + + tjsonAddStringToObject(pLogJson, "ts", buf); + tjsonAddStringToObject(pLogJson, "level", monLogLevelStr(pLogItem->level)); + tjsonAddStringToObject(pLogJson, "content", pLogItem->content); + + if (tjsonAddItemToArray(pLogsJson, pLogJson) != 0) tjsonDelete(pLogJson); + } } SJson *pSummaryJson = tjsonAddArrayToObject(pJson, "summary"); @@ -352,35 +492,48 @@ static void monSetLogInfo(SMonInfo *pMonitor) { SJson *pLogError = tjsonCreateObject(); if (pLogError == NULL) return; tjsonAddStringToObject(pLogError, "level", "error"); - tjsonAddDoubleToObject(pLogError, "total", tsNumOfErrorLogs); + tjsonAddDoubleToObject(pLogError, "total", numOfErrorLogs); if (tjsonAddItemToArray(pSummaryJson, pLogError) != 0) tjsonDelete(pLogError); SJson *pLogInfo = tjsonCreateObject(); if (pLogInfo == NULL) return; tjsonAddStringToObject(pLogInfo, "level", "info"); - tjsonAddDoubleToObject(pLogInfo, "total", tsNumOfInfoLogs); + tjsonAddDoubleToObject(pLogInfo, "total", numOfInfoLogs); if (tjsonAddItemToArray(pSummaryJson, pLogInfo) != 0) tjsonDelete(pLogInfo); SJson *pLogDebug = tjsonCreateObject(); if (pLogDebug == NULL) return; tjsonAddStringToObject(pLogDebug, "level", "debug"); - tjsonAddDoubleToObject(pLogDebug, "total", tsNumOfDebugLogs); + tjsonAddDoubleToObject(pLogDebug, "total", numOfDebugLogs); if (tjsonAddItemToArray(pSummaryJson, pLogDebug) != 0) tjsonDelete(pLogDebug); SJson *pLogTrace = tjsonCreateObject(); if (pLogTrace == NULL) return; tjsonAddStringToObject(pLogTrace, "level", "trace"); - tjsonAddDoubleToObject(pLogTrace, "total", tsNumOfTraceLogs); + tjsonAddDoubleToObject(pLogTrace, "total", numOfTraceLogs); if (tjsonAddItemToArray(pSummaryJson, pLogTrace) != 0) tjsonDelete(pLogTrace); } -void monSendReport(SMonInfo *pMonitor) { - monSetLogInfo(pMonitor); +void monSendReport() { + SMonInfo *pMonitor = monCreateMonitorInfo(); + if (pMonitor == NULL) return; + + monGenBasicJson(pMonitor); + monGenClusterJson(pMonitor); + monGenVgroupJson(pMonitor); + monGenGrantJson(pMonitor); + monGenDnodeJson(pMonitor); + monGenDiskJson(pMonitor); + monGenLogJson(pMonitor); char *pCont = tjsonToString(pMonitor->pJson); if (pCont != NULL) { - EHttpCompFlag flag = tsMonitor.comp ? HTTP_GZIP : HTTP_FLAT; - taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), flag); + EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; + if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + uError("failed to send monitor msg since %s", terrstr()); + } taosMemoryFree(pCont); } + + monCleanupMonitorInfo(pMonitor); } diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c new file mode 100644 index 0000000000..94645f9da1 --- /dev/null +++ b/source/libs/monitor/src/monMsg.c @@ -0,0 +1,531 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "monInt.h" +#include "tcoding.h" +#include "tencode.h" + +static int32_t tEncodeSMonSysInfo(SCoder *encoder, const SMonSysInfo *pInfo) { + if (tEncodeDouble(encoder, pInfo->cpu_engine) < 0) return -1; + if (tEncodeDouble(encoder, pInfo->cpu_system) < 0) return -1; + if (tEncodeFloat(encoder, pInfo->cpu_cores) < 0) return -1; + if (tEncodeI64(encoder, pInfo->mem_engine) < 0) return -1; + if (tEncodeI64(encoder, pInfo->mem_system) < 0) return -1; + if (tEncodeI64(encoder, pInfo->mem_total) < 0) return -1; + if (tEncodeI64(encoder, pInfo->disk_engine) < 0) return -1; + if (tEncodeI64(encoder, pInfo->disk_used) < 0) return -1; + if (tEncodeI64(encoder, pInfo->disk_total) < 0) return -1; + if (tEncodeI64(encoder, pInfo->net_in) < 0) return -1; + if (tEncodeI64(encoder, pInfo->net_out) < 0) return -1; + if (tEncodeI64(encoder, pInfo->io_read) < 0) return -1; + if (tEncodeI64(encoder, pInfo->io_write) < 0) return -1; + if (tEncodeI64(encoder, pInfo->io_read_disk) < 0) return -1; + if (tEncodeI64(encoder, pInfo->io_write_disk) < 0) return -1; + return 0; +} + +static int32_t tDecodeSMonSysInfo(SCoder *decoder, SMonSysInfo *pInfo) { + if (tDecodeDouble(decoder, &pInfo->cpu_engine) < 0) return -1; + if (tDecodeDouble(decoder, &pInfo->cpu_system) < 0) return -1; + if (tDecodeFloat(decoder, &pInfo->cpu_cores) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->mem_engine) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->mem_system) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->mem_total) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->disk_engine) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->disk_used) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->disk_total) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->net_in) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->net_out) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->io_read) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->io_write) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->io_read_disk) < 0) return -1; + if (tDecodeI64(decoder, &pInfo->io_write_disk) < 0) return -1; + return 0; +} + +int32_t tEncodeSMonLogs(SCoder *encoder, const SMonLogs *pInfo) { + if (tEncodeI32(encoder, pInfo->numOfErrorLogs) < 0) return -1; + if (tEncodeI32(encoder, pInfo->numOfInfoLogs) < 0) return -1; + if (tEncodeI32(encoder, pInfo->numOfDebugLogs) < 0) return -1; + if (tEncodeI32(encoder, pInfo->numOfTraceLogs) < 0) return -1; + if (tEncodeI32(encoder, taosArrayGetSize(pInfo->logs)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->logs); ++i) { + SMonLogItem *pLog = taosArrayGet(pInfo->logs, i); + if (tEncodeI64(encoder, pLog->ts) < 0) return -1; + if (tEncodeI8(encoder, pLog->level) < 0) return -1; + if (tEncodeCStr(encoder, pLog->content) < 0) return -1; + } + return 0; +} + +static int32_t tDecodeSMonLogs(SCoder *decoder, SMonLogs *pInfo) { + if (tDecodeI32(decoder, &pInfo->numOfErrorLogs) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->numOfInfoLogs) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->numOfDebugLogs) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->numOfTraceLogs) < 0) return -1; + + int32_t arraySize = 0; + if (tDecodeI32(decoder, &arraySize) < 0) return -1; + + pInfo->logs = taosArrayInit(arraySize, sizeof(SMonLogItem)); + if (pInfo->logs == NULL) return -1; + + for (int32_t i = 0; i < arraySize; ++i) { + SMonLogItem desc = {0}; + if (tDecodeI64(decoder, &desc.ts) < 0) return -1; + int8_t level = 0; + if (tDecodeI8(decoder, &level) < 0) return -1; + desc.level = level; + if (tDecodeCStrTo(decoder, desc.content) < 0) return -1; + taosArrayPush(pInfo->logs, &desc); + } + + return 0; +} + +int32_t tEncodeSMonClusterInfo(SCoder *encoder, const SMonClusterInfo *pInfo) { + if (tEncodeCStr(encoder, pInfo->first_ep) < 0) return -1; + if (tEncodeI32(encoder, pInfo->first_ep_dnode_id) < 0) return -1; + if (tEncodeCStr(encoder, pInfo->version) < 0) return -1; + if (tEncodeFloat(encoder, pInfo->master_uptime) < 0) return -1; + if (tEncodeI32(encoder, pInfo->monitor_interval) < 0) return -1; + if (tEncodeI32(encoder, pInfo->vgroups_total) < 0) return -1; + if (tEncodeI32(encoder, pInfo->vgroups_alive) < 0) return -1; + if (tEncodeI32(encoder, pInfo->vnodes_total) < 0) return -1; + if (tEncodeI32(encoder, pInfo->vnodes_alive) < 0) return -1; + if (tEncodeI32(encoder, pInfo->connections_total) < 0) return -1; + if (tEncodeI32(encoder, taosArrayGetSize(pInfo->dnodes)) < 0) return -1; + if (tEncodeI32(encoder, taosArrayGetSize(pInfo->mnodes)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->dnodes); ++i) { + SMonDnodeDesc *pDesc = taosArrayGet(pInfo->dnodes, i); + if (tEncodeI32(encoder, pDesc->dnode_id) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->dnode_ep) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->status) < 0) return -1; + } + for (int32_t i = 0; i < taosArrayGetSize(pInfo->mnodes); ++i) { + SMonMnodeDesc *pDesc = taosArrayGet(pInfo->mnodes, i); + if (tEncodeI32(encoder, pDesc->mnode_id) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->mnode_ep) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->role) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSMonClusterInfo(SCoder *decoder, SMonClusterInfo *pInfo) { + if (tDecodeCStrTo(decoder, pInfo->first_ep) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->first_ep_dnode_id) < 0) return -1; + if (tDecodeCStrTo(decoder, pInfo->version) < 0) return -1; + if (tDecodeFloat(decoder, &pInfo->master_uptime) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->monitor_interval) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->vgroups_total) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->vgroups_alive) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->vnodes_total) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->vnodes_alive) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->connections_total) < 0) return -1; + + int32_t dnodesSize = 0; + int32_t mnodesSize = 0; + if (tDecodeI32(decoder, &dnodesSize) < 0) return -1; + if (tDecodeI32(decoder, &mnodesSize) < 0) return -1; + + pInfo->dnodes = taosArrayInit(dnodesSize, sizeof(SMonDnodeDesc)); + pInfo->mnodes = taosArrayInit(mnodesSize, sizeof(SMonMnodeDesc)); + if (pInfo->dnodes == NULL || pInfo->mnodes == NULL) return -1; + + for (int32_t i = 0; i < dnodesSize; ++i) { + SMonDnodeDesc desc = {0}; + if (tDecodeI32(decoder, &desc.dnode_id) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.dnode_ep) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.status) < 0) return -1; + taosArrayPush(pInfo->dnodes, &desc); + } + + for (int32_t i = 0; i < mnodesSize; ++i) { + SMonMnodeDesc desc = {0}; + if (tDecodeI32(decoder, &desc.mnode_id) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.mnode_ep) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.role) < 0) return -1; + taosArrayPush(pInfo->mnodes, &desc); + } + return 0; +} + +int32_t tEncodeSMonVgroupInfo(SCoder *encoder, const SMonVgroupInfo *pInfo) { + if (tEncodeI32(encoder, taosArrayGetSize(pInfo->vgroups)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->vgroups); ++i) { + SMonVgroupDesc *pDesc = taosArrayGet(pInfo->vgroups, i); + if (tEncodeI32(encoder, pDesc->vgroup_id) < 0) return -1; + if (tEncodeI32(encoder, pDesc->tables_num) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->database_name) < 0) return -1; + if (tEncodeCStr(encoder, pDesc->status) < 0) return -1; + for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) { + SMonVnodeDesc *pVDesc = &pDesc->vnodes[j]; + if (tEncodeI32(encoder, pVDesc->dnode_id) < 0) return -1; + if (tEncodeCStr(encoder, pVDesc->vnode_role) < 0) return -1; + } + } + return 0; +} + +int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) { + int32_t arraySize = 0; + if (tDecodeI32(decoder, &arraySize) < 0) return -1; + + pInfo->vgroups = taosArrayInit(arraySize, sizeof(SMonVgroupDesc)); + if (pInfo->vgroups == NULL) return -1; + + for (int32_t i = 0; i < arraySize; ++i) { + SMonVgroupDesc desc = {0}; + if (tDecodeI32(decoder, &desc.vgroup_id) < 0) return -1; + if (tDecodeI32(decoder, &desc.tables_num) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.database_name) < 0) return -1; + if (tDecodeCStrTo(decoder, desc.status) < 0) return -1; + for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) { + SMonVnodeDesc vdesc = {0}; + if (tDecodeI32(decoder, &vdesc.dnode_id) < 0) return -1; + if (tDecodeCStrTo(decoder, vdesc.vnode_role) < 0) return -1; + } + taosArrayPush(pInfo->vgroups, &desc); + } + return 0; +} + +int32_t tEncodeSMonGrantInfo(SCoder *encoder, const SMonGrantInfo *pInfo) { + if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1; + if (tEncodeI32(encoder, pInfo->timeseries_used) < 0) return -1; + if (tEncodeI32(encoder, pInfo->timeseries_total) < 0) return -1; + return 0; +} + +int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) { + if (tDecodeI32(decoder, &pInfo->expire_time) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->timeseries_used) < 0) return -1; + if (tDecodeI32(decoder, &pInfo->timeseries_total) < 0) return -1; + return 0; +} + +int32_t tSerializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSMonClusterInfo(&encoder, &pInfo->cluster) < 0) return -1; + if (tEncodeSMonVgroupInfo(&encoder, &pInfo->vgroup) < 0) return -1; + if (tEncodeSMonGrantInfo(&encoder, &pInfo->grant) < 0) return -1; + if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; + if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonMmInfo(void *buf, int32_t bufLen, SMonMmInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSMonClusterInfo(&decoder, &pInfo->cluster) < 0) return -1; + if (tDecodeSMonVgroupInfo(&decoder, &pInfo->vgroup) < 0) return -1; + if (tDecodeSMonGrantInfo(&decoder, &pInfo->grant) < 0) return -1; + if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; + if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSMonMmInfo(SMonMmInfo *pInfo) { + taosArrayDestroy(pInfo->log.logs); + taosArrayDestroy(pInfo->cluster.mnodes); + taosArrayDestroy(pInfo->cluster.dnodes); + taosArrayDestroy(pInfo->vgroup.vgroups); + pInfo->cluster.mnodes = NULL; + pInfo->cluster.dnodes = NULL; + pInfo->vgroup.vgroups = NULL; + pInfo->log.logs = NULL; +} + +int32_t tEncodeSMonDiskDesc(SCoder *encoder, const SMonDiskDesc *pDesc) { + if (tEncodeCStr(encoder, pDesc->name) < 0) return -1; + if (tEncodeI8(encoder, pDesc->level) < 0) return -1; + if (tEncodeI64(encoder, pDesc->size.total) < 0) return -1; + if (tEncodeI64(encoder, pDesc->size.used) < 0) return -1; + if (tEncodeI64(encoder, pDesc->size.avail) < 0) return -1; +} + +static int32_t tDecodeSMonDiskDesc(SCoder *decoder, SMonDiskDesc *pDesc) { + if (tDecodeCStrTo(decoder, pDesc->name) < 0) return -1; + if (tDecodeI8(decoder, &pDesc->level) < 0) return -1; + if (tDecodeI64(decoder, &pDesc->size.total) < 0) return -1; + if (tDecodeI64(decoder, &pDesc->size.used) < 0) return -1; + if (tDecodeI64(decoder, &pDesc->size.avail) < 0) return -1; + return 0; +} + +int32_t tEncodeSMonDiskInfo(SCoder *encoder, const SMonDiskInfo *pInfo) { + if (tEncodeI32(encoder, taosArrayGetSize(pInfo->datadirs)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->datadirs); ++i) { + SMonDiskDesc *pDesc = taosArrayGet(pInfo->datadirs, i); + if (tEncodeSMonDiskDesc(encoder, pDesc) < 0) return -1; + } + return 0; +} + +static int32_t tDecodeSMonDiskInfo(SCoder *decoder, SMonDiskInfo *pInfo) { + int32_t arraySize = 0; + if (tDecodeI32(decoder, &arraySize) < 0) return -1; + + pInfo->datadirs = taosArrayInit(arraySize, sizeof(SMonDiskDesc)); + if (pInfo->datadirs == NULL) return -1; + + for (int32_t i = 0; i < arraySize; ++i) { + SMonDiskDesc desc = {0}; + if (tDecodeSMonDiskDesc(decoder, &desc) < 0) return -1; + taosArrayPush(pInfo->datadirs, &desc); + } + + return 0; +} + +int32_t tEncodeSVnodesStat(SCoder *encoder, const SVnodesStat *pStat) { + if (tEncodeI32(encoder, pStat->openVnodes) < 0) return -1; + if (tEncodeI32(encoder, pStat->totalVnodes) < 0) return -1; + if (tEncodeI32(encoder, pStat->masterNum) < 0) return -1; + if (tEncodeI64(encoder, pStat->numOfSelectReqs) < 0) return -1; + if (tEncodeI64(encoder, pStat->numOfInsertReqs) < 0) return -1; + if (tEncodeI64(encoder, pStat->numOfInsertSuccessReqs) < 0) return -1; + if (tEncodeI64(encoder, pStat->numOfBatchInsertReqs) < 0) return -1; + if (tEncodeI64(encoder, pStat->numOfBatchInsertSuccessReqs) < 0) return -1; + if (tEncodeI64(encoder, pStat->errors) < 0) return -1; + return 0; +} + +static int32_t tDecodeSVnodesStat(SCoder *decoder, SVnodesStat *pStat) { + if (tDecodeI32(decoder, &pStat->openVnodes) < 0) return -1; + if (tDecodeI32(decoder, &pStat->totalVnodes) < 0) return -1; + if (tDecodeI32(decoder, &pStat->masterNum) < 0) return -1; + if (tDecodeI64(decoder, &pStat->numOfSelectReqs) < 0) return -1; + if (tDecodeI64(decoder, &pStat->numOfInsertReqs) < 0) return -1; + if (tDecodeI64(decoder, &pStat->numOfInsertSuccessReqs) < 0) return -1; + if (tDecodeI64(decoder, &pStat->numOfBatchInsertReqs) < 0) return -1; + if (tDecodeI64(decoder, &pStat->numOfBatchInsertSuccessReqs) < 0) return -1; + if (tDecodeI64(decoder, &pStat->errors) < 0) return -1; + return 0; +} + +int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSMonDiskInfo(&encoder, &pInfo->tfs) < 0) return -1; + if (tEncodeSVnodesStat(&encoder, &pInfo->vstat) < 0) return -1; + if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; + if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSMonDiskInfo(&decoder, &pInfo->tfs) < 0) return -1; + if (tDecodeSVnodesStat(&decoder, &pInfo->vstat) < 0) return -1; + if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; + if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSMonVmInfo(SMonVmInfo *pInfo) { + taosArrayDestroy(pInfo->log.logs); + taosArrayDestroy(pInfo->tfs.datadirs); + pInfo->log.logs = NULL; + pInfo->tfs.datadirs = NULL; +} + +int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; + if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; + if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSMonQmInfo(SMonQmInfo *pInfo) { + taosArrayDestroy(pInfo->log.logs); + pInfo->log.logs = NULL; +} + +int32_t tSerializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; + if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonSmInfo(void *buf, int32_t bufLen, SMonSmInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; + if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSMonSmInfo(SMonSmInfo *pInfo) { + taosArrayDestroy(pInfo->log.logs); + pInfo->log.logs = NULL; +} + +int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSMonSysInfo(&encoder, &pInfo->sys) < 0) return -1; + if (tEncodeSMonLogs(&encoder, &pInfo->log) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSMonSysInfo(&decoder, &pInfo->sys) < 0) return -1; + if (tDecodeSMonLogs(&decoder, &pInfo->log) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSMonBmInfo(SMonBmInfo *pInfo) { + taosArrayDestroy(pInfo->log.logs); + pInfo->log.logs = NULL; +} + +int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, taosArrayGetSize(pInfo->pVloads)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pVloads); ++i) { + SVnodeLoad *pLoad = taosArrayGet(pInfo->pVloads, i); + if (tEncodeI32(&encoder, pLoad->vgId) < 0) return -1; + if (tEncodeI8(&encoder, pLoad->role) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfTables) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfTimeSeries) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->totalStorage) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->compStorage) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->pointsWritten) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfSelectReqs) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfInsertReqs) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfInsertSuccessReqs) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfBatchInsertReqs) < 0) return -1; + if (tEncodeI64(&encoder, pLoad->numOfBatchInsertSuccessReqs) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t arraySize = 0; + if (tDecodeI32(&decoder, &arraySize) < 0) return -1; + + pInfo->pVloads = taosArrayInit(arraySize, sizeof(SVnodeLoad)); + if (pInfo->pVloads == NULL) return -1; + + for (int32_t i = 0; i < arraySize; ++i) { + SVnodeLoad load = {0}; + if (tDecodeI32(&decoder, &load.vgId) < 0) return -1; + if (tDecodeI8(&decoder, &load.role) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfTables) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfTimeSeries) < 0) return -1; + if (tDecodeI64(&decoder, &load.totalStorage) < 0) return -1; + if (tDecodeI64(&decoder, &load.compStorage) < 0) return -1; + if (tDecodeI64(&decoder, &load.pointsWritten) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfSelectReqs) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfInsertReqs) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfInsertSuccessReqs) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfBatchInsertReqs) < 0) return -1; + if (tDecodeI64(&decoder, &load.numOfBatchInsertSuccessReqs) < 0) return -1; + taosArrayPush(pInfo->pVloads, &load); + } + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + +void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) { + taosArrayDestroy(pInfo->pVloads); + pInfo->pVloads = NULL; +} \ No newline at end of file diff --git a/source/libs/monitor/test/CMakeLists.txt b/source/libs/monitor/test/CMakeLists.txt index e3ab7e7337..cd1b94574d 100644 --- a/source/libs/monitor/test/CMakeLists.txt +++ b/source/libs/monitor/test/CMakeLists.txt @@ -1,14 +1,14 @@ enable_testing() aux_source_directory(. MONITOR_TEST_SRC) -add_executable(monitor_test ${MONITOR_TEST_SRC}) +add_executable(monitorTest ${MONITOR_TEST_SRC}) target_link_libraries( - monitor_test + monitorTest PUBLIC monitor PUBLIC gtest_main ) add_test( - NAME monitor_test - COMMAND monitor_test + NAME monitorTest + COMMAND monitorTest ) diff --git a/source/libs/monitor/test/monTest.cpp b/source/libs/monitor/test/monTest.cpp index 05c9fb8b38..ceb9805e3d 100644 --- a/source/libs/monitor/test/monTest.cpp +++ b/source/libs/monitor/test/monTest.cpp @@ -22,7 +22,7 @@ class MonitorTest : public ::testing::Test { cfg.maxLogs = 2; cfg.port = 80; cfg.server = "localhost"; - cfg.comp = 0; + cfg.comp = 1; monInit(&cfg); } @@ -32,24 +32,64 @@ class MonitorTest : public ::testing::Test { void SetUp() override {} void TearDown() override {} - void GetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo); - void GetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo); - void GetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo); - void GetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo); - void GetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo); - void GetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo); + void GetBasicInfo(SMonBasicInfo *pInfo); + void GetDnodeInfo(SMonDnodeInfo *pInfo); + void GetSysInfo(SMonSysInfo *pInfo); + + void GetClusterInfo(SMonClusterInfo *pInfo); + void GetVgroupInfo(SMonVgroupInfo *pInfo); + void GetGrantInfo(SMonGrantInfo *pInfo); + + void GetVnodeStat(SVnodesStat *pStat); + void GetDiskInfo(SMonDiskInfo *pInfo); + + void GetLogInfo(SMonLogs *logs); + void AddLogInfo1(); void AddLogInfo2(); }; -void MonitorTest::GetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) { +void MonitorTest::GetBasicInfo(SMonBasicInfo *pInfo) { pInfo->dnode_id = 1; strcpy(pInfo->dnode_ep, "localhost"); pInfo->cluster_id = 6980428120398645172; pInfo->protocol = 1; } -void MonitorTest::GetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) { +void MonitorTest::GetDnodeInfo(SMonDnodeInfo *pInfo) { + pInfo->uptime = 1.2; + pInfo->has_mnode = 1; + + strcpy(pInfo->logdir.name, "/log/dir/d"); + pInfo->logdir.size.avail = 41; + pInfo->logdir.size.total = 42; + pInfo->logdir.size.used = 43; + + strcpy(pInfo->tempdir.name, "/data/dir/d"); + pInfo->tempdir.size.avail = 51; + pInfo->tempdir.size.total = 52; + pInfo->tempdir.size.used = 53; +} + +void MonitorTest::GetSysInfo(SMonSysInfo *pInfo) { + pInfo->cpu_engine = 2.1; + pInfo->cpu_system = 2.1; + pInfo->cpu_cores = 2; + pInfo->mem_engine = 3.1; + pInfo->mem_system = 3.2; + pInfo->mem_total = 3.3; + pInfo->disk_engine = 4.1; + pInfo->disk_used = 4.2; + pInfo->disk_total = 4.3; + pInfo->net_in = 5.1; + pInfo->net_out = 5.2; + pInfo->io_read = 6.1; + pInfo->io_write = 6.2; + pInfo->io_read_disk = 7.1; + pInfo->io_write_disk = 7.2; +} + +void MonitorTest::GetClusterInfo(SMonClusterInfo *pInfo) { strcpy(pInfo->first_ep, "localhost:6030"); pInfo->first_ep_dnode_id = 1; strcpy(pInfo->version, "3.0.0.0"); @@ -86,7 +126,7 @@ void MonitorTest::GetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) { taosArrayPush(pInfo->mnodes, &m2); } -void MonitorTest::GetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo) { +void MonitorTest::GetVgroupInfo(SMonVgroupInfo *pInfo) { pInfo->vgroups = taosArrayInit(4, sizeof(SMonVgroupDesc)); SMonVgroupDesc vg1 = {0}; @@ -121,41 +161,24 @@ void MonitorTest::GetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo) { taosArrayPush(pInfo->vgroups, &vg3); } -void MonitorTest::GetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo) { +void MonitorTest::GetGrantInfo(SMonGrantInfo *pInfo) { pInfo->expire_time = 1234567; pInfo->timeseries_total = 234567; pInfo->timeseries_used = 34567; } -void MonitorTest::GetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { - pInfo->uptime = 1.2; - pInfo->cpu_engine = 2.1; - pInfo->cpu_system = 2.1; - pInfo->cpu_cores = 2; - pInfo->mem_engine = 3.1; - pInfo->mem_system = 3.2; - pInfo->mem_total = 3.3; - pInfo->disk_engine = 4.1; - pInfo->disk_used = 4.2; - pInfo->disk_total = 4.3; - pInfo->net_in = 5.1; - pInfo->net_out = 5.2; - pInfo->io_read = 6.1; - pInfo->io_write = 6.2; - pInfo->io_read_disk = 7.1; - pInfo->io_write_disk = 7.2; - pInfo->req_select = 8; - pInfo->req_insert = 9; - pInfo->req_insert_success = 10; - pInfo->req_insert_batch = 11; - pInfo->req_insert_batch_success = 12; +void MonitorTest::GetVnodeStat(SVnodesStat *pInfo) { + pInfo->numOfSelectReqs = 8; + pInfo->numOfInsertReqs = 9; + pInfo->numOfInsertSuccessReqs = 10; + pInfo->numOfBatchInsertReqs = 11; + pInfo->numOfBatchInsertSuccessReqs = 12; pInfo->errors = 4; - pInfo->vnodes_num = 5; - pInfo->masters = 6; - pInfo->has_mnode = 1; + pInfo->totalVnodes = 5; + pInfo->masterNum = 6; } -void MonitorTest::GetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo) { +void MonitorTest::GetDiskInfo(SMonDiskInfo *pInfo) { pInfo->datadirs = taosArrayInit(2, sizeof(SMonDiskDesc)); SMonDiskDesc d1 = {0}; strcpy(d1.name, "/t1/d1/d"); @@ -180,16 +203,25 @@ void MonitorTest::GetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo) { d3.size.total = 32; d3.size.used = 33; taosArrayPush(pInfo->datadirs, &d3); +} - strcpy(pInfo->logdir.name, "/log/dir/d"); - pInfo->logdir.size.avail = 41; - pInfo->logdir.size.total = 42; - pInfo->logdir.size.used = 43; +void MonitorTest::GetLogInfo(SMonLogs *logs) { + logs->logs = taosArrayInit(4, sizeof(SMonLogItem)); - strcpy(pInfo->tempdir.name, "/data/dir/d"); - pInfo->tempdir.size.avail = 51; - pInfo->tempdir.size.total = 52; - pInfo->tempdir.size.used = 53; + SMonLogItem item1 = {.level = DEBUG_INFO}; + item1.ts = taosGetTimestampMs(); + strcpy(item1.content, "log test1"); + taosArrayPush(logs->logs, &item1); + + SMonLogItem item2 = {.level = DEBUG_ERROR}; + item2.ts = taosGetTimestampMs(); + strcpy(item2.content, "log test2"); + taosArrayPush(logs->logs, &item2); + + logs->numOfErrorLogs = 1; + logs->numOfInfoLogs = 2; + logs->numOfDebugLogs = 3; + logs->numOfTraceLogs = 4; } void MonitorTest::AddLogInfo1() { @@ -206,46 +238,52 @@ void MonitorTest::AddLogInfo2() { TEST_F(MonitorTest, 01_Full) { AddLogInfo1(); - SMonInfo *pMonitor = monCreateMonitorInfo(); - if (pMonitor == NULL) return; + SMonDmInfo dmInfo = {0}; + GetBasicInfo(&dmInfo.basic); + GetDnodeInfo(&dmInfo.dnode); + GetSysInfo(&dmInfo.sys); - SMonBasicInfo basicInfo = {0}; - GetBasicInfo(pMonitor, &basicInfo); - monSetBasicInfo(pMonitor, &basicInfo); + SMonMmInfo mmInfo = {0}; + GetClusterInfo(&mmInfo.cluster); + GetVgroupInfo(&mmInfo.vgroup); + GetGrantInfo(&mmInfo.grant); + GetSysInfo(&mmInfo.sys); + GetLogInfo(&mmInfo.log); - SMonClusterInfo clusterInfo = {0}; - SMonVgroupInfo vgroupInfo = {0}; - SMonGrantInfo grantInfo = {0}; - GetClusterInfo(pMonitor, &clusterInfo); - GetVgroupInfo(pMonitor, &vgroupInfo); - GetGrantInfo(pMonitor, &grantInfo); - monSetClusterInfo(pMonitor, &clusterInfo); - monSetVgroupInfo(pMonitor, &vgroupInfo); - monSetGrantInfo(pMonitor, &grantInfo); + SMonVmInfo vmInfo = {0}; + GetDiskInfo(&vmInfo.tfs); + GetVnodeStat(&vmInfo.vstat); + GetSysInfo(&vmInfo.sys); + GetLogInfo(&vmInfo.log); - SMonDnodeInfo dnodeInfo = {0}; - GetDnodeInfo(pMonitor, &dnodeInfo); - monSetDnodeInfo(pMonitor, &dnodeInfo); + SMonQmInfo qmInfo = {0}; + GetSysInfo(&qmInfo.sys); + GetLogInfo(&qmInfo.log); - SMonDiskInfo diskInfo = {0}; - GetDiskInfo(pMonitor, &diskInfo); - monSetDiskInfo(pMonitor, &diskInfo); + SMonSmInfo smInfo = {0}; + GetSysInfo(&smInfo.sys); + GetLogInfo(&smInfo.log); - monSendReport(pMonitor); - monCleanupMonitorInfo(pMonitor); + SMonBmInfo bmInfo = {0}; + GetSysInfo(&bmInfo.sys); + GetLogInfo(&bmInfo.log); - taosArrayDestroy(clusterInfo.dnodes); - taosArrayDestroy(clusterInfo.mnodes); - taosArrayDestroy(vgroupInfo.vgroups); - taosArrayDestroy(diskInfo.datadirs); + monSetDmInfo(&dmInfo); + monSetMmInfo(&mmInfo); + monSetVmInfo(&vmInfo); + monSetQmInfo(&qmInfo); + monSetSmInfo(&smInfo); + monSetBmInfo(&bmInfo); + + tFreeSMonMmInfo(&mmInfo); + tFreeSMonVmInfo(&vmInfo); + tFreeSMonSmInfo(&smInfo); + tFreeSMonQmInfo(&qmInfo); + tFreeSMonBmInfo(&bmInfo); + monSendReport(); } TEST_F(MonitorTest, 02_Log) { AddLogInfo2(); - - SMonInfo *pMonitor = monCreateMonitorInfo(); - if (pMonitor == NULL) return; - - monSendReport(pMonitor); - monCleanupMonitorInfo(pMonitor); + monSendReport(); } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 90f15dd7d0..cd1fbf8e0e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -117,7 +117,7 @@ _OVER: static void clientConnCb(uv_connect_t* req, int32_t status) { if (status < 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("Connection error %s\n", uv_strerror(status)); + uError("connection error %s", uv_strerror(status)); uv_close((uv_handle_t*)req->handle, NULL); return; } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index eb5712d2e5..26de26ab67 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -129,14 +129,12 @@ static void taosGetProcIOnfos() { static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { - // printf("open file:%s failed", tsSysCpuFile); return -1; } char *line = NULL; ssize_t _bytes = taosGetLineFile(pFile, &line); if ((_bytes < 0) || (line == NULL)) { - // printf("read file:%s failed", tsSysCpuFile); taosCloseFile(&pFile); return -1; } @@ -153,14 +151,12 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { - // printf("open file:%s failed", tsProcCpuFile); return -1; } char *line = NULL; ssize_t _bytes = taosGetLineFile(pFile, &line); if ((_bytes < 0) || (line == NULL)) { - // printf("read file:%s failed", tsProcCpuFile); taosCloseFile(&pFile); return -1; } @@ -182,12 +178,12 @@ static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { #endif bool taosCheckSystemIsSmallEnd() { - union check{ - int16_t i; - char ch[2]; - }c; - c.i=1; - return c.ch[0]==1; + union check { + int16_t i; + char ch[2]; + } c; + c.i = 1; + return c.ch[0] == 1; } void taosGetSystemInfo() { @@ -617,6 +613,28 @@ int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int #endif } +void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { + static int64_t last_rchars = 0; + static int64_t last_wchars = 0; + static int64_t last_read_bytes = 0; + static int64_t last_write_bytes = 0; + + static int64_t cur_rchars = 0; + static int64_t cur_wchars = 0; + static int64_t cur_read_bytes = 0; + static int64_t cur_write_bytes = 0; + if (taosGetProcIO(&cur_rchars, &cur_wchars, &cur_read_bytes, &cur_write_bytes) == 0) { + *rchars = cur_rchars - last_rchars; + *wchars = cur_wchars - last_wchars; + *read_bytes = cur_read_bytes - last_read_bytes; + *write_bytes = cur_write_bytes - last_write_bytes; + last_rchars = cur_rchars; + last_wchars = cur_wchars; + last_read_bytes = cur_read_bytes; + last_write_bytes = cur_write_bytes; + } +} + int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) *receive_bytes = 0; @@ -672,6 +690,20 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { #endif } +void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes) { + static int64_t last_receive_bytes = 0; + static int64_t last_transmit_bytes = 0; + + static int64_t cur_receive_bytes = 0; + static int64_t cur_transmit_bytes = 0; + if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) == 0) { + *receive_bytes = cur_receive_bytes - last_receive_bytes; + *transmit_bytes = cur_transmit_bytes - last_transmit_bytes; + last_receive_bytes = cur_receive_bytes; + last_transmit_bytes = cur_transmit_bytes; + } +} + void taosKillSystem() { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) printf("function taosKillSystem, exit!"); diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index e7e870e998..74d7c15e78 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -348,7 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { } } - uError("name:%s, cfg not found", name); + // uError("name:%s, cfg not found", name); terrno = TSDB_CODE_CFG_NOT_FOUND; return NULL; }