diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9a30bc7727..c15e014993 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -305,8 +305,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, - STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(size:%d, num:%.2fMiB) abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, + numOfBlocks, size); streamDataSubmitDestroy(pSubmitBlock); return -1; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3..d10928cadc 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -68,7 +68,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); + SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); if (pDataSubmit == NULL) { return NULL; }