fix: task reset redirect issue
This commit is contained in:
parent
c3ae66c95c
commit
49d9552ab6
|
@ -567,7 +567,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchLevel *pLevel = pTask->level;
|
SSchLevel *pLevel = pTask->level;
|
||||||
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1);
|
int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
|
||||||
if (doneNum == pLevel->taskNum) {
|
if (doneNum == pLevel->taskNum) {
|
||||||
atomic_sub_fetch_32(&pJob->levelIdx, 1);
|
atomic_sub_fetch_32(&pJob->levelIdx, 1);
|
||||||
|
|
||||||
|
|
|
@ -248,6 +248,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
SCH_LOG_TASK_END_TS(pTask);
|
SCH_LOG_TASK_END_TS(pTask);
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
|
||||||
|
|
||||||
SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
|
SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
|
||||||
|
@ -483,6 +485,34 @@ _return:
|
||||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
SSchLevel *pLevel = pTask->level;
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
|
||||||
|
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
|
||||||
|
|
||||||
|
if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
|
||||||
|
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
|
||||||
|
|
||||||
|
int32_t childrenNum = taosArrayGetSize(pTask->children);
|
||||||
|
for (int32_t i = 0; i < childrenNum; ++i) {
|
||||||
|
SSchTask *pChild = taosArrayGetP(pTask->children, i);
|
||||||
|
SCH_LOCK_TASK(pChild);
|
||||||
|
pLevel = pChild->level;
|
||||||
|
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
|
||||||
|
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
|
||||||
|
SCH_UNLOCK_TASK(pChild);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
|
||||||
|
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -498,12 +528,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i
|
||||||
|
|
||||||
SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
|
SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
|
||||||
|
|
||||||
for (int32_t i = 0; i < pJob->levelNum; ++i) {
|
SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
|
||||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
|
||||||
|
|
||||||
pLevel->taskExecDoneNum = 0;
|
|
||||||
pLevel->taskLaunchedNum = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCH_RESET_JOB_LEVEL_IDX(pJob);
|
SCH_RESET_JOB_LEVEL_IDX(pJob);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue