fix: scheduler timer stop issue
This commit is contained in:
parent
a15120f97e
commit
90b60d9d01
|
@ -39,6 +39,8 @@ bool taosTmrStop(tmr_h tmrId);
|
||||||
|
|
||||||
bool taosTmrStopA(tmr_h *tmrId);
|
bool taosTmrStopA(tmr_h *tmrId);
|
||||||
|
|
||||||
|
bool taosTmrIsStopped(tmr_h* timerId);
|
||||||
|
|
||||||
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
|
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -6115,7 +6115,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
||||||
TSDB_CHECK_CODE(code, lino, _end);
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
goto _end;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6142,7 +6142,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
||||||
acquired = false;
|
acquired = false;
|
||||||
TSDB_CHECK_CODE(code, lino, _end);
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
goto _end;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
|
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
|
||||||
|
@ -6168,7 +6168,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
||||||
TSDB_CHECK_CODE(code, lino, _end);
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
goto _end;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -657,6 +657,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType
|
||||||
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
|
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
|
||||||
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||||
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
|
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
|
||||||
|
void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp);
|
||||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
|
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
|
||||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
||||||
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
|
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
|
||||||
|
|
|
@ -428,9 +428,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
if (pTask->delayTimer) {
|
if (pTask->delayTimer) {
|
||||||
if (!taosTmrStopA(&pTask->delayTimer)) {
|
schStopTaskDelayTimer(pJob, pTask, false);
|
||||||
SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
taosHashClear(pTask->execNodes);
|
taosHashClear(pTask->execNodes);
|
||||||
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
||||||
|
@ -1361,9 +1359,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
if (pTask->delayTimer) {
|
if (pTask->delayTimer) {
|
||||||
if (!taosTmrStopA(&pTask->delayTimer)) {
|
schStopTaskDelayTimer(pJob, pTask, true);
|
||||||
SCH_TASK_WLOG("stop delayTimer failed, status:%d", pTask->status);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
SCH_UNLOCK_TASK(pTask);
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
|
|
@ -418,4 +418,16 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp) {
|
||||||
|
if (!taosTmrStopA(&pTask->delayTimer)) {
|
||||||
|
if (syncOp) {
|
||||||
|
while (!taosTmrIsStopped(&pTask->delayTimer)) {
|
||||||
|
taosMsleep(1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -472,6 +472,20 @@ bool taosTmrStopA(tmr_h* timerId) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool taosTmrIsStopped(tmr_h* timerId) {
|
||||||
|
uintptr_t id = (uintptr_t)*timerId;
|
||||||
|
|
||||||
|
tmr_obj_t* timer = findTimer(id);
|
||||||
|
if (timer == NULL) {
|
||||||
|
tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t state = atomic_load_8(&timer->state);
|
||||||
|
|
||||||
|
return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
|
||||||
|
}
|
||||||
|
|
||||||
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
|
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
|
||||||
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
|
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
|
||||||
if (ctrl == NULL || ctrl->label[0] == 0) {
|
if (ctrl == NULL || ctrl->label[0] == 0) {
|
||||||
|
|
Loading…
Reference in New Issue