From 78a5680cf492fcd0515137aea947f0a2624569dc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Sep 2023 15:59:09 +0800 Subject: [PATCH] fix(stream): add some logs. --- source/libs/stream/src/stream.c | 11 +++++++++-- source/libs/stream/src/streamQueue.c | 10 +++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 32b36f8848..75ac86b462 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -223,9 +223,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock destroyStreamDataBlock(pBlock); } else { ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); - code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); + STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; + code = taosWriteQitem(pQueue, pBlock); + + int32_t total = taosQueueItemSize(pQueue); + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); if (code != 0) { - qError("s-task:%s failed to put res into outputQ", pTask->id.idStr); + qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", + pTask->id.idStr, total, size, tstrerror(code)); + } else { + qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); } streamDispatchStreamBlock(pTask); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index bd64e0779a..510c8dc9b1 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -274,9 +274,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { -// qError( -// "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", -// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + qTrace( + "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); return -1; @@ -298,8 +298,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pQueue)) { -// qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", -// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; }