From 38231fc48eaff00e31114316913eb122aa06b06a Mon Sep 17 00:00:00 2001
From: dapan1121
Date: Tue, 11 Apr 2023 09:01:19 +0800
Subject: [PATCH] enh: add queryMaxConcurrentTables configuration

---
 include/common/tglobal.h                |  1 +
 source/common/src/tglobal.c             |  3 +++
 source/libs/scheduler/inc/schInt.h      |  5 ++---
 source/libs/scheduler/src/schFlowCtrl.c | 24 ++++++++++++------------
 source/libs/scheduler/src/scheduler.c   |  6 ++++--
 5 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/include/common/tglobal.h b/include/common/tglobal.h
index 13e8454ac3..29182ef277 100644
--- a/include/common/tglobal.h
+++ b/include/common/tglobal.h
@@ -104,6 +104,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo
 // query client
 extern int32_t tsQueryPolicy;
 extern int32_t tsQueryRspPolicy;
+extern int64_t tsQueryMaxConcurrentTables;
 extern int32_t tsQuerySmaOptimize;
 extern int32_t tsQueryRsmaTolerance;
 extern bool    tsQueryPlannerTrace;
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 56e34da4ce..af2da6ae2a 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -103,6 +103,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
 // query
 int32_t tsQueryPolicy = 1;
 int32_t tsQueryRspPolicy = 0;
+int64_t tsQueryMaxConcurrentTables = 200;  // unit is TSDB_TABLE_NUM_UNIT
 bool    tsEnableQueryHb = false;
 int32_t tsQuerySmaOptimize = 0;
 int32_t tsQueryRsmaTolerance = 1000;  // the tolerance time (ms) to judge from which level to query rsma data.
@@ -340,6 +341,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
   if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
   if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
   if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
+  if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
 
   tsNumOfRpcThreads = tsNumOfCores / 2;
   tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
@@ -735,6 +737,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
   tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
   tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
   tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
+  tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
 
   tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
 
diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h
index d002b5dfa9..7840fe2017 100644
--- a/source/libs/scheduler/inc/schInt.h
+++ b/source/libs/scheduler/inc/schInt.h
@@ -54,7 +54,6 @@ typedef enum {
 
 #define SCHEDULE_DEFAULT_MAX_JOB_NUM  1000
 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
-#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200  // unit is TSDB_TABLE_NUM_UNIT
 #define SCHEDULE_DEFAULT_POLICY       SCH_LOAD_SEQ
 #define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
 
@@ -134,7 +133,7 @@ typedef struct SSchStatusFps {
 
 typedef struct SSchedulerCfg {
   uint32_t   maxJobNum;
-  int32_t    maxNodeTableNum;
+  int64_t    maxNodeTableNum;
   SCH_POLICY schPolicy;
   bool       enableReSchedule;
 } SSchedulerCfg;
@@ -175,7 +174,7 @@ typedef struct SSchHbCallbackParam {
 typedef struct SSchFlowControl {
   SRWLatch lock;
   bool     sorted;
-  int32_t  tableNumSum;
+  int64_t  tableNumSum;
   uint32_t execTaskNum;
   SArray  *taskList;  // Element is SSchTask*
 } SSchFlowControl;
diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c
index 9cb95a6bbe..8c2b65e125 100644
--- a/source/libs/scheduler/src/schFlowCtrl.c
+++ b/source/libs/scheduler/src/schFlowCtrl.c
@@ -46,7 +46,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
     return TSDB_CODE_SUCCESS;
   }
 
-  int32_t   sum = 0;
+  int64_t   sum = 0;
   int32_t   taskNum = taosArrayGetSize(pJob->dataSrcTasks);
   for (int32_t i = 0; i < taskNum; ++i) {
     SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
@@ -55,7 +55,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
   }
 
   if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) {
-    SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
+    SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%" PRId64, sum);
     return TSDB_CODE_SUCCESS;
   }
 
@@ -68,7 +68,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
 
   SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
 
-  SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum);
+  SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%" PRId64, sum);
 
   return TSDB_CODE_SUCCESS;
 }
@@ -94,7 +94,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
   --ctrl->execTaskNum;
   ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
 
-  SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
+  SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
                 ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
 
 _return:
@@ -125,7 +125,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
       SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
     }
 
-    SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
+    SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
                   ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
 
     *enough = true;
@@ -142,7 +142,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
       break;
     }
 
-    int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
+    int64_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
 
     if (sum <= schMgmt.cfg.maxNodeTableNum) {
       ctrl->tableNumSum = sum;
@@ -173,7 +173,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
 
 _return:
 
-  SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
+  SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d",
                 ((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum,
                 ctrl->execTaskNum);
@@ -203,7 +203,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
     return TSDB_CODE_SUCCESS;
   }
 
-  int32_t   remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
+  int64_t   remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
   int32_t   taskNum = taosArrayGetSize(ctrl->taskList);
   int32_t   code = 0;
   SSchTask *pTask = NULL;
@@ -217,7 +217,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
     SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
 
     if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
-      SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
+      SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
                     ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
 
       continue;
@@ -228,14 +228,14 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
 
     taosArrayRemove(ctrl->taskList, i);
 
-    SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
+    SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
                   ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
 
     SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
 
     remainNum -= pTask->plan->execNodeStat.tableNum;
     if (remainNum <= 0) {
-      SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port,
+      SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn, ep->port,
                     ctrl->tableNumSum, ctrl->execTaskNum);
 
       break;
@@ -244,7 +244,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
     if (i < (taskNum - 1)) {
       SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
       if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
-        SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
+        SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d",
                       ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
 
         break;
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index 2b46a4710e..e7561ccb7e 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -18,6 +18,7 @@
 #include "schInt.h"
 #include "tmsg.h"
 #include "tref.h"
+#include "tglobal.h"
 
 SSchedulerMgmt schMgmt = {
     .jobRef = -1,
@@ -30,11 +31,12 @@ int32_t schedulerInit() {
   }
 
   schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
-  schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
+  schMgmt.cfg.maxNodeTableNum = tsQueryMaxConcurrentTables;
   schMgmt.cfg.schPolicy = SCHEDULE_DEFAULT_POLICY;
   schMgmt.cfg.enableReSchedule = true;
 
-  qDebug("schedule policy init to %d", schMgmt.cfg.schPolicy);
+  qDebug("schedule init, policy: %d, maxNodeTableNum: %" PRId64", reSchedule:%d",
+         schMgmt.cfg.schPolicy, schMgmt.cfg.maxNodeTableNum, schMgmt.cfg.enableReSchedule);
 
   schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
   if (schMgmt.jobRef < 0) {