fix(stream): fix the invalid free.

This commit is contained in:
Haojun Liao 2023-11-09 18:14:27 +08:00
parent 97772e9aab
commit e1de1de421
3 changed files with 8 additions and 12 deletions

View File

@ -129,6 +129,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
taosMemoryFree(pDataSubmit->submit.msgStr); taosMemoryFree(pDataSubmit->submit.msgStr);
taosFreeQitem(pDataSubmit);
} }
SStreamMergedSubmit* streamMergedSubmitNew() { SStreamMergedSubmit* streamMergedSubmitNew() {
@ -208,12 +209,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
if (type == STREAM_INPUT__GET_RES) { if (type == STREAM_INPUT__GET_RES) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock); blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) { } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); destroyStreamDataBlock((SStreamDataBlock*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitDestroy((SStreamDataSubmit*)data); streamDataSubmitDestroy((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) { } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
@ -228,7 +227,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
blockDataDestroy(pRefBlock->pBlock); blockDataDestroy(pRefBlock->pBlock);
taosFreeQitem(pRefBlock); taosFreeQitem(pRefBlock);
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*) data; SStreamDataBlock* pBlock = (SStreamDataBlock*) data;
taosArrayDestroyEx(pBlock->blocks, freeItems); taosArrayDestroyEx(pBlock->blocks, freeItems);
taosFreeQitem(pBlock); taosFreeQitem(pBlock);

View File

@ -270,7 +270,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1; return -1;
} }
@ -280,7 +279,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
int32_t code = taosWriteQitem(pQueue, pItem); int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return code; return code;
} }
@ -296,13 +294,13 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem); streamFreeQitem(pItem);
return -1; return -1;
} }
int32_t code = taosWriteQitem(pQueue, pItem); int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*)pItem); streamFreeQitem(pItem);
return code; return code;
} }
@ -312,7 +310,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
type == STREAM_INPUT__TRANS_STATE) { type == STREAM_INPUT__TRANS_STATE) {
int32_t code = taosWriteQitem(pQueue, pItem); int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem); streamFreeQitem(pItem);
return code; return code;
} }
@ -323,7 +321,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
// use the default memory limit, refactor later. // use the default memory limit, refactor later.
int32_t code = taosWriteQitem(pQueue, pItem); int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem); streamFreeQitem(pItem);
return code; return code;
} }

View File

@ -562,7 +562,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
taosMemoryFree(pBlock); taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }