fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2024-04-23 19:48:50 +08:00
parent ace5e12e65
commit 82cde46614
2 changed files with 2 additions and 1 deletions

View File

@ -190,6 +190,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
}

View File

@ -46,6 +46,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock(pBlock);
return code;
}
@ -76,7 +77,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks);
return code;
}