fix(stream): set correct msg size;

This commit is contained in:
Haojun Liao 2023-05-06 20:03:07 +08:00
parent 9392c03b7f
commit c141abfbb8
2 changed files with 4 additions and 3 deletions

View File

@ -305,8 +305,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(size:%d, num:%.2fMiB) abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
numOfBlocks, size);
streamDataSubmitDestroy(pSubmitBlock);
return -1;
}

View File

@ -68,7 +68,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
}
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen);
if (pDataSubmit == NULL) {
return NULL;
}