From 3a75c21e66f1747024cdbdd4442c4079a1abf2e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 24 Sep 2023 01:55:59 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamRecover.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 7bc9898fb0..6b673ee1da 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -348,13 +348,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "again, roll-back needed", id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); } else { - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage); - STaskTimer* pTmr = pTask->pTimer; STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); - atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); + if (pTmr->checkTimer != NULL) { taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer); } else { @@ -677,7 +677,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } else { int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active ASSERT(ref == 1); - stDebug("s-task:%s set timer active flag", pTask->id.idStr); + stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); } } else { // timer exists ASSERT(pTask->status.timerActive == 1);