fix: support statistics of insert_req
This commit is contained in:
parent
452f224727
commit
fe115ab02e
|
@ -78,6 +78,15 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
|
||||||
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
|
||||||
|
|
||||||
|
printf("%s:%d: Info: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64
|
||||||
|
"\n",
|
||||||
|
__func__, __LINE__, pInfo->vstat.numOfInsertReqs, pInfo->vstat.numOfInsertSuccessReqs,
|
||||||
|
pInfo->vstat.numOfBatchInsertReqs, pInfo->vstat.numOfBatchInsertSuccessReqs);
|
||||||
|
printf("%s:%d: Mgmt: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64
|
||||||
|
"\n",
|
||||||
|
__func__, __LINE__, pMgmt->state.numOfInsertReqs, pMgmt->state.numOfInsertSuccessReqs,
|
||||||
|
pMgmt->state.numOfBatchInsertReqs, pMgmt->state.numOfBatchInsertSuccessReqs);
|
||||||
|
|
||||||
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
|
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
|
||||||
taosArrayDestroy(pVloads);
|
taosArrayDestroy(pVloads);
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ typedef struct SSma SSma;
|
||||||
typedef struct STsdb STsdb;
|
typedef struct STsdb STsdb;
|
||||||
typedef struct STQ STQ;
|
typedef struct STQ STQ;
|
||||||
typedef struct SVState SVState;
|
typedef struct SVState SVState;
|
||||||
|
typedef struct SVStatis SVStatis;
|
||||||
typedef struct SVBufPool SVBufPool;
|
typedef struct SVBufPool SVBufPool;
|
||||||
typedef struct SQWorker SQHandle;
|
typedef struct SQWorker SQHandle;
|
||||||
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
||||||
|
@ -284,9 +285,17 @@ struct SVState {
|
||||||
int64_t commitTerm;
|
int64_t commitTerm;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SVStatis {
|
||||||
|
int64_t nInsert;
|
||||||
|
int64_t nInsertSuccess;
|
||||||
|
int64_t nBatchInsert;
|
||||||
|
int64_t nBatchInsertSuccess;
|
||||||
|
};
|
||||||
|
|
||||||
struct SVnodeInfo {
|
struct SVnodeInfo {
|
||||||
SVnodeCfg config;
|
SVnodeCfg config;
|
||||||
SVState state;
|
SVState state;
|
||||||
|
SVStatis statis;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -309,6 +318,7 @@ struct SVnode {
|
||||||
char* path;
|
char* path;
|
||||||
SVnodeCfg config;
|
SVnodeCfg config;
|
||||||
SVState state;
|
SVState state;
|
||||||
|
SVStatis statis;
|
||||||
STfs* pTfs;
|
STfs* pTfs;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
|
|
|
@ -227,6 +227,11 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
info.state.committed = pVnode->state.applied;
|
info.state.committed = pVnode->state.applied;
|
||||||
info.state.commitTerm = pVnode->state.applyTerm;
|
info.state.commitTerm = pVnode->state.applyTerm;
|
||||||
info.state.commitID = pVnode->state.commitID;
|
info.state.commitID = pVnode->state.commitID;
|
||||||
|
info.statis.nInsert = pVnode->statis.nInsert;
|
||||||
|
info.statis.nInsertSuccess = pVnode->statis.nInsertSuccess;
|
||||||
|
info.statis.nBatchInsert = pVnode->statis.nBatchInsert;
|
||||||
|
info.statis.nBatchInsertSuccess = pVnode->statis.nBatchInsertSuccess;
|
||||||
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||||
if (vnodeSaveInfo(dir, &info) < 0) {
|
if (vnodeSaveInfo(dir, &info) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -352,6 +357,32 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int vnodeEncodeStatis(const void *pObj, SJson *pJson) {
|
||||||
|
const SVStatis *pStatis = (SVStatis *)pObj;
|
||||||
|
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "insert", pStatis->nInsert) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "insert success", pStatis->nInsertSuccess) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "batch insert", pStatis->nBatchInsert) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "batch insert success", pStatis->nBatchInsertSuccess) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeDecodeStatis(const SJson *pJson, void *pObj) {
|
||||||
|
SVStatis *pStatis = (SVStatis *)pObj;
|
||||||
|
|
||||||
|
int32_t code;
|
||||||
|
tjsonGetNumberValue(pJson, "insert", pStatis->nInsert, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "insert success", pStatis->nInsertSuccess, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "batch insert", pStatis->nBatchInsert, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "batch insert success", pStatis->nBatchInsertSuccess, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
|
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
|
||||||
SJson *pJson;
|
SJson *pJson;
|
||||||
char *pData;
|
char *pData;
|
||||||
|
@ -371,6 +402,10 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tjsonAddObject(pJson, "statis", vnodeEncodeStatis, (void *)&pInfo->statis) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
pData = tjsonToString(pJson);
|
pData = tjsonToString(pJson);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -402,6 +437,10 @@ static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tjsonToObject(pJson, "statis", vnodeDecodeStatis, (void *)&pInfo->statis) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
tjsonDelete(pJson);
|
tjsonDelete(pJson);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -85,6 +85,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
pVnode->state.applied = info.state.committed;
|
pVnode->state.applied = info.state.committed;
|
||||||
pVnode->state.commitID = info.state.commitID;
|
pVnode->state.commitID = info.state.commitID;
|
||||||
pVnode->state.commitTerm = info.state.commitTerm;
|
pVnode->state.commitTerm = info.state.commitTerm;
|
||||||
|
pVnode->statis.nInsert = info.statis.nInsert;
|
||||||
|
pVnode->statis.nInsertSuccess = info.statis.nInsertSuccess;
|
||||||
|
pVnode->statis.nBatchInsert = info.statis.nBatchInsert;
|
||||||
|
pVnode->statis.nBatchInsertSuccess = info.statis.nBatchInsertSuccess;
|
||||||
pVnode->pTfs = pTfs;
|
pVnode->pTfs = pTfs;
|
||||||
pVnode->msgCb = msgCb;
|
pVnode->msgCb = msgCb;
|
||||||
taosThreadMutexInit(&pVnode->lock, NULL);
|
taosThreadMutexInit(&pVnode->lock, NULL);
|
||||||
|
|
|
@ -375,10 +375,10 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
pLoad->compStorage = (int64_t)2 * 1073741824;
|
pLoad->compStorage = (int64_t)2 * 1073741824;
|
||||||
pLoad->pointsWritten = 100;
|
pLoad->pointsWritten = 100;
|
||||||
pLoad->numOfSelectReqs = 1;
|
pLoad->numOfSelectReqs = 1;
|
||||||
pLoad->numOfInsertReqs = 3;
|
pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
|
||||||
pLoad->numOfInsertSuccessReqs = 2;
|
pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
|
||||||
pLoad->numOfBatchInsertReqs = 5;
|
pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
|
||||||
pLoad->numOfBatchInsertSuccessReqs = 4;
|
pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -812,10 +812,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||||
SSubmitRsp submitRsp = {0};
|
SSubmitRsp submitRsp = {0};
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SSubmitBlk *pBlock;
|
SSubmitBlk *pBlock = NULL;
|
||||||
SVCreateTbReq createTbReq = {0};
|
SVCreateTbReq createTbReq = {0};
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
int32_t nRows;
|
int32_t nRows = 0;
|
||||||
int32_t tsize, ret;
|
int32_t tsize, ret;
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
SArray *newTbUids = NULL;
|
SArray *newTbUids = NULL;
|
||||||
|
@ -823,6 +823,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
|
|
||||||
pRsp->code = 0;
|
pRsp->code = 0;
|
||||||
pSubmitReq->version = version;
|
pSubmitReq->version = version;
|
||||||
|
atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1);
|
||||||
|
|
||||||
#ifdef TD_DEBUG_PRINT_ROW
|
#ifdef TD_DEBUG_PRINT_ROW
|
||||||
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
|
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
|
||||||
|
@ -942,12 +943,16 @@ _exit:
|
||||||
|
|
||||||
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
|
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
|
||||||
|
|
||||||
|
atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
|
||||||
|
atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
|
||||||
|
|
||||||
// TODO: the partial success scenario and the error case
|
// TODO: the partial success scenario and the error case
|
||||||
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
|
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
|
||||||
// 1/level 2.
|
// 1/level 2.
|
||||||
// TODO: refactor
|
// TODO: refactor
|
||||||
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
|
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
|
||||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
||||||
|
atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
|
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
|
||||||
|
|
Loading…
Reference in New Issue