diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e980b64247..a66b85116f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -496,8 +496,9 @@ static void doRetryDispatchData(void* param, void* tmrId) { } void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d", pTask->id.idStr, waitDuration, - pTask->taskExecInfo.dispatchCount); + pTask->msgInfo.retryCount++; + stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration, + pTask->taskExecInfo.dispatchCount, pTask->msgInfo.retryCount); if (pTask->launchTaskTimer != NULL) { taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); @@ -606,6 +607,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t retryCount = 0; pTask->taskExecInfo.dispatchCount += 1; + pTask->msgInfo.startTs = taosGetTimestampMs(); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -1038,17 +1040,12 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); pTask->msgInfo.pData = NULL; - if (pTask->msgInfo.startTs != 0) { - int64_t now = taosGetTimestampMs(); + int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; + stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, downstreamId, el); - int64_t el = now - pTask->msgInfo.startTs; - stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", - pTask->id.idStr, downstreamId, el); - - pTask->msgInfo.startTs = now; - // put data into inputQ of current task is also allowed - pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; - } + // put data into inputQ of current task is also allowed + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; // now ready for next data output atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); @@ -1137,10 +1134,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed); } - pTask->msgInfo.retryCount++; int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); - stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", - pTask->id.idStr, pTask->msgInfo.retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); + stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", + pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // this message has been sent successfully, let's try next one.