From 18446454037e89e515c946a7370cbfe09501595a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 7 Apr 2022 15:54:05 +0800 Subject: [PATCH] config thread num in snode --- include/common/tglobal.h | 2 ++ include/util/tdef.h | 3 --- source/common/src/tglobal.c | 12 ++++++++++++ source/dnode/mgmt/sm/smWorker.c | 8 ++++---- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 990dec09d2..006d5849de 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -70,6 +70,8 @@ extern int32_t tsNumOfVnodeSyncThreads; extern int32_t tsNumOfVnodeMergeThreads; extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeFetchThreads; +extern int32_t tsNumOfSnodeSharedThreads; +extern int32_t tsNumOfSnodeUniqueThreads; // monitor extern bool tsEnableMonitor; diff --git a/include/util/tdef.h b/include/util/tdef.h index f1af9eca1f..2a634e2327 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -472,9 +472,6 @@ typedef struct { #define TMQ_SEPARATOR ':' -#define SND_UNIQUE_THREAD_NUM 2 -#define SND_SHARED_THREAD_NUM 2 - enum { SND_WORKER_TYPE__SHARED = 1, SND_WORKER_TYPE__UNIQUE, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7f79be2ab2..4ecacb9d9d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -64,6 +64,8 @@ int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeMergeThreads = 2; int32_t tsNumOfQnodeQueryThreads = 2; int32_t tsNumOfQnodeFetchThreads = 2; +int32_t tsNumOfSnodeSharedThreads = 2; +int32_t tsNumOfSnodeUniqueThreads = 2; // monitor bool tsEnableMonitor = true; @@ -419,6 +421,14 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; + tsNumOfSnodeSharedThreads = tsNumOfCores / 4; + tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfSnodeUniqueThreads = tsNumOfCores / 4; + tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1; + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; @@ -552,6 +562,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; + tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/sm/smWorker.c b/source/dnode/mgmt/sm/smWorker.c index 0326d7dd9f..afa843953b 100644 --- a/source/dnode/mgmt/sm/smWorker.c +++ b/source/dnode/mgmt/sm/smWorker.c @@ -50,7 +50,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { + for (int32_t i = 0; i < tsNumOfSnodeUniqueThreads; i++) { SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker)); if (pUniqueWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -69,8 +69,8 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - SSingleWorkerCfg cfg = {.min = SND_SHARED_THREAD_NUM, - .max = SND_SHARED_THREAD_NUM, + SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads, + .max = tsNumOfSnodeSharedThreads, .name = "snode-shared", .fp = (FItem)smProcessSharedQueue, .param = pMgmt}; @@ -97,7 +97,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) { static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->vgId = htonl(pHead->vgId); - return pHead->vgId % SND_UNIQUE_THREAD_NUM; + return pHead->vgId % tsNumOfSnodeUniqueThreads; } static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {