refactor: do some internal refactor.
This commit is contained in:
parent
4ea737571d
commit
3a75c21e66
|
@ -348,13 +348,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
"again, roll-back needed",
|
"again, roll-back needed",
|
||||||
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
|
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms", id,
|
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage);
|
|
||||||
|
|
||||||
STaskTimer* pTmr = pTask->pTimer;
|
STaskTimer* pTmr = pTask->pTimer;
|
||||||
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
||||||
|
|
||||||
atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
|
||||||
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
|
||||||
|
|
||||||
if (pTmr->checkTimer != NULL) {
|
if (pTmr->checkTimer != NULL) {
|
||||||
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer);
|
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer);
|
||||||
} else {
|
} else {
|
||||||
|
@ -677,7 +677,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
|
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
|
||||||
ASSERT(ref == 1);
|
ASSERT(ref == 1);
|
||||||
stDebug("s-task:%s set timer active flag", pTask->id.idStr);
|
stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref);
|
||||||
}
|
}
|
||||||
} else { // timer exists
|
} else { // timer exists
|
||||||
ASSERT(pTask->status.timerActive == 1);
|
ASSERT(pTask->status.timerActive == 1);
|
||||||
|
|
Loading…
Reference in New Issue