From 49d9552ab603b8171d0b61d53a2c0075f42d65ab Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 20 Jun 2024 11:18:23 +0800 Subject: [PATCH] fix: task reset redirect issue --- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schTask.c | 37 ++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 380862f745..278768981a 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -567,7 +567,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { } SSchLevel *pLevel = pTask->level; - int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1); + int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum); if (doneNum == pLevel->taskNum) { atomic_sub_fetch_32(&pJob->levelIdx, 1); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 97c3c7d276..7f60353b1c 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -248,6 +248,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_LOG_TASK_END_TS(pTask); + atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC); SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask)); @@ -483,6 +485,34 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) { + SSchLevel *pLevel = pTask->level; + + SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d", + atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum)); + + if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) { + atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); + } + + atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); + + int32_t childrenNum = taosArrayGetSize(pTask->children); + for (int32_t i = 0; i < childrenNum; ++i) { + SSchTask *pChild = taosArrayGetP(pTask->children, i); + SCH_LOCK_TASK(pChild); + pLevel = pChild->level; + atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); + atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); + SCH_UNLOCK_TASK(pChild); + } + + SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d", + atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum)); + + return TSDB_CODE_SUCCESS; +} + int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; @@ -498,12 +528,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode)); - for (int32_t i = 0; i < pJob->levelNum; ++i) { - SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - - pLevel->taskExecDoneNum = 0; - pLevel->taskLaunchedNum = 0; - } + SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask)); SCH_RESET_JOB_LEVEL_IDX(pJob);