From 31ac1e3eaa801bbc778f1897fcd73c78dccae458 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 May 2023 00:00:55 +0800 Subject: [PATCH] fix(stream): revise the input queue capacity. --- source/libs/stream/src/stream.c | 4 ++-- source/libs/stream/src/streamData.c | 7 ++++++- source/libs/stream/src/streamExec.c | 5 ++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d50e01742f..9a39bb09dc 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -17,7 +17,7 @@ #include "ttimer.h" #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200) +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) int32_t streamInit() { int8_t old; @@ -299,7 +299,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index d10928cadc..7fb35ad2ad 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -128,7 +128,12 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) } SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); + int32_t len = 0; + if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { + len = pSubmit->submit.msgLen; + } + + SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c1d25da86d..93ee3916ba 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -173,8 +173,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { batchCnt++; - qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz); - + qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); if (batchCnt >= batchSz) { break; } @@ -207,7 +206,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); + qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); streamDispatch(pTask); }