fix(stream): add new timer for each task.

This commit is contained in:
Haojun Liao 2023-09-24 02:25:53 +08:00
parent 7336228c16
commit 7167c3c5de
3 changed files with 5 additions and 12 deletions

View File

@ -58,7 +58,6 @@ struct STokenBucket {
struct STaskTimer {
void* hTaskLaunchTimer;
void* dispatchTimer;
void* checkTimer;
};
extern SStreamGlobalEnv streamEnv;

View File

@ -24,8 +24,9 @@ typedef struct SLaunchHTaskInfo {
} SLaunchHTaskInfo;
typedef struct STaskRecheckInfo {
SStreamTask* pTask;
SStreamTask* pTask;
SStreamTaskCheckReq req;
void* checkTimer;
} STaskRecheckInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
@ -197,6 +198,8 @@ static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTask
static void destroyRecheckInfo(STaskRecheckInfo* pInfo) {
if (pInfo != NULL) {
taosTmrStop(pInfo->checkTimer);
pInfo->checkTimer = NULL;
taosMemoryFree(pInfo);
}
}
@ -349,18 +352,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"again, roll-back needed",
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else {
STaskTimer* pTmr = pTask->pTimer;
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
if (pTmr->checkTimer != NULL) {
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer);
} else {
pTmr->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
}
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pInfo->checkTimer);
}
}

View File

@ -323,10 +323,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pTimer->dispatchTimer = NULL;
}
if (pTask->pTimer->checkTimer != NULL) {
taosTmrStop(pTask->pTimer->checkTimer);
pTask->pTimer->checkTimer = NULL;
}
taosMemoryFreeClear(pTask->pTimer);
}