config thread num in snode
This commit is contained in:
parent
6dd6a00728
commit
1844645403
|
@ -70,6 +70,8 @@ extern int32_t tsNumOfVnodeSyncThreads;
|
||||||
extern int32_t tsNumOfVnodeMergeThreads;
|
extern int32_t tsNumOfVnodeMergeThreads;
|
||||||
extern int32_t tsNumOfQnodeQueryThreads;
|
extern int32_t tsNumOfQnodeQueryThreads;
|
||||||
extern int32_t tsNumOfQnodeFetchThreads;
|
extern int32_t tsNumOfQnodeFetchThreads;
|
||||||
|
extern int32_t tsNumOfSnodeSharedThreads;
|
||||||
|
extern int32_t tsNumOfSnodeUniqueThreads;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
extern bool tsEnableMonitor;
|
extern bool tsEnableMonitor;
|
||||||
|
|
|
@ -472,9 +472,6 @@ typedef struct {
|
||||||
|
|
||||||
#define TMQ_SEPARATOR ':'
|
#define TMQ_SEPARATOR ':'
|
||||||
|
|
||||||
#define SND_UNIQUE_THREAD_NUM 2
|
|
||||||
#define SND_SHARED_THREAD_NUM 2
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
SND_WORKER_TYPE__SHARED = 1,
|
SND_WORKER_TYPE__SHARED = 1,
|
||||||
SND_WORKER_TYPE__UNIQUE,
|
SND_WORKER_TYPE__UNIQUE,
|
||||||
|
|
|
@ -64,6 +64,8 @@ int32_t tsNumOfVnodeSyncThreads = 2;
|
||||||
int32_t tsNumOfVnodeMergeThreads = 2;
|
int32_t tsNumOfVnodeMergeThreads = 2;
|
||||||
int32_t tsNumOfQnodeQueryThreads = 2;
|
int32_t tsNumOfQnodeQueryThreads = 2;
|
||||||
int32_t tsNumOfQnodeFetchThreads = 2;
|
int32_t tsNumOfQnodeFetchThreads = 2;
|
||||||
|
int32_t tsNumOfSnodeSharedThreads = 2;
|
||||||
|
int32_t tsNumOfSnodeUniqueThreads = 2;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
bool tsEnableMonitor = true;
|
bool tsEnableMonitor = true;
|
||||||
|
@ -419,6 +421,14 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 2, 4);
|
tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 2, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
|
||||||
|
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfSnodeUniqueThreads = tsNumOfCores / 4;
|
||||||
|
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
||||||
|
@ -552,6 +562,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
||||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||||
|
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||||
|
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||||
|
|
||||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||||
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||||
|
|
|
@ -50,7 +50,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
|
for (int32_t i = 0; i < tsNumOfSnodeUniqueThreads; i++) {
|
||||||
SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker));
|
SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker));
|
||||||
if (pUniqueWorker == NULL) {
|
if (pUniqueWorker == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -69,8 +69,8 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSingleWorkerCfg cfg = {.min = SND_SHARED_THREAD_NUM,
|
SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads,
|
||||||
.max = SND_SHARED_THREAD_NUM,
|
.max = tsNumOfSnodeSharedThreads,
|
||||||
.name = "snode-shared",
|
.name = "snode-shared",
|
||||||
.fp = (FItem)smProcessSharedQueue,
|
.fp = (FItem)smProcessSharedQueue,
|
||||||
.param = pMgmt};
|
.param = pMgmt};
|
||||||
|
@ -97,7 +97,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
|
||||||
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
|
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
return pHead->vgId % SND_UNIQUE_THREAD_NUM;
|
return pHead->vgId % tsNumOfSnodeUniqueThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
|
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
|
||||||
|
|
Loading…
Reference in New Issue