adjust vnode threads
This commit is contained in:
parent
e4652d5a7c
commit
0a5cfe754b
|
@ -41,7 +41,6 @@ extern int32_t tsMaxConnections;
|
||||||
extern int32_t tsMaxShellConns;
|
extern int32_t tsMaxShellConns;
|
||||||
extern int32_t tsShellActivityTimer;
|
extern int32_t tsShellActivityTimer;
|
||||||
extern int32_t tsMaxTmrCtrl;
|
extern int32_t tsMaxTmrCtrl;
|
||||||
extern float tsRatioOfQueryCores;
|
|
||||||
extern int32_t tsCompressMsgSize;
|
extern int32_t tsCompressMsgSize;
|
||||||
extern int32_t tsCompressColData;
|
extern int32_t tsCompressColData;
|
||||||
extern int32_t tsMaxNumOfDistinctResults;
|
extern int32_t tsMaxNumOfDistinctResults;
|
||||||
|
@ -62,6 +61,13 @@ extern int32_t tsBnodeShmSize;
|
||||||
extern int32_t tsNumOfRpcThreads;
|
extern int32_t tsNumOfRpcThreads;
|
||||||
extern int32_t tsNumOfCommitThreads;
|
extern int32_t tsNumOfCommitThreads;
|
||||||
extern int32_t tsNumOfTaskQueueThreads;
|
extern int32_t tsNumOfTaskQueueThreads;
|
||||||
|
extern int32_t tsNumOfMnodeQueryThreads;
|
||||||
|
extern int32_t tsNumOfMnodeReadThreads;
|
||||||
|
extern int32_t tsNumOfVnodeQueryThreads;
|
||||||
|
extern int32_t tsNumOfVnodeFetchThreads;
|
||||||
|
extern int32_t tsNumOfVnodeWriteThreads;
|
||||||
|
extern int32_t tsNumOfVnodeSyncThreads;
|
||||||
|
extern int32_t tsNumOfVnodeMergeThreads;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
extern bool tsEnableMonitor;
|
extern bool tsEnableMonitor;
|
||||||
|
|
|
@ -39,7 +39,6 @@ bool tsRpcForceTcp = true; // disable this, means query, show command use ud
|
||||||
int32_t tsMaxShellConns = 50000;
|
int32_t tsMaxShellConns = 50000;
|
||||||
int32_t tsMaxConnections = 50000;
|
int32_t tsMaxConnections = 50000;
|
||||||
int32_t tsShellActivityTimer = 3; // second
|
int32_t tsShellActivityTimer = 3; // second
|
||||||
float tsRatioOfQueryCores = 1.0f;
|
|
||||||
int32_t tsMaxBinaryDisplayWidth = 30;
|
int32_t tsMaxBinaryDisplayWidth = 30;
|
||||||
bool tsEnableSlaveQuery = true;
|
bool tsEnableSlaveQuery = true;
|
||||||
bool tsPrintAuth = false;
|
bool tsPrintAuth = false;
|
||||||
|
@ -56,6 +55,13 @@ int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
|
||||||
int32_t tsNumOfRpcThreads = 1;
|
int32_t tsNumOfRpcThreads = 1;
|
||||||
int32_t tsNumOfCommitThreads = 2;
|
int32_t tsNumOfCommitThreads = 2;
|
||||||
int32_t tsNumOfTaskQueueThreads = 1;
|
int32_t tsNumOfTaskQueueThreads = 1;
|
||||||
|
int32_t tsNumOfMnodeQueryThreads = 1;
|
||||||
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
|
int32_t tsNumOfVnodeQueryThreads = 2;
|
||||||
|
int32_t tsNumOfVnodeFetchThreads = 2;
|
||||||
|
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||||
|
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||||
|
int32_t tsNumOfVnodeMergeThreads = 2;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
bool tsEnableMonitor = true;
|
bool tsEnableMonitor = true;
|
||||||
|
@ -343,7 +349,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1;
|
||||||
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
|
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
|
||||||
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
|
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
|
||||||
if (cfgAddFloat(pCfg, "ratioOfQueryCores", tsRatioOfQueryCores, 0, 2, 0) != 0) return -1;
|
|
||||||
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "maxConnections", tsMaxConnections, 1, 100000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "maxConnections", tsMaxConnections, 1, 100000, 0) != 0) return -1;
|
||||||
|
@ -372,9 +377,37 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfCommitThreads = tsNumOfCommitThreads / 2;
|
tsNumOfCommitThreads = tsNumOfCores / 2;
|
||||||
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfMnodeQueryThreads = tsNumOfCores / 8;
|
||||||
|
tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 1, 4);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
|
||||||
|
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfVnodeQueryThreads = tsNumOfCores / 2;
|
||||||
|
tsNumOfVnodeQueryThreads = TMIN(tsNumOfVnodeQueryThreads, 1);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfVnodeFetchThreads = tsNumOfCores / 2;
|
||||||
|
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 2, 4);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||||
|
tsNumOfVnodeWriteThreads = TMIN(tsNumOfVnodeWriteThreads, 1);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfVnodeSyncThreads = tsNumOfCores / 2;
|
||||||
|
tsNumOfVnodeSyncThreads = TMIN(tsNumOfVnodeSyncThreads, 1);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsNumOfVnodeMergeThreads = tsNumOfCores / 8;
|
||||||
|
tsNumOfVnodeMergeThreads = TRANGE(tsNumOfVnodeMergeThreads, 1, 1);
|
||||||
|
if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
|
||||||
|
@ -474,7 +507,6 @@ static void taosSetSystemCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
static int32_t taosSetServerCfg(SConfig *pCfg) {
|
static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
|
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
|
||||||
tsRatioOfQueryCores = cfgGetItem(pCfg, "ratioOfQueryCores")->fval;
|
|
||||||
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
||||||
tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval;
|
tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval;
|
||||||
tsMaxConnections = cfgGetItem(pCfg, "maxConnections")->i32;
|
tsMaxConnections = cfgGetItem(pCfg, "maxConnections")->i32;
|
||||||
|
@ -501,6 +533,13 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
|
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
|
||||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||||
|
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
|
||||||
|
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||||
|
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||||
|
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||||
|
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
|
||||||
|
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
|
||||||
|
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
||||||
|
|
||||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||||
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||||
|
|
|
@ -129,25 +129,33 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||||
SSingleWorkerCfg qCfg = {.min = 0, .max = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt};
|
SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads,
|
||||||
|
.max = tsNumOfMnodeQueryThreads,
|
||||||
|
.name = "mnode-query",
|
||||||
|
.fp = (FItem)mmProcessQueryQueue,
|
||||||
|
.param = pMgmt};
|
||||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
|
||||||
dError("failed to start mnode-query worker since %s", terrstr());
|
dError("failed to start mnode-query worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSingleWorkerCfg rCfg = {.min = 0, .max = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads,
|
||||||
|
.max = tsNumOfMnodeReadThreads,
|
||||||
|
.name = "mnode-read",
|
||||||
|
.fp = (FItem)mmProcessQueue,
|
||||||
|
.param = pMgmt};
|
||||||
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
|
||||||
dError("failed to start mnode-read worker since %s", terrstr());
|
dError("failed to start mnode-read worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSingleWorkerCfg wCfg = {.min = 0, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||||
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
|
||||||
dError("failed to start mnode-write worker since %s", terrstr());
|
dError("failed to start mnode-write worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSingleWorkerCfg sCfg = {.min = 0, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||||
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
|
||||||
dError("failed to start mnode sync-worker since %s", terrstr());
|
dError("failed to start mnode sync-worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -108,7 +108,7 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
||||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
||||||
int32_t maxFetchThreads = 4;
|
int32_t maxFetchThreads = 4;
|
||||||
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
||||||
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
|
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * 1), 1);
|
||||||
int32_t maxQueryThreads = minQueryThreads;
|
int32_t maxQueryThreads = minQueryThreads;
|
||||||
|
|
||||||
SSingleWorkerCfg queryCfg = {.min = minQueryThreads,
|
SSingleWorkerCfg queryCfg = {.min = minQueryThreads,
|
||||||
|
|
|
@ -379,39 +379,31 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
||||||
int32_t maxFetchThreads = 4;
|
|
||||||
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
|
||||||
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
|
|
||||||
int32_t maxQueryThreads = minQueryThreads;
|
|
||||||
int32_t maxWriteThreads = TMAX(tsNumOfCores, 1);
|
|
||||||
int32_t maxSyncThreads = TMAX(tsNumOfCores / 2, 1);
|
|
||||||
int32_t maxMergeThreads = 1;
|
|
||||||
|
|
||||||
SQWorkerPool *pQPool = &pMgmt->queryPool;
|
SQWorkerPool *pQPool = &pMgmt->queryPool;
|
||||||
pQPool->name = "vnode-query";
|
pQPool->name = "vnode-query";
|
||||||
pQPool->min = minQueryThreads;
|
pQPool->min = tsNumOfVnodeQueryThreads;
|
||||||
pQPool->max = maxQueryThreads;
|
pQPool->max = tsNumOfVnodeQueryThreads;
|
||||||
if (tQWorkerInit(pQPool) != 0) return -1;
|
if (tQWorkerInit(pQPool) != 0) return -1;
|
||||||
|
|
||||||
SQWorkerPool *pFPool = &pMgmt->fetchPool;
|
SQWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||||
pFPool->name = "vnode-fetch";
|
pFPool->name = "vnode-fetch";
|
||||||
pFPool->min = minFetchThreads;
|
pFPool->min = tsNumOfVnodeFetchThreads;
|
||||||
pFPool->max = maxFetchThreads;
|
pFPool->max = tsNumOfVnodeFetchThreads;
|
||||||
if (tQWorkerInit(pFPool) != 0) return -1;
|
if (tQWorkerInit(pFPool) != 0) return -1;
|
||||||
|
|
||||||
SWWorkerPool *pWPool = &pMgmt->writePool;
|
SWWorkerPool *pWPool = &pMgmt->writePool;
|
||||||
pWPool->name = "vnode-write";
|
pWPool->name = "vnode-write";
|
||||||
pWPool->max = maxWriteThreads;
|
pWPool->max = tsNumOfVnodeWriteThreads;
|
||||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||||
|
|
||||||
pWPool = &pMgmt->syncPool;
|
pWPool = &pMgmt->syncPool;
|
||||||
pWPool->name = "vnode-sync";
|
pWPool->name = "vnode-sync";
|
||||||
pWPool->max = maxSyncThreads;
|
pWPool->max = tsNumOfVnodeSyncThreads;
|
||||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||||
|
|
||||||
pWPool = &pMgmt->mergePool;
|
pWPool = &pMgmt->mergePool;
|
||||||
pWPool->name = "vnode-merge";
|
pWPool->name = "vnode-merge";
|
||||||
pWPool->max = maxMergeThreads;
|
pWPool->max = tsNumOfVnodeMergeThreads;
|
||||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||||
|
|
||||||
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
|
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
|
||||||
|
|
Loading…
Reference in New Issue