fix(stream): fix memory leak.
This commit is contained in:
parent
08f43b9d00
commit
14b9d920ba
|
@ -119,6 +119,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
|
||||||
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
|
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
|
||||||
taosArrayPush(pChkpoint->blocks, pBlock);
|
taosArrayPush(pChkpoint->blocks, pBlock);
|
||||||
|
|
||||||
|
taosMemoryFree(pBlock);
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
||||||
taosFreeQitem(pChkpoint);
|
taosFreeQitem(pChkpoint);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -163,6 +164,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
|
|
||||||
// set the task status
|
// set the task status
|
||||||
pTask->checkpointingId = checkpointId;
|
pTask->checkpointingId = checkpointId;
|
||||||
|
|
||||||
|
// set task status
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK;
|
pTask->status.taskStatus = TASK_STATUS__CK;
|
||||||
|
|
||||||
//todo fix race condition: set the status and append checkpoint block
|
//todo fix race condition: set the status and append checkpoint block
|
||||||
|
@ -211,6 +214,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
code = continueDispatchCheckpointBlock(pBlock, pTask);
|
code = continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue