From d61e2bd1c42148e1f09257c7433919db6b965527 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Dec 2021 20:58:45 +0800 Subject: [PATCH 1/8] define some scheduler apis --- include/common/taosmsg.h | 4 +- include/libs/scheduler/scheduler.h | 6 +- include/util/taoserror.h | 1 + source/libs/scheduler/inc/schedulerInt.h | 17 ++- source/libs/scheduler/src/scheduler.c | 151 ++++++++++++++++++++--- source/util/src/terror.c | 1 + 6 files changed, 155 insertions(+), 25 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index eb2adda394..a19102fdbe 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -233,9 +233,9 @@ typedef struct { } SEpAddrMsg; typedef struct { - char *fqdn; + char fqdn[TSDB_FQDN_LEN]; uint16_t port; -} SEpAddr1; +} SEpAddr; typedef struct { int32_t numOfVnodes; diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 40c85520d4..10eba7f49c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -55,9 +55,9 @@ typedef struct SQueryProfileSummary { * @param pJob * @return */ -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); +int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob); -int32_t scheduleFetchRows(void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); /** @@ -65,7 +65,7 @@ int32_t scheduleFetchRows(void *pJob, void *data); * @param pJob * @return */ -int32_t scheduleCancelJob(void *pJob); +int32_t scheduleCancelJob(void *pRpc, void *pJob); void scheduleFreeJob(void *pJob); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5abb678a07..f4f05062b5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -508,6 +508,7 @@ int32_t* taosGetErrno(); //scheduler #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error +#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4620a816da..8609c31748 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -27,6 +27,7 @@ extern "C" { #include "thash.h" #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 +#define SCHEDULE_DEFAULT_TASK_NUMBER 1000 enum { SCH_STATUS_NOT_START = 1, @@ -43,18 +44,21 @@ typedef struct SSchedulerMgmt { } SSchedulerMgmt; typedef struct SQueryTask { - uint64_t taskId; // task id - char *msg; // operator tree - int8_t status; // task status - SQueryProfileSummary summary; // task execution summary + uint64_t taskId; // task id + SSubplan *plan; // subplan + char *msg; // operator tree + int8_t status; // task status + SEpAddr execAddr; // task actual executed node address + SQueryProfileSummary summary; // task execution summary + SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* + SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; typedef struct SQueryLevel { + int32_t level; int8_t status; int32_t taskNum; - SArray *subTasks; // Element is SQueryTask - SArray *subPlans; // Element is SSubplan } SQueryLevel; typedef struct SQueryJob { @@ -70,6 +74,7 @@ typedef struct SQueryJob { #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) +#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7d387cbc66..deaecaae53 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -50,6 +50,66 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } +int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { + for (int32_t i = 0; i < job->levelNum; ++i) { + SQueryLevel *level = taosArrayGet(job->levels, i); + + for (int32_t m = 0; m < level->taskNum; ++m) { + SQueryTask *task = taosArrayGet(level->subTasks, m); + SSubplan *plan = task->plan; + int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern); + int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents); + + if (childNum > 0) { + task->childern = taosArrayInit(childNum, POINTER_BYTES); + if (NULL == task->childern) { + qError("taosArrayInit %d failed", childNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + for (int32_t n = 0; n < childNum; ++n) { + SSubplan *child = taosArrayGet(plan->pChildern, n); + SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES); + if (childTask) { + qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + if (NULL == taosArrayPush(task->childern, &childTask)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (parentNum > 0) { + task->parents = taosArrayInit(parentNum, POINTER_BYTES); + if (NULL == task->parents) { + qError("taosArrayInit %d failed", parentNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + for (int32_t n = 0; n < parentNum; ++n) { + SSubplan *parent = taosArrayGet(plan->pParents, n); + SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); + if (parentTask) { + qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + if (NULL == taosArrayPush(task->parents, &parentTask)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); @@ -58,10 +118,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == planToTask) { + qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->levelNum = levelNum; @@ -77,32 +143,39 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { level.status = SCH_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { + level.level = i; levelPlans = taosArrayGetP(dag->pSubplans, i); if (NULL == levelPlans) { qError("no level plans for level %d", i); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); if (levelPlanNum <= 0) { qError("invalid level plans number:%d, level:%d", levelPlanNum, i); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } level.taskNum = levelPlanNum; - level.subPlans = levelPlans; level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); if (NULL == level.subTasks) { qError("taosArrayInit %d failed", levelPlanNum); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } for (int32_t n = 0; n < levelPlanNum; ++n) { + SSubplan *plan = taosArrayGet(levelPlans, n); SQueryTask *task = taosArrayGet(level.subTasks, n); task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + task->plan = plan; task->status = SCH_STATUS_NOT_START; + + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } } if (NULL == taosArrayPush(job->levels, &level)) { @@ -111,6 +184,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { } } + SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); + + if (planToTask) { + taosHashCleanup(planToTask); + } + return TSDB_CODE_SUCCESS; _return: @@ -118,20 +197,64 @@ _return: taosArrayDestroy(level.subTasks); } + if (planToTask) { + taosHashCleanup(planToTask); + } + SCH_RET(code); } +int32_t schAvailableEpSet(SEpSet *epSet) { -int32_t schJobExecute(SQueryJob *job) { - switch (job->status) { +} + + +int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { + +} + + + +int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { + SSubplan *plan = task->plan; + + switch (task->status) { case SCH_STATUS_NOT_START: - + SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); + if (plan->execEpSet.numOfEps <= 0) { + SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); + } + SCH_ERR_RET(schAsyncLaunchTask(job, task)); + break; + case SCH_STATUS_EXECUTING: + break; + case SCH_STATUS_SUCCEED: break; - default: - SCH_JOB_ERR_LOG("invalid job status:%d", job->status); + SCH_JOB_ERR_LOG("invalid level status:%d, levelIdx:%d", job->status, job->levelIdx); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } + + return TSDB_CODE_SUCCESS; +} + +int32_t schJobRun(SQueryJob *job) { + bool cont = true; + + while (cont) { + switch (job->status) { + case SCH_STATUS_NOT_START: + case SCH_STATUS_EXECUTING: + + break; + + default: + SCH_JOB_ERR_LOG("invalid job status:%d", job->status); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + } + + return TSDB_CODE_SUCCESS; } @@ -145,7 +268,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { +int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -158,7 +281,7 @@ int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); - SCH_ERR_JRET(schJobExecute(job)); + SCH_ERR_JRET(schJobRun(job)); *(SQueryJob **)pJob = job; @@ -172,9 +295,9 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); -int32_t scheduleCancelJob(void *pJob); +int32_t scheduleCancelJob(void *pRpc, void *pJob); void scheduleFreeJob(void *pJob) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e3a5c9e034..377b1bb602 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -503,6 +503,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error" //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") +TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error") #ifdef TAOS_ERROR_C From 8dd9cc7480d98bab448cee16d448e49fa2911b09 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 18 Dec 2021 12:28:01 +0800 Subject: [PATCH 2/8] scheduler job code --- include/libs/planner/planner.h | 2 + source/libs/scheduler/inc/schedulerInt.h | 29 ++-- source/libs/scheduler/src/scheduler.c | 176 ++++++++++++++++++----- 3 files changed, 164 insertions(+), 43 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index a7d418d45e..c147a399f0 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -105,6 +105,8 @@ typedef struct SSubplan { } SSubplan; typedef struct SQueryDag { + uint64_t queryId; + int32_t numOfSubplans; SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryDag; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 8609c31748..a08ccda54b 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -44,14 +44,16 @@ typedef struct SSchedulerMgmt { } SSchedulerMgmt; typedef struct SQueryTask { - uint64_t taskId; // task id - SSubplan *plan; // subplan - char *msg; // operator tree - int8_t status; // task status - SEpAddr execAddr; // task actual executed node address - SQueryProfileSummary summary; // task execution summary - SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* - SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* + uint64_t taskId; // task id + SSubplan *plan; // subplan + char *msg; // operator tree + int8_t status; // task status + SEpAddr execAddr; // task actual executed node address + SQueryProfileSummary summary; // task execution summary + int32_t childReady; // child task ready number + SArray *childSrcEp; // child Eps, element is SEpAddr + SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* + SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; typedef struct SQueryLevel { @@ -67,11 +69,16 @@ typedef struct SQueryJob { int32_t levelIdx; int8_t status; SQueryProfileSummary summary; - - SArray *levels; // Element is SQueryLevel, starting from 0. - SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. + + SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* + SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* + + SArray *levels; // Element is SQueryLevel, starting from 0. + SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryJob; +#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE + #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index deaecaae53..37d5b617d5 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -53,7 +53,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { SQueryLevel *level = taosArrayGet(job->levels, i); - + for (int32_t m = 0; m < level->taskNum; ++m) { SQueryTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; @@ -106,12 +106,31 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } } + SQueryLevel *level = taosArrayGet(job->levels, 0); + if (level->taskNum > 1) { + qError("invalid plan info, level 0, taskNum:%d", level->taskNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SQueryTask *task = taosArrayGet(level->subTasks, 0); + if (task->parents && taosArrayGetSize(task->parents) > 0) { + qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + return TSDB_CODE_SUCCESS; } int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; + + if (dag->numOfSubplans <= 0) { + qError("invalid subplan num:%d", dag->numOfSubplans); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); if (levelNum <= 0) { qError("invalid level num:%d", levelNum); @@ -213,47 +232,126 @@ int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { } +int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { + +} -int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { - SSubplan *plan = task->plan; +int32_t schProcessOnJobSuccess(SQueryJob *job) { + +} + +int32_t schProcessOnJobFailure(SQueryJob *job) { + +} + + +int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { + bool moved = false; - switch (task->status) { - case SCH_STATUS_NOT_START: - SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); - if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); - } - SCH_ERR_RET(schAsyncLaunchTask(job, task)); - break; - case SCH_STATUS_EXECUTING: - break; - case SCH_STATUS_SUCCEED: - break; - default: - SCH_JOB_ERR_LOG("invalid level status:%d, levelIdx:%d", job->status, job->levelIdx); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); + if (!moved) { + qWarn("task[%d] already moved", task->taskId); + return TSDB_CODE_SUCCESS; + } + + int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); + if (parentNum == 0) { + if (task->plan->level != 0) { + qError("level error"); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SCH_ERR_RET(schProcessOnJobSuccess()); + + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < parentNum; ++i) { + SQueryTask *par = taosArrayGet(task->parents, i); + + ++par->childReady; + + if (NULL == taosArrayPush(par->childSrcEp, &task->execAddr)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (SCH_TASK_READY_TO_LUNCH(par)) { + SCH_ERR_RET(schTaskRun(job, task)); + } } return TSDB_CODE_SUCCESS; } -int32_t schJobRun(SQueryJob *job) { - bool cont = true; +int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { + bool needRetry = false; + SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); - while (cont) { - switch (job->status) { - case SCH_STATUS_NOT_START: - case SCH_STATUS_EXECUTING: + if (!needRetry) { + SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); + + job->status = SCH_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(job)); - break; - - default: - SCH_JOB_ERR_LOG("invalid job status:%d", job->status); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } + return TSDB_CODE_SUCCESS; } + SCH_ERR_RET(schTaskRun(job, task)); + + return TSDB_CODE_SUCCESS; +} + + +int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { + if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { + if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { + qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); + return TSDB_CODE_SUCCESS + } + + if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *moved = true; + + return TSDB_CODE_SUCCESS; +} + + +int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { + SSubplan *plan = task->plan; + + SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); + if (plan->execEpSet.numOfEps <= 0) { + SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); + } + + SCH_ERR_RET(schAsyncLaunchTask(job, task)); + + SCH_ERR_RET(schPushTaskToExecList(job, task)) + + return TSDB_CODE_SUCCESS; +} + +int32_t schJobRun(SQueryJob *job) { + SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); + for (int32_t i = 0; i < level->taskNum; ++i) { + SQueryTask *task = taosArrayGet(level->subTasks, i); + SCH_ERR_RET(schTaskRun(job, task)); + } + return TSDB_CODE_SUCCESS; } @@ -281,6 +379,18 @@ int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); + job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == job->execTasks) { + qError("taosHashInit %d failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == job->succTasks) { + qError("taosHashInit %d failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + SCH_ERR_JRET(schJobRun(job)); *(SQueryJob **)pJob = job; @@ -299,8 +409,10 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); int32_t scheduleCancelJob(void *pRpc, void *pJob); -void scheduleFreeJob(void *pJob) { - +void scheduleFreeJob(void *job) { + if (NULL == job) { + return; + } } void schedulerDestroy(void) { From 76738ad27a7017dfcae0e917fa997441ba6e1106 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 18 Dec 2021 14:52:02 +0800 Subject: [PATCH 3/8] feature scheduler --- include/libs/catalog/catalog.h | 2 +- include/libs/scheduler/scheduler.h | 4 +-- source/libs/catalog/src/catalog.c | 12 +++++++++ source/libs/scheduler/inc/schedulerInt.h | 9 ++++++- source/libs/scheduler/src/scheduler.c | 32 +++++++++++++++++++++--- 5 files changed, 51 insertions(+), 8 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index ee626865fb..68eae03f51 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const S int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 10eba7f49c..c5002e1b36 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -23,7 +23,7 @@ extern "C" { #include "planner.h" typedef struct SSchedulerCfg { - + int32_t clusterType; } SSchedulerCfg; typedef struct SQueryProfileSummary { @@ -55,7 +55,7 @@ typedef struct SQueryProfileSummary { * @param pJob * @return */ -int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob); +int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 32cedb82b0..47731dde77 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -634,6 +634,18 @@ _return: CTG_RET(code); } +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + pQnodeEpSet->inUse = 0; + pQnodeEpSet->numOfEps = 0; + + return TSDB_CODE_SUCCESS; +} + + void catalogDestroy(void) { if (ctgMgmt.pCluster) { taosHashCleanup(ctgMgmt.pCluster); //TBD diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index a08ccda54b..c5ab74a85f 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -29,6 +29,8 @@ extern "C" { #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000 +#define SCHEDULE_MAX_CONDIDATE_EP_NUM 3 + enum { SCH_STATUS_NOT_START = 1, SCH_STATUS_EXECUTING, @@ -40,6 +42,7 @@ enum { typedef struct SSchedulerMgmt { uint64_t taskId; + SSchedulerCfg cfg; SHashObj *Jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; @@ -69,6 +72,10 @@ typedef struct SQueryJob { int32_t levelIdx; int8_t status; SQueryProfileSummary summary; + SEpSet dataSrcEps; + struct SCatalog *catalog; + void *rpc; + SEpSet *mgmtEpSet; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* @@ -78,7 +85,7 @@ typedef struct SQueryJob { } SQueryJob; #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE - +#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 37d5b617d5..396930dc55 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -16,6 +16,7 @@ #include "schedulerInt.h" #include "taosmsg.h" #include "query.h" +#include "catalog.h" SSchedulerMgmt schMgmt = {0}; @@ -223,8 +224,16 @@ _return: SCH_RET(code); } -int32_t schAvailableEpSet(SEpSet *epSet) { +int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { + SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); + if (epSet->numOfEps > SCHEDULE_MAX_CONDIDATE_EP_NUM) { + return TSDB_CODE_SUCCESS; + } + + //TODO COPY dataSrcEps TO epSet + + return TSDB_CODE_SUCCESS; } @@ -267,6 +276,13 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCHEDULE_MAX_CONDIDATE_EP_NUM) { + strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); + job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; + + ++job->dataSrcEps.numOfEps; + } + for (int32_t i = 0; i < parentNum; ++i) { SQueryTask *par = taosArrayGet(task->parents, i); @@ -335,7 +351,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); + SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet)); } SCH_ERR_RET(schAsyncLaunchTask(job, task)); @@ -362,12 +378,16 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); } + if (cfg) { + schMgmt.cfg = *cfg; + } + return TSDB_CODE_SUCCESS; } -int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { - if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { +int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -377,6 +397,10 @@ int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->catalog = pCatalog; + job->rpc = pRpc; + job->mgmtEpSet = pMgmtEps; + SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); From c9940c0e4a78d546e3ec253f3b36c49f9a93911f Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 18 Dec 2021 16:48:56 +0800 Subject: [PATCH 4/8] feature scheduler --- include/libs/planner/planner.h | 4 +- source/libs/catalog/src/catalog.c | 2 - source/libs/scheduler/inc/schedulerInt.h | 7 ++- source/libs/scheduler/src/scheduler.c | 54 +++++++++++++++++++----- 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 3f4d21a746..8b54b88b28 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -141,8 +141,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan -// @eps Execution location of this group of datasource subplans, is an array of SEpAddr structures -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps); +// @ep one execution location of this group of datasource subplans +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 47731dde77..1134c7763a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -639,8 +639,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - pQnodeEpSet->inUse = 0; - pQnodeEpSet->numOfEps = 0; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index c5ab74a85f..368f01e5ac 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -29,7 +29,7 @@ extern "C" { #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000 -#define SCHEDULE_MAX_CONDIDATE_EP_NUM 3 +#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA enum { SCH_STATUS_NOT_START = 1, @@ -54,7 +54,6 @@ typedef struct SQueryTask { SEpAddr execAddr; // task actual executed node address SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number - SArray *childSrcEp; // child Eps, element is SEpAddr SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; @@ -73,9 +72,13 @@ typedef struct SQueryJob { int8_t status; SQueryProfileSummary summary; SEpSet dataSrcEps; + SEpAddr resEp; struct SCatalog *catalog; void *rpc; SEpSet *mgmtEpSet; + tsem_t rspSem; + int32_t userFetch; + void *res; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 396930dc55..a7676b6c76 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -224,14 +224,21 @@ _return: SCH_RET(code); } -int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { - SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); - - if (epSet->numOfEps > SCHEDULE_MAX_CONDIDATE_EP_NUM) { +int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { + if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } - //TODO COPY dataSrcEps TO epSet + if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) { + SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); + } else { + for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) { + strncpy(epSet->fqdn[epSet->numOfEps], &job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn)); + epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; + + ++epSet->numOfEps; + } + } return TSDB_CODE_SUCCESS; } @@ -245,6 +252,10 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod } +int32_t schFetchFromRemote(SQueryJob *job) { + +} + int32_t schProcessOnJobSuccess(SQueryJob *job) { @@ -271,12 +282,15 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); + job->resEp.port = task->execAddr.port; + SCH_ERR_RET(schProcessOnJobSuccess()); return TSDB_CODE_SUCCESS; } - if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCHEDULE_MAX_CONDIDATE_EP_NUM) { + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; @@ -288,10 +302,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ++par->childReady; - if (NULL == taosArrayPush(par->childSrcEp, &task->execAddr)) { - qError("taosArrayPush failed"); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } + SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schTaskRun(job, task)); @@ -414,6 +425,8 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM qError("taosHashInit %d failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + tsem_init(&job->rspSem, 0, 0); SCH_ERR_JRET(schJobRun(job)); @@ -429,7 +442,26 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { + if (NULL == pRpc || NULL == pJob || NULL == data) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SQueryJob *job = pJob; + + if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { + qError("prior fetching not finished"); + return TSDB_CODE_QRY_APP_ERROR; + } + + SCH_ERR_RET(schFetchFromRemote(job)); + + tsem_wait(&job->rspSem); + + *data = job->res; + + return TSDB_CODE_SUCCESS; +} int32_t scheduleCancelJob(void *pRpc, void *pJob); From a702b1460a93b76600a34460580a6de3b5c36e71 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 18 Dec 2021 17:35:27 +0800 Subject: [PATCH 5/8] fix compile error --- include/libs/scheduler/scheduler.h | 3 +- source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/physicalPlan.c | 2 +- source/libs/planner/src/planner.c | 4 +- source/libs/scheduler/CMakeLists.txt | 4 +- source/libs/scheduler/inc/schedulerInt.h | 5 +- source/libs/scheduler/src/scheduler.c | 66 ++++++++++++------------ 7 files changed, 46 insertions(+), 40 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index c5002e1b36..c5f3cd8f0f 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "planner.h" +#include "catalog.h" typedef struct SSchedulerCfg { int32_t clusterType; @@ -57,7 +58,7 @@ typedef struct SQueryProfileSummary { */ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); -int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); /** diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 19563a8a0c..c9b7b6d235 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -100,7 +100,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps); +int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t subPlanToString(const SSubplan *pPhyNode, char** str); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index f187ec0ec9..72b30012c8 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -225,6 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_SUCCESS; } -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps) { +int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { //todo } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 3a90acb5fd..e9a4591d4a 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -46,8 +46,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* return TSDB_CODE_SUCCESS; } -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps) { - return setSubplanExecutionNode(subplan, templateId, eps); +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { + return setSubplanExecutionNode(subplan, templateId, ep); } int32_t qSubPlanToString(const SSubplan *subplan, char** str) { diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 31f1c25bea..cdb4e08205 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,5 +9,5 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner qcom common -) \ No newline at end of file + PRIVATE os util planner qcom common catalog transport +) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 368f01e5ac..4f363aa032 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -54,7 +54,7 @@ typedef struct SQueryTask { SEpAddr execAddr; // task actual executed node address SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number - SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* + SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; @@ -87,6 +87,7 @@ typedef struct SQueryJob { SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryJob; +#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE #define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) @@ -99,6 +100,8 @@ typedef struct SQueryJob { #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +extern int32_t schTaskRun(SQueryJob *job, SQueryTask *task); + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a7676b6c76..3862ad1ade 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -62,8 +62,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents); if (childNum > 0) { - task->childern = taosArrayInit(childNum, POINTER_BYTES); - if (NULL == task->childern) { + task->children = taosArrayInit(childNum, POINTER_BYTES); + if (NULL == task->children) { qError("taosArrayInit %d failed", childNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -77,7 +77,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->childern, &childTask)) { + if (NULL == taosArrayPush(task->children, &childTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -233,7 +233,7 @@ int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); } else { for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) { - strncpy(epSet->fqdn[epSet->numOfEps], &job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn)); + strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; ++epSet->numOfEps; @@ -244,6 +244,32 @@ int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { } +int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { + if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { + if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { + qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); + return TSDB_CODE_SUCCESS; + } + + if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *moved = true; + + return TSDB_CODE_SUCCESS; +} + + int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { } @@ -271,7 +297,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - qWarn("task[%d] already moved", task->taskId); + SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -285,7 +311,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); job->resEp.port = task->execAddr.port; - SCH_ERR_RET(schProcessOnJobSuccess()); + SCH_ERR_RET(schProcessOnJobSuccess(job)); return TSDB_CODE_SUCCESS; } @@ -331,30 +357,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod } -int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { - if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); - return TSDB_CODE_SUCCESS - } - - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - *moved = true; - - return TSDB_CODE_SUCCESS; -} int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { @@ -367,7 +369,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schAsyncLaunchTask(job, task)); - SCH_ERR_RET(schPushTaskToExecList(job, task)) + SCH_ERR_RET(schPushTaskToExecList(job, task)); return TSDB_CODE_SUCCESS; } @@ -410,7 +412,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM job->catalog = pCatalog; job->rpc = pRpc; - job->mgmtEpSet = pMgmtEps; + job->mgmtEpSet = (SEpSet *)pMgmtEps; SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); From 5579b50ac6d3b4c7ab875ce8e43fc5a009003766 Mon Sep 17 00:00:00 2001 From: dapan Date: Sun, 19 Dec 2021 16:27:50 +0800 Subject: [PATCH 6/8] feature/schduler --- include/common/taosmsg.h | 20 +++ source/libs/scheduler/inc/schedulerInt.h | 1 + source/libs/scheduler/src/scheduler.c | 150 ++++++++++++++++++++++- 3 files changed, 166 insertions(+), 5 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 4b63ff8d38..0484b8f133 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -51,6 +51,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) + // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -1074,6 +1076,24 @@ typedef struct { /* data */ } SUpdateTagValRsp; +typedef struct SSchedulerQueryMsg { + uint64_t queryId; + uint64_t taskId; + uint32_t contentLen; + char msg[]; +} SSchedulerQueryMsg; + +typedef struct SSchedulerReadyMsg { + uint64_t queryId; + uint64_t taskId; +} SSchedulerReadyMsg; + +typedef struct SSchedulerFetchMsg { + uint64_t queryId; + uint64_t taskId; +} SSchedulerFetchMsg; + + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4f363aa032..4d04c4dff7 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -78,6 +78,7 @@ typedef struct SQueryJob { SEpSet *mgmtEpSet; tsem_t rspSem; int32_t userFetch; + int32_t remoteFetch; void *res; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3862ad1ade..dbbe791136 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -270,25 +270,158 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { } -int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { +int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { + int32_t msgSize = 0; + void *msg = NULL; + + switch (msgType) { + case TSDB_MSG_TYPE_QUERY: { + if (NULL == task->msg) { + qError("query msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + int32_t len = strlen(task->msg); + msgSize = sizeof(SSchedulerQueryMsg) + len; + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchedulerQueryMsg *pMsg = msg; + pMsg->queryId = job->queryId; + pMsg->taskId = task->taskId; + pMsg->contentLen = len; + memcpy(pMsg->msg, task->msg, len); + break; + } + case TSDB_MSG_TYPE_RSP_READY: { + msgSize = sizeof(SSchedulerReadyMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchedulerReadyMsg *pMsg = msg; + pMsg->queryId = job->queryId; + pMsg->taskId = task->taskId; + break; + } + case TSDB_MSG_TYPE_FETCH: { + msgSize = sizeof(SSchedulerFetchMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchedulerFetchMsg *pMsg = msg; + pMsg->queryId = job->queryId; + pMsg->taskId = task->taskId; + break; + } + default: + qError("unknown msg type:%d", msgType); + break; + } + + //TODO SEND MSG + + return TSDB_CODE_SUCCESS; } int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { } -int32_t schFetchFromRemote(SQueryJob *job) { +int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + + switch (msgType) { + case TSDB_MSG_TYPE_QUERY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_RSP_READY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_FETCH: + SCH_ERR_JRET(rspCode); + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; + default: + qError("unknown msg type:%d received", msgType); + return TSDB_CODE_QRY_INVALID_INPUT; + } + return TSDB_CODE_SUCCESS; + +_task_error: + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); + return TSDB_CODE_SUCCESS; + +_return: + code = schProcessOnJobFailure(job); + return code; +} + + +int32_t schFetchFromRemote(SQueryJob *job) { + int32_t code = 0; + + if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { + qInfo("prior fetching not finished"); + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH)); + + return TSDB_CODE_SUCCESS; + +_return: + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + + return code; } int32_t schProcessOnJobSuccess(SQueryJob *job) { + if (job->userFetch) { + SCH_ERR_RET(schFetchFromRemote(job)); + } + return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailure(SQueryJob *job) { + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + if (job->userFetch) { + tsem_post(&job->rspSem); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t schProcessOnDataFetched(SQueryJob *job) { + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + + tsem_post(&job->rspSem); } @@ -367,7 +500,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet)); } - SCH_ERR_RET(schAsyncLaunchTask(job, task)); + SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); SCH_ERR_RET(schPushTaskToExecList(job, task)); @@ -450,19 +583,26 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { } SQueryJob *job = pJob; + int32_t code = 0; if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { qError("prior fetching not finished"); return TSDB_CODE_QRY_APP_ERROR; } - SCH_ERR_RET(schFetchFromRemote(job)); + if (job->status == SCH_STATUS_SUCCEED) { + SCH_ERR_JRET(schFetchFromRemote(job)); + } tsem_wait(&job->rspSem); *data = job->res; + job->res = NULL; - return TSDB_CODE_SUCCESS; +_return: + atomic_val_compare_exchange_32(&job->userFetch, 1, 0); + + return code; } int32_t scheduleCancelJob(void *pRpc, void *pJob); From 35f76a3c18417b58d637dc5aaabf15bd32c50819 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 09:28:22 +0800 Subject: [PATCH 7/8] set task/job status --- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/scheduler.c | 31 ++++++++++++++++++------ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4d04c4dff7..db75fc4fdd 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -101,7 +101,7 @@ typedef struct SQueryJob { #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -extern int32_t schTaskRun(SQueryJob *job, SQueryTask *task); +extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index dbbe791136..e68be25bce 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -333,7 +333,12 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { + // TODO set retry or not based on task type/errCode/retry times/job status/available eps... + // TODO if needRetry, set task retry info + *needRetry = false; + + return TSDB_CODE_SUCCESS; } int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { @@ -401,6 +406,8 @@ _return: int32_t schProcessOnJobSuccess(SQueryJob *job) { + job->status = SCH_STATUS_SUCCEED; + if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); } @@ -409,6 +416,8 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { } int32_t schProcessOnJobFailure(SQueryJob *job) { + job->status = SCH_STATUS_FAILED; + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); if (job->userFetch) { @@ -433,6 +442,8 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } + + task->status = SCH_STATUS_SUCCEED; int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); if (parentNum == 0) { @@ -464,7 +475,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schTaskRun(job, task)); + SCH_ERR_RET(schLaunchTask(job, task)); } } @@ -484,7 +495,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } - SCH_ERR_RET(schTaskRun(job, task)); + SCH_ERR_RET(schLaunchTask(job, task)); return TSDB_CODE_SUCCESS; } @@ -492,7 +503,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod -int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { +int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); @@ -504,15 +515,19 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schPushTaskToExecList(job, task)); + task->status = SCH_STATUS_EXECUTING; + return TSDB_CODE_SUCCESS; } -int32_t schJobRun(SQueryJob *job) { +int32_t schLaunchJob(SQueryJob *job) { SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { SQueryTask *task = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schTaskRun(job, task)); + SCH_ERR_RET(schLaunchTask(job, task)); } + + job->status = SCH_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } @@ -563,7 +578,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM tsem_init(&job->rspSem, 0, 0); - SCH_ERR_JRET(schJobRun(job)); + SCH_ERR_JRET(schLaunchJob(job)); *(SQueryJob **)pJob = job; @@ -611,11 +626,13 @@ void scheduleFreeJob(void *job) { if (NULL == job) { return; } + + //TODO } void schedulerDestroy(void) { if (schMgmt.Jobs) { - taosHashCleanup(schMgmt.Jobs); //TBD + taosHashCleanup(schMgmt.Jobs); //TODO schMgmt.Jobs = NULL; } } From 97da2ee95840ec74f676f7d90ade97d4a2543ae6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 10:00:19 +0800 Subject: [PATCH 8/8] modify api and compile warning --- include/libs/scheduler/scheduler.h | 4 +- source/libs/scheduler/src/scheduler.c | 129 +++++++++++++++----------- 2 files changed, 79 insertions(+), 54 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index c5f3cd8f0f..c01b79d7ff 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -58,7 +58,7 @@ typedef struct SQueryProfileSummary { */ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); -int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); +int32_t scheduleFetchRows(void *pJob, void **data); /** @@ -66,7 +66,7 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); * @param pJob * @return */ -int32_t scheduleCancelJob(void *pRpc, void *pJob); +int32_t scheduleCancelJob(void *pJob); void scheduleFreeJob(void *pJob); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e68be25bce..2327fc5b04 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -127,6 +127,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; + job->queryId = dag->queryId; + if (dag->numOfSubplans <= 0) { qError("invalid subplan num:%d", dag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -152,7 +154,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { job->levelNum = levelNum; job->levelIdx = levelNum - 1; - job->status = SCH_STATUS_NOT_START; job->subPlans = dag->pSubplans; @@ -341,50 +342,6 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { - int32_t code = 0; - - switch (msgType) { - case TSDB_MSG_TYPE_QUERY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); - if (code) { - goto _task_error; - } - } - break; - case TSDB_MSG_TYPE_RSP_READY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } - } - break; - case TSDB_MSG_TYPE_FETCH: - SCH_ERR_JRET(rspCode); - SCH_ERR_JRET(schProcessOnDataFetched(job)); - break; - default: - qError("unknown msg type:%d received", msgType); - return TSDB_CODE_QRY_INVALID_INPUT; - } - - return TSDB_CODE_SUCCESS; - -_task_error: - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); - return TSDB_CODE_SUCCESS; - -_return: - code = schProcessOnJobFailure(job); - return code; -} - int32_t schFetchFromRemote(SQueryJob *job) { int32_t code = 0; @@ -500,6 +457,50 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } +int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + + switch (msgType) { + case TSDB_MSG_TYPE_QUERY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_RSP_READY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_FETCH: + SCH_ERR_JRET(rspCode); + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; + default: + qError("unknown msg type:%d received", msgType); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + return TSDB_CODE_SUCCESS; + +_task_error: + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); + return TSDB_CODE_SUCCESS; + +_return: + code = schProcessOnJobFailure(job); + return code; +} + @@ -578,10 +579,17 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM tsem_init(&job->rspSem, 0, 0); + if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { + qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + job->status = SCH_STATUS_NOT_START; + SCH_ERR_JRET(schLaunchJob(job)); *(SQueryJob **)pJob = job; - + return TSDB_CODE_SUCCESS; _return: @@ -592,8 +600,8 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { - if (NULL == pRpc || NULL == pJob || NULL == data) { +int32_t scheduleFetchRows(void *pJob, void **data) { + if (NULL == pJob || NULL == data) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -620,14 +628,31 @@ _return: return code; } -int32_t scheduleCancelJob(void *pRpc, void *pJob); +int32_t scheduleCancelJob(void *pJob) { + //TODO -void scheduleFreeJob(void *job) { - if (NULL == job) { + return TSDB_CODE_SUCCESS; +} + +void scheduleFreeJob(void *pJob) { + if (NULL == pJob) { return; } - //TODO + SQueryJob *job = pJob; + + if (job->status > 0) { + if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) { + qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed + return; + } + + if (job->status == SCH_STATUS_EXECUTING) { + scheduleCancelJob(pJob); + } + } + + //TODO free job } void schedulerDestroy(void) {