diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index be92de774b..9e0878e118 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -132,7 +132,7 @@ typedef struct SSchLevel { int32_t taskSucceed; int32_t taskNum; int32_t taskLaunchedNum; - SHashObj *flowCtrl; // key is ep, element is SSchFlowControl + int32_t taskDoneNum; SArray *subTasks; // Element is SQueryTask } SSchLevel; @@ -175,11 +175,13 @@ typedef struct SSchJob { SArray *levels; // starting from 0. SArray SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler + SArray *dataSrcTasks; // SArray int32_t levelIdx; SEpSet dataSrcEps; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* + SHashObj *flowCtrl; // key is ep, element is SSchFlowControl SExplainCtx *explainCtx; int8_t status; @@ -200,7 +202,7 @@ typedef struct SSchJob { extern SSchedulerMgmt schMgmt; -#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) +#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) @@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) -#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) @@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); -void schFreeFlowCtrl(SSchLevel *pLevel); +void schFreeFlowCtrl(SSchJob *pJob); int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 993521da87..fbcca403bb 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -19,13 +19,13 @@ #include "catalog.h" #include "tref.h" -void schFreeFlowCtrl(SSchLevel *pLevel) { - if (NULL == pLevel->flowCtrl) { +void schFreeFlowCtrl(SSchJob *pJob) { + if (NULL == pJob->flowCtrl) { return; } SSchFlowControl *ctrl = NULL; - void *pIter = taosHashIterate(pLevel->flowCtrl, NULL); + void *pIter = taosHashIterate(pJob->flowCtrl, NULL); while (pIter) { ctrl = (SSchFlowControl *)pIter; @@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) { taosArrayDestroy(ctrl->taskList); } - pIter = taosHashIterate(pLevel->flowCtrl, pIter); + pIter = taosHashIterate(pJob->flowCtrl, pIter); } - taosHashCleanup(pLevel->flowCtrl); - pLevel->flowCtrl = NULL; + taosHashCleanup(pJob->flowCtrl); + pJob->flowCtrl = NULL; } int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { @@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { } int32_t sum = 0; - - for (int32_t i = 0; i < pLevel->taskNum; ++i) { - SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); + int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks); + for (int32_t i = 0; i < taskNum; ++i) { + SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i); sum += pTask->plan->execNodeStat.tableNum; } @@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { return TSDB_CODE_SUCCESS; } - pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (NULL == pLevel->flowCtrl) { - SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum); + pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pJob->flowCtrl) { + SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { int32_t code = 0; SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); - ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp)); if (NULL == ctrl) { SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); do { - ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp)); if (NULL == ctrl) { SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1}; - code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl)); + code = taosHashPut(pJob->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl)); if (code) { if (HASH_NODE_EXIST(code)) { continue; @@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); - SSchLevel *pLevel = pTask->level; SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); - SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp)); if (NULL == ctrl) { SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index dcd87557aa..7a75c00e6e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -391,6 +391,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + SCH_TASK_DLOG("children info, the %d child TID %" PRIx64, n, (*childTask)->taskId); } if (parentNum > 0) { @@ -423,6 +425,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + SCH_TASK_DLOG("parents info, the %d parent TID %" PRIx64, n, (*parentTask)->taskId); } SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum); @@ -464,6 +468,17 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad return TSDB_CODE_SUCCESS; } +int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) { + if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + return TSDB_CODE_SUCCESS; + } + + taosArrayPush(pJob->dataSrcTasks, &pTask); + + return TSDB_CODE_SUCCESS; +} + + int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { int32_t code = 0; pJob->queryId = pDag->queryId; @@ -473,6 +488,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES); + if (NULL == pJob->dataSrcTasks) { + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans); if (levelNum <= 0) { SCH_JOB_ELOG("invalid level num:%d", levelNum); @@ -551,6 +571,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p)); + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -629,6 +651,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { + int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); + if (code) { + SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", code); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); if (0 != code) { @@ -774,6 +807,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask)); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); @@ -947,6 +983,32 @@ _return: SCH_RET(schProcessOnJobFailure(pJob, errCode)); } +int32_t schLaunchNextLevelTasks(SSchJob *pJob, SSchTask *pTask) { + if (!SCH_IS_QUERY_JOB(pJob)) { + return TSDB_CODE_SUCCESS; + } + + SSchLevel *pLevel = pTask->level; + int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1); + if (doneNum == pLevel->taskNum) { + pJob->levelIdx--; + + pLevel = taosArrayGet(pJob->levels, pJob->levelIdx); + for (int32_t i = 0; i < pLevel->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); + + if (pTask->children && taosArrayGetSize(pTask->children) > 0) { + continue; + } + + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + } + } + + return TSDB_CODE_SUCCESS; +} + + // Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; @@ -1015,11 +1077,13 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &par->lock); - if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) { - SCH_ERR_RET(schLaunchTaskImpl(pJob, par)); + if (SCH_TASK_READY_FOR_LAUNCH(readyNum, par)) { + SCH_ERR_RET(schLaunchTask(pJob, par)); } } + SCH_ERR_RET(schLaunchNextLevelTasks(pJob, pTask)); + return TSDB_CODE_SUCCESS; _return: @@ -2400,8 +2464,6 @@ void schFreeJobImpl(void *job) { for (int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - schFreeFlowCtrl(pLevel); - int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for (int32_t j = 0; j < numOfTasks; ++j) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); @@ -2411,12 +2473,15 @@ void schFreeJobImpl(void *job) { taosArrayDestroy(pLevel->subTasks); } + schFreeFlowCtrl(pJob); + taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->succTasks); taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); + taosArrayDestroy(pJob->dataSrcTasks); qExplainFreeCtx(pJob->explainCtx);