diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index c649e645a0..3b7a76bfc7 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -598,7 +598,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode); int32_t schProcessOnJobPartialSuccess(SSchJob *pJob); void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); -int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type); +int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); @@ -613,7 +613,7 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32 int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode); void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask); int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode); -int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type); +int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 87370e8993..b565619e75 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -638,8 +638,8 @@ void schDropJobAllTasks(SSchJob *pJob) { // schDropTaskInHashList(pJob, pJob->failTasks); } -int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type) { - SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type)); +int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) { + SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type, pTask)); } void schFreeJobImpl(void *job) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 291a383393..7b8decc007 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -89,7 +89,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs if (pRsp) { SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); } else { - SCH_ERR_JRET(schNotifyJobAllTasks(pJob, TASK_NOTIFY_FINISHED)); + SCH_ERR_JRET(schNotifyJobAllTasks(pJob, pTask, TASK_NOTIFY_FINISHED)); } taosMemoryFreeClear(msg); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index b284edf39a..d96c01fc76 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -1265,18 +1265,22 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } -int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type) { +int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) { int32_t code = TSDB_CODE_SUCCESS; + + SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type)); + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; - - SCH_LOCK_TASK(pTask); - code = schNotifyTaskOnExecNode(pJob, pTask, type); - SCH_UNLOCK_TASK(pTask); - - if (TSDB_CODE_SUCCESS != code) { - break; + if (pTask != pCurrTask) { + SCH_LOCK_TASK(pTask); + code = schNotifyTaskOnExecNode(pJob, pTask, type); + SCH_UNLOCK_TASK(pTask); + + if (TSDB_CODE_SUCCESS != code) { + break; + } } pIter = taosHashIterate(list, pIter);