enh: add queryMaxConcurrentTables configuration
This commit is contained in:
parent
874725d17a
commit
38231fc48e
|
@ -104,6 +104,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo
|
|||
// query client
|
||||
extern int32_t tsQueryPolicy;
|
||||
extern int32_t tsQueryRspPolicy;
|
||||
extern int64_t tsQueryMaxConcurrentTables;
|
||||
extern int32_t tsQuerySmaOptimize;
|
||||
extern int32_t tsQueryRsmaTolerance;
|
||||
extern bool tsQueryPlannerTrace;
|
||||
|
|
|
@ -103,6 +103,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
|
|||
// query
|
||||
int32_t tsQueryPolicy = 1;
|
||||
int32_t tsQueryRspPolicy = 0;
|
||||
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
|
||||
bool tsEnableQueryHb = false;
|
||||
int32_t tsQuerySmaOptimize = 0;
|
||||
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
|
||||
|
@ -340,6 +341,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
|
||||
|
||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
|
||||
|
@ -735,6 +737,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
|
||||
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
|
||||
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
|
||||
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
|
||||
|
||||
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||
|
||||
|
|
|
@ -54,7 +54,6 @@ typedef enum {
|
|||
|
||||
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
|
||||
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
|
||||
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
|
||||
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
|
||||
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
|
||||
|
||||
|
@ -134,7 +133,7 @@ typedef struct SSchStatusFps {
|
|||
|
||||
typedef struct SSchedulerCfg {
|
||||
uint32_t maxJobNum;
|
||||
int32_t maxNodeTableNum;
|
||||
int64_t maxNodeTableNum;
|
||||
SCH_POLICY schPolicy;
|
||||
bool enableReSchedule;
|
||||
} SSchedulerCfg;
|
||||
|
@ -175,7 +174,7 @@ typedef struct SSchHbCallbackParam {
|
|||
typedef struct SSchFlowControl {
|
||||
SRWLatch lock;
|
||||
bool sorted;
|
||||
int32_t tableNumSum;
|
||||
int64_t tableNumSum;
|
||||
uint32_t execTaskNum;
|
||||
SArray *taskList; // Element is SSchTask*
|
||||
} SSchFlowControl;
|
||||
|
|
|
@ -46,7 +46,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sum = 0;
|
||||
int64_t sum = 0;
|
||||
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
|
||||
|
@ -55,7 +55,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
|||
}
|
||||
|
||||
if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) {
|
||||
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
|
||||
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%" PRId64, sum);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
|||
|
||||
SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
|
||||
|
||||
SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum);
|
||||
SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%" PRId64, sum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
|
|||
--ctrl->execTaskNum;
|
||||
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
|
||||
|
||||
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
|
||||
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
|
||||
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
|
||||
|
||||
_return:
|
||||
|
@ -125,7 +125,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
|
|||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
|
||||
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
|
||||
ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
|
||||
|
||||
*enough = true;
|
||||
|
@ -142,7 +142,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
|
|||
break;
|
||||
}
|
||||
|
||||
int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
|
||||
int64_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
|
||||
|
||||
if (sum <= schMgmt.cfg.maxNodeTableNum) {
|
||||
ctrl->tableNumSum = sum;
|
||||
|
@ -173,7 +173,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
|
|||
|
||||
_return:
|
||||
|
||||
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
|
||||
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d",
|
||||
((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum,
|
||||
ctrl->execTaskNum);
|
||||
|
||||
|
@ -203,7 +203,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
|
||||
int64_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
|
||||
int32_t taskNum = taosArrayGetSize(ctrl->taskList);
|
||||
int32_t code = 0;
|
||||
SSchTask *pTask = NULL;
|
||||
|
@ -217,7 +217,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
|
|||
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
|
||||
|
||||
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
|
||||
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
|
||||
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
|
||||
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
|
||||
|
||||
continue;
|
||||
|
@ -228,14 +228,14 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
|
|||
|
||||
taosArrayRemove(ctrl->taskList, i);
|
||||
|
||||
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
|
||||
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
|
||||
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
|
||||
|
||||
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
|
||||
|
||||
remainNum -= pTask->plan->execNodeStat.tableNum;
|
||||
if (remainNum <= 0) {
|
||||
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port,
|
||||
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn, ep->port,
|
||||
ctrl->tableNumSum, ctrl->execTaskNum);
|
||||
|
||||
break;
|
||||
|
@ -244,7 +244,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
|
|||
if (i < (taskNum - 1)) {
|
||||
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
|
||||
if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
|
||||
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
|
||||
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d",
|
||||
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
|
||||
|
||||
break;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "schInt.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
SSchedulerMgmt schMgmt = {
|
||||
.jobRef = -1,
|
||||
|
@ -30,11 +31,12 @@ int32_t schedulerInit() {
|
|||
}
|
||||
|
||||
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
|
||||
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
|
||||
schMgmt.cfg.maxNodeTableNum = tsQueryMaxConcurrentTables;
|
||||
schMgmt.cfg.schPolicy = SCHEDULE_DEFAULT_POLICY;
|
||||
schMgmt.cfg.enableReSchedule = true;
|
||||
|
||||
qDebug("schedule policy init to %d", schMgmt.cfg.schPolicy);
|
||||
qDebug("schedule init, policy: %d, maxNodeTableNum: %" PRId64", reSchedule:%d",
|
||||
schMgmt.cfg.schPolicy, schMgmt.cfg.maxNodeTableNum, schMgmt.cfg.enableReSchedule);
|
||||
|
||||
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
|
||||
if (schMgmt.jobRef < 0) {
|
||||
|
|
Loading…
Reference in New Issue