From 194722ee24e187a6c59f690db3f30912dcf32f54 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 4 Mar 2022 17:26:47 +0800 Subject: [PATCH 1/7] feature/qnode --- include/libs/nodes/plannodes.h | 1 + include/libs/qcom/query.h | 5 +- include/libs/scheduler/scheduler.h | 1 + source/libs/parser/src/astCreateFuncs.c | 38 +- source/libs/scheduler/inc/schedulerInt.h | 60 +++- source/libs/scheduler/src/schFlowCtrl.c | 216 ++++++++++++ source/libs/scheduler/src/scheduler.c | 328 +++++++++++------- source/libs/scheduler/test/schedulerTests.cpp | 2 +- 8 files changed, 482 insertions(+), 169 deletions(-) create mode 100644 source/libs/scheduler/src/schFlowCtrl.c diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index cf5d17cd74..c1516969ff 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -192,6 +192,7 @@ typedef struct SSubplan { int32_t msgType; // message type for subplan, used to denote the send message type to vnode. int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node + SQueryNodeStat execNodeStat; // only for scan subplan SNodeList* pChildren; // the datasource subplan,from which to fetch the result SNodeList* pParents; // the data destination subplan, get data from current subplan SPhysiNode* pNode; // physical plan of current subplan diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 6d3f97fc4e..ad1f988f4c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -35,7 +35,6 @@ enum { JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_DROPPING, - JOB_TASK_STATUS_FREEING, }; enum { @@ -133,6 +132,10 @@ typedef struct SQueryNodeAddr { SEpSet epset; } SQueryNodeAddr; +typedef struct SQueryNodeStat { + double tableNum; //table number in million +} SQueryNodeStat; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 518cabb636..4f0f0b4953 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { uint32_t maxJobNum; + double maxNodeTableNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/libs/parser/src/astCreateFuncs.c b/source/libs/parser/src/astCreateFuncs.c index a531f3de71..a9e10da143 100644 --- a/source/libs/parser/src/astCreateFuncs.c +++ b/source/libs/parser/src/astCreateFuncs.c @@ -44,7 +44,7 @@ static SDatabaseOptions* setDbBlocks(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_TOTAL_BLOCKS || val > TSDB_MAX_TOTAL_BLOCKS) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option totalBlocks: %d valid range: [%d, %d]", val, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS); + "invalid db option totalBlocks: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS); pCxt->valid = false; return pOptions; } @@ -56,7 +56,7 @@ static SDatabaseOptions* setDbCache(SAstCreateContext* pCxt, SDatabaseOptions* p int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_CACHE_BLOCK_SIZE || val > TSDB_MAX_CACHE_BLOCK_SIZE) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option cacheBlockSize: %d valid range: [%d, %d]", val, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE); + "invalid db option cacheBlockSize: %" 
PRId64 " valid range: [%d, %d]", val, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE); pCxt->valid = false; return pOptions; } @@ -68,7 +68,7 @@ static SDatabaseOptions* setDbCacheLast(SAstCreateContext* pCxt, SDatabaseOption int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_CACHE_LAST_ROW || val > TSDB_MAX_DB_CACHE_LAST_ROW) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option cacheLast: %d valid range: [%d, %d]", val, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW); + "invalid db option cacheLast: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW); pCxt->valid = false; return pOptions; } @@ -80,7 +80,7 @@ static SDatabaseOptions* setDbComp(SAstCreateContext* pCxt, SDatabaseOptions* pO int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_COMP_LEVEL || val > TSDB_MAX_COMP_LEVEL) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option compression: %d valid range: [%d, %d]", val, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); + "invalid db option compression: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); pCxt->valid = false; return pOptions; } @@ -92,7 +92,7 @@ static SDatabaseOptions* setDbDays(SAstCreateContext* pCxt, SDatabaseOptions* pO int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); + "invalid db option daysPerFile: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); pCxt->valid = false; return pOptions; } @@ -104,7 +104,7 @@ static SDatabaseOptions* setDbFsync(SAstCreateContext* pCxt, SDatabaseOptions* p int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); + "invalid db option fsyncPeriod: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); pCxt->valid = false; return pOptions; } @@ -116,7 +116,7 @@ static SDatabaseOptions* setDbMaxRows(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option maxRowsPerBlock: %d valid range: [%d, %d]", val, TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK); + "invalid db option maxRowsPerBlock: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK); pCxt->valid = false; return pOptions; } @@ -128,7 +128,7 @@ static SDatabaseOptions* setDbMinRows(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option minRowsPerBlock: %d valid range: [%d, %d]", val, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); + "invalid db option minRowsPerBlock: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); pCxt->valid = false; return pOptions; } @@ -140,7 +140,7 @@ static SDatabaseOptions* setDbKeep(SAstCreateContext* pCxt, SDatabaseOptions* pO int64_t val = 
strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); + "invalid db option keep: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); pCxt->valid = false; return pOptions; } @@ -168,7 +168,7 @@ static SDatabaseOptions* setDbQuorum(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_QUORUM_OPTION || val > TSDB_MAX_DB_QUORUM_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option quorum: %d valid range: [%d, %d]", val, TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION); + "invalid db option quorum: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION); pCxt->valid = false; return pOptions; } @@ -180,7 +180,7 @@ static SDatabaseOptions* setDbReplica(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_REPLICA_OPTION || val > TSDB_MAX_DB_REPLICA_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option replications: %d valid range: [%d, %d]", val, TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION); + "invalid db option replications: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION); pCxt->valid = false; return pOptions; } @@ -192,7 +192,7 @@ static SDatabaseOptions* setDbTtl(SAstCreateContext* pCxt, SDatabaseOptions* pOp int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_TTL_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); + "invalid db option ttl: %" PRId64 ", should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); pCxt->valid = false; return pOptions; } @@ -203,7 +203,7 @@ static SDatabaseOptions* setDbTtl(SAstCreateContext* pCxt, SDatabaseOptions* pOp static SDatabaseOptions* setDbWal(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_WAL_LEVEL || val > TSDB_MAX_WAL_LEVEL) { - snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option walLevel: %d, only 1-2 allowed", val); + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option walLevel: %" PRId64 ", only 1-2 allowed", val); pCxt->valid = false; return pOptions; } @@ -215,7 +215,7 @@ static SDatabaseOptions* setDbVgroups(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option vgroups: %d valid range: [%d, %d]", val, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); + "invalid db option vgroups: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); pCxt->valid = false; return pOptions; } @@ -226,7 +226,7 @@ static SDatabaseOptions* setDbVgroups(SAstCreateContext* pCxt, SDatabaseOptions* static SDatabaseOptions* setDbSingleStable(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_SINGLE_STABLE_OPTION || val > TSDB_MAX_DB_SINGLE_STABLE_OPTION) { - snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option singleStable: %d, only 
0-1 allowed", val); + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option singleStable: %" PRId64 ", only 0-1 allowed", val); pCxt->valid = false; return pOptions; } @@ -237,7 +237,7 @@ static SDatabaseOptions* setDbSingleStable(SAstCreateContext* pCxt, SDatabaseOpt static SDatabaseOptions* setDbStreamMode(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_STREAM_MODE_OPTION || val > TSDB_MAX_DB_STREAM_MODE_OPTION) { - snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option streamMode: %d, only 0-1 allowed", val); + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option streamMode: %" PRId64 ", only 0-1 allowed", val); pCxt->valid = false; return pOptions; } @@ -269,7 +269,7 @@ static STableOptions* setTableKeep(SAstCreateContext* pCxt, STableOptions* pOpti int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid table option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); + "invalid table option keep: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); pCxt->valid = false; return pOptions; } @@ -281,7 +281,7 @@ static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptio int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_TTL_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid table option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); + "invalid table option ttl: %" PRId64 ", should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); pCxt->valid = false; return pOptions; } @@ -292,7 +292,7 @@ static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptio static STableOptions* setTableComment(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) { if (pVal->n >= sizeof(pOptions->comments)) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid table option comment, length cannot exceed %d", sizeof(pOptions->comments) - 1); + "invalid table option comment, length cannot exceed %d", (int32_t)(sizeof(pOptions->comments) - 1)); pCxt->valid = false; return pOptions; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f9259fd645..7813251bfb 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -26,8 +26,9 @@ extern "C" { #include "scheduler.h" #include "thash.h" -#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 -#define SCHEDULE_DEFAULT_TASK_NUMBER 1000 +#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20.0 /* in million */ #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA @@ -72,17 +73,26 @@ typedef struct SSchCallbackParam { uint64_t queryId; int64_t refId; uint64_t taskId; + SEpSet epSet; } SSchCallbackParam; +typedef struct SSchFlowControl { + SRWLatch lock; + double tableNumSum; + uint32_t execTaskNum; + SArray *taskList; // Element is SQueryTask* +} SSchFlowControl; + typedef struct SSchLevel { - int32_t level; - int8_t status; - SRWLatch lock; - int32_t taskFailed; - int32_t taskSucceed; - int32_t taskNum; - int32_t taskLaunchIdx; // launch startup index - SArray *subTasks; // Element is SQueryTask + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + 
int32_t taskSucceed; + int32_t taskNum; + int32_t taskLaunchedNum; + SHashObj *flowCtrl; // key is ep, element is SSchFlowControl + SArray *subTasks; // Element is SQueryTask } SSchLevel; typedef struct SSchTask { @@ -102,13 +112,14 @@ typedef struct SSchTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* - void* handle; // task send handle + void* handle; // task send handle } SSchTask; typedef struct SSchJobAttr { bool needFetch; bool syncSchedule; bool queryJob; + bool needFlowCtrl; } SSchJobAttr; typedef struct SSchJob { @@ -140,6 +151,8 @@ typedef struct SSchJob { SQueryProfileSummary summary; } SSchJob; +extern SSchedulerMgmt schMgmt; + #define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) @@ -152,8 +165,17 @@ typedef struct SSchJob { #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) -#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY) -#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) +#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true +#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) + +#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) +#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) +#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) +#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) +#define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse]) +#define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) @@ -173,10 +195,18 @@ typedef struct SSchJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? 
taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -static int32_t schLaunchTask(SSchJob *job, SSchTask *task); -static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); +int32_t schLaunchTask(SSchJob *job, SSchTask *task); +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); +void schFreeFlowCtrl(SSchLevel *pLevel); +int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); +int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); +int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); +int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); +int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); +int32_t schFetchFromRemote(SSchJob *pJob); + #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c new file mode 100644 index 0000000000..2b3b5e20ae --- /dev/null +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "schedulerInt.h" +#include "tmsg.h" +#include "query.h" +#include "catalog.h" +#include "tref.h" + +void schFreeFlowCtrl(SSchLevel *pLevel) { + if (NULL == pLevel->flowCtrl) { + return; + } + + SSchFlowControl *ctrl = NULL; + void *pIter = taosHashIterate(pLevel->flowCtrl, NULL); + while (pIter) { + ctrl = (SSchFlowControl *)pIter; + + if (ctrl->taskList) { + taosArrayDestroy(ctrl->taskList); + } + + pIter = taosHashIterate(pLevel->flowCtrl, pIter); + } + + taosHashCleanup(pLevel->flowCtrl); + pLevel->flowCtrl = NULL; +} + +int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { + if (!SCH_IS_QUERY_JOB(pJob)) { + return TSDB_CODE_SUCCESS; + } + + double sum = 0; + + for (int32_t i = 0; i < pLevel->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); + + sum += pTask->plan->execNodeStat.tableNum; + } + + if (sum < schMgmt.cfg.maxNodeTableNum) { + return TSDB_CODE_SUCCESS; + } + + pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pLevel->flowCtrl) { + SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SCH_SET_JOB_NEED_FLOW_CTRL(pJob); + + return TSDB_CODE_SUCCESS; +} + +int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { + SSchLevel *pLevel = pTask->level; + SSchFlowControl *ctrl = NULL; + int32_t code = 0; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + if (NULL == ctrl) { + SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_LOCK(SCH_WRITE, &ctrl->lock); + if (ctrl->execTaskNum <= 0) { + SCH_TASK_ELOG("taosHashGet node from 
flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + --ctrl->execTaskNum; + ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum; + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + SCH_RET(code); +} + +int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { + SSchLevel *pLevel = pTask->level; + int32_t code = 0; + SSchFlowControl *ctrl = NULL; + + do { + ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp)); + if (NULL == ctrl) { + SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1}; + + code = taosHashPut(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp), &nctrl, sizeof(nctrl)); + if (code) { + if (HASH_NODE_EXIST(code)) { + continue; + } + + SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *enough = true; + return TSDB_CODE_SUCCESS; + } + + SCH_LOCK(SCH_WRITE, &ctrl->lock); + + if (0 == ctrl->execTaskNum) { + ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum; + ++ctrl->execTaskNum; + + *enough = true; + break; + } + + double sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; + + if (sum <= schMgmt.cfg.maxNodeTableNum) { + ctrl->tableNumSum = sum; + ++ctrl->execTaskNum; + + *enough = true; + break; + } + + if (NULL == ctrl->taskList) { + ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES); + if (NULL == ctrl->taskList) { + SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (NULL == taosArrayPush(ctrl->taskList, &pTask)) { + SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d", (int32_t)taosArrayGetSize(ctrl->taskList)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *enough = false; + + break; + } while (true); + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + SCH_RET(code); +} + +int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { + do { + SCH_LOCK(SCH_WRITE, &ctrl->lock); + + if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) { + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + return TSDB_CODE_SUCCESS; + } + + SSchTask *pTask = *(SSchTask **)taosArrayPop(ctrl->taskList); + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + bool enough = false; + SCH_ERR_RET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + + if (enough) { + SCH_ERR_RET(schLaunchTaskImpl(pJob, pTask)); + continue; + } + + break; + } while (true); + + return TSDB_CODE_SUCCESS; +} + + +int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { + if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + + SSchLevel *pLevel = pTask->level; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + if (NULL == ctrl) { + SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_ERR_RET(schLaunchTasksInFlowCtrlListImpl(pJob, ctrl)); + +} + + diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2b5ad4f26b..e6a5c285de 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -91,6 +91,18 @@ void schFreeTask(SSchTask* pTask) { } +static 
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (pStatus) { + *pStatus = status; + } + + return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED + || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING + || status == JOB_TASK_STATUS_SUCCEED); +} + + int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType); @@ -197,6 +209,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { _return: SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus); + SCH_ERR_RET(code); } @@ -275,7 +288,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); - if (pJob->attr.queryJob && pLevel->taskNum > 1) { + if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) { SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -285,10 +298,9 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { - int32_t idx = atomic_load_8(&pTask->candidateIdx); - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { - SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -323,9 +335,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { - SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); + SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -379,7 +391,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n); - SCH_SET_JOB_TYPE(&pJob->attr, plan->subplanType); + SCH_SET_JOB_TYPE(pJob, plan->subplanType); SSchTask task = {0}; SSchTask *pTask = &task; @@ -564,15 +576,45 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { } -int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... 
- // TODO if needRetry, set task retry info - // TODO set condidateIdx - // TODO record failed but tried task *needRetry = false; return TSDB_CODE_SUCCESS; + + //TODO CHECK epList/condidateList + if (SCH_IS_DATA_SRC_TASK(pTask)) { + + } else { + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + + if ((pTask->candidateIdx + 1) >= candidateNum) { + return TSDB_CODE_SUCCESS; + } + + ++pTask->candidateIdx; + } + + +} + +int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { + atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + } + + if (SCH_IS_DATA_SRC_TASK(pTask)) { + SCH_SWITCH_EPSET(&pTask->plan->execNode); + } else { + ++pTask->candidateIdx; + } + + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + + return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { @@ -588,14 +630,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod } int32_t code = atomic_load_32(&pJob->errCode); - SCH_ERR_RET(code); - SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code); + SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); + + SCH_RET(code); } - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); } @@ -606,38 +648,8 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { } -// Note: no more error processing, handled in function internal -int32_t schFetchFromRemote(SSchJob *pJob) { - int32_t code = 0; - - if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { - SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); - return TSDB_CODE_SUCCESS; - } - void *res = atomic_load_ptr(&pJob->res); - if (res) { - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - - SCH_JOB_DLOG("res already fetched, res:%p", res); - return TSDB_CODE_SUCCESS; - } - - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); - - return TSDB_CODE_SUCCESS; - -_return: - - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - - schProcessOnJobFailure(pJob, code); - - return code; -} - - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t code = 0; @@ -655,9 +667,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { _return: - SCH_ERR_RET(schProcessOnJobFailure(pJob, code)); - - SCH_RET(code); + SCH_RET(schProcessOnJobFailure(pJob, code)); } int32_t schProcessOnDataFetched(SSchJob *job) { @@ -665,8 +675,16 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { + int8_t status = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); + + SCH_RET(atomic_load_32(&pJob->errCode)); + } + bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -674,16 +692,16 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, 
int32_t errCode) SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); - SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); + SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { - code = schMoveTaskToFailList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(errCode); - } + SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); + } else { + SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask)); + return TSDB_CODE_SUCCESS; } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); @@ -702,35 +720,29 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } } } else { - // Note: no more error processing, already handled - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); return TSDB_CODE_SUCCESS; } _return: - SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); - - SCH_ERR_RET(errCode); + SCH_RET(schProcessOnJobFailure(pJob, errCode)); } - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; - SSchTask *pErrTask = pTask; - code = schMoveTaskToSuccList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(code); - } + SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); - + + SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); + int32_t parentNum = pTask->parents ? 
(int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { int32_t taskDone = 0; @@ -759,14 +771,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; - code = schMoveTaskToExecList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(code); - } + SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved)); - SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); - - return TSDB_CODE_SUCCESS; + SCH_RET(schProcessOnJobPartialSuccess(pJob)); } /* @@ -780,8 +787,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - pErrTask = par; - int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); @@ -790,7 +795,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) { - SCH_ERR_RET(schLaunchTask(pJob, par)); + SCH_ERR_RET(schLaunchTaskImpl(pJob, par)); } } @@ -798,22 +803,55 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code)); - - SCH_ERR_RET(code); + SCH_RET(schProcessOnJobFailure(pJob, code)); } + +// Note: no more error processing, handled in function internal +int32_t schFetchFromRemote(SSchJob *pJob) { + int32_t code = 0; + + if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); + return TSDB_CODE_SUCCESS; + } + + void *res = atomic_load_ptr(&pJob->res); + if (res) { + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_JOB_DLOG("res already fetched, res:%p", res); + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); + + return TSDB_CODE_SUCCESS; + +_return: + + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); +} + + +// Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; + int8_t status = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); + + SCH_RET(atomic_load_32(&pJob->errCode)); + } SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType)); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } - + SCH_ERR_JRET(rspCode); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -828,9 +866,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch pJob->resNumOfRows += rsp->affectedRows; #else - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } + SCH_ERR_JRET(rspCode); SSubmitRsp *rsp = (SSubmitRsp *)msg; if (rsp) { @@ -845,9 +881,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + 
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); @@ -856,9 +894,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_RES_READY_RSP: { SResReadyRsp *rsp = (SResReadyRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -867,14 +907,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - + if (pJob->res) { SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); tfree(rsp); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } atomic_store_ptr(&pJob->res, rsp); @@ -886,7 +927,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); - SCH_ERR_JRET(schProcessOnDataFetched(pJob)); + schProcessOnDataFetched(pJob); break; } case TDMT_VND_DROP_TASK_RSP: { @@ -904,9 +945,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - - SCH_RET(code); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -1057,7 +1096,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t code = 0; bool isCandidateAddr = false; if (NULL == addr) { - addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); + addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; } @@ -1177,28 +1216,17 @@ _return: SCH_RET(code); } -static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (pStatus) { - *pStatus = status; - } - return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED - || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); -} - - -// Note: no more error processing, handled in function internal -int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { +int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int8_t status = 0; int32_t code = 0; + + atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); + SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status); - code = atomic_load_32(&pJob->errCode); - SCH_ERR_RET(code); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_RET(atomic_load_32(&pJob->errCode)); } SSubplan *plan = pTask->plan; @@ -1207,38 +1235,69 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); - 
SCH_ERR_JRET(code); + SCH_ERR_RET(code); } else { SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } } - SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + + return TSDB_CODE_SUCCESS; +} + +// Note: no more error processing, handled in function internal +int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { + bool enough = false; + int32_t code = 0; + + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + + if (enough) { + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + } + } else { + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + } + return TSDB_CODE_SUCCESS; _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - SCH_RET(code); + + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { + for (int32_t i = 0; i < level->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(level->subTasks, i); + + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + } + + return TSDB_CODE_SUCCESS; +} + + + int32_t schLaunchJob(SSchJob *pJob) { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); - + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); - - for (int32_t i = 0; i < level->taskNum; ++i) { - SSchTask *pTask = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schLaunchTask(pJob, pTask)); - } - + + SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, level)); + + SCH_ERR_RET(schLaunchLevelTasks(pJob, level)); + return TSDB_CODE_SUCCESS; } @@ -1312,6 +1371,8 @@ void schFreeJobImpl(void *job) { for(int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + schFreeFlowCtrl(pLevel); + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for(int32_t j = 0; j < numOfTasks; ++j) { SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); @@ -1423,10 +1484,11 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.cfg = *cfg; if (schMgmt.cfg.maxJobNum == 0) { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; } } else { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; + schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; } schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); @@ -1611,7 +1673,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { + if (!SCH_JOB_NEED_FETCH(pJob)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); taosReleaseRef(schMgmt.jobRef, job); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 80fd7e35b3..61cc8cd25d 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -535,7 +535,7 @@ TEST(queryTest, normalCase) { char *tablename = "table1"; SVgroupInfo vgInfo = 
{0}; int64_t job = 0; - SQueryPlan dag = {0}; + SQueryPlan dag; schtInitLogFile(); From 71fdf76b8af7f4fa012fc215a29784c2b27541fa Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 4 Mar 2022 19:36:03 +0800 Subject: [PATCH 2/7] feature/scheduler --- include/common/tmsg.h | 5 ++ include/libs/catalog/catalog.h | 2 +- source/common/src/tmsg.c | 4 ++ source/dnode/mnode/impl/src/mndDb.c | 74 +++++++++++++++++++++-------- source/libs/qcom/src/querymsg.c | 2 +- 5 files changed, 65 insertions(+), 22 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 25d668e788..883ebdd0cd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -140,6 +140,8 @@ typedef enum _mgmt_table { #define TSDB_KILL_MSG_LEN 30 +#define TSDB_TABLE_NUM_UNIT 100000 + #define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) @@ -566,6 +568,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -791,8 +794,10 @@ typedef struct SVgroupInfo { uint32_t hashBegin; uint32_t hashEnd; SEpSet epset; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SVgroupInfo; + typedef struct { int32_t numOfVgroups; SVgroupInfo vgroups[]; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index c6183780f9..4e3377a4ce 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -74,9 +74,9 @@ typedef struct SDbVgVersion { char dbFName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SDbVgVersion; - int32_t catalogInit(SCatalogCfg *cfg); /** diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ae31dff310..8b4f572fee 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1423,6 +1423,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1438,6 +1439,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); @@ -1482,6 +1484,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1; + if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1; } return 0; @@ -1542,6 +1545,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1; if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1; if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1; + if (tDecodeI32(pDecoder, &vgInfo.numOfTable) < 0) return -1; taosArrayPush(pRsp->pVgroupInfos, &vgInfo); } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 974f5fc982..1b75d04b44 100644 --- 
a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -885,6 +885,29 @@ DROP_DB_OVER: return code; } +void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { + int32_t vindex = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (vindex < pDb->cfg.numOfVgroups) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + *num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; + + vindex++; + } + + sdbRelease(pSdb, pVgroup); + } + + sdbCancelFetch(pSdb, pIter); +} + + static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { int32_t vindex = 0; SSdb *pSdb = pMnode->pSdb; @@ -900,6 +923,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { vgInfo.vgId = pVgroup->vgId; vgInfo.hashBegin = pVgroup->hashBegin; vgInfo.hashEnd = pVgroup->hashEnd; + vgInfo.numOfTable = pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; vgInfo.epset.numOfEps = pVgroup->replica; for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; @@ -967,7 +991,10 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { goto USE_DB_OVER; } - if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) { + int32_t numOfTable = 0; + mndGetDBTableNum(pDb, pMnode, &numOfTable); + + if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid || numOfTable != usedbReq.numOfTable) { mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); } @@ -1017,6 +1044,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, SDbVgVersion *pDbVgVersion = &pDbs[i]; pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId); pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion); + pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable); SUseDbRsp usedbRsp = {0}; @@ -1027,28 +1055,34 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, usedbRsp.uid = pDbVgVersion->dbId; usedbRsp.vgVersion = -1; taosArrayPush(batchUseRsp.pArray, &usedbRsp); - } else if (pDbVgVersion->vgVersion >= pDb->vgVersion) { - mDebug("db:%s, version not changed", pDbVgVersion->dbFName); + continue; + } + + int32_t numOfTable = 0; + mndGetDBTableNum(pDb, pMnode, &numOfTable); + + if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { + mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName); mndReleaseDb(pMnode, pDb); continue; - } else { - usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); - if (usedbRsp.pVgroupInfos == NULL) { - mndReleaseDb(pMnode, pDb); - mError("db:%s, failed to malloc usedb response", pDb->name); - continue; - } - - mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); - memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); - usedbRsp.uid = pDb->uid; - usedbRsp.vgVersion = pDb->vgVersion; - usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); - usedbRsp.hashMethod = pDb->hashMethod; - - taosArrayPush(batchUseRsp.pArray, &usedbRsp); - mndReleaseDb(pMnode, pDb); } + + usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { + mndReleaseDb(pMnode, pDb); + mError("db:%s, failed to malloc usedb response", pDb->name); + continue; + } + + mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); + memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + usedbRsp.uid = pDb->uid; + usedbRsp.vgVersion = 
pDb->vgVersion; + usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); + usedbRsp.hashMethod = pDb->hashMethod; + + taosArrayPush(batchUseRsp.pArray, &usedbRsp); + mndReleaseDb(pMnode, pDb); } int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 2a52e01dc1..41c20ad264 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -247,7 +247,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) { PROCESS_META_OVER: if (code != 0) { - qError("failed to process table meta rsp since %s", terrstr()); + qError("failed to process table meta rsp since %s", tstrerror(code)); } tFreeSTableMetaRsp(&metaRsp); From b6b4563ac7b2e49bd36b0bc794223221ce175460 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 10:54:36 +0800 Subject: [PATCH 3/7] feature/scheduler --- include/common/tmsg.h | 3 ++- include/libs/qcom/query.h | 3 ++- source/client/src/clientHb.c | 1 + source/libs/catalog/src/catalog.c | 14 +++++++++++--- source/libs/qcom/src/querymsg.c | 2 ++ 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 883ebdd0cd..2e4d3ff44e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -171,6 +171,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SBuildUseDBInput; typedef struct SField { @@ -568,7 +569,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; - int32_t numOfTable; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ad1f988f4c..84c246dbf5 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -82,6 +82,7 @@ typedef struct STableMeta { typedef struct SDBVgInfo { int32_t vgVersion; int8_t hashMethod; + int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT SHashObj *vgHash; //key:vgId, value:SVgroupInfo } SDBVgInfo; @@ -133,7 +134,7 @@ typedef struct SQueryNodeAddr { } SQueryNodeAddr; typedef struct SQueryNodeStat { - double tableNum; //table number in million + double tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; int32_t initTaskQueue(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e9e64a78b8..d32b2a7bd5 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -224,6 +224,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl SDbVgVersion *db = &dbs[i]; db->dbId = htobe64(db->dbId); db->vgVersion = htonl(db->vgVersion); + db->numOfTable = htonl(db->numOfTable); } SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 5779906761..f857ea95c3 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1331,7 +1331,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD } bool newAdded = false; - SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; + SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SCtgDBCache *dbCache = NULL; CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, 
&dbCache)); @@ -1344,8 +1344,15 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); if (dbCache->vgInfo) { - if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) { + ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + ctgWReleaseVgInfo(dbCache); + + return TSDB_CODE_SUCCESS; + } + + if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) { + ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgWReleaseVgInfo(dbCache); return TSDB_CODE_SUCCESS; @@ -1511,6 +1518,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const if (inCache) { input.dbId = (*dbCache)->dbId; input.vgVersion = (*dbCache)->vgInfo->vgVersion; + input.numOfTable = (*dbCache)->vgInfo->numOfTable; } else { input.vgVersion = CTG_DEFAULT_INVALID_VERSION; } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 41c20ad264..ab0bbd319a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -42,6 +42,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { for (int32_t i = 0; i < usedbRsp->vgNum; ++i) { SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); + pOut->dbVgroup->numOfTable += pVgInfo->numOfTable; if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -84,6 +85,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms usedbReq.db[sizeof(usedbReq.db) - 1] = 0; usedbReq.vgVersion = pInput->vgVersion; usedbReq.dbId = pInput->dbId; + usedbReq.numOfTable = pInput->numOfTable; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void *pBuf = rpcMallocCont(bufLen); From 6830d866708fd2fd68461e89992b00e1ebb39c2d Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 11:21:09 +0800 Subject: [PATCH 4/7] feature/scheduler --- source/libs/scheduler/test/schedulerTests.cpp | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 61cc8cd25d..c974ffebc4 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -634,6 +634,111 @@ TEST(queryTest, normalCase) { schedulerDestroy(); } +TEST(queryTest, flowCtrlCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + int64_t job = 0; + SQueryPlan dag; + + schtInitLogFile(); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + + SEp qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildQueryDag(&dag); + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + ASSERT_EQ(code, 0); + + + SSchJob *pJob = schAcquireJob(job); + + void *pIter = 
taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + printf("code:%d", code); + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); + + void *data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + tfree(data); + + data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_TRUE(data == NULL); + + schReleaseJob(job); + + schedulerFreeJob(job); + + schtFreeQueryDag(&dag); + + schedulerDestroy(); +} TEST(insertTest, normalCase) { From 9b4bda4f18273c1b3273fb0c9320512fd0231443 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 12:15:22 +0800 Subject: [PATCH 5/7] feature/scheduler --- source/libs/scheduler/src/scheduler.c | 6 +++--- source/libs/scheduler/test/schedulerTests.cpp | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e6a5c285de..a5e419e00b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -114,18 +114,18 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m case TDMT_VND_FETCH_RSP: case TDMT_VND_DROP_TASK: if (lastMsgType != (msgType - 1)) { - SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: - SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask)); 
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index c974ffebc4..7771fbdf0b 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -113,6 +113,9 @@ void schtBuildQueryDag(SQueryPlan *dag) { mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); mergePlan->msgType = TDMT_VND_QUERY; + merge->pNodeList = nodesMakeList(); + scan->pNodeList = nodesMakeList(); + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); nodesListAppend(scan->pNodeList, (SNode*)scanPlan); @@ -170,6 +173,8 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode)); insertPlan[1].msgType = TDMT_VND_SUBMIT; + inserta->pNodeList = nodesMakeList(); + nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); insertPlan += 1; nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); @@ -537,8 +542,6 @@ TEST(queryTest, normalCase) { int64_t job = 0; SQueryPlan dag; - schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0}; @@ -675,7 +678,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pIter = taosHashIterate(pJob->execTasks, NULL); @@ -686,7 +690,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); printf("code:%d", code); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pIter = taosHashIterate(pJob->execTasks, NULL); @@ -697,7 +702,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pIter = taosHashIterate(pJob->execTasks, NULL); @@ -708,7 +714,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pthread_attr_t thattr; @@ -750,8 +757,6 @@ TEST(insertTest, normalCase) { SQueryPlan dag; uint64_t numOfRows = 0; - schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0}; From 3cab6df16b85e31d4e473d0d77efd176b77f8102 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 17:19:21 +0800 Subject: [PATCH 6/7] feature/scheduler --- include/libs/qcom/query.h | 2 +- include/libs/scheduler/scheduler.h | 2 +- source/libs/scheduler/inc/schedulerInt.h | 5 +- source/libs/scheduler/src/schFlowCtrl.c | 22 ++- source/libs/scheduler/src/scheduler.c | 8 +- source/libs/scheduler/test/schedulerTests.cpp | 136 ++++++++++++------ 6 files changed, 122 insertions(+), 53 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 84c246dbf5..5aa3ca1acf 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -134,7 +134,7 @@ typedef struct SQueryNodeAddr { } SQueryNodeAddr; typedef struct SQueryNodeStat { - double tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT + int32_t tableNum; // vg table 
number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; int32_t initTaskQueue(); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 4f0f0b4953..56da9ece6f 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,7 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { uint32_t maxJobNum; - double maxNodeTableNum; + int32_t maxNodeTableNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 7813251bfb..2af10a0e2d 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -28,7 +28,8 @@ extern "C" { #define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 -#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20.0 /* in million */ +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT + #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA @@ -78,7 +79,7 @@ typedef struct SSchCallbackParam { typedef struct SSchFlowControl { SRWLatch lock; - double tableNumSum; + int32_t tableNumSum; uint32_t execTaskNum; SArray *taskList; // Element is SQueryTask* } SSchFlowControl; diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 2b3b5e20ae..1dc99354a1 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -42,10 +42,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) { int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { if (!SCH_IS_QUERY_JOB(pJob)) { + SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob)); return TSDB_CODE_SUCCESS; } - double sum = 0; + int32_t sum = 0; for (int32_t i = 0; i < pLevel->taskNum; ++i) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); @@ -54,6 +55,7 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { } if (sum < schMgmt.cfg.maxNodeTableNum) { + SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum); return TSDB_CODE_SUCCESS; } @@ -65,6 +67,8 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { SCH_SET_JOB_NEED_FLOW_CTRL(pJob); + SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum); + return TSDB_CODE_SUCCESS; } @@ -89,6 +93,9 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { --ctrl->execTaskNum; ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum; + SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); + _return: SCH_UNLOCK(SCH_WRITE, &ctrl->lock); @@ -100,13 +107,14 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { SSchLevel *pLevel = pTask->level; int32_t code = 0; SSchFlowControl *ctrl = NULL; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); do { - ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp)); + ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); if (NULL == ctrl) { SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1}; - code = taosHashPut(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp), &nctrl, sizeof(nctrl)); + code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl)); if (code) { if (HASH_NODE_EXIST(code)) { continue; @@ -116,6 +124,9 @@ int32_t 
schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum); + *enough = true; return TSDB_CODE_SUCCESS; } @@ -130,7 +141,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { break; } - double sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; + int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; if (sum <= schMgmt.cfg.maxNodeTableNum) { ctrl->tableNumSum = sum; @@ -160,6 +171,9 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { _return: + SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); SCH_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a5e419e00b..32cac59b81 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -130,6 +130,8 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + atomic_store_32(&pTask->lastMsgType, -1); + return TSDB_CODE_SUCCESS; } @@ -500,6 +502,8 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + } else { + SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); } int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); @@ -735,6 +739,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; + SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); @@ -1689,7 +1695,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - SCH_JOB_ELOG("job already succeed, status:%d", status); + SCH_JOB_DLOG("job already succeed, status:%d", status); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(pJob)); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 73060fcf6c..663df5bcd1 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -126,6 +126,68 @@ void schtBuildQueryDag(SQueryPlan *dag) { nodesListAppend(dag->pSubplans, (SNode*)scan); } +void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { + uint64_t qId = schtQueryId; + int32_t scanPlanNum = 20; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = nodesMakeList(); + SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + SNodeListNode *merge = 
(SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + + SSubplan *scanPlan = (SSubplan *)calloc(scanPlanNum, sizeof(SSubplan)); + SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan)); + + merge->pNodeList = nodesMakeList(); + scan->pNodeList = nodesMakeList(); + + mergePlan->pChildren = nodesMakeList(); + + for (int32_t i = 0; i < scanPlanNum; ++i) { + scanPlan[i].id.queryId = qId; + scanPlan[i].id.templateId = 0x0000000000000002; + scanPlan[i].id.subplanId = 0x0000000000000003 + i; + scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN; + + scanPlan[i].execNode.nodeId = 1 + i; + scanPlan[i].execNode.epset.inUse = 0; + scanPlan[i].execNodeStat.tableNum = rand() % 30; + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep0", 6030); + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep1", 6030); + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep2", 6030); + scanPlan[i].execNode.epset.inUse = rand() % 3; + + scanPlan[i].pChildren = NULL; + scanPlan[i].level = 1; + scanPlan[i].pParents = nodesMakeList(); + scanPlan[i].pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); + scanPlan[i].msgType = TDMT_VND_QUERY; + + nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan); + nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i)); + + nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i)); + } + + mergePlan->id.queryId = qId; + mergePlan->id.templateId = schtMergeTemplateId; + mergePlan->id.subplanId = 0x5555; + mergePlan->subplanType = SUBPLAN_TYPE_MERGE; + mergePlan->level = 0; + mergePlan->execNode.epset.numOfEps = 0; + + mergePlan->pParents = NULL; + mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); + mergePlan->msgType = TDMT_VND_QUERY; + + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); + + nodesListAppend(dag->pSubplans, (SNode*)merge); + nodesListAppend(dag->pSubplans, (SNode*)scan); +} + + void schtFreeQueryDag(SQueryPlan *dag) { } @@ -650,6 +712,8 @@ TEST(queryTest, flowCtrlCase) { schtInitLogFile(); + srand(time(NULL)); + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0}; @@ -660,7 +724,7 @@ TEST(queryTest, flowCtrlCase) { int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - schtBuildQueryDag(&dag); + schtBuildQueryFlowCtrlDag(&dag); schtSetPlanToString(); schtSetExecNode(); @@ -671,54 +735,38 @@ TEST(queryTest, flowCtrlCase) { SSchJob *pJob = schAcquireJob(job); + + bool queryDone = false; - void *pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; - - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + while (!queryDone) { + void *pIter = taosHashIterate(pJob->execTasks, NULL); + if (NULL == pIter) { + break; + } - ASSERT_EQ(code, 0); - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; - pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; + taosHashCancelIterate(pJob->execTasks, pIter); - SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); - printf("code:%d", code); - ASSERT_EQ(code, 0); - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } + if (task->lastMsgType == TDMT_VND_QUERY) { + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + } else if (task->lastMsgType == TDMT_VND_RES_READY) { + SResReadyRsp rsp = {0}; + 
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + } else { + queryDone = true; + break; + } + + pIter = NULL; + } + } - pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; - - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); - - ASSERT_EQ(code, 0); - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } - - pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; - - SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); - ASSERT_EQ(code, 0); - - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } pthread_attr_t thattr; pthread_attr_init(&thattr); From d781ea842ed44f702ec2f515607c7d4cd635f4fe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 7 Mar 2022 11:14:15 +0800 Subject: [PATCH 7/7] feature/scheduler --- include/libs/catalog/catalog.h | 2 +- source/libs/catalog/src/catalog.c | 50 +++++++++---- source/libs/parser/src/astTranslate.c | 2 + source/libs/scheduler/inc/schedulerInt.h | 4 +- source/libs/scheduler/src/schFlowCtrl.c | 93 +++++++++++++++++++----- source/libs/scheduler/src/scheduler.c | 1 + 6 files changed, 119 insertions(+), 33 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 4e3377a4ce..8db7d34d3e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); */ void catalogFreeHandle(SCatalog* pCatalog); -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId); +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum); /** * Get a DB's all vgroup info. 
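For reference, a minimal sketch of how the extended catalogGetDBVgVersion() signature above can be consumed. The wrapper name and the fallback values used when the DB is not yet cached are illustrative assumptions only, not part of this patch; the actual caller added by this series is the astTranslate.c hunk further below.

// Sketch only: assumes "catalog.h" and "tmsg.h" are included.
static int32_t buildUseDbReqFromCache(SCatalog *pCtg, const char *dbFName, SUseDbReq *pReq) {
  // Fetch the cached DB's vgroup version, dbId and table count in one call.
  int32_t code = catalogGetDBVgVersion(pCtg, dbFName, &pReq->vgVersion, &pReq->dbId, &pReq->numOfTable);
  if (TSDB_CODE_SUCCESS != code) {
    // DB not cached yet: report an invalid version so the mnode returns the full vgroup list.
    pReq->vgVersion  = -1;  // assumed stand-in for CTG_DEFAULT_INVALID_VERSION
    pReq->dbId       = 0;
    pReq->numOfTable = 0;
  }
  return TSDB_CODE_SUCCESS;
}

In the translateUseDatabase() hunk later in this patch the return code is simply ignored, so a cache miss leaves the zero-initialized SUseDbReq as is and the mnode replies with the complete vgroup info.
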
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f857ea95c3..6127c587ec 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -965,7 +965,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_RET(code); } -int32_t ctgStbVersionCompare(const void* key1, const void* key2) { +int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) { @@ -975,7 +975,7 @@ int32_t ctgStbVersionCompare(const void* key1, const void* key2) { } } -int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { +int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) { if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) { return -1; } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) { @@ -985,6 +985,27 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { } } +int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) { + if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + return -1; + } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + return 1; + } else { + return 0; + } +} + +int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) { + if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + return -1; + } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + return 1; + } else { + return 0; + } +} + + int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; @@ -1034,7 +1055,7 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) { +int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { int16_t widx = abs(id % mgmt->slotNum); SCtgRentSlot *slot = &mgmt->slots[widx]; @@ -1048,12 +1069,12 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si if (slot->needSort) { qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); - taosArraySort(slot->meta, compare); + taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); } - void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); + void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); if (NULL == orig) { qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1075,7 +1096,7 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) { +int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { int16_t widx = abs(id % mgmt->slotNum); SCtgRentSlot *slot = &mgmt->slots[widx]; @@ -1088,12 +1109,12 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) } if (slot->needSort) { - taosArraySort(slot->meta, compare); + taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot 
idx:%d, type:%d", widx, mgmt->type); } - int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ); + int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); if (idx < 0) { qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1240,7 +1261,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) { uint64_t *suid = NULL; taosHashGetKey(pIter, (void **)&suid, NULL); - if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) { + if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { ctgDebug("stb removed from rent, suid:%"PRIx64, *suid); } @@ -1264,7 +1285,7 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); @@ -1372,7 +1393,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD dbCache = NULL; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); CTG_RET(code); } @@ -1405,7 +1426,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); - ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare); + ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare); } origSuid = orig->suid; @@ -1932,7 +1953,7 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) { ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); - CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionCompare)); + CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); @@ -2171,7 +2192,7 @@ void catalogFreeHandle(SCatalog* pCtg) { ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); } -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) { +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) { @@ -2202,6 +2223,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers *version = dbCache->vgInfo->vgVersion; *dbId = dbCache->dbId; + *tableNum = dbCache->vgInfo->numOfTable; ctgReleaseVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); diff --git a/source/libs/parser/src/astTranslate.c b/source/libs/parser/src/astTranslate.c index be5cf62a3c..b9d9835c88 100644 --- a/source/libs/parser/src/astTranslate.c +++ b/source/libs/parser/src/astTranslate.c @@ 
-863,6 +863,8 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p SUseDbReq usedbReq = {0}; tNameExtractFullName(&name, usedbReq.db); + catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo)); if (NULL== pCxt->pCmdMsg) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2af10a0e2d..12c4f824ac 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -79,9 +79,10 @@ typedef struct SSchCallbackParam { typedef struct SSchFlowControl { SRWLatch lock; + bool sorted; int32_t tableNumSum; uint32_t execTaskNum; - SArray *taskList; // Element is SQueryTask* + SArray *taskList; // Element is SSchTask* } SSchFlowControl; typedef struct SSchLevel { @@ -207,6 +208,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 1dc99354a1..9fba6523b6 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -165,6 +165,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { } *enough = false; + ctrl->sorted = false; break; } while (true); @@ -179,31 +180,89 @@ _return: SCH_RET(code); } +int32_t schTaskTableNumCompare(const void* key1, const void* key2) { + SSchTask *pTask1 = *(SSchTask **)key1; + SSchTask *pTask2 = *(SSchTask **)key2; + + if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) { + return 1; + } else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) { + return -1; + } else { + return 0; + } +} + + int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { - do { - SCH_LOCK(SCH_WRITE, &ctrl->lock); - - if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) { - SCH_UNLOCK(SCH_WRITE, &ctrl->lock); - return TSDB_CODE_SUCCESS; - } - - SSchTask *pTask = *(SSchTask **)taosArrayPop(ctrl->taskList); - + SCH_LOCK(SCH_WRITE, &ctrl->lock); + + if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) { SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + return TSDB_CODE_SUCCESS; + } - bool enough = false; - SCH_ERR_RET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum; + int32_t taskNum = taosArrayGetSize(ctrl->taskList); + int32_t code = 0; + SSchTask *pTask = NULL; + + if (taskNum > 1 && !ctrl->sorted) { + taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order + } + + for (int32_t i = 0; i < taskNum; ++i) { + pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i); + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { + SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); - if (enough) { - SCH_ERR_RET(schLaunchTaskImpl(pJob, pTask)); continue; } + + ctrl->tableNumSum += 
pTask->plan->execNodeStat.tableNum; + ++ctrl->execTaskNum; - break; - } while (true); + taosArrayRemove(ctrl->taskList, i); + + SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); + + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + + remainNum -= pTask->plan->execNodeStat.tableNum; + if (remainNum <= 0) { + SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum); + + break; + } - return TSDB_CODE_SUCCESS; + if (i < (taskNum - 1)) { + SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); + if (remainNum < pLastTask->plan->execNodeStat.tableNum) { + SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d", + ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); + + break; + } + } + + --i; + --taskNum; + } + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + if (code) { + code = schProcessOnTaskFailure(pJob, pTask, code); + } + + SCH_RET(code); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 32cac59b81..fe886dfcdb 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -608,6 +608,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } if (SCH_IS_DATA_SRC_TASK(pTask)) {