diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 0633bb8201..60be1ca74d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -41,7 +41,6 @@ extern int32_t tsMaxConnections; extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; extern int32_t tsMaxTmrCtrl; -extern float tsRatioOfQueryCores; extern int32_t tsCompressMsgSize; extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; @@ -62,6 +61,13 @@ extern int32_t tsBnodeShmSize; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; +extern int32_t tsNumOfMnodeQueryThreads; +extern int32_t tsNumOfMnodeReadThreads; +extern int32_t tsNumOfVnodeQueryThreads; +extern int32_t tsNumOfVnodeFetchThreads; +extern int32_t tsNumOfVnodeWriteThreads; +extern int32_t tsNumOfVnodeSyncThreads; +extern int32_t tsNumOfVnodeMergeThreads; // monitor extern bool tsEnableMonitor; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index af30acf141..73086045bb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -39,7 +39,6 @@ bool tsRpcForceTcp = true; // disable this, means query, show command use ud int32_t tsMaxShellConns = 50000; int32_t tsMaxConnections = 50000; int32_t tsShellActivityTimer = 3; // second -float tsRatioOfQueryCores = 1.0f; int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = true; bool tsPrintAuth = false; @@ -56,6 +55,13 @@ int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4; int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 1; +int32_t tsNumOfMnodeQueryThreads = 1; +int32_t tsNumOfMnodeReadThreads = 1; +int32_t tsNumOfVnodeQueryThreads = 2; +int32_t tsNumOfVnodeFetchThreads = 2; +int32_t tsNumOfVnodeWriteThreads = 2; +int32_t tsNumOfVnodeSyncThreads = 2; +int32_t tsNumOfVnodeMergeThreads = 2; // monitor bool tsEnableMonitor = true; @@ -343,7 +349,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1; if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1; if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1; - if (cfgAddFloat(pCfg, "ratioOfQueryCores", tsRatioOfQueryCores, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxConnections", tsMaxConnections, 1, 100000, 0) != 0) return -1; @@ -372,9 +377,37 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4); if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; - tsNumOfCommitThreads = tsNumOfCommitThreads / 2; + tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfMnodeQueryThreads = tsNumOfCores / 8; + tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 1, 4); + if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfMnodeReadThreads = tsNumOfCores / 8; + tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4); + if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfVnodeQueryThreads = tsNumOfCores / 2; + tsNumOfVnodeQueryThreads = TMIN(tsNumOfVnodeQueryThreads, 1); + if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfVnodeFetchThreads = tsNumOfCores / 2; + tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfVnodeWriteThreads = tsNumOfCores; + tsNumOfVnodeWriteThreads = TMIN(tsNumOfVnodeWriteThreads, 1); + if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfVnodeSyncThreads = tsNumOfCores / 2; + tsNumOfVnodeSyncThreads = TMIN(tsNumOfVnodeSyncThreads, 1); + if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfVnodeMergeThreads = tsNumOfCores / 8; + tsNumOfVnodeMergeThreads = TRANGE(tsNumOfVnodeMergeThreads, 1, 1); + if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; @@ -474,7 +507,6 @@ static void taosSetSystemCfg(SConfig *pCfg) { static int32_t taosSetServerCfg(SConfig *pCfg) { tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; - tsRatioOfQueryCores = cfgGetItem(pCfg, "ratioOfQueryCores")->fval; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval; tsMaxConnections = cfgGetItem(pCfg, "maxConnections")->i32; @@ -501,6 +533,13 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; + tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32; + tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; + tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; + tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; + tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; + tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; + tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/mm/mmWorker.c b/source/dnode/mgmt/mm/mmWorker.c index cde4eb853a..735ef53b37 100644 --- a/source/dnode/mgmt/mm/mmWorker.c +++ b/source/dnode/mgmt/mm/mmWorker.c @@ -129,25 +129,33 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SSingleWorkerCfg qCfg = {.min = 0, .max = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt}; + SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads, + .max = tsNumOfMnodeQueryThreads, + .name = "mnode-query", + .fp = (FItem)mmProcessQueryQueue, + .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { dError("failed to start mnode-query worker since %s", terrstr()); return -1; } - SSingleWorkerCfg rCfg = {.min = 0, .max = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads, + .max = tsNumOfMnodeReadThreads, + .name = "mnode-read", + .fp = (FItem)mmProcessQueue, + .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - SSingleWorkerCfg wCfg = {.min = 0, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - SSingleWorkerCfg sCfg = {.min = 0, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { dError("failed to start mnode sync-worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/qm/qmWorker.c b/source/dnode/mgmt/qm/qmWorker.c index 5ceb9dbf88..c9090337ab 100644 --- a/source/dnode/mgmt/qm/qmWorker.c +++ b/source/dnode/mgmt/qm/qmWorker.c @@ -108,7 +108,7 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t maxFetchThreads = 4; int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); - int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); + int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * 1), 1); int32_t maxQueryThreads = minQueryThreads; SSingleWorkerCfg queryCfg = {.min = minQueryThreads, diff --git a/source/dnode/mgmt/vm/vmWorker.c b/source/dnode/mgmt/vm/vmWorker.c index c228e6e7dd..ed1a4ca2f4 100644 --- a/source/dnode/mgmt/vm/vmWorker.c +++ b/source/dnode/mgmt/vm/vmWorker.c @@ -379,39 +379,31 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { } int32_t vmStartWorker(SVnodesMgmt *pMgmt) { - int32_t maxFetchThreads = 4; - int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); - int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); - int32_t maxQueryThreads = minQueryThreads; - int32_t maxWriteThreads = TMAX(tsNumOfCores, 1); - int32_t maxSyncThreads = TMAX(tsNumOfCores / 2, 1); - int32_t maxMergeThreads = 1; - SQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; - pQPool->min = minQueryThreads; - pQPool->max = maxQueryThreads; + pQPool->min = tsNumOfVnodeQueryThreads; + pQPool->max = tsNumOfVnodeQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; SQWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; - pFPool->min = minFetchThreads; - pFPool->max = maxFetchThreads; + pFPool->min = tsNumOfVnodeFetchThreads; + pFPool->max = tsNumOfVnodeFetchThreads; if (tQWorkerInit(pFPool) != 0) return -1; SWWorkerPool *pWPool = &pMgmt->writePool; pWPool->name = "vnode-write"; - pWPool->max = maxWriteThreads; + pWPool->max = tsNumOfVnodeWriteThreads; if (tWWorkerInit(pWPool) != 0) return -1; pWPool = &pMgmt->syncPool; pWPool->name = "vnode-sync"; - pWPool->max = maxSyncThreads; + pWPool->max = tsNumOfVnodeSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; pWPool = &pMgmt->mergePool; pWPool->name = "vnode-merge"; - pWPool->max = maxMergeThreads; + pWPool->max = tsNumOfVnodeMergeThreads; if (tWWorkerInit(pWPool) != 0) return -1; SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};