diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cf04bcc1b8..ee3026806b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; if (streamTaskShouldStop(&pTask->status)) { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } @@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration); - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration); + if (pTask->launchTaskTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + } else { + pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + } } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, @@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", - pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } @@ -997,8 +1004,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one.