From 90b60d9d013127eb51a78163ea886247e3294263 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 28 Nov 2024 13:56:27 +0800 Subject: [PATCH] fix: scheduler timer stop issue --- include/util/ttimer.h | 2 ++ source/dnode/vnode/src/tsdb/tsdbRead2.c | 6 +++--- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schTask.c | 8 ++------ source/libs/scheduler/src/schUtil.c | 12 ++++++++++++ source/util/src/ttimer.c | 14 ++++++++++++++ 6 files changed, 34 insertions(+), 9 deletions(-) diff --git a/include/util/ttimer.h b/include/util/ttimer.h index 4111a8ca28..53a8f0a19f 100644 --- a/include/util/ttimer.h +++ b/include/util/ttimer.h @@ -39,6 +39,8 @@ bool taosTmrStop(tmr_h tmrId); bool taosTmrStopA(tmr_h *tmrId); +bool taosTmrIsStopped(tmr_h* timerId); + bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index f30f7eb310..f3797c538b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -6115,7 +6115,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { TSDB_CHECK_CODE(code, lino, _end); } - goto _end; + return code; } } @@ -6142,7 +6142,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { acquired = false; TSDB_CHECK_CODE(code, lino, _end); } - goto _end; + return code; } if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) { @@ -6168,7 +6168,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { TSDB_CHECK_CODE(code, lino, _end); } - goto _end; + return code; } } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 72914a963f..ae99dff10c 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -657,6 +657,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum); +void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 706879b0cf..bd737d6c89 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -428,9 +428,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { schDropTaskOnExecNode(pJob, pTask); if (pTask->delayTimer) { - if (!taosTmrStopA(&pTask->delayTimer)) { - SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status); - } + schStopTaskDelayTimer(pJob, pTask, false); } taosHashClear(pTask->execNodes); (void)schRemoveTaskFromExecList(pJob, pTask); // ignore error @@ -1361,9 +1359,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { SCH_LOCK_TASK(pTask); if (pTask->delayTimer) { - if (!taosTmrStopA(&pTask->delayTimer)) { - SCH_TASK_WLOG("stop delayTimer failed, status:%d", pTask->status); - } + schStopTaskDelayTimer(pJob, pTask, true); } schDropTaskOnExecNode(pJob, pTask); SCH_UNLOCK_TASK(pTask); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 29f0df4974..722ec8849c 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -418,4 +418,16 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int return TSDB_CODE_SUCCESS; } +void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp) { + if (!taosTmrStopA(&pTask->delayTimer)) { + if (syncOp) { + while (!taosTmrIsStopped(&pTask->delayTimer)) { + taosMsleep(1); + } + } else { + SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status); + } + } +} + diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index b11f65da8f..e8ed86b4bf 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -472,6 +472,20 @@ bool taosTmrStopA(tmr_h* timerId) { return ret; } +bool taosTmrIsStopped(tmr_h* timerId) { + uintptr_t id = (uintptr_t)*timerId; + + tmr_obj_t* timer = findTimer(id); + if (timer == NULL) { + tmrDebug("timer[id=%" PRIuPTR "] does not exist", id); + return true; + } + + uint8_t state = atomic_load_8(&timer->state); + + return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED); +} + bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) { tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle; if (ctrl == NULL || ctrl->label[0] == 0) {