diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4d04c4dff7..db75fc4fdd 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -101,7 +101,7 @@ typedef struct SQueryJob { #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -extern int32_t schTaskRun(SQueryJob *job, SQueryTask *task); +extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index dbbe791136..e68be25bce 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -333,7 +333,12 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { + // TODO set retry or not based on task type/errCode/retry times/job status/available eps... + // TODO if needRetry, set task retry info + *needRetry = false; + + return TSDB_CODE_SUCCESS; } int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { @@ -401,6 +406,8 @@ _return: int32_t schProcessOnJobSuccess(SQueryJob *job) { + job->status = SCH_STATUS_SUCCEED; + if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); } @@ -409,6 +416,8 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { } int32_t schProcessOnJobFailure(SQueryJob *job) { + job->status = SCH_STATUS_FAILED; + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); if (job->userFetch) { @@ -433,6 +442,8 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } + + task->status = SCH_STATUS_SUCCEED; int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); if (parentNum == 0) { @@ -464,7 +475,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schTaskRun(job, task)); + SCH_ERR_RET(schLaunchTask(job, task)); } } @@ -484,7 +495,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } - SCH_ERR_RET(schTaskRun(job, task)); + SCH_ERR_RET(schLaunchTask(job, task)); return TSDB_CODE_SUCCESS; } @@ -492,7 +503,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod -int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { +int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); @@ -504,15 +515,19 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schPushTaskToExecList(job, task)); + task->status = SCH_STATUS_EXECUTING; + return TSDB_CODE_SUCCESS; } -int32_t schJobRun(SQueryJob *job) { +int32_t schLaunchJob(SQueryJob *job) { SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { SQueryTask *task = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schTaskRun(job, task)); + SCH_ERR_RET(schLaunchTask(job, task)); } + + job->status = SCH_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } @@ -563,7 +578,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM tsem_init(&job->rspSem, 0, 0); - SCH_ERR_JRET(schJobRun(job)); + SCH_ERR_JRET(schLaunchJob(job)); *(SQueryJob **)pJob = job; @@ -611,11 +626,13 @@ void scheduleFreeJob(void *job) { if (NULL == job) { return; } + + //TODO } void schedulerDestroy(void) { if (schMgmt.Jobs) { - taosHashCleanup(schMgmt.Jobs); //TBD + taosHashCleanup(schMgmt.Jobs); //TODO schMgmt.Jobs = NULL; } }