fix(stream): revise the input queue capacity.

This commit is contained in:
Haojun Liao 2023-05-08 00:00:55 +08:00
parent d026e7ef16
commit 31ac1e3eaa
3 changed files with 10 additions and 6 deletions

View File

@ -17,7 +17,7 @@
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200)
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100)
int32_t streamInit() {
int8_t old;
@ -299,7 +299,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, numOfBlocks, size);

View File

@ -128,7 +128,12 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit)
}
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
int32_t len = 0;
if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
len = pSubmit->submit.msgLen;
}
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len);
if (pSubmitClone == NULL) {
return NULL;
}

View File

@ -173,8 +173,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
batchCnt++;
qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz);
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
if (batchCnt >= batchSz) {
break;
}
@ -207,7 +206,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt);
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
streamDispatch(pTask);
}