diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6bb15dfd23..42280b0d0f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1039,10 +1039,33 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } +static void dispatchDataInFuture(void* param, void* tmrId) { + SStreamTask* pTask = param; + if (streamTaskShouldStop(pTask)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + return; + } + + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); + streamDispatchStreamBlock(pTask); + } +} + // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); + + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + pTask->msgInfo.pData = NULL; + pTask->msgInfo.dispatchMsgType = 0; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1059,7 +1082,18 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); + if (delayDispatch) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer); + } + } else { + streamDispatchStreamBlock(pTask); + } + return 0; }