fix(stream): add new timer for each task.

This commit is contained in:
Haojun Liao 2023-09-24 02:25:53 +08:00
parent b7dcd94c64
commit a74b462ffb
3 changed files with 5 additions and 12 deletions

View File

@ -58,7 +58,6 @@ struct STokenBucket {
struct STaskTimer { struct STaskTimer {
void* hTaskLaunchTimer; void* hTaskLaunchTimer;
void* dispatchTimer; void* dispatchTimer;
void* checkTimer;
}; };
extern SStreamGlobalEnv streamEnv; extern SStreamGlobalEnv streamEnv;

View File

@ -24,8 +24,9 @@ typedef struct SLaunchHTaskInfo {
} SLaunchHTaskInfo; } SLaunchHTaskInfo;
typedef struct STaskRecheckInfo { typedef struct STaskRecheckInfo {
SStreamTask* pTask; SStreamTask* pTask;
SStreamTaskCheckReq req; SStreamTaskCheckReq req;
void* checkTimer;
} STaskRecheckInfo; } STaskRecheckInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
@ -197,6 +198,8 @@ static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTask
static void destroyRecheckInfo(STaskRecheckInfo* pInfo) { static void destroyRecheckInfo(STaskRecheckInfo* pInfo) {
if (pInfo != NULL) { if (pInfo != NULL) {
taosTmrStop(pInfo->checkTimer);
pInfo->checkTimer = NULL;
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
} }
@ -349,18 +352,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"again, roll-back needed", "again, roll-back needed",
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else { } else {
STaskTimer* pTmr = pTask->pTimer;
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
if (pTmr->checkTimer != NULL) { taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pInfo->checkTimer);
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer);
} else {
pTmr->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
}
} }
} }

View File

@ -323,10 +323,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pTimer->dispatchTimer = NULL; pTask->pTimer->dispatchTimer = NULL;
} }
if (pTask->pTimer->checkTimer != NULL) {
taosTmrStop(pTask->pTimer->checkTimer);
pTask->pTimer->checkTimer = NULL;
}
taosMemoryFreeClear(pTask->pTimer); taosMemoryFreeClear(pTask->pTimer);
} }