diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f64b213353..ac2a8aaeff 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -723,7 +723,7 @@ typedef struct { typedef struct { int32_t vgId; - int8_t role; + int32_t syncState; int64_t numOfTables; int64_t numOfTimeSeries; int64_t totalStorage; @@ -736,6 +736,10 @@ typedef struct { int64_t numOfBatchInsertSuccessReqs; } SVnodeLoad; +typedef struct { + int32_t syncState; +} SMnodeLoad; + typedef struct { int32_t sver; // software version int64_t dnodeVer; // dnode table version in sdb @@ -1072,13 +1076,11 @@ int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq); typedef struct { int32_t statusCode; - int32_t detailLen; - char* details; + char details[1024]; } SServerStatusRsp; int32_t tSerializeSServerStatusRsp(void* buf, int32_t bufLen, SServerStatusRsp* pRsp); int32_t tDeserializeSServerStatusRsp(void* buf, int32_t bufLen, SServerStatusRsp* pRsp); -void tFreeSServerStatusRsp(SServerStatusRsp* pRsp); /** * The layout of the query message payload is as following: diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index b02ffcea60..580046eea0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -228,7 +228,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_SM_INFO, "monitor-sinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_BM_INFO, "monitor-binfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) - + TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) + #if defined(TD_MSG_NUMBER_) TDMT_MAX #endif diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 9848125919..eed91d7561 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -80,6 +80,7 @@ void mndStop(SMnode *pMnode); * @return int32_t 0 for success, -1 for failure. */ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant); +int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); /** * @brief Process the read, write, sync request. diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index af0580674d..9d8cf61b06 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -202,6 +202,14 @@ int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo); void tFreeSMonVloadInfo(SMonVloadInfo *pInfo); +typedef struct { + int8_t isMnode; + SMnodeLoad load; +} SMonMloadInfo; + +int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo); +int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo); + typedef struct { const char *server; uint16_t port; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a2f88490f0..a4ff1b23a7 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -172,6 +172,8 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); extern int32_t sDebugFlag; +const char *syncStr(ESyncState state); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 526dec4df9..6a29c7a8fd 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -878,7 +878,5 @@ _OVER: if (rpcRsp.pCont != NULL) { rpcFreeCont(rpcRsp.pCont); } - tFreeSServerStatusRsp(&statusRsp); - return code; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f42ef17f56..6515802dca 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1003,7 +1003,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { for (int32_t i = 0; i < vlen; ++i) { SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); if (tEncodeI32(&encoder, pload->vgId) < 0) return -1; - if (tEncodeI8(&encoder, pload->role) < 0) return -1; + if (tEncodeI32(&encoder, pload->syncState) < 0) return -1; if (tEncodeI64(&encoder, pload->numOfTables) < 0) return -1; if (tEncodeI64(&encoder, pload->numOfTimeSeries) < 0) return -1; if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1; @@ -1054,7 +1054,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { for (int32_t i = 0; i < vlen; ++i) { SVnodeLoad vload = {0}; if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1; - if (tDecodeI8(&decoder, &vload.role) < 0) return -1; + if (tDecodeI32(&decoder, &vload.syncState) < 0) return -1; if (tDecodeI64(&decoder, &vload.numOfTables) < 0) return -1; if (tDecodeI64(&decoder, &vload.numOfTimeSeries) < 0) return -1; if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1; @@ -3105,10 +3105,7 @@ int32_t tSerializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp * if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pRsp->statusCode) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->detailLen) < 0) return -1; - if (pRsp->detailLen > 0) { - if (tEncodeCStr(&encoder, pRsp->details) < 0) return -1; - } + if (tEncodeCStr(&encoder, pRsp->details) < 0) return -1; tEndEncode(&encoder); @@ -3123,23 +3120,13 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->statusCode) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->detailLen) < 0) return -1; - if (pRsp->detailLen > 0) { - pRsp->details = taosMemoryCalloc(1, pRsp->detailLen); - if (pRsp->details == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (tDecodeCStrTo(&decoder, pRsp->details) < 0) return -1; - } + if (tDecodeCStrTo(&decoder, pRsp->details) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); return 0; } -void tFreeSServerStatusRsp(SServerStatusRsp *pRsp) { taosMemoryFree(pRsp->details); } - int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) { if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; diff --git a/source/dnode/mgmt/implement/inc/dmImp.h b/source/dnode/mgmt/implement/inc/dmImp.h index 52a56305fd..32869aee9e 100644 --- a/source/dnode/mgmt/implement/inc/dmImp.h +++ b/source/dnode/mgmt/implement/inc/dmImp.h @@ -49,6 +49,7 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) // dmMonitor.c void dmGetVnodeLoads(SDnode *pDnode, SMonVloadInfo *pInfo); +void dmGetMnodeLoads(SDnode *pDnode, SMonMloadInfo *pInfo); void dmSendMonitorReport(SDnode *pDnode); // dmWorker.c @@ -70,6 +71,7 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper); void mmSetMgmtFp(SMgmtWrapper *pWrapper); void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo); void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo); void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo); void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo); diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 713cf24145..7dc7cbc8b4 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -72,9 +72,23 @@ void dmSendStatusReq(SDnode *pDnode) { memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); taosRUnLockLatch(&pDnode->data.latch); - SMonVloadInfo info = {0}; - dmGetVnodeLoads(pDnode, &info); - req.pVloads = info.pVloads; + SMonVloadInfo vinfo = {0}; + dmGetVnodeLoads(pDnode, &vinfo); + req.pVloads = vinfo.pVloads; + pDnode->data.unsyncedVgId = 0; + pDnode->data.vndState = TAOS_SYNC_STATE_LEADER; + for (int32_t i = 0; i < taosArrayGetSize(req.pVloads); ++i) { + SVnodeLoad *pLoad = taosArrayGet(req.pVloads, i); + if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) { + pDnode->data.unsyncedVgId = pLoad->vgId; + pDnode->data.vndState = pLoad->syncState; + } + } + + SMonMloadInfo minfo = {0}; + dmGetMnodeLoads(pDnode, &minfo); + pDnode->data.isMnode = minfo.isMnode; + pDnode->data.mndState = minfo.load.syncState; int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); diff --git a/source/dnode/mgmt/implement/src/dmMonitor.c b/source/dnode/mgmt/implement/src/dmMonitor.c index b0774cd66f..8543310eb5 100644 --- a/source/dnode/mgmt/implement/src/dmMonitor.c +++ b/source/dnode/mgmt/implement/src/dmMonitor.c @@ -183,3 +183,29 @@ void dmGetVnodeLoads(SDnode *pDnode, SMonVloadInfo *pInfo) { } dmReleaseWrapper(pWrapper); } + +void dmGetMnodeLoads(SDnode *pDnode, SMonMloadInfo *pInfo) { + SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, MNODE); + if (pWrapper == NULL) { + pInfo->isMnode = 0; + return; + } + + bool getFromAPI = !tsMultiProcess; + if (getFromAPI) { + mmGetMnodeLoads(pWrapper, pInfo); + } else { + SRpcMsg req = {.msgType = TDMT_MON_MM_LOAD}; + SRpcMsg rsp = {0}; + SEpSet epset = {.inUse = 0, .numOfEps = 1}; + tstrncpy(epset.eps[0].fqdn, pDnode->data.localFqdn, TSDB_FQDN_LEN); + epset.eps[0].port = tsServerPort; + + dmSendRecv(pDnode, &epset, &req, &rsp); + if (rsp.code == 0 && rsp.contLen > 0) { + tDeserializeSMonMloadInfo(rsp.pCont, rsp.contLen, pInfo); + } + rpcFreeCont(rsp.pCont); + } + dmReleaseWrapper(pWrapper); +} diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index 651247ed0f..e6537dcf73 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -38,6 +38,7 @@ #include "dnode.h" #include "mnode.h" #include "monitor.h" +#include "sync.h" #ifdef __cplusplus extern "C" { @@ -110,6 +111,10 @@ typedef struct { int64_t dnodeVer; int64_t updateTime; int64_t rebootTime; + int32_t unsyncedVgId; + ESyncState vndState; + ESyncState mndState; + bool isMnode; bool dropped; SEpSet mnodeEps; SArray *dnodeEps; diff --git a/source/dnode/mgmt/interface/src/dmInt.c b/source/dnode/mgmt/interface/src/dmInt.c index 928321a53f..417fbfebf5 100644 --- a/source/dnode/mgmt/interface/src/dmInt.c +++ b/source/dnode/mgmt/interface/src/dmInt.c @@ -148,29 +148,27 @@ void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const c } static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { + pStatus->details[0] = 0; + if (pDnode->status == DND_STAT_INIT) { pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK; + snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc); } else if (pDnode->status == DND_STAT_STOPPED) { pStatus->statusCode = TSDB_SRV_STATUS_EXTING; } else { - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; - } - - if (pStatus->statusCode == TSDB_SRV_STATUS_NETWORK_OK) { - SStartupInfo *pStartup = &pDnode->startup; - - int32_t len = strlen(pStartup->name) + strlen(pStartup->desc); - if (len > 0) { - pStatus->details = taosMemoryCalloc(1, len + 24); - if (pStatus->details != NULL) { - pStatus->detailLen = snprintf(pStatus->details, len + 20, "%s: %s", pStartup->name, pStartup->desc) + 1; - } + SDnodeData *pData = &pDnode->data; + if (pData->isMnode && pData->mndState != TAOS_SYNC_STATE_FOLLOWER && pData->mndState != TAOS_SYNC_STATE_FOLLOWER) { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; + snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(pData->mndState)); + } else if (pData->unsyncedVgId != 0 && pData->vndState != TAOS_SYNC_STATE_FOLLOWER && + pData->vndState != TAOS_SYNC_STATE_FOLLOWER) { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; + snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pData->unsyncedVgId, + syncStr(pData->vndState)); + } else { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; } } - - if (pStatus->statusCode == TSDB_SRV_STATUS_SERVICE_OK) { - // check the status of mnode and vnode - } } void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { @@ -198,7 +196,6 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { _OVER: rpcSendResponse(&rspMsg); - tFreeSServerStatusRsp(&statusRsp); } void dmGetMonitorSysInfo(SMonSysInfo *pInfo) { diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 5f66ae230a..4d40d1fa28 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -52,6 +52,8 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); +int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); +void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 38885333c9..3456203aa2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -46,6 +46,34 @@ int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { return 0; } +void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + pInfo->isMnode = 1; + mndGetLoad(pMgmt->pMnode, &pInfo->load); +} + +int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonMloadInfo mloads = {0}; + mmGetMnodeLoads(pWrapper, &mloads); + + int32_t rspLen = tSerializeSMonMloadInfo(NULL, 0, &mloads); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + return 0; +} + int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -117,6 +145,7 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, DEFAULT_HANDLE); // Requests handled by DNODE dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index c575266b44..e9c40fdd0f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -36,6 +36,8 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { code = mmProcessAlterReq(pMgmt, pMsg); } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) { code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); + } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_LOAD) { + code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); } else { pMsg->pNode = pMgmt->pMnode; code = mndProcessMsg(pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index ab8328d038..491c68b010 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -29,7 +29,6 @@ typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; SVnodesStat state; - SVnodesStat lastState; STfs *pTfs; SQWorkerPool queryPool; SQWorkerPool fetchPool; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 751edd6f98..4b59afabd3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,21 +16,42 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) { +void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; - tfsGetMonitorInfo(pMgmt->pTfs, &vmInfo->tfs); - taosWLockLatch(&pMgmt->latch); - vmInfo->vstat.totalVnodes = pMgmt->state.totalVnodes; - vmInfo->vstat.masterNum = pMgmt->state.masterNum; - vmInfo->vstat.numOfSelectReqs = pMgmt->state.numOfSelectReqs - pMgmt->lastState.numOfSelectReqs; - vmInfo->vstat.numOfInsertReqs = pMgmt->state.numOfInsertReqs - pMgmt->lastState.numOfInsertReqs; - vmInfo->vstat.numOfInsertSuccessReqs = pMgmt->state.numOfInsertSuccessReqs - pMgmt->lastState.numOfInsertSuccessReqs; - vmInfo->vstat.numOfBatchInsertReqs = pMgmt->state.numOfBatchInsertReqs - pMgmt->lastState.numOfBatchInsertReqs; - vmInfo->vstat.numOfBatchInsertSuccessReqs = - pMgmt->state.numOfBatchInsertSuccessReqs - pMgmt->lastState.numOfBatchInsertSuccessReqs; - pMgmt->lastState = pMgmt->state; - taosWUnLockLatch(&pMgmt->latch); + SMonVloadInfo vloads = {0}; + vmGetVnodeLoads(pWrapper, &vloads); + if (vloads.pVloads == NULL) return; + + int32_t totalVnodes = 0; + int32_t masterNum = 0; + int64_t numOfSelectReqs = 0; + int64_t numOfInsertReqs = 0; + int64_t numOfInsertSuccessReqs = 0; + int64_t numOfBatchInsertReqs = 0; + int64_t numOfBatchInsertSuccessReqs = 0; + + for (int32_t i = 0; i < taosArrayGetSize(vloads.pVloads); ++i) { + SVnodeLoad *pLoad = taosArrayGet(vloads.pVloads, i); + numOfSelectReqs += pLoad->numOfSelectReqs; + numOfInsertReqs += pLoad->numOfInsertReqs; + numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs; + numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs; + numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs; + if (pLoad->syncState == TAOS_SYNC_STATE_LEADER) masterNum++; + totalVnodes++; + } + + pInfo->vstat.totalVnodes = totalVnodes; + pInfo->vstat.masterNum = masterNum; + pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs; + pInfo->vstat.numOfInsertReqs = numOfInsertReqs - pMgmt->state.numOfInsertReqs; + pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; + pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; + pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; + pMgmt->state = pInfo->vstat; + + taosArrayDestroy(vloads.pVloads); } int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index c8e4393b59..a67533009a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -356,19 +356,9 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) { void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; - SVnodesStat *pStat = &pMgmt->state; - SArray *pLoads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); - int32_t totalVnodes = 0; - int32_t masterNum = 0; - int64_t numOfSelectReqs = 0; - int64_t numOfInsertReqs = 0; - int64_t numOfInsertSuccessReqs = 0; - int64_t numOfBatchInsertReqs = 0; - int64_t numOfBatchInsertSuccessReqs = 0; - - pInfo->pVloads = pLoads; - if (pLoads == NULL) return; + pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); + if (pInfo->pVloads == NULL) return; taosRLockLatch(&pMgmt->latch); @@ -380,28 +370,9 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { SVnodeObj *pVnode = *ppVnode; SVnodeLoad vload = {0}; vnodeGetLoad(pVnode->pImpl, &vload); - taosArrayPush(pLoads, &vload); - - numOfSelectReqs += vload.numOfSelectReqs; - numOfInsertReqs += vload.numOfInsertReqs; - numOfInsertSuccessReqs += vload.numOfInsertSuccessReqs; - numOfBatchInsertReqs += vload.numOfBatchInsertReqs; - numOfBatchInsertSuccessReqs += vload.numOfBatchInsertSuccessReqs; - totalVnodes++; - if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++; - + taosArrayPush(pInfo->pVloads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); - - taosWLockLatch(&pMgmt->latch); - pStat->totalVnodes = totalVnodes; - pStat->masterNum = masterNum; - pStat->numOfSelectReqs = numOfSelectReqs; - pStat->numOfInsertReqs = numOfInsertReqs; - pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; - pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; - pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; - taosWUnLockLatch(&pMgmt->latch); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 7b67308876..07e36c7622 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -47,21 +47,6 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); - -typedef struct SMnodeLoad { - int64_t numOfDnode; - int64_t numOfMnode; - int64_t numOfVgroup; - int64_t numOfDatabase; - int64_t numOfSuperTable; - int64_t numOfChildTable; - int64_t numOfNormalTable; - int64_t numOfColumn; - int64_t totalPoints; - int64_t totalStorage; - int64_t compStorage; -} SMnodeLoad; - typedef struct SQWorkerMgmt SQHandle; typedef struct { @@ -129,7 +114,6 @@ struct SMnode { void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); int64_t mndGenerateUid(char *name, int32_t len); -void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index 8041cc8fef..a5cdfa1061 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -22,14 +22,13 @@ extern "C" { #endif -int32_t mndInitMnode(SMnode *pMnode); -void mndCleanupMnode(SMnode *pMnode); -SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId); -void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj); -bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); -void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); -void mndUpdateMnodeRole(SMnode *pMnode); -const char *mndGetRoleStr(int32_t role); +int32_t mndInitMnode(SMnode *pMnode); +void mndCleanupMnode(SMnode *pMnode); +SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId); +void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj); +bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); +void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); +void mndUpdateMnodeRole(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 5ce87a413c..e9b02ca3ad 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -326,7 +326,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId); if (pVgroup != NULL) { - if (pVload->role == TAOS_SYNC_STATE_LEADER) { + if (pVload->syncState == TAOS_SYNC_STATE_LEADER) { pVgroup->numOfTables = pVload->numOfTables; pVgroup->numOfTimeSeries = pVload->numOfTimeSeries; pVgroup->totalStorage = pVload->totalStorage; @@ -335,10 +335,10 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { } bool roleChanged = false; for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { - if (pVgroup->vnodeGid[vg].role != pVload->role) { + if (pVgroup->vnodeGid[vg].role != pVload->syncState) { roleChanged = true; } - pVgroup->vnodeGid[vg].role = pVload->role; + pVgroup->vnodeGid[vg].role = pVload->syncState; } if (roleChanged) { // notify scheduler role has changed diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index f403da2f2d..b51c545a6d 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -75,19 +75,6 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { sdbRelease(pMnode->pSdb, pObj); } -const char *mndGetRoleStr(int32_t showType) { - switch (showType) { - case TAOS_SYNC_STATE_FOLLOWER: - return "FOLLOWER"; - case TAOS_SYNC_STATE_CANDIDATE: - return "CANDIDATE"; - case TAOS_SYNC_STATE_LEADER: - return "LEADER"; - default: - return "ERROR"; - } -} - void mndUpdateMnodeRole(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -637,7 +624,7 @@ static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, b1, false); - const char *roles = mndGetRoleStr(pObj->role); + const char *roles = syncStr(pObj->role); char *b2 = taosMemoryCalloc(1, strlen(roles) + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->bytes[cols]); diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index d6c1b6c94f..83a5cf938f 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -24,20 +24,60 @@ #define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_PORT 80 -static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { - SMnodeLoad load = {0}; - mndGetLoad(pMnode, &load); +typedef struct { + int64_t numOfDnode; + int64_t numOfMnode; + int64_t numOfVgroup; + int64_t numOfDatabase; + int64_t numOfSuperTable; + int64_t numOfChildTable; + int64_t numOfNormalTable; + int64_t numOfColumn; + int64_t totalPoints; + int64_t totalStorage; + int64_t compStorage; +} SMnodeStat; - tjsonAddDoubleToObject(pJson, "numOfDnode", load.numOfDnode); - tjsonAddDoubleToObject(pJson, "numOfMnode", load.numOfMnode); - tjsonAddDoubleToObject(pJson, "numOfVgroup", load.numOfVgroup); - tjsonAddDoubleToObject(pJson, "numOfDatabase", load.numOfDatabase); - tjsonAddDoubleToObject(pJson, "numOfSuperTable", load.numOfSuperTable); - tjsonAddDoubleToObject(pJson, "numOfChildTable", load.numOfChildTable); - tjsonAddDoubleToObject(pJson, "numOfColumn", load.numOfColumn); - tjsonAddDoubleToObject(pJson, "numOfPoint", load.totalPoints); - tjsonAddDoubleToObject(pJson, "totalStorage", load.totalStorage); - tjsonAddDoubleToObject(pJson, "compStorage", load.compStorage); +static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) { + memset(pStat, 0, sizeof(SMnodeStat)); + + SSdb* pSdb = pMnode->pSdb; + pStat->numOfDnode = sdbGetSize(pSdb, SDB_DNODE); + pStat->numOfMnode = sdbGetSize(pSdb, SDB_MNODE); + pStat->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP); + pStat->numOfDatabase = sdbGetSize(pSdb, SDB_DB); + pStat->numOfSuperTable = sdbGetSize(pSdb, SDB_STB); + + void* pIter = NULL; + while (1) { + SVgObj* pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + + pStat->numOfChildTable += pVgroup->numOfTables; + pStat->numOfColumn += pVgroup->numOfTimeSeries; + pStat->totalPoints += pVgroup->pointsWritten; + pStat->totalStorage += pVgroup->totalStorage; + pStat->compStorage += pVgroup->compStorage; + + sdbRelease(pSdb, pVgroup); + } +} + +static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { + SMnodeStat mstat = {0}; + mndGetStat(pMnode, &mstat); + + tjsonAddDoubleToObject(pJson, "numOfDnode", mstat.numOfDnode); + tjsonAddDoubleToObject(pJson, "numOfMnode", mstat.numOfMnode); + tjsonAddDoubleToObject(pJson, "numOfVgroup", mstat.numOfVgroup); + tjsonAddDoubleToObject(pJson, "numOfDatabase", mstat.numOfDatabase); + tjsonAddDoubleToObject(pJson, "numOfSuperTable", mstat.numOfSuperTable); + tjsonAddDoubleToObject(pJson, "numOfChildTable", mstat.numOfChildTable); + tjsonAddDoubleToObject(pJson, "numOfColumn", mstat.numOfColumn); + tjsonAddDoubleToObject(pJson, "numOfPoint", mstat.totalPoints); + tjsonAddDoubleToObject(pJson, "totalStorage", mstat.totalStorage); + tjsonAddDoubleToObject(pJson, "compStorage", mstat.compStorage); } static char* mndBuildTelemetryReport(SMnode* pMnode) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 75fe409d2a..e0e4d0bac3 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -545,7 +545,7 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false); char buf1[20] = {0}; - const char *role = mndGetRoleStr(pVgroup->vnodeGid[i].role); + const char *role = syncStr(pVgroup->vnodeGid[i].role); STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->bytes[cols]); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -636,7 +636,7 @@ static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* p colDataAppend(pColInfo, numOfRows, (const char *)&val, false); char buf[20] = {0}; - STR_TO_VARSTR(buf, mndGetRoleStr(pVgid->role)); + STR_TO_VARSTR(buf, syncStr(pVgid->role)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)buf, false); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 985823653c..13fe01e16e 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -414,31 +414,6 @@ int64_t mndGenerateUid(char *name, int32_t len) { } while (true); } -void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { - memset(pLoad, 0, sizeof(SMnodeLoad)); - - SSdb *pSdb = pMnode->pSdb; - pLoad->numOfDnode = sdbGetSize(pSdb, SDB_DNODE); - pLoad->numOfMnode = sdbGetSize(pSdb, SDB_MNODE); - pLoad->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP); - pLoad->numOfDatabase = sdbGetSize(pSdb, SDB_DB); - pLoad->numOfSuperTable = sdbGetSize(pSdb, SDB_STB); - - void *pIter = NULL; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - pLoad->numOfChildTable += pVgroup->numOfTables; - pLoad->numOfColumn += pVgroup->numOfTimeSeries; - pLoad->totalPoints += pVgroup->pointsWritten; - pLoad->totalStorage += pVgroup->totalStorage; - pLoad->compStorage += pVgroup->compStorage; - - sdbRelease(pSdb, pVgroup); - } -} int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo) { @@ -486,7 +461,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonMnodeDesc desc = {0}; desc.mnode_id = pObj->id; tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep)); - tstrncpy(desc.role, mndGetRoleStr(pObj->role), sizeof(desc.role)); + tstrncpy(desc.role, syncStr(pObj->role), sizeof(desc.role)); taosArrayPush(pClusterInfo->mnodes, &desc); sdbRelease(pSdb, pObj); @@ -520,7 +495,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; - tstrncpy(pVnDesc->vnode_role, mndGetRoleStr(pVgid->role), sizeof(pVnDesc->vnode_role)); + tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role)); if (pVgid->role == TAOS_SYNC_STATE_LEADER) { tstrncpy(desc.status, "ready", sizeof(desc.status)); pClusterInfo->vgroups_alive++; @@ -545,3 +520,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr return 0; } + +int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { + pLoad->syncState = pMnode->syncMgmt.state; + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 4202c02a0c..3747e1dbc7 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -152,7 +152,7 @@ _exit: int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); - pLoad->role = TAOS_SYNC_STATE_LEADER; + pLoad->syncState = TAOS_SYNC_STATE_LEADER; pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; pLoad->totalStorage = 300; diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c index adacbf479b..24ea474cea 100644 --- a/source/libs/monitor/src/monMsg.c +++ b/source/libs/monitor/src/monMsg.c @@ -473,7 +473,7 @@ int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo) for (int32_t i = 0; i < taosArrayGetSize(pInfo->pVloads); ++i) { SVnodeLoad *pLoad = taosArrayGet(pInfo->pVloads, i); if (tEncodeI32(&encoder, pLoad->vgId) < 0) return -1; - if (tEncodeI8(&encoder, pLoad->role) < 0) return -1; + if (tEncodeI32(&encoder, pLoad->syncState) < 0) return -1; if (tEncodeI64(&encoder, pLoad->numOfTables) < 0) return -1; if (tEncodeI64(&encoder, pLoad->numOfTimeSeries) < 0) return -1; if (tEncodeI64(&encoder, pLoad->totalStorage) < 0) return -1; @@ -507,7 +507,7 @@ int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInf for (int32_t i = 0; i < arraySize; ++i) { SVnodeLoad load = {0}; if (tDecodeI32(&decoder, &load.vgId) < 0) return -1; - if (tDecodeI8(&decoder, &load.role) < 0) return -1; + if (tDecodeI32(&decoder, &load.syncState) < 0) return -1; if (tDecodeI64(&decoder, &load.numOfTables) < 0) return -1; if (tDecodeI64(&decoder, &load.numOfTimeSeries) < 0) return -1; if (tDecodeI64(&decoder, &load.totalStorage) < 0) return -1; @@ -530,3 +530,30 @@ void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) { taosArrayDestroy(pInfo->pVloads); pInfo->pVloads = NULL; } + +int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI8(&encoder, pInfo->isMnode) < 0) return -1; + if (tEncodeI32(&encoder, pInfo->load.syncState) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI8(&decoder, &pInfo->isMnode) < 0) return -1; + if (tDecodeI32(&decoder, &pInfo->load.syncState) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 78e454309a..fe32b136b7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -929,4 +929,17 @@ static void syncFreeNode(void* param) { syncNodePrint2((char*)"==syncFreeNode==", pNode); taosMemoryFree(pNode); +} + +const char* syncStr(ESyncState state) { + switch (state) { + case TAOS_SYNC_STATE_FOLLOWER: + return "FOLLOWER"; + case TAOS_SYNC_STATE_CANDIDATE: + return "CANDIDATE"; + case TAOS_SYNC_STATE_LEADER: + return "LEADER"; + default: + return "ERROR"; + } } \ No newline at end of file