From bc779edda943c78b168155da1bcd20c57553b74a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 17:34:47 +0800 Subject: [PATCH] fix(stream): fix bug in delay send. --- source/libs/stream/src/streamDispatch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b3e3cb554d..42280b0d0f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1061,7 +1061,11 @@ static void dispatchDataInFuture(void* param, void* tmrId) { // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); + + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + pTask->msgInfo.pData = NULL; + pTask->msgInfo.dispatchMsgType = 0; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1078,8 +1082,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (status == TASK_STATUS__CK) { + if (delayDispatch) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); if (pTask->msgInfo.pTimer != NULL) {