diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index eb2adda394..a19102fdbe 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -233,9 +233,9 @@ typedef struct { } SEpAddrMsg; typedef struct { - char *fqdn; + char fqdn[TSDB_FQDN_LEN]; uint16_t port; -} SEpAddr1; +} SEpAddr; typedef struct { int32_t numOfVnodes; diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 40c85520d4..10eba7f49c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -55,9 +55,9 @@ typedef struct SQueryProfileSummary { * @param pJob * @return */ -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); +int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob); -int32_t scheduleFetchRows(void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); /** @@ -65,7 +65,7 @@ int32_t scheduleFetchRows(void *pJob, void *data); * @param pJob * @return */ -int32_t scheduleCancelJob(void *pJob); +int32_t scheduleCancelJob(void *pRpc, void *pJob); void scheduleFreeJob(void *pJob); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5abb678a07..f4f05062b5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -508,6 +508,7 @@ int32_t* taosGetErrno(); //scheduler #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error +#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4620a816da..8609c31748 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -27,6 +27,7 @@ extern "C" { #include "thash.h" #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 +#define SCHEDULE_DEFAULT_TASK_NUMBER 1000 enum { SCH_STATUS_NOT_START = 1, @@ -43,18 +44,21 @@ typedef struct SSchedulerMgmt { } SSchedulerMgmt; typedef struct SQueryTask { - uint64_t taskId; // task id - char *msg; // operator tree - int8_t status; // task status - SQueryProfileSummary summary; // task execution summary + uint64_t taskId; // task id + SSubplan *plan; // subplan + char *msg; // operator tree + int8_t status; // task status + SEpAddr execAddr; // task actual executed node address + SQueryProfileSummary summary; // task execution summary + SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* + SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; typedef struct SQueryLevel { + int32_t level; int8_t status; int32_t taskNum; - SArray *subTasks; // Element is SQueryTask - SArray *subPlans; // Element is SSubplan } SQueryLevel; typedef struct SQueryJob { @@ -70,6 +74,7 @@ typedef struct SQueryJob { #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) +#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7d387cbc66..deaecaae53 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -50,6 +50,66 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } +int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { + for (int32_t i = 0; i < job->levelNum; ++i) { + SQueryLevel *level = taosArrayGet(job->levels, i); + + for (int32_t m = 0; m < level->taskNum; ++m) { + SQueryTask *task = taosArrayGet(level->subTasks, m); + SSubplan *plan = task->plan; + int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern); + int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents); + + if (childNum > 0) { + task->childern = taosArrayInit(childNum, POINTER_BYTES); + if (NULL == task->childern) { + qError("taosArrayInit %d failed", childNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + for (int32_t n = 0; n < childNum; ++n) { + SSubplan *child = taosArrayGet(plan->pChildern, n); + SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES); + if (childTask) { + qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + if (NULL == taosArrayPush(task->childern, &childTask)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (parentNum > 0) { + task->parents = taosArrayInit(parentNum, POINTER_BYTES); + if (NULL == task->parents) { + qError("taosArrayInit %d failed", parentNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + for (int32_t n = 0; n < parentNum; ++n) { + SSubplan *parent = taosArrayGet(plan->pParents, n); + SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); + if (parentTask) { + qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + if (NULL == taosArrayPush(task->parents, &parentTask)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); @@ -58,10 +118,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == planToTask) { + qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->levelNum = levelNum; @@ -77,32 +143,39 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { level.status = SCH_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { + level.level = i; levelPlans = taosArrayGetP(dag->pSubplans, i); if (NULL == levelPlans) { qError("no level plans for level %d", i); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); if (levelPlanNum <= 0) { qError("invalid level plans number:%d, level:%d", levelPlanNum, i); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } level.taskNum = levelPlanNum; - level.subPlans = levelPlans; level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); if (NULL == level.subTasks) { qError("taosArrayInit %d failed", levelPlanNum); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } for (int32_t n = 0; n < levelPlanNum; ++n) { + SSubplan *plan = taosArrayGet(levelPlans, n); SQueryTask *task = taosArrayGet(level.subTasks, n); task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + task->plan = plan; task->status = SCH_STATUS_NOT_START; + + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } } if (NULL == taosArrayPush(job->levels, &level)) { @@ -111,6 +184,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { } } + SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); + + if (planToTask) { + taosHashCleanup(planToTask); + } + return TSDB_CODE_SUCCESS; _return: @@ -118,20 +197,64 @@ _return: taosArrayDestroy(level.subTasks); } + if (planToTask) { + taosHashCleanup(planToTask); + } + SCH_RET(code); } +int32_t schAvailableEpSet(SEpSet *epSet) { -int32_t schJobExecute(SQueryJob *job) { - switch (job->status) { +} + + +int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { + +} + + + +int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { + SSubplan *plan = task->plan; + + switch (task->status) { case SCH_STATUS_NOT_START: - + SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); + if (plan->execEpSet.numOfEps <= 0) { + SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); + } + SCH_ERR_RET(schAsyncLaunchTask(job, task)); + break; + case SCH_STATUS_EXECUTING: + break; + case SCH_STATUS_SUCCEED: break; - default: - SCH_JOB_ERR_LOG("invalid job status:%d", job->status); + SCH_JOB_ERR_LOG("invalid level status:%d, levelIdx:%d", job->status, job->levelIdx); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } + + return TSDB_CODE_SUCCESS; +} + +int32_t schJobRun(SQueryJob *job) { + bool cont = true; + + while (cont) { + switch (job->status) { + case SCH_STATUS_NOT_START: + case SCH_STATUS_EXECUTING: + + break; + + default: + SCH_JOB_ERR_LOG("invalid job status:%d", job->status); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + } + + return TSDB_CODE_SUCCESS; } @@ -145,7 +268,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { +int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -158,7 +281,7 @@ int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); - SCH_ERR_JRET(schJobExecute(job)); + SCH_ERR_JRET(schJobRun(job)); *(SQueryJob **)pJob = job; @@ -172,9 +295,9 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); -int32_t scheduleCancelJob(void *pJob); +int32_t scheduleCancelJob(void *pRpc, void *pJob); void scheduleFreeJob(void *pJob) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e3a5c9e034..377b1bb602 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -503,6 +503,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error" //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") +TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error") #ifdef TAOS_ERROR_C