From 335d42cf4960737d7983c0ae836ce0085ec9645e Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 29 Aug 2019 17:55:56 +0800 Subject: [PATCH] fix the issue #443 --- src/system/src/dnodeService.c | 2 +- src/system/src/mgmtDnodeInt.c | 3 ++- src/system/src/vnodeCommit.c | 3 ++- src/system/src/vnodeMeter.c | 6 +++--- src/system/src/vnodeShell.c | 5 +++-- src/system/src/vnodeStream.c | 2 +- src/system/src/vnodeSystem.c | 16 +++++----------- 7 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/system/src/dnodeService.c b/src/system/src/dnodeService.c index 8249c51faa..17c4c736d1 100644 --- a/src/system/src/dnodeService.c +++ b/src/system/src/dnodeService.c @@ -66,7 +66,7 @@ int main(int argc, char *argv[]) { exit(EXIT_FAILURE); } } else if (strcmp(argv[i], "-V") == 0) { - printf("%s %s\n", version, compatible_version); + printf("version: %s compatible_version: %s\n", version, compatible_version); printf("gitinfo: %s\n", gitinfo); printf("buildinfo: %s\n", buildinfo); return 0; diff --git a/src/system/src/mgmtDnodeInt.c b/src/system/src/mgmtDnodeInt.c index 304d491402..8be91fd519 100644 --- a/src/system/src/mgmtDnodeInt.c +++ b/src/system/src/mgmtDnodeInt.c @@ -29,6 +29,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode); void vnodeProcessMsgFromMgmt(SSchedMsg *smsg); void *rpcQhandle; +extern void *dmQhandle; int mgmtSendMsgToDnode(char *msg) { mTrace("msg:%s is sent to dnode", taosMsg[*msg]); @@ -38,7 +39,7 @@ int mgmtSendMsgToDnode(char *msg) { schedMsg.msg = msg; schedMsg.ahandle = NULL; schedMsg.thandle = NULL; - taosScheduleTask(rpcQhandle, &schedMsg); + taosScheduleTask(dmQhandle, &schedMsg); return 0; } diff --git a/src/system/src/vnodeCommit.c b/src/system/src/vnodeCommit.c index 9982ce45d8..a14e8118c4 100644 --- a/src/system/src/vnodeCommit.c +++ b/src/system/src/vnodeCommit.c @@ -144,6 +144,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) { goto _error; } + TSKEY now = taosGetTimestamp(pVnode->cfg.precision); SCommitHead head; int simpleCheck = 0; while (1) { @@ -180,7 +181,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) { int32_t numOfPoints = 0; (*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion, - &numOfPoints); + &numOfPoints, now); actions++; } else { break; diff --git a/src/system/src/vnodeMeter.c b/src/system/src/vnodeMeter.c index 27e6e74952..f94276290f 100644 --- a/src/system/src/vnodeMeter.c +++ b/src/system/src/vnodeMeter.c @@ -35,7 +35,7 @@ int tsMeterSizeOnFile; void vnodeUpdateMeter(void *param, void *tmdId); void vnodeRecoverMeterObjectFile(int vnode); -int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *) = {vnodeInsertPoints, +int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY) = {vnodeInsertPoints, vnodeImportPoints}; void vnodeFreeMeterObj(SMeterObj *pObj) { @@ -506,7 +506,7 @@ int vnodeRemoveMeterObj(int vnode, int sid) { } int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, - int *numOfInsertPoints) { + int *numOfInsertPoints, TSKEY now) { int expectedLen, i; short numOfPoints; SSubmitMsg *pSubmit = (SSubmitMsg *)cont; @@ -528,7 +528,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi // to guarantee time stamp is the same for all vnodes pData = pSubmit->payLoad; - tsKey = taosGetTimestamp(pVnode->cfg.precision); + tsKey = now; cfile = tsKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; if (*((TSKEY *)pData) == 0) { for (i = 0; i < numOfPoints; ++i) { diff --git a/src/system/src/vnodeShell.c b/src/system/src/vnodeShell.c index c4aece259e..d193c51ac1 100644 --- a/src/system/src/vnodeShell.c +++ b/src/system/src/vnodeShell.c @@ -484,6 +484,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int32_t numOfPoints = 0; int32_t numOfTotalPoints = 0; + TSKEY now = taosGetTimestamp(pVnode->cfg.precision); for (int32_t i = 0; i < pSubmit->numOfSid; ++i) { numOfPoints = 0; @@ -523,11 +524,11 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { // meter status is ready for insert/import if (pSubmit->import) { code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, - sversion, &numOfPoints); + sversion, &numOfPoints, now); vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); } else { code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, - sversion, &numOfPoints); + sversion, &numOfPoints, now); vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT); } diff --git a/src/system/src/vnodeStream.c b/src/system/src/vnodeStream.c index a8dcff231d..2f8cfec1f1 100644 --- a/src/system/src/vnodeStream.c +++ b/src/system/src/vnodeStream.c @@ -57,7 +57,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); if (state == TSDB_METER_STATE_READY) { - vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints); + vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid, diff --git a/src/system/src/vnodeSystem.c b/src/system/src/vnodeSystem.c index 8d3b66e520..e74e172af0 100644 --- a/src/system/src/vnodeSystem.c +++ b/src/system/src/vnodeSystem.c @@ -41,9 +41,11 @@ int vnodeInitSystem() { if (numOfThreads < 1) numOfThreads = 1; queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query"); - // numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; - // if (numOfThreads < 1) numOfThreads = 1; - rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, 1, "dnode"); + numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; + if (numOfThreads < 1) numOfThreads = 1; + rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "dnode"); + + dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); vnodeTmrCtrl = taosTmrInit(tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode"); if (vnodeTmrCtrl == NULL) { @@ -70,11 +72,3 @@ int vnodeInitSystem() { return 0; } - -void vnodeInitQHandle() { - // int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; - // if (numOfThreads < 1) numOfThreads = 1; - rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, 1, "dnode"); - - dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); -}