diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7d5a907fda..d8456ad626 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { +void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; @@ -30,6 +30,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { SVnodeObj *pVnode = *ppVnode; SVnodeLoad vload = {0}; vnodeGetLoad(pVnode->pImpl, &vload); + if (isReset) vnodeResetLoad(pVnode->pImpl, &vload); taosArrayPush(pInfo->pVloads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } @@ -39,7 +40,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pMgmt, &vloads); + vmGetVnodeLoads(pMgmt, &vloads, true); SArray *pVloads = vloads.pVloads; if (pVloads == NULL) return; @@ -66,10 +67,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { pInfo->vstat.totalVnodes = totalVnodes; pInfo->vstat.masterNum = masterNum; pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs; - pInfo->vstat.numOfInsertReqs = numOfInsertReqs - pMgmt->state.numOfInsertReqs; - pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; - pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; - pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; + pInfo->vstat.numOfInsertReqs = numOfInsertReqs; // delta + pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs; // delta + pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs; // delta + pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; // delta pMgmt->state.totalVnodes = totalVnodes; pMgmt->state.masterNum = masterNum; pMgmt->state.numOfSelectReqs = numOfSelectReqs; @@ -109,7 +110,7 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pMgmt, &vloads); + vmGetVnodeLoads(pMgmt, &vloads, false); int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); if (rspLen < 0) { diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h index 8c2d57808f..d3f1044f88 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -35,7 +35,7 @@ void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo); void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo); void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo); -void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo); +void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index ecad390ef9..50d6aca53e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -152,7 +152,7 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) { if (tsMultiProcess) { dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo); } else if (pWrapper->pMgmt != NULL) { - vmGetVnodeLoads(pWrapper->pMgmt, pInfo); + vmGetVnodeLoads(pWrapper->pMgmt, pInfo, false); } dmReleaseWrapper(pWrapper); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9926c5a8c4..d006f12a3f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -73,6 +73,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); +void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2a8a74d297..8fe20def62 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -56,6 +56,7 @@ typedef struct SSma SSma; typedef struct STsdb STsdb; typedef struct STQ STQ; typedef struct SVState SVState; +typedef struct SVStatis SVStatis; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; typedef struct STsdbKeepCfg STsdbKeepCfg; @@ -284,9 +285,17 @@ struct SVState { int64_t commitTerm; }; +struct SVStatis { + int64_t nInsert; // delta + int64_t nInsertSuccess; // delta + int64_t nBatchInsert; // delta + int64_t nBatchInsertSuccess; // delta +}; + struct SVnodeInfo { SVnodeCfg config; SVState state; + SVStatis statis; }; typedef enum { @@ -309,6 +318,7 @@ struct SVnode { char* path; SVnodeCfg config; SVState state; + SVStatis statis; STfs* pTfs; SMsgCb msgCb; TdThreadMutex mutex; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0d57c7bb74..030ba65442 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -15,6 +15,12 @@ #include "vnd.h" +#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType) \ + do { \ + int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ + ASSERT(newVal >= 0); \ + } while (0) + int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } @@ -375,13 +381,28 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->pointsWritten = 100; pLoad->numOfSelectReqs = 1; - pLoad->numOfInsertReqs = 3; - pLoad->numOfInsertSuccessReqs = 2; - pLoad->numOfBatchInsertReqs = 5; - pLoad->numOfBatchInsertSuccessReqs = 4; + pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); + pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); + pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); + pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); return 0; } +/** + * @brief Reset the statistics value by monitor interval + * + * @param pVnode + * @param pLoad + */ +void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64); + + +} + void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { if (dbname) { *dbname = pVnode->config.dbname; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 295e480c6a..2482ca1a5a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -812,10 +812,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock; + SSubmitBlk *pBlock = NULL; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; - int32_t nRows; + int32_t nRows = 0; int32_t tsize, ret; SEncoder encoder = {0}; SArray *newTbUids = NULL; @@ -823,6 +823,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = 0; pSubmitReq->version = version; + atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1); #ifdef TD_DEBUG_PRINT_ROW vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); @@ -942,11 +943,15 @@ _exit: taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); + atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows); + atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); + // TODO: the partial success scenario and the error case // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level // 1/level 2. // TODO: refactor if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { + atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); }