From 99d602b809f40b0654367366a2ab524e1a54772e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 16:12:15 +0800 Subject: [PATCH 1/5] schduler add debug info --- include/libs/scheduler/scheduler.h | 2 +- source/libs/scheduler/inc/schedulerInt.h | 48 +++---- source/libs/scheduler/src/scheduler.c | 155 ++++++++++++----------- 3 files changed, 109 insertions(+), 96 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index b2ba7acebf..6d7c5bdd01 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -24,7 +24,7 @@ extern "C" { #include "catalog.h" typedef struct SSchedulerCfg { - int32_t maxJobNum; + uint32_t maxJobNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index a7ec39bfde..100e9fb2cf 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -37,10 +37,10 @@ enum { }; typedef struct SSchedulerMgmt { - uint64_t taskId; - uint64_t sId; + uint64_t taskId; // sequential taksId + uint64_t sId; // schedulerId SSchedulerCfg cfg; - SHashObj *jobs; // key: queryId, value: SQueryJob* + SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; typedef struct SSchCallbackParam { @@ -83,30 +83,29 @@ typedef struct SSchJobAttr { typedef struct SSchJob { uint64_t queryId; - int32_t levelNum; - int32_t levelIdx; - int8_t status; SSchJobAttr attr; - SEpSet dataSrcEps; - SEpAddr resEp; + + int32_t levelNum; void *transport; SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr - tsem_t rspSem; - int32_t userFetch; - int32_t remoteFetch; - SSchTask *fetchTask; - - int32_t errCode; - void *res; - int32_t resNumOfRows; + SArray *levels; // Element is SQueryLevel, starting from 0. SArray + SArray *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler + int32_t levelIdx; + SEpSet dataSrcEps; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* - SArray *levels; // Element is SQueryLevel, starting from 0. SArray - SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray - + int8_t status; + SEpAddr resEp; + tsem_t rspSem; + int32_t userFetch; + int32_t remoteFetch; + SSchTask *fetchTask; + int32_t errCode; + void *res; + int32_t resNumOfRows; SQueryProfileSummary summary; } SSchJob; @@ -115,12 +114,17 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) -#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) -#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY) +#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) + +#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__) +#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__) + +#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) +#define SCH_TASK_DLOG(param, ...) 
qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index df4f121773..0ce2d4af4a 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -112,87 +112,87 @@ static void cleanupTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } -int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { +int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { int32_t code = 0; - pJob->queryId = dag->queryId; + job->queryId = dag->queryId; if (dag->numOfSubplans <= 0) { - qError("invalid subplan num:%d", dag->numOfSubplans); + SCH_JOB_ELOG("invalid subplan num:%d", dag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); if (levelNum <= 0) { - qError("invalid level num:%d", levelNum); + SCH_JOB_ELOG("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { - qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); + SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); - if (NULL == pJob->levels) { - qError("taosArrayInit %d failed", levelNum); + job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); + if (NULL == job->levels) { + SCH_JOB_ELOG("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - //?? 
- pJob->attr.needFetch = true; + job->attr.needFetch = true; - pJob->levelNum = levelNum; - pJob->levelIdx = levelNum - 1; + job->levelNum = levelNum; + job->levelIdx = levelNum - 1; - pJob->subPlans = dag->pSubplans; + job->subPlans = dag->pSubplans; SSchLevel level = {0}; - SArray *levelPlans = NULL; - int32_t levelPlanNum = 0; + SArray *plans = NULL; + int32_t taskNum = 0; SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { - if (NULL == taosArrayPush(pJob->levels, &level)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(job->levels, &level)) { + SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pLevel = taosArrayGet(pJob->levels, i); + pLevel = taosArrayGet(job->levels, i); pLevel->level = i; - levelPlans = taosArrayGetP(dag->pSubplans, i); - if (NULL == levelPlans) { - qError("no level plans for level %d", i); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); - if (levelPlanNum <= 0) { - qError("invalid level plans number:%d, level:%d", levelPlanNum, i); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - pLevel->taskNum = levelPlanNum; - pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask)); + plans = taosArrayGetP(dag->pSubplans, i); + if (NULL == plans) { + SCH_JOB_ELOG("empty level plan, level:%d", i); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + taskNum = (int32_t)taosArrayGetSize(plans); + if (taskNum <= 0) { + SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + pLevel->taskNum = taskNum; + + pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask)); if (NULL == pLevel->subTasks) { - qError("taosArrayInit %d failed", levelPlanNum); + SCH_JOB_ELOG("taosArrayInit %d failed", taskNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - for (int32_t n = 0; n < levelPlanNum; ++n) { - SSubplan *plan = taosArrayGetP(levelPlans, n); + for (int32_t n = 0; n < taskNum; ++n) { + SSubplan *plan = taosArrayGetP(plans, n); if (plan->type == QUERY_TYPE_MODIFY) { - pJob->attr.needFetch = false; + job->attr.needFetch = false; } else { - pJob->attr.queryJob = true; + job->attr.queryJob = true; } - SSchTask task = initTask(pJob, plan, pLevel); + SSchTask task = initTask(job, plan, pLevel); void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { qError("taosArrayPush failed"); @@ -206,7 +206,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { } } - SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); + SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); if (planToTask) { taosHashCleanup(planToTask); @@ -384,7 +384,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status); + SCH_TASK_ELOG(" task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -458,11 +458,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); + SCH_TASK_ELOG("task failed[%x], no more retry", errCode); SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, 
status:%d", task->status); } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -825,7 +825,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { - SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId); + SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -888,41 +888,16 @@ uint64_t schGenSchId(void) { } -int32_t schedulerInit(SSchedulerCfg *cfg) { - if (schMgmt.jobs) { - qError("scheduler already init"); - return TSDB_CODE_QRY_INVALID_INPUT; - } - if (cfg) { - schMgmt.cfg = *cfg; - - if (schMgmt.cfg.maxJobNum <= 0) { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; - } - } else { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; - } - - schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == schMgmt.jobs) { - SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); - } - - schMgmt.sId = schGenSchId(); - - return TSDB_CODE_SUCCESS; -} - - -int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { +int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { - qInfo("qnodeList is empty"); + qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } int32_t code = 0; SSchJob *job = calloc(1, sizeof(SSchJob)); if (NULL == job) { + qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -975,18 +950,52 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, return TSDB_CODE_SUCCESS; _return: + *(SSchJob **)pJob = NULL; scheduleFreeJob(job); SCH_RET(code); } + +int32_t schedulerInit(SSchedulerCfg *cfg) { + if (schMgmt.jobs) { + qError("scheduler already initialized"); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + if (cfg) { + schMgmt.cfg = *cfg; + + if (schMgmt.cfg.maxJobNum == 0) { + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + } + } else { + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + } + + schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.jobs) { + qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (taosGetSystemUUID(&schMgmt.sId, sizeof(schMgmt.sId))) { + qError("generate schdulerId failed, errno:%d", errno); + SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); + } + + qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.cfg.maxJobNum); + + return TSDB_CODE_SUCCESS; +} + int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true)); SSchJob *job = *(SSchJob **)pJob; @@ -1001,7 +1010,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false); + return 
schExecJobImpl(transport, nodeList, pDag, pJob, false); } From 3edb51d1bc822af4e998579ab63c94dd4bb578eb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 16:50:48 +0800 Subject: [PATCH 2/5] feature/qnode --- source/libs/scheduler/inc/schedulerInt.h | 1 - source/libs/scheduler/src/scheduler.c | 272 +++++++++++------------ 2 files changed, 126 insertions(+), 147 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 100e9fb2cf..0982411cf5 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -84,7 +84,6 @@ typedef struct SSchJobAttr { typedef struct SSchJob { uint64_t queryId; SSchJobAttr attr; - int32_t levelNum; void *transport; SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 0ce2d4af4a..300f7154a4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,109 +20,106 @@ static SSchedulerMgmt schMgmt = {0}; -int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { - for (int32_t i = 0; i < job->levelNum; ++i) { - SSchLevel *level = taosArrayGet(job->levels, i); +static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { + for (int32_t i = 0; i < pJob->levelNum; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - for (int32_t m = 0; m < level->taskNum; ++m) { - SSchTask *task = taosArrayGet(level->subTasks, m); - SSubplan *plan = task->plan; - int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0; - int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; + for (int32_t m = 0; m < pLevel->taskNum; ++m) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + SSubplan *pPlan = pTask->plan; + int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0; + int32_t parentNum = pPlan->pParents ? 
(int32_t)taosArrayGetSize(pPlan->pParents) : 0; if (childNum > 0) { - task->children = taosArrayInit(childNum, POINTER_BYTES); - if (NULL == task->children) { - qError("taosArrayInit %d failed", childNum); + pTask->children = taosArrayInit(childNum, POINTER_BYTES); + if (NULL == pTask->children) { + SCH_TASK_ELOG("taosArrayInit %d children failed", childNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } for (int32_t n = 0; n < childNum; ++n) { - SSubplan **child = taosArrayGet(plan->pChildren, n); + SSubplan **child = taosArrayGet(pPlan->pChildren, n); SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { - qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->children, childTask)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(pTask->children, childTask)) { + SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } if (parentNum > 0) { - task->parents = taosArrayInit(parentNum, POINTER_BYTES); - if (NULL == task->parents) { - qError("taosArrayInit %d failed", parentNum); + pTask->parents = taosArrayInit(parentNum, POINTER_BYTES); + if (NULL == pTask->parents) { + SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan **parent = taosArrayGet(plan->pParents, n); + SSubplan **parent = taosArrayGet(pPlan->pParents, n); SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { - qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->parents, parentTask)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(pTask->parents, parentTask)) { + SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - } + } + + SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum); } } - SSchLevel *level = taosArrayGet(job->levels, 0); - if (job->attr.queryJob && level->taskNum > 1) { - qError("invalid plan info, level 0, taskNum:%d", level->taskNum); + SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); + if (pJob->attr.queryJob && pLevel->taskNum > 1) { + SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchTask *task = taosArrayGet(level->subTasks, 0); - if (task->parents && taosArrayGetSize(task->parents) > 0) { - qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + for (int32_t i = 0; i < pLevel->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); + + if (pTask->parents && taosArrayGetSize(pTask->parents) > 0) { + SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", (int32_t)taosArrayGetSize(pTask->parents)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } } - return TSDB_CODE_SUCCESS; } -static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { - SSchTask task = {0}; - 
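/*
 * Illustrative sketch, not part of the patch: the construction pattern that
 * replaces initTask() in this hunk. Job typing moves into the caller via
 * SCH_SET_JOB_TYPE (attr.queryJob = (type != QUERY_TYPE_MODIFY)), and the
 * task is filled in place instead of being returned by value.
 */
SCH_SET_JOB_TYPE(&pJob->attr, plan->type);

SSchTask task = {0};
schInitTask(pJob, &task, plan, pLevel);   // sets plan, level, NOT_START status and a fresh taskId
taosArrayPush(pLevel->subTasks, &task);   // error handling omitted in this sketch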
if (plan->type == QUERY_TYPE_MODIFY) { - pJob->attr.needFetch = false; - } else { - pJob->attr.queryJob = true; - } +static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { + pTask->plan = pPlan; + pTask->level = pLevel; + pTask->status = JOB_TASK_STATUS_NOT_START; + pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - task.plan = plan; - task.level = pLevel; - task.status = JOB_TASK_STATUS_NOT_START; - task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - - return task; + return TSDB_CODE_SUCCESS; } -static void cleanupTask(SSchTask* pTask) { +static void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } -int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { +static int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t code = 0; - job->queryId = dag->queryId; + pJob->queryId = pDag->queryId; - if (dag->numOfSubplans <= 0) { - SCH_JOB_ELOG("invalid subplan num:%d", dag->numOfSubplans); + if (pDag->numOfSubplans <= 0) { + SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); + int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans); if (levelNum <= 0) { SCH_JOB_ELOG("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -134,18 +131,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); - if (NULL == job->levels) { + pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); + if (NULL == pJob->levels) { SCH_JOB_ELOG("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->attr.needFetch = true; + pJob->levelNum = levelNum; + pJob->levelIdx = levelNum - 1; - job->levelNum = levelNum; - job->levelIdx = levelNum - 1; - - job->subPlans = dag->pSubplans; + pJob->subPlans = pDag->pSubplans; SSchLevel level = {0}; SArray *plans = NULL; @@ -155,16 +150,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { - if (NULL == taosArrayPush(job->levels, &level)) { + if (NULL == taosArrayPush(pJob->levels, &level)) { SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pLevel = taosArrayGet(job->levels, i); + pLevel = taosArrayGet(pJob->levels, i); pLevel->level = i; - plans = taosArrayGetP(dag->pSubplans, i); + plans = taosArrayGetP(pDag->pSubplans, i); if (NULL == plans) { SCH_JOB_ELOG("empty level plan, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -186,38 +181,34 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = taosArrayGetP(plans, n); - if (plan->type == QUERY_TYPE_MODIFY) { - job->attr.needFetch = false; - } else { - job->attr.queryJob = true; - } - SSchTask task = initTask(job, plan, pLevel); + SCH_SET_JOB_TYPE(&pJob->attr, plan->type); + + SSchTask task = {0}; + SSchTask *pTask = &task; + + schInitTask(pJob, &task, plan, pLevel); + void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { - qError("taosArrayPush failed"); + SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { - qError("taosHashPut 
failed"); + SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + SCH_TASK_DLOG("task initialized, level:%d", pLevel->level); } + + SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); } - SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); - - if (planToTask) { - taosHashCleanup(planToTask); - } - - return TSDB_CODE_SUCCESS; + SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); _return: - if (pLevel->subTasks) { - taosArrayDestroy(pLevel->subTasks); - } if (planToTask) { taosHashCleanup(planToTask); @@ -226,7 +217,7 @@ _return: SCH_RET(code); } -int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { +static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { if (task->candidateAddrs) { return TSDB_CODE_SUCCESS; } @@ -273,7 +264,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { +static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -284,7 +275,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { +static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); return TSDB_CODE_SUCCESS; @@ -300,7 +291,7 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { +static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } @@ -315,7 +306,7 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... 
// TODO if needRetry, set task retry info @@ -325,7 +316,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b } -int32_t schFetchFromRemote(SSchJob *job) { +static int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -344,11 +335,11 @@ _return: } -int32_t schProcessOnJobPartialSuccess(SSchJob *job) { +static int32_t schProcessOnJobPartialSuccess(SSchJob *job) { job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; bool needFetch = job->userFetch; - if ((!job->attr.needFetch) && job->attr.syncSchedule) { + if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) { tsem_post(&job->rspSem); } @@ -359,27 +350,27 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { +static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; job->errCode = errCode; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) { + if (job->userFetch || ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule)) { tsem_post(&job->rspSem); } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnDataFetched(SSchJob *job) { +static int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { +static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { bool moved = false; SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); @@ -451,7 +442,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { +static int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -488,7 +479,7 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { return TSDB_CODE_SUCCESS; } -int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +static int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { @@ -576,7 +567,7 @@ _return: } -int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { +static int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; @@ -602,32 +593,32 @@ _return: SCH_RET(code); } -int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); } -int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } -int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } 
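/*
 * Illustrative sketch, not part of the patch: the single-outstanding-fetch
 * guard used by schFetchFromRemote() and schProcessOnDataFetched() above.
 * Only the caller that flips remoteFetch from 0 to 1 sends the fetch request;
 * the flag is presumably flipped back and the waiter released through rspSem
 * once the data arrives or the job fails.
 */
if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) {
  return TSDB_CODE_SUCCESS;  // a fetch is already in flight
}
// ... send the fetch request to pJob->fetchTask ...
// on response: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); tsem_post(&pJob->rspSem);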
-int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } -int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); } -int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); } -int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { +static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; @@ -656,7 +647,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { } -int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { +static int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { @@ -695,7 +686,7 @@ _return: SCH_RET(code); } -void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { +static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { epSet->inUse = addr->inUse; epSet->numOfEps = addr->numOfEps; @@ -706,7 +697,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } -int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { +static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -819,7 +810,7 @@ _return: } -int32_t schLaunchTask(SSchJob *job, SSchTask *task) { +static int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); @@ -837,7 +828,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schLaunchJob(SSchJob *job) { +static int32_t schLaunchJob(SSchJob *job) { SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *task = taosArrayGet(level->subTasks, i); @@ -849,7 +840,7 @@ int32_t schLaunchJob(SSchJob *job) { return TSDB_CODE_SUCCESS; } -void schDropJobAllTasks(SSchJob *job) { +static void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -877,82 +868,71 @@ void schDropJobAllTasks(SSchJob *job) { } } -uint64_t schGenSchId(void) { - uint64_t sId = 0; - - // TODO - - qDebug("Gen sId:0x%"PRIx64, sId); - - return sId; -} - - - -int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { +static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } int32_t code 
= 0; - SSchJob *job = calloc(1, sizeof(SSchJob)); - if (NULL == job) { + SSchJob *pJob = calloc(1, sizeof(SSchJob)); + if (NULL == pJob) { qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->attr.syncSchedule = syncSchedule; - job->transport = transport; - job->nodeList = nodeList; + pJob->attr.syncSchedule = syncSchedule; + pJob->transport = transport; + pJob->nodeList = nodeList; - SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); + SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); - job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == job->execTasks) { - qError("taosHashInit %d failed", pDag->numOfSubplans); + pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->execTasks) { + SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == job->succTasks) { - qError("taosHashInit %d failed", pDag->numOfSubplans); + pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->succTasks) { + SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == job->failTasks) { - qError("taosHashInit %d failed", pDag->numOfSubplans); + pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->failTasks) { + SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - tsem_init(&job->rspSem, 0, 0); + tsem_init(&pJob->rspSem, 0, 0); - code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); + code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId); + SCH_JOB_ELOG("job already exist, type:%d", pJob->attr.queryJob); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } else { - qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId); + qError("taosHashPut queryId:0x%"PRIx64" failed", pJob->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } } - job->status = JOB_TASK_STATUS_NOT_START; - SCH_ERR_JRET(schLaunchJob(job)); + pJob->status = JOB_TASK_STATUS_NOT_START; + SCH_ERR_JRET(schLaunchJob(pJob)); - *(SSchJob **)pJob = job; + *(SSchJob **)job = pJob; if (syncSchedule) { - tsem_wait(&job->rspSem); + tsem_wait(&pJob->rspSem); } return TSDB_CODE_SUCCESS; _return: - *(SSchJob **)pJob = NULL; - scheduleFreeJob(job); + *(SSchJob **)job = NULL; + + scheduleFreeJob(pJob); SCH_RET(code); } @@ -1022,7 +1002,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { SSchJob *job = pJob; int32_t code = 0; - if (!job->attr.needFetch) { + if (!SCH_JOB_NEED_FETCH(&job->attr)) { qError("no need to fetch data"); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1101,7 +1081,7 @@ void scheduleFreeJob(void *pJob) { int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); 
for(int32_t j = 0; j < numOfTasks; ++j) { SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); - cleanupTask(pTask); + schFreeTask(pTask); } taosArrayDestroy(pLevel->subTasks); From 896df2bdc67b4874511a1e26c42aaf2a2f0aae50 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 18:38:51 +0800 Subject: [PATCH 3/5] feature/qnode --- include/libs/qcom/query.h | 5 +- source/libs/scheduler/inc/schedulerInt.h | 8 +- source/libs/scheduler/src/scheduler.c | 244 ++++++++++++----------- 3 files changed, 137 insertions(+), 120 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index e1eef1c3f5..da88366f11 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -87,13 +87,14 @@ typedef struct SUseDbOutput { SDBVgroupInfo dbVgroup; } SUseDbOutput; -typedef enum { +enum { META_TYPE_NON_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, - META_TYPE_BOTH_TABLE, + META_TYPE_BOTH_TABLE }; + typedef struct STableMetaOutput { int32_t metaType; char ctbFname[TSDB_TABLE_FNAME_LEN]; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 0982411cf5..014385c3a6 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -97,7 +97,7 @@ typedef struct SSchJob { SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* int8_t status; - SEpAddr resEp; + SQueryNodeAddr resNode; tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; @@ -109,7 +109,7 @@ typedef struct SSchJob { } SSchJob; #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE -#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE +#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children)) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) @@ -130,8 +130,8 @@ typedef struct SSchJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); -extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); +static int32_t schLaunchTask(SSchJob *job, SSchTask *task); +static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 300f7154a4..6c6d5d8385 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,7 +20,7 @@ static SSchedulerMgmt schMgmt = {0}; -static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { +int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t i = 0; i < pJob->levelNum; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); @@ -31,6 +31,11 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t parentNum = pPlan->pParents ? 
(int32_t)taosArrayGetSize(pPlan->pParents) : 0; if (childNum > 0) { + if (pJob->levelIdx == pLevel->level) { + SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + pTask->children = taosArrayInit(childNum, POINTER_BYTES); if (NULL == pTask->children) { SCH_TASK_ELOG("taosArrayInit %d children failed", childNum); @@ -53,6 +58,11 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } if (parentNum > 0) { + if (0 == pLevel->level) { + SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + pTask->parents = taosArrayInit(parentNum, POINTER_BYTES); if (NULL == pTask->parents) { SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); @@ -84,19 +94,10 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - for (int32_t i = 0; i < pLevel->taskNum; ++i) { - SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); - - if (pTask->parents && taosArrayGetSize(pTask->parents) > 0) { - SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", (int32_t)taosArrayGetSize(pTask->parents)); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - } - return TSDB_CODE_SUCCESS; } -static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { +int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; pTask->status = JOB_TASK_STATUS_NOT_START; @@ -105,11 +106,11 @@ static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSch return TSDB_CODE_SUCCESS; } -static void schFreeTask(SSchTask* pTask) { +void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } -static int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { +int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t code = 0; pJob->queryId = pDag->queryId; @@ -217,41 +218,48 @@ _return: SCH_RET(code); } -static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { - if (task->candidateAddrs) { +int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { + if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; } - task->candidateIdx = 0; - task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); - if (NULL == task->candidateAddrs) { - qError("taosArrayInit failed"); + pTask->candidateIdx = 0; + pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == pTask->candidateAddrs) { + SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (task->plan->execNode.numOfEps > 0) { - if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { - qError("taosArrayPush failed"); + if (pTask->plan->execNode.numOfEps > 0) { + if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("use execNode from plan as candidate addr"); + return TSDB_CODE_SUCCESS; } int32_t addNum = 0; - int32_t nodeNum = taosArrayGetSize(job->nodeList); + int32_t nodeNum = taosArrayGetSize(pJob->nodeList); for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); + SQueryNodeAddr 
*naddr = taosArrayGet(pJob->nodeList, i); - if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } ++addNum; } + if (addNum <= 0) { + SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum); + return TSDB_CODE_QRY_INVALID_INPUT; + } + /* for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); @@ -264,7 +272,7 @@ static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { +int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -275,7 +283,7 @@ static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { +int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); return TSDB_CODE_SUCCESS; @@ -291,7 +299,7 @@ static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) return TSDB_CODE_SUCCESS; } -static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { +int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } @@ -306,7 +314,7 @@ static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) return TSDB_CODE_SUCCESS; } -static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... 
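/*
 * Illustrative sketch, not part of the patch: how a candidate address picked
 * by schSetTaskCandidateAddrs() above would typically be consumed when a
 * message is sent. If the planner pinned the task (plan->execNode has eps),
 * that single address is the only candidate; otherwise up to
 * SCH_MAX_CONDIDATE_EP_NUM entries come from pJob->nodeList and candidateIdx
 * selects the one currently in use. The sending code itself is elided in this
 * hunk, so the caller below is an assumption.
 */
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

SEpSet epSet = {0};
schConvertAddrToEpSet(addr, &epSet);  // copies inUse/numOfEps and the per-ep addresses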
// TODO if needRetry, set task retry info @@ -316,7 +324,7 @@ static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t err } -static int32_t schFetchFromRemote(SSchJob *job) { +int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -335,7 +343,7 @@ _return: } -static int32_t schProcessOnJobPartialSuccess(SSchJob *job) { +int32_t schProcessOnJobPartialSuccess(SSchJob *job) { job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; bool needFetch = job->userFetch; @@ -350,7 +358,7 @@ static int32_t schProcessOnJobPartialSuccess(SSchJob *job) { return TSDB_CODE_SUCCESS; } -static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { +int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; job->errCode = errCode; @@ -363,57 +371,58 @@ static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { return TSDB_CODE_SUCCESS; } -static int32_t schProcessOnDataFetched(SSchJob *job) { +int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { +int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; - SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); + SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved)); if (!moved) { - SCH_TASK_ELOG(" task may already moved, status:%d", task->status); + SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status); return TSDB_CODE_SUCCESS; } - task->status = JOB_TASK_STATUS_SUCCEED; + pTask->status = JOB_TASK_STATUS_SUCCEED; - int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0; + int32_t parentNum = pTask->parents ? 
(int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { - if (task->plan->level != 0) { - qError("level error"); + if (pTask->level->level != 0) { + SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t taskDone = 0; - if (SCH_TASK_NEED_WAIT_ALL(task)) { - SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskSucceed++; - taskDone = task->level->taskSucceed + task->level->taskFailed; - SCH_UNLOCK(SCH_WRITE, &task->level->lock); + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + SCH_LOCK(SCH_WRITE, &pTask->level->lock); + pTask->level->taskSucceed++; + taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - if (taskDone < task->level->taskNum) { - qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + if (taskDone < pTask->level->taskNum) { + SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; + } else if (taskDone > pTask->level->taskNum) { + assert(0); } - if (task->level->taskFailed > 0) { - job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR)); + if (pTask->level->taskFailed > 0) { + pJob->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR)); return TSDB_CODE_SUCCESS; } } else { - strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn)); - job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port; + pJob->resNode = pTask->execAddr; } - job->fetchTask = task; - SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); + pJob->fetchTask = pTask; + SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); return TSDB_CODE_SUCCESS; } @@ -428,58 +437,58 @@ static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { */ for (int32_t i = 0; i < parentNum; ++i) { - SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); + SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - ++par->childReady; + atomic_add_fetch_32(&par->childReady, 1); - SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); + SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schLaunchTask(job, par)); + SCH_ERR_RET(schLaunchTask(pJob, par)); } } return TSDB_CODE_SUCCESS; } -static int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; - SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); + SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ELOG("task failed[%x], no more retry", errCode); - SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); + SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); } - if (SCH_TASK_NEED_WAIT_ALL(task)) { - SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskFailed++; - taskDone = task->level->taskSucceed + task->level->taskFailed; - SCH_UNLOCK(SCH_WRITE, &task->level->lock); + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + SCH_LOCK(SCH_WRITE, &pTask->level->lock); + pTask->level->taskFailed++; + 
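/*
 * Descriptive note, not part of the patch: SCH_TASK_NEED_WAIT_ALL() is true
 * only for QUERY_TYPE_MODIFY subplans, so for insert/modify jobs every task
 * in the level must report back before the job status changes. Both the
 * success and failure paths update their counter under the level lock and
 * compute taskDone = taskSucceed + taskFailed; any reporter but the last one
 * (taskDone < taskNum) simply returns, and the last one drives the job to
 * PARTIAL_SUCCEED or FAILED.
 */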
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - if (taskDone < task->level->taskNum) { - qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + if (taskDone < pTask->level->taskNum) { + qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; } } - job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job, errCode)); + pJob->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); return TSDB_CODE_SUCCESS; } - SCH_ERR_RET(schLaunchTask(job, task)); + SCH_ERR_RET(schLaunchTask(pJob, pTask)); return TSDB_CODE_SUCCESS; } -static int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { @@ -567,7 +576,7 @@ _return: } -static int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { +int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; @@ -593,32 +602,32 @@ _return: SCH_RET(code); } -static int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); } -static int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } -static int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } -static int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } -static int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); } -static int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); } -static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { +int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; @@ -647,7 +656,7 @@ static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { } -static int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { +int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; 
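/*
 * Illustrative sketch, not part of the patch (the body of schAsyncSendMsg is
 * largely elided in this hunk): the response is routed back to its job and
 * task through an SSchCallbackParam carrying qId/tId, paired with the handler
 * that schGetCallbackFp() maps from the request msgType, e.g. TDMT_VND_QUERY
 * to schHandleQueryCallback. How these are wired into SMsgSendInfo is an
 * assumption here.
 */
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
  SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->queryId = qId;  // later used by schHandleCallback to look up the job
param->taskId  = tId;  // and the task within it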
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { @@ -686,7 +695,7 @@ _return: SCH_RET(code); } -static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { +void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { epSet->inUse = addr->inUse; epSet->numOfEps = addr->numOfEps; @@ -697,7 +706,7 @@ static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } -static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -810,37 +819,40 @@ _return: } -static int32_t schLaunchTask(SSchJob *job, SSchTask *task) { - SSubplan *plan = task->plan; - SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); +int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { + SSubplan *plan = pTask->plan; + + SCH_ERR_RET(qSubPlanToString(plan, &pTask->msg, &pTask->msgLen)); + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); - if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { - SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, task->taskId); + if (NULL == pTask->candidateAddrs || taosArrayGetSize(pTask->candidateAddrs) <= 0) { + SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, pTask->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } // NOTE: race condition: the task should be put into the hash table before send msg to server - SCH_ERR_RET(schPushTaskToExecList(job, task)); - SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); - task->status = JOB_TASK_STATUS_EXECUTING; - return TSDB_CODE_SUCCESS; -} - -static int32_t schLaunchJob(SSchJob *job) { - SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); - for (int32_t i = 0; i < level->taskNum; ++i) { - SSchTask *task = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schLaunchTask(job, task)); - } - - job->status = JOB_TASK_STATUS_EXECUTING; + pTask->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } -static void schDropJobAllTasks(SSchJob *job) { +int32_t schLaunchJob(SSchJob *pJob) { + SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); + + for (int32_t i = 0; i < level->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(level->subTasks, i); + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + } + + pJob->status = JOB_TASK_STATUS_EXECUTING; + + return TSDB_CODE_SUCCESS; +} + +void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -868,7 +880,7 @@ static void schDropJobAllTasks(SSchJob *job) { } } -static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { +int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } @@ -876,7 +888,7 @@ static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag int32_t code = 0; SSchJob *pJob = calloc(1, sizeof(SSchJob)); if (NULL == pJob) { - qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob)); + qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); 
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -909,23 +921,27 @@ static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - SCH_JOB_ELOG("job already exist, type:%d", pJob->attr.queryJob); + SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } else { - qError("taosHashPut queryId:0x%"PRIx64" failed", pJob->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } pJob->status = JOB_TASK_STATUS_NOT_START; + SCH_ERR_JRET(schLaunchJob(pJob)); *(SSchJob **)job = pJob; - + if (syncSchedule) { + SCH_JOB_DLOG("will wait for rsp now"); tsem_wait(&pJob->rspSem); } + SCH_JOB_DLOG("job exec done"); + return TSDB_CODE_SUCCESS; _return: @@ -960,12 +976,12 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (taosGetSystemUUID(&schMgmt.sId, sizeof(schMgmt.sId))) { + if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) { qError("generate schdulerId failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); } - qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.cfg.maxJobNum); + qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); return TSDB_CODE_SUCCESS; } From 30556c676f4e6187f5926992e76523f39aec5094 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 8 Jan 2022 10:35:29 +0800 Subject: [PATCH 4/5] feature/qnode --- source/libs/scheduler/inc/schedulerInt.h | 6 + source/libs/scheduler/src/scheduler.c | 141 ++++++++++++++--------- 2 files changed, 94 insertions(+), 53 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 014385c3a6..270f255ec0 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -113,6 +113,12 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) +#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) +#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) + +#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) +#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) + #define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY) #define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6c6d5d8385..56e0fe9a5c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -100,7 +100,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; - pTask->status = JOB_TASK_STATUS_NOT_START; + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); return TSDB_CODE_SUCCESS; @@ -236,7 +236,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("use execNode from plan as candidate addr"); + SCH_TASK_DLOG("use execNode from plan as 
candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps); return TSDB_CODE_SUCCESS; } @@ -273,13 +273,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { - if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { - qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); + int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + SCH_TASK_ELOG("task already in exec list, code:%x", code); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), - pJob->queryId); + SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); + return TSDB_CODE_SUCCESS; } @@ -387,7 +393,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - pTask->status = JOB_TASK_STATUS_SUCCEED; + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { @@ -460,10 +466,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (!needRetry) { SCH_TASK_ELOG("task failed[%x], no more retry", errCode); - SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); - if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); - } + if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { + SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); + if (!moved) { + SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); + } + } + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); if (SCH_TASK_NEED_WAIT_ALL(pTask)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); @@ -476,11 +486,10 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) return TSDB_CODE_SUCCESS; } } - - pJob->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); - return TSDB_CODE_SUCCESS; + return errCode; } SCH_ERR_RET(schLaunchTask(pJob, pTask)); @@ -706,7 +715,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } -int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { +int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -714,22 +723,22 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { switch (msgType) { case TDMT_VND_CREATE_TABLE: case TDMT_VND_SUBMIT: { - if (NULL == task->msg || task->msgLen <= 0) { + if (NULL == pTask->msg || pTask->msgLen <= 0) { qError("submit msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - msgSize = task->msgLen; - msg = task->msg; + msgSize = pTask->msgLen; + msg = pTask->msg; break; } case TDMT_VND_QUERY: { - if (NULL == task->msg) { + if (NULL == pTask->msg) { qError("query msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - msgSize = sizeof(SSubQueryMsg) + task->msgLen; + msgSize = sizeof(SSubQueryMsg) + pTask->msgLen; msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); @@ -738,12 +747,12 @@ int32_t 
schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SSubQueryMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - pMsg->contentLen = htonl(task->msgLen); - memcpy(pMsg->msg, task->msg, task->msgLen); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); + pMsg->contentLen = htonl(pTask->msgLen); + memcpy(pMsg->msg, pTask->msg, pTask->msgLen); break; } case TDMT_VND_RES_READY: { @@ -756,14 +765,14 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SResReadyMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); break; } case TDMT_VND_FETCH: { - if (NULL == task) { + if (NULL == pTask) { SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } msgSize = sizeof(SResFetchMsg); @@ -775,10 +784,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SResFetchMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); break; } case TDMT_VND_DROP_TASK:{ @@ -791,10 +800,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { STaskDropMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); break; } default: @@ -804,11 +813,11 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SEpSet epSet; - SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); schConvertAddrToEpSet(addr, &epSet); - SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize)); + SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); return TSDB_CODE_SUCCESS; @@ -818,25 +827,51 @@ _return: SCH_RET(code); } - -int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { - SSubplan *plan = pTask->plan; - - SCH_ERR_RET(qSubPlanToString(plan, &pTask->msg, &pTask->msgLen)); - SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); - - if (NULL == pTask->candidateAddrs || taosArrayGetSize(pTask->candidateAddrs) <= 0) { - SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, pTask->taskId); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); +static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (pStatus) { + *pStatus = status; } - // NOTE: race condition: the task should be put into the hash table before send msg to server - SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); - SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); + 
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED + || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); +} - pTask->status = JOB_TASK_STATUS_EXECUTING; +int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { + int8_t status = 0; + int32_t code = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); + SCH_ERR_RET(pJob->errCode); + } + + SSubplan *plan = pTask->plan; + + if (NULL == pTask->msg) { + code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); + if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { + SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); + SCH_ERR_JRET(code); + } + } + + SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + + // NOTE: race condition: the task should be put into the hash table before send msg to server + SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); return TSDB_CODE_SUCCESS; + +_return: + + code = schProcessOnTaskFailure(pJob, pTask, code); + + SCH_RET(code); } int32_t schLaunchJob(SSchJob *pJob) { @@ -936,11 +971,11 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void* *(SSchJob **)job = pJob; if (syncSchedule) { - SCH_JOB_DLOG("will wait for rsp now"); + SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob)); tsem_wait(&pJob->rspSem); } - SCH_JOB_DLOG("job exec done"); + SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); return TSDB_CODE_SUCCESS; From 797d08915f1e1daf0054e2bb47ec589da16ef68d Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 8 Jan 2022 15:14:33 +0800 Subject: [PATCH 5/5] feature/qnode --- source/libs/catalog/src/catalog.c | 10 ++++++++ source/libs/parser/src/astValidate.c | 35 +++++++++++++++++++++++++- source/libs/planner/src/physicalPlan.c | 17 ++++++++----- source/libs/scheduler/src/scheduler.c | 17 +++++++------ 4 files changed, 64 insertions(+), 15 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index da0951fe1f..94f34b8e17 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S tNameGetFullDbName(pTableName, db); CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup)); + // REMOEV THIS .... + if (0 == tbMeta->vgId) { + SVgroupInfo vgroup = {0}; + + catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup); + + tbMeta->vgId = vgroup.vgId; + } + // REMOVE THIS .... 
+ if (tbMeta->tableType == TSDB_SUPER_TABLE) { CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList)); } else { diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 3ca3d87a79..5cabbb5e3b 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) return TSDB_CODE_SUCCESS; } +int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) { + SArray* vgroupList = NULL; + int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t vgroupNum = taosArrayGetSize(vgroupList); + + SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum); + + vgList->numOfVgroups = vgroupNum; + + for (int32_t i = 0; i < vgroupNum; ++i) { + SVgroupInfo *vg = taosArrayGet(vgroupList, i); + vgList->vgroups[i].vgId = vg->vgId; + vgList->vgroups[i].numOfEps = vg->numOfEps; + memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr)); + } + + *pVgList = vgList; + + taosArrayDestroy(vgroupList); + + return TSDB_CODE_SUCCESS; +} + int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) { assert(pCtx != NULL && pInfo != NULL); int32_t code = 0; @@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt if (code != TSDB_CODE_SUCCESS) { return code; } - + data.pTableMeta = taosArrayInit(1, POINTER_BYTES); taosArrayPush(data.pTableMeta, &pmt); @@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt pQueryInfo->pTableMetaInfo[0]->name = *name; pQueryInfo->numOfTables = 1; + code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(data.pTableMeta); + return code; + } + // evaluate the sqlnode STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0); assert(pTableMeta != NULL); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 461f16cdf0..bbb84223ac 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI return (SPhyNode*)node; } -static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { - return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); -} static bool isSystemTable(SQueryTableInfo* pTable) { // todo @@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType); } +static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { + vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); + + return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); +} + + static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; + if (needMultiNodeScan(pTable)) { return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); } - return createSingleTableScanNode(pPlanNode, 
pTable); + return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan); } static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { @@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { if (QNODE_MODIFY == pRoot->info.type) { splitModificationOpSubPlan(pCxt, pRoot); } else { - SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); + SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); ++(pCxt->nextId.templateId); subplan->msgType = TDMT_VND_QUERY; subplan->pNode = createPhyNode(pCxt, pRoot); - subplan->pDataSink = createDataDispatcher(pCxt, pRoot); + subplan->pDataSink = createDataDispatcher(pCxt, pRoot); } // todo deal subquery } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 5190546cef..3f6d0f1702 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -242,18 +242,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } int32_t addNum = 0; + int32_t nodeNum = 0; if (pJob->nodeList) { - int32_t nodeNum = taosArrayGetSize(pJob->nodeList); - - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); + nodeNum = taosArrayGetSize(pJob->nodeList); - if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { - SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); + + if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } } } - } if (addNum <= 0) { SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);