fix: support statistics of insert_req
This commit is contained in:
parent
7da3ed2748
commit
02979f081d
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "vmInt.h"
|
#include "vmInt.h"
|
||||||
|
|
||||||
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
||||||
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
|
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
|
||||||
if (pInfo->pVloads == NULL) return;
|
if (pInfo->pVloads == NULL) return;
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
|
|
||||||
SVnodeObj *pVnode = *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
SVnodeLoad vload = {0};
|
SVnodeLoad vload = {0};
|
||||||
vnodeGetLoad(pVnode->pImpl, &vload);
|
vnodeGetLoad(pVnode->pImpl, &vload, isReset);
|
||||||
taosArrayPush(pInfo->pVloads, &vload);
|
taosArrayPush(pInfo->pVloads, &vload);
|
||||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
|
|
||||||
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
SMonVloadInfo vloads = {0};
|
SMonVloadInfo vloads = {0};
|
||||||
vmGetVnodeLoads(pMgmt, &vloads);
|
vmGetVnodeLoads(pMgmt, &vloads, true);
|
||||||
|
|
||||||
SArray *pVloads = vloads.pVloads;
|
SArray *pVloads = vloads.pVloads;
|
||||||
if (pVloads == NULL) return;
|
if (pVloads == NULL) return;
|
||||||
|
@ -66,10 +66,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
pInfo->vstat.totalVnodes = totalVnodes;
|
pInfo->vstat.totalVnodes = totalVnodes;
|
||||||
pInfo->vstat.masterNum = masterNum;
|
pInfo->vstat.masterNum = masterNum;
|
||||||
pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs;
|
pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs;
|
||||||
pInfo->vstat.numOfInsertReqs = numOfInsertReqs;
|
pInfo->vstat.numOfInsertReqs = numOfInsertReqs; // delta
|
||||||
pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
|
pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs; // delta
|
||||||
pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs; // delta
|
||||||
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; // delta
|
||||||
pMgmt->state.totalVnodes = totalVnodes;
|
pMgmt->state.totalVnodes = totalVnodes;
|
||||||
pMgmt->state.masterNum = masterNum;
|
pMgmt->state.masterNum = masterNum;
|
||||||
pMgmt->state.numOfSelectReqs = numOfSelectReqs;
|
pMgmt->state.numOfSelectReqs = numOfSelectReqs;
|
||||||
|
@ -78,15 +78,6 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
||||||
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
||||||
|
|
||||||
printf("%s:%d: Info: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64
|
|
||||||
"\n",
|
|
||||||
__func__, __LINE__, pInfo->vstat.numOfInsertReqs, pInfo->vstat.numOfInsertSuccessReqs,
|
|
||||||
pInfo->vstat.numOfBatchInsertReqs, pInfo->vstat.numOfBatchInsertSuccessReqs);
|
|
||||||
printf("%s:%d: Mgmt: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64
|
|
||||||
"\n",
|
|
||||||
__func__, __LINE__, pMgmt->state.numOfInsertReqs, pMgmt->state.numOfInsertSuccessReqs,
|
|
||||||
pMgmt->state.numOfBatchInsertReqs, pMgmt->state.numOfBatchInsertSuccessReqs);
|
|
||||||
|
|
||||||
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
|
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
|
||||||
taosArrayDestroy(pVloads);
|
taosArrayDestroy(pVloads);
|
||||||
}
|
}
|
||||||
|
@ -118,7 +109,7 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonVloadInfo vloads = {0};
|
SMonVloadInfo vloads = {0};
|
||||||
vmGetVnodeLoads(pMgmt, &vloads);
|
vmGetVnodeLoads(pMgmt, &vloads, false);
|
||||||
|
|
||||||
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
|
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
|
||||||
if (rspLen < 0) {
|
if (rspLen < 0) {
|
||||||
|
|
|
@ -35,7 +35,7 @@ void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo);
|
||||||
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
|
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
|
||||||
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
||||||
|
|
||||||
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
|
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset);
|
||||||
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
||||||
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
|
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,7 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
|
||||||
if (tsMultiProcess) {
|
if (tsMultiProcess) {
|
||||||
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
|
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
|
||||||
} else if (pWrapper->pMgmt != NULL) {
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
vmGetVnodeLoads(pWrapper->pMgmt, pInfo);
|
vmGetVnodeLoads(pWrapper->pMgmt, pInfo, false);
|
||||||
}
|
}
|
||||||
dmReleaseWrapper(pWrapper);
|
dmReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
|
||||||
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
||||||
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
||||||
|
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset);
|
||||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||||
|
|
||||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -286,10 +286,10 @@ struct SVState {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SVStatis {
|
struct SVStatis {
|
||||||
int64_t nInsert;
|
int64_t nInsert; // delta
|
||||||
int64_t nInsertSuccess;
|
int64_t nInsertSuccess; // delta
|
||||||
int64_t nBatchInsert;
|
int64_t nBatchInsert; // delta
|
||||||
int64_t nBatchInsertSuccess;
|
int64_t nBatchInsertSuccess; // delta
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SVnodeInfo {
|
struct SVnodeInfo {
|
||||||
|
|
|
@ -15,6 +15,14 @@
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
|
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType) \
|
||||||
|
do { \
|
||||||
|
if ((oVal) != atomic_val_compare_exchange_##vType(&(pVar), (oVal), 0)) { \
|
||||||
|
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \
|
||||||
|
ASSERT(newVal >= 0); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
int vnodeQueryOpen(SVnode *pVnode) {
|
int vnodeQueryOpen(SVnode *pVnode) {
|
||||||
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
|
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
|
||||||
}
|
}
|
||||||
|
@ -365,37 +373,31 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, type) \
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad, bool isReset) {
|
||||||
do { \
|
pLoad->vgId = TD_VID(pVnode);
|
||||||
if (oVal != atomic_val_compare_exchange_##type(&pVar, oVal, 0)) { \
|
pLoad->syncState = syncGetMyRole(pVnode->sync);
|
||||||
int##type##_t tmpVal = atomic_sub_fetch_##type(&pVar, oVal); \
|
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||||
ASSERT(tmpVal >= 0); \
|
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||||
} \
|
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
||||||
} while (0)
|
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||||
|
pLoad->compStorage = (int64_t)2 * 1073741824;
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
pLoad->pointsWritten = 100;
|
||||||
pLoad->vgId = TD_VID(pVnode);
|
pLoad->numOfSelectReqs = 1;
|
||||||
pLoad->syncState = syncGetMyRole(pVnode->sync);
|
pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
|
||||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
|
||||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
|
||||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
|
||||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
|
||||||
pLoad->compStorage = (int64_t)2 * 1073741824;
|
|
||||||
pLoad->pointsWritten = 100;
|
|
||||||
pLoad->numOfSelectReqs = 1;
|
|
||||||
pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
|
|
||||||
pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
|
|
||||||
pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
|
|
||||||
pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
|
|
||||||
|
|
||||||
|
if (isReset) {
|
||||||
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64);
|
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64);
|
||||||
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64);
|
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64);
|
||||||
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64);
|
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64);
|
||||||
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64);
|
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64);
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
|
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
|
||||||
if (dbname) {
|
if (dbname) {
|
||||||
*dbname = pVnode->config.dbname;
|
*dbname = pVnode->config.dbname;
|
||||||
|
|
Loading…
Reference in New Issue