From 5564eb215c5982f65e70c7a18a89c1c27717ef8f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Oct 2023 00:38:24 +0800 Subject: [PATCH] fix(stream): fix invalid check. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/src/streamDispatch.c | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 069cc4cbbd..2040f8e323 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -370,7 +370,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (streamQueueGetNumOfItems(pTask->inputInfo.queue)) { + if (streamQueueIsFull(pTask->inputInfo.queue, true)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2d701d6bb0..5b76354dff 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1128,7 +1128,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // all msg rsp already, continue if (leftRsp == 0) { ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - stDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status); // we need to re-try send dispatch msg to downstream tasks int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);