From 8dd9cc7480d98bab448cee16d448e49fa2911b09 Mon Sep 17 00:00:00 2001
From: dapan
Date: Sat, 18 Dec 2021 12:28:01 +0800
Subject: [PATCH] scheduler job code

---
 include/libs/planner/planner.h           |   2 +
 source/libs/scheduler/inc/schedulerInt.h |  29 ++--
 source/libs/scheduler/src/scheduler.c    | 176 ++++++++++++++++++-----
 3 files changed, 164 insertions(+), 43 deletions(-)

diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h
index a7d418d45e..c147a399f0 100644
--- a/include/libs/planner/planner.h
+++ b/include/libs/planner/planner.h
@@ -105,6 +105,8 @@ typedef struct SSubplan {
 } SSubplan;
 
 typedef struct SQueryDag {
+  uint64_t queryId;
+  int32_t  numOfSubplans;
   SArray  *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
 } SQueryDag;
 
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index 8609c31748..a08ccda54b 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -44,14 +44,16 @@ typedef struct SSchedulerMgmt {
 } SSchedulerMgmt;
 
 typedef struct SQueryTask {
-  uint64_t             taskId;   // task id
-  SSubplan            *plan;     // subplan
-  char                *msg;      // operator tree
-  int8_t               status;   // task status
-  SEpAddr              execAddr; // task actual executed node address
-  SQueryProfileSummary summary;  // task execution summary
-  SArray              *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask*
-  SArray              *parents;  // the data destination tasks, get data from current task, element is SQueryTask*
+  uint64_t             taskId;     // task id
+  SSubplan            *plan;       // subplan
+  char                *msg;        // operator tree
+  int8_t               status;     // task status
+  SEpAddr              execAddr;   // task actual executed node address
+  SQueryProfileSummary summary;    // task execution summary
+  int32_t              childReady; // child task ready number
+  SArray              *childSrcEp; // child Eps, element is SEpAddr
+  SArray              *childern;   // the datasource tasks, from which to fetch the result, element is SQueryTask*
+  SArray              *parents;    // the data destination tasks, get data from current task, element is SQueryTask*
 } SQueryTask;
 
 typedef struct SQueryLevel {
@@ -67,11 +69,16 @@ typedef struct SQueryJob {
   int32_t  levelIdx;
   int8_t   status;
   SQueryProfileSummary summary;
-
-  SArray  *levels;    // Element is SQueryLevel, starting from 0.
-  SArray  *subPlans;  // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
+
+  SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
+  SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
+
+  SArray   *levels;    // Element is SQueryLevel, starting from 0.
+  SArray   *subPlans;  // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
 } SQueryJob;
 
+#define SCH_TASK_READY_TO_LAUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->childern))  // MAY NEED TO ENHANCE
+
 #define SCH_JOB_ERR_LOG(param, ...)  qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
 #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
 
qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index deaecaae53..37d5b617d5 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -53,7 +53,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { SQueryLevel *level = taosArrayGet(job->levels, i); - + for (int32_t m = 0; m < level->taskNum; ++m) { SQueryTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; @@ -106,12 +106,31 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } } + SQueryLevel *level = taosArrayGet(job->levels, 0); + if (level->taskNum > 1) { + qError("invalid plan info, level 0, taskNum:%d", level->taskNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SQueryTask *task = taosArrayGet(level->subTasks, 0); + if (task->parents && taosArrayGetSize(task->parents) > 0) { + qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + return TSDB_CODE_SUCCESS; } int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; + + if (dag->numOfSubplans <= 0) { + qError("invalid subplan num:%d", dag->numOfSubplans); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); if (levelNum <= 0) { qError("invalid level num:%d", levelNum); @@ -213,47 +232,126 @@ int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { } +int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { + +} -int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { - SSubplan *plan = task->plan; +int32_t schProcessOnJobSuccess(SQueryJob *job) { + +} + +int32_t schProcessOnJobFailure(SQueryJob *job) { + +} + + +int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { + bool moved = false; - switch (task->status) { - case SCH_STATUS_NOT_START: - SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); - if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); - } - SCH_ERR_RET(schAsyncLaunchTask(job, task)); - break; - case SCH_STATUS_EXECUTING: - break; - case SCH_STATUS_SUCCEED: - break; - default: - SCH_JOB_ERR_LOG("invalid level status:%d, levelIdx:%d", job->status, job->levelIdx); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); + if (!moved) { + qWarn("task[%d] already moved", task->taskId); + return TSDB_CODE_SUCCESS; + } + + int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); + if (parentNum == 0) { + if (task->plan->level != 0) { + qError("level error"); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SCH_ERR_RET(schProcessOnJobSuccess()); + + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < parentNum; ++i) { + SQueryTask *par = taosArrayGet(task->parents, i); + + ++par->childReady; + + if (NULL == taosArrayPush(par->childSrcEp, &task->execAddr)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (SCH_TASK_READY_TO_LUNCH(par)) { + SCH_ERR_RET(schTaskRun(job, task)); + } } return TSDB_CODE_SUCCESS; } -int32_t schJobRun(SQueryJob *job) { - bool cont = true; +int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { + bool 
+  bool needRetry = false;
+  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
 
-  while (cont) {
-    switch (job->status) {
-      case SCH_STATUS_NOT_START:
-      case SCH_STATUS_EXECUTING:
+  if (!needRetry) {
+    SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
+
+    job->status = SCH_STATUS_FAILED;
+    SCH_ERR_RET(schProcessOnJobFailure(job));
 
-        break;
-
-      default:
-        SCH_JOB_ERR_LOG("invalid job status:%d", job->status);
-        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
-    }
+    return TSDB_CODE_SUCCESS;
   }
 
+  SCH_ERR_RET(schTaskRun(job, task));
+
+  return TSDB_CODE_SUCCESS;
+}
+
+
+int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
+  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
+    qError("taosHashPut failed");
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
+  if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
+    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
+    qError("taosHashPut failed");
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  *moved = true;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+
+int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
+  SSubplan *plan = task->plan;
+
+  SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
+  if (plan->execEpSet.numOfEps <= 0) {
+    SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet));
+  }
+
+  SCH_ERR_RET(schAsyncLaunchTask(job, task));
+
+  SCH_ERR_RET(schPushTaskToExecList(job, task));
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schJobRun(SQueryJob *job) {
+  SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx);
+  for (int32_t i = 0; i < level->taskNum; ++i) {
+    SQueryTask *task = taosArrayGet(level->subTasks, i);
+    SCH_ERR_RET(schTaskRun(job, task));
+  }
+
   return TSDB_CODE_SUCCESS;
 }
 
@@ -281,6 +379,18 @@ int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) {
 
   SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
 
+  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == job->execTasks) {
+    qError("taosHashInit %d failed", pDag->numOfSubplans);
+    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == job->succTasks) {
+    qError("taosHashInit %d failed", pDag->numOfSubplans);
+    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
   SCH_ERR_JRET(schJobRun(job));
 
   *(SQueryJob **)pJob = job;
@@ -299,8 +409,10 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
 
 int32_t scheduleCancelJob(void *pRpc, void *pJob);
 
-void scheduleFreeJob(void *pJob) {
-
+void scheduleFreeJob(void *job) {
+  if (NULL == job) {
+    return;
+  }
 }
 
 void schedulerDestroy(void) {
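The core of this change is the child-readiness bookkeeping in schProcessOnTaskSuccess: every finished task bumps childReady on each of its parents, and a parent is launched once childReady reaches its child-task count (SCH_TASK_READY_TO_LAUNCH). The standalone sketch below, which is not part of the patch and uses purely hypothetical names (DemoTask, demoOnTaskSuccess, demoReadyToLaunch, demoLaunch, MAX_REL) instead of the TDengine types, models only that rule under the assumption of a small fixed-size parent array.

/*
 * Illustrative sketch only, not TDengine code: a parent task is launched once
 * all of its child tasks have reported success, mirroring the childReady
 * counter and SCH_TASK_READY_TO_LAUNCH check introduced by this patch.
 */
#include <stdio.h>
#include <stdbool.h>

#define MAX_REL 4

typedef struct DemoTask {
  int              id;
  int              childReady;          /* children finished so far        */
  int              numChildren;         /* total children to wait for      */
  int              numParents;
  struct DemoTask *parents[MAX_REL];    /* tasks that consume this output  */
  bool             launched;
} DemoTask;

/* mirrors SCH_TASK_READY_TO_LAUNCH: ready once every child reported success */
static bool demoReadyToLaunch(const DemoTask *t) {
  return t->childReady >= t->numChildren;
}

static void demoLaunch(DemoTask *t) {
  t->launched = true;
  printf("launch task %d\n", t->id);
}

/* mirrors schProcessOnTaskSuccess: bump each parent's counter, launch it when ready */
static void demoOnTaskSuccess(DemoTask *t) {
  for (int i = 0; i < t->numParents; ++i) {
    DemoTask *par = t->parents[i];
    ++par->childReady;
    if (demoReadyToLaunch(par) && !par->launched) {
      demoLaunch(par);
    }
  }
}

int main(void) {
  /* two leaf scan tasks feeding one merge task, as in a two-level plan */
  DemoTask merge = {.id = 0, .numChildren = 2};
  DemoTask scan1 = {.id = 1, .numParents = 1, .parents = {&merge}};
  DemoTask scan2 = {.id = 2, .numParents = 1, .parents = {&merge}};

  demoLaunch(&scan1);
  demoLaunch(&scan2);

  demoOnTaskSuccess(&scan1);  /* merge not ready yet: 1 of 2 children done */
  demoOnTaskSuccess(&scan2);  /* all children done: merge launches         */

  return 0;
}

In the patch itself the parent additionally records each finished child's execAddr in its childSrcEp array, so that when it is launched it knows which endpoints to fetch intermediate results from; the sketch omits that detail.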