support level schedule
This commit is contained in:
parent
403b2ba1c2
commit
c6c8134a92
|
@ -132,7 +132,7 @@ typedef struct SSchLevel {
|
|||
int32_t taskSucceed;
|
||||
int32_t taskNum;
|
||||
int32_t taskLaunchedNum;
|
||||
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
|
||||
int32_t taskDoneNum;
|
||||
SArray *subTasks; // Element is SQueryTask
|
||||
} SSchLevel;
|
||||
|
||||
|
@ -175,11 +175,13 @@ typedef struct SSchJob {
|
|||
SArray *levels; // starting from 0. SArray<SSchLevel>
|
||||
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
|
||||
|
||||
SArray *dataSrcTasks; // SArray<SQueryTask*>
|
||||
int32_t levelIdx;
|
||||
SEpSet dataSrcEps;
|
||||
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
|
||||
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
|
||||
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
|
||||
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
|
||||
|
||||
SExplainCtx *explainCtx;
|
||||
int8_t status;
|
||||
|
@ -200,7 +202,7 @@ typedef struct SSchJob {
|
|||
|
||||
extern SSchedulerMgmt schMgmt;
|
||||
|
||||
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
|
||||
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
|
||||
|
||||
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
|
||||
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
|
||||
|
@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt;
|
|||
|
||||
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
|
||||
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
|
||||
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
|
||||
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
|
||||
|
||||
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
|
||||
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
|
||||
|
@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
|||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
|
||||
SSchJob *schAcquireJob(int64_t refId);
|
||||
int32_t schReleaseJob(int64_t refId);
|
||||
void schFreeFlowCtrl(SSchLevel *pLevel);
|
||||
void schFreeFlowCtrl(SSchJob *pJob);
|
||||
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
||||
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
|
||||
|
|
|
@ -19,13 +19,13 @@
|
|||
#include "catalog.h"
|
||||
#include "tref.h"
|
||||
|
||||
void schFreeFlowCtrl(SSchLevel *pLevel) {
|
||||
if (NULL == pLevel->flowCtrl) {
|
||||
void schFreeFlowCtrl(SSchJob *pJob) {
|
||||
if (NULL == pJob->flowCtrl) {
|
||||
return;
|
||||
}
|
||||
|
||||
SSchFlowControl *ctrl = NULL;
|
||||
void *pIter = taosHashIterate(pLevel->flowCtrl, NULL);
|
||||
void *pIter = taosHashIterate(pJob->flowCtrl, NULL);
|
||||
while (pIter) {
|
||||
ctrl = (SSchFlowControl *)pIter;
|
||||
|
||||
|
@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) {
|
|||
taosArrayDestroy(ctrl->taskList);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pLevel->flowCtrl, pIter);
|
||||
pIter = taosHashIterate(pJob->flowCtrl, pIter);
|
||||
}
|
||||
|
||||
taosHashCleanup(pLevel->flowCtrl);
|
||||
pLevel->flowCtrl = NULL;
|
||||
taosHashCleanup(pJob->flowCtrl);
|
||||
pJob->flowCtrl = NULL;
|
||||
}
|
||||
|
||||
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
||||
|
@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
|||
}
|
||||
|
||||
int32_t sum = 0;
|
||||
|
||||
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
|
||||
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
|
||||
|
||||
sum += pTask->plan->execNodeStat.tableNum;
|
||||
}
|
||||
|
@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pLevel->flowCtrl) {
|
||||
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum);
|
||||
pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->flowCtrl) {
|
||||
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
|
|||
int32_t code = 0;
|
||||
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
|
||||
|
||||
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
|
||||
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
|
||||
if (NULL == ctrl) {
|
||||
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
|
@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
|
|||
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
|
||||
|
||||
do {
|
||||
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
|
||||
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
|
||||
if (NULL == ctrl) {
|
||||
SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
|
||||
|
||||
code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
|
||||
code = taosHashPut(pJob->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
|
||||
if (code) {
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
continue;
|
||||
|
@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
|
||||
|
||||
SSchLevel *pLevel = pTask->level;
|
||||
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
|
||||
|
||||
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
|
||||
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
|
||||
if (NULL == ctrl) {
|
||||
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
|
|
|
@ -391,6 +391,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("children info, the %d child TID %" PRIx64, n, (*childTask)->taskId);
|
||||
}
|
||||
|
||||
if (parentNum > 0) {
|
||||
|
@ -423,6 +425,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("parents info, the %d parent TID %" PRIx64, n, (*parentTask)->taskId);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
|
||||
|
@ -464,6 +468,17 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->dataSrcTasks, &pTask);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
||||
int32_t code = 0;
|
||||
pJob->queryId = pDag->queryId;
|
||||
|
@ -473,6 +488,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
|
||||
if (NULL == pJob->dataSrcTasks) {
|
||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
|
||||
if (levelNum <= 0) {
|
||||
SCH_JOB_ELOG("invalid level num:%d", levelNum);
|
||||
|
@ -551,6 +571,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p));
|
||||
|
||||
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
|
||||
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -629,6 +651,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
|
||||
if (code) {
|
||||
SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", code);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
|
||||
if (0 != code) {
|
||||
|
@ -774,6 +807,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
|||
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||
|
||||
SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask));
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
|
||||
|
||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
|
||||
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
|
||||
|
@ -947,6 +983,32 @@ _return:
|
|||
SCH_RET(schProcessOnJobFailure(pJob, errCode));
|
||||
}
|
||||
|
||||
int32_t schLaunchNextLevelTasks(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (!SCH_IS_QUERY_JOB(pJob)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSchLevel *pLevel = pTask->level;
|
||||
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
|
||||
if (doneNum == pLevel->taskNum) {
|
||||
pJob->levelIdx--;
|
||||
|
||||
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
|
||||
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
|
||||
|
||||
if (pTask->children && taosArrayGetSize(pTask->children) > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schLaunchTask(pJob, pTask));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
// Note: no more task error processing, handled in function internal
|
||||
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||
bool moved = false;
|
||||
|
@ -1015,11 +1077,13 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
|
||||
SCH_UNLOCK(SCH_WRITE, &par->lock);
|
||||
|
||||
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
|
||||
SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
|
||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, par)) {
|
||||
SCH_ERR_RET(schLaunchTask(pJob, par));
|
||||
}
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schLaunchNextLevelTasks(pJob, pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
@ -2400,8 +2464,6 @@ void schFreeJobImpl(void *job) {
|
|||
for (int32_t i = 0; i < numOfLevels; ++i) {
|
||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||
|
||||
schFreeFlowCtrl(pLevel);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
|
||||
for (int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
||||
|
@ -2411,12 +2473,15 @@ void schFreeJobImpl(void *job) {
|
|||
taosArrayDestroy(pLevel->subTasks);
|
||||
}
|
||||
|
||||
schFreeFlowCtrl(pJob);
|
||||
|
||||
taosHashCleanup(pJob->execTasks);
|
||||
taosHashCleanup(pJob->failTasks);
|
||||
taosHashCleanup(pJob->succTasks);
|
||||
|
||||
taosArrayDestroy(pJob->levels);
|
||||
taosArrayDestroy(pJob->nodeList);
|
||||
taosArrayDestroy(pJob->dataSrcTasks);
|
||||
|
||||
qExplainFreeCtx(pJob->explainCtx);
|
||||
|
||||
|
|
Loading…
Reference in New Issue