refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-09-24 01:55:59 +08:00
parent 52763ca2a3
commit 477f57aa0e
1 changed files with 5 additions and 5 deletions

View File

@ -348,13 +348,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"again, roll-back needed",
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else {
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage);
STaskTimer* pTmr = pTask->pTimer;
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
atomic_add_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
if (pTmr->checkTimer != NULL) {
taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer);
} else {
@ -677,7 +677,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
} else {
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
ASSERT(ref == 1);
stDebug("s-task:%s set timer active flag", pTask->id.idStr);
stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref);
}
} else { // timer exists
ASSERT(pTask->status.timerActive == 1);