diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 80927b36b9..f6ec6e9fdb 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -129,6 +129,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); taosMemoryFree(pDataSubmit->submit.msgStr); + taosFreeQitem(pDataSubmit); } SStreamMergedSubmit* streamMergedSubmitNew() { @@ -208,12 +209,10 @@ void streamFreeQitem(SStreamQueueItem* data) { if (type == STREAM_INPUT__GET_RES) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) { + destroyStreamDataBlock((SStreamDataBlock*)data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { streamDataSubmitDestroy((SStreamDataSubmit*)data); - taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; @@ -228,7 +227,7 @@ void streamFreeQitem(SStreamQueueItem* data) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); - } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { SStreamDataBlock* pBlock = (SStreamDataBlock*) data; taosArrayDestroyEx(pBlock->blocks, freeItems); taosFreeQitem(pBlock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 63ee702ada..556de169b4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -270,7 +270,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); - taosFreeQitem(pItem); return -1; } @@ -280,7 +279,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { streamDataSubmitDestroy(px); - taosFreeQitem(pItem); return code; } @@ -296,13 +294,13 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); - destroyStreamDataBlock((SStreamDataBlock*)pItem); + streamFreeQitem(pItem); return -1; } int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - destroyStreamDataBlock((SStreamDataBlock*)pItem); + streamFreeQitem(pItem); return code; } @@ -312,7 +310,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) type == STREAM_INPUT__TRANS_STATE) { int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - taosFreeQitem(pItem); + streamFreeQitem(pItem); return code; } @@ -323,7 +321,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - taosFreeQitem(pItem); + streamFreeQitem(pItem); return code; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index e672b256da..da4aa02e9c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -562,7 +562,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { taosMemoryFree(pBlock); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { - taosFreeQitem(pTranstate); return TSDB_CODE_OUT_OF_MEMORY; }