From fe115ab02e4b41a98e6e7b0686d8277b8ee7050d Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 10 Oct 2022 18:20:53 +0800 Subject: [PATCH 1/6] fix: support statistics of insert_req --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 9 +++++ source/dnode/vnode/src/inc/vnodeInt.h | 10 ++++++ source/dnode/vnode/src/vnd/vnodeCommit.c | 39 +++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeOpen.c | 4 +++ source/dnode/vnode/src/vnd/vnodeQuery.c | 8 ++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 9 +++-- 6 files changed, 73 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4047bc2340..5451fe33f1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -78,6 +78,15 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs; pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; + printf("%s:%d: Info: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64 + "\n", + __func__, __LINE__, pInfo->vstat.numOfInsertReqs, pInfo->vstat.numOfInsertSuccessReqs, + pInfo->vstat.numOfBatchInsertReqs, pInfo->vstat.numOfBatchInsertSuccessReqs); + printf("%s:%d: Mgmt: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64 + "\n", + __func__, __LINE__, pMgmt->state.numOfInsertReqs, pMgmt->state.numOfInsertSuccessReqs, + pMgmt->state.numOfBatchInsertReqs, pMgmt->state.numOfBatchInsertSuccessReqs); + tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); taosArrayDestroy(pVloads); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2a8a74d297..d4b88abb1d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -56,6 +56,7 @@ typedef struct SSma SSma; typedef struct STsdb STsdb; typedef struct STQ STQ; typedef struct SVState SVState; +typedef struct SVStatis SVStatis; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; typedef struct STsdbKeepCfg STsdbKeepCfg; @@ -284,9 +285,17 @@ struct SVState { int64_t commitTerm; }; +struct SVStatis { + int64_t nInsert; + int64_t nInsertSuccess; + int64_t nBatchInsert; + int64_t nBatchInsertSuccess; +}; + struct SVnodeInfo { SVnodeCfg config; SVState state; + SVStatis statis; }; typedef enum { @@ -309,6 +318,7 @@ struct SVnode { char* path; SVnodeCfg config; SVState state; + SVStatis statis; STfs* pTfs; SMsgCb msgCb; TdThreadMutex mutex; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 6dc3ef86a7..ae55d50456 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -227,6 +227,11 @@ int vnodeCommit(SVnode *pVnode) { info.state.committed = pVnode->state.applied; info.state.commitTerm = pVnode->state.applyTerm; info.state.commitID = pVnode->state.commitID; + info.statis.nInsert = pVnode->statis.nInsert; + info.statis.nInsertSuccess = pVnode->statis.nInsertSuccess; + info.statis.nBatchInsert = pVnode->statis.nBatchInsert; + info.statis.nBatchInsertSuccess = pVnode->statis.nBatchInsertSuccess; + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); if (vnodeSaveInfo(dir, &info) < 0) { ASSERT(0); @@ -352,6 +357,32 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { return 0; } +static int vnodeEncodeStatis(const void *pObj, SJson *pJson) { + const SVStatis *pStatis = (SVStatis *)pObj; + + if (tjsonAddIntegerToObject(pJson, "insert", pStatis->nInsert) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "insert success", pStatis->nInsertSuccess) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "batch insert", pStatis->nBatchInsert) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "batch insert success", pStatis->nBatchInsertSuccess) < 0) return -1; + + return 0; +} + +static int vnodeDecodeStatis(const SJson *pJson, void *pObj) { + SVStatis *pStatis = (SVStatis *)pObj; + + int32_t code; + tjsonGetNumberValue(pJson, "insert", pStatis->nInsert, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "insert success", pStatis->nInsertSuccess, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "batch insert", pStatis->nBatchInsert, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "batch insert success", pStatis->nBatchInsertSuccess, code); + if (code < 0) return -1; + return 0; +} + static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { SJson *pJson; char *pData; @@ -371,6 +402,10 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { goto _err; } + if (tjsonAddObject(pJson, "statis", vnodeEncodeStatis, (void *)&pInfo->statis) < 0) { + goto _err; + } + pData = tjsonToString(pJson); if (pData == NULL) { goto _err; @@ -402,6 +437,10 @@ static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { goto _err; } + if (tjsonToObject(pJson, "statis", vnodeDecodeStatis, (void *)&pInfo->statis) < 0) { + goto _err; + } + tjsonDelete(pJson); return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 616aa39bdf..cd87784732 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -85,6 +85,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.applied = info.state.committed; pVnode->state.commitID = info.state.commitID; pVnode->state.commitTerm = info.state.commitTerm; + pVnode->statis.nInsert = info.statis.nInsert; + pVnode->statis.nInsertSuccess = info.statis.nInsertSuccess; + pVnode->statis.nBatchInsert = info.statis.nBatchInsert; + pVnode->statis.nBatchInsertSuccess = info.statis.nBatchInsertSuccess; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0d57c7bb74..ba8fbaf553 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -375,10 +375,10 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->pointsWritten = 100; pLoad->numOfSelectReqs = 1; - pLoad->numOfInsertReqs = 3; - pLoad->numOfInsertSuccessReqs = 2; - pLoad->numOfBatchInsertReqs = 5; - pLoad->numOfBatchInsertSuccessReqs = 4; + pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); + pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); + pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); + pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5c8590c7c9..bff836faf5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -812,10 +812,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock; + SSubmitBlk *pBlock = NULL; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; - int32_t nRows; + int32_t nRows = 0; int32_t tsize, ret; SEncoder encoder = {0}; SArray *newTbUids = NULL; @@ -823,6 +823,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = 0; pSubmitReq->version = version; + atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1); #ifdef TD_DEBUG_PRINT_ROW vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); @@ -942,12 +943,16 @@ _exit: taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); + atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows); + atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); + // TODO: the partial success scenario and the error case // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level // 1/level 2. // TODO: refactor if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); + atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); } vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); From 7da3ed2748b9dd80726006c1ae2b5c5247b43836 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 11 Oct 2022 15:36:35 +0800 Subject: [PATCH 2/6] fix: support statistics of insert_req --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 8 ++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 38 ------------------ source/dnode/vnode/src/vnd/vnodeOpen.c | 4 -- source/dnode/vnode/src/vnd/vnodeQuery.c | 44 ++++++++++++++------- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 5 files changed, 34 insertions(+), 62 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 5451fe33f1..094d0559d5 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -66,10 +66,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { pInfo->vstat.totalVnodes = totalVnodes; pInfo->vstat.masterNum = masterNum; pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs; - pInfo->vstat.numOfInsertReqs = numOfInsertReqs - pMgmt->state.numOfInsertReqs; - pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; - pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; - pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; + pInfo->vstat.numOfInsertReqs = numOfInsertReqs; + pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs; + pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs; + pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; pMgmt->state.totalVnodes = totalVnodes; pMgmt->state.masterNum = masterNum; pMgmt->state.numOfSelectReqs = numOfSelectReqs; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index ae55d50456..922b5ac5d8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -227,10 +227,6 @@ int vnodeCommit(SVnode *pVnode) { info.state.committed = pVnode->state.applied; info.state.commitTerm = pVnode->state.applyTerm; info.state.commitID = pVnode->state.commitID; - info.statis.nInsert = pVnode->statis.nInsert; - info.statis.nInsertSuccess = pVnode->statis.nInsertSuccess; - info.statis.nBatchInsert = pVnode->statis.nBatchInsert; - info.statis.nBatchInsertSuccess = pVnode->statis.nBatchInsertSuccess; snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); if (vnodeSaveInfo(dir, &info) < 0) { @@ -357,32 +353,6 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { return 0; } -static int vnodeEncodeStatis(const void *pObj, SJson *pJson) { - const SVStatis *pStatis = (SVStatis *)pObj; - - if (tjsonAddIntegerToObject(pJson, "insert", pStatis->nInsert) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "insert success", pStatis->nInsertSuccess) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "batch insert", pStatis->nBatchInsert) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "batch insert success", pStatis->nBatchInsertSuccess) < 0) return -1; - - return 0; -} - -static int vnodeDecodeStatis(const SJson *pJson, void *pObj) { - SVStatis *pStatis = (SVStatis *)pObj; - - int32_t code; - tjsonGetNumberValue(pJson, "insert", pStatis->nInsert, code); - if (code < 0) return -1; - tjsonGetNumberValue(pJson, "insert success", pStatis->nInsertSuccess, code); - if (code < 0) return -1; - tjsonGetNumberValue(pJson, "batch insert", pStatis->nBatchInsert, code); - if (code < 0) return -1; - tjsonGetNumberValue(pJson, "batch insert success", pStatis->nBatchInsertSuccess, code); - if (code < 0) return -1; - return 0; -} - static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { SJson *pJson; char *pData; @@ -402,10 +372,6 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { goto _err; } - if (tjsonAddObject(pJson, "statis", vnodeEncodeStatis, (void *)&pInfo->statis) < 0) { - goto _err; - } - pData = tjsonToString(pJson); if (pData == NULL) { goto _err; @@ -437,10 +403,6 @@ static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { goto _err; } - if (tjsonToObject(pJson, "statis", vnodeDecodeStatis, (void *)&pInfo->statis) < 0) { - goto _err; - } - tjsonDelete(pJson); return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index cd87784732..616aa39bdf 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -85,10 +85,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.applied = info.state.committed; pVnode->state.commitID = info.state.commitID; pVnode->state.commitTerm = info.state.commitTerm; - pVnode->statis.nInsert = info.statis.nInsert; - pVnode->statis.nInsertSuccess = info.statis.nInsertSuccess; - pVnode->statis.nBatchInsert = info.statis.nBatchInsert; - pVnode->statis.nBatchInsertSuccess = info.statis.nBatchInsertSuccess; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index ba8fbaf553..77499ee684 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -365,22 +365,36 @@ _exit: return code; } +#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, type) \ + do { \ + if (oVal != atomic_val_compare_exchange_##type(&pVar, oVal, 0)) { \ + int##type##_t tmpVal = atomic_sub_fetch_##type(&pVar, oVal); \ + ASSERT(tmpVal >= 0); \ + } \ + } while (0) + int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { - pLoad->vgId = TD_VID(pVnode); - pLoad->syncState = syncGetMyRole(pVnode->sync); - pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); - pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); - pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); - pLoad->totalStorage = (int64_t)3 * 1073741824; - pLoad->compStorage = (int64_t)2 * 1073741824; - pLoad->pointsWritten = 100; - pLoad->numOfSelectReqs = 1; - pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); - pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); - pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); - pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); - return 0; -} + pLoad->vgId = TD_VID(pVnode); + pLoad->syncState = syncGetMyRole(pVnode->sync); + pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); + pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); + pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); + pLoad->totalStorage = (int64_t)3 * 1073741824; + pLoad->compStorage = (int64_t)2 * 1073741824; + pLoad->pointsWritten = 100; + pLoad->numOfSelectReqs = 1; + pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); + pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); + pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); + pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); + + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64); + + return 0; + } void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { if (dbname) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index bff836faf5..6d72c0dec6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -951,8 +951,8 @@ _exit: // 1/level 2. // TODO: refactor if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { - tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); + tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); } vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); From 02979f081d8bfc0cd11622c0e282f298769763b1 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 11 Oct 2022 17:47:23 +0800 Subject: [PATCH 3/6] fix: support statistics of insert_req --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 25 ++++------- source/dnode/mgmt/node_mgmt/inc/dmNodes.h | 2 +- source/dnode/mgmt/node_mgmt/src/dmMonitor.c | 2 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 8 ++-- source/dnode/vnode/src/vnd/vnodeQuery.c | 50 +++++++++++---------- 6 files changed, 41 insertions(+), 48 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 094d0559d5..7d8b8dc0f6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { +void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; @@ -29,7 +29,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { SVnodeObj *pVnode = *ppVnode; SVnodeLoad vload = {0}; - vnodeGetLoad(pVnode->pImpl, &vload); + vnodeGetLoad(pVnode->pImpl, &vload, isReset); taosArrayPush(pInfo->pVloads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } @@ -39,7 +39,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pMgmt, &vloads); + vmGetVnodeLoads(pMgmt, &vloads, true); SArray *pVloads = vloads.pVloads; if (pVloads == NULL) return; @@ -66,10 +66,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { pInfo->vstat.totalVnodes = totalVnodes; pInfo->vstat.masterNum = masterNum; pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs; - pInfo->vstat.numOfInsertReqs = numOfInsertReqs; - pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs; - pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs; - pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; + pInfo->vstat.numOfInsertReqs = numOfInsertReqs; // delta + pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs; // delta + pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs; // delta + pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; // delta pMgmt->state.totalVnodes = totalVnodes; pMgmt->state.masterNum = masterNum; pMgmt->state.numOfSelectReqs = numOfSelectReqs; @@ -78,15 +78,6 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs; pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; - printf("%s:%d: Info: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64 - "\n", - __func__, __LINE__, pInfo->vstat.numOfInsertReqs, pInfo->vstat.numOfInsertSuccessReqs, - pInfo->vstat.numOfBatchInsertReqs, pInfo->vstat.numOfBatchInsertSuccessReqs); - printf("%s:%d: Mgmt: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64 - "\n", - __func__, __LINE__, pMgmt->state.numOfInsertReqs, pMgmt->state.numOfInsertSuccessReqs, - pMgmt->state.numOfBatchInsertReqs, pMgmt->state.numOfBatchInsertSuccessReqs); - tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); taosArrayDestroy(pVloads); } @@ -118,7 +109,7 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pMgmt, &vloads); + vmGetVnodeLoads(pMgmt, &vloads, false); int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); if (rspLen < 0) { diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h index 8c2d57808f..d3f1044f88 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -35,7 +35,7 @@ void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo); void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo); void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo); -void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo); +void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index ecad390ef9..50d6aca53e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -152,7 +152,7 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) { if (tsMultiProcess) { dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo); } else if (pWrapper->pMgmt != NULL) { - vmGetVnodeLoads(pWrapper->pMgmt, pInfo); + vmGetVnodeLoads(pWrapper->pMgmt, pInfo, false); } dmReleaseWrapper(pWrapper); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ba16bad7cf..ec300b5f9c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -73,7 +73,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d4b88abb1d..8fe20def62 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -286,10 +286,10 @@ struct SVState { }; struct SVStatis { - int64_t nInsert; - int64_t nInsertSuccess; - int64_t nBatchInsert; - int64_t nBatchInsertSuccess; + int64_t nInsert; // delta + int64_t nInsertSuccess; // delta + int64_t nBatchInsert; // delta + int64_t nBatchInsertSuccess; // delta }; struct SVnodeInfo { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 77499ee684..545bacb673 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -15,6 +15,14 @@ #include "vnd.h" +#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType) \ + do { \ + if ((oVal) != atomic_val_compare_exchange_##vType(&(pVar), (oVal), 0)) { \ + int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ + ASSERT(newVal >= 0); \ + } \ + } while (0) + int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } @@ -365,37 +373,31 @@ _exit: return code; } -#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, type) \ - do { \ - if (oVal != atomic_val_compare_exchange_##type(&pVar, oVal, 0)) { \ - int##type##_t tmpVal = atomic_sub_fetch_##type(&pVar, oVal); \ - ASSERT(tmpVal >= 0); \ - } \ - } while (0) - -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { - pLoad->vgId = TD_VID(pVnode); - pLoad->syncState = syncGetMyRole(pVnode->sync); - pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); - pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); - pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); - pLoad->totalStorage = (int64_t)3 * 1073741824; - pLoad->compStorage = (int64_t)2 * 1073741824; - pLoad->pointsWritten = 100; - pLoad->numOfSelectReqs = 1; - pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); - pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); - pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); - pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset) { + pLoad->vgId = TD_VID(pVnode); + pLoad->syncState = syncGetMyRole(pVnode->sync); + pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); + pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); + pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); + pLoad->totalStorage = (int64_t)3 * 1073741824; + pLoad->compStorage = (int64_t)2 * 1073741824; + pLoad->pointsWritten = 100; + pLoad->numOfSelectReqs = 1; + pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); + pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); + pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); + pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); + if (isReset) { VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64); VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64); VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64); VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64); - - return 0; } + return 0; +} + void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { if (dbname) { *dbname = pVnode->config.dbname; From 6680ed32d2f0a9f10cf53df40aef50576b52d18c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 11 Oct 2022 17:50:26 +0800 Subject: [PATCH 4/6] chore: code format revert --- source/dnode/vnode/src/vnd/vnodeCommit.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 922b5ac5d8..6dc3ef86a7 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -227,7 +227,6 @@ int vnodeCommit(SVnode *pVnode) { info.state.committed = pVnode->state.applied; info.state.commitTerm = pVnode->state.applyTerm; info.state.commitID = pVnode->state.commitID; - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); if (vnodeSaveInfo(dir, &info) < 0) { ASSERT(0); From 114ae8dbdaa79628d23b7afe1303efbec6dcf7d7 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 11 Oct 2022 18:22:29 +0800 Subject: [PATCH 5/6] fix: support statistics of insert_req --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 3 ++- source/dnode/vnode/inc/vnode.h | 3 ++- source/dnode/vnode/src/vnd/vnodeQuery.c | 23 +++++++++++++-------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7d8b8dc0f6..a9e83e7904 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -29,7 +29,8 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { SVnodeObj *pVnode = *ppVnode; SVnodeLoad vload = {0}; - vnodeGetLoad(pVnode->pImpl, &vload, isReset); + vnodeGetLoad(pVnode->pImpl, &vload); + if (isReset) vnodeResetLoad(pVnode->pImpl, &vload); taosArrayPush(pInfo->pVloads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ec300b5f9c..70d55d0320 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -73,7 +73,8 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset); +void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 545bacb673..d24439c850 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -373,7 +373,7 @@ _exit: return code; } -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset) { +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); pLoad->syncState = syncGetMyRole(pVnode->sync); pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); @@ -387,17 +387,22 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset) { pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); - - if (isReset) { - VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64); - VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64); - VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64); - VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64); - } - return 0; } +/** + * @brief Reset the statistics value by monitor interval + * + * @param pVnode + * @param pLoad + */ +void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64); + VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64); +} + void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { if (dbname) { *dbname = pVnode->config.dbname; From 3734dd32b07a64eb4e8d7416a68f3a6abddab37a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 11 Oct 2022 18:32:02 +0800 Subject: [PATCH 6/6] chore: code optimization for insert_req statistics --- source/dnode/vnode/src/vnd/vnodeQuery.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index d24439c850..030ba65442 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -15,12 +15,10 @@ #include "vnd.h" -#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType) \ - do { \ - if ((oVal) != atomic_val_compare_exchange_##vType(&(pVar), (oVal), 0)) { \ - int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ - ASSERT(newVal >= 0); \ - } \ +#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType) \ + do { \ + int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ + ASSERT(newVal >= 0); \ } while (0) int vnodeQueryOpen(SVnode *pVnode) { @@ -401,6 +399,8 @@ void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64); VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64); VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64); + + } void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {