fix(stream): fix memory leak for stream processing.

This commit is contained in:
Haojun Liao 2023-05-17 19:40:21 +08:00
parent 73b3b3fc37
commit 4d5bd2b6de
1 changed files with 1 additions and 0 deletions

View File

@ -297,6 +297,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) {
qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY);
streamDataSubmitDestroy(pSubmitBlock); streamDataSubmitDestroy(pSubmitBlock);
taosFreeQitem(pSubmitBlock);
return -1; return -1;
} }