set task/job status

This commit is contained in:
dapan1121 2021-12-20 09:28:22 +08:00
parent fa66a89bcc
commit 35f76a3c18
2 changed files with 25 additions and 8 deletions

View File

@ -101,7 +101,7 @@ typedef struct SQueryJob {
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
extern int32_t schTaskRun(SQueryJob *job, SQueryTask *task);
extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task);
#ifdef __cplusplus
}

View File

@ -333,7 +333,12 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
*needRetry = false;
return TSDB_CODE_SUCCESS;
}
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
@ -401,6 +406,8 @@ _return:
int32_t schProcessOnJobSuccess(SQueryJob *job) {
job->status = SCH_STATUS_SUCCEED;
if (job->userFetch) {
SCH_ERR_RET(schFetchFromRemote(job));
}
@ -409,6 +416,8 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) {
}
int32_t schProcessOnJobFailure(SQueryJob *job) {
job->status = SCH_STATUS_FAILED;
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
if (job->userFetch) {
@ -433,6 +442,8 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS;
}
task->status = SCH_STATUS_SUCCEED;
int32_t parentNum = (int32_t)taosArrayGetSize(task->parents);
if (parentNum == 0) {
@ -464,7 +475,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));
if (SCH_TASK_READY_TO_LUNCH(par)) {
SCH_ERR_RET(schTaskRun(job, task));
SCH_ERR_RET(schLaunchTask(job, task));
}
}
@ -484,7 +495,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
return TSDB_CODE_SUCCESS;
}
SCH_ERR_RET(schTaskRun(job, task));
SCH_ERR_RET(schLaunchTask(job, task));
return TSDB_CODE_SUCCESS;
}
@ -492,7 +503,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SSubplan *plan = task->plan;
SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
@ -504,15 +515,19 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(schPushTaskToExecList(job, task));
task->status = SCH_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
int32_t schJobRun(SQueryJob *job) {
int32_t schLaunchJob(SQueryJob *job) {
SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) {
SQueryTask *task = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schTaskRun(job, task));
SCH_ERR_RET(schLaunchTask(job, task));
}
job->status = SCH_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
@ -563,7 +578,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
tsem_init(&job->rspSem, 0, 0);
SCH_ERR_JRET(schJobRun(job));
SCH_ERR_JRET(schLaunchJob(job));
*(SQueryJob **)pJob = job;
@ -611,11 +626,13 @@ void scheduleFreeJob(void *job) {
if (NULL == job) {
return;
}
//TODO
}
void schedulerDestroy(void) {
if (schMgmt.Jobs) {
taosHashCleanup(schMgmt.Jobs); //TBD
taosHashCleanup(schMgmt.Jobs); //TODO
schMgmt.Jobs = NULL;
}
}