From d026e7ef16a88f7d8a3cbd87f7671a1f23be90f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 May 2023 22:55:49 +0800 Subject: [PATCH] refactor: update some logs. --- source/libs/stream/src/stream.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index c15e014993..d50e01742f 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -305,7 +305,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(size:%d, num:%.2fMiB) abort", pTask->id.idStr, + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); streamDataSubmitDestroy(pSubmitBlock); @@ -316,10 +316,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || - (numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, - STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + size); return -1; }