fix: dead lock issue

This commit is contained in:
dapan1121 2023-08-25 15:20:12 +08:00
parent e0cc4e7ed7
commit cf50665c20
4 changed files with 17 additions and 13 deletions

View File

@ -598,7 +598,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob); int32_t schProcessOnJobPartialSuccess(SSchJob *pJob);
void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type); int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
@ -613,7 +613,7 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode); int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode);
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask); void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask);
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode); int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode);
int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type); int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type);
extern SSchDebug gSCHDebug; extern SSchDebug gSCHDebug;

View File

@ -638,8 +638,8 @@ void schDropJobAllTasks(SSchJob *pJob) {
// schDropTaskInHashList(pJob, pJob->failTasks); // schDropTaskInHashList(pJob, pJob->failTasks);
} }
int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type) { int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type)); SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type, pTask));
} }
void schFreeJobImpl(void *job) { void schFreeJobImpl(void *job) {

View File

@ -89,7 +89,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
if (pRsp) { if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
} else { } else {
SCH_ERR_JRET(schNotifyJobAllTasks(pJob, TASK_NOTIFY_FINISHED)); SCH_ERR_JRET(schNotifyJobAllTasks(pJob, pTask, TASK_NOTIFY_FINISHED));
} }
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);

View File

@ -1265,18 +1265,22 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
} }
} }
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type) { int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
void *pIter = taosHashIterate(list, NULL); void *pIter = taosHashIterate(list, NULL);
while (pIter) { while (pIter) {
SSchTask *pTask = *(SSchTask **)pIter; SSchTask *pTask = *(SSchTask **)pIter;
if (pTask != pCurrTask) {
SCH_LOCK_TASK(pTask); SCH_LOCK_TASK(pTask);
code = schNotifyTaskOnExecNode(pJob, pTask, type); code = schNotifyTaskOnExecNode(pJob, pTask, type);
SCH_UNLOCK_TASK(pTask); SCH_UNLOCK_TASK(pTask);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
}
} }
pIter = taosHashIterate(list, pIter); pIter = taosHashIterate(list, pIter);