commit
7aea48a2a5
|
@ -41,9 +41,6 @@ extern int32_t tsMaxConnections;
|
|||
extern int32_t tsMaxShellConns;
|
||||
extern int32_t tsShellActivityTimer;
|
||||
extern int32_t tsMaxTmrCtrl;
|
||||
extern float tsNumOfThreadsPerCore;
|
||||
extern int32_t tsNumOfCommitThreads;
|
||||
extern float tsRatioOfQueryCores;
|
||||
extern int32_t tsCompressMsgSize;
|
||||
extern int32_t tsCompressColData;
|
||||
extern int32_t tsMaxNumOfDistinctResults;
|
||||
|
@ -60,6 +57,22 @@ extern int32_t tsQnodeShmSize;
|
|||
extern int32_t tsSnodeShmSize;
|
||||
extern int32_t tsBnodeShmSize;
|
||||
|
||||
// queue & threads
|
||||
extern int32_t tsNumOfRpcThreads;
|
||||
extern int32_t tsNumOfCommitThreads;
|
||||
extern int32_t tsNumOfTaskQueueThreads;
|
||||
extern int32_t tsNumOfMnodeQueryThreads;
|
||||
extern int32_t tsNumOfMnodeReadThreads;
|
||||
extern int32_t tsNumOfVnodeQueryThreads;
|
||||
extern int32_t tsNumOfVnodeFetchThreads;
|
||||
extern int32_t tsNumOfVnodeWriteThreads;
|
||||
extern int32_t tsNumOfVnodeSyncThreads;
|
||||
extern int32_t tsNumOfVnodeMergeThreads;
|
||||
extern int32_t tsNumOfQnodeQueryThreads;
|
||||
extern int32_t tsNumOfQnodeFetchThreads;
|
||||
extern int32_t tsNumOfSnodeSharedThreads;
|
||||
extern int32_t tsNumOfSnodeUniqueThreads;
|
||||
|
||||
// monitor
|
||||
extern bool tsEnableMonitor;
|
||||
extern int32_t tsMonitorInterval;
|
||||
|
|
|
@ -56,6 +56,12 @@ extern "C" {
|
|||
__typeof(b) __b = (b); \
|
||||
(__a < __b) ? __a : __b; \
|
||||
})
|
||||
|
||||
#define TRANGE(a, b, c) \
|
||||
({ \
|
||||
a = TMAX(a, b); \
|
||||
a = TMIN(a, c); \
|
||||
})
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -472,9 +472,6 @@ typedef struct {
|
|||
|
||||
#define TMQ_SEPARATOR ':'
|
||||
|
||||
#define SND_UNIQUE_THREAD_NUM 2
|
||||
#define SND_SHARED_THREAD_NUM 2
|
||||
|
||||
enum {
|
||||
SND_WORKER_TYPE__SHARED = 1,
|
||||
SND_WORKER_TYPE__UNIQUE,
|
||||
|
|
|
@ -30,21 +30,18 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
|
|||
uint16_t tsServerPort = 6030;
|
||||
int32_t tsVersion = 30000000;
|
||||
int32_t tsStatusInterval = 1; // second
|
||||
bool tsEnableTelemetryReporting = 0;
|
||||
bool tsEnableTelemetryReporting = false;
|
||||
|
||||
// common
|
||||
int32_t tsRpcTimer = 300;
|
||||
int32_t tsRpcMaxTime = 600; // seconds;
|
||||
bool tsRpcForceTcp = 1; // disable this, means query, show command use udp protocol as default
|
||||
bool tsRpcForceTcp = true; // disable this, means query, show command use udp protocol as default
|
||||
int32_t tsMaxShellConns = 50000;
|
||||
int32_t tsMaxConnections = 50000;
|
||||
int32_t tsShellActivityTimer = 3; // second
|
||||
float tsNumOfThreadsPerCore = 1.0f;
|
||||
int32_t tsNumOfCommitThreads = 4;
|
||||
float tsRatioOfQueryCores = 1.0f;
|
||||
int32_t tsMaxBinaryDisplayWidth = 30;
|
||||
bool tsEnableSlaveQuery = 1;
|
||||
bool tsPrintAuth = 0;
|
||||
bool tsEnableSlaveQuery = true;
|
||||
bool tsPrintAuth = false;
|
||||
|
||||
// multi process
|
||||
bool tsMultiProcess = false;
|
||||
|
@ -54,8 +51,24 @@ int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
|
|||
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
|
||||
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
|
||||
|
||||
// queue & threads
|
||||
int32_t tsNumOfRpcThreads = 1;
|
||||
int32_t tsNumOfCommitThreads = 2;
|
||||
int32_t tsNumOfTaskQueueThreads = 1;
|
||||
int32_t tsNumOfMnodeQueryThreads = 1;
|
||||
int32_t tsNumOfMnodeReadThreads = 1;
|
||||
int32_t tsNumOfVnodeQueryThreads = 2;
|
||||
int32_t tsNumOfVnodeFetchThreads = 2;
|
||||
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||
int32_t tsNumOfVnodeMergeThreads = 2;
|
||||
int32_t tsNumOfQnodeQueryThreads = 2;
|
||||
int32_t tsNumOfQnodeFetchThreads = 2;
|
||||
int32_t tsNumOfSnodeSharedThreads = 2;
|
||||
int32_t tsNumOfSnodeUniqueThreads = 2;
|
||||
|
||||
// monitor
|
||||
bool tsEnableMonitor = 1;
|
||||
bool tsEnableMonitor = true;
|
||||
int32_t tsMonitorInterval = 30;
|
||||
char tsMonitorFqdn[TSDB_FQDN_LEN] = {0};
|
||||
uint16_t tsMonitorPort = 6043;
|
||||
|
@ -124,13 +137,13 @@ int32_t tsQueryBufferSize = -1;
|
|||
int64_t tsQueryBufferSizeBytes = -1;
|
||||
|
||||
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
|
||||
bool tsRetrieveBlockingModel = 0;
|
||||
bool tsRetrieveBlockingModel = false;
|
||||
|
||||
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
|
||||
bool tsKeepOriginalColumnName = 0;
|
||||
bool tsKeepOriginalColumnName = false;
|
||||
|
||||
// kill long query
|
||||
bool tsDeadLockKillQuery = 0;
|
||||
bool tsDeadLockKillQuery = false;
|
||||
|
||||
// tsdb config
|
||||
// For backward compatibility
|
||||
|
@ -290,7 +303,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
|
||||
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "numOfThreadsPerCore", tsNumOfThreadsPerCore, 0, 10, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxTmrCtrl", tsMaxTmrCtrl, 8, 2048, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "rpcTimer", tsRpcTimer, 100, 3000, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "rpcMaxTime", tsRpcMaxTime, 100, 7200, 1) != 0) return -1;
|
||||
|
@ -304,6 +316,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
return -1;
|
||||
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxBinaryDisplayWidth", tsMaxBinaryDisplayWidth, 1, 65536, 1) != 0) return -1;
|
||||
|
||||
tsNumOfTaskQueueThreads = tsNumOfCores / 4;
|
||||
tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2);
|
||||
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -336,8 +353,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1;
|
||||
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "ratioOfQueryCores", tsRatioOfQueryCores, 0, 2, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxConnections", tsMaxConnections, 1, 100000, 0) != 0) return -1;
|
||||
|
@ -360,7 +375,59 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
|
||||
// if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
|
||||
|
||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfCommitThreads = tsNumOfCores / 2;
|
||||
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfMnodeQueryThreads = tsNumOfCores / 8;
|
||||
tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 1, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
|
||||
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeQueryThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeQueryThreads = TMIN(tsNumOfVnodeQueryThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||
tsNumOfVnodeWriteThreads = TMIN(tsNumOfVnodeWriteThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeSyncThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeSyncThreads = TMIN(tsNumOfVnodeSyncThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeMergeThreads = tsNumOfCores / 8;
|
||||
tsNumOfVnodeMergeThreads = TRANGE(tsNumOfVnodeMergeThreads, 1, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfQnodeQueryThreads = tsNumOfCores / 2;
|
||||
tsNumOfQnodeQueryThreads = TMIN(tsNumOfQnodeQueryThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
|
||||
tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeUniqueThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
|
||||
|
@ -424,7 +491,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tsNumOfThreadsPerCore = cfgGetItem(pCfg, "numOfThreadsPerCore")->fval;
|
||||
tsMaxTmrCtrl = cfgGetItem(pCfg, "maxTmrCtrl")->i32;
|
||||
tsRpcTimer = cfgGetItem(pCfg, "rpcTimer")->i32;
|
||||
tsRpcMaxTime = cfgGetItem(pCfg, "rpcMaxTime")->i32;
|
||||
|
@ -437,7 +503,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsMaxNumOfOrderedResults = cfgGetItem(pCfg, "maxNumOfOrderedRes")->i32;
|
||||
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
|
||||
tsMaxBinaryDisplayWidth = cfgGetItem(pCfg, "maxBinaryDisplayWidth")->i32;
|
||||
|
||||
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -461,8 +527,6 @@ static void taosSetSystemCfg(SConfig *pCfg) {
|
|||
|
||||
static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
|
||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||
tsRatioOfQueryCores = cfgGetItem(pCfg, "ratioOfQueryCores")->fval;
|
||||
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
||||
tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval;
|
||||
tsMaxConnections = cfgGetItem(pCfg, "maxConnections")->i32;
|
||||
|
@ -485,7 +549,21 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32;
|
||||
tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32;
|
||||
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
|
||||
// tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
|
||||
tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
|
||||
|
||||
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
|
||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
|
||||
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
|
||||
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
|
||||
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
|
||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||
|
|
|
@ -257,16 +257,11 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch
|
|||
static int32_t dndInitServer(SDnode *pDnode) {
|
||||
STransMgmt *pMgmt = &pDnode->trans;
|
||||
|
||||
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
|
||||
if (numOfThreads < 1) {
|
||||
numOfThreads = 1;
|
||||
}
|
||||
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = pDnode->serverPort;
|
||||
rpcInit.label = "DND";
|
||||
rpcInit.numOfThreads = numOfThreads;
|
||||
rpcInit.numOfThreads = tsNumOfRpcThreads;
|
||||
rpcInit.cfp = (RpcCfp)dndProcessMsg;
|
||||
rpcInit.sessions = tsMaxShellConns;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
|
|
|
@ -129,25 +129,33 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|||
}
|
||||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
SSingleWorkerCfg qCfg = {.min = 0, .max = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt};
|
||||
SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads,
|
||||
.max = tsNumOfMnodeQueryThreads,
|
||||
.name = "mnode-query",
|
||||
.fp = (FItem)mmProcessQueryQueue,
|
||||
.param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
|
||||
dError("failed to start mnode-query worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg rCfg = {.min = 0, .max = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads,
|
||||
.max = tsNumOfMnodeReadThreads,
|
||||
.name = "mnode-read",
|
||||
.fp = (FItem)mmProcessQueue,
|
||||
.param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
|
||||
dError("failed to start mnode-read worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg wCfg = {.min = 0, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
|
||||
dError("failed to start mnode-write worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg sCfg = {.min = 0, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
|
||||
dError("failed to start mnode sync-worker since %s", terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -106,13 +106,8 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
|||
}
|
||||
|
||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
||||
int32_t maxFetchThreads = 4;
|
||||
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
||||
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
|
||||
int32_t maxQueryThreads = minQueryThreads;
|
||||
|
||||
SSingleWorkerCfg queryCfg = {.min = minQueryThreads,
|
||||
.max = maxQueryThreads,
|
||||
SSingleWorkerCfg queryCfg = {.min = tsNumOfVnodeQueryThreads,
|
||||
.max = tsNumOfVnodeQueryThreads,
|
||||
.name = "qnode-query",
|
||||
.fp = (FItem)qmProcessQueryQueue,
|
||||
.param = pMgmt};
|
||||
|
@ -122,8 +117,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg fetchCfg = {.min = minFetchThreads,
|
||||
.max = maxFetchThreads,
|
||||
SSingleWorkerCfg fetchCfg = {.min = tsNumOfQnodeFetchThreads,
|
||||
.max = tsNumOfQnodeFetchThreads,
|
||||
.name = "qnode-fetch",
|
||||
.fp = (FItem)qmProcessFetchQueue,
|
||||
.param = pMgmt};
|
||||
|
|
|
@ -50,7 +50,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
|
||||
for (int32_t i = 0; i < tsNumOfSnodeUniqueThreads; i++) {
|
||||
SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker));
|
||||
if (pUniqueWorker == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -69,8 +69,8 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
}
|
||||
}
|
||||
|
||||
SSingleWorkerCfg cfg = {.min = SND_SHARED_THREAD_NUM,
|
||||
.max = SND_SHARED_THREAD_NUM,
|
||||
SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads,
|
||||
.max = tsNumOfSnodeSharedThreads,
|
||||
.name = "snode-shared",
|
||||
.fp = (FItem)smProcessSharedQueue,
|
||||
.param = pMgmt};
|
||||
|
@ -97,7 +97,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
|
|||
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
return pHead->vgId % SND_UNIQUE_THREAD_NUM;
|
||||
return pHead->vgId % tsNumOfSnodeUniqueThreads;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
|
||||
|
|
|
@ -379,39 +379,31 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
}
|
||||
|
||||
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
||||
int32_t maxFetchThreads = 4;
|
||||
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
||||
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
|
||||
int32_t maxQueryThreads = minQueryThreads;
|
||||
int32_t maxWriteThreads = TMAX(tsNumOfCores, 1);
|
||||
int32_t maxSyncThreads = TMAX(tsNumOfCores / 2, 1);
|
||||
int32_t maxMergeThreads = 1;
|
||||
|
||||
SQWorkerPool *pQPool = &pMgmt->queryPool;
|
||||
pQPool->name = "vnode-query";
|
||||
pQPool->min = minQueryThreads;
|
||||
pQPool->max = maxQueryThreads;
|
||||
pQPool->min = tsNumOfVnodeQueryThreads;
|
||||
pQPool->max = tsNumOfVnodeQueryThreads;
|
||||
if (tQWorkerInit(pQPool) != 0) return -1;
|
||||
|
||||
SQWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||
pFPool->name = "vnode-fetch";
|
||||
pFPool->min = minFetchThreads;
|
||||
pFPool->max = maxFetchThreads;
|
||||
pFPool->min = tsNumOfVnodeFetchThreads;
|
||||
pFPool->max = tsNumOfVnodeFetchThreads;
|
||||
if (tQWorkerInit(pFPool) != 0) return -1;
|
||||
|
||||
SWWorkerPool *pWPool = &pMgmt->writePool;
|
||||
pWPool->name = "vnode-write";
|
||||
pWPool->max = maxWriteThreads;
|
||||
pWPool->max = tsNumOfVnodeWriteThreads;
|
||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||
|
||||
pWPool = &pMgmt->syncPool;
|
||||
pWPool->name = "vnode-sync";
|
||||
pWPool->max = maxSyncThreads;
|
||||
pWPool->max = tsNumOfVnodeSyncThreads;
|
||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||
|
||||
pWPool = &pMgmt->mergePool;
|
||||
pWPool->name = "vnode-merge";
|
||||
pWPool->max = maxMergeThreads;
|
||||
pWPool->max = tsNumOfVnodeMergeThreads;
|
||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||
|
||||
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
|
||||
|
|
|
@ -97,18 +97,14 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
|
|||
static void* pTaskQueue = NULL;
|
||||
|
||||
int32_t initTaskQueue() {
|
||||
double factor = 4.0;
|
||||
|
||||
int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
|
||||
|
||||
int32_t queueSize = tsMaxConnections * 2;
|
||||
pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc");
|
||||
pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc");
|
||||
if (NULL == pTaskQueue) {
|
||||
qError("failed to init task queue");
|
||||
return -1;
|
||||
}
|
||||
|
||||
qDebug("task queue is initialized, numOfThreads: %d", numOfThreads);
|
||||
qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue