diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 6600d7dd04..dbe868b54f 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -58,7 +58,6 @@ struct STokenBucket { struct STaskTimer { void* hTaskLaunchTimer; void* dispatchTimer; - void* checkTimer; }; extern SStreamGlobalEnv streamEnv; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 72d05d0a74..ff1728a6eb 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -24,8 +24,9 @@ typedef struct SLaunchHTaskInfo { } SLaunchHTaskInfo; typedef struct STaskRecheckInfo { - SStreamTask* pTask; + SStreamTask* pTask; SStreamTaskCheckReq req; + void* checkTimer; } STaskRecheckInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); @@ -197,6 +198,8 @@ static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTask static void destroyRecheckInfo(STaskRecheckInfo* pInfo) { if (pInfo != NULL) { + taosTmrStop(pInfo->checkTimer); + pInfo->checkTimer = NULL; taosMemoryFree(pInfo); } } @@ -349,18 +352,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "again, roll-back needed", id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); } else { - STaskTimer* pTmr = pTask->pTimer; STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); - if (pTmr->checkTimer != NULL) { - taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer); - } else { - pTmr->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); - } + taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pInfo->checkTimer); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 30189ad185..4925eea262 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -323,10 +323,6 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pTimer->dispatchTimer = NULL; } - if (pTask->pTimer->checkTimer != NULL) { - taosTmrStop(pTask->pTimer->checkTimer); - pTask->pTimer->checkTimer = NULL; - } taosMemoryFreeClear(pTask->pTimer); }