diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02bb65b762..b47288bf45 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -189,7 +189,7 @@ int32_t streamInit(); void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* queue); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index aaf9fdec72..22e09693c8 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -35,18 +35,17 @@ FAIL: return NULL; } -void streamQueueClose(SStreamQueue* queue) { - while (1) { - void* qItem = streamQueueNextItem(queue); - if (qItem) { - streamFreeQitem(qItem); - } else { - break; - } +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { + qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); + + void* qItem = NULL; + while ((qItem = streamQueueNextItem(pQueue)) != NULL) { + streamFreeQitem(qItem); } - taosFreeQall(queue->qall); - taosCloseQueue(queue->queue); - taosMemoryFree(queue); + + taosFreeQall(pQueue->qall); + taosCloseQueue(pQueue->queue); + taosMemoryFree(pQueue); } #if 0 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9056fa8d93..eda9c1f2bb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) { int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue); + streamQueueClose(pTask->inputQueue, pTask->id.taskId); } if (pTask->outputInfo.queue) { - streamQueueClose(pTask->outputInfo.queue); + streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId); } if (pTask->exec.qmsg) { @@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) { streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } + if (pTask->msgInfo.pData != NULL) { + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + } + if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); }