diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6af6ebd044..bafd354360 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -304,8 +304,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || - type == STREAM_INPUT__REF_DATA_BLOCK) { + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pTask->inputq.queue)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); @@ -324,7 +323,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || - type == STREAM_INPUT__TRANS_STATE) { + type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) { int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { streamFreeQitem(pItem);